Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions src/taskSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ taskSchema.statics.expireTimedOutTasks = async function expireTimedOutTasks(opti
const task = await Task.findOneAndUpdate(
{
status: 'in_progress',
timeoutAt: { $exists: true, $lte: now }
// If task is still in_progress 10 minutes after it was supposed
// to time out, assume the task is stuck and mark it as timed_out
timeoutAt: { $exists: true, $lte: now.valueOf() - 10 * 60 * 1000 }
},
Comment thread
vkarpov15 marked this conversation as resolved.
{
$set: {
Expand Down Expand Up @@ -219,31 +221,35 @@ taskSchema.statics.registerHandler = async function registerHandler(name, fn) {
};

async function _handleRepeatingTask(Task, task) {
let scheduledAt;
if (task.nextScheduledAt != null) {
const scheduledAt = new Date(task.nextScheduledAt);
return Task.create({
name: task.name,
scheduledAt,
repeatAfterMS: task.repeatAfterMS,
params: task.params,
previousTaskId: task._id,
originalTaskId: task.originalTaskId || task._id,
timeoutMS: task.timeoutMS,
schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000
});
scheduledAt = new Date(task.nextScheduledAt);
} else if (task.repeatAfterMS != null) {
const scheduledAt = new Date(task.scheduledAt.valueOf() + task.repeatAfterMS);
return Task.create({
scheduledAt = new Date(task.scheduledAt.valueOf() + task.repeatAfterMS);
} else {
return;
}

return Task.updateOne(
{
name: task.name,
scheduledAt,
repeatAfterMS: task.repeatAfterMS,
params: task.params,
previousTaskId: task._id,
originalTaskId: task.originalTaskId || task._id,
timeoutMS: task.timeoutMS,
schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000
});
}
previousTaskId: task._id
},
Comment on lines +234 to +238
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Add a unique key for repeat-task upserts

When two workers enter _handleRepeatingTask() concurrently for the same timed-out or scheduling-timed-out task, both updateOne(..., { upsert: true }) calls can evaluate this filter before either insert is visible and both insert a repeat task; the schema only declares a non-unique { status, scheduledAt } index, so MongoDB has no unique constraint to collapse that race. The new sequential tests pass, but the production race this change targets still remains unless these fields are backed by a unique index or another atomic guard.

Useful? React with 👍 / 👎.

{
$setOnInsert: {
name: task.name,
scheduledAt,
repeatAfterMS: task.repeatAfterMS,
params: task.params,
previousTaskId: task._id,
originalTaskId: task.originalTaskId || task._id,
timeoutMS: task.timeoutMS,
schedulingTimeoutAt: scheduledAt.valueOf() + 10 * 60 * 1000
}
},
{ upsert: true }
);
Comment on lines +233 to +252
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a reasonable concern, but highly unlikely given the buffer we give for hanging tasks. Will consider improving this for the future.

}

taskSchema.statics.registerHandlers = async function registerHandlers(obj, prefix) {
Expand Down Expand Up @@ -337,7 +343,10 @@ taskSchema.statics.execute = async function(task, options = {}) {
this._handlers.get(task.name).call(task, task.params, task)
),
new Promise((_, reject) => {
timeoutId = setTimeout(() => reject(new Error(`Task timed out after ${task.timeoutMS} ms`)), task.timeoutMS);
timeoutId = setTimeout(
() => reject(new TimeoutError(`Task timed out after ${task.timeoutMS} ms`)),
task.timeoutMS
);
})
]).finally(() => clearTimeout(timeoutId));
} else {
Expand All @@ -350,7 +359,7 @@ taskSchema.statics.execute = async function(task, options = {}) {
task.result = result;
await task.save();
} catch (error) {
task.status = 'failed';
task.status = error instanceof TimeoutError ? 'timed_out' : 'failed';
task.error.message = error.message;
task.error.stack = error.stack;
task.finishedRunningAt = currentTime();
Expand Down Expand Up @@ -379,4 +388,11 @@ taskSchema.statics.schedule = async function schedule(name, scheduledAt, params,
});
};

class TimeoutError extends Error {
constructor(message) {
super(message);
this.name = 'TimeoutError';
}
}

module.exports = taskSchema;
82 changes: 75 additions & 7 deletions test/task.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ describe('Task', function() {

task = await Task.findById(task._id);
assert.ok(task);
assert.equal(task.status, 'failed');
assert.equal(task.status, 'timed_out');
assert.equal(task.error.message, 'Task timed out after 50 ms');
assert.equal(task.finishedRunningAt.valueOf(), now.valueOf());
clearTimeout(handlerTimeout);
Expand Down Expand Up @@ -329,8 +329,8 @@ describe('Task', function() {
// Now simulate time after timeoutAt
sinon.restore();
sinon.stub(time, 'now').callsFake(() =>
// now after the timeoutAt
new Date(timeoutAt.valueOf() + 1000)
// now after the timeoutAt plus the expiry buffer
new Date(timeoutAt.valueOf() + 10 * 60 * 1000 + 1000)
);

// Directly call expireTimedOutTasks instead of polling
Expand Down Expand Up @@ -361,7 +361,7 @@ describe('Task', function() {
// We should advance the fake clock further to catch this one too
sinon.restore();
sinon.stub(time, 'now').callsFake(() =>
new Date(timeoutAt.valueOf() + 2000)
new Date(timeoutAt.valueOf() + 10 * 60 * 1000 + 2000)
);

await Task.expireTimedOutTasks();
Expand All @@ -377,14 +377,82 @@ describe('Task', function() {
assert.ok(repeated.scheduledAt.valueOf() === repeatTaskObj.scheduledAt.valueOf() + 60000);
});

it('does not duplicate a repeating task when timeout expiry races with polling', async function() {
const scheduledAt = time.now();
const startedRunningAt = new Date(scheduledAt.valueOf() - 20 * 60 * 1000);
const timeoutAt = new Date(scheduledAt.valueOf() - 10 * 60 * 1000 - 1000);
const repeatAfterMS = 60000;
const nextScheduledAt = new Date(scheduledAt.valueOf() + repeatAfterMS);

const timedOutTask = await Task.create({
name: 'dedupeTimedOutRepeat',
scheduledAt,
startedRunningAt,
timeoutAt,
status: 'in_progress',
repeatAfterMS,
params: { foo: 'bar' }
});

await Task.create({
name: 'dedupeTimedOutRepeat',
scheduledAt: nextScheduledAt,
repeatAfterMS,
params: { foo: 'bar' },
previousTaskId: timedOutTask._id,
originalTaskId: timedOutTask._id,
status: 'pending',
schedulingTimeoutAt: nextScheduledAt.valueOf() + 10 * 60 * 1000
});

await Task.expireTimedOutTasks();

const repeatedTasks = await Task.find({
name: 'dedupeTimedOutRepeat',
scheduledAt: nextScheduledAt,
previousTaskId: timedOutTask._id
});
assert.equal(repeatedTasks.length, 1);

const expiredTask = await Task.findById(timedOutTask._id);
assert.equal(expiredTask.status, 'timed_out');
});

it('does not duplicate a repeating task when scheduling timeout is handled twice', async function() {
Task.registerHandler('dedupeSchedulingRepeat', async () => {
return 'should not be run';
});

const scheduledAt = time.now();
const nextScheduledAt = new Date(scheduledAt.valueOf() + 100000);
const task = await Task.create({
name: 'dedupeSchedulingRepeat',
scheduledAt,
schedulingTimeoutAt: new Date(scheduledAt.valueOf() - 1000),
status: 'pending',
params: { foo: 'bar' },
nextScheduledAt
});

await Task.execute(task);
await Task.execute(task);

const repeatedTasks = await Task.find({
name: 'dedupeSchedulingRepeat',
scheduledAt: nextScheduledAt,
previousTaskId: task._id
});
assert.equal(repeatedTasks.length, 1);
});

it('creates a retry task when a timed out task has retryOnTimeoutCount', async function() {
Task.registerHandler('timeoutRetry', async () => {
// handler intentionally does nothing (we'll simulate a timeout)
});

const scheduledAt = new Date(now.valueOf() - 5000);
const startedRunningAt = new Date(now.valueOf() - 20000);
const timeoutAt = new Date(now.valueOf() - 1000);
const scheduledAt = new Date(now.valueOf() - 11 * 60 * 1000);
const startedRunningAt = new Date(now.valueOf() - 20 * 60 * 1000);
const timeoutAt = new Date(now.valueOf() - 10 * 60 * 1000 - 1000);

let timedOutTask = await Task.create({
name: 'timeoutRetry',
Expand Down
Loading