From 9d48daf3886065ce2df350daa5f710bdaf43d513 Mon Sep 17 00:00:00 2001 From: Douglas-Lee Date: Thu, 13 Nov 2025 23:58:32 +0800 Subject: [PATCH] perf: replace gob with msgpack --- app/app.go | 12 +++++---- constants/cache_key.go | 45 ++++++++++++++++++++++++++++++++++ constants/constants.go | 27 -------------------- db/dao/attempt_dao.go | 2 +- db/dao/attempt_detail_dao.go | 2 +- db/dao/dao.go | 11 ++++----- db/dao/endpoint_dao.go | 2 +- db/dao/event_dao.go | 2 +- db/dao/plugin_dao.go | 2 +- db/dao/source_dao.go | 2 +- db/dao/workspace_dao.go | 2 +- eventbus/types.go | 10 ++++---- go.mod | 2 ++ go.sum | 4 +++ pkg/cache/redis.go | 2 +- pkg/serializer/gob.go | 23 ----------------- pkg/serializer/msgpack.go | 27 ++++++++++++++++++++ test/delivery/delivery_test.go | 3 ++- 18 files changed, 105 insertions(+), 75 deletions(-) create mode 100644 constants/cache_key.go delete mode 100644 pkg/serializer/gob.go create mode 100644 pkg/serializer/msgpack.go diff --git a/app/app.go b/app/app.go index 08921665..857fc53e 100644 --- a/app/app.go +++ b/app/app.go @@ -10,6 +10,7 @@ import ( "github.com/webhookx-io/webhookx/admin" "github.com/webhookx-io/webhookx/admin/api" "github.com/webhookx-io/webhookx/config" + "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/migrator" @@ -321,13 +322,14 @@ func registerEventHandler(bus *eventbus.EventBus) { } bus.Broadcast(eventbus.EventCRUD, eventData) }) - bus.Subscribe(eventbus.EventCRUD, func(data interface{}) { - eventData := data.(*eventbus.CrudData) - err := mcache.Invalidate(context.TODO(), eventData.CacheKey) + bus.Subscribe(eventbus.EventCRUD, func(d interface{}) { + data := d.(*eventbus.CrudData) + cacheKey := constants.CacheKeyFrom(data.CacheName) + err := mcache.Invalidate(context.TODO(), cacheKey.Build(data.ID)) if err != nil { - zap.S().Errorf("failed to invalidate cache: key=%s %v", eventData.CacheKey, err) + zap.S().Errorf("failed to invalidate cache: key=%s %v", cacheKey.Build(data.ID), err) } - bus.Broadcast(fmt.Sprintf("%s.crud", eventData.Entity), eventData) + bus.Broadcast(fmt.Sprintf("%s.crud", data.Entity), data) }) } diff --git a/constants/cache_key.go b/constants/cache_key.go new file mode 100644 index 00000000..df08676b --- /dev/null +++ b/constants/cache_key.go @@ -0,0 +1,45 @@ +package constants + +import "strings" + +// CacheKey cache key definition. +// format "webhookx:::" +type CacheKey struct { + Name string + Version string +} + +func (c CacheKey) Build(id string) string { + var sb strings.Builder + sb.WriteString("webhookx:") + sb.WriteString(c.Name) + sb.WriteString(":") + sb.WriteString(c.Version) + sb.WriteString(":") + sb.WriteString(id) + return sb.String() +} + +var ( + EventCacheKey = register(CacheKey{"events", "v1"}) + EndpointCacheKey = register(CacheKey{"endpoints", "v1"}) + EndpointPluginsKey = register(CacheKey{"endpoint_plugins", "v1"}) + SourcePluginsKey = register(CacheKey{"source_plugins", "v1"}) + SourceCacheKey = register(CacheKey{"sources", "v1"}) + WorkspaceCacheKey = register(CacheKey{"workspaces", "v1"}) + AttemptCacheKey = register(CacheKey{"attempts", "v1"}) + PluginCacheKey = register(CacheKey{"plugins", "v1"}) + AttemptDetailCacheKey = register(CacheKey{"attempt_details", "v1"}) + WorkspaceEndpointsKey = register(CacheKey{"workspaces_endpoints", "v1"}) +) + +var registry = map[string]CacheKey{} + +func register(ck CacheKey) CacheKey { + registry[ck.Name] = ck + return ck +} + +func CacheKeyFrom(name string) CacheKey { + return registry[name] +} diff --git a/constants/constants.go b/constants/constants.go index b72061c2..3d1a0929 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -2,7 +2,6 @@ package constants import ( "github.com/webhookx-io/webhookx/config" - "strings" "time" ) @@ -27,32 +26,6 @@ const ( RequeueInterval = time.Second * 60 ) -type CacheKey string - -func (c CacheKey) Build(id string) string { - var sb strings.Builder - sb.WriteString(Namespace) - sb.WriteString(":") - sb.WriteString(string(c)) - sb.WriteString(":") - sb.WriteString(id) - return sb.String() -} - -const ( - Namespace string = "webhookx" - EventCacheKey CacheKey = "events" - EndpointCacheKey CacheKey = "endpoints" - EndpointPluginsKey CacheKey = "endpoint_plugins" - SourcePluginsKey CacheKey = "source_plugins" - SourceCacheKey CacheKey = "sources" - WorkspaceCacheKey CacheKey = "workspaces" - AttemptCacheKey CacheKey = "attempts" - PluginCacheKey CacheKey = "plugins" - AttemptDetailCacheKey CacheKey = "attempt_details" - WorkspaceEndpointsKey CacheKey = "workspaces_endpoints" -) - type Header struct { Name string Value string diff --git a/db/dao/attempt_dao.go b/db/dao/attempt_dao.go index b0f57196..0b38f1e7 100644 --- a/db/dao/attempt_dao.go +++ b/db/dao/attempt_dao.go @@ -33,7 +33,7 @@ func NewAttemptDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) AttemptD EntityName: "attempt", Workspace: workspace, CachePropagate: false, - CacheKey: constants.AttemptCacheKey, + CacheName: constants.AttemptCacheKey.Name, } return &attemptDao{ DAO: NewDAO[entities.Attempt](db, bus, opts), diff --git a/db/dao/attempt_detail_dao.go b/db/dao/attempt_detail_dao.go index bc76ef5c..439f5a8e 100644 --- a/db/dao/attempt_detail_dao.go +++ b/db/dao/attempt_detail_dao.go @@ -23,7 +23,7 @@ func NewAttemptDetailDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) At EntityName: "attempt_detail", Workspace: workspace, CachePropagate: false, - CacheKey: constants.AttemptDetailCacheKey, + CacheName: constants.AttemptDetailCacheKey.Name, } return &attemptDetailDao{ DAO: NewDAO[entities.AttemptDetail](db, bus, opts), diff --git a/db/dao/dao.go b/db/dao/dao.go index 5eedbf4d..603cb415 100644 --- a/db/dao/dao.go +++ b/db/dao/dao.go @@ -8,7 +8,6 @@ import ( "fmt" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" - "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db/errs" "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/db/transaction" @@ -51,7 +50,7 @@ type Options struct { EntityName string Workspace bool CachePropagate bool - CacheKey constants.CacheKey + CacheName string } func NewDAO[T any](db *sqlx.DB, bus *eventbus.EventBus, opts Options) *DAO[T] { @@ -381,10 +380,10 @@ func (dao *DAO[T]) Upsert(ctx context.Context, fields []string, entity *T) error func (dao *DAO[T]) propagateEvent(id string, entity *T) { data := &eventbus.CrudData{ - ID: id, - CacheKey: dao.opts.CacheKey.Build(id), - Entity: dao.opts.EntityName, - Data: utils.Must(json.Marshal(entity)), + ID: id, + CacheName: dao.opts.CacheName, + Entity: dao.opts.EntityName, + Data: utils.Must(json.Marshal(entity)), } wid := reflect.ValueOf(*entity).FieldByName("WorkspaceId") if wid.IsValid() { diff --git a/db/dao/endpoint_dao.go b/db/dao/endpoint_dao.go index 4e9dd864..cefc7c9a 100644 --- a/db/dao/endpoint_dao.go +++ b/db/dao/endpoint_dao.go @@ -17,7 +17,7 @@ func NewEndpointDAO(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) Endpoin EntityName: "endpoint", Workspace: workspace, CachePropagate: true, - CacheKey: constants.EndpointCacheKey, + CacheName: constants.EndpointCacheKey.Name, } return &endpointDAO{ DAO: NewDAO[entities.Endpoint](db, bus, opts), diff --git a/db/dao/event_dao.go b/db/dao/event_dao.go index 5ef8fcbe..20dbfc6a 100644 --- a/db/dao/event_dao.go +++ b/db/dao/event_dao.go @@ -20,7 +20,7 @@ func NewEventDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) EventDAO { EntityName: "event", Workspace: workspace, CachePropagate: false, - CacheKey: constants.EventCacheKey, + CacheName: constants.EventCacheKey.Name, } return &eventDao{ DAO: NewDAO[entities.Event](db, bus, opts), diff --git a/db/dao/plugin_dao.go b/db/dao/plugin_dao.go index 7e27996d..0e6b3a16 100644 --- a/db/dao/plugin_dao.go +++ b/db/dao/plugin_dao.go @@ -20,7 +20,7 @@ func NewPluginDAO(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) PluginDAO EntityName: "plugin", Workspace: workspace, CachePropagate: true, - CacheKey: constants.PluginCacheKey, + CacheName: constants.PluginCacheKey.Name, } return &pluginDAO{ DAO: NewDAO[entities.Plugin](db, bus, opts), diff --git a/db/dao/source_dao.go b/db/dao/source_dao.go index 014de13e..82253f47 100644 --- a/db/dao/source_dao.go +++ b/db/dao/source_dao.go @@ -17,7 +17,7 @@ func NewSourceDAO(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) SourceDAO EntityName: "source", Workspace: workspace, CachePropagate: true, - CacheKey: constants.SourceCacheKey, + CacheName: constants.SourceCacheKey.Name, } return &sourceDAO{ DAO: NewDAO[entities.Source](db, bus, opts), diff --git a/db/dao/workspace_dao.go b/db/dao/workspace_dao.go index 3fec0b46..27b87792 100644 --- a/db/dao/workspace_dao.go +++ b/db/dao/workspace_dao.go @@ -18,7 +18,7 @@ func NewWorkspaceDAO(db *sqlx.DB, bus *eventbus.EventBus) WorkspaceDAO { EntityName: "workspace", Workspace: false, CachePropagate: true, - CacheKey: constants.WorkspaceCacheKey, + CacheName: constants.WorkspaceCacheKey.Name, } return &workspaceDAO{ DAO: NewDAO[entities.Workspace](db, bus, opts), diff --git a/eventbus/types.go b/eventbus/types.go index e359c46b..2afbb447 100644 --- a/eventbus/types.go +++ b/eventbus/types.go @@ -25,11 +25,11 @@ type Message struct { type Callback func(data interface{}) type CrudData struct { - Entity string `json:"entity"` - ID string `json:"id"` - WID string `json:"wid"` - CacheKey string `json:"cache_key"` - Data json.RawMessage `json:"data"` + Entity string `json:"entity"` + ID string `json:"id"` + WID string `json:"wid"` + CacheName string `json:"cache_name"` + Data json.RawMessage `json:"data"` } type EventFanoutData struct { diff --git a/go.mod b/go.mod index 32f6ae81..80eace55 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/spf13/cobra v1.10.1 github.com/stretchr/testify v1.11.1 github.com/tetratelabs/wazero v1.10.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 go.opentelemetry.io/contrib/propagators/autoprop v0.63.0 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 @@ -67,6 +68,7 @@ require ( github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/woodsbury/decimal128 v1.3.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/propagators/aws v1.38.0 // indirect diff --git a/go.sum b/go.sum index 2eb29201..3ada4b99 100644 --- a/go.sum +++ b/go.sum @@ -237,6 +237,10 @@ github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/woodsbury/decimal128 v1.3.0 h1:8pffMNWIlC0O5vbyHWFZAt5yWvWcrHA+3ovIIjVWss0= github.com/woodsbury/decimal128 v1.3.0/go.mod h1:C5UTmyTjW3JftjUFzOVhC20BEQa2a4ZKOB5I6Zjb+ds= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= diff --git a/pkg/cache/redis.go b/pkg/cache/redis.go index 75aa2bac..b90c15c5 100644 --- a/pkg/cache/redis.go +++ b/pkg/cache/redis.go @@ -16,7 +16,7 @@ type RedisCache struct { func NewRedisCache(client *redis.Client) *RedisCache { return &RedisCache{ c: client, - s: serializer.Gob, + s: serializer.MsgPack, } } diff --git a/pkg/serializer/gob.go b/pkg/serializer/gob.go deleted file mode 100644 index 609c31b9..00000000 --- a/pkg/serializer/gob.go +++ /dev/null @@ -1,23 +0,0 @@ -package serializer - -import ( - "bytes" - "encoding/gob" -) - -var Gob GobSerializer - -type GobSerializer struct{} - -func (g GobSerializer) Serialize(val interface{}) ([]byte, error) { - var buffer bytes.Buffer - err := gob.NewEncoder(&buffer).Encode(val) - if err != nil { - return nil, err - } - return buffer.Bytes(), nil -} - -func (g GobSerializer) Deserialize(b []byte, val interface{}) error { - return gob.NewDecoder(bytes.NewReader(b)).Decode(val) -} diff --git a/pkg/serializer/msgpack.go b/pkg/serializer/msgpack.go new file mode 100644 index 00000000..e2937ada --- /dev/null +++ b/pkg/serializer/msgpack.go @@ -0,0 +1,27 @@ +package serializer + +import ( + "bytes" + "github.com/vmihailenco/msgpack/v5" +) + +var MsgPack MsgPackSerializer + +type MsgPackSerializer struct{} + +func (s MsgPackSerializer) Serialize(val interface{}) ([]byte, error) { + var buffer bytes.Buffer + encoder := msgpack.NewEncoder(&buffer) + encoder.SetCustomStructTag("json") + err := encoder.Encode(val) + if err != nil { + return nil, err + } + return buffer.Bytes(), nil +} + +func (s MsgPackSerializer) Deserialize(b []byte, val interface{}) error { + decoder := msgpack.NewDecoder(bytes.NewReader(b)) + decoder.SetCustomStructTag("json") + return decoder.Decode(val) +} diff --git a/test/delivery/delivery_test.go b/test/delivery/delivery_test.go index 4159b8e6..4aa0d51d 100644 --- a/test/delivery/delivery_test.go +++ b/test/delivery/delivery_test.go @@ -124,6 +124,7 @@ var _ = Describe("delivery", Ordered, func() { BeforeAll(func() { endpoint.Request.Timeout = 1 + endpoint.Retry.Config.Attempts = []int64{0, 1, 1} entitiesConfig := helper.EntitiesConfig{ Endpoints: []*entities.Endpoint{&endpoint}, Sources: []*entities.Source{factory.SourceP()}, @@ -149,7 +150,7 @@ var _ = Describe("delivery", Ordered, func() { assert.Equal(GinkgoT(), 200, resp.StatusCode()) eventId := resp.Header().Get(constants.HeaderEventId) - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 8) q := query.AttemptQuery{} q.EventId = &eventId