forked from phlib/jobqueue
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMonitorCommand.php
More file actions
106 lines (89 loc) · 3.44 KB
/
MonitorCommand.php
File metadata and controls
106 lines (89 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
<?php
declare(strict_types=1);
namespace Phlib\JobQueue\Console;
use Phlib\ConsoleProcess\Command\DaemonCommand;
use Phlib\JobQueue\BatchableJobQueueInterface;
use Phlib\JobQueue\Exception\InvalidArgumentException;
use Phlib\JobQueue\JobInterface;
use Phlib\JobQueue\JobQueueInterface;
use Phlib\JobQueue\Scheduler\BatchableSchedulerInterface;
use Phlib\JobQueue\Scheduler\SchedulerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Output\StreamOutput;
class MonitorCommand extends DaemonCommand
{
protected SchedulerInterface $scheduler;
protected JobQueueInterface $jobQueue;
protected string $logFile;
protected function initialize(InputInterface $input, OutputInterface $output): void
{
$dependencies = $this->getHelper('configuration')
->fetch();
if (!$dependencies instanceof MonitorDependencies) {
throw new InvalidArgumentException('Expected dependencies could not be determined.');
}
$this->jobQueue = $dependencies->getJobQueue();
$this->scheduler = $dependencies->getScheduler();
}
protected function configure(): void
{
$this->setName('monitor')
->setDescription('Monitor the schedule for pending jobs.')
->addOption('log', 'l', InputOption::VALUE_REQUIRED, "A file to send log out to. If no path is specified then it's disabled.");
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$logFile = $input->getOption('log');
if (!empty($logFile)) {
$this->logFile = $logFile;
}
if ($this->scheduler instanceof BatchableSchedulerInterface) {
while ($jobsData = $this->scheduler->retrieveBatch()) {
$jobs = [];
foreach ($jobsData as $jobData) {
$output->writeln("Job {$jobData['id']} added.");
$jobs[] = $this->createJob($jobData);
}
$this->putJobBatch($jobs);
$this->scheduler->removeBatch(array_column($jobsData, 'id'));
}
} else {
while ($jobData = $this->scheduler->retrieve()) {
$output->writeln("Job {$jobData['id']} added.");
$this->jobQueue->put($this->createJob($jobData));
$this->scheduler->remove($jobData['id']);
}
}
return 0;
}
protected function createJob(array $schedulerJob): JobInterface
{
return $this->jobQueue->createJob(
$schedulerJob['queue'],
$schedulerJob['data'],
null,
$schedulerJob['delay'],
$schedulerJob['priority'],
$schedulerJob['ttr'],
);
}
protected function putJobBatch(array $jobs): void
{
if ($this->jobQueue instanceof BatchableJobQueueInterface) {
$this->jobQueue->putBatch($jobs);
return;
}
foreach ($jobs as $job) {
$this->jobQueue->put($job);
}
}
protected function createChildOutput(?string $childLogFilename): OutputInterface
{
if (!isset($this->logFile) || ($this->logFile === '' || $this->logFile === '0')) {
return parent::createChildOutput($childLogFilename);
}
return new StreamOutput(fopen($this->logFile, 'ab'));
}
}