diff --git a/src/taskSchema.js b/src/taskSchema.js index 18e7db7..ac81b87 100644 --- a/src/taskSchema.js +++ b/src/taskSchema.js @@ -175,7 +175,9 @@ taskSchema.statics.expireTimedOutTasks = async function expireTimedOutTasks(opti const task = await Task.findOneAndUpdate( { status: 'in_progress', - timeoutAt: { $exists: true, $lte: now } + // If task is still in_progress 10 minutes after it was supposed + // to time out, assume the task is stuck and mark it as timed_out + timeoutAt: { $exists: true, $lte: now.valueOf() - 10 * 60 * 1000 } }, { $set: { @@ -219,31 +221,35 @@ taskSchema.statics.registerHandler = async function registerHandler(name, fn) { }; async function _handleRepeatingTask(Task, task) { + let scheduledAt; if (task.nextScheduledAt != null) { - const scheduledAt = new Date(task.nextScheduledAt); - return Task.create({ - name: task.name, - scheduledAt, - repeatAfterMS: task.repeatAfterMS, - params: task.params, - previousTaskId: task._id, - originalTaskId: task.originalTaskId || task._id, - timeoutMS: task.timeoutMS, - schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000 - }); + scheduledAt = new Date(task.nextScheduledAt); } else if (task.repeatAfterMS != null) { - const scheduledAt = new Date(task.scheduledAt.valueOf() + task.repeatAfterMS); - return Task.create({ + scheduledAt = new Date(task.scheduledAt.valueOf() + task.repeatAfterMS); + } else { + return; + } + + return Task.updateOne( + { name: task.name, scheduledAt, - repeatAfterMS: task.repeatAfterMS, - params: task.params, - previousTaskId: task._id, - originalTaskId: task.originalTaskId || task._id, - timeoutMS: task.timeoutMS, - schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000 - }); - } + previousTaskId: task._id + }, + { + $setOnInsert: { + name: task.name, + scheduledAt, + repeatAfterMS: task.repeatAfterMS, + params: task.params, + previousTaskId: task._id, + originalTaskId: task.originalTaskId || task._id, + timeoutMS: task.timeoutMS, + schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000 + } + }, + { upsert: true } + ); } taskSchema.statics.registerHandlers = async function registerHandlers(obj, prefix) { @@ -337,7 +343,10 @@ taskSchema.statics.execute = async function(task, options = {}) { this._handlers.get(task.name).call(task, task.params, task) ), new Promise((_, reject) => { - timeoutId = setTimeout(() => reject(new Error(`Task timed out after ${task.timeoutMS} ms`)), task.timeoutMS); + timeoutId = setTimeout( + () => reject(new TimeoutError(`Task timed out after ${task.timeoutMS} ms`)), + task.timeoutMS + ); }) ]).finally(() => clearTimeout(timeoutId)); } else { @@ -350,7 +359,7 @@ taskSchema.statics.execute = async function(task, options = {}) { task.result = result; await task.save(); } catch (error) { - task.status = 'failed'; + task.status = error instanceof TimeoutError ? 'timed_out' : 'failed'; task.error.message = error.message; task.error.stack = error.stack; task.finishedRunningAt = currentTime(); @@ -379,4 +388,11 @@ taskSchema.statics.schedule = async function schedule(name, scheduledAt, params, }); }; +class TimeoutError extends Error { + constructor(message) { + super(message); + this.name = 'TimeoutError'; + } +} + module.exports = taskSchema; diff --git a/test/task.test.js b/test/task.test.js index 4585793..d563bce 100644 --- a/test/task.test.js +++ b/test/task.test.js @@ -299,7 +299,7 @@ describe('Task', function() { task = await Task.findById(task._id); assert.ok(task); - assert.equal(task.status, 'failed'); + assert.equal(task.status, 'timed_out'); assert.equal(task.error.message, 'Task timed out after 50 ms'); assert.equal(task.finishedRunningAt.valueOf(), now.valueOf()); clearTimeout(handlerTimeout); @@ -329,8 +329,8 @@ describe('Task', function() { // Now simulate time after timeoutAt sinon.restore(); sinon.stub(time, 'now').callsFake(() => - // now after the timeoutAt - new Date(timeoutAt.valueOf() + 1000) + // now after the timeoutAt plus the expiry buffer + new Date(timeoutAt.valueOf() + 10 * 60 * 1000 + 1000) ); // Directly call expireTimedOutTasks instead of polling @@ -361,7 +361,7 @@ describe('Task', function() { // We should advance the fake clock further to catch this one too sinon.restore(); sinon.stub(time, 'now').callsFake(() => - new Date(timeoutAt.valueOf() + 2000) + new Date(timeoutAt.valueOf() + 10 * 60 * 1000 + 2000) ); await Task.expireTimedOutTasks(); @@ -377,14 +377,82 @@ describe('Task', function() { assert.ok(repeated.scheduledAt.valueOf() === repeatTaskObj.scheduledAt.valueOf() + 60000); }); + it('does not duplicate a repeating task when timeout expiry races with polling', async function() { + const scheduledAt = time.now(); + const startedRunningAt = new Date(scheduledAt.valueOf() - 20 * 60 * 1000); + const timeoutAt = new Date(scheduledAt.valueOf() - 10 * 60 * 1000 - 1000); + const repeatAfterMS = 60000; + const nextScheduledAt = new Date(scheduledAt.valueOf() + repeatAfterMS); + + const timedOutTask = await Task.create({ + name: 'dedupeTimedOutRepeat', + scheduledAt, + startedRunningAt, + timeoutAt, + status: 'in_progress', + repeatAfterMS, + params: { foo: 'bar' } + }); + + await Task.create({ + name: 'dedupeTimedOutRepeat', + scheduledAt: nextScheduledAt, + repeatAfterMS, + params: { foo: 'bar' }, + previousTaskId: timedOutTask._id, + originalTaskId: timedOutTask._id, + status: 'pending', + schedulingTimeoutAt: nextScheduledAt.valueOf() + 10 * 60 * 1000 + }); + + await Task.expireTimedOutTasks(); + + const repeatedTasks = await Task.find({ + name: 'dedupeTimedOutRepeat', + scheduledAt: nextScheduledAt, + previousTaskId: timedOutTask._id + }); + assert.equal(repeatedTasks.length, 1); + + const expiredTask = await Task.findById(timedOutTask._id); + assert.equal(expiredTask.status, 'timed_out'); + }); + + it('does not duplicate a repeating task when scheduling timeout is handled twice', async function() { + Task.registerHandler('dedupeSchedulingRepeat', async () => { + return 'should not be run'; + }); + + const scheduledAt = time.now(); + const nextScheduledAt = new Date(scheduledAt.valueOf() + 100000); + const task = await Task.create({ + name: 'dedupeSchedulingRepeat', + scheduledAt, + schedulingTimeoutAt: new Date(scheduledAt.valueOf() - 1000), + status: 'pending', + params: { foo: 'bar' }, + nextScheduledAt + }); + + await Task.execute(task); + await Task.execute(task); + + const repeatedTasks = await Task.find({ + name: 'dedupeSchedulingRepeat', + scheduledAt: nextScheduledAt, + previousTaskId: task._id + }); + assert.equal(repeatedTasks.length, 1); + }); + it('creates a retry task when a timed out task has retryOnTimeoutCount', async function() { Task.registerHandler('timeoutRetry', async () => { // handler intentionally does nothing (we'll simulate a timeout) }); - const scheduledAt = new Date(now.valueOf() - 5000); - const startedRunningAt = new Date(now.valueOf() - 20000); - const timeoutAt = new Date(now.valueOf() - 1000); + const scheduledAt = new Date(now.valueOf() - 11 * 60 * 1000); + const startedRunningAt = new Date(now.valueOf() - 20 * 60 * 1000); + const timeoutAt = new Date(now.valueOf() - 10 * 60 * 1000 - 1000); let timedOutTask = await Task.create({ name: 'timeoutRetry',