Skip to content

Commit c0de0ae

Browse files
committed
deprecate MigrateTx, convert tests to use schemas
As detailed in #600, there are certain combinations of schema changes which are not allowed to be run within the same transaction. The example we encountered with #590 is adding a new enum value, then using it in an immutable function during a subsequent migration. In Postgres, these must be separated by a commit. There are other examples of things which cannot be run in a transaction, such as `CREATE INDEX CONCURRENTLY`. While that specific one isn't solved here, moving away from a migrator that bundles migrations into a single transaction will also allow us to update our migration system to exclude certain migrations from transactions and i.e. add indexes concurrently.
1 parent 30d0691 commit c0de0ae

6 files changed

Lines changed: 217 additions & 142 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3535
}
3636
```
3737

38+
- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one-at-a-time. `MigrateTx` will be removed in future version.
39+
3840
- The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558).
3941

4042
## Fixed

rivermigrate/example_migrate_database_sql_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,27 @@ import (
1818
func Example_migrateDatabaseSQL() {
1919
ctx := context.Background()
2020

21-
dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_test_example"))
21+
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
22+
schemaName := "migration_example_dbsql"
23+
url := riverinternaltest.DatabaseURL("river_test_example") + "&search_path=" + schemaName
24+
dbPool, err := sql.Open("pgx", url)
2225
if err != nil {
2326
panic(err)
2427
}
2528
defer dbPool.Close()
2629

27-
tx, err := dbPool.BeginTx(ctx, nil)
30+
driver := riverdatabasesql.New(dbPool)
31+
migrator, err := rivermigrate.New(driver, nil)
2832
if err != nil {
2933
panic(err)
3034
}
31-
defer tx.Rollback()
3235

33-
migrator, err := rivermigrate.New(riverdatabasesql.New(dbPool), nil)
34-
if err != nil {
36+
// Create the schema used for this example. Drop it when we're done.
37+
// This isn't necessary outside this test.
38+
if _, err := dbPool.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
3539
panic(err)
3640
}
37-
38-
// Our test database starts with a full River schema. Drop it so that we can
39-
// demonstrate working migrations. This isn't necessary outside this test.
40-
dropRiverSchema(ctx, migrator, tx)
41+
defer dropRiverSchema(ctx, driver, schemaName)
4142

4243
printVersions := func(res *rivermigrate.MigrateResult) {
4344
for _, version := range res.Versions {
@@ -47,7 +48,7 @@ func Example_migrateDatabaseSQL() {
4748

4849
// Migrate to version 3. An actual call may want to omit all MigrateOpts,
4950
// which will default to applying all available up migrations.
50-
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
51+
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
5152
TargetVersion: 3,
5253
})
5354
if err != nil {
@@ -57,7 +58,7 @@ func Example_migrateDatabaseSQL() {
5758

5859
// Migrate down by three steps. Down migrating defaults to running only one
5960
// step unless overridden by an option like MaxSteps or TargetVersion.
60-
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
61+
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
6162
MaxSteps: 3,
6263
})
6364
if err != nil {

rivermigrate/example_migrate_test.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/jackc/pgx/v5/pgxpool"
99

1010
"github.com/riverqueue/river/internal/riverinternaltest"
11+
"github.com/riverqueue/river/riverdriver"
1112
"github.com/riverqueue/river/riverdriver/riverpgxv5"
1213
"github.com/riverqueue/river/rivermigrate"
1314
)
@@ -17,26 +18,29 @@ import (
1718
func Example_migrate() {
1819
ctx := context.Background()
1920

20-
dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
21+
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
22+
schemaName := "migration_example"
23+
poolConfig := riverinternaltest.DatabaseConfig("river_test_example")
24+
poolConfig.ConnConfig.RuntimeParams["search_path"] = schemaName
25+
26+
dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig)
2127
if err != nil {
2228
panic(err)
2329
}
2430
defer dbPool.Close()
2531

26-
tx, err := dbPool.Begin(ctx)
32+
driver := riverpgxv5.New(dbPool)
33+
migrator, err := rivermigrate.New(driver, nil)
2734
if err != nil {
2835
panic(err)
2936
}
30-
defer tx.Rollback(ctx)
3137

32-
migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil)
33-
if err != nil {
38+
// Create the schema used for this example. Drop it when we're done.
39+
// This isn't necessary outside this test.
40+
if _, err := dbPool.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
3441
panic(err)
3542
}
36-
37-
// Our test database starts with a full River schema. Drop it so that we can
38-
// demonstrate working migrations. This isn't necessary outside this test.
39-
dropRiverSchema(ctx, migrator, tx)
43+
defer dropRiverSchema(ctx, driver, schemaName)
4044

4145
printVersions := func(res *rivermigrate.MigrateResult) {
4246
for _, version := range res.Versions {
@@ -46,7 +50,7 @@ func Example_migrate() {
4650

4751
// Migrate to version 3. An actual call may want to omit all MigrateOpts,
4852
// which will default to applying all available up migrations.
49-
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
53+
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
5054
TargetVersion: 3,
5155
})
5256
if err != nil {
@@ -56,7 +60,7 @@ func Example_migrate() {
5660

5761
// Migrate down by three steps. Down migrating defaults to running only one
5862
// step unless overridden by an option like MaxSteps or TargetVersion.
59-
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
63+
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
6064
MaxSteps: 3,
6165
})
6266
if err != nil {
@@ -73,10 +77,8 @@ func Example_migrate() {
7377
// Migrated [DOWN] version 1
7478
}
7579

76-
func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) {
77-
_, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
78-
TargetVersion: -1,
79-
})
80+
func dropRiverSchema[TTx any](ctx context.Context, driver riverdriver.Driver[TTx], schemaName string) {
81+
_, err := driver.GetExecutor().Exec(ctx, "DROP SCHEMA IF EXISTS "+schemaName+" CASCADE;")
8082
if err != nil {
8183
panic(err)
8284
}

rivermigrate/river_migrate.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,9 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *
326326
// This variant lets a caller run migrations within a transaction. Postgres DDL
327327
// is transactional, so migration changes aren't visible until the transaction
328328
// commits, and are rolled back if the transaction rolls back.
329+
//
330+
// Deprecated: Use Migrate instead. Certain migrations cannot be batched together
331+
// in a single transaction, so this method is not recommended.
329332
func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
330333
switch direction {
331334
case DirectionDown:
@@ -572,7 +575,6 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex
572575
}
573576
return nil
574577
})
575-
576578
if err != nil {
577579
return nil, err
578580
}

0 commit comments

Comments
 (0)