Skip to content

Commit 266017a

Browse files
committed
Make prepare data plugins execution have a total timeout
1 parent 471c37e commit 266017a

File tree

3 files changed

+218
-37
lines changed

3 files changed

+218
-37
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ import (
4343
)
4444

4545
const (
46-
prepareDataTimeout = 200 * time.Millisecond
47-
prepareDataMaxRetries = 3
46+
// TODO: Make these configurable per plugin via config.
47+
prepareDataTimeout = 400 * time.Millisecond
4848
)
4949

5050
// Datastore defines the interface required by the Director.
@@ -353,46 +353,13 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
353353
}
354354
}
355355

356-
// prepareDataWithRetriesAndTimeout executes the PrepareRequestData plugins with retries and timeout.
357-
func prepareDataWithRetriesAndTimeout(plugin PrepareDataPlugin, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
358-
currentTimeout := prepareDataTimeout
359-
for i := 0; i <= prepareDataMaxRetries; i++ {
360-
errCh := make(chan error, 1)
361-
go func() {
362-
errCh <- plugin.PrepareRequestData(ctx, request, pods)
363-
}()
364-
365-
select {
366-
case <-ctx.Done():
367-
return ctx.Err()
368-
case err := <-errCh:
369-
if err != nil {
370-
log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin failed, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "error", err)
371-
continue
372-
}
373-
return nil // Success
374-
case <-time.After(currentTimeout):
375-
log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin timed out, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "timeout", currentTimeout)
376-
if i == prepareDataMaxRetries {
377-
return fmt.Errorf("PrepareData plugin %s failed after %d retries", plugin.TypedName().String(), prepareDataMaxRetries)
378-
}
379-
}
380-
}
381-
return nil
382-
}
383-
384356
// TODO: Execute plugins in parallel once DAG execution is supported.
385357
// runPrepareDataPlugins executes PrepareDataPlugins sequentially.
386358
func (d *Director) runPrepareDataPlugins(ctx context.Context,
387359
request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
388-
for _, plugin := range d.requestControlPlugins.prepareDataPlugins {
389-
err := prepareDataWithRetriesAndTimeout(plugin, ctx, request, pods)
390-
if err != nil {
391-
return err
392-
}
393-
}
360+
return prepareDataPluginsWithTimeout(
361+
prepareDataTimeout, d.requestControlPlugins.prepareDataPlugins, ctx, request, pods)
394362

395-
return nil
396363
}
397364

398365
func (d *Director) runAdmissionPlugins(ctx context.Context,
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package requestcontrol
18+
19+
import (
20+
"context"
21+
"errors"
22+
"time"
23+
24+
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
25+
)
26+
27+
// prepareDataPluginsWithTimeout executes the PrepareRequestData plugins with retries and timeout.
28+
func prepareDataPluginsWithTimeout(timeout time.Duration, plugins []PrepareDataPlugin,
29+
ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
30+
errCh := make(chan error, 1)
31+
// Execute plugins sequentially in a separate goroutine
32+
go func() {
33+
for _, plugin := range plugins {
34+
err := plugin.PrepareRequestData(ctx, request, pods)
35+
if err != nil {
36+
errCh <- errors.New("prepare data plugin " + plugin.TypedName().String() + " failed: " + err.Error())
37+
return
38+
}
39+
}
40+
errCh <- nil
41+
}()
42+
for {
43+
select {
44+
case <-ctx.Done():
45+
return ctx.Err()
46+
case err := <-errCh:
47+
if err != nil {
48+
return err
49+
}
50+
return nil // Success
51+
case <-time.After(timeout):
52+
return errors.New("prepare data plugin timed out")
53+
}
54+
}
55+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package requestcontrol
18+
19+
import (
20+
"context"
21+
"errors"
22+
"testing"
23+
"time"
24+
25+
"github.com/stretchr/testify/assert"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
27+
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
28+
)
29+
30+
var _ PrepareDataPlugin = &mockPrepareRequestDataPlugin{}
31+
32+
type mockPrepareRequestDataPlugin struct {
33+
name string
34+
delay time.Duration
35+
returnErr error
36+
executed bool
37+
}
38+
39+
func (m *mockPrepareRequestDataPlugin) TypedName() plugins.TypedName {
40+
return plugins.TypedName{Type: "mock", Name: m.name}
41+
}
42+
43+
func (m *mockPrepareRequestDataPlugin) PrepareRequestData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
44+
m.executed = true
45+
if m.delay > 0 {
46+
select {
47+
case <-time.After(m.delay):
48+
case <-ctx.Done():
49+
return ctx.Err()
50+
}
51+
}
52+
return m.returnErr
53+
}
54+
55+
func (m *mockPrepareRequestDataPlugin) Produces() map[string]any {
56+
return nil
57+
}
58+
59+
func (m *mockPrepareRequestDataPlugin) Consumes() map[string]any {
60+
return nil
61+
}
62+
63+
func TestPrepareDataPluginsWithTimeout(t *testing.T) {
64+
testCases := []struct {
65+
name string
66+
timeout time.Duration
67+
plugins []PrepareDataPlugin
68+
ctxFn func() (context.Context, context.CancelFunc)
69+
expectErrStr string
70+
checkPlugins func(t *testing.T, plugins []PrepareDataPlugin)
71+
expectSuccess bool
72+
}{
73+
{
74+
name: "success with one plugin",
75+
timeout: 100 * time.Millisecond,
76+
plugins: []PrepareDataPlugin{
77+
&mockPrepareRequestDataPlugin{name: "p1"},
78+
},
79+
ctxFn: func() (context.Context, context.CancelFunc) {
80+
return context.Background(), func() {}
81+
},
82+
expectSuccess: true,
83+
checkPlugins: func(t *testing.T, plugins []PrepareDataPlugin) {
84+
assert.True(t, plugins[0].(*mockPrepareRequestDataPlugin).executed)
85+
},
86+
},
87+
{
88+
name: "plugin returns error",
89+
timeout: 100 * time.Millisecond,
90+
plugins: []PrepareDataPlugin{
91+
&mockPrepareRequestDataPlugin{name: "p1", returnErr: errors.New("plugin failed")},
92+
},
93+
ctxFn: func() (context.Context, context.CancelFunc) {
94+
return context.Background(), func() {}
95+
},
96+
expectErrStr: "prepare data plugin p1/mock failed: plugin failed",
97+
},
98+
{
99+
name: "plugins time out",
100+
timeout: 50 * time.Millisecond,
101+
plugins: []PrepareDataPlugin{
102+
&mockPrepareRequestDataPlugin{name: "p1", delay: 100 * time.Millisecond},
103+
},
104+
ctxFn: func() (context.Context, context.CancelFunc) {
105+
return context.Background(), func() {}
106+
},
107+
expectErrStr: "prepare data plugin timed out",
108+
},
109+
{
110+
name: "context cancelled",
111+
timeout: 200 * time.Millisecond,
112+
plugins: []PrepareDataPlugin{
113+
&mockPrepareRequestDataPlugin{name: "p1", delay: 100 * time.Millisecond},
114+
},
115+
ctxFn: func() (context.Context, context.CancelFunc) {
116+
ctx, cancel := context.WithCancel(context.Background())
117+
time.AfterFunc(50*time.Millisecond, cancel)
118+
return ctx, cancel
119+
},
120+
expectErrStr: "context canceled",
121+
},
122+
{
123+
name: "multiple plugins success",
124+
timeout: 100 * time.Millisecond,
125+
plugins: []PrepareDataPlugin{
126+
&mockPrepareRequestDataPlugin{name: "p1"},
127+
&mockPrepareRequestDataPlugin{name: "p2"},
128+
},
129+
ctxFn: func() (context.Context, context.CancelFunc) {
130+
return context.Background(), func() {}
131+
},
132+
expectSuccess: true,
133+
checkPlugins: func(t *testing.T, plugins []PrepareDataPlugin) {
134+
assert.True(t, plugins[0].(*mockPrepareRequestDataPlugin).executed)
135+
assert.True(t, plugins[1].(*mockPrepareRequestDataPlugin).executed)
136+
},
137+
},
138+
}
139+
140+
for _, tc := range testCases {
141+
t.Run(tc.name, func(t *testing.T) {
142+
ctx, cancel := tc.ctxFn()
143+
defer cancel()
144+
145+
err := prepareDataPluginsWithTimeout(tc.timeout, tc.plugins, ctx, &schedulingtypes.LLMRequest{}, nil)
146+
147+
if tc.expectSuccess {
148+
assert.NoError(t, err)
149+
} else {
150+
assert.Error(t, err)
151+
assert.Contains(t, err.Error(), tc.expectErrStr)
152+
}
153+
154+
if tc.checkPlugins != nil {
155+
tc.checkPlugins(t, tc.plugins)
156+
}
157+
})
158+
}
159+
}

0 commit comments

Comments
 (0)