-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDriver.php
121 lines (101 loc) · 2.5 KB
/
Driver.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
<?php
namespace Bernard\Driver\Predis;
use Predis\ClientInterface;
use Predis\Command\ServerInfo;
/**
 * Implements a Driver for use with https://github.com/nrk/predis.
 *
 * Queue names are kept in the Redis set "queues"; the messages of a queue are
 * kept in a Redis list whose key is QUEUE_PREFIX followed by the queue name.
 */
final class Driver implements \Bernard\Driver
{
    /**
     * Prefix prepended to a queue name to build the key of its message list.
     */
    const QUEUE_PREFIX = 'queue:';

    /**
     * @var ClientInterface
     */
    private $redis;

    /**
     * @param ClientInterface $redis Predis client every command is sent through
     */
    public function __construct(ClientInterface $redis)
    {
        $this->redis = $redis;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the members of the "queues" set.
     */
    public function listQueues()
    {
        return $this->redis->sMembers('queues');
    }

    /**
     * {@inheritdoc}
     *
     * Registers the name in the "queues" set; the message list itself is not touched.
     */
    public function createQueue($queueName)
    {
        $this->redis->sAdd('queues', $queueName);
    }

    /**
     * {@inheritdoc}
     *
     * Returns the length of the queue's message list.
     */
    public function countMessages($queueName)
    {
        return $this->redis->lLen($this->resolveKey($queueName));
    }

    /**
     * {@inheritdoc}
     *
     * Appends the message to the tail of the queue's message list.
     */
    public function pushMessage($queueName, $message)
    {
        $this->redis->rpush($this->resolveKey($queueName), $message);
    }

    /**
     * {@inheritdoc}
     *
     * Performs a blocking pop from the head of the queue's message list,
     * passing $duration as the blpop timeout.
     *
     * @return array [message, receipt]; the receipt is always null, and the
     *               message is null as well when nothing was popped
     */
    public function popMessage($queueName, $duration = 5)
    {
        $reply = $this->redis->blpop($this->resolveKey($queueName), $duration);

        // An empty reply means the timeout elapsed without a message.
        if (!$reply) {
            return [null, null];
        }

        // A non-empty reply is a [key, value] pair; only the value is the message.
        return [$reply[1], null];
    }

    /**
     * {@inheritdoc}
     *
     * Reads up to $limit messages starting at offset $index without removing them.
     * A limit below 1 yields an empty array.
     */
    public function peekQueue($queueName, $index = 0, $limit = 20)
    {
        // LRANGE bounds are inclusive and a negative stop counts from the tail,
        // so a zero limit at index 0 would give a stop of -1 and return the
        // entire list instead of nothing.
        if ($limit < 1) {
            return [];
        }

        return $this->redis->lRange($this->resolveKey($queueName), $index, $index + $limit - 1);
    }

    /**
     * {@inheritdoc}
     *
     * Intentionally a no-op: popMessage() already removed the message from the
     * list and hands out no receipt.
     */
    public function acknowledgeMessage($queueName, $receipt)
    {
    }

    /**
     * {@inheritdoc}
     *
     * Unregisters the name from the "queues" set and deletes the queue's message list.
     */
    public function removeQueue($queueName)
    {
        $this->redis->sRem('queues', $queueName);
        $this->redis->del($this->resolveKey($queueName));
    }

    /**
     * {@inheritdoc}
     */
    public function info()
    {
        // Temporarily switch the command class used for "info" to ServerInfo,
        // because older and newer Redis versions format the reply differently
        // (newer ones break it into sections).
        $profile = $this->redis->getProfile();
        $commandClass = $profile->getCommandClass('info');
        $profile->defineCommand('info', ServerInfo::class);

        try {
            return $this->redis->info();
        } finally {
            // Restore the previous command class even when the call throws, so
            // the shared client is never left with the temporary override.
            $profile->defineCommand('info', $commandClass);
        }
    }

    /**
     * Transform the queueName into a key.
     *
     * @param string $queueName
     *
     * @return string QUEUE_PREFIX followed by the queue name
     */
    private function resolveKey($queueName)
    {
        return self::QUEUE_PREFIX.$queueName;
    }
}