Skip to content

Commit 6efb47e

Browse files
committed
Use custom ID and fix attempts count
1 parent 48b89d7 commit 6efb47e

File tree

2 files changed

+67
-16
lines changed

2 files changed

+67
-16
lines changed

src/Jobs/PubSubJob.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public function __construct(Container $container, PubSubQueue $pubsub, Message $
4040
$this->queue = $queue;
4141
$this->container = $container;
4242
$this->connectionName = $connectionName;
43+
44+
$this->decoded = $this->payload();
4345
}
4446

4547
/**
@@ -49,7 +51,7 @@ public function __construct(Container $container, PubSubQueue $pubsub, Message $
4951
*/
5052
public function getJobId()
5153
{
52-
return $this->job->id();
54+
return $this->decoded['id'] ?? null;
5355
}
5456

5557
/**
@@ -81,7 +83,7 @@ public function delete()
8183
*/
8284
public function attempts()
8385
{
84-
return (int) $this->job->attributes('attempts') ?? 0;
86+
return ((int) $this->job->attribute('attempts') ?? 0) + 1;
8587
}
8688

8789
/**
@@ -94,7 +96,8 @@ public function release($delay = 0)
9496
{
9597
parent::release($delay);
9698

97-
$attempts = $this->attempts() + 1;
99+
$attempts = $this->attempts();
100+
98101
$this->pubsub->acknowledgeAndPublish(
99102
$this->job,
100103
$this->queue,

src/PubSubQueue.php

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Kainxspirits\PubSubQueue;
44

55
use Illuminate\Queue\Queue;
6+
use Illuminate\Support\Str;
67
use Google\Cloud\PubSub\Topic;
78
use Google\Cloud\PubSub\Message;
89
use Google\Cloud\PubSub\PubSubClient;
@@ -62,7 +63,7 @@ public function size($queue = null)
6263
*/
6364
public function push($job, $data = '', $queue = null)
6465
{
65-
return $this->pushRaw($this->createPayload($job, $queue, $data), $queue);
66+
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
6667
}
6768

6869
/**
@@ -86,7 +87,11 @@ public function pushRaw($payload, $queue = null, array $options = [])
8687
$publish['attributes'] = $options;
8788
}
8889

89-
return $topic->publish($publish);
90+
$topic->publish($publish);
91+
92+
$decoded_payload = json_decode(base64_decode($payload), true);
93+
94+
return $decoded_payload['id'];
9095
}
9196

9297
/**
@@ -102,7 +107,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
102107
public function later($delay, $job, $data = '', $queue = null)
103108
{
104109
return $this->pushRaw(
105-
$this->createPayload($job, $queue, $data),
110+
$this->createPayload($job, $this->getQueue($queue), $data),
106111
$queue,
107112
['available_at' => $this->availableAt($delay)]
108113
);
@@ -116,7 +121,7 @@ public function later($delay, $job, $data = '', $queue = null)
116121
*/
117122
public function pop($queue = null)
118123
{
119-
$topic = $this->getTopic($queue);
124+
$topic = $this->getTopic($this->getQueue($queue));
120125

121126
if (! $topic->exists()) {
122127
return;
@@ -134,7 +139,7 @@ public function pop($queue = null)
134139
$this,
135140
$messages[0],
136141
$this->connectionName,
137-
$queue
142+
$this->getQueue($queue)
138143
);
139144
}
140145
}
@@ -153,10 +158,10 @@ public function bulk($jobs, $data = '', $queue = null)
153158
$payloads = [];
154159

155160
foreach ((array) $jobs as $job) {
156-
$payloads[] = ['data' => $this->createPayload($job, $queue, $data)];
161+
$payloads[] = ['data' => $this->createPayload($job, $this->getQueue($queue), $data)];
157162
}
158163

159-
$topic = $this->getTopic($queue, true);
164+
$topic = $this->getTopic($this->getQueue($queue), true);
160165

161166
$this->subscribeToTopic($topic);
162167

@@ -171,7 +176,7 @@ public function bulk($jobs, $data = '', $queue = null)
171176
*/
172177
public function acknowledge(Message $message, $queue = null)
173178
{
174-
$subscription = $this->getTopic($queue)->subscription($this->getSubscriberName());
179+
$subscription = $this->getTopic($this->getQueue($queue))->subscription($this->getSubscriberName());
175180
$subscription->acknowledge($message);
176181
}
177182

@@ -185,7 +190,7 @@ public function acknowledge(Message $message, $queue = null)
185190
*/
186191
public function acknowledgeAndPublish(Message $message, $queue = null, $options = [], $delay = 0)
187192
{
188-
$topic = $this->getTopic($queue);
193+
$topic = $this->getTopic($this->getQueue($queue));
189194
$subscription = $topic->subscription($this->getSubscriberName());
190195

191196
$subscription->acknowledge($message);
@@ -201,15 +206,37 @@ public function acknowledgeAndPublish(Message $message, $queue = null, $options
201206
}
202207

203208
/**
204-
* {@inheritdoc}
209+
* Create a payload string from the given job and data.
210+
*
211+
* @param string $job
212+
* @param string $queue
213+
* @param mixed $data
214+
* @return string
215+
*
216+
* @throws \Illuminate\Queue\InvalidPayloadException
205217
*/
206-
protected function createPayload($job, $queue = null, $data = '')
218+
protected function createPayload($job, $queue, $data = '')
207219
{
208-
$payload = parent::createPayload($job, $queue, $data);
220+
$payload = parent::createPayload($job, $this->getQueue($queue), $data);
209221

210222
return base64_encode($payload);
211223
}
212224

225+
/**
226+
* Create a payload array from the given job and data.
227+
*
228+
* @param mixed $job
229+
* @param string $queue
230+
* @param mixed $data
231+
* @return array
232+
*/
233+
protected function createPayloadArray($job, $queue, $data = '')
234+
{
235+
return array_merge(parent::createPayloadArray($job, $this->getQueue($queue), $data), [
236+
'id' => $this->getRandomId(),
237+
]);
238+
}
239+
213240
/**
214241
* Get the current topic.
215242
*
@@ -220,7 +247,7 @@ protected function createPayload($job, $queue = null, $data = '')
220247
*/
221248
public function getTopic($queue, $create = false)
222249
{
223-
$queue = $queue ?: $this->default;
250+
$queue = $this->getQueue($queue);
224251
$topic = $this->pubsub->topic($queue);
225252

226253
if (! $topic->exists() && $create) {
@@ -269,4 +296,25 @@ public function getPubSub()
269296
{
270297
return $this->pubsub;
271298
}
299+
300+
/**
301+
* Get the queue or return the default.
302+
*
303+
* @param string|null $queue
304+
* @return string
305+
*/
306+
public function getQueue($queue)
307+
{
308+
return $queue ?: $this->default;
309+
}
310+
311+
/**
312+
* Get a random ID string.
313+
*
314+
* @return string
315+
*/
316+
protected function getRandomId()
317+
{
318+
return Str::random(32);
319+
}
272320
}

0 commit comments

Comments
 (0)