5555use OCP \IRequestId ;
5656use OCP \PreConditionNotMetException ;
5757use OCP \Profiler \IProfiler ;
58+ use Psr \Clock \ClockInterface ;
5859use Psr \Log \LoggerInterface ;
60+ use function in_array ;
5961
6062class Connection extends PrimaryReadReplicaConnection {
6163 /** @var string */
@@ -67,6 +69,8 @@ class Connection extends PrimaryReadReplicaConnection {
6769 /** @var SystemConfig */
6870 private $ systemConfig ;
6971
72+ private ClockInterface $ clock ;
73+
7074 private LoggerInterface $ logger ;
7175
7276 protected $ lockedTable = null ;
@@ -83,6 +87,7 @@ class Connection extends PrimaryReadReplicaConnection {
8387
8488 protected ?float $ transactionActiveSince = null ;
8589
90+ /** @var array<string, int> */
8691 protected $ tableDirtyWrites = [];
8792
8893 /**
@@ -110,6 +115,7 @@ public function __construct(
110115 $ this ->tablePrefix = $ params ['tablePrefix ' ];
111116
112117 $ this ->systemConfig = \OC ::$ server ->getSystemConfig ();
118+ $ this ->clock = \OCP \Server::get (ClockInterface::class);
113119 $ this ->logger = \OC ::$ server ->get (LoggerInterface::class);
114120
115121 /** @var \OCP\Profiler\IProfiler */
@@ -265,10 +271,19 @@ public function prepare($sql, $limit = null, $offset = null): Statement {
265271 */
266272 public function executeQuery (string $ sql , array $ params = [], $ types = [], QueryCacheProfile $ qcp = null ): Result {
267273 $ tables = $ this ->getQueriedTables ($ sql );
274+ $ now = $ this ->clock ->now ()->getTimestamp ();
275+ $ dirtyTableWrites = [];
276+ foreach ($ tables as $ table ) {
277+ $ lastAccess = $ this ->tableDirtyWrites [$ table ] ?? 0 ;
278+ // Only very recent writes are considered dirty
279+ if ($ lastAccess >= ($ now - 3 )) {
280+ $ dirtyTableWrites [] = $ table ;
281+ }
282+ }
268283 if ($ this ->isTransactionActive ()) {
269284 // Transacted queries go to the primary. The consistency of the primary guarantees that we can not run
270285 // into a dirty read.
271- } elseif (count (array_intersect ( $ this -> tableDirtyWrites , $ tables ) ) === 0 ) {
286+ } elseif (count ($ dirtyTableWrites ) === 0 ) {
272287 // No tables read that could have been written already in the same request and no transaction active
273288 // so we can switch back to the replica for reading as long as no writes happen that switch back to the primary
274289 // We cannot log here as this would log too early in the server boot process
@@ -280,7 +295,7 @@ public function executeQuery(string $sql, array $params = [], $types = [], Query
280295 (int ) ($ this ->systemConfig ->getValue ('loglevel_dirty_database_queries ' , null ) ?? 0 ),
281296 'dirty table reads: ' . $ sql ,
282297 [
283- 'tables ' => $ this ->tableDirtyWrites ,
298+ 'tables ' => array_keys ( $ this ->tableDirtyWrites ) ,
284299 'reads ' => $ tables ,
285300 'exception ' => new \Exception (),
286301 ],
@@ -335,7 +350,9 @@ public function executeUpdate(string $sql, array $params = [], array $types = []
335350 */
336351 public function executeStatement ($ sql , array $ params = [], array $ types = []): int {
337352 $ tables = $ this ->getQueriedTables ($ sql );
338- $ this ->tableDirtyWrites = array_unique (array_merge ($ this ->tableDirtyWrites , $ tables ));
353+ foreach ($ tables as $ table ) {
354+ $ this ->tableDirtyWrites [$ table ] = $ this ->clock ->now ()->getTimestamp ();
355+ }
339356 $ sql = $ this ->replaceTablePrefix ($ sql );
340357 $ sql = $ this ->adapter ->fixupStatement ($ sql );
341358 $ this ->queriesExecuted ++;
0 commit comments