Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/webhookx-io/webhookx/admin"
"github.com/webhookx-io/webhookx/admin/api"
"github.com/webhookx-io/webhookx/config"
"github.com/webhookx-io/webhookx/constants"
"github.com/webhookx-io/webhookx/db"
"github.com/webhookx-io/webhookx/db/entities"
"github.com/webhookx-io/webhookx/db/migrator"
Expand Down Expand Up @@ -321,13 +322,14 @@ func registerEventHandler(bus *eventbus.EventBus) {
}
bus.Broadcast(eventbus.EventCRUD, eventData)
})
bus.Subscribe(eventbus.EventCRUD, func(data interface{}) {
eventData := data.(*eventbus.CrudData)
err := mcache.Invalidate(context.TODO(), eventData.CacheKey)
bus.Subscribe(eventbus.EventCRUD, func(d interface{}) {
data := d.(*eventbus.CrudData)
cacheKey := constants.CacheKeyFrom(data.CacheName)
err := mcache.Invalidate(context.TODO(), cacheKey.Build(data.ID))
if err != nil {
zap.S().Errorf("failed to invalidate cache: key=%s %v", eventData.CacheKey, err)
zap.S().Errorf("failed to invalidate cache: key=%s %v", cacheKey.Build(data.ID), err)
}
bus.Broadcast(fmt.Sprintf("%s.crud", eventData.Entity), eventData)
bus.Broadcast(fmt.Sprintf("%s.crud", data.Entity), data)
})
}

Expand Down
45 changes: 45 additions & 0 deletions constants/cache_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package constants

import "strings"

// CacheKey cache key definition.
// format "webhookx:<name>:<version>:<id>"
type CacheKey struct {
Name string
Version string
}

func (c CacheKey) Build(id string) string {
var sb strings.Builder
sb.WriteString("webhookx:")
sb.WriteString(c.Name)
sb.WriteString(":")
sb.WriteString(c.Version)
sb.WriteString(":")
sb.WriteString(id)
return sb.String()
}

var (
EventCacheKey = register(CacheKey{"events", "v1"})
EndpointCacheKey = register(CacheKey{"endpoints", "v1"})
EndpointPluginsKey = register(CacheKey{"endpoint_plugins", "v1"})
SourcePluginsKey = register(CacheKey{"source_plugins", "v1"})
SourceCacheKey = register(CacheKey{"sources", "v1"})
WorkspaceCacheKey = register(CacheKey{"workspaces", "v1"})
AttemptCacheKey = register(CacheKey{"attempts", "v1"})
PluginCacheKey = register(CacheKey{"plugins", "v1"})
AttemptDetailCacheKey = register(CacheKey{"attempt_details", "v1"})
WorkspaceEndpointsKey = register(CacheKey{"workspaces_endpoints", "v1"})
)

var registry = map[string]CacheKey{}

func register(ck CacheKey) CacheKey {
registry[ck.Name] = ck
return ck
}

func CacheKeyFrom(name string) CacheKey {
return registry[name]
}
27 changes: 0 additions & 27 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package constants

import (
"github.com/webhookx-io/webhookx/config"
"strings"
"time"
)

Expand All @@ -27,32 +26,6 @@ const (
RequeueInterval = time.Second * 60
)

type CacheKey string

func (c CacheKey) Build(id string) string {
var sb strings.Builder
sb.WriteString(Namespace)
sb.WriteString(":")
sb.WriteString(string(c))
sb.WriteString(":")
sb.WriteString(id)
return sb.String()
}

const (
Namespace string = "webhookx"
EventCacheKey CacheKey = "events"
EndpointCacheKey CacheKey = "endpoints"
EndpointPluginsKey CacheKey = "endpoint_plugins"
SourcePluginsKey CacheKey = "source_plugins"
SourceCacheKey CacheKey = "sources"
WorkspaceCacheKey CacheKey = "workspaces"
AttemptCacheKey CacheKey = "attempts"
PluginCacheKey CacheKey = "plugins"
AttemptDetailCacheKey CacheKey = "attempt_details"
WorkspaceEndpointsKey CacheKey = "workspaces_endpoints"
)

type Header struct {
Name string
Value string
Expand Down
2 changes: 1 addition & 1 deletion db/dao/attempt_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewAttemptDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) AttemptD
EntityName: "attempt",
Workspace: workspace,
CachePropagate: false,
CacheKey: constants.AttemptCacheKey,
CacheName: constants.AttemptCacheKey.Name,
}
return &attemptDao{
DAO: NewDAO[entities.Attempt](db, bus, opts),
Expand Down
2 changes: 1 addition & 1 deletion db/dao/attempt_detail_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewAttemptDetailDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) At
EntityName: "attempt_detail",
Workspace: workspace,
CachePropagate: false,
CacheKey: constants.AttemptDetailCacheKey,
CacheName: constants.AttemptDetailCacheKey.Name,
}
return &attemptDetailDao{
DAO: NewDAO[entities.AttemptDetail](db, bus, opts),
Expand Down
11 changes: 5 additions & 6 deletions db/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/webhookx-io/webhookx/constants"
"github.com/webhookx-io/webhookx/db/errs"
"github.com/webhookx-io/webhookx/db/query"
"github.com/webhookx-io/webhookx/db/transaction"
Expand Down Expand Up @@ -51,7 +50,7 @@ type Options struct {
EntityName string
Workspace bool
CachePropagate bool
CacheKey constants.CacheKey
CacheName string
}

func NewDAO[T any](db *sqlx.DB, bus *eventbus.EventBus, opts Options) *DAO[T] {
Expand Down Expand Up @@ -381,10 +380,10 @@ func (dao *DAO[T]) Upsert(ctx context.Context, fields []string, entity *T) error

func (dao *DAO[T]) propagateEvent(id string, entity *T) {
data := &eventbus.CrudData{
ID: id,
CacheKey: dao.opts.CacheKey.Build(id),
Entity: dao.opts.EntityName,
Data: utils.Must(json.Marshal(entity)),
ID: id,
CacheName: dao.opts.CacheName,
Entity: dao.opts.EntityName,
Data: utils.Must(json.Marshal(entity)),
}
wid := reflect.ValueOf(*entity).FieldByName("WorkspaceId")
if wid.IsValid() {
Expand Down
2 changes: 1 addition & 1 deletion db/dao/endpoint_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewEndpointDAO(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) Endpoin
EntityName: "endpoint",
Workspace: workspace,
CachePropagate: true,
CacheKey: constants.EndpointCacheKey,
CacheName: constants.EndpointCacheKey.Name,
}
return &endpointDAO{
DAO: NewDAO[entities.Endpoint](db, bus, opts),
Expand Down
2 changes: 1 addition & 1 deletion db/dao/event_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewEventDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) EventDAO {
EntityName: "event",
Workspace: workspace,
CachePropagate: false,
CacheKey: constants.EventCacheKey,
CacheName: constants.EventCacheKey.Name,
}
return &eventDao{
DAO: NewDAO[entities.Event](db, bus, opts),
Expand Down
2 changes: 1 addition & 1 deletion db/dao/plugin_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewPluginDAO(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) PluginDAO
EntityName: "plugin",
Workspace: workspace,
CachePropagate: true,
CacheKey: constants.PluginCacheKey,
CacheName: constants.PluginCacheKey.Name,
}
return &pluginDAO{
DAO: NewDAO[entities.Plugin](db, bus, opts),
Expand Down
2 changes: 1 addition & 1 deletion db/dao/source_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewSourceDAO(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) SourceDAO
EntityName: "source",
Workspace: workspace,
CachePropagate: true,
CacheKey: constants.SourceCacheKey,
CacheName: constants.SourceCacheKey.Name,
}
return &sourceDAO{
DAO: NewDAO[entities.Source](db, bus, opts),
Expand Down
2 changes: 1 addition & 1 deletion db/dao/workspace_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewWorkspaceDAO(db *sqlx.DB, bus *eventbus.EventBus) WorkspaceDAO {
EntityName: "workspace",
Workspace: false,
CachePropagate: true,
CacheKey: constants.WorkspaceCacheKey,
CacheName: constants.WorkspaceCacheKey.Name,
}
return &workspaceDAO{
DAO: NewDAO[entities.Workspace](db, bus, opts),
Expand Down
10 changes: 5 additions & 5 deletions eventbus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type Message struct {
type Callback func(data interface{})

type CrudData struct {
Entity string `json:"entity"`
ID string `json:"id"`
WID string `json:"wid"`
CacheKey string `json:"cache_key"`
Data json.RawMessage `json:"data"`
Entity string `json:"entity"`
ID string `json:"id"`
WID string `json:"wid"`
CacheName string `json:"cache_name"`
Data json.RawMessage `json:"data"`
}

type EventFanoutData struct {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/spf13/cobra v1.10.1
github.com/stretchr/testify v1.11.1
github.com/tetratelabs/wazero v1.10.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.opentelemetry.io/contrib/propagators/autoprop v0.63.0
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0
Expand Down Expand Up @@ -67,6 +68,7 @@ require (
github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect
github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/woodsbury/decimal128 v1.3.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.38.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/woodsbury/decimal128 v1.3.0 h1:8pffMNWIlC0O5vbyHWFZAt5yWvWcrHA+3ovIIjVWss0=
github.com/woodsbury/decimal128 v1.3.0/go.mod h1:C5UTmyTjW3JftjUFzOVhC20BEQa2a4ZKOB5I6Zjb+ds=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type RedisCache struct {
func NewRedisCache(client *redis.Client) *RedisCache {
return &RedisCache{
c: client,
s: serializer.Gob,
s: serializer.MsgPack,
}
}

Expand Down
23 changes: 0 additions & 23 deletions pkg/serializer/gob.go

This file was deleted.

27 changes: 27 additions & 0 deletions pkg/serializer/msgpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package serializer

import (
"bytes"
"github.com/vmihailenco/msgpack/v5"
)

var MsgPack MsgPackSerializer

type MsgPackSerializer struct{}

func (s MsgPackSerializer) Serialize(val interface{}) ([]byte, error) {
var buffer bytes.Buffer
encoder := msgpack.NewEncoder(&buffer)
encoder.SetCustomStructTag("json")
err := encoder.Encode(val)
if err != nil {
return nil, err
}
return buffer.Bytes(), nil
}

func (s MsgPackSerializer) Deserialize(b []byte, val interface{}) error {
decoder := msgpack.NewDecoder(bytes.NewReader(b))
decoder.SetCustomStructTag("json")
return decoder.Decode(val)
}
3 changes: 2 additions & 1 deletion test/delivery/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ var _ = Describe("delivery", Ordered, func() {

BeforeAll(func() {
endpoint.Request.Timeout = 1
endpoint.Retry.Config.Attempts = []int64{0, 1, 1}
entitiesConfig := helper.EntitiesConfig{
Endpoints: []*entities.Endpoint{&endpoint},
Sources: []*entities.Source{factory.SourceP()},
Expand All @@ -149,7 +150,7 @@ var _ = Describe("delivery", Ordered, func() {
assert.Equal(GinkgoT(), 200, resp.StatusCode())
eventId := resp.Header().Get(constants.HeaderEventId)

time.Sleep(time.Second * 10)
time.Sleep(time.Second * 8)

q := query.AttemptQuery{}
q.EventId = &eventId
Expand Down
Loading