Skip to content

Commit 3c82dc1

Browse files
authored
feat(worker): access control list (#253)
1 parent d67c0f6 commit 3c82dc1

File tree

11 files changed

+652
-15
lines changed

11 files changed

+652
-15
lines changed

config.yml

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,28 @@ status:
6969
#------------------------------------------------------------------------------
7070

7171
worker:
72-
enabled: false
72+
enabled: false # Whether to enable the Worker.
7373
deliverer:
74-
timeout: 60000
74+
timeout: 60000 # Sets the request timeout (in milliseconds) for delivery requests.
75+
acl: # Access Control List (ACL) defines rules to control outbound network access.
76+
# A rule is a string of IPv4, IPv6, CIDR, hostname, or pre-configured group.
77+
# A hostname can contain a wildcard prefix (*.) represents its subdomains, and Unicode
78+
# characters need to be converted to Punycode.
79+
# The list of pre-configured groups:
80+
# - @default: [ '@private', '@loopback', '@linklocal', '@reserved' ]
81+
# - @private: [ '10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16' ]
82+
# - @loopback: [ '127.0.0.0/8', '::1/128' ]
83+
# - @linklocal: [ '169.254.0.0/16', 'fe80::/10' ]
84+
# - @reserved: [ '0.0.0.0/8', '100.64.0.0/10', '192.0.0.0/24', '224.0.0.0/4', '240.0.0.0/4', 'fc00::/7' ]
85+
#
86+
deny: # `deny` defines a list of rules that deny access.
87+
- '@default' # Example:
88+
# deny:
89+
# - '@default'
90+
# - '23.215.0.136'
91+
# - '2606:2800:220:1:248:1893:25c8:1946'
92+
# - '*.example.com'
93+
#
7594
pool:
7695
size: 10000 # pool size, default to 10000.
7796
concurrency: 0 # pool concurrency, default to 100 * CPUs

config/config_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,79 @@ func TestRole(t *testing.T) {
358358
assert.Equal(t, errors.New("invalid role: ''"), cfg.Validate())
359359
}
360360

361+
func TestWorkerConfig(t *testing.T) {
362+
tests := []struct {
363+
desc string
364+
cfg WorkerConfig
365+
validateErr error
366+
}{
367+
{
368+
desc: "sanity",
369+
cfg: WorkerConfig{
370+
Enabled: false,
371+
Deliverer: WorkerDeliverer{
372+
Timeout: 0,
373+
ACL: ACLConfig{
374+
Deny: []string{"@default", "0.0.0.0", "0.0.0.0/32", "*.example.com", "foo.example.com", "::1/128"},
375+
},
376+
},
377+
Pool: Pool{},
378+
},
379+
validateErr: nil,
380+
},
381+
{
382+
desc: "invalid deliverer configuration: negative timeout",
383+
cfg: WorkerConfig{
384+
Deliverer: WorkerDeliverer{
385+
Timeout: -1,
386+
ACL: ACLConfig{},
387+
},
388+
},
389+
validateErr: errors.New("deliverer.timeout cannot be negative"),
390+
},
391+
{
392+
desc: "invalid deliverer configuration: invalid acl configuration 1",
393+
cfg: WorkerConfig{
394+
Deliverer: WorkerDeliverer{
395+
Timeout: 0,
396+
ACL: ACLConfig{
397+
Deny: []string{"default"},
398+
},
399+
},
400+
},
401+
validateErr: errors.New("invalid rule 'default': requires IP, CIDR, hostname, or pre-configured name"),
402+
},
403+
{
404+
desc: "invalid deliverer configuration: invalid acl configuration 2",
405+
cfg: WorkerConfig{
406+
Deliverer: WorkerDeliverer{
407+
Timeout: 0,
408+
ACL: ACLConfig{
409+
Deny: []string{"*"},
410+
},
411+
},
412+
},
413+
validateErr: errors.New("invalid rule '*': requires IP, CIDR, hostname, or pre-configured name"),
414+
},
415+
{
416+
desc: "invalid deliverer configuration: unicode hostname",
417+
cfg: WorkerConfig{
418+
Deliverer: WorkerDeliverer{
419+
Timeout: 0,
420+
ACL: ACLConfig{
421+
Deny: []string{"тест.example.com"},
422+
},
423+
},
424+
},
425+
validateErr: errors.New("invalid rule 'тест.example.com': requires IP, CIDR, hostname, or pre-configured name"),
426+
},
427+
}
428+
for _, test := range tests {
429+
actual := test.cfg.Validate()
430+
assert.Equal(t, test.validateErr, actual, "expected %v got %v", test.validateErr, actual)
431+
}
432+
}
433+
361434
func TestConfig(t *testing.T) {
362435
cfg, err := Init()
363436
assert.Nil(t, err)

config/worker.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
11
package config
22

3+
import (
4+
"fmt"
5+
"net/netip"
6+
"regexp"
7+
"slices"
8+
)
9+
310
type WorkerDeliverer struct {
4-
Timeout int64 `yaml:"timeout" json:"timeout" default:"60000"`
11+
Timeout int64 `yaml:"timeout" json:"timeout" default:"60000"`
12+
ACL ACLConfig `yaml:"acl" json:"acl"`
13+
}
14+
15+
func (cfg *WorkerDeliverer) Validate() error {
16+
if cfg.Timeout < 0 {
17+
return fmt.Errorf("deliverer.timeout cannot be negative")
18+
}
19+
if err := cfg.ACL.Validate(); err != nil {
20+
return err
21+
}
22+
return nil
523
}
624

725
type Pool struct {
@@ -15,6 +33,40 @@ type WorkerConfig struct {
1533
Pool Pool `yaml:"pool" json:"pool"`
1634
}
1735

36+
type ACLConfig struct {
37+
Deny []string `yaml:"deny" json:"deny" default:"[\"@default\"]"`
38+
}
39+
40+
func (acl *ACLConfig) Validate() error {
41+
for _, rule := range acl.Deny {
42+
if err := validateRule(rule); err != nil {
43+
return err
44+
}
45+
}
46+
return nil
47+
}
48+
49+
func validateRule(rule string) error {
50+
groups := []string{"@default", "@private", "@loopback", "@linklocal", "@reserved"}
51+
if slices.Contains(groups, rule) {
52+
return nil
53+
}
54+
if _, err := netip.ParseAddr(rule); err == nil {
55+
return nil
56+
}
57+
if _, err := netip.ParsePrefix(rule); err == nil {
58+
return nil
59+
}
60+
r := regexp.MustCompile(`^(\*\.)?[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+$`)
61+
if matched := r.MatchString(rule); matched {
62+
return nil
63+
}
64+
return fmt.Errorf("invalid rule '%s': requires IP, CIDR, hostname, or pre-configured name", rule)
65+
}
66+
1867
func (cfg *WorkerConfig) Validate() error {
68+
if err := cfg.Deliverer.Validate(); err != nil {
69+
return err
70+
}
1971
return nil
2072
}

db/entities/attempt.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const (
4343
AttemptErrorCodeTimeout AttemptErrorCode = "TIMEOUT"
4444
AttemptErrorCodeUnknown AttemptErrorCode = "UNKNOWN"
4545
AttemptErrorCodeEndpointDisabled AttemptErrorCode = "ENDPOINT_DISABLED"
46+
AttemptErrorCodeDenied AttemptErrorCode = "DENIED"
4647
AttemptErrorCodeEndpointNotFound AttemptErrorCode = "ENDPOINT_NOT_FOUND"
4748
)
4849

test/delivery/acl_test.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package delivery
2+
3+
import (
4+
"context"
5+
"github.com/go-resty/resty/v2"
6+
. "github.com/onsi/ginkgo/v2"
7+
"github.com/stretchr/testify/assert"
8+
"github.com/webhookx-io/webhookx/app"
9+
"github.com/webhookx-io/webhookx/constants"
10+
"github.com/webhookx-io/webhookx/db"
11+
"github.com/webhookx-io/webhookx/db/entities"
12+
"github.com/webhookx-io/webhookx/db/query"
13+
"github.com/webhookx-io/webhookx/test/helper"
14+
"github.com/webhookx-io/webhookx/test/helper/factory"
15+
"github.com/webhookx-io/webhookx/utils"
16+
"github.com/webhookx-io/webhookx/worker/deliverer"
17+
"net/netip"
18+
"time"
19+
)
20+
21+
type ResolverFunc func(ctx context.Context, network, host string) ([]netip.Addr, error)
22+
23+
func (fn ResolverFunc) LookupNetIP(ctx context.Context, network, host string) ([]netip.Addr, error) {
24+
return fn(ctx, network, host)
25+
}
26+
27+
var _ = Describe("network acl", Ordered, func() {
28+
Context("acl", func() {
29+
var proxyClient *resty.Client
30+
31+
var app *app.Application
32+
var db *db.DB
33+
34+
entitiesConfig := helper.EntitiesConfig{
35+
Endpoints: []*entities.Endpoint{
36+
factory.EndpointP(func(o *entities.Endpoint) {
37+
o.Events = []string{"test1"}
38+
}),
39+
factory.EndpointP(func(o *entities.Endpoint) {
40+
o.Events = []string{"test2"}
41+
o.Request.URL = "http://www.example.com"
42+
}),
43+
factory.EndpointP(func(o *entities.Endpoint) {
44+
o.Events = []string{"test3"}
45+
o.Request.URL = "http://suspicious.webhookx.io"
46+
}),
47+
factory.EndpointP(func(o *entities.Endpoint) {
48+
o.Events = []string{"unicode-test"}
49+
o.Request.URL = "http://тест.foo.com"
50+
}),
51+
},
52+
Sources: []*entities.Source{factory.SourceP()},
53+
}
54+
55+
var resolver = deliverer.DefaultResolver
56+
57+
BeforeAll(func() {
58+
deliverer.DefaultResolver = ResolverFunc(func(ctx context.Context, network, host string) ([]netip.Addr, error) {
59+
if host == "suspicious.webhookx.io" {
60+
return []netip.Addr{netip.MustParseAddr("127.0.0.1")}, nil
61+
}
62+
return resolver.LookupNetIP(ctx, network, host)
63+
})
64+
65+
db = helper.InitDB(true, &entitiesConfig)
66+
proxyClient = helper.ProxyClient()
67+
68+
app = utils.Must(helper.Start(map[string]string{
69+
"WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081",
70+
"WEBHOOKX_WORKER_ENABLED": "true",
71+
"WEBHOOKX_WORKER_DELIVERER_ACL_DENY": "@default,*.example.com,xn--e1aybc.foo.com",
72+
}))
73+
74+
})
75+
76+
AfterAll(func() {
77+
deliverer.DefaultResolver = resolver
78+
app.Stop()
79+
})
80+
81+
It("request denied", func() {
82+
err := waitForServer("0.0.0.0:8081", time.Second)
83+
assert.NoError(GinkgoT(), err)
84+
85+
resp, err := proxyClient.R().
86+
SetBody(`{"event_type": "test1","data": {"key": "value"}}`).
87+
Post("/")
88+
assert.NoError(GinkgoT(), err)
89+
assert.Equal(GinkgoT(), 200, resp.StatusCode())
90+
eventId := resp.Header().Get(constants.HeaderEventId)
91+
92+
var attempt *entities.Attempt
93+
assert.Eventually(GinkgoT(), func() bool {
94+
q := query.AttemptQuery{}
95+
q.EventId = &eventId
96+
list, err := db.Attempts.List(context.TODO(), &q)
97+
if err != nil || len(list) == 0 {
98+
return false
99+
}
100+
attempt = list[0]
101+
return attempt.Status == entities.AttemptStatusFailure
102+
}, time.Second*5, time.Second)
103+
104+
// attempt.request
105+
assert.Equal(GinkgoT(), entities.AttemptErrorCodeDenied, *attempt.ErrorCode)
106+
assert.Equal(GinkgoT(), true, attempt.Exhausted)
107+
assert.Nil(GinkgoT(), attempt.Response)
108+
109+
detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID)
110+
assert.NoError(GinkgoT(), err)
111+
assert.NotNil(GinkgoT(), detail.RequestHeaders)
112+
assert.NotNil(GinkgoT(), detail.RequestBody)
113+
assert.Nil(GinkgoT(), detail.ResponseHeaders)
114+
assert.Nil(GinkgoT(), detail.ResponseBody)
115+
})
116+
117+
It("request denied by hostname", func() {
118+
err := waitForServer("0.0.0.0:8081", time.Second)
119+
assert.NoError(GinkgoT(), err)
120+
121+
resp, err := proxyClient.R().
122+
SetBody(`{"event_type": "test2","data": {"key": "value"}}`).
123+
Post("/")
124+
assert.NoError(GinkgoT(), err)
125+
assert.Equal(GinkgoT(), 200, resp.StatusCode())
126+
eventId := resp.Header().Get(constants.HeaderEventId)
127+
128+
var attempt *entities.Attempt
129+
assert.Eventually(GinkgoT(), func() bool {
130+
q := query.AttemptQuery{}
131+
q.EventId = &eventId
132+
list, err := db.Attempts.List(context.TODO(), &q)
133+
if err != nil || len(list) == 0 {
134+
return false
135+
}
136+
attempt = list[0]
137+
return attempt.Status == entities.AttemptStatusFailure
138+
}, time.Second*5, time.Second)
139+
140+
// attempt.request
141+
assert.Equal(GinkgoT(), entities.AttemptErrorCodeDenied, *attempt.ErrorCode)
142+
assert.Equal(GinkgoT(), true, attempt.Exhausted)
143+
assert.Nil(GinkgoT(), attempt.Response)
144+
})
145+
146+
It("request denied by unicode hostname", func() {
147+
err := waitForServer("0.0.0.0:8081", time.Second)
148+
assert.NoError(GinkgoT(), err)
149+
150+
resp, err := proxyClient.R().
151+
SetBody(`{"event_type": "unicode-test","data": {"key": "value"}}`).
152+
Post("/")
153+
assert.NoError(GinkgoT(), err)
154+
assert.Equal(GinkgoT(), 200, resp.StatusCode())
155+
eventId := resp.Header().Get(constants.HeaderEventId)
156+
157+
var attempt *entities.Attempt
158+
assert.Eventually(GinkgoT(), func() bool {
159+
q := query.AttemptQuery{}
160+
q.EventId = &eventId
161+
list, err := db.Attempts.List(context.TODO(), &q)
162+
if err != nil || len(list) == 0 {
163+
return false
164+
}
165+
attempt = list[0]
166+
return attempt.Status == entities.AttemptStatusFailure
167+
}, time.Second*5, time.Second)
168+
169+
// attempt.request
170+
assert.Equal(GinkgoT(), entities.AttemptErrorCodeDenied, *attempt.ErrorCode)
171+
assert.Equal(GinkgoT(), true, attempt.Exhausted)
172+
assert.Nil(GinkgoT(), attempt.Response)
173+
})
174+
175+
It("request denied by ip resolved by dns", func() {
176+
err := waitForServer("0.0.0.0:8081", time.Second)
177+
assert.NoError(GinkgoT(), err)
178+
179+
resp, err := proxyClient.R().
180+
SetBody(`{"event_type": "test3","data": {"key": "value"}}`).
181+
Post("/")
182+
assert.NoError(GinkgoT(), err)
183+
assert.Equal(GinkgoT(), 200, resp.StatusCode())
184+
eventId := resp.Header().Get(constants.HeaderEventId)
185+
186+
var attempt *entities.Attempt
187+
assert.Eventually(GinkgoT(), func() bool {
188+
q := query.AttemptQuery{}
189+
q.EventId = &eventId
190+
list, err := db.Attempts.List(context.TODO(), &q)
191+
if err != nil || len(list) == 0 {
192+
return false
193+
}
194+
attempt = list[0]
195+
return attempt.Status == entities.AttemptStatusFailure
196+
}, time.Second*5, time.Second)
197+
198+
// attempt.request
199+
assert.Equal(GinkgoT(), entities.AttemptErrorCodeDenied, *attempt.ErrorCode)
200+
assert.Equal(GinkgoT(), true, attempt.Exhausted)
201+
assert.Nil(GinkgoT(), attempt.Response)
202+
})
203+
})
204+
})

0 commit comments

Comments
 (0)