Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ The `Runtime` owns the shared infrastructure (bus, scheduler, loggers, HTTP serv
| `Scheduler` | `packages/core/src/scheduler.ts` | Poll intervals, graceful start/stop |
| `InMemoryBus` / `FileWalBus` | `packages/core/src/bus.ts` | Pluggable event bus (WAL for durability) |
| `loadConfig()` | `packages/core/src/schema.ts` | YAML loading, AJV validation, env var substitution |
| `FileCheckpointStore` | `packages/core/src/store.ts` | Source deduplication checkpoints |
| `FileCheckpointStore` | `packages/core/src/store.ts` | Source deduplication checkpoints (atomic writes, default for CLI) |
| `LoggerManager` | `packages/core/src/logger.ts` | Fan-out to loggers, non-blocking, error-isolated |
| `AuditTrail` | `packages/core/src/audit.ts` | SOP execution audit records with content hashes, chain tracking |
| `OutputValidator` | `packages/core/src/output-validator.ts` | Pre-delivery output validation (injection, echo, scope) |
Expand Down Expand Up @@ -188,11 +188,11 @@ Every plugin type (connector, transform, logger) must be wired through the **ful
| Pi-rust lifecycle conformance | `connectors/pi-rust/src/__tests__/source.test.ts` | 26 |
| Router matching | `packages/core/src/__tests__/router.test.ts` | — |
| Event bus | `packages/core/src/__tests__/bus.test.ts` | — |
| Checkpoint store | `packages/core/src/__tests__/store.test.ts` | |
| Checkpoint store (file + memory, atomic writes) | `packages/core/src/__tests__/store.test.ts` | 17 |
| Transform filter | `transforms/filter/src/__tests__/filter.test.ts` | — |
| Dedup transform | `transforms/dedup/src/__tests__/dedup.test.ts` | — |
| Daemon lifecycle (PID, signals, stop, logs, state) | `packages/cli/src/__tests__/daemon-lifecycle.test.ts` | 45 |
| Runtime lifecycle (multi-module, shared infra) | `packages/core/src/__tests__/runtime.test.ts` | 11 |
| Runtime lifecycle (multi-module, shared infra, checkpoint resolution) | `packages/core/src/__tests__/runtime.test.ts` | 16 |
| Module registry (name conflicts, lookup) | `packages/core/src/__tests__/registry.test.ts` | 8 |
| Module instance (lifecycle states, resource ownership) | `packages/core/src/__tests__/module-instance.test.ts` | 17 |
| Multi-module runtime (load, unload, reload, events) | `packages/core/src/__tests__/multi-module-runtime.test.ts` | 9 |
Expand Down
3 changes: 3 additions & 0 deletions docs-site/src/content/docs/spec/config-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ defaults:
poll_interval: 5m
event_retention: 7d
log_level: info
checkpoint:
store: file # 'file' (default) or 'memory'
dir: .orgloop/checkpoints # relative to project dir, or absolute path

# Connector definition files (file paths, not package names)
connectors:
Expand Down
2 changes: 1 addition & 1 deletion docs-site/src/content/docs/spec/runtime-lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Graceful shutdown sequence:
│ ├── daemon.stdout.log # Daemon stdout
│ └── daemon.stderr.log # Daemon stderr
└── data/
├── checkpoints/ # Per-source checkpoint files
├── checkpoints/ # Per-source checkpoint files (default: <modulePath>/.orgloop/checkpoints/)
├── wal/ # Write-ahead log (event durability)
└── queue/ # Queued events (degraded actors)
```
Expand Down
22 changes: 19 additions & 3 deletions docs-site/src/content/docs/spec/scale-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ This is the right default for OrgLoop's use case. Actors may receive duplicate e

**State management:**

Each source connector maintains a **checkpoint** — an opaque string (typically a timestamp or cursor) that tells the connector where to resume polling after a restart. Checkpoints are persisted to `~/.orgloop/data/checkpoints/`.
Each source connector maintains a **checkpoint** — an opaque string (typically a timestamp or cursor) that tells the connector where to resume polling after a restart.

By default, checkpoints are persisted to `<modulePath>/.orgloop/checkpoints/<sourceId>.json` when running via the CLI (file-based is the default when a module path is available). Writes are atomic (temp file + rename) to prevent corruption on crash.

```typescript
// Checkpoint store
Expand All @@ -242,6 +244,20 @@ export interface CheckpointStore {
}

// Implementations:
// - FileCheckpointStore → JSON file per source
// - InMemoryCheckpointStore → in-memory (for testing)
// - FileCheckpointStore → JSON file per source (default for CLI)
// - InMemoryCheckpointStore → in-memory (for testing / library use)
```

**Configuration:**

```yaml
defaults:
checkpoint:
store: file # 'file' (default) or 'memory'
dir: .orgloop/checkpoints # relative to module dir, or absolute
```

If no `checkpoint` config is set, the runtime resolves the store automatically:
- With `modulePath` → `FileCheckpointStore` at `<modulePath>/.orgloop/checkpoints/`
- With `dataDir` → `FileCheckpointStore` at the specified data directory
- Neither → `InMemoryCheckpointStore` (library/test fallback)
2 changes: 1 addition & 1 deletion docs-site/src/content/docs/start/user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ When OrgLoop stops (Ctrl+C, `orgloop stop`, process killed):
- **All polling stops.** Sources stop fetching new events.
- **No new events are processed.** Events that arrived but weren't yet processed are lost (no durable queue by default).
- **Actors are not notified.** Running actor sessions continue independently -- they don't know OrgLoop stopped.
- **State is preserved.** The last config state remains in `~/.orgloop/state.json`. Source checkpoints remain in `~/.orgloop/checkpoints/`. Logs remain in `~/.orgloop/logs/`.
- **State is preserved.** The last config state remains in `~/.orgloop/state.json`. Source checkpoints are persisted per-project in `<project>/.orgloop/checkpoints/` (configurable via `defaults.checkpoint.dir`). Logs remain in `~/.orgloop/logs/`.

To restart:

Expand Down
15 changes: 0 additions & 15 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,11 @@ async function resolveModuleResources(
}
}

// Create persistent checkpoint store
let checkpointStore: import('@orgloop/core').FileCheckpointStore | undefined;
try {
const { FileCheckpointStore } = await import('@orgloop/core');
checkpointStore = new FileCheckpointStore();
} catch {
// Fall through — runtime will use InMemoryCheckpointStore
}

return {
resolvedSources,
resolvedActors,
resolvedTransforms,
resolvedLoggers,
checkpointStore,
};
}

Expand Down Expand Up @@ -394,7 +384,6 @@ async function runForeground(configPath?: string, force?: boolean): Promise<void
actors: reqResolved.resolvedActors,
transforms: reqResolved.resolvedTransforms,
loggers: reqResolved.resolvedLoggers,
...(reqResolved.checkpointStore ? { checkpointStore: reqResolved.checkpointStore } : {}),
});

// Track in modules.json
Expand Down Expand Up @@ -426,7 +415,6 @@ async function runForeground(configPath?: string, force?: boolean): Promise<void
actors: resolved.resolvedActors,
transforms: resolved.resolvedTransforms,
loggers: resolved.resolvedLoggers,
...(resolved.checkpointStore ? { checkpointStore: resolved.checkpointStore } : {}),
});

// Track in modules.json
Expand Down Expand Up @@ -478,9 +466,6 @@ async function runForeground(configPath?: string, force?: boolean): Promise<void
actors: restoredResolved.resolvedActors,
transforms: restoredResolved.resolvedTransforms,
loggers: restoredResolved.resolvedLoggers,
...(restoredResolved.checkpointStore
? { checkpointStore: restoredResolved.checkpointStore }
: {}),
});

output.success(`Restored module "${restoredName}" from previous session`);
Expand Down
134 changes: 134 additions & 0 deletions packages/core/src/__tests__/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
* Tests for the Runtime class — multi-module lifecycle management.
*/

import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { OrgLoopEvent } from '@orgloop/sdk';
import { createTestEvent, MockActor, MockSource, MockTransform } from '@orgloop/sdk';
import { afterEach, describe, expect, it } from 'vitest';
import { InMemoryBus } from '../bus.js';
import type { ModuleConfig } from '../module-instance.js';
import { Runtime } from '../runtime.js';
import { FileCheckpointStore, InMemoryCheckpointStore } from '../store.js';

function makeModuleConfig(name: string, overrides?: Partial<ModuleConfig>): ModuleConfig {
return {
Expand Down Expand Up @@ -308,6 +312,136 @@ describe('Runtime', () => {
);
});

// ─── Checkpoint Store Resolution ─────────────────────────────────────

it('defaults to FileCheckpointStore when modulePath is set', async () => {
const tempDir = await mkdtemp(join(tmpdir(), 'orgloop-rt-cp-'));
try {
runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false });
await runtime.start();

const source = new MockSource('cp-source');
const actor = new MockActor('cp-actor');

const config = makeModuleConfig('cp-mod', { modulePath: tempDir });
const status = await runtime.loadModule(config, {
sources: new Map([['cp-source', source]]),
actors: new Map([['cp-actor', actor]]),
});

expect(status.state).toBe('active');
// Module loaded with modulePath — runtime should have created a FileCheckpointStore
// at tempDir/.orgloop/checkpoints/. We verify by reading back a checkpoint
// after a simulated poll writes one.
const store = new FileCheckpointStore(join(tempDir, '.orgloop', 'checkpoints'));
await store.set('cp-source', 'test-value');
expect(await store.get('cp-source')).toBe('test-value');
} finally {
await runtime.stop();
await rm(tempDir, { recursive: true, force: true });
}
});

it('uses InMemoryCheckpointStore when checkpoint.store is memory', async () => {
runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false });
await runtime.start();

const source = new MockSource('mem-source');
const actor = new MockActor('mem-actor');

const config = makeModuleConfig('mem-mod', {
defaults: { poll_interval: '5m', checkpoint: { store: 'memory' } },
});
// Should not throw — memory store doesn't need filesystem
const status = await runtime.loadModule(config, {
sources: new Map([['mem-source', source]]),
actors: new Map([['mem-actor', actor]]),
});

expect(status.state).toBe('active');
});

it('uses custom checkpoint dir from config', async () => {
const tempDir = await mkdtemp(join(tmpdir(), 'orgloop-rt-cpdir-'));
try {
runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false });
await runtime.start();

const source = new MockSource('dir-source');
const actor = new MockActor('dir-actor');

const customDir = join(tempDir, 'custom-checkpoints');
const config = makeModuleConfig('dir-mod', {
modulePath: tempDir,
defaults: {
poll_interval: '5m',
checkpoint: { dir: customDir },
},
});

await runtime.loadModule(config, {
sources: new Map([['dir-source', source]]),
actors: new Map([['dir-actor', actor]]),
});

expect(runtime.status().modules).toHaveLength(1);
} finally {
await runtime.stop();
await rm(tempDir, { recursive: true, force: true });
}
});

it('resolves relative checkpoint dir against modulePath', async () => {
const tempDir = await mkdtemp(join(tmpdir(), 'orgloop-rt-relcp-'));
try {
runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false });
await runtime.start();

const source = new MockSource('rel-source');
const actor = new MockActor('rel-actor');

const config = makeModuleConfig('rel-mod', {
modulePath: tempDir,
defaults: {
poll_interval: '5m',
checkpoint: { dir: 'my-checkpoints' },
},
});

await runtime.loadModule(config, {
sources: new Map([['rel-source', source]]),
actors: new Map([['rel-actor', actor]]),
});

expect(runtime.status().modules).toHaveLength(1);
} finally {
await runtime.stop();
await rm(tempDir, { recursive: true, force: true });
}
});

it('explicit checkpointStore option overrides config defaults', async () => {
runtime = new Runtime({ bus: new InMemoryBus(), crashHandlers: false });
await runtime.start();

const source = new MockSource('override-source');
const actor = new MockActor('override-actor');
const customStore = new InMemoryCheckpointStore();

const config = makeModuleConfig('override-mod', {
defaults: { poll_interval: '5m', checkpoint: { store: 'file' } },
});

// Passing explicit checkpointStore should override config
const status = await runtime.loadModule(config, {
sources: new Map([['override-source', source]]),
actors: new Map([['override-actor', actor]]),
checkpointStore: customStore,
});

expect(status.state).toBe('active');
});

it('stop() shuts down all modules', async () => {
runtime = new Runtime({ bus: new InMemoryBus() });
await runtime.start();
Expand Down
59 changes: 58 additions & 1 deletion packages/core/src/__tests__/store.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { mkdtemp, readdir, readFile, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createTestEvent } from '@orgloop/sdk';
Expand Down Expand Up @@ -98,6 +98,63 @@ describe('FileCheckpointStore', () => {
await deepStore.set('test-source', 'cp-value');
expect(await deepStore.get('test-source')).toBe('cp-value');
});

it('writes atomically — no temp files left behind', async () => {
await store.set('atomic-test', 'value-1');
await store.set('atomic-test', 'value-2');

// Only the final checkpoint file should exist, no .tmp files
const checkpointDir = join(tempDir, 'checkpoints');
const files = await readdir(checkpointDir);
const tmpFiles = files.filter((f) => f.endsWith('.tmp'));
expect(tmpFiles).toHaveLength(0);
expect(files).toContain('atomic-test.json');
});

it('writes valid JSON with checkpoint and updated_at', async () => {
await store.set('json-test', 'my-checkpoint-value');

const checkpointDir = join(tempDir, 'checkpoints');
const content = await readFile(join(checkpointDir, 'json-test.json'), 'utf-8');
const data = JSON.parse(content) as { checkpoint: string; updated_at: string };

expect(data.checkpoint).toBe('my-checkpoint-value');
expect(data.updated_at).toBeDefined();
// updated_at should be a valid ISO 8601 timestamp
expect(new Date(data.updated_at).toISOString()).toBe(data.updated_at);
});

it('handles concurrent writes to different sources', async () => {
// Write to multiple sources concurrently
await Promise.all([
store.set('source-a', 'cp-a'),
store.set('source-b', 'cp-b'),
store.set('source-c', 'cp-c'),
]);

expect(await store.get('source-a')).toBe('cp-a');
expect(await store.get('source-b')).toBe('cp-b');
expect(await store.get('source-c')).toBe('cp-c');
});

it('handles checkpoint values with special characters', async () => {
const special = '{"cursor":"abc123","ts":"2024-01-01T00:00:00Z"}';
await store.set('special', special);
expect(await store.get('special')).toBe(special);
});

it('returns null for corrupted checkpoint file', async () => {
// Write a valid checkpoint first
await store.set('corrupt', 'valid');

// Corrupt the file
const checkpointDir = join(tempDir, 'checkpoints');
const { writeFile } = await import('node:fs/promises');
await writeFile(join(checkpointDir, 'corrupt.json'), 'not-json{{{', 'utf-8');

// Should return null (graceful fallback)
expect(await store.get('corrupt')).toBeNull();
});
});

describe('InMemoryEventStore', () => {
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/module-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ export interface ModuleConfig {
/** Logger definitions */
loggers: LoggerDefinition[];
/** Defaults */
defaults?: { poll_interval?: string };
defaults?: {
poll_interval?: string;
checkpoint?: {
store?: 'file' | 'memory';
dir?: string;
};
};
/** Filesystem path to the module */
modulePath?: string;
}
Expand Down
Loading
Loading