From fbdca4f385e0a36b07e3392a01091bd45051728c Mon Sep 17 00:00:00 2001 From: baunsgaard Date: Sat, 15 Jan 2022 18:12:13 +0100 Subject: [PATCH] [SYSTEMDS-3273] Federated Timeout This commit adds three things. 1. A timeout for federated requests (default unlimited) that is enabled for tests to ensure that the tests finish instead of having a controller wait indefinitely on a crashed worker. 2. Log output of errors on the workers to enable debugging if you have access to the worker process output. This ensures that if there is sensitive information in the error message it stays on the worker but is still logged. 3. The test setup now adds scratch_space to the config file if the config file does not define it. This fixes a bug where, if the config file used by a test did not contain the scratch_space location, the default was used, creating a race condition between parallel tests that create and delete the same scratch_space locations. --- .../sysds/conf/ConfigurationManager.java | 4 +++ .../java/org/apache/sysds/conf/DMLConfig.java | 4 ++- .../federated/FederatedData.java | 7 +++-- .../federated/FederatedWorkerHandler.java | 6 +++- .../paramserv/FederatedPSControlThread.java | 2 +- .../config/SystemDS-MultiTenant-config.xml | 2 ++ src/test/config/SystemDS-config.xml | 29 ++----------------- .../apache/sysds/test/AutomatedTestBase.java | 25 ++++++++-------- .../codegen/SystemDS-config-codegen.xml | 8 ++--- .../functions/federated/io/SSLConfig.xml | 5 +++- 10 files changed, 42 insertions(+), 50 deletions(-) diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java index b1fa997f367..c51d4e17ce3 100644 --- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java +++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java @@ -205,6 +205,10 @@ public static boolean isCompressionEnabled(){ return compress.isEnabled(); } + public static int getFederatedTimeout(){ + return
getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT); + } + /////////////////////////////////////// // Thread-local classes diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index c020fdde8da..b2d521e1c3e 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -108,6 +108,7 @@ public class DMLConfig public static final String USE_SSL_FEDERATED_COMMUNICATION = "sysds.federated.ssl"; // boolean public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT = "sysds.federated.initialization.timeout"; // int seconds + public static final String FEDERATED_TIMEOUT = "sysds.federated.timeout"; // single request timeout default -1 to indicate infinite. public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed default Spark Port public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2; @@ -168,6 +169,7 @@ public class DMLConfig _defaultVals.put(FLOATING_POINT_PRECISION, "double" ); _defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false"); _defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10"); + _defaultVals.put(FEDERATED_TIMEOUT, "-1"); } public DMLConfig() { @@ -417,7 +419,7 @@ public String getConfigInfo() { STATS_MAX_WRAP_LEN, LINEAGECACHESPILL, COMPILERASSISTED_RW, PRINT_GPU_MEMORY_INFO, AVAILABLE_GPUS, SYNCHRONIZE_GPU, EAGER_CUDA_FREE, FLOATING_POINT_PRECISION, GPU_EVICTION_POLICY, LOCAL_SPARK_NUM_THREADS, EVICTION_SHADOW_BUFFERSIZE, GPU_MEMORY_ALLOCATOR, GPU_MEMORY_UTILIZATION_FACTOR, - USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT + USE_SSL_FEDERATED_COMMUNICATION, DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, FEDERATED_TIMEOUT }; StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java index 
a3c165007e4..6f063fbb668 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java @@ -35,6 +35,7 @@ import org.apache.sysds.conf.DMLConfig; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType; +import org.apache.sysds.runtime.meta.MetaData; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; @@ -52,8 +53,8 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.concurrent.Promise; -import org.apache.sysds.runtime.meta.MetaData; public class FederatedData { private static final Log LOG = LogFactory.getLog(FederatedData.class.getName()); @@ -179,13 +180,15 @@ protected void initChannel(SocketChannel ch) throws Exception { cp.addLast(SslConstructor().context .newHandler(ch.alloc(), address.getAddress().getHostAddress(), address.getPort())); } + final int timeout = ConfigurationManager.getFederatedTimeout(); + if(timeout > -1) + cp.addLast("timeout",new ReadTimeoutHandler(timeout)); cp.addLast("ObjectDecoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader()))); cp.addLast("FederatedOperationHandler", handler); cp.addLast("ObjectEncoder", new ObjectEncoder()); - } }); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java index 282d6be87db..7461765f0ac 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java @@ -275,7 +275,9 
@@ private FederatedResponse readData(String filename, Types.DataType dataType, throw ex; } catch(Exception ex) { - throw new DMLRuntimeException(ex); + String msg = "Exception of type " + ex.getClass() + " thrown when processing READ request"; + LOG.error(msg, ex); + throw new DMLRuntimeException(msg); } finally { IOUtilFunctions.closeSilently(fs); @@ -425,6 +427,7 @@ private FederatedResponse execUDF(FederatedRequest request, ExecutionContextMap catch(Exception ex) { // Note it is unsafe to throw the ex trace along with the exception here. String msg = "Exception of type " + ex.getClass() + " thrown when processing EXEC_UDF request"; + LOG.error(msg, ex); throw new FederatedWorkerHandlerException(msg); } } @@ -438,6 +441,7 @@ private FederatedResponse execClear(ExecutionContextMap ecm) { } catch(Exception ex) { String msg = "Exception of type " + ex.getClass() + " thrown when processing CLEAR request"; + LOG.error(msg, ex); throw new FederatedWorkerHandlerException(msg); } return new FederatedResponse(ResponseType.SUCCESS_EMPTY); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java index ea8f0e8661c..30e90beee91 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java @@ -450,7 +450,7 @@ protected ListObject computeGradientsForNBatches(ListObject model, catch(Exception e) { if(DMLScript.STATISTICS) tFedCommunication.stop(); - throw new DMLRuntimeException("FederatedLocalPSThread: failed to execute UDF" + e.getMessage()); + throw new DMLRuntimeException("FederatedLocalPSThread: failed to execute UDF" + e.getMessage(), e); } } diff --git a/src/test/config/SystemDS-MultiTenant-config.xml b/src/test/config/SystemDS-MultiTenant-config.xml index 3e250ccaa2b..4e2ecdbd1ef 
100644 --- a/src/test/config/SystemDS-MultiTenant-config.xml +++ b/src/test/config/SystemDS-MultiTenant-config.xml @@ -20,4 +20,6 @@ 30 + + 128 diff --git a/src/test/config/SystemDS-config.xml b/src/test/config/SystemDS-config.xml index 2519a38ceda..a6f5ba525f7 100644 --- a/src/test/config/SystemDS-config.xml +++ b/src/test/config/SystemDS-config.xml @@ -18,33 +18,10 @@ --> - - /tmp/systemds - - - scratch_space - - - 2 - - - 1000 - - - true - - - true - - - none - - - none - - 16 - + 2 2 + + 128 diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java index 0243f2ad511..d292e6b89f8 100644 --- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java +++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java @@ -1089,19 +1089,20 @@ protected void loadTestConfiguration(TestConfiguration config, String cacheDirec curLocalTempDir.mkdirs(); TestUtils.clearDirectory(curLocalTempDir.getPath()); - - // Create a SystemDS config file for this test case based on default template - // from src/test/config or derive from custom configuration provided by test. - String configTemplate = FileUtils.readFileToString(getConfigTemplateFile(), "UTF-8"); - String localTemp = curLocalTempDir.getPath(); - String configContents = configTemplate - .replace(createXMLElement(DMLConfig.SCRATCH_SPACE, "scratch_space"), - createXMLElement(DMLConfig.SCRATCH_SPACE, localTemp + "/target/scratch_space")) - .replace(createXMLElement(DMLConfig.LOCAL_TMP_DIR, "/tmp/systemds"), - createXMLElement(DMLConfig.LOCAL_TMP_DIR, localTemp + "/localtmp")); - + if(!disableConfigFile){ - + // Create a SystemDS config file for this test case based on default template + // from src/test/config or derive from custom configuration provided by test. 
+ String configTemplate = FileUtils.readFileToString(getConfigTemplateFile(), "UTF-8"); + String localTemp = curLocalTempDir.getPath(); + String testScratchSpace = "\n "+ createXMLElement(DMLConfig.SCRATCH_SPACE, localTemp + "/target/scratch_space") + "\n "; + String testTempSpace = createXMLElement(DMLConfig.LOCAL_TMP_DIR, localTemp + "/localtmp"); + String configContents = configTemplate + // if the config had a tmp location remove it + .replace(createXMLElement(DMLConfig.SCRATCH_SPACE, "scratch_space"),"") + .replace(createXMLElement(DMLConfig.LOCAL_TMP_DIR, "/tmp/systemds"),"") + .replace("", testScratchSpace + testTempSpace + "\n"); + LOG.error(configContents); FileUtils.write(getCurConfigFile(), configContents, "UTF-8"); if(LOG.isDebugEnabled()) diff --git a/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml b/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml index b3d47122ad1..df1eecaaefa 100644 --- a/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml +++ b/src/test/scripts/functions/federated/codegen/SystemDS-config-codegen.xml @@ -18,15 +18,11 @@ --> - /tmp/systemds - scratch_space 7 true true 1 - - - 16 - + 2 auto + 128 diff --git a/src/test/scripts/functions/federated/io/SSLConfig.xml b/src/test/scripts/functions/federated/io/SSLConfig.xml index f375ba33fed..8205f0b4a5f 100644 --- a/src/test/scripts/functions/federated/io/SSLConfig.xml +++ b/src/test/scripts/functions/federated/io/SSLConfig.xml @@ -16,6 +16,9 @@ * specific language governing permissions and limitations * under the License. --> + + 2 true - \ No newline at end of file + 128 +