diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java index b1fa997f367..c51d4e17ce3 100644 --- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java +++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java @@ -205,6 +205,10 @@ public static boolean isCompressionEnabled(){ return compress.isEnabled(); } + public static int getFederatedTimeout(){ + return getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT); + } + /////////////////////////////////////// // Thread-local classes diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index c020fdde8da..b2d521e1c3e 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -108,6 +108,7 @@ public class DMLConfig public static final String USE_SSL_FEDERATED_COMMUNICATION = "sysds.federated.ssl"; // boolean public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT = "sysds.federated.initialization.timeout"; // int seconds + public static final String FEDERATED_TIMEOUT = "sysds.federated.timeout"; // single request timeout default -1 to indicate infinite. public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed default Spark Port public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2; @@ -168,6 +169,7 @@ public class DMLConfig _defaultVals.put(FLOATING_POINT_PRECISION, "double" ); _defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false"); _defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10"); + _defaultVals.put(FEDERATED_TIMEOUT, "-1"); } public DMLConfig() { @@ -417,7 +419,7 @@ public String getConfigInfo() { STATS_MAX_WRAP_LEN, LINEAGECACHESPILL, COMPILERASSISTED_RW, PRINT_GPU_MEMORY_INFO, AVAILABLE_GPUS, SYNCHRONIZE_GPU, EAGER_CUDA_FREE, FLOATING_POINT_PRECISION, GPU_EVICTION_POLICY, LOCAL_SPARK_NUM_THREADS, EVICTION_SHADOW_BUFFERSIZE, GPU_MEMORY_ALLOCATOR, GPU_MEMORY_UTILIZATION_FACTOR, - USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT + USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, FEDERATED_TIMEOUT }; StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java index a3c165007e4..6f063fbb668 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java @@ -35,6 +35,7 @@ import org.apache.sysds.conf.DMLConfig; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType; +import org.apache.sysds.runtime.meta.MetaData; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; @@ -52,8 +53,8 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.concurrent.Promise; -import org.apache.sysds.runtime.meta.MetaData; public class FederatedData { private static final Log LOG = LogFactory.getLog(FederatedData.class.getName()); @@ -179,13 +180,15 @@ protected void initChannel(SocketChannel ch) throws Exception { cp.addLast(SslConstructor().context .newHandler(ch.alloc(), address.getAddress().getHostAddress(), address.getPort())); } + final int timeout = ConfigurationManager.getFederatedTimeout(); + if(timeout > -1) + cp.addLast("timeout",new ReadTimeoutHandler(timeout)); cp.addLast("ObjectDecoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader()))); cp.addLast("FederatedOperationHandler", handler); cp.addLast("ObjectEncoder", new ObjectEncoder()); - } }); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java index 282d6be87db..7461765f0ac 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java @@ -275,7 +275,9 @@ private FederatedResponse readData(String filename, Types.DataType dataType, throw ex; } catch(Exception ex) { - throw new DMLRuntimeException(ex); + String msg = "Exception of type " + ex.getClass() + " thrown when processing READ request"; + LOG.error(msg, ex); + throw new DMLRuntimeException(msg); } finally { IOUtilFunctions.closeSilently(fs); @@ -425,6 +427,7 @@ private FederatedResponse execUDF(FederatedRequest request, ExecutionContextMap catch(Exception ex) { // Note it is unsafe to throw the ex trace along with the exception here. String msg = "Exception of type " + ex.getClass() + " thrown when processing EXEC_UDF request"; + LOG.error(msg, ex); throw new FederatedWorkerHandlerException(msg); } } @@ -438,6 +441,7 @@ private FederatedResponse execClear(ExecutionContextMap ecm) { } catch(Exception ex) { String msg = "Exception of type " + ex.getClass() + " thrown when processing CLEAR request"; + LOG.error(msg, ex); throw new FederatedWorkerHandlerException(msg); } return new FederatedResponse(ResponseType.SUCCESS_EMPTY); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java index ea8f0e8661c..30e90beee91 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java @@ -450,7 +450,7 @@ protected ListObject computeGradientsForNBatches(ListObject model, catch(Exception e) { if(DMLScript.STATISTICS) tFedCommunication.stop(); - throw new DMLRuntimeException("FederatedLocalPSThread: failed to execute UDF" + e.getMessage()); + throw new DMLRuntimeException("FederatedLocalPSThread: failed to execute UDF" + e.getMessage(), e); } } diff --git a/src/test/config/SystemDS-MultiTenant-config.xml b/src/test/config/SystemDS-MultiTenant-config.xml index 3e250ccaa2b..4e2ecdbd1ef 100644 --- a/src/test/config/SystemDS-MultiTenant-config.xml +++ b/src/test/config/SystemDS-MultiTenant-config.xml @@ -20,4 +20,6 @@ 30 + + 128 diff --git a/src/test/config/SystemDS-config.xml b/src/test/config/SystemDS-config.xml index 2519a38ceda..a6f5ba525f7 100644 --- a/src/test/config/SystemDS-config.xml +++ b/src/test/config/SystemDS-config.xml @@ -18,33 +18,10 @@ --> - - /tmp/systemds - - - scratch_space - - - 2 - - - 1000 - - - true - - - true - - - none - - - none - - 16 - + 2 2 + + 128 diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java index 0243f2ad511..d292e6b89f8 100644 --- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java +++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java @@ -1089,19 +1089,20 @@ protected void loadTestConfiguration(TestConfiguration config, String cacheDirec curLocalTempDir.mkdirs(); TestUtils.clearDirectory(curLocalTempDir.getPath()); - - // Create a SystemDS config file for this test case based on default template - // from src/test/config or derive from custom configuration provided by test. - String configTemplate = FileUtils.readFileToString(getConfigTemplateFile(), "UTF-8"); - String localTemp = curLocalTempDir.getPath(); - String configContents = configTemplate - .replace(createXMLElement(DMLConfig.SCRATCH_SPACE, "scratch_space"), - createXMLElement(DMLConfig.SCRATCH_SPACE, localTemp + "/target/scratch_space")) - .replace(createXMLElement(DMLConfig.LOCAL_TMP_DIR, "/tmp/systemds"), - createXMLElement(DMLConfig.LOCAL_TMP_DIR, localTemp + "/localtmp")); - + if(!disableConfigFile){ - + // Create a SystemDS config file for this test case based on default template + // from src/test/config or derive from custom configuration provided by test. + String configTemplate = FileUtils.readFileToString(getConfigTemplateFile(), "UTF-8"); + String localTemp = curLocalTempDir.getPath(); + String testScratchSpace = "\n "+ createXMLElement(DMLConfig.SCRATCH_SPACE, localTemp + "/target/scratch_space") + "\n "; + String testTempSpace = createXMLElement(DMLConfig.LOCAL_TMP_DIR, localTemp + "/localtmp"); + String configContents = configTemplate + // if the config had a tmp location remove it + .replace(createXMLElement(DMLConfig.SCRATCH_SPACE, "scratch_space"),"") + .replace(createXMLElement(DMLConfig.LOCAL_TMP_DIR, "/tmp/systemds"),"") + .replace("", testScratchSpace + testTempSpace + "\n"); + LOG.error(configContents); FileUtils.write(getCurConfigFile(), configContents, "UTF-8"); if(LOG.isDebugEnabled()) diff --git a/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml b/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml index b3d47122ad1..df1eecaaefa 100644 --- a/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml +++ b/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml @@ -18,15 +18,11 @@ --> - /tmp/systemds - scratch_space 7 true true 1 - - - 16 - + 2 auto + 128 diff --git a/src/test/scripts/functions/federated/io/SSLConfig.xml b/src/test/scripts/functions/federated/io/SSLConfig.xml index f375ba33fed..8205f0b4a5f 100644 --- a/src/test/scripts/functions/federated/io/SSLConfig.xml +++ b/src/test/scripts/functions/federated/io/SSLConfig.xml @@ -16,6 +16,9 @@ * specific language governing permissions and limitations * under the License. --> + + 2 true - \ No newline at end of file + 128 +