diff --git a/AGENTS.md b/AGENTS.md index 0f0a42a..0fa53f0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -125,7 +125,7 @@ The `Runtime` owns the shared infrastructure (bus, scheduler, loggers, HTTP serv | `Scheduler` | `packages/core/src/scheduler.ts` | Poll intervals, graceful start/stop | | `InMemoryBus` / `FileWalBus` | `packages/core/src/bus.ts` | Pluggable event bus (WAL for durability) | | `loadConfig()` | `packages/core/src/schema.ts` | YAML loading, AJV validation, env var substitution | -| `FileCheckpointStore` | `packages/core/src/store.ts` | Source deduplication checkpoints | +| `FileCheckpointStore` | `packages/core/src/store.ts` | Source deduplication checkpoints (atomic writes, default for CLI) | | `LoggerManager` | `packages/core/src/logger.ts` | Fan-out to loggers, non-blocking, error-isolated | | `AuditTrail` | `packages/core/src/audit.ts` | SOP execution audit records with content hashes, chain tracking | | `OutputValidator` | `packages/core/src/output-validator.ts` | Pre-delivery output validation (injection, echo, scope) | @@ -188,11 +188,11 @@ Every plugin type (connector, transform, logger) must be wired through the **ful | Pi-rust lifecycle conformance | `connectors/pi-rust/src/__tests__/source.test.ts` | 26 | | Router matching | `packages/core/src/__tests__/router.test.ts` | — | | Event bus | `packages/core/src/__tests__/bus.test.ts` | — | -| Checkpoint store | `packages/core/src/__tests__/store.test.ts` | — | +| Checkpoint store (file + memory, atomic writes) | `packages/core/src/__tests__/store.test.ts` | 17 | | Transform filter | `transforms/filter/src/__tests__/filter.test.ts` | — | | Dedup transform | `transforms/dedup/src/__tests__/dedup.test.ts` | — | | Daemon lifecycle (PID, signals, stop, logs, state) | `packages/cli/src/__tests__/daemon-lifecycle.test.ts` | 45 | -| Runtime lifecycle (multi-module, shared infra) | `packages/core/src/__tests__/runtime.test.ts` | 11 | +| Runtime lifecycle (multi-module, shared infra, checkpoint resolution) 
| `packages/core/src/__tests__/runtime.test.ts` | 16 | | Module registry (name conflicts, lookup) | `packages/core/src/__tests__/registry.test.ts` | 8 | | Module instance (lifecycle states, resource ownership) | `packages/core/src/__tests__/module-instance.test.ts` | 17 | | Multi-module runtime (load, unload, reload, events) | `packages/core/src/__tests__/multi-module-runtime.test.ts` | 9 | diff --git a/docs-site/src/content/docs/spec/config-schema.md b/docs-site/src/content/docs/spec/config-schema.md index ddb1450..fe548ca 100644 --- a/docs-site/src/content/docs/spec/config-schema.md +++ b/docs-site/src/content/docs/spec/config-schema.md @@ -56,6 +56,9 @@ defaults: poll_interval: 5m event_retention: 7d log_level: info + checkpoint: + store: file # 'file' (default) or 'memory' + dir: .orgloop/checkpoints # relative to project dir, or absolute path # Connector definition files (file paths, not package names) connectors: diff --git a/docs-site/src/content/docs/spec/runtime-lifecycle.md b/docs-site/src/content/docs/spec/runtime-lifecycle.md index 6560996..5fb00bb 100644 --- a/docs-site/src/content/docs/spec/runtime-lifecycle.md +++ b/docs-site/src/content/docs/spec/runtime-lifecycle.md @@ -144,7 +144,7 @@ Graceful shutdown sequence: │ ├── daemon.stdout.log # Daemon stdout │ └── daemon.stderr.log # Daemon stderr └── data/ - ├── checkpoints/ # Per-source checkpoint files + ├── checkpoints/ # Per-source checkpoint files (default: <project>/.orgloop/checkpoints/) ├── wal/ # Write-ahead log (event durability) └── queue/ # Queued events (degraded actors) ``` diff --git a/docs-site/src/content/docs/spec/scale-design.md b/docs-site/src/content/docs/spec/scale-design.md index e8e8e07..84d578c 100644 --- a/docs-site/src/content/docs/spec/scale-design.md +++ b/docs-site/src/content/docs/spec/scale-design.md @@ -232,7 +232,9 @@ This is the right default for OrgLoop's use case. 
Actors may receive duplicate e **State management:** -Each source connector maintains a **checkpoint** — an opaque string (typically a timestamp or cursor) that tells the connector where to resume polling after a restart. Checkpoints are persisted to `~/.orgloop/data/checkpoints/`. +Each source connector maintains a **checkpoint** — an opaque string (typically a timestamp or cursor) that tells the connector where to resume polling after a restart. + +By default, checkpoints are persisted to `<project>/.orgloop/checkpoints/<source-id>.json` when running via the CLI (file-based is the default when a module path is available). Writes are atomic (temp file + rename) to prevent corruption on crash. ```typescript // Checkpoint store @@ -242,6 +244,20 @@ export interface CheckpointStore { } // Implementations: -// - FileCheckpointStore → JSON file per source -// - InMemoryCheckpointStore → in-memory (for testing) +// - FileCheckpointStore → JSON file per source (default for CLI) +// - InMemoryCheckpointStore → in-memory (for testing / library use) +``` + +**Configuration:** + +```yaml +defaults: + checkpoint: + store: file # 'file' (default) or 'memory' + dir: .orgloop/checkpoints # relative to module dir, or absolute ``` + +If no `checkpoint` config is set, the runtime resolves the store automatically: +- With `modulePath` → `FileCheckpointStore` at `<modulePath>/.orgloop/checkpoints/` +- With `dataDir` → `FileCheckpointStore` at the specified data directory +- Neither → `InMemoryCheckpointStore` (library/test fallback) diff --git a/docs-site/src/content/docs/start/user-guide.md b/docs-site/src/content/docs/start/user-guide.md index bc0f0a6..a08eae6 100644 --- a/docs-site/src/content/docs/start/user-guide.md +++ b/docs-site/src/content/docs/start/user-guide.md @@ -464,7 +464,7 @@ When OrgLoop stops (Ctrl+C, `orgloop stop`, process killed): - **All polling stops.** Sources stop fetching new events. 
- **No new events are processed.** Events that arrived but weren't yet processed are lost (no durable queue by default). - **Actors are not notified.** Running actor sessions continue independently -- they don't know OrgLoop stopped. -- **State is preserved.** The last config state remains in `~/.orgloop/state.json`. Source checkpoints remain in `~/.orgloop/checkpoints/`. Logs remain in `~/.orgloop/logs/`. +- **State is preserved.** The last config state remains in `~/.orgloop/state.json`. Source checkpoints are persisted per-project in `<project>/.orgloop/checkpoints/` (configurable via `defaults.checkpoint.dir`). Logs remain in `~/.orgloop/logs/`. To restart: diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index edb6f7f..e338088 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -138,21 +138,11 @@ async function resolveModuleResources( } } - // Create persistent checkpoint store - let checkpointStore: import('@orgloop/core').FileCheckpointStore | undefined; - try { - const { FileCheckpointStore } = await import('@orgloop/core'); - checkpointStore = new FileCheckpointStore(); - } catch { - // Fall through — runtime will use InMemoryCheckpointStore - } - return { resolvedSources, resolvedActors, resolvedTransforms, resolvedLoggers, - checkpointStore, }; } @@ -394,7 +384,6 @@ async function runForeground(configPath?: string, force?: boolean): Promise): ModuleConfig { return { @@ -308,6 +312,136 @@ describe('Runtime', () => { ); }); + // ─── Checkpoint Store Resolution ───────────────────────────────────── + + it('defaults to FileCheckpointStore when modulePath is set', async () => { + const tempDir = await mkdtemp(join(tmpdir(), 'orgloop-rt-cp-')); + try { + runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false }); + await runtime.start(); + + const source = new MockSource('cp-source'); + const actor = new MockActor('cp-actor'); + + const config = makeModuleConfig('cp-mod', { 
modulePath: tempDir }); + const status = await runtime.loadModule(config, { + sources: new Map([['cp-source', source]]), + actors: new Map([['cp-actor', actor]]), + }); + + expect(status.state).toBe('active'); + // Module loaded with modulePath — runtime should have created a FileCheckpointStore + // at tempDir/.orgloop/checkpoints/. We verify by reading back a checkpoint + // after a simulated poll writes one. + const store = new FileCheckpointStore(join(tempDir, '.orgloop', 'checkpoints')); + await store.set('cp-source', 'test-value'); + expect(await store.get('cp-source')).toBe('test-value'); + } finally { + await runtime.stop(); + await rm(tempDir, { recursive: true, force: true }); + } + }); + + it('uses InMemoryCheckpointStore when checkpoint.store is memory', async () => { + runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false }); + await runtime.start(); + + const source = new MockSource('mem-source'); + const actor = new MockActor('mem-actor'); + + const config = makeModuleConfig('mem-mod', { + defaults: { poll_interval: '5m', checkpoint: { store: 'memory' } }, + }); + // Should not throw — memory store doesn't need filesystem + const status = await runtime.loadModule(config, { + sources: new Map([['mem-source', source]]), + actors: new Map([['mem-actor', actor]]), + }); + + expect(status.state).toBe('active'); + }); + + it('uses custom checkpoint dir from config', async () => { + const tempDir = await mkdtemp(join(tmpdir(), 'orgloop-rt-cpdir-')); + try { + runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false }); + await runtime.start(); + + const source = new MockSource('dir-source'); + const actor = new MockActor('dir-actor'); + + const customDir = join(tempDir, 'custom-checkpoints'); + const config = makeModuleConfig('dir-mod', { + modulePath: tempDir, + defaults: { + poll_interval: '5m', + checkpoint: { dir: customDir }, + }, + }); + + await runtime.loadModule(config, { + sources: new Map([['dir-source', source]]), + 
actors: new Map([['dir-actor', actor]]), + }); + + expect(runtime.status().modules).toHaveLength(1); + } finally { + await runtime.stop(); + await rm(tempDir, { recursive: true, force: true }); + } + }); + + it('resolves relative checkpoint dir against modulePath', async () => { + const tempDir = await mkdtemp(join(tmpdir(), 'orgloop-rt-relcp-')); + try { + runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false }); + await runtime.start(); + + const source = new MockSource('rel-source'); + const actor = new MockActor('rel-actor'); + + const config = makeModuleConfig('rel-mod', { + modulePath: tempDir, + defaults: { + poll_interval: '5m', + checkpoint: { dir: 'my-checkpoints' }, + }, + }); + + await runtime.loadModule(config, { + sources: new Map([['rel-source', source]]), + actors: new Map([['rel-actor', actor]]), + }); + + expect(runtime.status().modules).toHaveLength(1); + } finally { + await runtime.stop(); + await rm(tempDir, { recursive: true, force: true }); + } + }); + + it('explicit checkpointStore option overrides config defaults', async () => { + runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false }); + await runtime.start(); + + const source = new MockSource('override-source'); + const actor = new MockActor('override-actor'); + const customStore = new InMemoryCheckpointStore(); + + const config = makeModuleConfig('override-mod', { + defaults: { poll_interval: '5m', checkpoint: { store: 'file' } }, + }); + + // Passing explicit checkpointStore should override config + const status = await runtime.loadModule(config, { + sources: new Map([['override-source', source]]), + actors: new Map([['override-actor', actor]]), + checkpointStore: customStore, + }); + + expect(status.state).toBe('active'); + }); + it('stop() shuts down all modules', async () => { runtime = new Runtime({ bus: new InMemoryBus() }); await runtime.start(); diff --git a/packages/core/src/__tests__/store.test.ts b/packages/core/src/__tests__/store.test.ts index 
016257c..ef5a1e5 100644 --- a/packages/core/src/__tests__/store.test.ts +++ b/packages/core/src/__tests__/store.test.ts @@ -1,4 +1,4 @@ -import { mkdtemp, rm } from 'node:fs/promises'; +import { mkdtemp, readdir, readFile, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { createTestEvent } from '@orgloop/sdk'; @@ -98,6 +98,63 @@ describe('FileCheckpointStore', () => { await deepStore.set('test-source', 'cp-value'); expect(await deepStore.get('test-source')).toBe('cp-value'); }); + + it('writes atomically — no temp files left behind', async () => { + await store.set('atomic-test', 'value-1'); + await store.set('atomic-test', 'value-2'); + + // Only the final checkpoint file should exist, no .tmp files + const checkpointDir = join(tempDir, 'checkpoints'); + const files = await readdir(checkpointDir); + const tmpFiles = files.filter((f) => f.endsWith('.tmp')); + expect(tmpFiles).toHaveLength(0); + expect(files).toContain('atomic-test.json'); + }); + + it('writes valid JSON with checkpoint and updated_at', async () => { + await store.set('json-test', 'my-checkpoint-value'); + + const checkpointDir = join(tempDir, 'checkpoints'); + const content = await readFile(join(checkpointDir, 'json-test.json'), 'utf-8'); + const data = JSON.parse(content) as { checkpoint: string; updated_at: string }; + + expect(data.checkpoint).toBe('my-checkpoint-value'); + expect(data.updated_at).toBeDefined(); + // updated_at should be a valid ISO 8601 timestamp + expect(new Date(data.updated_at).toISOString()).toBe(data.updated_at); + }); + + it('handles concurrent writes to different sources', async () => { + // Write to multiple sources concurrently + await Promise.all([ + store.set('source-a', 'cp-a'), + store.set('source-b', 'cp-b'), + store.set('source-c', 'cp-c'), + ]); + + expect(await store.get('source-a')).toBe('cp-a'); + expect(await store.get('source-b')).toBe('cp-b'); + expect(await store.get('source-c')).toBe('cp-c'); + }); 
+ + it('handles checkpoint values with special characters', async () => { + const special = '{"cursor":"abc123","ts":"2024-01-01T00:00:00Z"}'; + await store.set('special', special); + expect(await store.get('special')).toBe(special); + }); + + it('returns null for corrupted checkpoint file', async () => { + // Write a valid checkpoint first + await store.set('corrupt', 'valid'); + + // Corrupt the file + const checkpointDir = join(tempDir, 'checkpoints'); + const { writeFile } = await import('node:fs/promises'); + await writeFile(join(checkpointDir, 'corrupt.json'), 'not-json{{{', 'utf-8'); + + // Should return null (graceful fallback) + expect(await store.get('corrupt')).toBeNull(); + }); }); describe('InMemoryEventStore', () => { diff --git a/packages/core/src/module-instance.ts b/packages/core/src/module-instance.ts index 7ba39ea..271bff7 100644 --- a/packages/core/src/module-instance.ts +++ b/packages/core/src/module-instance.ts @@ -40,7 +40,13 @@ export interface ModuleConfig { /** Logger definitions */ loggers: LoggerDefinition[]; /** Defaults */ - defaults?: { poll_interval?: string }; + defaults?: { + poll_interval?: string; + checkpoint?: { + store?: 'file' | 'memory'; + dir?: string; + }; + }; /** Filesystem path to the module */ modulePath?: string; } diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index ce0ad70..468a832 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -41,7 +41,7 @@ import { ModuleRegistry } from './registry.js'; import { interpolateConfig, matchRoutes } from './router.js'; import { Scheduler } from './scheduler.js'; import type { CheckpointStore } from './store.js'; -import { InMemoryCheckpointStore } from './store.js'; +import { FileCheckpointStore, InMemoryCheckpointStore } from './store.js'; import type { TransformPipelineOptions } from './transform.js'; import { executeTransformPipeline } from './transform.js'; @@ -286,7 +286,7 @@ class Runtime extends EventEmitter implements 
RuntimeControl {
 	// ─── Module Management ───────────────────────────────────────────────────
 
 	async loadModule(config: ModuleConfig, options?: LoadModuleOptions): Promise {
-		const checkpointStore = options?.checkpointStore ?? new InMemoryCheckpointStore();
+		const checkpointStore = options?.checkpointStore ?? this.resolveCheckpointStore(config);
 
 		const mod = new ModuleInstance(config, {
 			sources: options?.sources ?? new Map(),
@@ -925,6 +925,52 @@ class Runtime extends EventEmitter implements RuntimeControl {
 
 	// ─── Helpers ────────────────────────────────────────────────────────────
 
+	/**
+	 * Choose the checkpoint store for a module when the caller did not supply one.
+	 *
+	 * Resolution order:
+	 * 1. `defaults.checkpoint.store === 'memory'` → in-memory store.
+	 * 2. `defaults.checkpoint.dir` set → file store at that directory; a relative
+	 *    dir is joined onto `config.modulePath` when one is available.
+	 * 3. `config.modulePath` set → file store at `<modulePath>/.orgloop/checkpoints`.
+	 * 4. `defaults.checkpoint.store === 'file'` → file store constructed with no directory argument.
+	 * 5. Runtime `dataDir` set → file store at `dataDir`.
+	 * 6. Otherwise → in-memory store.
+	 *
+	 * @param config - Module configuration being loaded.
+	 * @returns The checkpoint store the module should use.
+	 */
+	private resolveCheckpointStore(config: ModuleConfig): CheckpointStore {
+		const cpConfig = config.defaults?.checkpoint;
+		if (cpConfig?.store === 'memory') {
+			return new InMemoryCheckpointStore();
+		}
+		if (cpConfig?.dir) {
+			// Same rule as node:path `isAbsolute`: '/'-rooted everywhere, plus drive-letter and
+			// backslash-rooted paths on Windows, so `C:\cp` is never joined onto modulePath.
+			const isAbsoluteDir =
+				cpConfig.dir.startsWith('/') ||
+				(process.platform === 'win32' && /^(?:[A-Za-z]:)?[\\/]/.test(cpConfig.dir));
+			const dir =
+				isAbsoluteDir || !config.modulePath
+					? cpConfig.dir
+					: join(config.modulePath, cpConfig.dir);
+			return new FileCheckpointStore(dir);
+		}
+		// Explicit 'file' and the unconfigured case both prefer the module-local directory.
+		if (config.modulePath) {
+			return new FileCheckpointStore(join(config.modulePath, '.orgloop', 'checkpoints'));
+		}
+		if (cpConfig?.store === 'file') {
+			return new FileCheckpointStore();
+		}
+		if (this.dataDir) {
+			return new FileCheckpointStore(this.dataDir);
+		}
+		// No directory context at all — fall back to in-memory
+		return new InMemoryCheckpointStore();
+	}
+
 	private countAllSources(): number {
 		let count = 0;
 		for (const mod of this.registry.list()) {
diff --git a/packages/core/src/schema.ts b/packages/core/src/schema.ts
index 7b941e9..eeb949f 100644
--- a/packages/core/src/schema.ts
+++ b/packages/core/src/schema.ts
@@ -46,6 +46,13 @@ const projectSchema = {
 				poll_interval: { type: 'string' },
 				event_retention: { type: 'string' },
log_level: { type: 'string' }, + checkpoint: { + type: 'object', + properties: { + store: { type: 'string', enum: ['file', 'memory'] }, + dir: { type: 'string' }, + }, + }, }, }, connectors: { type: 'array', items: { type: 'string' } }, diff --git a/packages/core/src/store.ts b/packages/core/src/store.ts index 9c2ea28..f0f5328 100644 --- a/packages/core/src/store.ts +++ b/packages/core/src/store.ts @@ -5,7 +5,8 @@ * EventStore: append-only JSONL WAL for at-least-once delivery guarantee. */ -import { appendFile, mkdir, readFile, writeFile } from 'node:fs/promises'; +import { randomBytes } from 'node:crypto'; +import { appendFile, mkdir, readFile, rename, writeFile } from 'node:fs/promises'; import { homedir } from 'node:os'; import { join } from 'node:path'; import type { OrgLoopEvent } from '@orgloop/sdk'; @@ -42,11 +43,11 @@ export class FileCheckpointStore implements CheckpointStore { async set(sourceId: string, checkpoint: string): Promise { await mkdir(this.dir, { recursive: true }); - await writeFile( - this.filePath(sourceId), - JSON.stringify({ checkpoint, updated_at: new Date().toISOString() }), - 'utf-8', - ); + const target = this.filePath(sourceId); + const tmp = `${target}.${randomBytes(6).toString('hex')}.tmp`; + const data = JSON.stringify({ checkpoint, updated_at: new Date().toISOString() }); + await writeFile(tmp, data, 'utf-8'); + await rename(tmp, target); } } diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index c5532e5..c18e1aa 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -284,6 +284,10 @@ export interface ProjectConfig { poll_interval?: string; event_retention?: string; log_level?: string; + checkpoint?: { + store?: 'file' | 'memory'; + dir?: string; + }; }; connectors?: string[]; transforms?: string[]; @@ -312,6 +316,12 @@ export interface OrgLoopConfig { poll_interval?: string; event_retention?: string; log_level?: string; + checkpoint?: { + /** Store type: 'file' (default) or 'memory' */ + 
store?: 'file' | 'memory'; + /** Directory for checkpoint files (default: .orgloop/checkpoints relative to module) */ + dir?: string; + }; }; /** Data directory for WAL, checkpoints, etc. */ data_dir?: string;