@@ -282,6 +282,13 @@ type Config struct {
282282 // Defaults to DefaultRetryPolicy.
283283 RetryPolicy ClientRetryPolicy
284284
285+ // schema is a non-standard schema where River tables are located. All table
286+ // references in database queries will use this value as a prefix.
287+ //
288+ // Defaults to empty, which causes the client to look for tables using the
289+ // setting of Postgres `search_path`.
290+ schema string
291+
285292 // SkipUnknownJobCheck is a flag to control whether the client should skip
286293 // checking to see if a registered worker exists in the client's worker bundle
287294 // for a job arg prior to insertion.
@@ -376,6 +383,7 @@ func (c *Config) WithDefaults() *Config {
376383 ReindexerSchedule : c .ReindexerSchedule ,
377384 RescueStuckJobsAfter : valutil .ValOrDefault (c .RescueStuckJobsAfter , rescueAfter ),
378385 RetryPolicy : retryPolicy ,
386+ schema : c .schema ,
379387 SkipUnknownJobCheck : c .SkipUnknownJobCheck ,
380388 Test : c .Test ,
381389 TestOnly : c .TestOnly ,
@@ -668,7 +676,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
668676 // uses listen/notify. Instead, each service polls for changes it's
669677 // interested in. e.g. Elector polls to see if leader has expired.
670678 if ! config .PollOnly {
671- client .notifier = notifier .New (archetype , driver .GetListener ())
679+ client .notifier = notifier .New (archetype , driver .GetListener (config . schema ))
672680 client .services = append (client .services , client .notifier )
673681 }
674682 } else {
@@ -705,6 +713,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
705713 CancelledJobRetentionPeriod : config .CancelledJobRetentionPeriod ,
706714 CompletedJobRetentionPeriod : config .CompletedJobRetentionPeriod ,
707715 DiscardedJobRetentionPeriod : config .DiscardedJobRetentionPeriod ,
716+ Schema : config .schema ,
708717 Timeout : config .JobCleanerTimeout ,
709718 }, driver .GetExecutor ())
710719 maintenanceServices = append (maintenanceServices , jobCleaner )
@@ -715,6 +724,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
715724 jobRescuer := maintenance .NewRescuer (archetype , & maintenance.JobRescuerConfig {
716725 ClientRetryPolicy : config .RetryPolicy ,
717726 RescueAfter : config .RescueStuckJobsAfter ,
727+ Schema : config .schema ,
718728 WorkUnitFactoryFunc : func (kind string ) workunit.WorkUnitFactory {
719729 if workerInfo , ok := config .Workers .workersMap [kind ]; ok {
720730 return workerInfo .workUnitFactory
@@ -730,6 +740,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
730740 jobScheduler := maintenance .NewJobScheduler (archetype , & maintenance.JobSchedulerConfig {
731741 Interval : config .schedulerInterval ,
732742 NotifyInsert : client .maybeNotifyInsertForQueues ,
743+ Schema : config .schema ,
733744 }, driver .GetExecutor ())
734745 maintenanceServices = append (maintenanceServices , jobScheduler )
735746 client .testSignals .jobScheduler = & jobScheduler .TestSignals
@@ -750,6 +761,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
750761 {
751762 queueCleaner := maintenance .NewQueueCleaner (archetype , & maintenance.QueueCleanerConfig {
752763 RetentionPeriod : maintenance .QueueRetentionPeriodDefault ,
764+ Schema : config .schema ,
753765 }, driver .GetExecutor ())
754766 maintenanceServices = append (maintenanceServices , queueCleaner )
755767 client .testSignals .queueCleaner = & queueCleaner .TestSignals
@@ -761,7 +773,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
761773 scheduleFunc = config .ReindexerSchedule .Next
762774 }
763775
764- reindexer := maintenance .NewReindexer (archetype , & maintenance.ReindexerConfig {ScheduleFunc : scheduleFunc }, driver .GetExecutor ())
776+ reindexer := maintenance .NewReindexer (archetype , & maintenance.ReindexerConfig {
777+ ScheduleFunc : scheduleFunc ,
778+ Schema : config .schema ,
779+ }, driver .GetExecutor ())
765780 maintenanceServices = append (maintenanceServices , reindexer )
766781 client .testSignals .reindexer = & reindexer .TestSignals
767782 }
@@ -1236,14 +1251,18 @@ func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor,
12361251 ID : jobID ,
12371252 CancelAttemptedAt : c .baseService .Time .NowUTC (),
12381253 ControlTopic : string (notifier .NotificationTopicControl ),
1254+ Schema : c .config .schema ,
12391255 })
12401256}
12411257
12421258// JobDelete deletes the job with the given ID from the database, returning the
12431259// deleted row if it was deleted. Jobs in the running state are not deleted,
12441260// instead returning rivertype.ErrJobRunning.
12451261func (c * Client [TTx ]) JobDelete (ctx context.Context , id int64 ) (* rivertype.JobRow , error ) {
1246- return c .driver .GetExecutor ().JobDelete (ctx , id )
1262+ return c .driver .GetExecutor ().JobDelete (ctx , & riverdriver.JobDeleteParams {
1263+ ID : id ,
1264+ Schema : c .config .schema ,
1265+ })
12471266}
12481267
12491268// JobDelete deletes the job with the given ID from the database, returning the
@@ -1253,20 +1272,29 @@ func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRo
12531272// until the transaction commits, and if the transaction rolls back, so too is
12541273// the deleted job.
12551274func (c * Client [TTx ]) JobDeleteTx (ctx context.Context , tx TTx , id int64 ) (* rivertype.JobRow , error ) {
1256- return c .driver .UnwrapExecutor (tx ).JobDelete (ctx , id )
1275+ return c .driver .UnwrapExecutor (tx ).JobDelete (ctx , & riverdriver.JobDeleteParams {
1276+ ID : id ,
1277+ Schema : c .config .schema ,
1278+ })
12571279}
12581280
12591281// JobGet fetches a single job by its ID. Returns the up-to-date JobRow for the
12601282// specified jobID if it exists. Returns ErrNotFound if the job doesn't exist.
12611283func (c * Client [TTx ]) JobGet (ctx context.Context , id int64 ) (* rivertype.JobRow , error ) {
1262- return c .driver .GetExecutor ().JobGetByID (ctx , id )
1284+ return c .driver .GetExecutor ().JobGetByID (ctx , & riverdriver.JobGetByIDParams {
1285+ ID : id ,
1286+ Schema : c .config .schema ,
1287+ })
12631288}
12641289
12651290// JobGetTx fetches a single job by its ID, within a transaction. Returns the
12661291// up-to-date JobRow for the specified jobID if it exists. Returns ErrNotFound
12671292// if the job doesn't exist.
12681293func (c * Client [TTx ]) JobGetTx (ctx context.Context , tx TTx , id int64 ) (* rivertype.JobRow , error ) {
1269- return c .driver .UnwrapExecutor (tx ).JobGetByID (ctx , id )
1294+ return c .driver .UnwrapExecutor (tx ).JobGetByID (ctx , & riverdriver.JobGetByIDParams {
1295+ ID : id ,
1296+ Schema : c .config .schema ,
1297+ })
12701298}
12711299
12721300// JobRetry updates the job with the given ID to make it immediately available
@@ -1278,7 +1306,10 @@ func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertyp
12781306// MaxAttempts is also incremented by one if the job has already exhausted its
12791307// max attempts.
12801308func (c * Client [TTx ]) JobRetry (ctx context.Context , id int64 ) (* rivertype.JobRow , error ) {
1281- return c .driver .GetExecutor ().JobRetry (ctx , id )
1309+ return c .driver .GetExecutor ().JobRetry (ctx , & riverdriver.JobRetryParams {
1310+ ID : id ,
1311+ Schema : c .config .schema ,
1312+ })
12821313}
12831314
12841315// JobRetryTx updates the job with the given ID to make it immediately available
@@ -1295,7 +1326,10 @@ func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow
12951326// MaxAttempts is also incremented by one if the job has already exhausted its
12961327// max attempts.
12971328func (c * Client [TTx ]) JobRetryTx (ctx context.Context , tx TTx , id int64 ) (* rivertype.JobRow , error ) {
1298- return c .driver .UnwrapExecutor (tx ).JobRetry (ctx , id )
1329+ return c .driver .UnwrapExecutor (tx ).JobRetry (ctx , & riverdriver.JobRetryParams {
1330+ ID : id ,
1331+ Schema : c .config .schema ,
1332+ })
12991333}
13001334
13011335// ID returns the unique ID of this client as set in its config or
@@ -1561,7 +1595,10 @@ func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, tx riverd
15611595// by the PeriodicJobEnqueuer.
15621596func (c * Client [TTx ]) insertMany (ctx context.Context , tx riverdriver.ExecutorTx , insertParams []* rivertype.JobInsertParams ) ([]* rivertype.JobInsertResult , error ) {
15631597 return c .insertManyShared (ctx , tx , insertParams , func (ctx context.Context , insertParams []* riverdriver.JobInsertFastParams ) ([]* rivertype.JobInsertResult , error ) {
1564- results , err := c .pilot .JobInsertMany (ctx , tx , insertParams )
1598+ results , err := c .pilot .JobInsertMany (ctx , tx , & riverdriver.JobInsertFastManyParams {
1599+ Jobs : insertParams ,
1600+ Schema : c .config .schema ,
1601+ })
15651602 if err != nil {
15661603 return nil , err
15671604 }
@@ -1731,7 +1768,10 @@ func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.Executo
17311768 }
17321769
17331770 results , err := c .insertManyShared (ctx , tx , insertParams , func (ctx context.Context , insertParams []* riverdriver.JobInsertFastParams ) ([]* rivertype.JobInsertResult , error ) {
1734- count , err := tx .JobInsertFastManyNoReturning (ctx , insertParams )
1771+ count , err := tx .JobInsertFastManyNoReturning (ctx , & riverdriver.JobInsertFastManyParams {
1772+ Jobs : insertParams ,
1773+ Schema : c .config .schema ,
1774+ })
17351775 if err != nil {
17361776 return nil , err
17371777 }
@@ -1773,8 +1813,9 @@ func (c *Client[TTx]) maybeNotifyInsertForQueues(ctx context.Context, tx riverdr
17731813 }
17741814
17751815 err := tx .NotifyMany (ctx , & riverdriver.NotifyManyParams {
1776- Topic : string (notifier .NotificationTopicInsert ),
17771816 Payload : payloads ,
1817+ Schema : c .config .schema ,
1818+ Topic : string (notifier .NotificationTopicInsert ),
17781819 })
17791820 if err != nil {
17801821 c .baseService .Logger .ErrorContext (
@@ -1804,6 +1845,7 @@ func (c *Client[TTx]) notifyQueuePauseOrResume(ctx context.Context, tx riverdriv
18041845
18051846 err = tx .NotifyMany (ctx , & riverdriver.NotifyManyParams {
18061847 Payload : []string {string (payload )},
1848+ Schema : c .config .schema ,
18071849 Topic : string (notifier .NotificationTopicControl ),
18081850 })
18091851 if err != nil {
@@ -1850,6 +1892,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *pr
18501892 QueueEventCallback : c .subscriptionManager .distributeQueueEvent ,
18511893 RetryPolicy : c .config .RetryPolicy ,
18521894 SchedulerInterval : c .config .schedulerInterval ,
1895+ Schema : c .config .schema ,
18531896 StaleProducerRetentionPeriod : 5 * time .Minute ,
18541897 Workers : c .config .Workers ,
18551898 })
@@ -1966,7 +2009,10 @@ func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }
19662009// The provided context is used for the underlying Postgres query and can be
19672010// used to cancel the operation or apply a timeout.
19682011func (c * Client [TTx ]) QueueGet (ctx context.Context , name string ) (* rivertype.Queue , error ) {
1969- return c .driver .GetExecutor ().QueueGet (ctx , name )
2012+ return c .driver .GetExecutor ().QueueGet (ctx , & riverdriver.QueueGetParams {
2013+ Name : name ,
2014+ Schema : c .config .schema ,
2015+ })
19702016}
19712017
19722018// QueueGetTx returns the queue with the given name. If the queue has not recently
@@ -1975,7 +2021,10 @@ func (c *Client[TTx]) QueueGet(ctx context.Context, name string) (*rivertype.Que
19752021// The provided context is used for the underlying Postgres query and can be
19762022// used to cancel the operation or apply a timeout.
19772023func (c * Client [TTx ]) QueueGetTx (ctx context.Context , tx TTx , name string ) (* rivertype.Queue , error ) {
1978- return c .driver .UnwrapExecutor (tx ).QueueGet (ctx , name )
2024+ return c .driver .UnwrapExecutor (tx ).QueueGet (ctx , & riverdriver.QueueGetParams {
2025+ Name : name ,
2026+ Schema : c .config .schema ,
2027+ })
19792028}
19802029
19812030// QueueListResult is the result of a job list operation. It contains a list of
@@ -2001,7 +2050,10 @@ func (c *Client[TTx]) QueueList(ctx context.Context, params *QueueListParams) (*
20012050 params = NewQueueListParams ()
20022051 }
20032052
2004- queues , err := c .driver .GetExecutor ().QueueList (ctx , int (params .paginationCount ))
2053+ queues , err := c .driver .GetExecutor ().QueueList (ctx , & riverdriver.QueueListParams {
2054+ Limit : int (params .paginationCount ),
2055+ Schema : c .config .schema ,
2056+ })
20052057 if err != nil {
20062058 return nil , err
20072059 }
@@ -2025,7 +2077,10 @@ func (c *Client[TTx]) QueueListTx(ctx context.Context, tx TTx, params *QueueList
20252077 params = NewQueueListParams ()
20262078 }
20272079
2028- queues , err := c .driver .UnwrapExecutor (tx ).QueueList (ctx , int (params .paginationCount ))
2080+ queues , err := c .driver .UnwrapExecutor (tx ).QueueList (ctx , & riverdriver.QueueListParams {
2081+ Limit : int (params .paginationCount ),
2082+ Schema : c .config .schema ,
2083+ })
20292084 if err != nil {
20302085 return nil , err
20312086 }
@@ -2051,7 +2106,10 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa
20512106 }
20522107 defer tx .Rollback (ctx )
20532108
2054- if err := tx .QueuePause (ctx , name ); err != nil {
2109+ if err := tx .QueuePause (ctx , & riverdriver.QueuePauseParams {
2110+ Name : name ,
2111+ Schema : c .config .schema ,
2112+ }); err != nil {
20552113 return err
20562114 }
20572115
@@ -2076,7 +2134,10 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa
20762134func (c * Client [TTx ]) QueuePauseTx (ctx context.Context , tx TTx , name string , opts * QueuePauseOpts ) error {
20772135 executorTx := c .driver .UnwrapExecutor (tx )
20782136
2079- if err := executorTx .QueuePause (ctx , name ); err != nil {
2137+ if err := executorTx .QueuePause (ctx , & riverdriver.QueuePauseParams {
2138+ Name : name ,
2139+ Schema : c .config .schema ,
2140+ }); err != nil {
20802141 return err
20812142 }
20822143
@@ -2106,7 +2167,10 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
21062167 }
21072168 defer tx .Rollback (ctx )
21082169
2109- if err := tx .QueueResume (ctx , name ); err != nil {
2170+ if err := tx .QueueResume (ctx , & riverdriver.QueueResumeParams {
2171+ Name : name ,
2172+ Schema : c .config .schema ,
2173+ }); err != nil {
21102174 return err
21112175 }
21122176
@@ -2132,7 +2196,10 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
21322196func (c * Client [TTx ]) QueueResumeTx (ctx context.Context , tx TTx , name string , opts * QueuePauseOpts ) error {
21332197 executorTx := c .driver .UnwrapExecutor (tx )
21342198
2135- if err := executorTx .QueueResume (ctx , name ); err != nil {
2199+ if err := executorTx .QueueResume (ctx , & riverdriver.QueueResumeParams {
2200+ Name : name ,
2201+ Schema : c .config .schema ,
2202+ }); err != nil {
21362203 return err
21372204 }
21382205
@@ -2202,8 +2269,9 @@ func (c *Client[TTx]) queueUpdate(ctx context.Context, executorTx riverdriver.Ex
22022269 }
22032270
22042271 if err := executorTx .NotifyMany (ctx , & riverdriver.NotifyManyParams {
2205- Topic : string (notifier .NotificationTopicControl ),
22062272 Payload : []string {string (payload )},
2273+ Schema : c .config .schema ,
2274+ Topic : string (notifier .NotificationTopicControl ),
22072275 }); err != nil {
22082276 return nil , err
22092277 }
0 commit comments