Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a03a5bd
feat: add failing test stubs for events package
archandatta Mar 20, 2026
42f415f
feat: add BrowserEvent struct, CategoryFor, and truncateIfNeeded
archandatta Mar 20, 2026
fa67dff
feat: add RingBuffer with closed-channel broadcast fan-out
archandatta Mar 20, 2026
115c720
feat: add FileWriter per-category JSONL appender
archandatta Mar 20, 2026
f07e40d
feat: add Pipeline glue type sequencing truncation, file write, and r…
archandatta Mar 20, 2026
18fdb6d
review: fix truncateIfNeeded branch split, atomic.Pointer[string], Re…
archandatta Mar 27, 2026
997edb4
review: remove dead RingBuffer count field, fix FileWriter mutex doc,…
archandatta Mar 27, 2026
e5153da
chore: clean up maxS2RecordBytes comment
archandatta Mar 27, 2026
1644fe7
fix: serialise Pipeline.Publish to guarantee monotonic seq delivery o…
archandatta Mar 27, 2026
36cff2d
review
archandatta Mar 30, 2026
339d7d3
refactor: rename BrowserEvent to Event, DetailDefault to DetailStandard
archandatta Mar 31, 2026
b370416
refactor: restructure Source as nested object with Kind, Event, Metadata
archandatta Mar 31, 2026
41a7aee
refactor: extract Envelope wrapper, move seq and capture_session_id o…
archandatta Mar 31, 2026
9f4c808
refactor: unify seq as universal cursor, add NewReader(afterSeq)
archandatta Mar 31, 2026
6c82459
refactor: return ReadResult instead of synthetic drop events
archandatta Mar 31, 2026
6506ed7
test: add NewReader resume tests for mid-stream, at-latest, and evict…
archandatta Mar 31, 2026
8ecd492
review: fmt
archandatta Apr 1, 2026
e572e7b
fix: guard against nil marshal data and oversized non-data envelopes
archandatta Apr 1, 2026
2719a32
feat: add CategoryFor helper to derive event category from type string
archandatta Apr 1, 2026
46884b8
feat: add cdpmonitor stub with start/stop lifecycle
archandatta Apr 1, 2026
1e544a7
feat: wire events pipeline and CDP monitor into API service
archandatta Apr 1, 2026
ebf9374
feat: add CDP protocol message types and internal state structs
archandatta Apr 1, 2026
16edb0d
feat: implement CDP monitor with websocket capture, event handlers, a…
archandatta Apr 1, 2026
8fe30d0
test: add CDP monitor test suite with in-process websocket mock
archandatta Apr 1, 2026
36b1484
review: create util.go for helper funcs
archandatta Apr 2, 2026
306bc0e
review
archandatta Apr 2, 2026
a775160
review: update test
archandatta Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion server/cmd/api/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"sync"
"time"

"github.com/onkernel/kernel-images/server/lib/cdpmonitor"
"github.com/onkernel/kernel-images/server/lib/devtoolsproxy"
"github.com/onkernel/kernel-images/server/lib/events"
"github.com/onkernel/kernel-images/server/lib/logger"
"github.com/onkernel/kernel-images/server/lib/nekoclient"
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
Expand Down Expand Up @@ -68,11 +70,24 @@ type ApiService struct {
// xvfbResizeMu serializes background Xvfb restarts to prevent races
// when multiple CDP fast-path resizes fire in quick succession.
xvfbResizeMu sync.Mutex

// CDP event pipeline and cdpMonitor.
eventsPipeline *events.Pipeline
cdpMonitor *cdpmonitor.Monitor
monitorMu sync.Mutex
}

var _ oapi.StrictServerInterface = (*ApiService)(nil)

func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFactory, upstreamMgr *devtoolsproxy.UpstreamManager, stz scaletozero.Controller, nekoAuthClient *nekoclient.AuthClient) (*ApiService, error) {
func New(
recordManager recorder.RecordManager,
factory recorder.FFmpegRecorderFactory,
upstreamMgr *devtoolsproxy.UpstreamManager,
stz scaletozero.Controller,
nekoAuthClient *nekoclient.AuthClient,
eventsPipeline *events.Pipeline,
displayNum int,
) (*ApiService, error) {
switch {
case recordManager == nil:
return nil, fmt.Errorf("recordManager cannot be nil")
Expand All @@ -82,8 +97,12 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa
return nil, fmt.Errorf("upstreamMgr cannot be nil")
case nekoAuthClient == nil:
return nil, fmt.Errorf("nekoAuthClient cannot be nil")
case eventsPipeline == nil:
return nil, fmt.Errorf("eventsPipeline cannot be nil")
}

mon := cdpmonitor.New(upstreamMgr, eventsPipeline.Publish, displayNum)

return &ApiService{
recordManager: recordManager,
factory: factory,
Expand All @@ -94,6 +113,8 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa
stz: stz,
nekoAuthClient: nekoAuthClient,
policy: &policy.Policy{},
eventsPipeline: eventsPipeline,
cdpMonitor: mon,
}, nil
}

Expand Down Expand Up @@ -313,5 +334,9 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ
}

func (s *ApiService) Shutdown(ctx context.Context) error {
s.monitorMu.Lock()
s.cdpMonitor.Stop()
_ = s.eventsPipeline.Close()
s.monitorMu.Unlock()
return s.recordManager.StopAll(ctx)
}
29 changes: 18 additions & 11 deletions server/cmd/api/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"log/slog"

"github.com/onkernel/kernel-images/server/lib/devtoolsproxy"
"github.com/onkernel/kernel-images/server/lib/events"
"github.com/onkernel/kernel-images/server/lib/nekoclient"
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
"github.com/onkernel/kernel-images/server/lib/recorder"
Expand All @@ -25,7 +26,7 @@ func TestApiService_StartRecording(t *testing.T) {

t.Run("success", func(t *testing.T) {
mgr := recorder.NewFFmpegManager()
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)

resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{})
Expand All @@ -39,7 +40,7 @@ func TestApiService_StartRecording(t *testing.T) {

t.Run("already recording", func(t *testing.T) {
mgr := recorder.NewFFmpegManager()
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)

// First start should succeed
Expand All @@ -54,7 +55,7 @@ func TestApiService_StartRecording(t *testing.T) {

t.Run("custom ids don't collide", func(t *testing.T) {
mgr := recorder.NewFFmpegManager()
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)

for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestApiService_StopRecording(t *testing.T) {

t.Run("no active recording", func(t *testing.T) {
mgr := recorder.NewFFmpegManager()
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)

resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{})
Expand All @@ -100,7 +101,7 @@ func TestApiService_StopRecording(t *testing.T) {
rec := &mockRecorder{id: "default", isRecordingFlag: true}
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")

svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)
resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{})
require.NoError(t, err)
Expand All @@ -115,7 +116,7 @@ func TestApiService_StopRecording(t *testing.T) {

force := true
req := oapi.StopRecordingRequestObject{Body: &oapi.StopRecordingJSONRequestBody{ForceStop: &force}}
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)
resp, err := svc.StopRecording(ctx, req)
require.NoError(t, err)
Expand All @@ -129,7 +130,7 @@ func TestApiService_DownloadRecording(t *testing.T) {

t.Run("not found", func(t *testing.T) {
mgr := recorder.NewFFmpegManager()
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
require.NoError(t, err)
Expand All @@ -149,7 +150,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
rec := &mockRecorder{id: "default", isRecordingFlag: true, recordingData: randomBytes(minRecordingSizeInBytes - 1)}
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")

svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)
// will return a 202 when the recording is too small
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
rec := &mockRecorder{id: "default", recordingData: data}
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")

svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
require.NoError(t, err)
Expand All @@ -199,7 +200,7 @@ func TestApiService_Shutdown(t *testing.T) {
rec := &mockRecorder{id: "default", isRecordingFlag: true}
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")

svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)

require.NoError(t, svc.Shutdown(ctx))
Expand Down Expand Up @@ -303,10 +304,16 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient {
return client
}

func newEventsPipeline() *events.Pipeline {
ring := events.NewRingBuffer(64)
fw := events.NewFileWriter(os.TempDir())
return events.NewPipeline(ring, fw)
}

func TestApiService_PatchChromiumFlags(t *testing.T) {
ctx := context.Background()
mgr := recorder.NewFFmpegManager()
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)

// Test with valid flags
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/display_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact

func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService {
t.Helper()
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
require.NoError(t, err)
return svc
}
Expand Down
34 changes: 34 additions & 0 deletions server/cmd/api/api/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package api

import (
"net/http"

"github.com/google/uuid"
"github.com/onkernel/kernel-images/server/lib/logger"
)

// StartCapture handles POST /events/start.
// Generates a new capture session ID, seeds the pipeline, then starts the
// CDP monitor. If already running, the monitor is stopped and
// restarted with a fresh session ID
func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) {
s.monitorMu.Lock()
defer s.monitorMu.Unlock()

s.eventsPipeline.Start(uuid.New().String())

if err := s.cdpMonitor.Start(r.Context()); err != nil {
logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err)
http.Error(w, "failed to start capture", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}

// StopCapture handles POST /events/stop
func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) {
s.monitorMu.Lock()
defer s.monitorMu.Unlock()
s.cdpMonitor.Stop()
w.WriteHeader(http.StatusOK)
}
12 changes: 12 additions & 0 deletions server/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/onkernel/kernel-images/server/cmd/config"
"github.com/onkernel/kernel-images/server/lib/chromedriverproxy"
"github.com/onkernel/kernel-images/server/lib/devtoolsproxy"
"github.com/onkernel/kernel-images/server/lib/events"
"github.com/onkernel/kernel-images/server/lib/logger"
"github.com/onkernel/kernel-images/server/lib/nekoclient"
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
Expand Down Expand Up @@ -90,12 +91,19 @@ func main() {
os.Exit(1)
}

// Construct events pipeline
eventsRing := events.NewRingBuffer(1024)
eventsFileWriter := events.NewFileWriter("/var/log")
eventsPipeline := events.NewPipeline(eventsRing, eventsFileWriter)

apiService, err := api.New(
recorder.NewFFmpegManager(),
recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz),
upstreamMgr,
stz,
nekoAuthClient,
eventsPipeline,
config.DisplayNum,
)
if err != nil {
slogger.Error("failed to create api service", "err", err)
Expand All @@ -120,6 +128,10 @@ func main() {
w.Header().Set("Content-Type", "application/json")
w.Write(jsonData)
})
// capture events
r.Post("/events/start", apiService.StartCapture)
r.Post("/events/stop", apiService.StopCapture)

// PTY attach endpoint (WebSocket) - not part of OpenAPI spec
// Uses WebSocket for bidirectional streaming, which works well through proxies.
r.Get("/process/{process_id}/attach", func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading