Skip to content

Commit dd20dc4

Browse files
authored
Merge pull request #3909 from learningequality/hotfixes
Patch release v2023.01.17
2 parents e150a15 + 3d89859 commit dd20dc4

File tree

10 files changed

+248
-89
lines changed

10 files changed

+248
-89
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""
Shared identifiers for locking behaviors, such as Postgres advisory locks and mutexes.

Each constant is a distinct integer so that different kinds of locks never share a key.
"""
# NOTE(review): presumably the advisory lock key guarding tree operations -- confirm at call sites
TREE_LOCK = 1001
# NOTE(review): presumably the advisory lock key guarding task operations -- confirm at call sites
TASK_LOCK = 1002

contentcuration/contentcuration/db/advisory_lock.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,36 @@
66

77
logging = logger.getLogger(__name__)
88

9+
# signed limits are 2**32 or 2**64, so one less power of 2
10+
# to become unsigned limits (half above 0, half below 0)
11+
INT_32BIT = 2**31
12+
INT_64BIT = 2**63
13+
914

1015
class AdvisoryLockBusy(RuntimeError):
1116
pass
1217

1318

19+
def _prepare_keys(keys):
20+
"""
21+
Ensures that integers do not exceed postgres constraints:
22+
- signed 64bit allowed with single key
23+
- signed 32bit allowed with two keys
24+
:param keys: A list of unsigned integers
25+
:return: A list of signed integers
26+
"""
27+
limit = INT_64BIT if len(keys) == 1 else INT_32BIT
28+
new_keys = []
29+
for key in keys:
30+
# if key is over the limit, convert to negative int since key should be unsigned int
31+
if key >= limit:
32+
key = limit - key
33+
if key < -limit or key >= limit:
34+
raise OverflowError(f"Advisory lock key '{key}' is too large")
35+
new_keys.append(key)
36+
return new_keys
37+
38+
1439
@contextmanager
1540
def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
1641
"""
@@ -32,6 +57,7 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wai
3257
keys = [key1]
3358
if key2 is not None:
3459
keys.append(key2)
60+
keys = _prepare_keys(keys)
3561

3662
query = "SELECT pg{_try}_advisory_{xact_}{lock}{_shared}({keys}) AS lock;".format(
3763
_try="" if wait else "_try",
@@ -41,11 +67,11 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wai
4167
keys=", ".join(["%s" for i in range(0, 2 if key2 is not None else 1)])
4268
)
4369

44-
log_query = "'{}' with params {}".format(query, keys)
45-
logging.debug("Acquiring advisory lock: {}".format(query, log_query))
70+
log_query = f"'{query}' with params {keys}"
71+
logging.debug(f"Acquiring advisory lock: {log_query}")
4672
with connection.cursor() as c:
4773
c.execute(query, keys)
48-
logging.debug("Acquired advisory lock: {}".format(query, log_query))
74+
logging.debug(f"Acquired advisory lock: {log_query}")
4975
yield c
5076

5177

contentcuration/contentcuration/db/models/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from mptt.managers import TreeManager
1313
from mptt.signals import node_moved
1414

15+
from contentcuration.constants.locking import TREE_LOCK
1516
from contentcuration.db.advisory_lock import advisory_lock
1617
from contentcuration.db.models.query import CustomTreeQuerySet
1718
from contentcuration.utils.cache import ResourceSizeCache
@@ -32,7 +33,6 @@
3233
# The exact optimum batch size is probably highly dependent on tree
3334
# topology also, so these rudimentary tests are likely insufficient
3435
BATCH_SIZE = 100
35-
TREE_LOCK = 1001
3636

3737

3838
class CustomManager(Manager.from_queryset(CTEQuerySet)):

contentcuration/contentcuration/frontend/shared/data/index.js

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import Dexie from 'dexie';
2+
import * as Sentry from '@sentry/vue';
23
import mapValues from 'lodash/mapValues';
34
import channel from './broadcastChannel';
45
import { CHANGES_TABLE, IGNORED_SOURCE, TABLE_NAMES } from './constants';
@@ -47,11 +48,25 @@ function runElection() {
4748
elector.awaitLeadership().then(startSyncing);
4849
elector.onduplicate = () => {
4950
stopSyncing();
50-
elector.die().then(runElection);
51+
elector
52+
.die()
53+
.then(() => {
54+
// manually reset reference to dead elector on the channel
55+
// which is set within `createLeaderElection` and whose
56+
// presence is also validated against, requiring its removal
57+
channel._leaderElector = null;
58+
return runElection();
59+
})
60+
.catch(Sentry.captureException);
5161
};
5262
}
5363

54-
export function initializeDB() {
55-
setupSchema();
56-
return db.open().then(runElection);
64+
/**
 * Prepares the client-side database: sets up the schema, opens the database,
 * then kicks off the leader election.
 *
 * A failure in any of those steps is reported to Sentry instead of being
 * rethrown to the caller.
 */
export async function initializeDB() {
  try {
    setupSchema();
    await db.open();
    await runElection();
  } catch (error) {
    Sentry.captureException(error);
  }
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Generated by Django 3.2.14 on 2022-12-09 16:09
2+
from django.db import migrations
3+
from django.db import models
4+
5+
6+
class Migration(migrations.Migration):
    """
    Adds the nullable `signature` column to the `taskresult` model, plus a partial index
    on it that only covers rows whose status is one of the listed not-yet-finished states.

    The constructor discards the app label it is given and always registers the migration
    under `django_celery_results`, the app that owns the `taskresult` model.
    """

    dependencies = [
        ('contentcuration', '0140_delete_task'),
        ('django_celery_results', '0011_taskresult_periodic_task_name'),
    ]

    replaces = [
        ('django_celery_results', '0140_delete_task'),
    ]

    operations = [
        migrations.AddField(
            model_name='taskresult',
            name='signature',
            field=models.CharField(max_length=32, null=True),
        ),
        migrations.AddIndex(
            model_name='taskresult',
            index=models.Index(
                condition=models.Q(
                    ('status__in', frozenset(['STARTED', 'REJECTED', 'RETRY', 'RECEIVED', 'PENDING'])),
                ),
                fields=['signature'],
                name='task_result_signature_idx',
            ),
        ),
    ]

    def __init__(self, name, app_label):
        # `app_label` is deliberately unused: the label is pinned to the app owning the model
        super().__init__(name, 'django_celery_results')

contentcuration/contentcuration/models.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from datetime import datetime
88

99
import pytz
10+
from celery import states as celery_states
1011
from django.conf import settings
1112
from django.contrib.auth.base_user import AbstractBaseUser
1213
from django.contrib.auth.base_user import BaseUserManager
@@ -74,6 +75,7 @@
7475
from contentcuration.db.models.manager import CustomManager
7576
from contentcuration.statistics import record_channel_stats
7677
from contentcuration.utils.cache import delete_public_channel_cache_keys
78+
from contentcuration.utils.celery.tasks import generate_task_signature
7779
from contentcuration.utils.parser import load_json_string
7880
from contentcuration.viewsets.sync.constants import ALL_CHANGES
7981
from contentcuration.viewsets.sync.constants import ALL_TABLES
@@ -2436,13 +2438,20 @@ def serialize_to_change_dict(self):
24362438
class TaskResultCustom(object):
24372439
"""
24382440
Custom fields to add to django_celery_results's TaskResult model
2441+
2442+
If adding fields to this class, run `makemigrations` then move the generated migration from the
2443+
`django_celery_results` app to the `contentcuration` app and override the constructor to change
2444+
the app_label. See `0141_add_task_signature` for an example
24392445
"""
24402446
# user shouldn't be null, but in order to append the field, this needs to be allowed
24412447
user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="tasks", on_delete=models.CASCADE, null=True)
24422448
channel_id = DjangoUUIDField(db_index=True, null=True, blank=True)
24432449
progress = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0), MaxValueValidator(100)])
2450+
# a hash of the task name and kwargs for identifying repeat tasks
2451+
signature = models.CharField(null=True, blank=False, max_length=32)
24442452

24452453
super_as_dict = TaskResult.as_dict
2454+
super_save = TaskResult.save
24462455

24472456
def as_dict(self):
24482457
"""
@@ -2456,16 +2465,45 @@ def as_dict(self):
24562465
)
24572466
return super_dict
24582467

2468+
def set_signature(self):
    """
    Fills in `signature` from the task's name, kwargs and channel ID.

    An already populated signature is left untouched.
    """
    if self.signature is None:
        self.signature = generate_task_signature(
            self.task_name,
            task_kwargs=self.task_kwargs,
            channel_id=self.channel_id,
        )
2476+
2477+
def save(self, *args, **kwargs):
    """
    Saves the task result, making sure its signature has been generated first.

    All arguments are passed through unchanged to the original `TaskResult.save`,
    and its return value is handed back to the caller.
    """
    self.set_signature()
    saved = self.super_save(*args, **kwargs)
    return saved
2483+
24592484
@classmethod
def contribute_to_class(cls, model_class=TaskResult):
    """
    Copies this class's public attributes onto a model, then appends its Meta indexes.

    :param model_class: The model receiving the attributes, by default TaskResult
    """
    excluded = ('contribute_to_class', 'Meta')
    for attr_name in dir(cls):
        # private/dunder attributes, this method itself, and Meta are not copied over
        if attr_name.startswith("_") or attr_name in excluded:
            continue
        model_class.add_to_class(attr_name, getattr(cls, attr_name))

    # Meta is skipped by the loop above, so extend the model's index list by hand
    current_indexes = getattr(model_class._meta, 'indexes', [])
    model_class._meta.indexes = current_indexes + cls.Meta.indexes
2496+
2497+
class Meta:
    # Partial index on `signature`, restricted to rows whose status is one of celery's
    # unready states, to match how the signature is queried
    indexes = [
        models.Index(
            name='task_result_signature_idx',
            fields=['signature'],
            condition=Q(status__in=celery_states.UNREADY_STATES),
        ),
    ]
2506+
24692507

24702508
# trigger class contributions immediately
24712509
TaskResultCustom.contribute_to_class()

contentcuration/contentcuration/tests/test_asynctask.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ def test_fetch_or_enqueue_task__channel_id__uuid_then_hex(self):
234234
self.assertEqual(expected_task.task_id, async_result.task_id)
235235

236236
def test_requeue_task(self):
237-
existing_task_ids = requeue_test_task.find_ids()
237+
signature = requeue_test_task._generate_signature({})
238+
existing_task_ids = requeue_test_task.find_ids(signature)
238239
self.assertEqual(len(existing_task_ids), 0)
239240

240241
first_async_result = requeue_test_task.enqueue(self.user, requeue=True)

0 commit comments

Comments
 (0)