Skip to content

Commit abebe6c

Browse files
committed
Preliminary "foot in the door" SQLite support
Here, add an implementation for SQLite. The new driver passes the entire driver test suite (including some newly added tests), and checks out against a new end-to-end suite of client tests targeting it. I think at this point we can consider it to be largely functional, although with minimal real-world vetting, but at least alpha quality that we can start putting in front of users. The main reason this turned out to be quite an effort is that SQLite, although nominally supporting a lot of the same syntax Postgres does, just has a lot of unexpected limitations and quirks that all need to be tracked down separately and a workaround added. For example: * SQLite doesn't really have data types. Everything is either an integer, real, text, or blob, and anything more sophisticated just has to be shoehorned into one of these types. For example, there's no boolean. Despite the fanfare recently, there isn't even a jsonb. Jsonb gets pushed into a blob. * The most annoying missing data type is a timestamp. Date/times are stored as either Unix integers or strings, and SQLite provides a number of obnoxious, error-prone, built-ins to work with them. * No arrays. * No modification operations in CTEs. (Although counting my lucky stars that CTEs are supported at least.) * No listen/notify. Aside from that though, we had to get some testing infrastructure in place, and our testing has traditionally been quite affixed to Postgres, which would've made a lot of changes necessary. However, the schema additions in #848 do help to make this quite a bit easier with the driver-based `TestSchema` and `TestTx` functions, and because they raise schema automatically, it doesn't complicate test setup instructions in any way by adding extra steps. A few caveats: * I've found that the combination of SQLite's reduced capabilities + sqlc's bugginess [1] make bit batch insert/update operations basically impossible (I tried every workaround I could possibly thin of), so these are done in loops of individual operations instead. I *think* this is okay for now though. For one, sqlc will hopefully get these limitations fixed eventually, and for two, SQLite databases will often be running locally, meaning the round trip cost per operation is much lower than what we'd see in a hosted Postgres somewhere. * It's not totally clear that having a SQLite implementation will be "worth it enough" in the long run given that it will add some difficulty to non-trivial future database-related operations. My hope is that as it's in a prerelease state, we can gauge how bad it is to keep up to date. If it feels like way more effort than it's worth, we can still axe it before it ever becomes a functional driver. [1] sqlc-dev/sqlc#3802 (comment)
1 parent 8fd0a3b commit abebe6c

90 files changed

Lines changed: 7676 additions & 1476 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
/river
33
/internal/cmd/riverbench/riverbench
44
/internal/cmd/testdbman/testdbman
5+
/sqlite/

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Preliminary River driver for SQLite (`riverdriver/riversqlite`). This driver seems to produce good results as judged by the test suite, but so far as minimal real world vetting. Try it and let us know how it works out. [PR #870](https://github.com/riverqueue/river/pull/870).
13+
1014
## [0.22.0] - 2025-05-10
1115

1216
### Added
@@ -19,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1923

2024
### Fixed
2125

26+
- Resuming an already unpaused queue is now fully an no-op, and won't touch the row's `updated_at` like it (unintentionally) did before. [PR #870](https://github.com/riverqueue/river/pull/870).
2227
- The `riverdatabasesql` now fully supports raw connections through [`lib/pq`](https://github.com/lib/pq) rather than just `database/sql` through Pgx. We don't recommend the use of `lib/pq` as it's an unmaintained project, but this change should help with compatibility for older projects. [PR #883](https://github.com/riverqueue/river/pull/883).
2328

2429
## [0.21.0] - 2025-05-02

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ generate/migrations: ## Sync changes of pgxv5 migrations to database/sql
2828
generate/sqlc: ## Generate sqlc
2929
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc generate
3030
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc generate
31+
cd riverdriver/riversqlite/internal/dbsqlc && sqlc generate
3132

3233
# Looks at comments using ## on targets and uses them to produce a help output.
3334
.PHONY: help

client.go

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,15 @@ type Config struct {
339339
// instances of rivertype.WorkerMiddleware).
340340
WorkerMiddleware []rivertype.WorkerMiddleware
341341

342+
// queuePollInterval is the amount of time between periodic checks for queue
343+
// setting changes. This is only used in poll-only mode (when no notifier is
344+
// provided).
345+
//
346+
// This is internal for the time being as it hasn't had any major demand to
347+
// be exposed, but it's needed to make sure that our poll-only tests can
348+
// finish in a timely manner.
349+
queuePollInterval time.Duration
350+
342351
// Scheduler run interval. Shared between the scheduler and producer/job
343352
// executors, but not currently exposed for configuration.
344353
schedulerInterval time.Duration
@@ -400,6 +409,7 @@ func (c *Config) WithDefaults() *Config {
400409
TestOnly: c.TestOnly,
401410
WorkerMiddleware: c.WorkerMiddleware,
402411
Workers: c.Workers,
412+
queuePollInterval: c.queuePollInterval,
403413
schedulerInterval: cmp.Or(c.schedulerInterval, maintenance.JobSchedulerIntervalDefault),
404414
}
405415
}
@@ -715,7 +725,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
715725
// we're actually going to be working jobs (as opposed to just enqueueing
716726
// them):
717727
if config.willExecuteJobs() {
718-
if !driver.HasPool() {
728+
if !driver.PoolIsSet() {
719729
return nil, errMissingDatabasePoolWithQueues
720730
}
721731

@@ -1510,7 +1520,7 @@ var errNoDriverDBPool = errors.New("driver must have non-nil database pool to us
15101520
// // handle error
15111521
// }
15121522
func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
1513-
if !c.driver.HasPool() {
1523+
if !c.driver.PoolIsSet() {
15141524
return nil, errNoDriverDBPool
15151525
}
15161526

@@ -1587,7 +1597,7 @@ type InsertManyParams struct {
15871597
// // handle error
15881598
// }
15891599
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
1590-
if !c.driver.HasPool() {
1600+
if !c.driver.PoolIsSet() {
15911601
return nil, errNoDriverDBPool
15921602
}
15931603

@@ -1765,7 +1775,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*rivertype.
17651775
// Unlike with `InsertMany`, unique conflicts cannot be handled gracefully. If a
17661776
// unique constraint is violated, the operation will fail and no jobs will be inserted.
17671777
func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) {
1768-
if !c.driver.HasPool() {
1778+
if !c.driver.PoolIsSet() {
17691779
return 0, errNoDriverDBPool
17701780
}
17711781

@@ -1865,20 +1875,23 @@ func (c *Client[TTx]) maybeNotifyInsertForQueues(ctx context.Context, tx riverdr
18651875
return nil
18661876
}
18671877

1868-
err := tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
1869-
Payload: payloads,
1870-
Schema: c.config.Schema,
1871-
Topic: string(notifier.NotificationTopicInsert),
1872-
})
1873-
if err != nil {
1874-
c.baseService.Logger.ErrorContext(
1875-
ctx,
1876-
c.baseService.Name+": Failed to send job insert notification",
1877-
slog.String("queues", strings.Join(queuesDeduped, ",")),
1878-
slog.String("err", err.Error()),
1879-
)
1880-
return err
1878+
if c.driver.SupportsListenNotify() {
1879+
err := tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
1880+
Payload: payloads,
1881+
Schema: c.config.Schema,
1882+
Topic: string(notifier.NotificationTopicInsert),
1883+
})
1884+
if err != nil {
1885+
c.baseService.Logger.ErrorContext(
1886+
ctx,
1887+
c.baseService.Name+": Failed to send job insert notification",
1888+
slog.String("queues", strings.Join(queuesDeduped, ",")),
1889+
slog.String("err", err.Error()),
1890+
)
1891+
return err
1892+
}
18811893
}
1894+
18821895
return nil
18831896
}
18841897

@@ -1896,19 +1909,22 @@ func (c *Client[TTx]) notifyQueuePauseOrResume(ctx context.Context, tx riverdriv
18961909
return err
18971910
}
18981911

1899-
err = tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
1900-
Payload: []string{string(payload)},
1901-
Schema: c.config.Schema,
1902-
Topic: string(notifier.NotificationTopicControl),
1903-
})
1904-
if err != nil {
1905-
c.baseService.Logger.ErrorContext(
1906-
ctx,
1907-
c.baseService.Name+": Failed to send queue state change notification",
1908-
slog.String("err", err.Error()),
1909-
)
1910-
return err
1912+
if c.driver.SupportsListenNotify() {
1913+
err = tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
1914+
Payload: []string{string(payload)},
1915+
Schema: c.config.Schema,
1916+
Topic: string(notifier.NotificationTopicControl),
1917+
})
1918+
if err != nil {
1919+
c.baseService.Logger.ErrorContext(
1920+
ctx,
1921+
c.baseService.Name+": Failed to send queue state change notification",
1922+
slog.String("err", err.Error()),
1923+
)
1924+
return err
1925+
}
19111926
}
1927+
19121928
return nil
19131929
}
19141930

@@ -1943,6 +1959,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *pr
19431959
Notifier: c.notifier,
19441960
Queue: queueName,
19451961
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
1962+
QueuePollInterval: c.config.queuePollInterval,
19461963
RetryPolicy: c.config.RetryPolicy,
19471964
SchedulerInterval: c.config.schedulerInterval,
19481965
Schema: c.config.Schema,
@@ -1988,7 +2005,7 @@ type JobListResult struct {
19882005
// // handle error
19892006
// }
19902007
func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobListResult, error) {
1991-
if !c.driver.HasPool() {
2008+
if !c.driver.PoolIsSet() {
19922009
return nil, errNoDriverDBPool
19932010
}
19942011

@@ -1997,12 +2014,16 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
19972014
}
19982015
params.schema = c.config.Schema
19992016

2017+
if c.driver.DatabaseName() == "sqlite" && params.metadataFragment != "" {
2018+
return nil, errors.New("JobListResult.Metadata is not supported on SQLite")
2019+
}
2020+
20002021
dbParams, err := params.toDBParams()
20012022
if err != nil {
20022023
return nil, err
20032024
}
20042025

2005-
jobs, err := dblist.JobList(ctx, c.driver.GetExecutor(), dbParams)
2026+
jobs, err := dblist.JobList(ctx, c.driver.GetExecutor(), dbParams, c.driver.SQLFragmentColumnIn)
20062027
if err != nil {
20072028
return nil, err
20082029
}
@@ -2033,7 +2054,7 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
20332054
return nil, err
20342055
}
20352056

2036-
jobs, err := dblist.JobList(ctx, c.driver.UnwrapExecutor(tx), dbParams)
2057+
jobs, err := dblist.JobList(ctx, c.driver.UnwrapExecutor(tx), dbParams, c.driver.SQLFragmentColumnIn)
20372058
if err != nil {
20382059
return nil, err
20392060
}
@@ -2325,12 +2346,14 @@ func (c *Client[TTx]) queueUpdate(ctx context.Context, executorTx riverdriver.Ex
23252346
return nil, err
23262347
}
23272348

2328-
if err := executorTx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
2329-
Payload: []string{string(payload)},
2330-
Schema: c.config.Schema,
2331-
Topic: string(notifier.NotificationTopicControl),
2332-
}); err != nil {
2333-
return nil, err
2349+
if c.driver.SupportsListenNotify() {
2350+
if err := executorTx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
2351+
Payload: []string{string(payload)},
2352+
Schema: c.config.Schema,
2353+
Topic: string(notifier.NotificationTopicControl),
2354+
}); err != nil {
2355+
return nil, err
2356+
}
23342357
}
23352358
}
23362359

client_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func newTestConfig(t *testing.T, schema string) *Config {
134134
},
135135
TestOnly: true, // disables staggered start in maintenance services
136136
Workers: workers,
137+
queuePollInterval: 50 * time.Millisecond,
137138
schedulerInterval: riverinternaltest.SchedulerShortInterval,
138139
}
139140
}
@@ -1242,7 +1243,7 @@ func Test_Client(t *testing.T) {
12421243
require.NoError(t, err)
12431244

12441245
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
1245-
require.Equal(t, (withKindAliasesArgs{}).KindAliases(), []string{event.Job.Kind})
1246+
require.Equal(t, []string{event.Job.Kind}, (withKindAliasesArgs{}).KindAliases())
12461247
})
12471248

12481249
t.Run("StartStopStress", func(t *testing.T) {

cmd/river/go.sum

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,18 @@ github.com/lmittmann/tint v1.0.7 h1:D/0OqWZ0YOGZ6AyC+5Y2kD8PBEzBk6rFHVSfOqCkF9Y=
2424
github.com/lmittmann/tint v1.0.7/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
2525
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2626
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
27-
github.com/riverqueue/river/riverdriver v0.20.2 h1:FDmWALB6DvYBBw479euIBg1KClxPmDpWjmZbhScxSBw=
28-
github.com/riverqueue/river/riverdriver v0.20.2/go.mod h1:vYSv6ZTEFWT0JVuGCwZDxJdc2U7ZMkwJQ+nPsa7/2mM=
29-
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.2 h1:llBsU1hpKyIIzZroeVjM7uavmq3W+kXuSvkUCQ/3pg4=
30-
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.2/go.mod h1:qPJ5qkfAqAYRKXxU1TNFsVwMd9dLIXEFDLrrGz6GAWM=
31-
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2 h1:O8e1vobbKhUmgbki0mLOvCptixMtBiMjJgkGPa4VFAY=
32-
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2/go.mod h1:zn3Lf6qzkq9kEOzYRe/fEgYl9c/eRTCdwBHtclxILEU=
33-
github.com/riverqueue/river/rivertype v0.20.2 h1:unmiQP7CWS6IDbDrp9cESNscPoMstxb6Luoz9kfNzOc=
34-
github.com/riverqueue/river/rivertype v0.20.2/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs=
27+
github.com/riverqueue/river v0.22.0 h1:PO4Ula2RqViQqNs6xjze7yFV6Zq4T3Ffv092+f4S8xQ=
28+
github.com/riverqueue/river v0.22.0/go.mod h1:IRoWoK4RGCiPuVJUV4EWcCl9d/TMQYkk0EEYV/Wgq+U=
29+
github.com/riverqueue/river/riverdriver v0.22.0 h1:i7OSFkUi6x4UKvttdFOIg7NYLYaBOFLJZvkZ0+JWS/8=
30+
github.com/riverqueue/river/riverdriver v0.22.0/go.mod h1:oNdjJCeAJhN/UiZGLNL+guNqWaxMFuSD4lr5x/v/was=
31+
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.22.0 h1:+no3gToOK9SmWg0pDPKfOGSCsrxqqaFdD8K1NQndRbY=
32+
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.22.0/go.mod h1:mygiHa1dnlKRjxT1//wIvfT2fMTbfXKm37NcsxoyBoQ=
33+
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.22.0 h1:2TWbVL73gipJ2/4JNCQbifaNj+BCC/Zxpp30o1D8RTg=
34+
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.22.0/go.mod h1:TZY/BG8w/nDxkraAEvvgyVupIz0b4+PQVUW0kIiy1fc=
35+
github.com/riverqueue/river/rivershared v0.22.0 h1:hLPHr98d6OEfmUJ4KpIXgoy2tbQ14htWILcRBHJF11U=
36+
github.com/riverqueue/river/rivershared v0.22.0/go.mod h1:BK+hvhECfdDLWNDH3xiGI95m2YoPfVtECZLT+my8XM8=
37+
github.com/riverqueue/river/rivertype v0.22.0 h1:rSRhbd5uV/BaFTPxReCxuYTAzx+/riBZJlZdREADvO4=
38+
github.com/riverqueue/river/rivertype v0.22.0/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs=
3539
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
3640
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
3741
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=

docs/development.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ To run programs locally outside of tests, create and raise a development databas
5151
git tag riverdriver/$VERSION -m "release riverdriver/$VERSION"
5252
git tag riverdriver/riverpgxv5/$VERSION -m "release riverdriver/riverpgxv5/$VERSION"
5353
git tag riverdriver/riverdatabasesql/$VERSION -m "release riverdriver/riverdatabasesql/$VERSION"
54+
git tag riverdriver/riversqlite/$VERSION -m "release riverdriver/riversqlite/$VERSION"
5455
git tag rivershared/$VERSION -m "release rivershared/$VERSION"
5556
git tag rivertype/$VERSION -m "release rivertype/$VERSION"
5657
git tag $VERSION

0 commit comments

Comments
 (0)