Skip to content

Commit 9be1d11

Browse files
authored
unify all bulk inserts to a single code path (#610)
1 parent 185dbb8 commit 9be1d11

1 file changed

Lines changed: 40 additions & 54 deletions

File tree

client.go

Lines changed: 40 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,14 +1441,41 @@ func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertM
14411441
}
14421442

14431443
func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
1444+
return c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
1445+
results, err := tx.JobInsertFastMany(ctx, insertParams)
1446+
if err != nil {
1447+
return nil, err
1448+
}
1449+
1450+
return sliceutil.Map(results,
1451+
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
1452+
return (*rivertype.JobInsertResult)(result)
1453+
},
1454+
), nil
1455+
})
1456+
}
1457+
1458+
// The shared code path for all InsertMany methods. It takes a function that
1459+
// executes the actual insert operation and allows for different implementations
1460+
// of the insert query to be passed in, each mapping their results back to a
1461+
// common result type.
1462+
//
1463+
// TODO(bgentry): this isn't yet used for the single insert path. The only thing
1464+
// blocking that is the removal of advisory lock unique inserts.
1465+
func (c *Client[TTx]) insertManyShared(
1466+
ctx context.Context,
1467+
tx riverdriver.ExecutorTx,
1468+
params []InsertManyParams,
1469+
execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error),
1470+
) ([]*rivertype.JobInsertResult, error) {
14441471
insertParams, err := c.insertManyParams(params)
14451472
if err != nil {
14461473
return nil, err
14471474
}
14481475

1449-
jobRows, err := tx.JobInsertFastMany(ctx, insertParams)
1476+
inserted, err := execute(ctx, insertParams)
14501477
if err != nil {
1451-
return nil, err
1478+
return inserted, err
14521479
}
14531480

14541481
queues := make([]string, 0, 10)
@@ -1460,12 +1487,7 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
14601487
if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
14611488
return nil, err
14621489
}
1463-
1464-
return sliceutil.Map(jobRows,
1465-
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
1466-
return (*rivertype.JobInsertResult)(result)
1467-
},
1468-
), nil
1490+
return inserted, nil
14691491
}
14701492

14711493
// Validates input parameters for a batch insert operation and generates a set
@@ -1516,19 +1538,14 @@ func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyPar
15161538
return 0, errNoDriverDBPool
15171539
}
15181540

1519-
insertParams, err := c.insertManyFastParams(params)
1520-
if err != nil {
1521-
return 0, err
1522-
}
1523-
15241541
// Wrap in a transaction in case we need to notify about inserts.
15251542
tx, err := c.driver.GetExecutor().Begin(ctx)
15261543
if err != nil {
15271544
return 0, err
15281545
}
15291546
defer tx.Rollback(ctx)
15301547

1531-
inserted, err := c.insertManyFast(ctx, tx, insertParams)
1548+
inserted, err := c.insertManyFast(ctx, tx, params)
15321549
if err != nil {
15331550
return 0, err
15341551
}
@@ -1562,54 +1579,23 @@ func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyPar
15621579
// unique conflicts cannot be handled gracefully. If a unique constraint is
15631580
// violated, the operation will fail and no jobs will be inserted.
15641581
func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) {
1565-
insertParams, err := c.insertManyFastParams(params)
1566-
if err != nil {
1567-
return 0, err
1568-
}
1569-
15701582
exec := c.driver.UnwrapExecutor(tx)
1571-
return c.insertManyFast(ctx, exec, insertParams)
1572-
}
1573-
1574-
func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
1575-
inserted, err := tx.JobInsertFastManyNoReturning(ctx, insertParams)
1576-
if err != nil {
1577-
return inserted, err
1578-
}
1579-
1580-
queues := make([]string, 0, 10)
1581-
for _, params := range insertParams {
1582-
if params.State == rivertype.JobStateAvailable {
1583-
queues = append(queues, params.Queue)
1584-
}
1585-
}
1586-
if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
1587-
return 0, err
1588-
}
1589-
return inserted, nil
1583+
return c.insertManyFast(ctx, exec, params)
15901584
}
15911585

1592-
// Validates input parameters for an a batch insert operation and generates a
1593-
// set of batch insert parameters.
1594-
func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
1595-
if len(params) < 1 {
1596-
return nil, errors.New("no jobs to insert")
1597-
}
1598-
1599-
insertParams := make([]*riverdriver.JobInsertFastParams, len(params))
1600-
for i, param := range params {
1601-
if err := c.validateJobArgs(param.Args); err != nil {
1602-
return nil, err
1603-
}
1604-
1605-
insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
1586+
func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) (int, error) {
1587+
results, err := c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
1588+
count, err := tx.JobInsertFastManyNoReturning(ctx, insertParams)
16061589
if err != nil {
16071590
return nil, err
16081591
}
1609-
insertParams[i] = insertParamsItem
1592+
return make([]*rivertype.JobInsertResult, count), nil
1593+
})
1594+
if err != nil {
1595+
return 0, err
16101596
}
16111597

1612-
return insertParams, nil
1598+
return len(results), nil
16131599
}
16141600

16151601
func (c *Client[TTx]) maybeNotifyInsert(ctx context.Context, tx riverdriver.ExecutorTx, state rivertype.JobState, queue string) error {

0 commit comments

Comments
 (0)