Skip to content

Commit 1d6c0df

Browse files
authored
perf(*): avoid scheduling tasks that are more than 3 minutes later into the task queue (#197)
1 parent a30c901 commit 1d6c0df

File tree

7 files changed

+105
-23
lines changed

7 files changed

+105
-23
lines changed

app/app.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ func (app *Application) DB() *db.DB {
303303
return app.db
304304
}
305305

306+
func (app *Application) Worker() *worker.Worker {
307+
return app.worker
308+
}
309+
306310
func (app *Application) NodeID() string {
307311
return app.nodeID
308312
}

constants/constants.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ import (
88

99
// Task Queue
1010
const (
11-
TaskQueueName = "webhookx:queue"
12-
TaskQueueInvisibleQueueName = "webhookx:queue_invisible"
13-
TaskQueueDataName = "webhookx:queue_data"
14-
TaskQueueVisibilityTimeout = time.Second * 65
11+
TaskQueueName = "webhookx:queue"
12+
TaskQueueInvisibleQueueName = "webhookx:queue_invisible"
13+
TaskQueueDataName = "webhookx:queue_data"
14+
TaskQueueVisibilityTimeout = time.Second * 65
15+
TaskQueuePreScheduleTimeWindow = time.Minute * 3
1516
)
1617

1718
// Redis Queue

db/dao/attempt_dao.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/webhookx-io/webhookx/pkg/tracing"
1212
"github.com/webhookx-io/webhookx/pkg/types"
1313
"go.opentelemetry.io/otel/trace"
14+
"time"
1415
)
1516

1617
type attemptDao struct {
@@ -77,8 +78,8 @@ func (dao *attemptDao) UpdateErrorCode(ctx context.Context, id string, status en
7778
return err
7879
}
7980

80-
func (dao *attemptDao) ListUnqueuedForUpdate(ctx context.Context, limit int) (list []*entities.Attempt, err error) {
81-
sql := "SELECT * FROM attempts WHERE status = 'INIT' and created_at <= now() AT TIME ZONE 'UTC' - INTERVAL '60 SECOND' limit $1 FOR UPDATE SKIP LOCKED"
82-
err = dao.UnsafeDB(ctx).SelectContext(ctx, &list, sql, limit)
81+
func (dao *attemptDao) ListUnqueuedForUpdate(ctx context.Context, maxScheduledAt time.Time, limit int) (list []*entities.Attempt, err error) {
82+
sql := "SELECT * FROM attempts WHERE status = 'INIT' AND created_at <= now() - INTERVAL '30 SECOND' AND scheduled_at < $1 limit $2 FOR UPDATE SKIP LOCKED"
83+
err = dao.UnsafeDB(ctx).SelectContext(ctx, &list, sql, maxScheduledAt, limit)
8384
return
8485
}

db/dao/daos.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dao
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/webhookx-io/webhookx/db/entities"
78
"github.com/webhookx-io/webhookx/db/query"
@@ -40,7 +41,7 @@ type AttemptDAO interface {
4041
UpdateStatusToQueued(ctx context.Context, ids []string) error
4142
UpdateErrorCode(ctx context.Context, id string, status entities.AttemptStatus, code entities.AttemptErrorCode) error
4243
UpdateDelivery(ctx context.Context, id string, result *AttemptResult) error
43-
ListUnqueuedForUpdate(ctx context.Context, limit int) (list []*entities.Attempt, err error)
44+
ListUnqueuedForUpdate(ctx context.Context, maxScheduledAt time.Time, limit int) (list []*entities.Attempt, err error)
4445
}
4546

4647
type SourceDAO interface {

service/service.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package service
22

33
import (
44
"context"
5+
"github.com/webhookx-io/webhookx/constants"
56
"github.com/webhookx-io/webhookx/db"
67
"github.com/webhookx-io/webhookx/db/entities"
78
"github.com/webhookx-io/webhookx/pkg/taskqueue"
89
"go.uber.org/zap"
10+
"time"
911
)
1012

1113
type Service struct {
@@ -32,20 +34,23 @@ func (s *Service) ScheduleAttempts(ctx context.Context, attempts []*entities.Att
3234
return
3335
}
3436

37+
maxScheduleAt := time.Now().Add(constants.TaskQueuePreScheduleTimeWindow)
3538
tasks := make([]*taskqueue.TaskMessage, 0)
3639
ids := make([]string, 0)
3740
for _, attempt := range attempts {
38-
tasks = append(tasks, &taskqueue.TaskMessage{
39-
ID: attempt.ID,
40-
ScheduledAt: attempt.ScheduledAt.Time,
41-
Data: &taskqueue.MessageData{
42-
EventID: attempt.EventId,
43-
EndpointId: attempt.EndpointId,
44-
Attempt: attempt.AttemptNumber,
45-
Event: string(attempt.Event.Data),
46-
},
47-
})
48-
ids = append(ids, attempt.ID)
41+
if attempt.ScheduledAt.Before(maxScheduleAt) {
42+
tasks = append(tasks, &taskqueue.TaskMessage{
43+
ID: attempt.ID,
44+
ScheduledAt: attempt.ScheduledAt.Time,
45+
Data: &taskqueue.MessageData{
46+
EventID: attempt.EventId,
47+
EndpointId: attempt.EndpointId,
48+
Attempt: attempt.AttemptNumber,
49+
Event: string(attempt.Event.Data),
50+
},
51+
})
52+
ids = append(ids, attempt.ID)
53+
}
4954
}
5055

5156
if len(tasks) == 0 {

test/delivery/delivery_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package delivery
22

33
import (
44
"context"
5+
"github.com/webhookx-io/webhookx/constants"
56
"github.com/webhookx-io/webhookx/test/helper/factory"
67
"strconv"
78
"testing"
@@ -253,6 +254,75 @@ var _ = Describe("delivery", Ordered, func() {
253254
})
254255
})
255256

257+
Context("schedule task", func() {
258+
var proxyClient *resty.Client
259+
260+
var app *app.Application
261+
var db *db.DB
262+
263+
BeforeAll(func() {
264+
endpoint := factory.Endpoint()
265+
endpoint.Retry.Config.Attempts = []int64{int64(constants.TaskQueuePreScheduleTimeWindow.Seconds()) + 3}
266+
entitiesConfig := helper.EntitiesConfig{
267+
Endpoints: []*entities.Endpoint{&endpoint},
268+
Sources: []*entities.Source{factory.SourceP()},
269+
}
270+
db = helper.InitDB(true, &entitiesConfig)
271+
proxyClient = helper.ProxyClient()
272+
273+
app = utils.Must(helper.Start(map[string]string{
274+
"WEBHOOKX_ADMIN_LISTEN": "0.0.0.0:8080",
275+
"WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081",
276+
"WEBHOOKX_WORKER_ENABLED": "true",
277+
}))
278+
})
279+
280+
AfterAll(func() {
281+
app.Stop()
282+
})
283+
284+
It("scheudle task when conditions met", func() {
285+
assert.Eventually(GinkgoT(), func() bool {
286+
resp, err := proxyClient.R().
287+
SetBody(`{
288+
"event_type": "foo.bar",
289+
"data": {"key": "value"}
290+
}`).
291+
Post("/")
292+
return err == nil && resp.StatusCode() == 200
293+
}, time.Second*5, time.Second)
294+
295+
time.Sleep(time.Second)
296+
297+
var attempt *entities.Attempt
298+
assert.Eventually(GinkgoT(), func() bool {
299+
list, err := db.Attempts.List(context.TODO(), &query.AttemptQuery{})
300+
if err != nil || len(list) == 0 {
301+
return false
302+
}
303+
attempt = list[0]
304+
return attempt.Status == entities.AttemptStatusInit // should not be enqueued
305+
}, time.Second*5, time.Second)
306+
307+
result, err := db.DB.Exec("UPDATE attempts set scheduled_at = $1, created_at = created_at - INTERVAL '30 SECOND' where id = $2", time.Now(), attempt.ID)
308+
assert.NoError(GinkgoT(), err)
309+
row, err := result.RowsAffected()
310+
assert.NoError(GinkgoT(), err)
311+
assert.Equal(GinkgoT(), int64(1), row)
312+
313+
time.Sleep(time.Second)
314+
315+
app.Worker().ProcessRequeue() // load db data that meets the conditions into task queue
316+
317+
assert.Eventually(GinkgoT(), func() bool {
318+
model, err := db.Attempts.Get(context.TODO(), attempt.ID)
319+
assert.NoError(GinkgoT(), err)
320+
return model.Status == entities.AttemptStatusSuccess
321+
}, time.Second*5, time.Second)
322+
323+
})
324+
})
325+
256326
})
257327

258328
func TestProxy(t *testing.T) {

worker/worker.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ func (w *Worker) Start() error {
250250

251251
go w.run()
252252

253-
schedule.Schedule(w.ctx, w.processRequeue, w.opts.RequeueJobInterval)
253+
schedule.Schedule(w.ctx, w.ProcessRequeue, w.opts.RequeueJobInterval)
254254
return nil
255255
}
256256

@@ -265,13 +265,14 @@ func (w *Worker) Stop() error {
265265
return nil
266266
}
267267

268-
func (w *Worker) processRequeue() {
268+
func (w *Worker) ProcessRequeue() {
269269
batchSize := w.opts.RequeueJobBatch
270270

271271
var done bool
272272
for {
273273
err := w.db.TX(context.TODO(), func(ctx context.Context) error {
274-
attempts, err := w.db.Attempts.ListUnqueuedForUpdate(ctx, batchSize)
274+
maxScheduledAt := time.Now().Add(constants.TaskQueuePreScheduleTimeWindow)
275+
attempts, err := w.db.Attempts.ListUnqueuedForUpdate(ctx, maxScheduledAt, batchSize)
275276
if err != nil {
276277
return err
277278
}
@@ -285,7 +286,6 @@ func (w *Worker) processRequeue() {
285286
attempt.Event = event
286287
}
287288
w.srv.ScheduleAttempts(ctx, attempts)
288-
return nil
289289
}
290290

291291
if len(attempts) < batchSize {

0 commit comments

Comments
 (0)