Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions packages/core/src/catalog.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * as Catalog from "./catalog"

import { Array, Context, Effect, Layer, Option, Order, pipe, Schema } from "effect"
import { Catalog } from "@opencode-ai/schema/catalog"
import { ModelV2 } from "./model"
import { ProviderV2 } from "./provider"
import { EventV2 } from "./event"
Expand All @@ -17,9 +18,7 @@ export type DefaultModel = { providerID: ProviderV2.ID; modelID: ModelV2.ID }

export const PolicyActions = Schema.Literals(["provider.use"])

export const Event = {
Updated: EventV2.define({ type: "catalog.updated", schema: {} }),
}
export const Event = Catalog.Event

type Data = {
providers: Map<ProviderV2.ID, ProviderRecord>
Expand Down
117 changes: 23 additions & 94 deletions packages/core/src/event.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,19 @@
export * as EventV2 from "./event"

import { Cause, Context, Effect, Layer, Option, PubSub, Schema, Stream } from "effect"
import { Event } from "@opencode-ai/schema/event"
import type { Data, Definition, Payload } from "@opencode-ai/schema/event"
import { and, asc, eq, gt } from "drizzle-orm"
import { Database } from "./database/database"
import { EventSequenceTable, EventTable } from "./event/sql"
import { Location } from "./location"
import { withStatics } from "./schema"
import { Identifier } from "./util/identifier"
import { LayerNode } from "./effect/layer-node"
import { isDeepStrictEqual } from "node:util"
import { Durable } from "@opencode-ai/schema/durable-event-manifest"

export const ID = Schema.String.check(Schema.isStartsWith("evt_")).pipe(
Schema.brand("Event.ID"),
withStatics((schema) => ({
create: () => schema.make("evt_" + Identifier.ascending()),
})),
)
export type ID = typeof ID.Type

export type Definition<Type extends string = string, DataSchema extends Schema.Top = Schema.Top> = {
readonly type: Type
readonly durable?: {
readonly version: number
readonly aggregate: string
}
readonly data: DataSchema
}

export type Data<D extends Definition> = Schema.Schema.Type<D["data"]>

export type Payload<D extends Definition = Definition> = {
readonly id: ID
readonly type: D["type"]
readonly data: Data<D>
readonly durable?: {
readonly aggregateID: string
readonly seq: number
readonly version: number
}
readonly location?: Location.Ref
readonly metadata?: Record<string, unknown>
}
export const ID = Event.ID
export type ID = import("@opencode-ai/schema/event").ID
export type { Data, Definition, Payload } from "@opencode-ai/schema/event"

export type Subscriber<D extends Definition = Definition> = (event: Payload<D>) => Effect.Effect<void>
export type Unsubscribe = Effect.Effect<void>
Expand Down Expand Up @@ -74,52 +47,8 @@ export class InvalidDurableEventError extends Schema.TaggedErrorClass<InvalidDur
},
) {}

export function versionedType(type: string, version: number) {
return `${type}.${version}`
}

export const registry = new Map<string, Definition>()
const durableRegistry = new Map<string, Definition>()

export function define<const Type extends string, Fields extends Schema.Struct.Fields>(input: {
readonly type: Type
readonly durable?: {
readonly version: number
readonly aggregate: string
}
readonly schema: Fields
}): Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> & Definition<Type, Schema.Struct<Fields>> {
const Data = Schema.Struct(input.schema)
const Payload = Schema.Struct({
id: ID,
metadata: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)),
type: Schema.Literal(input.type),
durable: Schema.optional(Schema.Struct({ aggregateID: Schema.String, seq: Schema.Number, version: Schema.Number })),
location: Schema.optional(Location.Ref),
data: Data,
}).annotate({ identifier: input.type })

const definition = Object.assign(Payload, {
type: input.type,
...(input.durable === undefined ? {} : { durable: input.durable }),
data: Data,
})
const existing = registry.get(input.type)
if (
input.durable === undefined ||
existing?.durable === undefined ||
input.durable.version >= existing.durable.version
) {
registry.set(input.type, definition)
}
if (input.durable) durableRegistry.set(versionedType(input.type, input.durable.version), definition)
return definition as Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> &
Definition<Type, Schema.Struct<Fields>>
}

export function definitions() {
return registry.values().toArray()
}
export const define = Event.define
export const versionedType = Event.versionedType

export interface PublishOptions {
readonly id?: ID
Expand Down Expand Up @@ -169,6 +98,7 @@ export const layerWith = (options?: LayerOptions) =>
typed: new Map<string, PubSub.PubSub<Payload>>(),
}
const projectors = new Map<string, Subscriber[]>()
// TODO: Bind durable projectors to exact type+version before supporting incompatible historical payloads.
const listeners = new Array<Subscriber>()
const { db } = yield* Database.Service

Expand All @@ -194,6 +124,7 @@ export const layerWith = (options?: LayerOptions) =>
)

function commitDurableEvent(
definition: Definition,
event: Payload,
input?: {
readonly seq: number
Expand All @@ -204,7 +135,6 @@ export const layerWith = (options?: LayerOptions) =>
commit?: (seq: number) => Effect.Effect<void>,
) {
return Effect.gen(function* () {
const definition = registry.get(event.type)
const durable = definition?.durable
if (durable) {
const aggregateID = (event.data as Record<string, unknown>)[durable.aggregate]
Expand Down Expand Up @@ -238,9 +168,10 @@ export const layerWith = (options?: LayerOptions) =>
.get()
.pipe(Effect.orDie)
const latest = row?.seq ?? -1
const encoded = Schema.encodeUnknownSync(
definition.data as Schema.Codec<unknown, unknown, never, never>,
)(event.data) as Record<string, unknown>
const encoded = Schema.encodeUnknownSync(definition.data)(event.data) as Record<
string,
unknown
>
if (input?.strictOwner && row?.ownerID && row.ownerID !== input.ownerID) {
yield* Effect.die(
new InvalidDurableEventError({
Expand Down Expand Up @@ -356,9 +287,8 @@ export const layerWith = (options?: LayerOptions) =>
})
}

function publishEvent<D extends Definition>(event: Payload<D>, commit?: PublishOptions["commit"]) {
function publishEvent<D extends Definition>(definition: D, event: Payload<D>, commit?: PublishOptions["commit"]) {
return Effect.gen(function* () {
const definition = registry.get(event.type)
if (!definition?.durable && commit)
return yield* Effect.die(
new InvalidDurableEventError({
Expand All @@ -367,7 +297,7 @@ export const layerWith = (options?: LayerOptions) =>
}),
)
if (definition?.durable) {
const committed = yield* commitDurableEvent(event as Payload, undefined, commit)
const committed = yield* commitDurableEvent(definition, event as Payload, undefined, commit)
if (committed) {
event = {
...event,
Expand Down Expand Up @@ -416,6 +346,7 @@ export const layerWith = (options?: LayerOptions) =>
? { directory: serviceLocation.directory, workspaceID: serviceLocation.workspaceID }
: undefined)
return yield* publishEvent(
definition,
{
id: options?.id ?? ID.create(),
...(options?.metadata ? { metadata: options.metadata } : {}),
Expand All @@ -433,7 +364,7 @@ export const layerWith = (options?: LayerOptions) =>
options?: { readonly publish?: boolean; readonly ownerID?: string; readonly strictOwner?: boolean },
) {
return Effect.gen(function* () {
const definition = durableRegistry.get(event.type)
const definition = Durable.get(event.type)
if (!definition?.durable) {
yield* Effect.die(
new InvalidDurableEventError({ type: event.type, message: `Unknown durable event type ${event.type}` }),
Expand All @@ -442,11 +373,9 @@ export const layerWith = (options?: LayerOptions) =>
const payload = {
id: event.id,
type: definition.type,
data: Schema.decodeUnknownSync(definition.data as Schema.Codec<unknown, unknown, never, never>)(
event.data,
),
data: Schema.decodeUnknownSync(definition.data)(event.data),
} as Payload
const committed = yield* commitDurableEvent(payload, {
const committed = yield* commitDurableEvent(definition, payload, {
seq: event.seq,
aggregateID: event.aggregateID,
ownerID: options?.ownerID,
Expand Down Expand Up @@ -530,16 +459,16 @@ export const layerWith = (options?: LayerOptions) =>

const streamAll = (): Stream.Stream<Payload> => Stream.fromPubSub(pubsub.all)

const decodeSerializedEvent = (event: SerializedEvent): Payload => {
const definition = durableRegistry.get(event.type)
const decodeSerializedEvent = (event: SerializedEvent) => {
const definition = Durable.get(event.type)
if (!definition?.durable) {
throw new InvalidDurableEventError({ type: event.type, message: `Unknown durable event type ${event.type}` })
}
return {
id: event.id,
type: definition.type,
durable: { aggregateID: event.aggregateID, seq: event.seq, version: definition.durable.version },
data: Schema.decodeUnknownSync(definition.data as Schema.Codec<unknown, unknown, never, never>)(event.data),
data: Schema.decodeUnknownSync(definition.data)(event.data),
}
}

Expand Down
12 changes: 2 additions & 10 deletions packages/core/src/filesystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ export * as FileSystem from "./filesystem"

import path from "path"
import { Context, Effect, Layer, Schema } from "effect"
import { EventV2 } from "./event"
import { FSUtil } from "./fs-util"
import { Location } from "./location"
import { PositiveInt, RelativePath } from "./schema"
import { FileSystemSearch } from "./filesystem/search"
import { Entry, Match } from "@opencode-ai/schema/filesystem"
import { Entry, FileSystem, Match } from "@opencode-ai/schema/filesystem"
export { Entry, Match, Submatch } from "@opencode-ai/schema/filesystem"

export const ReadInput = Schema.Struct({
Expand Down Expand Up @@ -48,14 +47,7 @@ export class GrepInput extends Schema.Class<GrepInput>("FileSystem.GrepInput")({
limit: PositiveInt.pipe(Schema.optional),
}) {}

export const Event = {
Edited: EventV2.define({
type: "file.edited",
schema: {
file: Schema.String,
},
}),
}
export const Event = FileSystem.Event

export interface Interface {
readonly read: (input: ReadInput) => Effect.Effect<{ readonly content: Uint8Array; readonly mime: string }>
Expand Down
13 changes: 3 additions & 10 deletions packages/core/src/filesystem/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ export * as Watcher from "./watcher"
// @ts-ignore
import { createWrapper } from "@parcel/watcher/wrapper"
import type ParcelWatcher from "@parcel/watcher"
import { Cause, Context, Effect, Layer, Schema } from "effect"
import { Cause, Context, Effect, Layer } from "effect"
import { FileSystemWatcher } from "@opencode-ai/schema/filesystem-watcher"
import path from "path"
import { Config } from "../config"
import { EventV2 } from "../event"
Expand All @@ -19,15 +20,7 @@ declare const OPENCODE_LIBC: string | undefined

const SUBSCRIBE_TIMEOUT_MS = 10_000

export const Event = {
Updated: EventV2.define({
type: "file.watcher.updated",
schema: {
file: Schema.String,
event: Schema.Literals(["add", "change", "unlink"]),
},
}),
}
export const Event = FileSystemWatcher.Event

const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
try {
Expand Down
11 changes: 1 addition & 10 deletions packages/core/src/integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,7 @@ export class AuthorizationError extends Schema.TaggedErrorClass<AuthorizationErr

export type Error = CodeRequiredError | AuthorizationError

export const Event = {
Updated: EventV2.define({
type: "integration.updated",
schema: {},
}),
ConnectionUpdated: EventV2.define({
type: "integration.connection.updated",
schema: { integrationID: ID },
}),
}
export const Event = Integration.Event

export const Ref = Integration.Ref
export type Ref = Integration.Ref
Expand Down
8 changes: 2 additions & 6 deletions packages/core/src/models-dev.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import path from "path"
import { Context, Duration, Effect, Layer, Option, Schedule, Schema } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest } from "effect/unstable/http"
import { ModelsDev } from "@opencode-ai/schema/models-dev"
import { Global } from "./global"
import { Flag } from "./flag/flag"
import { Flock } from "./util/flock"
Expand Down Expand Up @@ -108,12 +109,7 @@ export const Provider = Schema.Struct({

export type Provider = Schema.Schema.Type<typeof Provider>

export const Event = {
Refreshed: EventV2.define({
type: "models-dev.refreshed",
schema: {},
}),
}
export const Event = ModelsDev.Event

declare const OPENCODE_MODELS_DEV: Record<string, Provider> | undefined

Expand Down
Loading
Loading