@@ -5,6 +5,7 @@ import kotlinx.coroutines.CancellationException
55import kotlinx.coroutines.CoroutineScope
66import kotlinx.coroutines.Dispatchers
77import kotlinx.coroutines.Job
8+ import kotlinx.coroutines.awaitCancellation
89import kotlinx.coroutines.cancel
910import kotlinx.coroutines.delay
1011import kotlinx.coroutines.isActive
@@ -20,6 +21,7 @@ import org.modelix.model.api.INodeReference
2021import org.modelix.model.api.runSynchronized
2122import org.modelix.model.lazy.BranchReference
2223import org.modelix.model.lazy.CLVersion
24+ import org.modelix.model.lazy.RepositoryId
2325import org.modelix.model.mutable.IGenericMutableModelTree
2426import org.modelix.model.mutable.IMutableModelTree
2527import org.modelix.model.mutable.INodeIdGenerator
@@ -56,27 +58,52 @@ import org.modelix.model.mutable.asModel
5658 * Dispose should be called on this, as otherwise a regular polling will go on.
5759 *
5860 * @property client the model client to connect to the model server
59- * @property branchRef the model server branch to fetch the data from
61+ * @property branchRef branch or repository reference
6062 * @property providedScope the CoroutineScope to use for the suspendable tasks
6163 * @property initialRemoteVersion the last version on the server from which we want to start the synchronization
6264 */
6365class ReplicatedModel (
6466 val client : IModelClientV2 ,
65- val branchRef : BranchReference ,
67+ private val branchRefOrNull : BranchReference ? ,
6668 val idGenerator : (TreeId ) -> INodeIdGenerator <INodeReference >,
6769 private val providedScope : CoroutineScope ? = null ,
6870 initialRemoteVersion : CLVersion ? = null ,
71+ repositoryId : RepositoryId ? = null ,
72+ versionHash : String? = null ,
6973) : Closeable {
74+
75+ constructor (
76+ client: IModelClientV2 ,
77+ branchRef: BranchReference ,
78+ idGenerator: (TreeId ) -> INodeIdGenerator <INodeReference >,
79+ providedScope: CoroutineScope ? = null ,
80+ initialRemoteVersion: CLVersion ? = null ,
81+ ) : this (client, branchRef, idGenerator, providedScope, initialRemoteVersion, null , null )
82+
83+ val branchRef: BranchReference get() = branchRefOrNull ? : throw IllegalStateException (" ReplicatedModel is in read-only version mode" )
84+
7085 private val scope = providedScope ? : CoroutineScope (Dispatchers .Default )
7186 private var state = State .New
7287 private var localModel: LocalModel ? = null
73- private val remoteVersion = RemoteVersion (client, branchRef, initialRemoteVersion)
88+
89+ private val remoteVersion: IRemoteVersion
90+
7491 private var pollingJob: Job ? = null
7592
7693 init {
7794 if (initialRemoteVersion != null ) {
7895 localModel = LocalModel (initialRemoteVersion, client.getIdGenerator(), idGenerator(initialRemoteVersion.getModelTree().getId())) { client.getUserId() }
7996 }
97+
98+ if (branchRefOrNull != null ) {
99+ check(versionHash == null ) { " Cannot provide both branchRef and versionHash" }
100+ remoteVersion = RemoteVersionFromBranch (client, branchRefOrNull, initialRemoteVersion)
101+ } else if (versionHash != null ) {
102+ val repoId = repositoryId ? : throw IllegalArgumentException (" repositoryId is required when versionHash is provided" )
103+ remoteVersion = RemoteVersionFromHash (client, repoId, versionHash)
104+ } else {
105+ throw IllegalArgumentException (" Either branchRef or versionHash must be provided" )
106+ }
80107 }
81108
82109 private fun getLocalModel (): LocalModel = checkNotNull(localModel) { " Model is not initialized yet" }
@@ -92,7 +119,7 @@ class ReplicatedModel(
92119 state = State .Starting
93120
94121 if (localModel == null ) {
95- val initialVersion = remoteVersion.pull ()
122+ val initialVersion = remoteVersion.getInitialVersion ()
96123 localModel = LocalModel (initialVersion, client.getIdGenerator(), idGenerator(initialVersion.getModelTree().getId())) { client.getUserId() }
97124 }
98125
@@ -106,10 +133,10 @@ class ReplicatedModel(
106133 remoteVersionReceived(newRemoteVersion, null )
107134 nextDelayMs = 0
108135 } catch (ex: CancellationException ) {
109- LOG .debug { " Stop polling branch $branchRef after disposing." }
136+ LOG .debug { " Stop polling after disposing." }
110137 throw ex
111138 } catch (ex: Throwable ) {
112- LOG .error(ex) { " Failed polling branch $branchRef " }
139+ LOG .error(ex) { " Failed polling" }
113140 nextDelayMs = (nextDelayMs * 3 / 2 ).coerceIn(1000 , 30000 )
114141 }
115142 }
@@ -134,7 +161,9 @@ class ReplicatedModel(
134161 }
135162
136163 suspend fun resetToServerVersion () {
137- getLocalModel().resetToVersion(client.pull(branchRef, lastKnownVersion = null ).upcast())
164+ // This delegates to remoteVersion which handles pull/load
165+ val version = remoteVersion.getInitialVersion()
166+ getLocalModel().resetToVersion(version)
138167 }
139168
140169 fun isDisposed (): Boolean = state == State .Disposed
@@ -308,16 +337,22 @@ private class LocalModel(initialVersion: CLVersion, val versionIdGenerator: IIdG
308337 }
309338}
310339
311- private class RemoteVersion (
340+ private interface IRemoteVersion {
341+ suspend fun getInitialVersion (): CLVersion
342+ suspend fun poll (): CLVersion
343+ suspend fun push (version : CLVersion ): CLVersion
344+ }
345+
346+ private class RemoteVersionFromBranch (
312347 val client : IModelClientV2 ,
313348 val branchRef : BranchReference ,
314349 private var lastKnownRemoteVersion : CLVersion ? = null ,
315- ) {
350+ ) : IRemoteVersion {
316351 private val unconfirmedVersions: MutableSet <String > = LinkedHashSet ()
317352
318353 fun getNumberOfUnconfirmed () = runSynchronized(unconfirmedVersions) { unconfirmedVersions.size }
319354
320- suspend fun pull (): CLVersion {
355+ override suspend fun getInitialVersion (): CLVersion {
321356 return versionReceived(
322357 client.pull(
323358 branchRef,
@@ -332,11 +367,11 @@ private class RemoteVersion(
332367 )
333368 }
334369
335- suspend fun poll (): CLVersion {
370+ override suspend fun poll (): CLVersion {
336371 return versionReceived(client.poll(branchRef, lastKnownVersion = lastKnownRemoteVersion).upcast())
337372 }
338373
339- suspend fun push (version : CLVersion ): CLVersion {
374+ override suspend fun push (version : CLVersion ): CLVersion {
340375 if (lastKnownRemoteVersion?.getContentHash() == version.getContentHash()) return version
341376 runSynchronized(unconfirmedVersions) {
342377 if (! unconfirmedVersions.add(version.getContentHash())) return version
@@ -359,4 +394,29 @@ private class RemoteVersion(
359394 }
360395}
361396
397+ private class RemoteVersionFromHash (
398+ val client : IModelClientV2 ,
399+ val repositoryId : RepositoryId ,
400+ val versionHash : String ,
401+ private var lastKnownRemoteVersion : CLVersion ? = null ,
402+ ) : IRemoteVersion {
403+
404+ override suspend fun getInitialVersion (): CLVersion {
405+ return client.loadVersion(
406+ repositoryId,
407+ versionHash,
408+ lastKnownRemoteVersion,
409+ ).upcast().also { lastKnownRemoteVersion = it }
410+ }
411+
412+ override suspend fun poll (): CLVersion {
413+ // let's pretent to do something. The version is actually immutable and won't ever change…
414+ awaitCancellation()
415+ }
416+
417+ override suspend fun push (version : CLVersion ): CLVersion {
418+ throw UnsupportedOperationException (" Read-only model" )
419+ }
420+ }
421+
362422private fun IVersion.upcast (): CLVersion = this as CLVersion
0 commit comments