Skip to content

Commit ec09667

Browse files
committed
refactor ingest queue
1 parent 6d7f047 commit ec09667

File tree

3 files changed

+155
-179
lines changed

3 files changed

+155
-179
lines changed

pkg/queue/queue.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,23 @@ import (
66
)
77

88
type Message struct {
9-
ID string
10-
Data []byte
9+
Value []byte
1110
Time time.Time
1211
WorkspaceID string
1312
}
1413

15-
type Options struct {
16-
Count int64
17-
Block bool
18-
Timeout time.Duration
19-
}
14+
type HandleFunc func(ctx context.Context, messages []*Message) error
2015

2116
type Queue interface {
22-
Enqueue(ctx context.Context, message *Message) error
23-
Dequeue(ctx context.Context, opts *Options) ([]*Message, error)
24-
Delete(ctx context.Context, message []*Message) error
25-
Size(ctx context.Context) (int64, error)
17+
Producer
18+
Consumer
2619
Stats() map[string]interface{}
2720
}
21+
22+
type Producer interface {
23+
WriteMessage(ctx context.Context, message *Message) error
24+
}
25+
26+
type Consumer interface {
27+
StartListen(ctx context.Context, handle HandleFunc)
28+
}

pkg/queue/redis/redis.go

Lines changed: 106 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ import (
44
"context"
55
"errors"
66
"github.com/redis/go-redis/v9"
7-
"github.com/webhookx-io/webhookx/constants"
8-
"github.com/webhookx-io/webhookx/pkg/metrics"
7+
"github.com/webhookx-io/webhookx/pkg/loglimiter"
98
"github.com/webhookx-io/webhookx/pkg/queue"
109
"github.com/webhookx-io/webhookx/pkg/tracing"
11-
"github.com/webhookx-io/webhookx/utils"
1210
"go.opentelemetry.io/otel/trace"
1311
"go.uber.org/zap"
1412
"strconv"
@@ -17,66 +15,53 @@ import (
1715
)
1816

1917
type RedisQueue struct {
20-
stream string
21-
group string
22-
consumer string
23-
visibilityTimeout time.Duration
24-
18+
opts Options
2519
c *redis.Client
2620
log *zap.SugaredLogger
27-
metrics *metrics.Metrics
21+
limiter *loglimiter.Limiter
2822
}
2923

30-
type RedisQueueOptions struct {
24+
type Options struct {
3125
StreamName string
32-
GroupName string
26+
ConsumerGroupName string
3327
ConsumerName string
3428
VisibilityTimeout time.Duration
29+
Listeners int
3530

3631
Client *redis.Client
3732
}
3833

39-
func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger, metrics *metrics.Metrics) (queue.Queue, error) {
34+
func NewRedisQueue(opts Options, logger *zap.SugaredLogger) (queue.Queue, error) {
4035
q := &RedisQueue{
41-
stream: utils.DefaultIfZero(opts.StreamName, constants.QueueRedisQueueName),
42-
group: utils.DefaultIfZero(opts.GroupName, constants.QueueRedisGroupName),
43-
consumer: utils.DefaultIfZero(opts.ConsumerName, constants.QueueRedisConsumerName),
44-
visibilityTimeout: utils.DefaultIfZero(opts.VisibilityTimeout, constants.QueueRedisVisibilityTimeout),
45-
c: opts.Client,
46-
log: logger,
47-
metrics: metrics,
48-
}
49-
50-
go q.process()
51-
if metrics.Enabled {
52-
go q.monitoring()
36+
opts: opts,
37+
c: opts.Client,
38+
log: logger.Named("queue-redis"),
39+
limiter: loglimiter.NewLimiter(time.Second),
5340
}
54-
5541
return q, nil
5642
}
5743

58-
func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error {
44+
func (q *RedisQueue) WriteMessage(ctx context.Context, message *queue.Message) error {
5945
ctx, span := tracing.Start(ctx, "redis.queue.enqueue", trace.WithSpanKind(trace.SpanKindServer))
6046
defer span.End()
6147

6248
args := &redis.XAddArgs{
63-
Stream: q.stream,
49+
Stream: q.opts.StreamName,
6450
ID: "*",
65-
Values: []interface{}{"data", message.Data, "time", message.Time.UnixMilli(), "ws_id", message.WorkspaceID},
66-
}
67-
res := q.c.XAdd(ctx, args)
68-
if res.Err() != nil {
69-
return res.Err()
51+
Values: []interface{}{
52+
"data", message.Value,
53+
"time", message.Time.UnixMilli(),
54+
"ws_id", message.WorkspaceID,
55+
},
7056
}
71-
message.ID = res.Val()
72-
return nil
57+
return q.c.XAdd(ctx, args).Err()
7358
}
7459

7560
func toMessage(values map[string]interface{}) *queue.Message {
7661
message := &queue.Message{}
7762

7863
if data, ok := values["data"].(string); ok {
79-
message.Data = []byte(data)
64+
message.Value = []byte(data)
8065
}
8166

8267
if timestr, ok := values["time"].(string); ok {
@@ -91,76 +76,97 @@ func toMessage(values map[string]interface{}) *queue.Message {
9176
return message
9277
}
9378

94-
func (q *RedisQueue) Dequeue(ctx context.Context, opt *queue.Options) ([]*queue.Message, error) {
79+
func (q *RedisQueue) dequeue(ctx context.Context) ([]redis.XMessage, error) {
9580
ctx, span := tracing.Start(ctx, "redis.queue.dequeue", trace.WithSpanKind(trace.SpanKindServer))
9681
defer span.End()
9782

98-
var count int64 = 1
99-
if opt != nil && opt.Count != 0 {
100-
count = opt.Count
101-
}
102-
var block time.Duration = -1
103-
if opt != nil && opt.Block {
104-
block = opt.Timeout
105-
}
106-
10783
args := &redis.XReadGroupArgs{
108-
Group: q.group,
109-
Consumer: q.consumer,
110-
Streams: []string{q.stream, ">"},
111-
Count: count,
112-
Block: block,
84+
Group: q.opts.ConsumerGroupName,
85+
Consumer: q.opts.ConsumerName,
86+
Streams: []string{q.opts.StreamName, ">"},
87+
Count: 20,
88+
Block: time.Second,
11389
}
114-
res := q.c.XReadGroup(ctx, args)
115-
if res.Err() != nil {
116-
err := res.Err()
90+
streams, err := q.c.XReadGroup(ctx, args).Result()
91+
if err != nil {
11792
if errors.Is(err, redis.Nil) {
11893
err = nil
11994
} else if strings.HasPrefix(err.Error(), "NOGROUP") {
120-
go q.createConsumerGroup()
12195
err = nil
96+
go q.createConsumerGroup(q.opts.StreamName, q.opts.ConsumerGroupName)
12297
}
12398
return nil, err
12499
}
125100

126-
messages := make([]*queue.Message, 0)
127-
for _, stream := range res.Val() {
128-
for _, xmessage := range stream.Messages {
129-
message := toMessage(xmessage.Values)
130-
message.ID = xmessage.ID
131-
messages = append(messages, message)
132-
}
133-
}
134-
135-
return messages, nil
101+
return streams[0].Messages, nil
136102
}
137103

138-
func (q *RedisQueue) Delete(ctx context.Context, messages []*queue.Message) error {
104+
func (q *RedisQueue) delete(ctx context.Context, xmessages []redis.XMessage) error {
139105
ctx, span := tracing.Start(ctx, "redis.queue.delete", trace.WithSpanKind(trace.SpanKindServer))
140106
defer span.End()
141107

142-
ids := make([]string, 0, len(messages))
143-
for _, message := range messages {
108+
ids := make([]string, 0, len(xmessages))
109+
for _, message := range xmessages {
144110
ids = append(ids, message.ID)
145111
}
112+
146113
pipeline := q.c.Pipeline()
147-
pipeline.XAck(ctx, q.stream, q.group, ids...)
148-
pipeline.XDel(ctx, q.stream, ids...)
114+
pipeline.XAck(ctx, q.opts.StreamName, q.opts.ConsumerGroupName, ids...)
115+
pipeline.XDel(ctx, q.opts.StreamName, ids...)
149116
_, err := pipeline.Exec(ctx)
150-
if err != nil {
151-
return err
117+
return err
118+
}
119+
120+
func (q *RedisQueue) StartListen(ctx context.Context, handle queue.HandleFunc) {
121+
q.log.Infof("starting %d listeners", q.opts.Listeners)
122+
for i := 0; i < q.opts.Listeners; i++ {
123+
go q.listen(ctx, handle)
124+
}
125+
go q.process(ctx)
126+
}
127+
128+
func (q *RedisQueue) listen(ctx context.Context, handle queue.HandleFunc) {
129+
for {
130+
select {
131+
case <-ctx.Done():
132+
return
133+
default:
134+
xmessages, err := q.dequeue(ctx)
135+
if err != nil && q.limiter.Allow(err.Error()) {
136+
q.log.Warnf("failed to dequeue: %v", err)
137+
time.Sleep(time.Second)
138+
continue
139+
}
140+
if len(xmessages) == 0 {
141+
continue
142+
}
143+
144+
messages := make([]*queue.Message, 0, len(xmessages))
145+
for _, msg := range xmessages {
146+
messages = append(messages, toMessage(msg.Values))
147+
}
148+
149+
err = handle(ctx, messages)
150+
if err != nil {
151+
q.log.Warnf("failed to handle message: %v", err)
152+
continue
153+
}
154+
err = q.delete(ctx, xmessages)
155+
if err != nil {
156+
q.log.Warnf("failed to delete message: %v", err)
157+
}
158+
}
152159
}
153-
return nil
154160
}
155161

156-
func (q *RedisQueue) Size(ctx context.Context) (int64, error) {
157-
return q.c.XLen(ctx, q.stream).Result()
162+
func (q *RedisQueue) size(ctx context.Context) (int64, error) {
163+
return q.c.XLen(ctx, q.opts.StreamName).Result()
158164
}
159165

160166
func (q *RedisQueue) Stats() map[string]interface{} {
161167
stats := make(map[string]interface{})
162168

163-
size, err := q.Size(context.TODO())
169+
size, err := q.size(context.TODO())
164170
if err != nil {
165171
q.log.Errorf("failed to retrieve status: %v", err)
166172
}
@@ -169,20 +175,18 @@ func (q *RedisQueue) Stats() map[string]interface{} {
169175
return stats
170176
}
171177

172-
func (q *RedisQueue) createConsumerGroup() {
173-
res := q.c.XGroupCreateMkStream(context.TODO(), q.stream, q.group, "0")
174-
if res.Err() == nil {
175-
q.log.Debugf("created default consumer group: %s", q.group)
178+
func (q *RedisQueue) createConsumerGroup(stream string, group string) {
179+
err := q.c.XGroupCreateMkStream(context.TODO(), stream, group, "0").Err()
180+
if err != nil {
181+
if err.Error() != "BUSYGROUP Consumer Group name already exists" {
182+
q.log.Errorf("failed to create Consumer Group '%s': %s", group, err.Error())
183+
}
176184
return
177185
}
178-
179-
if res.Err().Error() != "BUSYGROUP Consumer Group name already exists" {
180-
q.log.Errorf("failed to create the default consumer group: %s", res.Err().Error())
181-
}
186+
q.log.Debugf("Consumer Group '%s' created", group)
182187
}
183188

184-
// process re-enqueue invisible messages that reach the visibility timeout
185-
func (q *RedisQueue) process() {
189+
func (q *RedisQueue) process(ctx context.Context) {
186190
var reenqueueScript = redis.NewScript(`
187191
local entries = redis.call('XPENDING', KEYS[1], KEYS[2], 'IDLE', ARGV[1], '-', '+', 1000)
188192
local ids = {}
@@ -200,39 +204,25 @@ func (q *RedisQueue) process() {
200204
return ids
201205
`)
202206

203-
go func() {
204-
ticker := time.NewTicker(time.Second)
205-
defer ticker.Stop()
206-
207-
for {
208-
select {
209-
case <-ticker.C:
210-
keys := []string{q.stream, q.group}
211-
argv := []interface{}{q.visibilityTimeout.Milliseconds()}
212-
res, err := reenqueueScript.Run(context.Background(), q.c, keys, argv...).Result()
213-
if err != nil {
214-
q.log.Errorf("failed to reenqueue: %v", err)
215-
continue
216-
}
217-
218-
if ids, ok := res.([]interface{}); ok && len(ids) > 0 {
219-
q.log.Debugf("enqueued invisible messages: %d", len(ids))
220-
}
221-
}
222-
}
207+
ticker := time.NewTicker(time.Second)
208+
defer ticker.Stop()
223209

224-
}()
225-
}
210+
for {
211+
select {
212+
case <-ctx.Done():
213+
return
214+
case <-ticker.C:
215+
keys := []string{q.opts.StreamName, q.opts.ConsumerGroupName}
216+
argv := []interface{}{q.opts.VisibilityTimeout.Milliseconds()}
217+
res, err := reenqueueScript.Run(context.TODO(), q.c, keys, argv...).Result()
218+
if err != nil {
219+
q.log.Errorf("failed to reenqueue: %v", err)
220+
continue
221+
}
226222

227-
func (q *RedisQueue) monitoring() {
228-
ticker := time.NewTicker(q.metrics.Interval)
229-
defer ticker.Stop()
230-
for range ticker.C {
231-
size, err := q.Size(context.TODO())
232-
if err != nil {
233-
q.log.Errorf("failed to get redis queue size: %v", err)
234-
continue
223+
if ids, ok := res.([]interface{}); ok && len(ids) > 0 {
224+
q.log.Debugf("enqueued invisible messages: %d", len(ids))
225+
}
235226
}
236-
q.metrics.EventPendingGauge.Set(float64(size))
237227
}
238228
}

0 commit comments

Comments
 (0)