From 76d837fbefd0721a96a123787f96d26ca64f6752 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Thu, 2 Apr 2026 17:44:01 +0000 Subject: [PATCH 1/3] feat: add CDP pipeline, cdpmonitor, and wire into API service --- server/cmd/api/api/api.go | 27 +++++++++++- server/cmd/api/api/api_test.go | 29 ++++++++----- server/cmd/api/api/display_test.go | 2 +- server/cmd/api/api/events.go | 34 +++++++++++++++ server/cmd/api/main.go | 12 +++++ server/lib/cdpmonitor/monitor.go | 41 +++++++++++++++++ server/lib/events/event.go | 23 +++++++++- server/lib/events/events_test.go | 31 +++++++------ server/lib/events/pipeline.go | 70 ++++++++++++++++++++++++++++++ server/lib/events/ringbuffer.go | 6 ++- 10 files changed, 244 insertions(+), 31 deletions(-) create mode 100644 server/cmd/api/api/events.go create mode 100644 server/lib/cdpmonitor/monitor.go create mode 100644 server/lib/events/pipeline.go diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 122ae4d3..936b1b60 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -9,7 +9,9 @@ import ( "sync" "time" + "github.com/onkernel/kernel-images/server/lib/cdpmonitor" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/logger" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" @@ -68,11 +70,24 @@ type ApiService struct { // xvfbResizeMu serializes background Xvfb restarts to prevent races // when multiple CDP fast-path resizes fire in quick succession. xvfbResizeMu sync.Mutex + + // CDP event pipeline and cdpMonitor. + eventsPipeline *events.Pipeline + cdpMonitor *cdpmonitor.Monitor + monitorMu sync.Mutex } var _ oapi.StrictServerInterface = (*ApiService)(nil) -func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFactory, upstreamMgr *devtoolsproxy.UpstreamManager, stz scaletozero.Controller, nekoAuthClient *nekoclient.AuthClient) (*ApiService, error) { +func New( + recordManager recorder.RecordManager, + factory recorder.FFmpegRecorderFactory, + upstreamMgr *devtoolsproxy.UpstreamManager, + stz scaletozero.Controller, + nekoAuthClient *nekoclient.AuthClient, + eventsPipeline *events.Pipeline, + displayNum int, +) (*ApiService, error) { switch { case recordManager == nil: return nil, fmt.Errorf("recordManager cannot be nil") @@ -82,8 +97,12 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa return nil, fmt.Errorf("upstreamMgr cannot be nil") case nekoAuthClient == nil: return nil, fmt.Errorf("nekoAuthClient cannot be nil") + case eventsPipeline == nil: + return nil, fmt.Errorf("eventsPipeline cannot be nil") } + mon := cdpmonitor.New(upstreamMgr, eventsPipeline.Publish, displayNum) + return &ApiService{ recordManager: recordManager, factory: factory, @@ -94,6 +113,8 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa stz: stz, nekoAuthClient: nekoAuthClient, policy: &policy.Policy{}, + eventsPipeline: eventsPipeline, + cdpMonitor: mon, }, nil } @@ -313,5 +334,9 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ } func (s *ApiService) Shutdown(ctx context.Context) error { + s.monitorMu.Lock() + s.cdpMonitor.Stop() + _ = s.eventsPipeline.Close() + s.monitorMu.Unlock() return s.recordManager.StopAll(ctx) } diff --git a/server/cmd/api/api/api_test.go b/server/cmd/api/api/api_test.go index dc192e30..7c47f08f 100644 --- a/server/cmd/api/api/api_test.go +++ b/server/cmd/api/api/api_test.go @@ -12,6 +12,7 @@ import ( "log/slog" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/recorder" @@ -25,7 +26,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("success", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{}) @@ -39,7 +40,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("already recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) // First start should succeed @@ -54,7 +55,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("custom ids don't collide", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) for i := 0; i < 5; i++ { @@ -87,7 +88,7 @@ func TestApiService_StopRecording(t *testing.T) { t.Run("no active recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) @@ -100,7 +101,7 @@ func TestApiService_StopRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) require.NoError(t, err) @@ -115,7 +116,7 @@ func TestApiService_StopRecording(t *testing.T) { force := true req := oapi.StopRecordingRequestObject{Body: &oapi.StopRecordingJSONRequestBody{ForceStop: &force}} - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, req) require.NoError(t, err) @@ -129,7 +130,7 @@ func TestApiService_DownloadRecording(t *testing.T) { t.Run("not found", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) @@ -149,7 +150,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true, recordingData: randomBytes(minRecordingSizeInBytes - 1)} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) // will return a 202 when the recording is too small resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) @@ -179,7 +180,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", recordingData: data} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) @@ -199,7 +200,7 @@ func TestApiService_Shutdown(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) require.NoError(t, svc.Shutdown(ctx)) @@ -303,10 +304,16 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient { return client } +func newEventsPipeline() *events.Pipeline { + ring := events.NewRingBuffer(64) + fw := events.NewFileWriter(os.TempDir()) + return events.NewPipeline(ring, fw) +} + func TestApiService_PatchChromiumFlags(t *testing.T) { ctx := context.Background() mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) // Test with valid flags diff --git a/server/cmd/api/api/display_test.go b/server/cmd/api/api/display_test.go index acbe74c2..b63addaa 100644 --- a/server/cmd/api/api/display_test.go +++ b/server/cmd/api/api/display_test.go @@ -34,7 +34,7 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService { t.Helper() - svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) return svc } diff --git a/server/cmd/api/api/events.go b/server/cmd/api/api/events.go new file mode 100644 index 00000000..6ab875b4 --- /dev/null +++ b/server/cmd/api/api/events.go @@ -0,0 +1,34 @@ +package api + +import ( + "net/http" + + "github.com/google/uuid" + "github.com/onkernel/kernel-images/server/lib/logger" +) + +// StartCapture handles POST /events/start. +// Generates a new capture session ID, seeds the pipeline, then starts the +// CDP monitor. If already running, the monitor is stopped and +// restarted with a fresh session ID +func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { + s.monitorMu.Lock() + defer s.monitorMu.Unlock() + + s.eventsPipeline.Start(uuid.New().String()) + + if err := s.cdpMonitor.Start(r.Context()); err != nil { + logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err) + http.Error(w, "failed to start capture", http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) +} + +// StopCapture handles POST /events/stop +func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) { + s.monitorMu.Lock() + defer s.monitorMu.Unlock() + s.cdpMonitor.Stop() + w.WriteHeader(http.StatusOK) +} diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index c80ddd27..f277d4ce 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -24,6 +24,7 @@ import ( "github.com/onkernel/kernel-images/server/cmd/config" "github.com/onkernel/kernel-images/server/lib/chromedriverproxy" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/logger" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" @@ -90,12 +91,19 @@ func main() { os.Exit(1) } + // Construct events pipeline + eventsRing := events.NewRingBuffer(1024) + eventsFileWriter := events.NewFileWriter("/var/log") + eventsPipeline := events.NewPipeline(eventsRing, eventsFileWriter) + apiService, err := api.New( recorder.NewFFmpegManager(), recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz), upstreamMgr, stz, nekoAuthClient, + eventsPipeline, + config.DisplayNum, ) if err != nil { slogger.Error("failed to create api service", "err", err) @@ -120,6 +128,10 @@ func main() { w.Header().Set("Content-Type", "application/json") w.Write(jsonData) }) + // capture events + r.Post("/events/start", apiService.StartCapture) + r.Post("/events/stop", apiService.StopCapture) + // PTY attach endpoint (WebSocket) - not part of OpenAPI spec // Uses WebSocket for bidirectional streaming, which works well through proxies. r.Get("/process/{process_id}/attach", func(w http.ResponseWriter, r *http.Request) { diff --git a/server/lib/cdpmonitor/monitor.go b/server/lib/cdpmonitor/monitor.go new file mode 100644 index 00000000..737f9650 --- /dev/null +++ b/server/lib/cdpmonitor/monitor.go @@ -0,0 +1,41 @@ +package cdpmonitor + +import ( + "context" + "sync/atomic" + + "github.com/onkernel/kernel-images/server/lib/events" +) + +// UpstreamProvider abstracts *devtoolsproxy.UpstreamManager for testability. +type UpstreamProvider interface { + Current() string + Subscribe() (<-chan string, func()) +} + +// PublishFunc publishes an Event to the pipeline. +type PublishFunc func(ev events.Event) + +// Monitor manages a CDP WebSocket connection with auto-attach session fan-out. +// Single-use per capture session: call Start to begin, Stop to tear down. +type Monitor struct { + running atomic.Bool +} + +// New creates a Monitor. displayNum is the X display for ffmpeg screenshots. +func New(_ UpstreamProvider, _ PublishFunc, _ int) *Monitor { + return &Monitor{} +} + +// IsRunning reports whether the monitor is actively capturing. +func (m *Monitor) IsRunning() bool { + return m.running.Load() +} + +// Start begins CDP capture. Restarts if already running. +func (m *Monitor) Start(_ context.Context) error { + return nil +} + +// Stop tears down the monitor. Safe to call multiple times. +func (m *Monitor) Stop() {} diff --git a/server/lib/events/event.go b/server/lib/events/event.go index 4db821d4..185d6809 100644 --- a/server/lib/events/event.go +++ b/server/lib/events/event.go @@ -3,6 +3,7 @@ package events import ( "encoding/json" "log/slog" + "strings" ) // maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB). @@ -50,7 +51,7 @@ const ( // Event is the portable event schema. It contains only producer-emitted content; // pipeline metadata (seq, capture session) lives on the Envelope. type Event struct { - Ts int64 `json:"ts"` // Unix microseconds (µs since epoch) + Ts int64 `json:"ts"` Type string `json:"type"` Category EventCategory `json:"category"` Source Source `json:"source"` @@ -67,6 +68,26 @@ type Envelope struct { Event Event `json:"event"` } +// CategoryFor derives an EventCategory from an event type string. +// It splits on the first underscore and maps the prefix to a category. +func CategoryFor(eventType string) EventCategory { + prefix, _, _ := strings.Cut(eventType, "_") + switch prefix { + case "console": + return CategoryConsole + case "network": + return CategoryNetwork + case "page", "navigation", "dom", "target": + return CategoryPage + case "interaction", "layout", "scroll": + return CategoryInteraction + case "screenshot", "monitor": + return CategorySystem + default: + return CategorySystem + } +} + // truncateIfNeeded marshals env and returns the (possibly truncated) envelope. // If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge // url or source.metadata), it is returned as-is — callers must handle nil data. diff --git a/server/lib/events/events_test.go b/server/lib/events/events_test.go index 9325c6ea..b9da4634 100644 --- a/server/lib/events/events_test.go +++ b/server/lib/events/events_test.go @@ -442,13 +442,13 @@ func TestFileWriter(t *testing.T) { }) } -func TestCaptureSession(t *testing.T) { - newSession := func(t *testing.T) (*CaptureSession, string) { +func TestPipeline(t *testing.T) { + newPipeline := func(t *testing.T) (*Pipeline, string) { t.Helper() dir := t.TempDir() rb := NewRingBuffer(100) fw := NewFileWriter(dir) - p := NewCaptureSession("", rb, fw) + p := NewPipeline(rb, fw) t.Cleanup(func() { p.Close() }) return p, dir } @@ -460,7 +460,7 @@ func TestCaptureSession(t *testing.T) { rb := NewRingBuffer(total) fw := NewFileWriter(t.TempDir()) - p := NewCaptureSession("", rb, fw) + p := NewPipeline(rb, fw) t.Cleanup(func() { p.Close() }) reader := p.NewReader(0) @@ -486,7 +486,7 @@ func TestCaptureSession(t *testing.T) { }) t.Run("publish_increments_seq", func(t *testing.T) { - p, _ := newSession(t) + p, _ := newPipeline(t) reader := p.NewReader(0) for i := 0; i < 3; i++ { @@ -503,12 +503,12 @@ func TestCaptureSession(t *testing.T) { }) t.Run("publish_sets_ts", func(t *testing.T) { - p, _ := newSession(t) + p, _ := newPipeline(t) reader := p.NewReader(0) - before := time.Now().UnixMicro() + before := time.Now().UnixMilli() p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}}) - after := time.Now().UnixMicro() + after := time.Now().UnixMilli() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -519,7 +519,7 @@ func TestCaptureSession(t *testing.T) { }) t.Run("publish_writes_file", func(t *testing.T) { - p, dir := newSession(t) + p, dir := newPipeline(t) p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}) @@ -533,7 +533,7 @@ func TestCaptureSession(t *testing.T) { }) t.Run("publish_writes_ring", func(t *testing.T) { - p, _ := newSession(t) + p, _ := newPipeline(t) reader := p.NewReader(0) p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1}) @@ -546,10 +546,9 @@ func TestCaptureSession(t *testing.T) { assert.Equal(t, CategoryPage, env.Event.Category) }) - t.Run("constructor_sets_capture_session_id", func(t *testing.T) { - dir := t.TempDir() - p := NewCaptureSession("test-uuid", NewRingBuffer(100), NewFileWriter(dir)) - t.Cleanup(func() { p.Close() }) + t.Run("start_sets_capture_session_id", func(t *testing.T) { + p, _ := newPipeline(t) + p.Start("test-uuid") reader := p.NewReader(0) p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1}) @@ -562,7 +561,7 @@ func TestCaptureSession(t *testing.T) { }) t.Run("truncation_applied", func(t *testing.T) { - p, dir := newSession(t) + p, dir := newPipeline(t) reader := p.NewReader(0) largeData := strings.Repeat("x", 1_100_000) @@ -596,7 +595,7 @@ func TestCaptureSession(t *testing.T) { }) t.Run("defaults_detail_level", func(t *testing.T) { - p, _ := newSession(t) + p, _ := newPipeline(t) reader := p.NewReader(0) p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go new file mode 100644 index 00000000..e69c254f --- /dev/null +++ b/server/lib/events/pipeline.go @@ -0,0 +1,70 @@ +package events + +import ( + "log/slog" + "sync" + "sync/atomic" + "time" +) + +// Pipeline is a single-use write path that wraps events in envelopes and fans +// them out to a FileWriter (durable) and RingBuffer (in-memory). Call Start +// once with a capture session ID, then Publish concurrently. Close flushes the +// FileWriter; there is no restart or terminal event. +type Pipeline struct { + mu sync.Mutex + ring *RingBuffer + files *FileWriter + seq atomic.Uint64 + captureSessionID atomic.Pointer[string] +} + +func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { + p := &Pipeline{ring: ring, files: files} + empty := "" + p.captureSessionID.Store(&empty) + return p +} + +// Start sets the capture session ID stamped on every subsequent envelope. +func (p *Pipeline) Start(captureSessionID string) { + p.captureSessionID.Store(&captureSessionID) +} + +// Publish wraps ev in an Envelope, truncates if needed, then writes to +// FileWriter (durable) before RingBuffer (in-memory fan-out). +func (p *Pipeline) Publish(ev Event) { + p.mu.Lock() + defer p.mu.Unlock() + + if ev.Ts == 0 { + ev.Ts = time.Now().UnixMilli() + } + if ev.DetailLevel == "" { + ev.DetailLevel = DetailStandard + } + + env := Envelope{ + CaptureSessionID: *p.captureSessionID.Load(), + Seq: p.seq.Add(1), + Event: ev, + } + env, data := truncateIfNeeded(env) + + if data == nil { + slog.Error("pipeline: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category) + } else if err := p.files.Write(env, data); err != nil { + slog.Error("pipeline: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err) + } + p.ring.Publish(env) +} + +// NewReader returns a Reader positioned at the start of the ring buffer. +func (p *Pipeline) NewReader(afterSeq uint64) *Reader { + return p.ring.NewReader(afterSeq) +} + +// Close flushes and releases all open file descriptors. +func (p *Pipeline) Close() error { + return p.files.Close() +} diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index d30a680c..41659e94 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -44,7 +44,11 @@ func (rb *RingBuffer) oldestSeq() uint64 { // NewReader returns a Reader. afterSeq == 0 starts from the oldest available // envelope; afterSeq > 0 resumes after that seq. func (rb *RingBuffer) NewReader(afterSeq uint64) *Reader { - return &Reader{rb: rb, nextSeq: afterSeq + 1} + nextSeq := afterSeq + 1 + if afterSeq == 0 { + nextSeq = 1 + } + return &Reader{rb: rb, nextSeq: nextSeq} } // ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is From 5c0f7ae12e7ab641b43b627d511c939d6bdce5bd Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Thu, 2 Apr 2026 18:23:31 +0000 Subject: [PATCH 2/3] refactor: rename Pipeline to CaptureSession and delete pipeline.go --- server/cmd/api/api/api.go | 14 +++--- server/cmd/api/api/api_test.go | 26 +++++------ server/cmd/api/api/display_test.go | 2 +- server/cmd/api/api/events.go | 2 +- server/cmd/api/main.go | 4 +- server/lib/events/capturesession.go | 29 +++++++----- server/lib/events/events_test.go | 22 ++++----- server/lib/events/pipeline.go | 70 ----------------------------- 8 files changed, 54 insertions(+), 115 deletions(-) delete mode 100644 server/lib/events/pipeline.go diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 936b1b60..3523904d 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -72,7 +72,7 @@ type ApiService struct { xvfbResizeMu sync.Mutex // CDP event pipeline and cdpMonitor. - eventsPipeline *events.Pipeline + captureSession *events.CaptureSession cdpMonitor *cdpmonitor.Monitor monitorMu sync.Mutex } @@ -85,7 +85,7 @@ func New( upstreamMgr *devtoolsproxy.UpstreamManager, stz scaletozero.Controller, nekoAuthClient *nekoclient.AuthClient, - eventsPipeline *events.Pipeline, + captureSession *events.CaptureSession, displayNum int, ) (*ApiService, error) { switch { @@ -97,11 +97,11 @@ func New( return nil, fmt.Errorf("upstreamMgr cannot be nil") case nekoAuthClient == nil: return nil, fmt.Errorf("nekoAuthClient cannot be nil") - case eventsPipeline == nil: - return nil, fmt.Errorf("eventsPipeline cannot be nil") + case captureSession == nil: + return nil, fmt.Errorf("captureSession cannot be nil") } - mon := cdpmonitor.New(upstreamMgr, eventsPipeline.Publish, displayNum) + mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum) return &ApiService{ recordManager: recordManager, @@ -113,7 +113,7 @@ func New( stz: stz, nekoAuthClient: nekoAuthClient, policy: &policy.Policy{}, - eventsPipeline: eventsPipeline, + captureSession: captureSession, cdpMonitor: mon, }, nil } @@ -336,7 +336,7 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ func (s *ApiService) Shutdown(ctx context.Context) error { s.monitorMu.Lock() s.cdpMonitor.Stop() - _ = s.eventsPipeline.Close() + _ = s.captureSession.Close() s.monitorMu.Unlock() return s.recordManager.StopAll(ctx) } diff --git a/server/cmd/api/api/api_test.go b/server/cmd/api/api/api_test.go index 7c47f08f..977c75c0 100644 --- a/server/cmd/api/api/api_test.go +++ b/server/cmd/api/api/api_test.go @@ -26,7 +26,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("success", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{}) @@ -40,7 +40,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("already recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) // First start should succeed @@ -55,7 +55,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("custom ids don't collide", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) for i := 0; i < 5; i++ { @@ -88,7 +88,7 @@ func TestApiService_StopRecording(t *testing.T) { t.Run("no active recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) @@ -101,7 +101,7 @@ func TestApiService_StopRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) require.NoError(t, err) @@ -116,7 +116,7 @@ func TestApiService_StopRecording(t *testing.T) { force := true req := oapi.StopRecordingRequestObject{Body: &oapi.StopRecordingJSONRequestBody{ForceStop: &force}} - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, req) require.NoError(t, err) @@ -130,7 +130,7 @@ func TestApiService_DownloadRecording(t *testing.T) { t.Run("not found", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) @@ -150,7 +150,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true, recordingData: randomBytes(minRecordingSizeInBytes - 1)} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) // will return a 202 when the recording is too small resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) @@ -180,7 +180,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", recordingData: data} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) @@ -200,7 +200,7 @@ func TestApiService_Shutdown(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) require.NoError(t, svc.Shutdown(ctx)) @@ -304,16 +304,16 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient { return client } -func newEventsPipeline() *events.Pipeline { +func newCaptureSession() *events.CaptureSession { ring := events.NewRingBuffer(64) fw := events.NewFileWriter(os.TempDir()) - return events.NewPipeline(ring, fw) + return events.NewCaptureSession(ring, fw) } func TestApiService_PatchChromiumFlags(t *testing.T) { ctx := context.Background() mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) // Test with valid flags diff --git a/server/cmd/api/api/display_test.go b/server/cmd/api/api/display_test.go index b63addaa..44b0d524 100644 --- a/server/cmd/api/api/display_test.go +++ b/server/cmd/api/api/display_test.go @@ -34,7 +34,7 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService { t.Helper() - svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) + svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(), 0) require.NoError(t, err) return svc } diff --git a/server/cmd/api/api/events.go b/server/cmd/api/api/events.go index 6ab875b4..5dd56d65 100644 --- a/server/cmd/api/api/events.go +++ b/server/cmd/api/api/events.go @@ -15,7 +15,7 @@ func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { s.monitorMu.Lock() defer s.monitorMu.Unlock() - s.eventsPipeline.Start(uuid.New().String()) + s.captureSession.Start(uuid.New().String()) if err := s.cdpMonitor.Start(r.Context()); err != nil { logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index f277d4ce..767c4881 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -94,7 +94,7 @@ func main() { // Construct events pipeline eventsRing := events.NewRingBuffer(1024) eventsFileWriter := events.NewFileWriter("/var/log") - eventsPipeline := events.NewPipeline(eventsRing, eventsFileWriter) + captureSession := events.NewCaptureSession(eventsRing, eventsFileWriter) apiService, err := api.New( recorder.NewFFmpegManager(), @@ -102,7 +102,7 @@ func main() { upstreamMgr, stz, nekoAuthClient, - eventsPipeline, + captureSession, config.DisplayNum, ) if err != nil { diff --git a/server/lib/events/capturesession.go b/server/lib/events/capturesession.go index a430980a..1b34973b 100644 --- a/server/lib/events/capturesession.go +++ b/server/lib/events/capturesession.go @@ -3,22 +3,32 @@ package events import ( "log/slog" "sync" + "sync/atomic" "time" ) // CaptureSession is a single-use write path that wraps events in envelopes and -// fans them out to a FileWriter (durable) and RingBuffer (in-memory). Publish -// concurrently; Close flushes the FileWriter. +// fans them out to a FileWriter (durable) and RingBuffer (in-memory). Call Start +// once with a capture session ID, then Publish concurrently. Close flushes the +// FileWriter; there is no restart or terminal event. type CaptureSession struct { mu sync.Mutex ring *RingBuffer files *FileWriter - seq uint64 - captureSessionID string + seq atomic.Uint64 + captureSessionID atomic.Pointer[string] } -func NewCaptureSession(captureSessionID string, ring *RingBuffer, files *FileWriter) *CaptureSession { - return &CaptureSession{ring: ring, files: files, captureSessionID: captureSessionID} +func NewCaptureSession(ring *RingBuffer, files *FileWriter) *CaptureSession { + s := &CaptureSession{ring: ring, files: files} + empty := "" + s.captureSessionID.Store(&empty) + return s +} + +// Start sets the capture session ID stamped on every subsequent envelope. +func (s *CaptureSession) Start(captureSessionID string) { + s.captureSessionID.Store(&captureSessionID) } // Publish wraps ev in an Envelope, truncates if needed, then writes to @@ -28,16 +38,15 @@ func (s *CaptureSession) Publish(ev Event) { defer s.mu.Unlock() if ev.Ts == 0 { - ev.Ts = time.Now().UnixMicro() + ev.Ts = time.Now().UnixMilli() } if ev.DetailLevel == "" { ev.DetailLevel = DetailStandard } - s.seq++ env := Envelope{ - CaptureSessionID: s.captureSessionID, - Seq: s.seq, + CaptureSessionID: *s.captureSessionID.Load(), + Seq: s.seq.Add(1), Event: ev, } env, data := truncateIfNeeded(env) diff --git a/server/lib/events/events_test.go b/server/lib/events/events_test.go index b9da4634..3401bd6e 100644 --- a/server/lib/events/events_test.go +++ b/server/lib/events/events_test.go @@ -442,13 +442,13 @@ func TestFileWriter(t *testing.T) { }) } -func TestPipeline(t *testing.T) { - newPipeline := func(t *testing.T) (*Pipeline, string) { +func TestCaptureSession(t *testing.T) { + newCaptureSession := func(t *testing.T) (*CaptureSession, string) { t.Helper() dir := t.TempDir() rb := NewRingBuffer(100) fw := NewFileWriter(dir) - p := NewPipeline(rb, fw) + p := NewCaptureSession(rb, fw) t.Cleanup(func() { p.Close() }) return p, dir } @@ -460,7 +460,7 @@ func TestPipeline(t *testing.T) { rb := NewRingBuffer(total) fw := NewFileWriter(t.TempDir()) - p := NewPipeline(rb, fw) + p := NewCaptureSession(rb, fw) t.Cleanup(func() { p.Close() }) reader := p.NewReader(0) @@ -486,7 +486,7 @@ func TestPipeline(t *testing.T) { }) t.Run("publish_increments_seq", func(t *testing.T) { - p, _ := newPipeline(t) + p, _ := newCaptureSession(t) reader := p.NewReader(0) for i := 0; i < 3; i++ { @@ -503,7 +503,7 @@ func TestPipeline(t *testing.T) { }) t.Run("publish_sets_ts", func(t *testing.T) { - p, _ := newPipeline(t) + p, _ := newCaptureSession(t) reader := p.NewReader(0) before := time.Now().UnixMilli() @@ -519,7 +519,7 @@ func TestPipeline(t *testing.T) { }) t.Run("publish_writes_file", func(t *testing.T) { - p, dir := newPipeline(t) + p, dir := newCaptureSession(t) p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}) @@ -533,7 +533,7 @@ func TestPipeline(t *testing.T) { }) t.Run("publish_writes_ring", func(t *testing.T) { - p, _ := newPipeline(t) + p, _ := newCaptureSession(t) reader := p.NewReader(0) p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1}) @@ -547,7 +547,7 @@ func TestPipeline(t *testing.T) { }) t.Run("start_sets_capture_session_id", func(t *testing.T) { - p, _ := newPipeline(t) + p, _ := newCaptureSession(t) p.Start("test-uuid") reader := p.NewReader(0) @@ -561,7 +561,7 @@ func TestPipeline(t *testing.T) { }) t.Run("truncation_applied", func(t *testing.T) { - p, dir := newPipeline(t) + p, dir := newCaptureSession(t) reader := p.NewReader(0) largeData := strings.Repeat("x", 1_100_000) @@ -595,7 +595,7 @@ func TestPipeline(t *testing.T) { }) t.Run("defaults_detail_level", func(t *testing.T) { - p, _ := newPipeline(t) + p, _ := newCaptureSession(t) reader := p.NewReader(0) p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go deleted file mode 100644 index e69c254f..00000000 --- a/server/lib/events/pipeline.go +++ /dev/null @@ -1,70 +0,0 @@ -package events - -import ( - "log/slog" - "sync" - "sync/atomic" - "time" -) - -// Pipeline is a single-use write path that wraps events in envelopes and fans -// them out to a FileWriter (durable) and RingBuffer (in-memory). Call Start -// once with a capture session ID, then Publish concurrently. Close flushes the -// FileWriter; there is no restart or terminal event. -type Pipeline struct { - mu sync.Mutex - ring *RingBuffer - files *FileWriter - seq atomic.Uint64 - captureSessionID atomic.Pointer[string] -} - -func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { - p := &Pipeline{ring: ring, files: files} - empty := "" - p.captureSessionID.Store(&empty) - return p -} - -// Start sets the capture session ID stamped on every subsequent envelope. -func (p *Pipeline) Start(captureSessionID string) { - p.captureSessionID.Store(&captureSessionID) -} - -// Publish wraps ev in an Envelope, truncates if needed, then writes to -// FileWriter (durable) before RingBuffer (in-memory fan-out). -func (p *Pipeline) Publish(ev Event) { - p.mu.Lock() - defer p.mu.Unlock() - - if ev.Ts == 0 { - ev.Ts = time.Now().UnixMilli() - } - if ev.DetailLevel == "" { - ev.DetailLevel = DetailStandard - } - - env := Envelope{ - CaptureSessionID: *p.captureSessionID.Load(), - Seq: p.seq.Add(1), - Event: ev, - } - env, data := truncateIfNeeded(env) - - if data == nil { - slog.Error("pipeline: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category) - } else if err := p.files.Write(env, data); err != nil { - slog.Error("pipeline: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err) - } - p.ring.Publish(env) -} - -// NewReader returns a Reader positioned at the start of the ring buffer. -func (p *Pipeline) NewReader(afterSeq uint64) *Reader { - return p.ring.NewReader(afterSeq) -} - -// Close flushes and releases all open file descriptors. -func (p *Pipeline) Close() error { - return p.files.Close() -} From ad2dd63f413e1f3f23ad4f5aa5d047a423db7e4c Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Thu, 2 Apr 2026 18:26:56 +0000 Subject: [PATCH 3/3] review: fix request context leak in StartCapture, add missing categories and fix delimiter in CategoryFor --- server/cmd/api/api/events.go | 3 ++- server/lib/events/event.go | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/server/cmd/api/api/events.go b/server/cmd/api/api/events.go index 5dd56d65..f9021a17 100644 --- a/server/cmd/api/api/events.go +++ b/server/cmd/api/api/events.go @@ -1,6 +1,7 @@ package api import ( + "context" "net/http" "github.com/google/uuid" @@ -17,7 +18,7 @@ func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { s.captureSession.Start(uuid.New().String()) - if err := s.cdpMonitor.Start(r.Context()); err != nil { + if err := s.cdpMonitor.Start(context.Background()); err != nil { logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err) http.Error(w, "failed to start capture", http.StatusInternalServerError) return diff --git a/server/lib/events/event.go b/server/lib/events/event.go index 185d6809..cb5565d8 100644 --- a/server/lib/events/event.go +++ b/server/lib/events/event.go @@ -69,9 +69,9 @@ type Envelope struct { } // CategoryFor derives an EventCategory from an event type string. -// It splits on the first underscore and maps the prefix to a category. +// It splits on the first dot and maps the prefix to a category. func CategoryFor(eventType string) EventCategory { - prefix, _, _ := strings.Cut(eventType, "_") + prefix, _, _ := strings.Cut(eventType, ".") switch prefix { case "console": return CategoryConsole @@ -81,6 +81,10 @@ func CategoryFor(eventType string) EventCategory { return CategoryPage case "interaction", "layout", "scroll": return CategoryInteraction + case "liveview": + return CategoryLiveview + case "captcha": + return CategoryCaptcha case "screenshot", "monitor": return CategorySystem default: