From b5f11132fc948de27fa39eb7ad6b9627f1db08de Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Tue, 21 Apr 2020 13:59:03 -0500 Subject: [PATCH 1/2] Use correct storage client for inputs. Fixes #1662 --- src/common/plugin/LocalExecutor.js | 16 ++++++++++------ src/plugins/ExecuteJob/ExecuteJob.js | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/common/plugin/LocalExecutor.js b/src/common/plugin/LocalExecutor.js index 10cad897e..1e5d9e5e1 100644 --- a/src/common/plugin/LocalExecutor.js +++ b/src/common/plugin/LocalExecutor.js @@ -77,17 +77,21 @@ define([ const jobLogger = new JobLogger(this.core, this.core.getParent(node)); jobLogger.log('About to save output artifacts.'); const saveDir = `${this.projectId}/artifacts/`; - const storage = await this.getStorageClient(); - jobLogger.append(`Saving output data to ${storage.name}...`); + const dstStorage = await this.getStorageClient(); + jobLogger.append(`Saving output data to ${dstStorage.name}...`); const createParams = {base: this.META.Data, parent: artifactsDir}; for (let i = dataNodes.length; i--;) { const artifact = this.core.createNode(createParams); - const name = this.core.getOwnAttribute(node, 'saveName') || - this.core.getAttribute(dataNodes[i], 'name'); const createdAt = Date.now(); const originalData = JSON.parse(this.core.getAttribute(dataNodes[i], 'data')); - const userAsset = await storage.copy(originalData, saveDir + name); + + const name = this.core.getOwnAttribute(node, 'saveName') || + this.core.getAttribute(dataNodes[i], 'name'); + + const srcStorage = await this.getStorageClientForInputData(originalData); + const content = await srcStorage.getFile(originalData); + const userAsset = await dstStorage.putFile(saveDir + name, content); this.core.setAttribute(artifact, 'data', JSON.stringify(userAsset)); this.core.setAttribute(artifact, 'name', name); @@ -96,7 +100,7 @@ define([ } this.logger.info(`Saved ${dataNodes.length} artifacts in ${this.projectId}.`); - jobLogger.append(`Saved output data to ${storage.name}`); + jobLogger.append(`Saved output data to ${dstStorage.name}`); }; // Helper methods diff --git a/src/plugins/ExecuteJob/ExecuteJob.js b/src/plugins/ExecuteJob/ExecuteJob.js index 52cf2d4e0..78a8e54b8 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.js +++ b/src/plugins/ExecuteJob/ExecuteJob.js @@ -209,6 +209,24 @@ define([ return await backend.getClient(this.logger, storage.config); }; + ExecuteJob.prototype.getInputStorageConfigs = async function () { + const inputs = Object.entries(this.getCurrentConfig().inputs || {}); + const [nodeIds=[], configs=[]] = _.unzip(inputs); + + const nodes = await Promise.all(nodeIds.map(id => this.core.loadByPath(this.rootNode, id))); + const dataInfos = nodes.map(node => this.core.getAttribute(node, 'data')); + + const config = _.object(_.zip(dataInfos, configs)); + return config; + }; + + ExecuteJob.prototype.getStorageClientForInputData = async function (dataInfo) { + const configDict = await this.getInputStorageConfigs(); + const config = configDict[JSON.stringify(dataInfo)]; + const client = await Storage.getClient(dataInfo.backend, null, config); + return client; + }; + ExecuteJob.prototype.getJobId = function (node) { return this.getJobInfo(node).hash; }; From 15a91edd1d227a7d24da2530d86832df090f6213 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Wed, 22 Apr 2020 11:33:23 -0500 Subject: [PATCH 2/2] Use correct storage for non-Input node data --- src/common/plugin/LocalExecutor.js | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/common/plugin/LocalExecutor.js b/src/common/plugin/LocalExecutor.js index 1e5d9e5e1..f65a920c4 100644 --- a/src/common/plugin/LocalExecutor.js +++ b/src/common/plugin/LocalExecutor.js @@ -89,7 +89,9 @@ define([ const name = this.core.getOwnAttribute(node, 'saveName') || this.core.getAttribute(dataNodes[i], 'name'); - const srcStorage = await this.getStorageClientForInputData(originalData); + const srcStorage = this.isPipelineInput(dataNodes[i]) ? + await this.getStorageClientForInputData(originalData) + : dstStorage; const content = await srcStorage.getFile(originalData); const userAsset = await dstStorage.putFile(saveDir + name, content); @@ -103,6 +105,10 @@ define([ jobLogger.append(`Saved output data to ${dstStorage.name}`); }; + LocalExecutor.prototype.isPipelineInput = function(node) { + return this.isMetaTypeOf(node, this.META.Input); + }; + // Helper methods LocalExecutor.prototype.getLocalOperationType = function(node) { for (let i = LocalExecutor.OPERATIONS.length; i--;) {