From 6e20e735d98a4184602ac26669769ded58ffd5b9 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Wed, 9 Jan 2019 18:32:54 -0800 Subject: [PATCH 1/8] WIP updated pipeline library to v0.10.1 --- src/seeds/pipeline/pipeline.webgmex | Bin 25823 -> 25854 bytes src/seeds/pipeline/version.txt | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/seeds/pipeline/pipeline.webgmex b/src/seeds/pipeline/pipeline.webgmex index 5d996c46b6f47f125f6aee28eb2ecd2182af947e..2e6599eafa9b0a3dabcb23fc585dee54988f0aa6 100644 GIT binary patch delta 633 zcma))%Sv295Qfj0Gddv=M??cILa;>`b(FrJ)9ykxf}*o=p@U#|S2bP&26Pkh6w)sc z$RgQj0{s-egqLk_kyVz#BZ6+vqM(ZUssH<-Zm#nGSNZs0y=YHmud*z=FBUh?#}|8y zI*&4xUTcI73Z1-ya+^O{uh=3$Titid*54A>*`E2C0?GQLZ!~ z>5~I{dVr^7^(#4XN9fP$c1Y%$nvs6AE6cq(Rtc273UB4_E%+wVoL)2|sZ z`C95e`9fADH|c&!wH6v;QW9@>W|ChysqLF3qj7e3I(6BPPV;!zl_ZVTkN4x&aIM`Z zhY-8l@6zw#QT{Z2*qDmnKDOigy-xhI(@kn`vopW{>F>WY$g;;8tmm!e;^m*7ckw5% J(aTzqWe;!ExLN=J delta 619 zcma)4yGmS96utLObR_XbP+`g(QD$^zJnx5Knpg;fI)xTGh@K~dDHODkd_)+|2Ur+L zB^Ia29|Zp)XvEfU5XVN_TWr{S?X~w_3-{Wov_p&B5eS-6Eru zQQUAyrS;SYE~yq?5hgK(S_zK>By4ch{m!g}N(bfx_Ch)utZ;&Qhl!I!5TOi2!Jmby@Zr-A-P#D(@fk){B1~8vQwBWZ4U2M61jI diff --git a/src/seeds/pipeline/version.txt b/src/seeds/pipeline/version.txt index 78bc1abd1..71172b43a 100644 --- a/src/seeds/pipeline/version.txt +++ b/src/seeds/pipeline/version.txt @@ -1 +1 @@ -0.10.0 +0.10.1 \ No newline at end of file From 0897d79590ad80149a47b9f950a519d89d017f7a Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Wed, 9 Jan 2019 18:51:13 -0800 Subject: [PATCH 2/8] WIP improved Graph node lookup --- src/plugins/ExecuteJob/ExecuteJob.Metadata.js | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js index fa68b2041..30f5fc473 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js +++ b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js @@ -16,15 +16,17 @@ define([ // I think I should convert these to just a single 'update graph' command ExecuteJob.prototype[CONSTANTS.PLOT_UPDATE] = function (job, state) { + console.log('1'); const jobId = this.core.getPath(job); // Check if the graph already exists // use the id to look up the graph - let graph = this.getExistingMetadataById(jobId, 'Graph', state.id); let id = jobId + '/' + state.id; + let graph = this.getExistingMetadataById(job, 'Graph', id); + console.log(`\n\n--> Checking for graph ${id}:`, graph); if (!graph) { graph = this.createNode('Graph', job); - this.setAttribute(graph, 'id', state.id); + this.setAttribute(graph, 'id', id); this.createIdToMetadataId[graph] = id; } @@ -179,9 +181,21 @@ define([ this.createMessage(null, msg); }; - ExecuteJob.prototype.getExistingMetadataById = function (jobId, type, id) { - return this._getExistingMetadata( - jobId, + ExecuteJob.prototype.getExistingMetadataById = function (job, type, id) { + const createId = Object.keys(this.createIdToMetadataId) + .find(createId => this.createIdToMetadataId[createId] === id); + + if (createId) { // on the queue to be created + return createId; + } + + if (this._metadata[id]) { // already created + return this._metadata[id]; + } + + console.log('7'); + return this._getExistingMetadata( // exists from prev run + this.core.getPath(job), type, node => this.getAttribute(node, 'id') === id ); From 4223a3b6a56269b92ea1ab528c902fcd5b20019c Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Wed, 9 Jan 2019 18:51:33 -0800 Subject: [PATCH 3/8] WIP improved plot line lookup --- src/plugins/ExecuteJob/ExecuteJob.Metadata.js | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js index 30f5fc473..1041b7893 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js +++ b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js @@ -41,16 +41,27 @@ define([ this.logger.info(`Updating graph named ${axes.title}`); // Delete current line nodes - this.plotLines[id] = this.plotLines[id] || []; - this.plotLines[id].forEach(line => this.deleteNode(line)); + if (this.plotLines[id]) { + this.plotLines[id].forEach(lineId => { + if (this._metadata[lineId]) { + this.deleteNode(this._metadata[lineId]); + } else { + const createId = Object.keys(this.createIdToMetadataId) + .find(createId => this.createIdToMetadataId[createId] === lineId); + + if (createId) { + this.deleteNode(createId); + } + } + }); + } + this.plotLines[id] = []; // Update the points for each of the lines axes.lines.forEach((line, index) => { let lineId = id + '/' + index; let node = this.createNode('Line', graph); - this.plotLines[id].push(node); - - this._metadata[lineId] = node; + this.plotLines[id].push(lineId); this.createIdToMetadataId[node] = lineId; this.setAttribute(node, 'name', line.label || `line ${index+1}`); From 9bc034b11ee59b7a31710ee58ee4bb192b853db0 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Thu, 10 Jan 2019 09:34:48 -0800 Subject: [PATCH 4/8] WIP use the node id to delete the line --- src/plugins/ExecuteJob/ExecuteJob.Metadata.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js index 1041b7893..5df89c2fa 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js +++ b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js @@ -44,7 +44,7 @@ define([ if (this.plotLines[id]) { this.plotLines[id].forEach(lineId => { if (this._metadata[lineId]) { - this.deleteNode(this._metadata[lineId]); + this.deleteNode(this.core.getPath(this._metadata[lineId])); } else { const createId = Object.keys(this.createIdToMetadataId) .find(createId => this.createIdToMetadataId[createId] === lineId); From e4cace6ea213373f186a711d967d865c76cd2fd1 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Thu, 10 Jan 2019 13:46:25 -0800 Subject: [PATCH 5/8] WIP fixed deleteNode errors (use nodeId) --- src/plugins/ExecuteJob/ExecuteJob.Metadata.js | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js index 5df89c2fa..fc25f2a56 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js +++ b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js @@ -15,7 +15,7 @@ define([ }; // I think I should convert these to just a single 'update graph' command - ExecuteJob.prototype[CONSTANTS.PLOT_UPDATE] = function (job, state) { + ExecuteJob.prototype[CONSTANTS.PLOT_UPDATE] = async function (job, state) { console.log('1'); const jobId = this.core.getPath(job); @@ -23,7 +23,6 @@ define([ // use the id to look up the graph let id = jobId + '/' + state.id; let graph = this.getExistingMetadataById(job, 'Graph', id); - console.log(`\n\n--> Checking for graph ${id}:`, graph); if (!graph) { graph = this.createNode('Graph', job); this.setAttribute(graph, 'id', id); @@ -41,15 +40,27 @@ define([ this.logger.info(`Updating graph named ${axes.title}`); // Delete current line nodes + if (!this.isCreateId(graph)) { + //const children = await this.core.loadChildren(graph); + const childIds = this.core.getChildrenPaths(graph); + //children.map(node => this.core.getPath(node)); + console.log('\t---- childIds', childIds, '(', this.getPath(graph), ')'); + childIds.forEach(id => this.deleteNode(id)); + } + if (this.plotLines[id]) { this.plotLines[id].forEach(lineId => { + console.log('>>> lineId is', lineId); if (this._metadata[lineId]) { - this.deleteNode(this.core.getPath(this._metadata[lineId])); + console.log('nodeId is', this.core.getPath(this._metadata[lineId])); + const nodeId = this.core.getPath(this._metadata[lineId]); + this.deleteNode(nodeId); } else { const createId = Object.keys(this.createIdToMetadataId) .find(createId => this.createIdToMetadataId[createId] === lineId); if (createId) { + console.log('createId is', createId); this.deleteNode(createId); } } From 287cfc51222d2d6f56b5785e69cd1def41886c83 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Thu, 10 Jan 2019 13:47:20 -0800 Subject: [PATCH 6/8] WIP improved logging for errors --- src/plugins/ExecuteJob/ExecuteJob.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.js b/src/plugins/ExecuteJob/ExecuteJob.js index 7df57f2ea..af45cbba2 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.js +++ b/src/plugins/ExecuteJob/ExecuteJob.js @@ -668,7 +668,7 @@ define([ this.onOperationFail(op, err); } }) - .catch(err => this.logger.error(`Could not get op info for ${opId}: ${err}`)); + .catch(err => this.logger.error(`Could not get op info for ${JSON.stringify(opId)}: ${err}`)); }; ExecuteJob.prototype.onDistOperationComplete = function (node, result) { From 3384599c0ff0501591956beabcd38e4c6c1912e7 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Sat, 12 Jan 2019 10:12:04 -0800 Subject: [PATCH 7/8] WIP removed extra debug logs --- src/plugins/ExecuteJob/ExecuteJob.Metadata.js | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js index fc25f2a56..f2ab7d2e2 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.Metadata.js +++ b/src/plugins/ExecuteJob/ExecuteJob.Metadata.js @@ -16,7 +16,6 @@ define([ // I think I should convert these to just a single 'update graph' command ExecuteJob.prototype[CONSTANTS.PLOT_UPDATE] = async function (job, state) { - console.log('1'); const jobId = this.core.getPath(job); // Check if the graph already exists @@ -43,16 +42,12 @@ define([ if (!this.isCreateId(graph)) { //const children = await this.core.loadChildren(graph); const childIds = this.core.getChildrenPaths(graph); - //children.map(node => this.core.getPath(node)); - console.log('\t---- childIds', childIds, '(', this.getPath(graph), ')'); childIds.forEach(id => this.deleteNode(id)); } if (this.plotLines[id]) { this.plotLines[id].forEach(lineId => { - console.log('>>> lineId is', lineId); if (this._metadata[lineId]) { - console.log('nodeId is', this.core.getPath(this._metadata[lineId])); const nodeId = this.core.getPath(this._metadata[lineId]); this.deleteNode(nodeId); } else { @@ -60,7 +55,6 @@ define([ .find(createId => this.createIdToMetadataId[createId] === lineId); if (createId) { - console.log('createId is', createId); this.deleteNode(createId); } } @@ -215,7 +209,6 @@ define([ return this._metadata[id]; } - console.log('7'); return this._getExistingMetadata( // exists from prev run this.core.getPath(job), type, From eaf6ecf15d3af2d2a9cc9c3607262bc31d09429c Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Sat, 12 Jan 2019 10:12:33 -0800 Subject: [PATCH 8/8] WIP add async/await to simplify async handling --- src/plugins/ExecuteJob/ExecuteJob.SafeSave.js | 13 ++- src/plugins/ExecuteJob/ExecuteJob.js | 102 ++++++++---------- 2 files changed, 50 insertions(+), 65 deletions(-) diff --git a/src/plugins/ExecuteJob/ExecuteJob.SafeSave.js b/src/plugins/ExecuteJob/ExecuteJob.SafeSave.js index 251822c36..e6155405c 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.SafeSave.js +++ b/src/plugins/ExecuteJob/ExecuteJob.SafeSave.js @@ -326,7 +326,7 @@ define([ }); }; - ExecuteJob.prototype.applyDeletions = function () { + ExecuteJob.prototype.applyDeletions = async function () { var deletions = this.deletions; // Remove any creation ids @@ -338,12 +338,11 @@ define([ } } - return Q.all(deletions.map(id => this.core.loadByPath(this.rootNode, id))) - .then(nodes => { - for (var i = nodes.length; i--;) { - this.core.deleteNode(nodes[i]); - } - }); + const nodes = await Q.all(deletions.map(id => this.core.loadByPath(this.rootNode, id))); + + for (var i = nodes.length; i--;) { + this.core.deleteNode(nodes[i]); + } }; // Override 'save' to notify the user on fork diff --git a/src/plugins/ExecuteJob/ExecuteJob.js b/src/plugins/ExecuteJob/ExecuteJob.js index af45cbba2..bd13e4c16 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.js +++ b/src/plugins/ExecuteJob/ExecuteJob.js @@ -236,18 +236,16 @@ define([ this.outputLineCount[id] = count; return this.executor.getOutput(hash, 0, count); }) - .then(output => { // parse the stdout to update the job metadata + .then(async output => { // parse the stdout to update the job metadata var stdout = output.map(o => o.output).join(''), result = this.processStdout(job, stdout), - name = this.getAttribute(job, 'name'), - promise = Q(); - + name = this.getAttribute(job, 'name'); if (result.hasMetadata) { msg = `Updated graph/image output for ${name}`; - promise = this.save(msg); + await this.save(msg); } - return promise.then(() => this.getOperation(job)); + return this.getOperation(job); }) .then(opNode => this.watchOperation(hash, opNode, job)); }; @@ -322,15 +320,14 @@ define([ ExecuteJob.prototype.onOperationFail = ExecuteJob.prototype.onOperationComplete = - ExecuteJob.prototype.onComplete = function (opNode, err) { + ExecuteJob.prototype.onComplete = async function (opNode, err) { var job = this.core.getParent(opNode), exec = this.core.getParent(job), name = this.getAttribute(job, 'name'), jobId = this.core.getPath(job), status = err ? 'fail' : (this.canceled ? 'canceled' : 'success'), msg = err ? `${name} execution failed!` : - `${name} executed successfully!`, - promise = Q(); + `${name} executed successfully!`; this.setAttribute(job, 'status', status); this.logger.info(`Setting ${name} (${jobId}) status to ${status}`); @@ -349,40 +346,33 @@ define([ } else { // Check if all the other jobs are successful. If so, set the // execution status to 'success' - promise = this.core.loadChildren(exec) - .then(nodes => { - var execSuccess = true, - type, - typeName; - - for (var i = nodes.length; i--;) { - type = this.core.getMetaType(nodes[i]); - typeName = this.getAttribute(type, 'name'); - - if (typeName === 'Job' && - this.getAttribute(nodes[i], 'status') !== 'success') { - execSuccess = false; - } - } + const nodes = await this.core.loadChildren(exec); + let execSuccess = true; - if (execSuccess) { - this.setAttribute(exec, 'status', 'success'); - } - }); + for (var i = nodes.length; i--;) { + const type = this.core.getMetaType(nodes[i]); + const typeName = this.getAttribute(type, 'name'); + + if (typeName === 'Job' && + this.getAttribute(nodes[i], 'status') !== 'success') { + execSuccess = false; + } + } + + if (execSuccess) { + this.setAttribute(exec, 'status', 'success'); + } } this.createMessage(null, msg); - promise - .then(() => this.save(msg)) - .then(() => { - this.result.setSuccess(!err); - this.stopExecHeartBeat(); - this._callback(err, this.result); - }) - .catch(err => { - // Result success is false at invocation. - this._callback(err, this.result); - }); + try { + await this.save(msg); + this.result.setSuccess(!err); + this.stopExecHeartBeat(); + this._callback(err, this.result); + } catch (e) { + this._callback(e, this.result); + } }; ExecuteJob.prototype.getOperation = function (job) { @@ -558,7 +548,7 @@ define([ this.outputLineCount[jobId] = actualLine + 1; return prep .then(() => this.executor.getOutput(hash, currentLine, actualLine+1)) - .then(outputLines => { + .then(async outputLines => { var stdout = this.getAttribute(job, 'stdout'), output = outputLines.map(o => o.output).join(''), last = stdout.lastIndexOf('\n'), @@ -581,19 +571,17 @@ define([ var metadata = { lineCount: this.outputLineCount[jobId] }; - next = next - .then(() => this.logManager.appendTo(jobId, output, metadata)) - .then(() => this.notifyStdoutUpdate(jobId)); + await this.logManager.appendTo(jobId, output, metadata); + await this.notifyStdoutUpdate(jobId); } if (result.hasMetadata) { msg = `Updated graph/image output for ${name}`; - next = next.then(() => this.save(msg)); + await this.save(msg); } - return next; }); } }) - .then(() => { + .then(async () => { if (info.status === 'CREATED' || info.status === 'RUNNING') { var time = Date.now(), next = Q(); @@ -602,21 +590,19 @@ define([ this.getAttribute(job, 'status') !== 'running') { this.setAttribute(job, 'status', 'running'); - next = this.save(`Started "${name}" operation in ${this.pipelineName}`); + await this.save(`Started "${name}" operation in ${this.pipelineName}`); } - return next.then(() => { - var delta = Date.now() - time; - - if (delta > ExecuteJob.UPDATE_INTERVAL) { - return this.watchOperation(hash, op, job); - } + const delta = Date.now() - time; + + if (delta > ExecuteJob.UPDATE_INTERVAL) { + return this.watchOperation(hash, op, job); + } - setTimeout( - this.watchOperation.bind(this, hash, op, job), - ExecuteJob.UPDATE_INTERVAL - delta - ); - }); + return setTimeout( + this.watchOperation.bind(this, hash, op, job), + ExecuteJob.UPDATE_INTERVAL - delta + ); } // Record that the job hash is no longer running