Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
"psr-4": {"Utopia\\Queue\\": "src/Queue"}
},
"autoload-dev": {
"psr-4": {"Tests\\E2E\\": "tests/Queue/E2E"}
"psr-4": {
"Tests\\E2E\\": "tests/Queue/E2E",
"Tests\\Unit\\": "tests/Queue/Unit"
}
},
"scripts":{
"test": "phpunit",
Expand Down
3 changes: 3 additions & 0 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
stopOnFailure="false"
>
<testsuites>
<testsuite name="Unit">
<directory>./tests/Queue/Unit</directory>
</testsuite>
<testsuite name="E2E">
<directory>./tests/Queue/E2E/Adapter</directory>
</testsuite>
Expand Down
2 changes: 1 addition & 1 deletion src/Queue/Broker/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public function close(): void
$this->channel?->getConnection()?->close();
}

public function enqueue(Queue $queue, array $payload): bool
public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
{
$payload = [
'pid' => \uniqid(more_entropy: true),
Expand Down
2 changes: 1 addition & 1 deletion src/Queue/Broker/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public function __construct(
) {
}

public function enqueue(Queue $queue, array $payload): bool
public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
{
return $this->delegatePublish(__FUNCTION__, \func_get_args());
}
Expand Down
5 changes: 4 additions & 1 deletion src/Queue/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,17 @@ public function close(): void
$this->closed = true;
}

public function enqueue(Queue $queue, array $payload): bool
public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
{
$payload = [
'pid' => \uniqid(more_entropy: true),
'queue' => $queue->name,
'timestamp' => time(),
'payload' => $payload
];
if ($priority) {
return $this->connection->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload);
}
return $this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Queue/Publisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface Publisher
* @param array $payload
* @return bool
*/
public function enqueue(Queue $queue, array $payload): bool;
public function enqueue(Queue $queue, array $payload, bool $priority = false): bool;

/**
* Retries failed jobs.
Expand Down
11 changes: 11 additions & 0 deletions tests/Queue/E2E/Adapter/Base.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ public function testConcurrency(): void
});
}

public function testEnqueuePriority(): void
{
$publisher = $this->getPublisher();

$result = $publisher->enqueue($this->getQueue(), ['type' => 'test_string', 'value' => 'priority'], priority: true);

$this->assertTrue($result);

sleep(1);
}

/**
* @depends testEvents
*/
Expand Down
73 changes: 73 additions & 0 deletions tests/Queue/E2E/Adapter/RedisPriorityTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Tests\E2E\Adapter;

use PHPUnit\Framework\TestCase;
use Utopia\Queue\Broker\Redis as RedisBroker;
use Utopia\Queue\Connection\Redis;
use Utopia\Queue\Queue;

/**
* Verifies that priority jobs (pushed to the tail via rightPushArray) are consumed
* before normal jobs (pushed to the head via leftPushArray) when BRPOP reads from
* the tail.
*
* This test bypasses the worker and reads directly from the queue so it can assert ordering.
*/
class RedisPriorityTest extends TestCase
{
private RedisBroker $broker;
private Queue $queue;
private Redis $connection;

protected function setUp(): void
{
$this->connection = new Redis('redis', 6379);
$this->broker = new RedisBroker($this->connection);
$this->queue = new Queue('priority-e2e-test');

// Flush any leftover state from previous runs.
$key = "{$this->queue->namespace}.queue.{$this->queue->name}";
while ($this->connection->rightPopArray($key, 0) !== false) {
// drain
}
Comment thread
hmacr marked this conversation as resolved.
Outdated
}

public function testPriorityJobIsConsumedBeforeNormalJobs(): void
{
// Enqueue three normal jobs (pushed to head/left).
$this->broker->enqueue($this->queue, ['order' => 'normal-1']);
$this->broker->enqueue($this->queue, ['order' => 'normal-2']);
$this->broker->enqueue($this->queue, ['order' => 'normal-3']);

// Enqueue one priority job (pushed to tail/right — same end BRPOP reads from).
$this->broker->enqueue($this->queue, ['order' => 'priority'], priority: true);

$key = "{$this->queue->namespace}.queue.{$this->queue->name}";

// The first pop should yield the priority job.
$first = $this->connection->rightPopArray($key, 1);
$this->assertNotFalse($first, 'Expected a job but queue was empty');
$this->assertSame('priority', $first['payload']['order'], 'Priority job should be consumed first');

// The remaining three should be normal jobs (consumed oldest-first).
$second = $this->connection->rightPopArray($key, 1);
$this->assertSame('normal-1', $second['payload']['order']);

$third = $this->connection->rightPopArray($key, 1);
$this->assertSame('normal-2', $third['payload']['order']);

$fourth = $this->connection->rightPopArray($key, 1);
$this->assertSame('normal-3', $fourth['payload']['order']);

// Queue should now be empty.
$this->assertFalse($this->connection->rightPopArray($key, 0));
Comment thread
hmacr marked this conversation as resolved.
Outdated
}

public function testEnqueuePriorityReturnsBool(): void
{
$result = $this->broker->enqueue($this->queue, ['check' => 'return-value'], priority: true);
$this->assertIsBool($result);
$this->assertTrue($result);
}
}
97 changes: 97 additions & 0 deletions tests/Queue/Unit/Broker/RedisBrokerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php

namespace Tests\Unit\Broker;

use PHPUnit\Framework\TestCase;
use Utopia\Queue\Broker\Redis;
use Utopia\Queue\Connection;
use Utopia\Queue\Queue;

class RedisBrokerTest extends TestCase
{
private Connection $connection;
private Redis $broker;
private Queue $queue;

protected function setUp(): void
{
$this->connection = $this->createMock(Connection::class);
$this->broker = new Redis($this->connection);
Comment thread
hmacr marked this conversation as resolved.
Outdated
$this->queue = new Queue('test');
}

public function testEnqueueNormalUsesLeftPush(): void
{
$this->connection
->expects($this->once())
->method('leftPushArray')
->with(
$this->equalTo('utopia-queue.queue.test'),
$this->callback(fn($p) => $p['queue'] === 'test' && $p['payload'] === ['foo' => 'bar'])
)
->willReturn(true);

$this->connection->expects($this->never())->method('rightPushArray');

$result = $this->broker->enqueue($this->queue, ['foo' => 'bar']);

$this->assertTrue($result);
}

public function testEnqueuePriorityFalseUsesLeftPush(): void
{
$this->connection
->expects($this->once())
->method('leftPushArray')
->willReturn(true);

$this->connection->expects($this->never())->method('rightPushArray');

$result = $this->broker->enqueue($this->queue, ['foo' => 'bar'], priority: false);

$this->assertTrue($result);
}

public function testEnqueuePriorityUsesRightPush(): void
{
$this->connection
->expects($this->once())
->method('rightPushArray')
->with(
$this->equalTo('utopia-queue.queue.test'),
$this->callback(fn($p) => $p['queue'] === 'test' && $p['payload'] === ['urgent' => true])
)
->willReturn(true);

$this->connection->expects($this->never())->method('leftPushArray');

$result = $this->broker->enqueue($this->queue, ['urgent' => true], priority: true);

$this->assertTrue($result);
}

public function testEnqueuePriorityPayloadHasRequiredFields(): void
{
$capturedPayload = null;

$this->connection
->expects($this->once())
->method('rightPushArray')
->with(
$this->anything(),
$this->callback(function ($p) use (&$capturedPayload) {
$capturedPayload = $p;
return true;
})
)
->willReturn(true);

$this->broker->enqueue($this->queue, ['data' => 1], priority: true);

$this->assertArrayHasKey('pid', $capturedPayload);
$this->assertArrayHasKey('queue', $capturedPayload);
$this->assertArrayHasKey('timestamp', $capturedPayload);
$this->assertArrayHasKey('payload', $capturedPayload);
$this->assertNotEmpty($capturedPayload['pid']);
}
}
Comment thread
hmacr marked this conversation as resolved.
Outdated
Loading