From 82fd479a289f286010320898b351c421c69185d1 Mon Sep 17 00:00:00 2001 From: Richard Tibbles Date: Tue, 3 Aug 2021 16:03:16 -0700 Subject: [PATCH 1/2] Remove vendored leaderElection that was broken. Upgrade broadcast channel to handle duplicates. --- .../frontend/shared/data/index.js | 12 +- .../frontend/shared/data/leaderElection.js | 306 ------------------ yarn.lock | 58 +++- 3 files changed, 50 insertions(+), 326 deletions(-) delete mode 100644 contentcuration/contentcuration/frontend/shared/data/leaderElection.js diff --git a/contentcuration/contentcuration/frontend/shared/data/index.js b/contentcuration/contentcuration/frontend/shared/data/index.js index 7935ed0cdf..512f8f8bde 100644 --- a/contentcuration/contentcuration/frontend/shared/data/index.js +++ b/contentcuration/contentcuration/frontend/shared/data/index.js @@ -1,6 +1,6 @@ import Dexie from 'dexie'; import mapValues from 'lodash/mapValues'; -import { createLeaderElection } from './leaderElection'; +import { createLeaderElection } from 'broadcast-channel'; import channel from './broadcastChannel'; import { CHANGE_LOCKS_TABLE, CHANGES_TABLE, IGNORED_SOURCE, TABLE_NAMES } from './constants'; import db from './db'; @@ -45,11 +45,11 @@ if (process.env.NODE_ENV !== 'production' && typeof window !== 'undefined') { function runElection() { const elector = createLeaderElection(channel); - elector.awaitLeadership({ - success: startSyncing, - cleanup: stopSyncing, - }); - return elector.waitForLeader(); + elector.awaitLeadership().then(startSyncing); + elector.onduplicate = () => { + stopSyncing(); + elector.die.then(runElection); + }; } export function initializeDB() { diff --git a/contentcuration/contentcuration/frontend/shared/data/leaderElection.js b/contentcuration/contentcuration/frontend/shared/data/leaderElection.js deleted file mode 100644 index 8551ea6800..0000000000 --- a/contentcuration/contentcuration/frontend/shared/data/leaderElection.js +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Vendored and modified from the excellent: - * https://github.com/pubkey/broadcast-channel/blob/master/src/leader-election.js - * So that we can add functionality: - * 1) The ability to report back when a leader has been elected - * 2) The ability for a tab to report itself has a dictator, which will depose - any previously elected leader and will only relinquish power when it dies. - */ - -import { add } from 'unload'; -import uuidv4 from 'uuid/v4'; -import isFunction from 'lodash/isFunction'; - -function sleep(time) { - if (!time) time = 0; - return new Promise(res => setTimeout(res, time)); -} - -const MESSAGES = { - APPLY: 'APPLY', - DEATH: 'DEATH', - TELL: 'TELL', -}; - -const LEADER_CONTEXT = 'LEADER_ELECTION_CONTEXT'; - -// This is defined in the broadcast channel source -// for the postInternal method and should not be changed. -const INTERNAL_CHANNEL = 'internal'; - -const LeaderElection = function(channel, options) { - this._channel = channel; - this._options = options; - - this.isLeader = false; - this.isDead = false; - this.token = uuidv4(); - - // Track whether any leader has been elected - this._leaderExists = false; - // A place to track a waiting for leader promise - this._waitingForLeaderPromise = null; - // A place to track a resolve callback for a waiting - // for leader promise - this._waitingForLeaderFn = null; - - this._isApl = false; // _isApplying - this._reApply = false; - - // things to clean up - this._unl = []; // _unloads - this._lstns = []; // _listeners - this._invs = []; // _intervals -}; - -LeaderElection.prototype = { - applyOnce() { - if (this.isLeader) return Promise.resolve(false); - if (this.isDead) return Promise.resolve(false); - - // do nothing if already running - if (this._isApl) { - this._reApply = true; - return Promise.resolve(false); - } - this._isApl = true; - - let stopCriteria = false; - - const isDictator = this._options.dictator; - - const handleMessage = msg => { - if (msg.context === LEADER_CONTEXT && msg.token != this.token) { - const submit = !isDictator && msg.dictator; - const ignore = isDictator && !msg.dictator; - if (!ignore) { - // Ignore any messages from other non-dictatorial leaders if - // this is a dictatorial context. - if (msg.action === MESSAGES.APPLY) { - // other is applying - if (submit || msg.token > this.token) { - // other has higher token, or is a dictator and we are not - // stop applying - stopCriteria = true; - } - } - - if (msg.action === MESSAGES.TELL) { - // other is already leader - stopCriteria = true; - } - } - } - }; - this._channel.addEventListener(INTERNAL_CHANNEL, handleMessage); - - const ret = _sendMessage(this, MESSAGES.APPLY) // send out that this one is applying - .then(() => sleep(this._options.responseTime)) // let others time to respond - .then(() => { - if (stopCriteria) return Promise.reject(new Error()); - else return _sendMessage(this, MESSAGES.APPLY); - }) - .then(() => sleep(this._options.responseTime)) // let others time to respond - .then(() => { - if (stopCriteria) return Promise.reject(new Error()); - else return _sendMessage(this); - }) - .then(() => _beLeader(this)) // no one disagreed -> this one is now leader - .then(() => true) - .catch(() => false) // apply not successfull - .then(success => { - this._channel.removeEventListener(INTERNAL_CHANNEL, handleMessage); - this._isApl = false; - if (!success && this._reApply) { - this._reApply = false; - return this.applyOnce(); - } else return success; - }); - return ret; - }, - - awaitLeadership({ success = null, cleanup = null } = {}) { - this.electedCallback = success; - this.deposedCallback = cleanup; - if ( - /* _awaitLeadershipPromise */ - !this._aLP - ) { - this._aLP = _awaitLeadershipOnce(this); - } - return this._aLP; - }, - - get leaderExists() { - return this._leaderExists; - }, - - set leaderExists(exists) { - if (this._waitingForLeaderFn && exists) { - this._waitingForLeaderFn(true); - this._waitingForLeaderFn = null; - } else if (this._leaderExists && !exists) { - this._waitingForLeaderPromise = null; - this._waitingForLeaderFn = null; - } - this._leaderExists = exists; - }, - - /* - * A function to wait until anything has been elected leader. - */ - waitForLeader() { - if (!this._waitingForLeaderPromise) { - this._waitingForLeaderPromise = new Promise(resolve => { - if (this._leaderExists) { - resolve(true); - } else { - this._waitingForLeaderFn = resolve; - } - }); - } - return this._waitingForLeaderPromise; - }, - - depose() { - this.isLeader = false; - if (isFunction(this.deposedCallback)) { - this.deposedCallback(); - } - this._lstns.forEach(listener => this._channel.removeEventListener(INTERNAL_CHANNEL, listener)); - this._invs.forEach(interval => clearInterval(interval)); - this._unl.forEach(uFn => { - uFn.remove(); - }); - }, - - die() { - if (this.isDead) return; - this.isDead = true; - this.depose(); - - return _sendMessage(this, MESSAGES.DEATH); - }, -}; - -function _awaitLeadershipOnce(leaderElector) { - if (leaderElector.isLeader) return Promise.resolve(); - - return new Promise(res => { - let resolved = false; - - const finish = () => { - // applyOnce has resolved, hence there is - // now a leader. - leaderElector.leaderExists = true; - if (resolved) return; - if (leaderElector.isLeader) { - resolved = true; - clearInterval(interval); - leaderElector._channel.removeEventListener(INTERNAL_CHANNEL, whenDeathListener); - res(true); - } - }; - - // try once now - leaderElector.applyOnce().then(finish); - - // try on fallbackInterval - const interval = setInterval(() => { - leaderElector.applyOnce().then(finish); - }, leaderElector._options.fallbackInterval); - leaderElector._invs.push(interval); - - // try when other leader dies - const whenDeathListener = msg => { - if (msg.context === LEADER_CONTEXT && msg.action === MESSAGES.DEATH) { - // Leader has died, so there is now no leader. - leaderElector.leaderExists = false; - leaderElector.applyOnce().then(finish); - } - }; - leaderElector._channel.addEventListener(INTERNAL_CHANNEL, whenDeathListener); - leaderElector._lstns.push(whenDeathListener); - }); -} - -/** - * sends an internal message over the broadcast-channel - */ -function _sendMessage(leaderElector, action) { - const msgJson = { - context: LEADER_CONTEXT, - action, - token: leaderElector.token, - dictator: leaderElector._options.dictator, - }; - return leaderElector._channel.postInternal(msgJson); -} - -function _beLeader(leaderElector) { - if (!leaderElector.isLeader) { - leaderElector.isLeader = true; - if (isFunction(leaderElector.electedCallback)) { - leaderElector.electedCallback(); - } - const unloadFn = add(() => leaderElector.die()); - leaderElector._unl.push(unloadFn); - - const isLeaderListener = msg => { - if (msg.context === LEADER_CONTEXT && msg.action === MESSAGES.APPLY) { - _sendMessage(leaderElector, MESSAGES.TELL); - } - }; - const isDictator = this._options.dictator; - if (!isDictator) { - const coupListener = msg => { - if ( - msg.context === LEADER_CONTEXT && - msg.action === MESSAGES.APPLY && - !isDictator && - msg.dictator - ) { - leaderElector.depose(); - } - }; - leaderElector._channel.addEventListener(INTERNAL_CHANNEL, coupListener); - leaderElector._lstns.push(coupListener); - } - leaderElector._channel.addEventListener(INTERNAL_CHANNEL, isLeaderListener); - leaderElector._lstns.push(isLeaderListener); - return _sendMessage(leaderElector, MESSAGES.TELL); - } - return Promise.resolve(); -} - -function fillOptionsWithDefaults(options, channel) { - if (!options) options = {}; - options = JSON.parse(JSON.stringify(options)); - - if (!options.fallbackInterval) { - options.fallbackInterval = 3000; - } - - if (!options.responseTime) { - options.responseTime = channel.method.averageResponseTime(channel.options); - } - - if (!options.dictator) { - options.dictator = false; - } - - return options; -} - -export function createLeaderElection(channel, options) { - if (channel._leaderElector) { - throw new Error('BroadcastChannel already has a leader-elector'); - } - - options = fillOptionsWithDefaults(options, channel); - const elector = new LeaderElection(channel, options); - channel._befC.push(() => elector.die()); - - channel._leaderElector = elector; - return elector; -} diff --git a/yarn.lock b/yarn.lock index 6bb4fc0852..2ba2d187f0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1543,7 +1543,7 @@ core-js-pure "^3.0.0" regenerator-runtime "^0.13.4" -"@babel/runtime@^7.0.0", "@babel/runtime@^7.1.2", "@babel/runtime@^7.3.1", "@babel/runtime@^7.5.0", "@babel/runtime@^7.5.5", "@babel/runtime@^7.6.2", "@babel/runtime@^7.7.2", "@babel/runtime@^7.8.4", "@babel/runtime@^7.9.2": +"@babel/runtime@^7.0.0", "@babel/runtime@^7.1.2", "@babel/runtime@^7.3.1", "@babel/runtime@^7.5.0", "@babel/runtime@^7.5.5", "@babel/runtime@^7.8.4", "@babel/runtime@^7.9.2": version "7.10.2" resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.10.2.tgz#d103f21f2602497d38348a32e008637d506db839" integrity sha512-6sF3uQw2ivImfVIl62RZ7MXhO2tap69WeWK57vAaimT6AZbE4FbqjdEJIN1UqoD6wI6B+1n9UiagafH1sxjOtg== @@ -1557,6 +1557,13 @@ dependencies: regenerator-runtime "^0.13.4" +"@babel/runtime@^7.6.2", "@babel/runtime@^7.7.2": + version "7.14.8" + resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.14.8.tgz#7119a56f421018852694290b9f9148097391b446" + integrity sha512-twj3L8Og5SaCRCErB4x4ajbvBIVV77CGeFglHpeg5WC5FF8TZzBWXtTJ4MqaD9QszLYTtr+IsaAL2rEUevb+eg== + dependencies: + regenerator-runtime "^0.13.4" + "@babel/template@^7.10.1", "@babel/template@^7.3.3": version "7.10.1" resolved "https://registry.yarnpkg.com/@babel/template/-/template-7.10.1.tgz#e167154a94cb5f14b28dc58f5356d2162f539811" @@ -5130,9 +5137,9 @@ balanced-match@^0.4.2: integrity sha1-yz8+PHMtwPAe5wtAPzAuYddwmDg= balanced-match@^1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/balanced-match/-/balanced-match-1.0.0.tgz#89b4d199ab2bee49de164ea02b89ce462d71b767" - integrity sha1-ibTRmasr7kneFk6gK4nORi1xt2c= + version "1.0.2" + resolved "https://registry.yarnpkg.com/balanced-match/-/balanced-match-1.0.2.tgz#e83e3a7e3f300b34cb9d87f615fa0cbf357690ee" + integrity sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw== base64-arraybuffer-es6@0.6.0: version "0.6.0" @@ -5356,15 +5363,16 @@ braces@^3.0.1, braces@~3.0.2: fill-range "^7.0.1" broadcast-channel@^3.0.3: - version "3.1.0" - resolved "https://registry.yarnpkg.com/broadcast-channel/-/broadcast-channel-3.1.0.tgz#b4a6970fc72c4d68fc859321a6af850e66cb2dfa" - integrity sha512-zrjTunJRva1aFW9UlLtoMnB05tu8hbb7qbv3PxXXGnxp3t9VA/KcTIwcC0+u7oLBdlXSnv0yy7pB+UemLjANyQ== + version "3.7.0" + resolved "https://registry.yarnpkg.com/broadcast-channel/-/broadcast-channel-3.7.0.tgz#2dfa5c7b4289547ac3f6705f9c00af8723889937" + integrity sha512-cIAKJXAxGJceNZGTZSBzMxzyOn72cVgPnKx4dc6LRjQgbaJUQqhy5rzL3zbMxkMWsGKkv2hSFkPRMEXfoMZ2Mg== dependencies: "@babel/runtime" "^7.7.2" - detect-node "^2.0.4" + detect-node "^2.1.0" js-sha3 "0.8.0" microseconds "0.2.0" nano-time "1.0.0" + oblivious-set "1.0.0" rimraf "3.0.2" unload "2.2.0" @@ -7444,10 +7452,10 @@ detect-newline@^3.0.0: resolved "https://registry.yarnpkg.com/detect-newline/-/detect-newline-3.1.0.tgz#576f5dfc63ae1a192ff192d8ad3af6308991b651" integrity sha512-TLz+x/vEXm/Y7P7wn1EJFNLxYpUD4TgMosxY6fAVJUnJMbupHBOncxyWUG9OpTaH9EBD7uFI5LfEgmMOc54DsA== -detect-node@^2.0.4: - version "2.0.4" - resolved "https://registry.yarnpkg.com/detect-node/-/detect-node-2.0.4.tgz#014ee8f8f669c5c58023da64b8179c083a28c46c" - integrity sha512-ZIzRpLJrOj7jjP2miAtgqIfmzbxa4ZOr5jJc601zklsfEx9oTzmmj2nVpIPRpNlRTIh8lc1kyViIY7BWSGNmKw== +detect-node@^2.0.4, detect-node@^2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/detect-node/-/detect-node-2.1.0.tgz#c9c70775a49c3d03bc2c06d9a73be550f978f8b1" + integrity sha512-T0NIuQpnTvFDATNuHN5roPwSBG83rFsuO+MXXH9/3N1eFbn4wcPjttvjMLEPWJ0RGUYgQE7cGgS3tNxbqCGM7g== detect-port-alt@1.1.6: version "1.1.6" @@ -9330,7 +9338,7 @@ glob@5.0.15: once "^1.3.0" path-is-absolute "^1.0.0" -glob@^7.0.0, glob@^7.0.3, glob@^7.1.1, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4, glob@^7.1.6, glob@~7.1.1: +glob@^7.0.0, glob@^7.0.3, glob@^7.1.1, glob@^7.1.2, glob@^7.1.4, glob@^7.1.6, glob@~7.1.1: version "7.1.6" resolved "https://registry.yarnpkg.com/glob/-/glob-7.1.6.tgz#141f33b81a7c2492e125594307480c46679278a6" integrity sha512-LwaxwyZ72Lk7vZINtNNrywX0ZuLyStrdDtabefZKAY5ZGJhVtgdznluResxNmPitE0SAO+O26sWTHeKSI2wMBA== @@ -9342,6 +9350,18 @@ glob@^7.0.0, glob@^7.0.3, glob@^7.1.1, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4, gl once "^1.3.0" path-is-absolute "^1.0.0" +glob@^7.1.3: + version "7.1.7" + resolved "https://registry.yarnpkg.com/glob/-/glob-7.1.7.tgz#3b193e9233f01d42d0b3f78294bbeeb418f94a90" + integrity sha512-OvD9ENzPLbegENnYP5UUfJIirTg4+XwMWGaQfQTY0JenxNvvIKP3U3/tAQSPIu/lHxXYSZmpXlUHeqAIdKzBLQ== + dependencies: + fs.realpath "^1.0.0" + inflight "^1.0.4" + inherits "2" + minimatch "^3.0.4" + once "^1.3.0" + path-is-absolute "^1.0.0" + "glob@~ 3.2.1": version "3.2.11" resolved "https://registry.yarnpkg.com/glob/-/glob-3.2.11.tgz#4a973f635b9190f715d10987d5c00fd2815ebe3d" @@ -14358,6 +14378,11 @@ object.values@^1.1.0, object.values@^1.1.1: function-bind "^1.1.1" has "^1.0.3" +oblivious-set@1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/oblivious-set/-/oblivious-set-1.0.0.tgz#c8316f2c2fb6ff7b11b6158db3234c49f733c566" + integrity sha512-z+pI07qxo4c2CulUHCDf9lcqDlMSo72N/4rLUpRXf6fu+q8vjt8y0xS+Tlf8NTJDdTXHbdeO1n3MlbctwEoXZw== + obuf@^1.0.0, obuf@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e" @@ -16668,11 +16693,16 @@ regenerator-runtime@^0.11.0: resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.11.1.tgz#be05ad7f9bf7d22e056f9726cee5017fbf19e2e9" integrity sha512-MguG95oij0fC3QV3URf4V2SDYGJhJnJGqvIIgdECeODCT98wSWDAJ94SSuVpYQUoTcGUIL6L4yNB7j1DFFHSBg== -regenerator-runtime@^0.13.2, regenerator-runtime@^0.13.3, regenerator-runtime@^0.13.4, regenerator-runtime@^0.13.5: +regenerator-runtime@^0.13.2, regenerator-runtime@^0.13.3, regenerator-runtime@^0.13.5: version "0.13.5" resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.13.5.tgz#d878a1d094b4306d10b9096484b33ebd55e26697" integrity sha512-ZS5w8CpKFinUzOwW3c83oPeVXoNsrLsaCoLtJvAClH135j/R77RuymhiSErhm2lKcwSCIpmvIWSbDkIfAqKQlA== +regenerator-runtime@^0.13.4: + version "0.13.9" + resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.13.9.tgz#8925742a98ffd90814988d7566ad30ca3b263b52" + integrity sha512-p3VT+cOEgxFsRRA9X4lkI1E+k2/CtnKtU4gcxyaCUreilL/vqI6CdZ3wxVUx3UOUg+gnUOQQcRI7BmSI656MYA== + regenerator-transform@^0.10.0: version "0.10.1" resolved "https://registry.yarnpkg.com/regenerator-transform/-/regenerator-transform-0.10.1.tgz#1e4996837231da8b7f3cf4114d71b5691a0680dd" From 44f179230da1af1240f41282b6f99ce2d479bbd6 Mon Sep 17 00:00:00 2001 From: Richard Tibbles Date: Tue, 3 Aug 2021 16:04:10 -0700 Subject: [PATCH 2/2] Remove unnecessary sync polling now that changes are properly detected. --- .../frontend/shared/data/serverSync.js | 35 ------------------- 1 file changed, 35 deletions(-) diff --git a/contentcuration/contentcuration/frontend/shared/data/serverSync.js b/contentcuration/contentcuration/frontend/shared/data/serverSync.js index 7c34ad3103..05e7818b20 100644 --- a/contentcuration/contentcuration/frontend/shared/data/serverSync.js +++ b/contentcuration/contentcuration/frontend/shared/data/serverSync.js @@ -32,9 +32,6 @@ const SYNC_IF_NO_CHANGES_FOR = 2; // already instantiated in the broadcastChannel module. const channel = createChannel(); -// Stores last setTimeout in polling so we may clear it when we want -let unsyncedPollingTimeoutId; - // Flag to check if a sync is currently active. let syncActive = false; @@ -301,9 +298,6 @@ const debouncedSyncChanges = debounce(() => { if (process.env.NODE_ENV !== 'production' && typeof window !== 'undefined') { window.forceServerSync = forceServerSync; - - window.stopPollingUnsyncedChanges = stopPollingUnsyncedChanges; - window.pollUnsyncedChanges = pollUnsyncedChanges; } async function handleChanges(changes) { @@ -332,47 +326,18 @@ async function handleChanges(changes) { } } -async function checkAndSyncChanges() { - // Get count of changes that we care about - const changes = await db[CHANGES_TABLE].toCollection() - // Only try to sync if we have at least one change that has - // not already errored on the backend. - .filter(c => !c.errors) - .count(); - - // If more than 0, sync the changes - if (changes > 0) { - debouncedSyncChanges(); - } -} - -async function pollUnsyncedChanges() { - await checkAndSyncChanges(); - unsyncedPollingTimeoutId = setTimeout(() => pollUnsyncedChanges(), SYNC_IF_NO_CHANGES_FOR * 1000); -} - -function stopPollingUnsyncedChanges() { - if (unsyncedPollingTimeoutId) { - clearTimeout(unsyncedPollingTimeoutId); - } -} - export function startSyncing() { startChannelFetchListener(); cleanupLocks(); // Initiate a sync immediately in case any data // is left over in the database. debouncedSyncChanges(); - // Begin polling our CHANGES_TABLE - pollUnsyncedChanges(); db.on('changes', handleChanges); } export function stopSyncing() { stopChannelFetchListener(); debouncedSyncChanges.cancel(); - // Stop pollUnsyncedChanges - stopPollingUnsyncedChanges(); // Dexie's slightly counterintuitive method for unsubscribing from events db.on('changes').unsubscribe(handleChanges); }