Skip to content

Commit 71077fb

Browse files
committed
update
1 parent ec09667 commit 71077fb

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

pkg/queue/queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Message struct {
1111
WorkspaceID string
1212
}
1313

14-
type HandleFunc func(ctx context.Context, messages []*Message) error
14+
type HandlerFunc func(ctx context.Context, messages []*Message) error
1515

1616
type Queue interface {
1717
Producer
@@ -20,9 +20,9 @@ type Queue interface {
2020
}
2121

2222
type Producer interface {
23-
WriteMessage(ctx context.Context, message *Message) error
23+
Enqueue(ctx context.Context, message *Message) error
2424
}
2525

2626
type Consumer interface {
27-
StartListen(ctx context.Context, handle HandleFunc)
27+
StartListen(ctx context.Context, handler HandlerFunc)
2828
}

pkg/queue/redis/redis.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func NewRedisQueue(opts Options, logger *zap.SugaredLogger) (queue.Queue, error)
4141
return q, nil
4242
}
4343

44-
func (q *RedisQueue) WriteMessage(ctx context.Context, message *queue.Message) error {
44+
func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error {
4545
ctx, span := tracing.Start(ctx, "redis.queue.enqueue", trace.WithSpanKind(trace.SpanKindServer))
4646
defer span.End()
4747

@@ -117,15 +117,15 @@ func (q *RedisQueue) delete(ctx context.Context, xmessages []redis.XMessage) err
117117
return err
118118
}
119119

120-
func (q *RedisQueue) StartListen(ctx context.Context, handle queue.HandleFunc) {
120+
func (q *RedisQueue) StartListen(ctx context.Context, handler queue.HandlerFunc) {
121121
q.log.Infof("starting %d listeners", q.opts.Listeners)
122122
for i := 0; i < q.opts.Listeners; i++ {
123-
go q.listen(ctx, handle)
123+
go q.listen(ctx, handler)
124124
}
125125
go q.process(ctx)
126126
}
127127

128-
func (q *RedisQueue) listen(ctx context.Context, handle queue.HandleFunc) {
128+
func (q *RedisQueue) listen(ctx context.Context, handler queue.HandlerFunc) {
129129
for {
130130
select {
131131
case <-ctx.Done():
@@ -146,7 +146,7 @@ func (q *RedisQueue) listen(ctx context.Context, handle queue.HandleFunc) {
146146
messages = append(messages, toMessage(msg.Values))
147147
}
148148

149-
err = handle(ctx, messages)
149+
err = handler(ctx, messages)
150150
if err != nil {
151151
q.log.Warnf("failed to handle message: %v", err)
152152
continue

proxy/gateway.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ func (gw *Gateway) ingestEvent(ctx context.Context, async bool, event *entities.
294294
Time: time.Now(),
295295
WorkspaceID: event.WorkspaceId,
296296
}
297-
return gw.queue.WriteMessage(ctx, &msg)
297+
return gw.queue.Enqueue(ctx, &msg)
298298
}
299299

300300
return gw.dispatch(ctx, []*entities.Event{event})

0 commit comments

Comments
 (0)