-
Notifications
You must be signed in to change notification settings - Fork 119
Expand file tree
/
Copy pathWebSocketPolyfill.js
More file actions
121 lines (110 loc) · 3.14 KB
/
WebSocketPolyfill.js
File metadata and controls
121 lines (110 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* SPDX-FileCopyrightText: 2022 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
import { logger } from '../helpers/logger.js'
import { decodeArrayBuffer } from '../helpers/base64.ts'
import { getSteps, getAwareness } from '../helpers/yjs.js'
import getNotifyBus from './NotifyService.js'
/**
 * Create a class that mimics the browser WebSocket interface (`url`,
 * `binaryType`, `onopen` / `onmessage` / `onerror` / `onclose`, `send()`,
 * `close()`) but moves data through the given sync service and, when one is
 * available, the notify bus returned by `getNotifyBus()`.
 *
 * The returned class closes over all four arguments, so every instance
 * created from it shares the same sync service, file and outgoing queue.
 *
 * @param {object} syncService - the sync service to build upon
 * @param {number} fileId - id of the file to open
 * @param {object} initialSession - initial session to open
 * @param {object[]} queue - queue for the outgoing steps
 * @return {Function} the WebSocketPolyfill class; construct it with a url
 */
export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) {
	return class WebSocketPolyfill {

		// Session and version as last reported by the sync service.
		#session
		#version
		// Public WebSocket-style surface; the callbacks are assigned by the consumer.
		binaryType
		onmessage
		onerror
		onclose
		onopen
		// Map of sync service event name -> listener, kept so close() can detach them.
		#handlers = {}
		#notifyPushBus

		/**
		 * Subscribe to the notify bus and the sync service, then open the
		 * sync service connection for the file.
		 *
		 * @param {string} url - stored on the instance as `url`
		 */
		constructor(url) {
			this.#notifyPushBus = getNotifyBus()
			this.#notifyPushBus?.on('notify_push', this.#onNotifyPush)
			this.url = url
			logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession })
			if (syncService.hasActiveConnection) {
				// Deferred so the consumer can assign `onopen` after construction.
				setTimeout(() => this.onopen?.(), 0)
			}
			this.#registerHandlers({
				opened: ({ version, session }) => {
					this.#version = version
					logger.debug('opened ', { version, session })
					this.#session = session
					this.onopen?.()
				},
				loaded: ({ version, session }) => {
					logger.debug('loaded ', { version, session })
					this.#version = version
					this.#session = session
				},
				sync: ({ steps, version }) => {
					logger.debug('synced ', { version, steps })
					this.#version = version
					for (const { step } of steps ?? []) {
						this.#deliver(step)
					}
				},
			})
			syncService.open({ fileId, initialSession })
		}

		/**
		 * Remember the given listeners and attach each one to the sync service.
		 *
		 * @param {object} handlers - map of event name to listener
		 */
		#registerHandlers(handlers) {
			this.#handlers = handlers
			for (const [event, listener] of Object.entries(handlers)) {
				syncService.on(event, listener)
			}
		}

		/**
		 * Decode one encoded step and hand it to `onmessage`.
		 * Nothing is delivered while no `onmessage` callback is assigned.
		 *
		 * @param {string} encodedStep - step as accepted by `decodeArrayBuffer`
		 */
		#deliver(encodedStep) {
			const data = decodeArrayBuffer(encodedStep)
			this.onmessage?.({ data })
		}

		/**
		 * Append the data to the shared queue and ask the sync service to push it.
		 *
		 * @param {...any} data - entries to enqueue
		 * @return {Promise|undefined} resolves with the sync service result;
		 * resolves with undefined after `onerror` was called on failure;
		 * undefined if `sendSteps` returned nothing.
		 */
		send(...data) {
			queue.push(...data)
			// Snapshot of what the request actually carried. Entries queued
			// after the snapshot was taken must survive the cleanup below.
			let outbox = new Set()
			return syncService.sendSteps(() => {
				const payload = {
					steps: getSteps(queue),
					awareness: getAwareness(queue),
					version: this.#version,
				}
				outbox = new Set(queue)
				logger.debug('sending steps ', payload)
				return payload
			})?.then(ret => {
				// Only keep the entries that were not sent yet. The queue is
				// mutated in place because it is shared with the caller.
				const unsent = queue.filter(entry => !outbox.has(entry))
				queue.splice(0, queue.length, ...unsent)
				return ret
			}, err => {
				logger.error(`Failed to push the queue with ${queue.length} steps to the server`, err)
				this.onerror?.(err)
			})
		}

		/**
		 * Detach all listeners from the sync service and the notify bus,
		 * then call `onclose` if one is assigned.
		 */
		async close() {
			for (const [event, listener] of Object.entries(this.#handlers)) {
				syncService.off(event, listener)
			}
			this.#handlers = {}
			this.#notifyPushBus?.off('notify_push', this.#onNotifyPush)
			this.onclose?.()
			logger.debug('Websocket closed')
		}

		/**
		 * Listener for `notify_push` events: forwards the steps of messages
		 * addressed to this file and ignores everything else.
		 *
		 * Kept as an arrow function field so the exact same function reference
		 * is passed to both `on` and `off`. Calling `.bind(this)` twice yields
		 * two different functions, which would leave the listener attached
		 * after close().
		 *
		 * @param {object} event - the pushed event
		 * @param {object} event.messageBody - expected to carry `documentId` and `steps`
		 */
		#onNotifyPush = ({ messageBody }) => {
			if (messageBody?.documentId !== fileId) {
				return
			}
			for (const step of messageBody.steps ?? []) {
				this.#deliver(step)
			}
		}

	}
}