Skip to content

Commit cf9514f

Browse files
cchenggitvm-001
andauthored
feat(plugins): add inbound plugin jsonschema-validator (#239)
Signed-off-by: Yusheng Li <leeys.top@gmail.com> Co-authored-by: Yusheng Li <leeys.top@gmail.com>
1 parent c8d1618 commit cf9514f

File tree

16 files changed

+632
-2
lines changed

16 files changed

+632
-2
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ WebhookX is an open-source webhooks gateway for message receiving, processing, a
3333
- `webhookx-signature`: Sign outbound requests with HMAC(SHA-256) by adding `Webhookx-Signature` and `Webhookx-Timestamp` headers.
3434
- `wasm`: Transform outbound requests using high-level languages such as AssemblyScript, Rust or TinyGo. See [plugin/wasm](plugins/wasm).
3535
- `function`: Customize inbound behavior with JavaScript, e.g. signature verification or request body transformation.
36+
- `jsonschema-validator`: Validate event payloads against JSONSchema definitions. Up to Draft v6 is supported.
3637
- **Observability:** OpenTelemetry metrics and tracing for monitoring and troubleshooting.
3738

3839

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package jsonschema
2+
3+
import (
4+
"github.com/getkin/kin-openapi/openapi3"
5+
lru "github.com/hashicorp/golang-lru/v2"
6+
"github.com/webhookx-io/webhookx/pkg/openapi"
7+
"github.com/webhookx-io/webhookx/utils"
8+
)
9+
10+
type JSONSchema struct {
11+
schemaDef string
12+
hex string
13+
}
14+
15+
func New(schemaDef []byte) *JSONSchema {
16+
return &JSONSchema{
17+
schemaDef: string(schemaDef),
18+
hex: utils.Sha256(string(schemaDef)),
19+
}
20+
}
21+
22+
var cache, _ = lru.New[string, *openapi3.Schema](128)
23+
24+
func (s *JSONSchema) Validate(ctx *ValidatorContext) error {
25+
schema, ok := cache.Get(s.hex)
26+
if !ok {
27+
schema = &openapi3.Schema{}
28+
err := schema.UnmarshalJSON([]byte(s.schemaDef))
29+
if err != nil {
30+
return err
31+
}
32+
cache.Add(s.hex, schema)
33+
}
34+
35+
err := openapi.Validate(schema, ctx.HTTPRequest.Data)
36+
if err != nil {
37+
return err
38+
}
39+
return nil
40+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package jsonschema
2+
3+
import (
4+
"encoding/json"
5+
. "github.com/onsi/ginkgo/v2"
6+
. "github.com/onsi/gomega"
7+
"testing"
8+
)
9+
10+
func TestJSONSchema(t *testing.T) {
11+
RegisterFailHandler(Fail)
12+
RunSpecs(t, "Schema Validator Suite")
13+
}
14+
15+
var _ = Describe("Schema Validator Plugin", func() {
16+
17+
Context("JSONSchema Validator", func() {
18+
It("should validate valid JSON data against the schema", func() {
19+
schemaDef := `{
20+
"type": "object",
21+
"properties": {
22+
"name": { "type": "string" },
23+
"age": { "type": "integer", "minimum": 0 }
24+
},
25+
"required": ["name", "age"]
26+
}`
27+
28+
validator := New([]byte(schemaDef))
29+
30+
validData := map[string]interface{}{"name": "John Doe", "age": 30}
31+
ctx := &ValidatorContext{
32+
HTTPRequest: &HTTPRequest{
33+
Data: validData,
34+
},
35+
}
36+
37+
err := validator.Validate(ctx)
38+
Expect(err).To(BeNil())
39+
})
40+
41+
It("should return an error for invalid JSON data against the schema", func() {
42+
schemaDef := `{
43+
"type": "object",
44+
"properties": {
45+
"name": { "type": "string" },
46+
"age": { "type": "integer", "minimum": 0 }
47+
},
48+
"required": ["name", "age"]
49+
}`
50+
51+
validator := New([]byte(schemaDef))
52+
53+
invalidData := map[string]interface{}{"name": "John Doe", "age": -5}
54+
ctx := &ValidatorContext{
55+
HTTPRequest: &HTTPRequest{
56+
Data: invalidData,
57+
},
58+
}
59+
60+
err := validator.Validate(ctx)
61+
Expect(err).ToNot(BeNil())
62+
b, _ := json.Marshal(err)
63+
Expect(string(b)).To(Equal(`{"message":"request validation","fields":{"age":"number must be at least 0"}}`))
64+
})
65+
})
66+
})
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package jsonschema
2+
3+
import (
4+
"net/http"
5+
)
6+
7+
type Validator interface {
8+
Validate(ctx *ValidatorContext) error
9+
}
10+
11+
type ValidatorContext struct {
12+
HTTPRequest *HTTPRequest
13+
}
14+
15+
type HTTPRequest struct {
16+
R *http.Request
17+
Data map[string]any
18+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package jsonschema_validator
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"github.com/getkin/kin-openapi/openapi3"
9+
"github.com/webhookx-io/webhookx/pkg/errs"
10+
"github.com/webhookx-io/webhookx/pkg/http/response"
11+
"github.com/webhookx-io/webhookx/pkg/plugin"
12+
"github.com/webhookx-io/webhookx/pkg/types"
13+
"github.com/webhookx-io/webhookx/plugins/jsonschema_validator/jsonschema"
14+
"github.com/webhookx-io/webhookx/utils"
15+
)
16+
17+
type Config struct {
18+
Draft string `json:"draft" validate:"required,oneof=6 default:6"`
19+
DefaultSchema string `json:"default_schema" validate:"omitempty,json,max=1048576"`
20+
Schemas map[string]*Schema `json:"schemas" validate:"dive"`
21+
}
22+
23+
type Schema struct {
24+
Schema string `json:"schema" validate:"omitempty,json,max=1048576"`
25+
}
26+
27+
type SchemaValidatorPlugin struct {
28+
plugin.BasePlugin[Config]
29+
}
30+
31+
func New(config []byte) (plugin.Plugin, error) {
32+
p := &SchemaValidatorPlugin{}
33+
p.Name = "jsonschema-validator"
34+
35+
if config != nil {
36+
if err := p.UnmarshalConfig(config); err != nil {
37+
return nil, err
38+
}
39+
}
40+
return p, nil
41+
}
42+
43+
func unmarshalAndValidateSchema(schema string) (*openapi3.Schema, error) {
44+
openapiSchema := &openapi3.Schema{}
45+
err := openapiSchema.UnmarshalJSON([]byte(schema))
46+
if err != nil {
47+
return nil, fmt.Errorf("value must be a valid jsonschema")
48+
}
49+
err = openapiSchema.Validate(context.Background(), openapi3.EnableSchemaFormatValidation())
50+
if err != nil {
51+
return openapiSchema, err
52+
}
53+
return openapiSchema, nil
54+
}
55+
56+
func (p *SchemaValidatorPlugin) ValidateConfig() error {
57+
err := utils.Validate(p.Config)
58+
if err != nil {
59+
return err
60+
}
61+
62+
e := errs.NewValidateError(errors.New("request validation"))
63+
64+
var defaultErr error
65+
if p.Config.DefaultSchema != "" {
66+
_, err := unmarshalAndValidateSchema(p.Config.DefaultSchema)
67+
if err != nil {
68+
defaultErr = err
69+
e.Fields = map[string]interface{}{
70+
"default_schema": err.Error(),
71+
}
72+
}
73+
}
74+
75+
for event, schema := range p.Config.Schemas {
76+
field := fmt.Sprintf("schemas[%s]", event)
77+
if schema == nil || schema.Schema == "" {
78+
if defaultErr != nil {
79+
e.Fields[field] = map[string]string{
80+
"schema": "invalid due to reusing the default_schema definition",
81+
}
82+
}
83+
} else {
84+
_, err = unmarshalAndValidateSchema(schema.Schema)
85+
if err != nil {
86+
e.Fields[field] = map[string]string{
87+
"schema": err.Error(),
88+
}
89+
}
90+
}
91+
}
92+
if len(e.Fields) > 0 {
93+
return e
94+
}
95+
return nil
96+
}
97+
98+
func (p *SchemaValidatorPlugin) ExecuteInbound(inbound *plugin.Inbound) (res plugin.InboundResult, err error) {
99+
var event map[string]any
100+
body := inbound.RawBody
101+
if err = json.Unmarshal(body, &event); err != nil {
102+
return
103+
}
104+
105+
eventType, ok := event["event_type"].(string)
106+
if !ok || eventType == "" {
107+
res.Payload = body
108+
return
109+
}
110+
111+
data := event["data"]
112+
if data == nil {
113+
res.Payload = body
114+
return
115+
}
116+
117+
schema, ok := p.Config.Schemas[eventType]
118+
if !ok {
119+
res.Payload = body
120+
return
121+
}
122+
if schema == nil || schema.Schema == "" {
123+
if p.Config.DefaultSchema == "" {
124+
res.Payload = body
125+
return
126+
}
127+
schema = &Schema{
128+
Schema: p.Config.DefaultSchema,
129+
}
130+
}
131+
132+
validator := jsonschema.New([]byte(schema.Schema))
133+
e := validator.Validate(&jsonschema.ValidatorContext{
134+
HTTPRequest: &jsonschema.HTTPRequest{
135+
R: inbound.Request,
136+
Data: data.(map[string]any),
137+
},
138+
})
139+
if e != nil {
140+
response.JSON(inbound.Response, 400, types.ErrorResponse{
141+
Message: "Request Validation",
142+
Error: e,
143+
})
144+
res.Terminated = true
145+
return
146+
}
147+
res.Payload = body
148+
return
149+
}

plugins/plugins.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package plugins
33
import (
44
"github.com/webhookx-io/webhookx/pkg/plugin"
55
"github.com/webhookx-io/webhookx/plugins/function"
6+
"github.com/webhookx-io/webhookx/plugins/jsonschema_validator"
67
"github.com/webhookx-io/webhookx/plugins/wasm"
78
"github.com/webhookx-io/webhookx/plugins/webhookx_signature"
89
)
910

1011
func LoadPlugins() {
1112
plugin.RegisterPlugin(plugin.TypeInbound, "function", function.New)
13+
plugin.RegisterPlugin(plugin.TypeInbound, "jsonschema-validator", jsonschema_validator.New)
1214
plugin.RegisterPlugin(plugin.TypeOutbound, "wasm", wasm.New)
1315
plugin.RegisterPlugin(plugin.TypeOutbound, "webhookx-signature", webhookx_signature.New)
1416
}

proxy/gateway.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ func (gw *Gateway) handle(w http.ResponseWriter, r *http.Request) bool {
260260
response.JSON(w, 500, types.ErrorResponse{Message: "internal error"})
261261
return false
262262
}
263+
263264
if result.Terminated {
264265
return false
265266
}

test/cmd/admin_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,13 @@ var _ = Describe("admin", Ordered, func() {
101101

102102
plugins, err = db.Plugins.ListSourcePlugin(context.TODO(), source.ID)
103103
assert.NoError(GinkgoT(), err)
104-
assert.Equal(GinkgoT(), 1, len(plugins))
104+
assert.Equal(GinkgoT(), 2, len(plugins))
105105
assert.Equal(GinkgoT(), "function", plugins[0].Name)
106106
assert.Equal(GinkgoT(), true, plugins[0].Enabled)
107107
assert.Equal(GinkgoT(), `{"function": "function handle() {}"}`, string(plugins[0].Config))
108+
assert.Equal(GinkgoT(), `jsonschema-validator`, plugins[1].Name)
109+
assert.Equal(GinkgoT(), true, plugins[1].Enabled)
110+
assert.Equal(GinkgoT(), `{"draft": "6", "schemas": {"charge.succeeded": {"schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}, "default_schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" }\n },\n \"required\": [\"id\"]\n}\n"}`, string(plugins[1].Config))
108111
})
109112

110113
It("entities not defined in the declarative configuration should be deleted", func() {

0 commit comments

Comments
 (0)