Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/org/apache/sysds/conf/ConfigurationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ public static boolean isCompressionEnabled(){
return compress.isEnabled();
}

/**
 * Looks up the federated timeout setting in the DML configuration
 * obtained from {@code getDMLConfig()}.
 *
 * @return the integer value stored under {@code DMLConfig.FEDERATED_TIMEOUT}
 */
public static int getFederatedTimeout(){
	final int federatedTimeout = getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT);
	return federatedTimeout;
}

///////////////////////////////////////
// Thread-local classes

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/apache/sysds/conf/DMLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class DMLConfig

public static final String USE_SSL_FEDERATED_COMMUNICATION = "sysds.federated.ssl"; // boolean
public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT = "sysds.federated.initialization.timeout"; // int seconds
public static final String FEDERATED_TIMEOUT = "sysds.federated.timeout"; // int seconds; timeout of a single federated request, default -1 means no timeout (wait indefinitely)
public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed default Spark Port
public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2;

Expand Down Expand Up @@ -168,6 +169,7 @@ public class DMLConfig
_defaultVals.put(FLOATING_POINT_PRECISION, "double" );
_defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
_defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10");
_defaultVals.put(FEDERATED_TIMEOUT, "-1");
}

public DMLConfig() {
Expand Down Expand Up @@ -417,7 +419,7 @@ public String getConfigInfo() {
STATS_MAX_WRAP_LEN, LINEAGECACHESPILL, COMPILERASSISTED_RW, PRINT_GPU_MEMORY_INFO,
AVAILABLE_GPUS, SYNCHRONIZE_GPU, EAGER_CUDA_FREE, FLOATING_POINT_PRECISION, GPU_EVICTION_POLICY,
LOCAL_SPARK_NUM_THREADS, EVICTION_SHADOW_BUFFERSIZE, GPU_MEMORY_ALLOCATOR, GPU_MEMORY_UTILIZATION_FACTOR,
USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT
USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, FEDERATED_TIMEOUT
};

StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
import org.apache.sysds.runtime.meta.MetaData;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
Expand All @@ -52,8 +53,8 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.Promise;
import org.apache.sysds.runtime.meta.MetaData;

public class FederatedData {
private static final Log LOG = LogFactory.getLog(FederatedData.class.getName());
Expand Down Expand Up @@ -179,13 +180,15 @@ protected void initChannel(SocketChannel ch) throws Exception {
cp.addLast(SslConstructor().context
.newHandler(ch.alloc(), address.getAddress().getHostAddress(), address.getPort()));
}
final int timeout = ConfigurationManager.getFederatedTimeout();
if(timeout > -1)
cp.addLast("timeout",new ReadTimeoutHandler(timeout));

cp.addLast("ObjectDecoder",
new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())));
cp.addLast("FederatedOperationHandler", handler);
cp.addLast("ObjectEncoder", new ObjectEncoder());

}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ private FederatedResponse readData(String filename, Types.DataType dataType,
throw ex;
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
String msg = "Exception of type " + ex.getClass() + " thrown when processing READ request";
LOG.error(msg, ex);
throw new DMLRuntimeException(msg);
}
finally {
IOUtilFunctions.closeSilently(fs);
Expand Down Expand Up @@ -425,6 +427,7 @@ private FederatedResponse execUDF(FederatedRequest request, ExecutionContextMap
catch(Exception ex) {
// Note it is unsafe to throw the ex trace along with the exception here.
String msg = "Exception of type " + ex.getClass() + " thrown when processing EXEC_UDF request";
LOG.error(msg, ex);
throw new FederatedWorkerHandlerException(msg);
}
}
Expand All @@ -438,6 +441,7 @@ private FederatedResponse execClear(ExecutionContextMap ecm) {
}
catch(Exception ex) {
String msg = "Exception of type " + ex.getClass() + " thrown when processing CLEAR request";
LOG.error(msg, ex);
throw new FederatedWorkerHandlerException(msg);
}
return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ protected ListObject computeGradientsForNBatches(ListObject model,
catch(Exception e) {
if(DMLScript.STATISTICS)
tFedCommunication.stop();
throw new DMLRuntimeException("FederatedLocalPSThread: failed to execute UDF" + e.getMessage());
throw new DMLRuntimeException("FederatedLocalPSThread: failed to execute UDF" + e.getMessage(), e);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/test/config/SystemDS-MultiTenant-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@
<root>
<!-- The timeout of the federated tests to initialize the federated matrices -->
<sysds.federated.initialization.timeout>30</sysds.federated.initialization.timeout>
<!-- The timeout of each instruction sent to federated workers -->
<sysds.federated.timeout>128</sysds.federated.timeout>
</root>
29 changes: 3 additions & 26 deletions src/test/config/SystemDS-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,10 @@
-->

<root>
<!-- local fs tmp working directory-->
<sysds.localtmpdir>/tmp/systemds</sysds.localtmpdir>

<!-- hdfs tmp working directory-->
<sysds.scratch>scratch_space</sysds.scratch>

<!-- compiler optimization level, valid values: 0 | 1 | 2 | 3 | 4, default: 2 -->
<sysds.optlevel>2</sysds.optlevel>

<!-- default block dim for binary block files -->
<sysds.defaultblocksize>1000</sysds.defaultblocksize>

<!-- enables multi-threaded matrix operations in singlenode control program -->
<sysds.cp.parallel.ops>true</sysds.cp.parallel.ops>

<!-- enables multi-threaded read/write in singlenode control program -->
<sysds.cp.parallel.io>true</sysds.cp.parallel.io>

<!-- enables native blas for matrix multiplication and convolution, experimental feature (options: auto, mkl, openblas, none) -->
<sysds.native.blas>none</sysds.native.blas>

<!-- custom directory where BLAS libraries are available, experimental feature (options: absolute directory path or none). If set to none, we use standard LD_LIBRARY_PATH. -->
<sysds.native.blas.directory>none</sysds.native.blas.directory>

<!-- The number of threads for the spark instance, artificially selected -->
<sysds.local.spark.number.threads>16</sysds.local.spark.number.threads>

<sysds.local.spark.number.threads>2</sysds.local.spark.number.threads>
<!-- The timeout of the federated tests to initialize the federated matrices -->
<sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout>
<!-- The timeout of each instruction sent to federated workers -->
<sysds.federated.timeout>128</sysds.federated.timeout>
</root>
25 changes: 13 additions & 12 deletions src/test/java/org/apache/sysds/test/AutomatedTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1089,19 +1089,20 @@ protected void loadTestConfiguration(TestConfiguration config, String cacheDirec

curLocalTempDir.mkdirs();
TestUtils.clearDirectory(curLocalTempDir.getPath());

// Create a SystemDS config file for this test case based on default template
// from src/test/config or derive from custom configuration provided by test.
String configTemplate = FileUtils.readFileToString(getConfigTemplateFile(), "UTF-8");
String localTemp = curLocalTempDir.getPath();
String configContents = configTemplate
.replace(createXMLElement(DMLConfig.SCRATCH_SPACE, "scratch_space"),
createXMLElement(DMLConfig.SCRATCH_SPACE, localTemp + "/target/scratch_space"))
.replace(createXMLElement(DMLConfig.LOCAL_TMP_DIR, "/tmp/systemds"),
createXMLElement(DMLConfig.LOCAL_TMP_DIR, localTemp + "/localtmp"));


if(!disableConfigFile){

// Create a SystemDS config file for this test case based on default template
// from src/test/config or derive from custom configuration provided by test.
String configTemplate = FileUtils.readFileToString(getConfigTemplateFile(), "UTF-8");
String localTemp = curLocalTempDir.getPath();
String testScratchSpace = "\n "+ createXMLElement(DMLConfig.SCRATCH_SPACE, localTemp + "/target/scratch_space") + "\n ";
String testTempSpace = createXMLElement(DMLConfig.LOCAL_TMP_DIR, localTemp + "/localtmp");
String configContents = configTemplate
// if the config had a tmp location remove it
.replace(createXMLElement(DMLConfig.SCRATCH_SPACE, "scratch_space"),"")
.replace(createXMLElement(DMLConfig.LOCAL_TMP_DIR, "/tmp/systemds"),"")
.replace("</root>", testScratchSpace + testTempSpace + "\n</root>");
LOG.error(configContents);
FileUtils.write(getCurConfigFile(), configContents, "UTF-8");

if(LOG.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
-->

<root>
<sysds.localtmpdir>/tmp/systemds</sysds.localtmpdir>
<sysds.scratch>scratch_space</sysds.scratch>
<sysds.optlevel>7</sysds.optlevel>
<sysds.codegen.enabled>true</sysds.codegen.enabled>
<sysds.codegen.plancache>true</sysds.codegen.plancache>
<sysds.codegen.literals>1</sysds.codegen.literals>

<!-- The number of threads for the spark instance, artificially selected -->
<sysds.local.spark.number.threads>16</sysds.local.spark.number.threads>

<sysds.local.spark.number.threads>2</sysds.local.spark.number.threads>
<sysds.codegen.api>auto</sysds.codegen.api>
<sysds.federated.timeout>128</sysds.federated.timeout>
</root>
5 changes: 4 additions & 1 deletion src/test/scripts/functions/federated/io/SSLConfig.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
* specific language governing permissions and limitations
* under the License.
-->

<root>
<sysds.local.spark.number.threads>2</sysds.local.spark.number.threads>
<sysds.federated.ssl>true</sysds.federated.ssl>
</root>
<sysds.federated.timeout>128</sysds.federated.timeout>
</root>