diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 122ae4d3..936b1b60 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -9,7 +9,9 @@ import ( "sync" "time" + "github.com/onkernel/kernel-images/server/lib/cdpmonitor" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/logger" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" @@ -68,11 +70,24 @@ type ApiService struct { // xvfbResizeMu serializes background Xvfb restarts to prevent races // when multiple CDP fast-path resizes fire in quick succession. xvfbResizeMu sync.Mutex + + // CDP event pipeline and cdpMonitor. + eventsPipeline *events.Pipeline + cdpMonitor *cdpmonitor.Monitor + monitorMu sync.Mutex } var _ oapi.StrictServerInterface = (*ApiService)(nil) -func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFactory, upstreamMgr *devtoolsproxy.UpstreamManager, stz scaletozero.Controller, nekoAuthClient *nekoclient.AuthClient) (*ApiService, error) { +func New( + recordManager recorder.RecordManager, + factory recorder.FFmpegRecorderFactory, + upstreamMgr *devtoolsproxy.UpstreamManager, + stz scaletozero.Controller, + nekoAuthClient *nekoclient.AuthClient, + eventsPipeline *events.Pipeline, + displayNum int, +) (*ApiService, error) { switch { case recordManager == nil: return nil, fmt.Errorf("recordManager cannot be nil") @@ -82,8 +97,12 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa return nil, fmt.Errorf("upstreamMgr cannot be nil") case nekoAuthClient == nil: return nil, fmt.Errorf("nekoAuthClient cannot be nil") + case eventsPipeline == nil: + return nil, fmt.Errorf("eventsPipeline cannot be nil") } + mon := cdpmonitor.New(upstreamMgr, eventsPipeline.Publish, displayNum) + return &ApiService{ recordManager: recordManager, factory: 
factory, @@ -94,6 +113,8 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa stz: stz, nekoAuthClient: nekoAuthClient, policy: &policy.Policy{}, + eventsPipeline: eventsPipeline, + cdpMonitor: mon, }, nil } @@ -313,5 +334,9 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ } func (s *ApiService) Shutdown(ctx context.Context) error { + s.monitorMu.Lock() + s.cdpMonitor.Stop() + _ = s.eventsPipeline.Close() + s.monitorMu.Unlock() return s.recordManager.StopAll(ctx) } diff --git a/server/cmd/api/api/api_test.go b/server/cmd/api/api/api_test.go index dc192e30..7c47f08f 100644 --- a/server/cmd/api/api/api_test.go +++ b/server/cmd/api/api/api_test.go @@ -12,6 +12,7 @@ import ( "log/slog" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/recorder" @@ -25,7 +26,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("success", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{}) @@ -39,7 +40,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("already recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) // 
First start should succeed @@ -54,7 +55,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("custom ids don't collide", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) for i := 0; i < 5; i++ { @@ -87,7 +88,7 @@ func TestApiService_StopRecording(t *testing.T) { t.Run("no active recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) @@ -100,7 +101,7 @@ func TestApiService_StopRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) require.NoError(t, err) @@ -115,7 +116,7 @@ func TestApiService_StopRecording(t *testing.T) { force := true req := oapi.StopRecordingRequestObject{Body: &oapi.StopRecordingJSONRequestBody{ForceStop: &force}} - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), 
scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.StopRecording(ctx, req) require.NoError(t, err) @@ -129,7 +130,7 @@ func TestApiService_DownloadRecording(t *testing.T) { t.Run("not found", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) @@ -149,7 +150,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true, recordingData: randomBytes(minRecordingSizeInBytes - 1)} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) // will return a 202 when the recording is too small resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) @@ -179,7 +180,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", recordingData: data} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) 
require.NoError(t, err) @@ -199,7 +200,7 @@ func TestApiService_Shutdown(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) require.NoError(t, svc.Shutdown(ctx)) @@ -303,10 +304,16 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient { return client } +func newEventsPipeline() *events.Pipeline { + ring := events.NewRingBuffer(64) + fw := events.NewFileWriter(os.TempDir()) + return events.NewPipeline(ring, fw) +} + func TestApiService_PatchChromiumFlags(t *testing.T) { ctx := context.Background() mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) // Test with valid flags diff --git a/server/cmd/api/api/display_test.go b/server/cmd/api/api/display_test.go index acbe74c2..b63addaa 100644 --- a/server/cmd/api/api/display_test.go +++ b/server/cmd/api/api/display_test.go @@ -34,7 +34,7 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService { t.Helper() - svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) + svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0) require.NoError(t, err) return svc } diff --git 
a/server/cmd/api/api/events.go b/server/cmd/api/api/events.go new file mode 100644 index 00000000..6ab875b4 --- /dev/null +++ b/server/cmd/api/api/events.go @@ -0,0 +1,34 @@ +package api + +import ( + "net/http" + + "github.com/google/uuid" + "github.com/onkernel/kernel-images/server/lib/logger" +) + +// StartCapture handles POST /events/start. +// Generates a new capture session ID, seeds the pipeline, then starts the +// CDP monitor. If already running, the monitor is stopped and +// restarted with a fresh session ID +func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { + s.monitorMu.Lock() + defer s.monitorMu.Unlock() + + s.eventsPipeline.Start(uuid.New().String()) + + if err := s.cdpMonitor.Start(r.Context()); err != nil { + logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err) + http.Error(w, "failed to start capture", http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) +} + +// StopCapture handles POST /events/stop +func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) { + s.monitorMu.Lock() + defer s.monitorMu.Unlock() + s.cdpMonitor.Stop() + w.WriteHeader(http.StatusOK) +} diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index c80ddd27..f277d4ce 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -24,6 +24,7 @@ import ( "github.com/onkernel/kernel-images/server/cmd/config" "github.com/onkernel/kernel-images/server/lib/chromedriverproxy" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/logger" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" @@ -90,12 +91,19 @@ func main() { os.Exit(1) } + // Construct events pipeline + eventsRing := events.NewRingBuffer(1024) + eventsFileWriter := events.NewFileWriter("/var/log") + eventsPipeline := 
events.NewPipeline(eventsRing, eventsFileWriter) + apiService, err := api.New( recorder.NewFFmpegManager(), recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz), upstreamMgr, stz, nekoAuthClient, + eventsPipeline, + config.DisplayNum, ) if err != nil { slogger.Error("failed to create api service", "err", err) @@ -120,6 +128,10 @@ func main() { w.Header().Set("Content-Type", "application/json") w.Write(jsonData) }) + // capture events + r.Post("/events/start", apiService.StartCapture) + r.Post("/events/stop", apiService.StopCapture) + // PTY attach endpoint (WebSocket) - not part of OpenAPI spec // Uses WebSocket for bidirectional streaming, which works well through proxies. r.Get("/process/{process_id}/attach", func(w http.ResponseWriter, r *http.Request) { diff --git a/server/lib/cdpmonitor/computed.go b/server/lib/cdpmonitor/computed.go new file mode 100644 index 00000000..1bbe4573 --- /dev/null +++ b/server/lib/cdpmonitor/computed.go @@ -0,0 +1,185 @@ +package cdpmonitor + +import ( + "encoding/json" + "sync" + "time" + + "github.com/onkernel/kernel-images/server/lib/events" +) +const ( + networkIdleDebounce = 500 * time.Millisecond + layoutSettledDebounce = 1 * time.Second +) + +// computedState holds the mutable state for all computed meta-events. +type computedState struct { + mu sync.Mutex + publish PublishFunc + + // network_idle: 500 ms debounce after all pending requests finish. + netPending int + netTimer *time.Timer + netFired bool + + // layout_settled: 1s after page_load with no intervening layout shifts. + layoutTimer *time.Timer + layoutFired bool + pageLoadSeen bool + + // navigation_settled: fires once dom_content_loaded, network_idle, and + // layout_settled have all fired after the same Page.frameNavigated. + navDOMLoaded bool + navNetIdle bool + navLayoutSettled bool + navFired bool +} + +// newComputedState creates a fresh computedState backed by the given publish func. 
+func newComputedState(publish PublishFunc) *computedState { + return &computedState{publish: publish} +} + +func stopTimer(t *time.Timer) { + if t == nil { + return + } + if !t.Stop() { + select { + case <-t.C: + default: + } + } +} + +// resetOnNavigation resets all state machines. Called on Page.frameNavigated +func (s *computedState) resetOnNavigation() { + s.mu.Lock() + defer s.mu.Unlock() + + stopTimer(s.netTimer) + s.netTimer = nil + s.netPending = 0 + s.netFired = false + + stopTimer(s.layoutTimer) + s.layoutTimer = nil + s.layoutFired = false + s.pageLoadSeen = false + + s.navDOMLoaded = false + s.navNetIdle = false + s.navLayoutSettled = false + s.navFired = false +} + +func (s *computedState) onRequest() { + s.mu.Lock() + defer s.mu.Unlock() + s.netPending++ + // A new request invalidates any pending network_idle timer + stopTimer(s.netTimer) + s.netTimer = nil +} + +// onLoadingFinished is called on Network.loadingFinished or Network.loadingFailed. +func (s *computedState) onLoadingFinished() { + s.mu.Lock() + defer s.mu.Unlock() + + s.netPending-- + if s.netPending < 0 { + s.netPending = 0 + } + if s.netPending > 0 || s.netFired { + return + } + // All requests done and not yet fired — start 500 ms debounce timer. + stopTimer(s.netTimer) + s.netTimer = time.AfterFunc(networkIdleDebounce, func() { + s.mu.Lock() + defer s.mu.Unlock() + if s.netFired || s.netPending > 0 { + return + } + s.netFired = true + s.navNetIdle = true + s.publish(events.Event{ + Ts: time.Now().UnixMilli(), + Type: "network_idle", + Category: events.CategoryNetwork, + Source: events.Source{Kind: events.KindCDP}, + DetailLevel: events.DetailStandard, + Data: json.RawMessage(`{}`), + }) + s.checkNavigationSettled() + }) +} + +// onPageLoad is called on Page.loadEventFired. +func (s *computedState) onPageLoad() { + s.mu.Lock() + defer s.mu.Unlock() + s.pageLoadSeen = true + if s.layoutFired { + return + } + // Start the 1 s layout_settled timer. 
+ stopTimer(s.layoutTimer) + s.layoutTimer = time.AfterFunc(layoutSettledDebounce, s.emitLayoutSettled) +} + +// onLayoutShift is called when a layout_shift sentinel arrives from injected JS. +func (s *computedState) onLayoutShift() { + s.mu.Lock() + defer s.mu.Unlock() + if s.layoutFired || !s.pageLoadSeen { + return + } + // Reset the timer to 1 s from now. + stopTimer(s.layoutTimer) + s.layoutTimer = time.AfterFunc(layoutSettledDebounce, s.emitLayoutSettled) +} + +// emitLayoutSettled is called from the layout timer's AfterFunc goroutine +func (s *computedState) emitLayoutSettled() { + s.mu.Lock() + defer s.mu.Unlock() + if s.layoutFired || !s.pageLoadSeen { + return + } + s.layoutFired = true + s.navLayoutSettled = true + s.publish(events.Event{ + Ts: time.Now().UnixMilli(), + Type: "layout_settled", + Category: events.CategoryPage, + Source: events.Source{Kind: events.KindCDP}, + DetailLevel: events.DetailStandard, + Data: json.RawMessage(`{}`), + }) + s.checkNavigationSettled() +} + +// onDOMContentLoaded is called on Page.domContentEventFired. +func (s *computedState) onDOMContentLoaded() { + s.mu.Lock() + defer s.mu.Unlock() + s.navDOMLoaded = true + s.checkNavigationSettled() +} + +// checkNavigationSettled emits navigation_settled if all three flags are set +func (s *computedState) checkNavigationSettled() { + if s.navDOMLoaded && s.navNetIdle && s.navLayoutSettled && !s.navFired { + s.navFired = true + s.publish(events.Event{ + Ts: time.Now().UnixMilli(), + Type: "navigation_settled", + Category: events.CategoryPage, + Source: events.Source{Kind: events.KindCDP}, + DetailLevel: events.DetailStandard, + Data: json.RawMessage(`{}`), + }) + } +} diff --git a/server/lib/cdpmonitor/domains.go b/server/lib/cdpmonitor/domains.go new file mode 100644 index 00000000..1e95e0b3 --- /dev/null +++ b/server/lib/cdpmonitor/domains.go @@ -0,0 +1,89 @@ +package cdpmonitor + +import "context" + +// bindingName is the JS function exposed via Runtime.addBinding. 
+// Page JS calls this to fire Runtime.bindingCalled CDP events. +const bindingName = "__kernelEvent" + +// enableDomains enables CDP domains, registers the event binding, and starts +// layout-shift observation. Failures are non-fatal. +func (m *Monitor) enableDomains(ctx context.Context, sessionID string) { + for _, method := range []string{ + "Runtime.enable", + "Network.enable", + "Page.enable", + "DOM.enable", + } { + _, _ = m.send(ctx, method, nil, sessionID) + } + + _, _ = m.send(ctx, "Runtime.addBinding", map[string]any{ + "name": bindingName, + }, sessionID) + + _, _ = m.send(ctx, "PerformanceTimeline.enable", map[string]any{ + "eventTypes": []string{"layout-shift"}, + }, sessionID) +} + +// injectedJS tracks clicks, keys, and scrolls via the __kernelEvent binding. +// Layout shifts are handled natively by PerformanceTimeline.enable. +const injectedJS = `(function() { + if (window.__kernelEventInjected) return; + var send = window.__kernelEvent; + if (!send) return; + window.__kernelEventInjected = true; + + function sel(el) { + return el.id ? '#' + el.id : (el.className ? '.' + String(el.className).split(' ')[0] : ''); + } + + document.addEventListener('click', function(e) { + var t = e.target || {}; + send(JSON.stringify({ + type: 'interaction_click', + x: e.clientX, y: e.clientY, + selector: sel(t), tag: t.tagName || '', + text: (t.innerText || '').slice(0, 100) + })); + }, true); + + document.addEventListener('keydown', function(e) { + var t = e.target || {}; + send(JSON.stringify({ + type: 'interaction_key', + key: e.key, + selector: sel(t), tag: t.tagName || '' + })); + }, true); + + var scrollTimer = null; + var scrollStart = {x: window.scrollX, y: window.scrollY}; + document.addEventListener('scroll', function(e) { + var fromX = scrollStart.x, fromY = scrollStart.y; + var target = e.target; + var s = target === document ? 
'document' : sel(target); + if (scrollTimer) clearTimeout(scrollTimer); + scrollTimer = setTimeout(function() { + var toX = window.scrollX, toY = window.scrollY; + if (Math.abs(toX - fromX) > 5 || Math.abs(toY - fromY) > 5) { + send(JSON.stringify({ + type: 'scroll_settled', + from_x: fromX, from_y: fromY, + to_x: toX, to_y: toY, + target_selector: s + })); + } + scrollStart = {x: toX, y: toY}; + }, 300); + }, true); +})();` + +// injectScript registers the interaction tracking JS for the given session. +func (m *Monitor) injectScript(ctx context.Context, sessionID string) error { + _, err := m.send(ctx, "Page.addScriptToEvaluateOnNewDocument", map[string]any{ + "source": injectedJS, + }, sessionID) + return err +} diff --git a/server/lib/cdpmonitor/handlers.go b/server/lib/cdpmonitor/handlers.go new file mode 100644 index 00000000..35664993 --- /dev/null +++ b/server/lib/cdpmonitor/handlers.go @@ -0,0 +1,374 @@ +package cdpmonitor + +import ( + "encoding/json" + "time" + + "github.com/onkernel/kernel-images/server/lib/events" +) + +// publishEvent stamps common fields and publishes an event. +func (m *Monitor) publishEvent(eventType string, detail events.DetailLevel, source events.Source, sourceEvent string, data json.RawMessage, sessionID string) { + src := source + src.Event = sourceEvent + if sessionID != "" { + if src.Metadata == nil { + src.Metadata = make(map[string]string) + } + src.Metadata["cdp_session_id"] = sessionID + } + url, _ := m.currentURL.Load().(string) + m.publish(events.Event{ + Ts: time.Now().UnixMilli(), + Type: eventType, + Category: events.CategoryFor(eventType), + Source: src, + DetailLevel: detail, + URL: url, + Data: data, + }) +} + +// dispatchEvent routes a CDP event to its handler. 
+func (m *Monitor) dispatchEvent(msg cdpMessage) { + switch msg.Method { + case "Runtime.consoleAPICalled": + m.handleConsole(msg.Params, msg.SessionID) + case "Runtime.exceptionThrown": + m.handleExceptionThrown(msg.Params, msg.SessionID) + case "Runtime.bindingCalled": + m.handleBindingCalled(msg.Params, msg.SessionID) + case "Network.requestWillBeSent": + m.handleNetworkRequest(msg.Params, msg.SessionID) + case "Network.responseReceived": + m.handleResponseReceived(msg.Params, msg.SessionID) + case "Network.loadingFinished": + m.handleLoadingFinished(msg.Params, msg.SessionID) + case "Network.loadingFailed": + m.handleLoadingFailed(msg.Params, msg.SessionID) + case "Page.frameNavigated": + m.handleFrameNavigated(msg.Params, msg.SessionID) + case "Page.domContentEventFired": + m.handleDOMContentLoaded(msg.Params, msg.SessionID) + case "Page.loadEventFired": + m.handleLoadEventFired(msg.Params, msg.SessionID) + case "DOM.documentUpdated": + m.handleDOMUpdated(msg.Params, msg.SessionID) + case "PerformanceTimeline.timelineEventAdded": + m.handleTimelineEvent(msg.Params, msg.SessionID) + case "Target.attachedToTarget": + m.handleAttachedToTarget(msg) + case "Target.targetCreated": + m.handleTargetCreated(msg.Params, msg.SessionID) + case "Target.targetDestroyed": + m.handleTargetDestroyed(msg.Params, msg.SessionID) + } +} + +func (m *Monitor) handleConsole(params json.RawMessage, sessionID string) { + var p cdpConsoleParams + if err := json.Unmarshal(params, &p); err != nil { + return + } + + text := "" + if len(p.Args) > 0 { + text = consoleArgString(p.Args[0]) + } + argValues := make([]string, 0, len(p.Args)) + for _, a := range p.Args { + argValues = append(argValues, consoleArgString(a)) + } + data, _ := json.Marshal(map[string]any{ + "level": p.Type, + "text": text, + "args": argValues, + "stack_trace": p.StackTrace, + }) + m.publishEvent("console_log", events.DetailStandard, events.Source{Kind: events.KindCDP}, "Runtime.consoleAPICalled", data, sessionID) +} + 
+func (m *Monitor) handleExceptionThrown(params json.RawMessage, sessionID string) { + var p cdpExceptionDetails + if err := json.Unmarshal(params, &p); err != nil { + return + } + data, _ := json.Marshal(map[string]any{ + "text": p.ExceptionDetails.Text, + "line": p.ExceptionDetails.LineNumber, + "column": p.ExceptionDetails.ColumnNumber, + "url": p.ExceptionDetails.URL, + "stack_trace": p.ExceptionDetails.StackTrace, + }) + m.publishEvent("console_error", events.DetailStandard, events.Source{Kind: events.KindCDP}, "Runtime.exceptionThrown", data, sessionID) + go m.maybeScreenshot(m.getLifecycleCtx()) +} + +// handleBindingCalled processes __kernelEvent binding calls from the page. +func (m *Monitor) handleBindingCalled(params json.RawMessage, sessionID string) { + var p struct { + Name string `json:"name"` + Payload string `json:"payload"` + } + if err := json.Unmarshal(params, &p); err != nil || p.Name != bindingName { + return + } + payload := json.RawMessage(p.Payload) + if !json.Valid(payload) { + return + } + var header struct { + Type string `json:"type"` + } + if err := json.Unmarshal(payload, &header); err != nil { + return + } + switch header.Type { + case "interaction_click", "interaction_key", "scroll_settled": + m.publishEvent(header.Type, events.DetailStandard, events.Source{Kind: events.KindCDP}, "Runtime.bindingCalled", payload, sessionID) + } +} + +// handleTimelineEvent processes PerformanceTimeline layout-shift events. 
+func (m *Monitor) handleTimelineEvent(params json.RawMessage, sessionID string) { + var p struct { + Event struct { + Type string `json:"type"` + LayoutShift json.RawMessage `json:"layoutShiftDetails,omitempty"` + } `json:"event"` + } + if err := json.Unmarshal(params, &p); err != nil || p.Event.Type != "layout-shift" { + return + } + m.publishEvent("layout_shift", events.DetailStandard, events.Source{Kind: events.KindCDP}, "PerformanceTimeline.timelineEventAdded", params, sessionID) + m.computed.onLayoutShift() +} + +func (m *Monitor) handleNetworkRequest(params json.RawMessage, sessionID string) { + var p cdpNetworkRequestParams + if err := json.Unmarshal(params, &p); err != nil { + return + } + // Extract only the initiator type; the stack trace is too verbose and dominates event size. + var initiatorType string + var raw struct { + Type string `json:"type"` + } + if json.Unmarshal(p.Initiator, &raw) == nil { + initiatorType = raw.Type + } + + m.pendReqMu.Lock() + m.pendingRequests[p.RequestID] = networkReqState{ + sessionID: sessionID, + method: p.Request.Method, + url: p.Request.URL, + headers: p.Request.Headers, + postData: p.Request.PostData, + resourceType: p.ResourceType, + } + m.pendReqMu.Unlock() + data, _ := json.Marshal(map[string]any{ + "method": p.Request.Method, + "url": p.Request.URL, + "headers": p.Request.Headers, + "post_data": p.Request.PostData, + "resource_type": p.ResourceType, + "initiator_type": initiatorType, + }) + m.publishEvent("network_request", events.DetailStandard, events.Source{Kind: events.KindCDP}, "Network.requestWillBeSent", data, sessionID) + m.computed.onRequest() +} + +func (m *Monitor) handleResponseReceived(params json.RawMessage, sessionID string) { + var p cdpResponseReceivedParams + if err := json.Unmarshal(params, &p); err != nil { + return + } + m.pendReqMu.Lock() + if state, ok := m.pendingRequests[p.RequestID]; ok { + state.status = p.Response.Status + state.statusText = p.Response.StatusText + state.resHeaders = 
p.Response.Headers + state.mimeType = p.Response.MimeType + m.pendingRequests[p.RequestID] = state + } + m.pendReqMu.Unlock() +} + +func (m *Monitor) handleLoadingFinished(params json.RawMessage, sessionID string) { + var p struct { + RequestID string `json:"requestId"` + } + if err := json.Unmarshal(params, &p); err != nil { + return + } + m.pendReqMu.Lock() + state, ok := m.pendingRequests[p.RequestID] + if ok { + delete(m.pendingRequests, p.RequestID) + } + m.pendReqMu.Unlock() + if !ok { + return + } + // Fetch response body async to avoid blocking readLoop; binary types are skipped. + go func() { + ctx := m.getLifecycleCtx() + body := "" + if isTextualResource(state.resourceType, state.mimeType) { + result, err := m.send(ctx, "Network.getResponseBody", map[string]any{ + "requestId": p.RequestID, + }, sessionID) + if err == nil { + var resp struct { + Body string `json:"body"` + Base64Encoded bool `json:"base64Encoded"` + } + if json.Unmarshal(result, &resp) == nil { + body = truncateBody(resp.Body, bodyCapFor(state.mimeType)) + } + } + } + data, _ := json.Marshal(map[string]any{ + "method": state.method, + "url": state.url, + "status": state.status, + "status_text": state.statusText, + "headers": state.resHeaders, + "mime_type": state.mimeType, + "resource_type": state.resourceType, + "body": body, + }) + detail := events.DetailStandard + if body != "" { + detail = events.DetailVerbose + } + m.publishEvent("network_response", detail, events.Source{Kind: events.KindCDP}, "Network.loadingFinished", data, sessionID) + m.computed.onLoadingFinished() + }() +} + +func (m *Monitor) handleLoadingFailed(params json.RawMessage, sessionID string) { + var p struct { + RequestID string `json:"requestId"` + ErrorText string `json:"errorText"` + Canceled bool `json:"canceled"` + } + if err := json.Unmarshal(params, &p); err != nil { + return + } + m.pendReqMu.Lock() + state, ok := m.pendingRequests[p.RequestID] + if ok { + delete(m.pendingRequests, p.RequestID) + } + 
m.pendReqMu.Unlock() + + ev := map[string]any{ + "error_text": p.ErrorText, + "canceled": p.Canceled, + } + if ok { + ev["url"] = state.url + } + data, _ := json.Marshal(ev) + m.publishEvent("network_loading_failed", events.DetailStandard, events.Source{Kind: events.KindCDP}, "Network.loadingFailed", data, sessionID) + m.computed.onLoadingFinished() +} + + +func (m *Monitor) handleFrameNavigated(params json.RawMessage, sessionID string) { + var p struct { + Frame struct { + ID string `json:"id"` + ParentID string `json:"parentId"` + URL string `json:"url"` + } `json:"frame"` + } + if err := json.Unmarshal(params, &p); err != nil { + return + } + data, _ := json.Marshal(map[string]any{ + "url": p.Frame.URL, + "frame_id": p.Frame.ID, + "parent_frame_id": p.Frame.ParentID, + }) + // Only track top-level frame navigations (no parent). + if p.Frame.ParentID == "" { + m.currentURL.Store(p.Frame.URL) + } + m.publishEvent("navigation", events.DetailStandard, events.Source{Kind: events.KindCDP}, "Page.frameNavigated", data, sessionID) + + m.pendReqMu.Lock() + for id, req := range m.pendingRequests { + if req.sessionID == sessionID { + delete(m.pendingRequests, id) + } + } + m.pendReqMu.Unlock() + + m.computed.resetOnNavigation() +} + +func (m *Monitor) handleDOMContentLoaded(params json.RawMessage, sessionID string) { + m.publishEvent("dom_content_loaded", events.DetailMinimal, events.Source{Kind: events.KindCDP}, "Page.domContentEventFired", params, sessionID) + m.computed.onDOMContentLoaded() +} + +func (m *Monitor) handleLoadEventFired(params json.RawMessage, sessionID string) { + m.publishEvent("page_load", events.DetailMinimal, events.Source{Kind: events.KindCDP}, "Page.loadEventFired", params, sessionID) + m.computed.onPageLoad() + go m.maybeScreenshot(m.getLifecycleCtx()) +} + +func (m *Monitor) handleDOMUpdated(params json.RawMessage, sessionID string) { + m.publishEvent("dom_updated", events.DetailMinimal, events.Source{Kind: events.KindCDP}, "DOM.documentUpdated", 
params, sessionID) +} + +// handleAttachedToTarget stores the new session then enables domains and injects script. +func (m *Monitor) handleAttachedToTarget(msg cdpMessage) { + var params cdpAttachedToTargetParams + if err := json.Unmarshal(msg.Params, ¶ms); err != nil { + return + } + m.sessionsMu.Lock() + m.sessions[params.SessionID] = targetInfo{ + targetID: params.TargetInfo.TargetID, + url: params.TargetInfo.URL, + targetType: params.TargetInfo.Type, + } + m.sessionsMu.Unlock() + + // Async to avoid blocking the readLoop. + go func() { + m.enableDomains(m.getLifecycleCtx(), params.SessionID) + _ = m.injectScript(m.getLifecycleCtx(), params.SessionID) + }() +} + +func (m *Monitor) handleTargetCreated(params json.RawMessage, sessionID string) { + var p cdpTargetCreatedParams + if err := json.Unmarshal(params, &p); err != nil { + return + } + data, _ := json.Marshal(map[string]any{ + "target_id": p.TargetInfo.TargetID, + "target_type": p.TargetInfo.Type, + "url": p.TargetInfo.URL, + }) + m.publishEvent("target_created", events.DetailMinimal, events.Source{Kind: events.KindCDP}, "Target.targetCreated", data, sessionID) +} + +func (m *Monitor) handleTargetDestroyed(params json.RawMessage, sessionID string) { + var p struct { + TargetID string `json:"targetId"` + } + if err := json.Unmarshal(params, &p); err != nil { + return + } + data, _ := json.Marshal(map[string]any{ + "target_id": p.TargetID, + }) + m.publishEvent("target_destroyed", events.DetailMinimal, events.Source{Kind: events.KindCDP}, "Target.targetDestroyed", data, sessionID) +} diff --git a/server/lib/cdpmonitor/monitor.go b/server/lib/cdpmonitor/monitor.go new file mode 100644 index 00000000..4422c8a4 --- /dev/null +++ b/server/lib/cdpmonitor/monitor.go @@ -0,0 +1,425 @@ +package cdpmonitor + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/coder/websocket" + "github.com/onkernel/kernel-images/server/lib/events" +) + +// UpstreamProvider abstracts 
*devtoolsproxy.UpstreamManager for testability. +type UpstreamProvider interface { + Current() string + Subscribe() (<-chan string, func()) +} + +// PublishFunc publishes an Event to the pipeline. +type PublishFunc func(ev events.Event) + +const wsReadLimit = 8 * 1024 * 1024 + +// Monitor manages a CDP WebSocket connection with auto-attach session fan-out. +type Monitor struct { + upstreamMgr UpstreamProvider + publish PublishFunc + displayNum int + + // lifeMu serializes Start, Stop, and restartReadLoop to prevent races on + // conn, lifecycleCtx, cancel, and done. + lifeMu sync.Mutex + conn *websocket.Conn + + nextID atomic.Int64 + pendMu sync.Mutex + pending map[int64]chan cdpMessage + + sessionsMu sync.RWMutex + sessions map[string]targetInfo // sessionID → targetInfo + + pendReqMu sync.Mutex + pendingRequests map[string]networkReqState // requestId → networkReqState + + currentURL atomic.Value // last URL from Page.frameNavigated + + computed *computedState + + lastScreenshotAt atomic.Int64 // unix millis of last capture + screenshotFn func(ctx context.Context, displayNum int) ([]byte, error) // nil → real ffmpeg + + lifecycleCtx context.Context // cancelled on Stop() + cancel context.CancelFunc + done chan struct{} + + running atomic.Bool +} + +// New creates a Monitor. displayNum is the X display for ffmpeg screenshots. +func New(upstreamMgr UpstreamProvider, publish PublishFunc, displayNum int) *Monitor { + m := &Monitor{ + upstreamMgr: upstreamMgr, + publish: publish, + displayNum: displayNum, + sessions: make(map[string]targetInfo), + pending: make(map[int64]chan cdpMessage), + pendingRequests: make(map[string]networkReqState), + } + m.computed = newComputedState(publish) + m.lifecycleCtx = context.Background() + return m +} + +// IsRunning reports whether the monitor is actively capturing. +func (m *Monitor) IsRunning() bool { + return m.running.Load() +} + +// getLifecycleCtx returns the current lifecycle context under lifeMu. 
+func (m *Monitor) getLifecycleCtx() context.Context { + m.lifeMu.Lock() + ctx := m.lifecycleCtx + m.lifeMu.Unlock() + return ctx +} + +// Start begins CDP capture. Restarts if already running. +func (m *Monitor) Start(parentCtx context.Context) error { + if m.running.Load() { + m.Stop() + } + + devtoolsURL := m.upstreamMgr.Current() + if devtoolsURL == "" { + return fmt.Errorf("cdpmonitor: no DevTools URL available") + } + + // Use background context so the monitor outlives the caller's request context. + ctx, cancel := context.WithCancel(context.Background()) + + conn, _, err := websocket.Dial(ctx, devtoolsURL, nil) + if err != nil { + cancel() + return fmt.Errorf("cdpmonitor: dial %s: %w", devtoolsURL, err) + } + conn.SetReadLimit(wsReadLimit) + + m.lifeMu.Lock() + m.conn = conn + m.lifecycleCtx = ctx + m.cancel = cancel + m.done = make(chan struct{}) + m.lifeMu.Unlock() + + m.running.Store(true) + + go m.readLoop(ctx) + go m.subscribeToUpstream(ctx) + go m.initSession(ctx) // must run after readLoop starts + + return nil +} + +// Stop cancels the context and waits for goroutines to exit. +func (m *Monitor) Stop() { + if !m.running.Swap(false) { + return + } + + m.lifeMu.Lock() + if m.cancel != nil { + m.cancel() + } + done := m.done + m.lifeMu.Unlock() + + if done != nil { + <-done + } + + m.lifeMu.Lock() + if m.conn != nil { + _ = m.conn.Close(websocket.StatusNormalClosure, "stopped") + m.conn = nil + } + m.lifeMu.Unlock() + + m.clearState() +} + +// clearState resets sessions, pending requests, and computed state. +func (m *Monitor) clearState() { + m.currentURL.Store("") + + m.sessionsMu.Lock() + m.sessions = make(map[string]targetInfo) + m.sessionsMu.Unlock() + + m.pendReqMu.Lock() + m.pendingRequests = make(map[string]networkReqState) + m.pendReqMu.Unlock() + + m.computed.resetOnNavigation() +} + +// readLoop reads CDP messages, routing responses to pending callers and dispatching events. 
+func (m *Monitor) readLoop(ctx context.Context) { + m.lifeMu.Lock() + done := m.done + conn := m.conn + m.lifeMu.Unlock() + defer close(done) + + if conn == nil { + return + } + + for { + _, b, err := conn.Read(ctx) + if err != nil { + return + } + + var msg cdpMessage + if err := json.Unmarshal(b, &msg); err != nil { + continue + } + + if msg.ID != 0 { + m.pendMu.Lock() + ch, ok := m.pending[msg.ID] + m.pendMu.Unlock() + if ok { + select { + case ch <- msg: + default: + } + } + continue + } + + m.dispatchEvent(msg) + } +} + +// send issues a CDP command and blocks until the response arrives. +func (m *Monitor) send(ctx context.Context, method string, params any, sessionID string) (json.RawMessage, error) { + id := m.nextID.Add(1) + + var rawParams json.RawMessage + if params != nil { + b, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("marshal params: %w", err) + } + rawParams = b + } + + req := cdpMessage{ID: id, Method: method, Params: rawParams, SessionID: sessionID} + reqBytes, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + + ch := make(chan cdpMessage, 1) + m.pendMu.Lock() + m.pending[id] = ch + m.pendMu.Unlock() + defer func() { + m.pendMu.Lock() + delete(m.pending, id) + m.pendMu.Unlock() + }() + + m.lifeMu.Lock() + conn := m.conn + m.lifeMu.Unlock() + if conn == nil { + return nil, fmt.Errorf("cdpmonitor: connection not open") + } + + // coder/websocket allows concurrent Read + Write on the same Conn. + if err := conn.Write(ctx, websocket.MessageText, reqBytes); err != nil { + return nil, fmt.Errorf("write: %w", err) + } + + select { + case resp := <-ch: + if resp.Error != nil { + return nil, resp.Error + } + return resp.Result, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// initSession enables CDP domains, injects the interaction-tracking script, +// and manually attaches to any targets already open when the monitor started. 
+func (m *Monitor) initSession(ctx context.Context) { + _, _ = m.send(ctx, "Target.setAutoAttach", map[string]any{ + "autoAttach": true, + "waitForDebuggerOnStart": false, + "flatten": true, + }, "") + m.enableDomains(ctx, "") + _ = m.injectScript(ctx, "") + m.attachExistingTargets(ctx) +} + +// attachExistingTargets fetches all open targets and attaches to any that are +// not already tracked. This catches pages that were open before Start() was called. +func (m *Monitor) attachExistingTargets(ctx context.Context) { + result, err := m.send(ctx, "Target.getTargets", nil, "") + if err != nil { + return + } + var resp struct { + TargetInfos []cdpTargetInfo `json:"targetInfos"` + } + if err := json.Unmarshal(result, &resp); err != nil { + return + } + for _, ti := range resp.TargetInfos { + if ti.Type != "page" { + continue + } + m.sessionsMu.RLock() + alreadyAttached := false + for _, info := range m.sessions { + if info.targetID == ti.TargetID { + alreadyAttached = true + break + } + } + m.sessionsMu.RUnlock() + if alreadyAttached { + continue + } + go func(targetID string) { + res, err := m.send(ctx, "Target.attachToTarget", map[string]any{ + "targetId": targetID, + "flatten": true, + }, "") + if err != nil { + return + } + var attached struct { + SessionID string `json:"sessionId"` + } + if json.Unmarshal(res, &attached) == nil && attached.SessionID != "" { + m.enableDomains(ctx, attached.SessionID) + _ = m.injectScript(ctx, attached.SessionID) + } + }(ti.TargetID) + } +} + +// restartReadLoop waits for the current readLoop to exit, then starts a new one. +func (m *Monitor) restartReadLoop(ctx context.Context) { + m.lifeMu.Lock() + done := m.done + m.lifeMu.Unlock() + + <-done + + m.lifeMu.Lock() + m.done = make(chan struct{}) + m.lifeMu.Unlock() + + go m.readLoop(ctx) +} + +// subscribeToUpstream reconnects with backoff on Chrome restarts, publishing disconnect/reconnect events. 
+func (m *Monitor) subscribeToUpstream(ctx context.Context) { + ch, cancel := m.upstreamMgr.Subscribe() + defer cancel() + + backoffs := []time.Duration{ + 250 * time.Millisecond, + 500 * time.Millisecond, + 1 * time.Second, + 2 * time.Second, + } + + for { + select { + case <-ctx.Done(): + return + case newURL, ok := <-ch: + if !ok { + return + } + m.publish(events.Event{ + Ts: time.Now().UnixMilli(), + Type: "monitor_disconnected", + Category: events.CategorySystem, + Source: events.Source{Kind: events.KindLocalProcess}, + DetailLevel: events.DetailMinimal, + Data: json.RawMessage(`{"reason":"chrome_restarted"}`), + }) + + startReconnect := time.Now() + + m.lifeMu.Lock() + if m.conn != nil { + _ = m.conn.Close(websocket.StatusNormalClosure, "reconnecting") + m.conn = nil + } + m.lifeMu.Unlock() + + // Clear stale state from the previous Chrome instance. + m.clearState() + + var reconnErr error + for attempt := range 10 { + if ctx.Err() != nil { + return + } + + idx := min(attempt, len(backoffs)-1) + select { + case <-ctx.Done(): + return + case <-time.After(backoffs[idx]): + } + + conn, _, err := websocket.Dial(ctx, newURL, nil) + if err != nil { + reconnErr = err + continue + } + conn.SetReadLimit(wsReadLimit) + + m.lifeMu.Lock() + m.conn = conn + m.lifeMu.Unlock() + + reconnErr = nil + break + } + + if reconnErr != nil { + return + } + + m.restartReadLoop(ctx) + go m.initSession(ctx) + + m.publish(events.Event{ + Ts: time.Now().UnixMilli(), + Type: "monitor_reconnected", + Category: events.CategorySystem, + Source: events.Source{Kind: events.KindLocalProcess}, + DetailLevel: events.DetailMinimal, + Data: json.RawMessage(fmt.Sprintf( + `{"reconnect_duration_ms":%d}`, + time.Since(startReconnect).Milliseconds(), + )), + }) + } + } +} diff --git a/server/lib/cdpmonitor/monitor_test.go b/server/lib/cdpmonitor/monitor_test.go new file mode 100644 index 00000000..8f793340 --- /dev/null +++ b/server/lib/cdpmonitor/monitor_test.go @@ -0,0 +1,1030 @@ +package cdpmonitor 
+ +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" + "github.com/onkernel/kernel-images/server/lib/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// Test infrastructure +// --------------------------------------------------------------------------- + +// fakeCDPServer is a minimal WebSocket server that accepts connections and +// lets the test drive scripted message sequences. +type fakeCDPServer struct { + srv *httptest.Server + conn *websocket.Conn + connMu sync.Mutex + connCh chan struct{} // closed when the first connection is accepted + msgCh chan []byte // inbound messages from Monitor +} + +func newFakeCDPServer(t *testing.T) *fakeCDPServer { + t.Helper() + f := &fakeCDPServer{ + msgCh: make(chan []byte, 128), + connCh: make(chan struct{}), + } + var connOnce sync.Once + f.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true}) + if err != nil { + return + } + f.connMu.Lock() + f.conn = c + f.connMu.Unlock() + connOnce.Do(func() { close(f.connCh) }) + go func() { + for { + _, b, err := c.Read(context.Background()) + if err != nil { + return + } + f.msgCh <- b + } + }() + })) + return f +} + +func (f *fakeCDPServer) wsURL() string { + return "ws" + strings.TrimPrefix(f.srv.URL, "http") +} + +func (f *fakeCDPServer) sendToMonitor(t *testing.T, msg any) { + t.Helper() + f.connMu.Lock() + c := f.conn + f.connMu.Unlock() + require.NotNil(t, c, "no active connection") + require.NoError(t, wsjson.Write(context.Background(), c, msg)) +} + +func (f *fakeCDPServer) readFromMonitor(t *testing.T, timeout time.Duration) cdpMessage { + t.Helper() + select { + case b := 
<-f.msgCh: + var msg cdpMessage + require.NoError(t, json.Unmarshal(b, &msg)) + return msg + case <-time.After(timeout): + t.Fatal("timeout waiting for message from Monitor") + return cdpMessage{} + } +} + +func (f *fakeCDPServer) close() { + f.connMu.Lock() + if f.conn != nil { + _ = f.conn.Close(websocket.StatusNormalClosure, "done") + } + f.connMu.Unlock() + f.srv.Close() +} + +// fakeUpstream implements UpstreamProvider for tests. +type fakeUpstream struct { + mu sync.Mutex + current string + subs []chan string +} + +func newFakeUpstream(url string) *fakeUpstream { + return &fakeUpstream{current: url} +} + +func (f *fakeUpstream) Current() string { + f.mu.Lock() + defer f.mu.Unlock() + return f.current +} + +func (f *fakeUpstream) Subscribe() (<-chan string, func()) { + ch := make(chan string, 1) + f.mu.Lock() + f.subs = append(f.subs, ch) + f.mu.Unlock() + cancel := func() { + f.mu.Lock() + for i, s := range f.subs { + if s == ch { + f.subs = append(f.subs[:i], f.subs[i+1:]...) + break + } + } + f.mu.Unlock() + close(ch) + } + return ch, cancel +} + +func (f *fakeUpstream) notifyRestart(newURL string) { + f.mu.Lock() + f.current = newURL + subs := make([]chan string, len(f.subs)) + copy(subs, f.subs) + f.mu.Unlock() + for _, ch := range subs { + select { + case ch <- newURL: + default: + } + } +} + +// eventCollector captures published events with channel-based notification. +type eventCollector struct { + mu sync.Mutex + events []events.Event + notify chan struct{} // signaled on every publish +} + +func newEventCollector() *eventCollector { + return &eventCollector{notify: make(chan struct{}, 256)} +} + +func (c *eventCollector) publishFn() PublishFunc { + return func(ev events.Event) { + c.mu.Lock() + c.events = append(c.events, ev) + c.mu.Unlock() + select { + case c.notify <- struct{}{}: + default: + } + } +} + +// waitFor blocks until an event of the given type is published, or fails. 
+func (c *eventCollector) waitFor(t *testing.T, eventType string, timeout time.Duration) events.Event { + t.Helper() + deadline := time.After(timeout) + for { + c.mu.Lock() + for _, ev := range c.events { + if ev.Type == eventType { + c.mu.Unlock() + return ev + } + } + c.mu.Unlock() + select { + case <-c.notify: + case <-deadline: + t.Fatalf("timeout waiting for event type=%q", eventType) + return events.Event{} + } + } +} + +// waitForNew blocks until a NEW event of the given type is published after this +// call, ignoring any events already in the collector. +func (c *eventCollector) waitForNew(t *testing.T, eventType string, timeout time.Duration) events.Event { + t.Helper() + c.mu.Lock() + skip := len(c.events) + c.mu.Unlock() + + deadline := time.After(timeout) + for { + c.mu.Lock() + for i := skip; i < len(c.events); i++ { + if c.events[i].Type == eventType { + ev := c.events[i] + c.mu.Unlock() + return ev + } + } + c.mu.Unlock() + select { + case <-c.notify: + case <-deadline: + t.Fatalf("timeout waiting for new event type=%q", eventType) + return events.Event{} + } + } +} + +// assertNone verifies that no event of the given type arrives within d. +func (c *eventCollector) assertNone(t *testing.T, eventType string, d time.Duration) { + t.Helper() + deadline := time.After(d) + for { + select { + case <-c.notify: + c.mu.Lock() + for _, ev := range c.events { + if ev.Type == eventType { + c.mu.Unlock() + t.Fatalf("unexpected event %q published", eventType) + return + } + } + c.mu.Unlock() + case <-deadline: + return + } + } +} + + +// ResponderFunc is called for each CDP command the Monitor sends. +// Return nil to use the default empty result. +type ResponderFunc func(msg cdpMessage) any + +// listenAndRespond drains srv.msgCh, calls fn for each command, and sends the +// response. If fn is nil or returns nil, sends {"id": msg.ID, "result": {}}. 
+func listenAndRespond(srv *fakeCDPServer, stopCh <-chan struct{}, fn ResponderFunc) { + for { + select { + case b := <-srv.msgCh: + var msg cdpMessage + if json.Unmarshal(b, &msg) != nil || msg.ID == 0 { + continue + } + srv.connMu.Lock() + c := srv.conn + srv.connMu.Unlock() + if c == nil { + continue + } + var resp any + if fn != nil { + resp = fn(msg) + } + if resp == nil { + resp = map[string]any{"id": msg.ID, "result": map[string]any{}} + } + _ = wsjson.Write(context.Background(), c, resp) + case <-stopCh: + return + } + } +} + +// startMonitor creates a Monitor against srv, starts it, waits for the +// connection, and launches a responder goroutine. Returns cleanup func. +func startMonitor(t *testing.T, srv *fakeCDPServer, fn ResponderFunc) (*Monitor, *eventCollector, func()) { + t.Helper() + ec := newEventCollector() + upstream := newFakeUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99) + require.NoError(t, m.Start(context.Background())) + + stopResponder := make(chan struct{}) + go listenAndRespond(srv, stopResponder, fn) + + // Wait for the websocket connection to be established. + select { + case <-srv.connCh: + case <-time.After(3 * time.Second): + t.Fatal("fake server never received a connection") + } + // Wait for the init sequence (setAutoAttach + domain enables + script injection + // + getTargets) to complete. The responder goroutine handles all responses; + // we just need to wait for the burst to finish. + waitForInitDone(t, srv) + + cleanup := func() { + close(stopResponder) + m.Stop() + } + return m, ec, cleanup +} + +// waitForInitDone waits for the Monitor's init sequence to complete by +// detecting a 100ms gap in activity on the message channel. The responder +// goroutine handles responses; this just waits for the burst to end. +func waitForInitDone(t *testing.T, _ *fakeCDPServer) { + t.Helper() + // The init sequence sends ~8 commands. Wait until the responder has + // processed them all by checking for a quiet period. 
+ deadline := time.After(5 * time.Second) + for { + select { + case <-time.After(100 * time.Millisecond): + return + case <-deadline: + t.Fatal("init sequence did not complete") + } + } +} + +// newComputedMonitor creates an unconnected Monitor for testing computed state +// (network_idle, layout_settled, navigation_settled) without a real websocket. +func newComputedMonitor(t *testing.T) (*Monitor, *eventCollector) { + t.Helper() + ec := newEventCollector() + upstream := newFakeUpstream("ws://127.0.0.1:0") + m := New(upstream, ec.publishFn(), 0) + return m, ec +} + +// navigateMonitor sends a Page.frameNavigated to reset computed state. +func navigateMonitor(m *Monitor, url string) { + p, _ := json.Marshal(map[string]any{ + "frame": map[string]any{"id": "f1", "url": url}, + }) + m.handleFrameNavigated(p, "s1") +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +func TestAutoAttach(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + ec := newEventCollector() + upstream := newFakeUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99) + require.NoError(t, m.Start(context.Background())) + defer m.Stop() + + // The first command should be Target.setAutoAttach with correct params. + msg := srv.readFromMonitor(t, 3*time.Second) + assert.Equal(t, "Target.setAutoAttach", msg.Method) + + var params struct { + AutoAttach bool `json:"autoAttach"` + WaitForDebuggerOnStart bool `json:"waitForDebuggerOnStart"` + Flatten bool `json:"flatten"` + } + require.NoError(t, json.Unmarshal(msg.Params, &params)) + assert.True(t, params.AutoAttach) + assert.False(t, params.WaitForDebuggerOnStart) + assert.True(t, params.Flatten) + + // Respond and drain domain-enable commands.
+ stopResponder := make(chan struct{}) + go listenAndRespond(srv, stopResponder, nil) + defer close(stopResponder) + srv.sendToMonitor(t, map[string]any{"id": msg.ID, "result": map[string]any{}}) + + // Simulate Target.attachedToTarget — session should be stored. + srv.sendToMonitor(t, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "session-abc", + "targetInfo": map[string]any{"targetId": "target-xyz", "type": "page", "url": "https://example.com"}, + }, + }) + require.Eventually(t, func() bool { + m.sessionsMu.RLock() + defer m.sessionsMu.RUnlock() + _, ok := m.sessions["session-abc"] + return ok + }, 2*time.Second, 50*time.Millisecond, "session not stored") + + m.sessionsMu.RLock() + info := m.sessions["session-abc"] + m.sessionsMu.RUnlock() + assert.Equal(t, "target-xyz", info.targetID) + assert.Equal(t, "page", info.targetType) +} + +func TestLifecycle(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + ec := newEventCollector() + upstream := newFakeUpstream(srv.wsURL()) + m := New(upstream, ec.publishFn(), 99) + + assert.False(t, m.IsRunning(), "idle at boot") + + require.NoError(t, m.Start(context.Background())) + assert.True(t, m.IsRunning(), "running after Start") + srv.readFromMonitor(t, 2*time.Second) // drain setAutoAttach + + m.Stop() + assert.False(t, m.IsRunning(), "stopped after Stop") + + // Restart while stopped. + require.NoError(t, m.Start(context.Background())) + assert.True(t, m.IsRunning(), "running after second Start") + srv.readFromMonitor(t, 2*time.Second) + + // Restart while running — implicit Stop+Start. 
+ require.NoError(t, m.Start(context.Background())) + assert.True(t, m.IsRunning(), "running after implicit restart") + + m.Stop() + assert.False(t, m.IsRunning(), "stopped at end") +} + +func TestReconnect(t *testing.T) { + srv1 := newFakeCDPServer(t) + + upstream := newFakeUpstream(srv1.wsURL()) + ec := newEventCollector() + m := New(upstream, ec.publishFn(), 99) + require.NoError(t, m.Start(context.Background())) + defer m.Stop() + + srv1.readFromMonitor(t, 2*time.Second) // drain setAutoAttach + + srv2 := newFakeCDPServer(t) + defer srv2.close() + defer srv1.close() + + upstream.notifyRestart(srv2.wsURL()) + + ec.waitFor(t, "monitor_disconnected", 3*time.Second) + + // Wait for the Monitor to reconnect to srv2. + srv2.readFromMonitor(t, 5*time.Second) + + ev := ec.waitFor(t, "monitor_reconnected", 3*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + _, ok := data["reconnect_duration_ms"] + assert.True(t, ok, "missing reconnect_duration_ms") +} + +func TestConsoleEvents(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + t.Run("console_log", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.consoleAPICalled", + "params": map[string]any{ + "type": "log", + "args": []any{map[string]any{"type": "string", "value": "hello world"}}, + }, + }) + ev := ec.waitFor(t, "console_log", 2*time.Second) + assert.Equal(t, events.CategoryConsole, ev.Category) + assert.Equal(t, events.KindCDP, ev.Source.Kind) + assert.Equal(t, "Runtime.consoleAPICalled", ev.Source.Event) + assert.Equal(t, events.DetailStandard, ev.DetailLevel) + + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "log", data["level"]) + assert.Equal(t, "hello world", data["text"]) + }) + + t.Run("exception_thrown", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.exceptionThrown", + 
"params": map[string]any{ + "timestamp": 1234.5, + "exceptionDetails": map[string]any{ + "text": "Uncaught TypeError", + "lineNumber": 42, + "columnNumber": 7, + "url": "https://example.com/app.js", + }, + }, + }) + ev := ec.waitFor(t, "console_error", 2*time.Second) + assert.Equal(t, events.CategoryConsole, ev.Category) + assert.Equal(t, events.DetailStandard, ev.DetailLevel) + + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "Uncaught TypeError", data["text"]) + assert.Equal(t, float64(42), data["line"]) + }) + + t.Run("non_string_args", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.consoleAPICalled", + "params": map[string]any{ + "type": "log", + "args": []any{ + map[string]any{"type": "number", "value": 42}, + map[string]any{"type": "object", "value": map[string]any{"key": "val"}}, + map[string]any{"type": "undefined"}, + }, + }, + }) + ev := ec.waitForNew(t, "console_log", 2*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + args := data["args"].([]any) + assert.Equal(t, "42", args[0]) + assert.Contains(t, args[1], "key") + assert.Equal(t, "undefined", args[2]) + }) +} + +func TestNetworkEvents(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + // Custom responder: return a body for Network.getResponseBody. 
+ responder := func(msg cdpMessage) any { + if msg.Method == "Network.getResponseBody" { + return map[string]any{ + "id": msg.ID, + "result": map[string]any{"body": `{"ok":true}`, "base64Encoded": false}, + } + } + return nil + } + _, ec, cleanup := startMonitor(t, srv, responder) + defer cleanup() + + t.Run("request_and_response", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "req-001", + "resourceType": "XHR", + "request": map[string]any{ + "method": "POST", + "url": "https://api.example.com/data", + "headers": map[string]any{"Content-Type": "application/json"}, + }, + "initiator": map[string]any{"type": "script"}, + }, + }) + ev := ec.waitFor(t, "network_request", 2*time.Second) + assert.Equal(t, events.CategoryNetwork, ev.Category) + assert.Equal(t, "Network.requestWillBeSent", ev.Source.Event) + + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "POST", data["method"]) + assert.Equal(t, "https://api.example.com/data", data["url"]) + + // Complete the request lifecycle. 
+ srv.sendToMonitor(t, map[string]any{ + "method": "Network.responseReceived", + "params": map[string]any{ + "requestId": "req-001", + "response": map[string]any{ + "status": 200, "statusText": "OK", + "headers": map[string]any{"Content-Type": "application/json"}, "mimeType": "application/json", + }, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", + "params": map[string]any{"requestId": "req-001"}, + }) + + ev2 := ec.waitFor(t, "network_response", 3*time.Second) + assert.Equal(t, "Network.loadingFinished", ev2.Source.Event) + var data2 map[string]any + require.NoError(t, json.Unmarshal(ev2.Data, &data2)) + assert.Equal(t, float64(200), data2["status"]) + assert.NotEmpty(t, data2["body"]) + }) + + t.Run("loading_failed", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "req-002", + "request": map[string]any{"method": "GET", "url": "https://fail.example.com/"}, + }, + }) + ec.waitForNew(t, "network_request", 2*time.Second) + + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFailed", + "params": map[string]any{ + "requestId": "req-002", + "errorText": "net::ERR_CONNECTION_REFUSED", + "canceled": false, + }, + }) + ev := ec.waitFor(t, "network_loading_failed", 2*time.Second) + assert.Equal(t, events.CategoryNetwork, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "net::ERR_CONNECTION_REFUSED", data["error_text"]) + }) + + t.Run("binary_resource_skips_body", func(t *testing.T) { + var getBodyCalled atomic.Bool + srv.sendToMonitor(t, map[string]any{ + "method": "Network.requestWillBeSent", + "params": map[string]any{ + "requestId": "img-001", + "resourceType": "Image", + "request": map[string]any{"method": "GET", "url": "https://example.com/photo.png"}, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.responseReceived", + "params": 
map[string]any{ + "requestId": "img-001", + "response": map[string]any{"status": 200, "statusText": "OK", "headers": map[string]any{}, "mimeType": "image/png"}, + }, + }) + srv.sendToMonitor(t, map[string]any{ + "method": "Network.loadingFinished", + "params": map[string]any{"requestId": "img-001"}, + }) + + ev := ec.waitForNew(t, "network_response", 3*time.Second) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "", data["body"], "binary resource should have empty body") + assert.False(t, getBodyCalled.Load(), "should not call getResponseBody for images") + }) +} + +func TestPageEvents(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.frameNavigated", + "params": map[string]any{ + "frame": map[string]any{"id": "frame-1", "url": "https://example.com/page"}, + }, + }) + ev := ec.waitFor(t, "navigation", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + assert.Equal(t, "Page.frameNavigated", ev.Source.Event) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "https://example.com/page", data["url"]) + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.domContentEventFired", + "params": map[string]any{"timestamp": 1000.0}, + }) + ev2 := ec.waitFor(t, "dom_content_loaded", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev2.Category) + assert.Equal(t, events.DetailMinimal, ev2.DetailLevel) + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.loadEventFired", + "params": map[string]any{"timestamp": 1001.0}, + }) + ev3 := ec.waitFor(t, "page_load", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev3.Category) + assert.Equal(t, events.DetailMinimal, ev3.DetailLevel) + + srv.sendToMonitor(t, map[string]any{ + "method": "DOM.documentUpdated", + "params": map[string]any{}, + }) + ev4 := ec.waitFor(t, 
"dom_updated", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev4.Category) + assert.Equal(t, events.DetailMinimal, ev4.DetailLevel) +} + +func TestTargetEvents(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + srv.sendToMonitor(t, map[string]any{ + "method": "Target.targetCreated", + "params": map[string]any{ + "targetInfo": map[string]any{"targetId": "t-1", "type": "page", "url": "https://new.example.com"}, + }, + }) + ev := ec.waitFor(t, "target_created", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev.Category) + assert.Equal(t, events.DetailMinimal, ev.DetailLevel) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, "t-1", data["target_id"]) + + srv.sendToMonitor(t, map[string]any{ + "method": "Target.targetDestroyed", + "params": map[string]any{"targetId": "t-1"}, + }) + ev2 := ec.waitFor(t, "target_destroyed", 2*time.Second) + assert.Equal(t, events.CategoryPage, ev2.Category) + assert.Equal(t, events.DetailMinimal, ev2.DetailLevel) +} + +func TestBindingAndTimeline(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + t.Run("interaction_click", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": `{"type":"interaction_click","x":10,"y":20,"selector":"button","tag":"BUTTON","text":"OK"}`, + }, + }) + ev := ec.waitFor(t, "interaction_click", 2*time.Second) + assert.Equal(t, events.CategoryInteraction, ev.Category) + assert.Equal(t, "Runtime.bindingCalled", ev.Source.Event) + }) + + t.Run("scroll_settled", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "__kernelEvent", + "payload": 
`{"type":"scroll_settled","from_x":0,"from_y":0,"to_x":0,"to_y":500,"target_selector":"body"}`, + }, + }) + ev := ec.waitFor(t, "scroll_settled", 2*time.Second) + assert.Equal(t, events.CategoryInteraction, ev.Category) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.Equal(t, float64(500), data["to_y"]) + }) + + t.Run("layout_shift", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "PerformanceTimeline.timelineEventAdded", + "params": map[string]any{ + "event": map[string]any{"type": "layout-shift"}, + }, + }) + ev := ec.waitFor(t, "layout_shift", 2*time.Second) + assert.Equal(t, events.KindCDP, ev.Source.Kind) + assert.Equal(t, "PerformanceTimeline.timelineEventAdded", ev.Source.Event) + }) + + t.Run("unknown_binding_ignored", func(t *testing.T) { + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.bindingCalled", + "params": map[string]any{ + "name": "someOtherBinding", + "payload": `{"type":"interaction_click"}`, + }, + }) + ec.assertNone(t, "interaction_click", 100*time.Millisecond) + }) +} + +func TestScreenshot(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + m, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + var captureCount atomic.Int32 + minimalPNG := []byte{ + 0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, + 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + 0x08, 0x02, 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, + 0xde, 0x00, 0x00, 0x00, 0x0c, 0x49, 0x44, 0x41, + 0x54, 0x08, 0xd7, 0x63, 0xf8, 0xcf, 0xc0, 0x00, + 0x00, 0x00, 0x02, 0x00, 0x01, 0xe2, 0x21, 0xbc, + 0x33, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, + 0x44, 0xae, 0x42, 0x60, 0x82, + } + m.screenshotFn = func(ctx context.Context, displayNum int) ([]byte, error) { + captureCount.Add(1) + return minimalPNG, nil + } + + t.Run("capture_and_publish", func(t *testing.T) { + m.maybeScreenshot(context.Background()) + require.Eventually(t, func() bool { return 
captureCount.Load() == 1 }, 2*time.Second, 20*time.Millisecond) + + ev := ec.waitFor(t, "screenshot", 2*time.Second) + assert.Equal(t, events.CategorySystem, ev.Category) + assert.Equal(t, events.KindLocalProcess, ev.Source.Kind) + var data map[string]any + require.NoError(t, json.Unmarshal(ev.Data, &data)) + assert.NotEmpty(t, data["png"]) + }) + + t.Run("rate_limited", func(t *testing.T) { + before := captureCount.Load() + m.maybeScreenshot(context.Background()) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, before, captureCount.Load(), "should be rate-limited within 2s") + }) + + t.Run("captures_after_cooldown", func(t *testing.T) { + m.lastScreenshotAt.Store(time.Now().Add(-3 * time.Second).UnixMilli()) + before := captureCount.Load() + m.maybeScreenshot(context.Background()) + require.Eventually(t, func() bool { return captureCount.Load() > before }, 2*time.Second, 20*time.Millisecond) + }) +} + +func TestAttachExistingTargets(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + responder := func(msg cdpMessage) any { + srv.connMu.Lock() + c := srv.conn + srv.connMu.Unlock() + switch msg.Method { + case "Target.getTargets": + return map[string]any{ + "id": msg.ID, + "result": map[string]any{ + "targetInfos": []any{ + map[string]any{"targetId": "existing-1", "type": "page", "url": "https://preexisting.example.com"}, + }, + }, + } + case "Target.attachToTarget": + if c != nil { + _ = wsjson.Write(context.Background(), c, map[string]any{ + "method": "Target.attachedToTarget", + "params": map[string]any{ + "sessionId": "session-existing-1", + "targetInfo": map[string]any{"targetId": "existing-1", "type": "page", "url": "https://preexisting.example.com"}, + }, + }) + } + return map[string]any{"id": msg.ID, "result": map[string]any{"sessionId": "session-existing-1"}} + } + return nil + } + + m, _, cleanup := startMonitor(t, srv, responder) + defer cleanup() + + require.Eventually(t, func() bool { + m.sessionsMu.RLock() + defer 
m.sessionsMu.RUnlock() + _, ok := m.sessions["session-existing-1"] + return ok + }, 3*time.Second, 50*time.Millisecond, "existing target not auto-attached") + + m.sessionsMu.RLock() + info := m.sessions["session-existing-1"] + m.sessionsMu.RUnlock() + assert.Equal(t, "existing-1", info.targetID) +} + +func TestURLPopulated(t *testing.T) { + srv := newFakeCDPServer(t) + defer srv.close() + + _, ec, cleanup := startMonitor(t, srv, nil) + defer cleanup() + + srv.sendToMonitor(t, map[string]any{ + "method": "Page.frameNavigated", + "params": map[string]any{ + "frame": map[string]any{"id": "f1", "url": "https://example.com/page"}, + }, + }) + ec.waitFor(t, "navigation", 2*time.Second) + + srv.sendToMonitor(t, map[string]any{ + "method": "Runtime.consoleAPICalled", + "params": map[string]any{ + "type": "log", + "args": []any{map[string]any{"type": "string", "value": "test"}}, + }, + }) + ev := ec.waitFor(t, "console_log", 2*time.Second) + assert.Equal(t, "https://example.com/page", ev.URL) +} + +// --------------------------------------------------------------------------- +// Computed meta-event tests — use direct handler calls, no websocket needed. +// --------------------------------------------------------------------------- + +// simulateRequest sends a Network.requestWillBeSent through the handler. +func simulateRequest(m *Monitor, id string) { + p, _ := json.Marshal(map[string]any{ + "requestId": id, "resourceType": "Document", + "request": map[string]any{"method": "GET", "url": "https://example.com/" + id}, + }) + m.handleNetworkRequest(p, "s1") +} + +// simulateFinished stores minimal state and sends Network.loadingFinished. 
func simulateFinished(m *Monitor, id string) {
	// Seed the pending-request table directly so handleLoadingFinished has
	// state to resolve, without replaying a full requestWillBeSent exchange.
	m.pendReqMu.Lock()
	m.pendingRequests[id] = networkReqState{method: "GET", url: "https://example.com/" + id}
	m.pendReqMu.Unlock()
	p, _ := json.Marshal(map[string]any{"requestId": id})
	m.handleLoadingFinished(p, "s1")
}

// TestNetworkIdle verifies the network_idle meta-event debounce behavior:
// it must wait ~500ms after the last in-flight request completes, and a new
// request arriving during the quiet window must restart the timer.
func TestNetworkIdle(t *testing.T) {
	t.Run("debounce_500ms", func(t *testing.T) {
		m, ec := newComputedMonitor(t)
		navigateMonitor(m, "https://example.com")

		simulateRequest(m, "r1")
		simulateRequest(m, "r2")
		simulateRequest(m, "r3")

		t0 := time.Now()
		simulateFinished(m, "r1")
		simulateFinished(m, "r2")
		simulateFinished(m, "r3")

		// 400ms lower bound leaves slack below the nominal 500ms debounce
		// to avoid flakiness from timer scheduling jitter.
		ev := ec.waitFor(t, "network_idle", 2*time.Second)
		assert.GreaterOrEqual(t, time.Since(t0).Milliseconds(), int64(400), "fired too early")
		assert.Equal(t, events.CategoryNetwork, ev.Category)
	})

	t.Run("timer_reset_on_new_request", func(t *testing.T) {
		m, ec := newComputedMonitor(t)
		navigateMonitor(m, "https://example.com")

		simulateRequest(m, "a1")
		simulateFinished(m, "a1")
		// Let part of the quiet window elapse before the second request.
		time.Sleep(200 * time.Millisecond)

		simulateRequest(m, "a2")
		t1 := time.Now()
		simulateFinished(m, "a2")

		ec.waitFor(t, "network_idle", 2*time.Second)
		assert.GreaterOrEqual(t, time.Since(t1).Milliseconds(), int64(400), "should reset timer on new request")
	})
}

// TestLayoutSettled verifies the layout_settled meta-event: it fires ~1s after
// page load, and an intervening layout-shift restarts the countdown.
func TestLayoutSettled(t *testing.T) {
	t.Run("debounce_1s_after_page_load", func(t *testing.T) {
		m, ec := newComputedMonitor(t)
		navigateMonitor(m, "https://example.com")

		t0 := time.Now()
		m.handleLoadEventFired(json.RawMessage(`{}`), "s1")

		// 900ms lower bound leaves slack below the nominal 1s debounce.
		ev := ec.waitFor(t, "layout_settled", 3*time.Second)
		assert.GreaterOrEqual(t, time.Since(t0).Milliseconds(), int64(900), "fired too early")
		assert.Equal(t, events.CategoryPage, ev.Category)
	})

	t.Run("layout_shift_resets_timer", func(t *testing.T) {
		m, ec := newComputedMonitor(t)
		navigateMonitor(m, "https://example.com")
		m.handleLoadEventFired(json.RawMessage(`{}`), "s1")

		// Inject a layout-shift partway through the 1s window; the settled
		// timer must restart from this point.
		time.Sleep(600 * time.Millisecond)
		shiftParams, _ := json.Marshal(map[string]any{
			"event": map[string]any{"type": "layout-shift"},
		})
		m.handleTimelineEvent(shiftParams, "s1")
		t1 := time.Now()

		ec.waitFor(t, "layout_settled", 3*time.Second)
		assert.GreaterOrEqual(t, time.Since(t1).Milliseconds(), int64(900), "should reset after layout_shift")
	})
}

// TestNavigationSettled verifies the composite navigation_settled meta-event,
// which requires DOMContentLoaded + network_idle + layout_settled, and is
// cancelled when a new navigation begins before all three are observed.
func TestNavigationSettled(t *testing.T) {
	t.Run("fires_when_all_three_flags_set", func(t *testing.T) {
		m, ec := newComputedMonitor(t)
		navigateMonitor(m, "https://example.com")

		m.handleDOMContentLoaded(json.RawMessage(`{}`), "s1")

		// Trigger network_idle.
		simulateRequest(m, "r1")
		simulateFinished(m, "r1")

		// Trigger layout_settled via page_load.
		m.handleLoadEventFired(json.RawMessage(`{}`), "s1")

		ev := ec.waitFor(t, "navigation_settled", 3*time.Second)
		assert.Equal(t, events.CategoryPage, ev.Category)
	})

	t.Run("interrupted_by_new_navigation", func(t *testing.T) {
		m, ec := newComputedMonitor(t)
		navigateMonitor(m, "https://example.com")

		m.handleDOMContentLoaded(json.RawMessage(`{}`), "s1")

		simulateRequest(m, "r2")
		simulateFinished(m, "r2")

		// Interrupt before layout_settled fires.
		navigateMonitor(m, "https://example.com/page2")

		ec.assertNone(t, "navigation_settled", 1500*time.Millisecond)
	})
}
diff --git a/server/lib/cdpmonitor/screenshot.go b/server/lib/cdpmonitor/screenshot.go new file mode 100644 index 00000000..abb559d2 --- /dev/null +++ b/server/lib/cdpmonitor/screenshot.go @@ -0,0 +1,87 @@
package cdpmonitor

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os/exec"
	"time"

	"github.com/onkernel/kernel-images/server/lib/events"
)

// maybeScreenshot triggers a screenshot if the rate-limit window has elapsed.
// It uses an atomic CAS on lastScreenshotAt to ensure only one screenshot runs
// at a time.
func (m *Monitor) maybeScreenshot(ctx context.Context) {
	now := time.Now().UnixMilli()
	last := m.lastScreenshotAt.Load()
	// Rate limit: at most one screenshot every 2 seconds.
	if now-last < 2000 {
		return
	}
	// CAS loser means a concurrent caller already claimed this window.
	if !m.lastScreenshotAt.CompareAndSwap(last, now) {
		return
	}
	go m.captureScreenshot(ctx)
}

// captureScreenshot takes a screenshot via ffmpeg x11grab (or the screenshotFn
// seam in tests), optionally downscales it, and publishes a screenshot event.
// Capture errors are dropped silently; screenshots are best-effort.
func (m *Monitor) captureScreenshot(ctx context.Context) {
	var pngBytes []byte
	var err error

	if m.screenshotFn != nil {
		pngBytes, err = m.screenshotFn(ctx, m.displayNum)
	} else {
		pngBytes, err = captureViaFFmpeg(ctx, m.displayNum, 1)
	}
	if err != nil {
		return
	}

	// Downscale if base64 output would exceed 950KB (~729KB raw).
	// Recaptures at increasing divisors (2,4,8,16); the test seam is never
	// re-invoked since it cannot downscale.
	const rawThreshold = 729 * 1024
	for scale := 2; len(pngBytes) > rawThreshold && scale <= 16 && m.screenshotFn == nil; scale *= 2 {
		pngBytes, err = captureViaFFmpeg(ctx, m.displayNum, scale)
		if err != nil {
			return
		}
	}

	encoded := base64.StdEncoding.EncodeToString(pngBytes)
	data, _ := json.Marshal(map[string]string{"png": encoded})

	m.publish(events.Event{
		Ts:          time.Now().UnixMilli(),
		Type:        "screenshot",
		Category:    events.CategorySystem,
		Source:      events.Source{Kind: events.KindLocalProcess},
		DetailLevel: events.DetailStandard,
		Data:        data,
	})
}

// captureViaFFmpeg runs ffmpeg x11grab to capture a PNG screenshot.
// If divisor > 1, a scale filter is applied to reduce the output size.
func captureViaFFmpeg(ctx context.Context, displayNum, divisor int) ([]byte, error) {
	args := []string{
		"-f", "x11grab",
		"-i", fmt.Sprintf(":%d", displayNum),
		"-vframes", "1", // single frame only
	}
	if divisor > 1 {
		args = append(args, "-vf", fmt.Sprintf("scale=iw/%d:ih/%d", divisor, divisor))
	}
	// image2 muxer to stdout; PNG inferred from the pipe output format.
	args = append(args, "-f", "image2", "pipe:1")

	var out bytes.Buffer
	cmd := exec.CommandContext(ctx, "ffmpeg", args...)
	cmd.Stdout = &out
	if err := cmd.Run(); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
diff --git a/server/lib/cdpmonitor/types.go b/server/lib/cdpmonitor/types.go new file mode 100644 index 00000000..9beab2bf --- /dev/null +++ b/server/lib/cdpmonitor/types.go @@ -0,0 +1,114 @@
package cdpmonitor

import (
	"encoding/json"
	"fmt"
)

// targetInfo holds metadata about an attached CDP target/session.
type targetInfo struct {
	targetID   string
	url        string
	targetType string
}

// cdpError is the JSON-RPC error object returned by Chrome.
type cdpError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// Error implements the error interface for cdpError.
func (e *cdpError) Error() string {
	return fmt.Sprintf("CDP error %d: %s", e.Code, e.Message)
}

// cdpMessage is the JSON-RPC message envelope used by Chrome's DevTools Protocol.
type cdpMessage struct {
	ID        int64           `json:"id,omitempty"`
	Method    string          `json:"method,omitempty"`
	Params    json.RawMessage `json:"params,omitempty"`
	SessionID string          `json:"sessionId,omitempty"`
	Result    json.RawMessage `json:"result,omitempty"`
	Error     *cdpError       `json:"error,omitempty"`
}

// networkReqState holds request + response metadata until loadingFinished.
type networkReqState struct {
	sessionID    string
	method       string
	url          string
	headers      json.RawMessage
	postData     string
	resourceType string
	status       int
	statusText   string
	resHeaders   json.RawMessage
	mimeType     string
}

// cdpConsoleArg is a single Runtime.consoleAPICalled argument.
// Value is json.RawMessage because CDP sends strings, numbers, objects, etc.
type cdpConsoleArg struct {
	Type  string          `json:"type"`
	Value json.RawMessage `json:"value,omitempty"`
}

// cdpConsoleParams is the shape of Runtime.consoleAPICalled params.
type cdpConsoleParams struct {
	Type       string          `json:"type"`
	Args       []cdpConsoleArg `json:"args"`
	StackTrace json.RawMessage `json:"stackTrace"`
}

// cdpExceptionDetails is the shape of Runtime.exceptionThrown params.
type cdpExceptionDetails struct {
	ExceptionDetails struct {
		Text         string          `json:"text"`
		LineNumber   int             `json:"lineNumber"`
		ColumnNumber int             `json:"columnNumber"`
		URL          string          `json:"url"`
		StackTrace   json.RawMessage `json:"stackTrace"`
	} `json:"exceptionDetails"`
}

// cdpTargetInfo is the shared TargetInfo shape used by Target events.
type cdpTargetInfo struct {
	TargetID string `json:"targetId"`
	Type     string `json:"type"`
	URL      string `json:"url"`
}

// cdpNetworkRequestParams is the shape of Network.requestWillBeSent params.
type cdpNetworkRequestParams struct {
	RequestID    string `json:"requestId"`
	ResourceType string `json:"resourceType"`
	Request      struct {
		Method   string          `json:"method"`
		URL      string          `json:"url"`
		Headers  json.RawMessage `json:"headers"`
		PostData string          `json:"postData"`
	} `json:"request"`
	// Initiator is kept raw; its shape varies by initiator type.
	Initiator json.RawMessage `json:"initiator"`
}

// cdpResponseReceivedParams is the shape of Network.responseReceived params.
type cdpResponseReceivedParams struct {
	RequestID string `json:"requestId"`
	Response  struct {
		Status     int             `json:"status"`
		StatusText string          `json:"statusText"`
		Headers    json.RawMessage `json:"headers"`
		MimeType   string          `json:"mimeType"`
	} `json:"response"`
}

// cdpAttachedToTargetParams is the shape of Target.attachedToTarget params.
type cdpAttachedToTargetParams struct {
	SessionID  string        `json:"sessionId"`
	TargetInfo cdpTargetInfo `json:"targetInfo"`
}

// cdpTargetCreatedParams is the shape of Target.targetCreated params.
+type cdpTargetCreatedParams struct { + TargetInfo cdpTargetInfo `json:"targetInfo"` +} diff --git a/server/lib/cdpmonitor/util.go b/server/lib/cdpmonitor/util.go new file mode 100644 index 00000000..5dae2fce --- /dev/null +++ b/server/lib/cdpmonitor/util.go @@ -0,0 +1,111 @@ +package cdpmonitor + +import ( + "encoding/json" + "slices" + "strings" + "unicode/utf8" +) + +// consoleArgString extracts a display string from a CDP console argument. +// For strings it unquotes the JSON value; for other types it returns the raw JSON. +func consoleArgString(a cdpConsoleArg) string { + if len(a.Value) == 0 { + return a.Type // e.g. "undefined", "null" + } + if a.Type == "string" { + var s string + if json.Unmarshal(a.Value, &s) == nil { + return s + } + } + return string(a.Value) +} + +// isTextualResource reports whether the resource warrants body capture. +// resourceType is checked first; mimeType is a fallback for resources with no type (e.g. in-flight at attach time). +func isTextualResource(resourceType, mimeType string) bool { + switch resourceType { + case "Font", "Image", "Media": + return false + } + return isCapturedMIME(mimeType) +} + +// isCapturedMIME returns true for MIME types whose bodies are worth capturing. +// Binary formats (vendor types, binary encodings, raw streams) are excluded. +func isCapturedMIME(mime string) bool { + if mime == "" { + return false // unknown + } + for _, prefix := range []string{"image/", "font/", "audio/", "video/"} { + if strings.HasPrefix(mime, prefix) { + return false + } + } + if slices.Contains([]string{ + "application/octet-stream", + "application/wasm", + "application/pdf", + "application/zip", + "application/gzip", + "application/x-protobuf", + "application/x-msgpack", + "application/x-thrift", + }, mime) { + return false + } + // Skip vendor binary formats; allow vnd types with text-based suffixes (+json, +xml, +csv). 
+ if sub, ok := strings.CutPrefix(mime, "application/vnd."); ok { + for _, textSuffix := range []string{"+json", "+xml", "+csv"} { + if strings.HasSuffix(sub, textSuffix) { + return true + } + } + return false + } + return true +} + +// bodyCapFor returns the max body capture size for a MIME type. +// Structured data (JSON, XML, form data) gets 900 KB; everything else gets 10 KB. +func bodyCapFor(mime string) int { + const fullCap = 900 * 1024 + const contextCap = 10 * 1024 + structuredPrefixes := []string{ + "application/json", + "application/xml", + "application/x-www-form-urlencoded", + "application/graphql", + "text/xml", + "text/csv", + } + for _, p := range structuredPrefixes { + if strings.HasPrefix(mime, p) { + return fullCap + } + } + // vnd types with +json/+xml suffix are treated as structured. + for _, suffix := range []string{"+json", "+xml"} { + if strings.HasSuffix(mime, suffix) { + return fullCap + } + } + return contextCap +} + +// truncateBody caps body at the given limit on a valid UTF-8 boundary. +func truncateBody(body string, maxBody int) string { + if len(body) <= maxBody { + return body + } + if maxBody <= utf8.UTFMax { + return body[:maxBody] + } + // Walk back at most UTFMax bytes to find a clean rune boundary. + i := maxBody + for i > maxBody-utf8.UTFMax && !utf8.RuneStart(body[i]) { + i-- + } + return body[:i] +} diff --git a/server/lib/events/event.go b/server/lib/events/event.go new file mode 100644 index 00000000..185d6809 --- /dev/null +++ b/server/lib/events/event.go @@ -0,0 +1,112 @@ +package events + +import ( + "encoding/json" + "log/slog" + "strings" +) + +// maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB). 
const maxS2RecordBytes = 1_000_000

// EventCategory determines type of logging; each category is routed to its
// own per-category log stream.
type EventCategory string

const (
	CategoryConsole     EventCategory = "console"
	CategoryNetwork     EventCategory = "network"
	CategoryPage        EventCategory = "page"
	CategoryInteraction EventCategory = "interaction"
	CategoryLiveview    EventCategory = "liveview"
	CategoryCaptcha     EventCategory = "captcha"
	CategorySystem      EventCategory = "system"
)

// SourceKind identifies which kind of producer emitted an event.
type SourceKind string

const (
	KindCDP          SourceKind = "cdp"
	KindKernelAPI    SourceKind = "kernel_api"
	KindExtension    SourceKind = "extension"
	KindLocalProcess SourceKind = "local_process"
)

// Source captures provenance: which producer emitted the event and any
// producer-specific context (e.g. CDP target/session/frame IDs).
type Source struct {
	Kind     SourceKind        `json:"kind"`
	Event    string            `json:"event,omitempty"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

// DetailLevel indicates how much payload detail an event carries.
type DetailLevel string

const (
	DetailMinimal  DetailLevel = "minimal"
	DetailStandard DetailLevel = "standard"
	DetailVerbose  DetailLevel = "verbose"
	DetailRaw      DetailLevel = "raw"
)

// Event is the portable event schema. It contains only producer-emitted content;
// pipeline metadata (seq, capture session) lives on the Envelope.
type Event struct {
	Ts          int64           `json:"ts"`
	Type        string          `json:"type"`
	Category    EventCategory   `json:"category"`
	Source      Source          `json:"source"`
	DetailLevel DetailLevel     `json:"detail_level"`
	URL         string          `json:"url,omitempty"`
	Data        json.RawMessage `json:"data,omitempty"`
	Truncated   bool            `json:"truncated,omitempty"`
}

// Envelope wraps an Event with pipeline-assigned metadata.
type Envelope struct {
	CaptureSessionID string `json:"capture_session_id"`
	Seq              uint64 `json:"seq"`
	Event            Event  `json:"event"`
}

// CategoryFor derives an EventCategory from an event type string.
// It splits on the first underscore and maps the prefix to a category.
+func CategoryFor(eventType string) EventCategory { + prefix, _, _ := strings.Cut(eventType, "_") + switch prefix { + case "console": + return CategoryConsole + case "network": + return CategoryNetwork + case "page", "navigation", "dom", "target": + return CategoryPage + case "interaction", "layout", "scroll": + return CategoryInteraction + case "screenshot", "monitor": + return CategorySystem + default: + return CategorySystem + } +} + +// truncateIfNeeded marshals env and returns the (possibly truncated) envelope. +// If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge +// url or source.metadata), it is returned as-is — callers must handle nil data. +func truncateIfNeeded(env Envelope) (Envelope, []byte) { + data, err := json.Marshal(env) + if err != nil { + return env, nil + } + if len(data) <= maxS2RecordBytes { + return env, data + } + env.Event.Data = json.RawMessage("null") + env.Event.Truncated = true + data, err = json.Marshal(env) + if err != nil { + return env, nil + } + if len(data) > maxS2RecordBytes { + slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "seq", env.Seq, "size", len(data)) + } + return env, data +} diff --git a/server/lib/events/events_test.go b/server/lib/events/events_test.go new file mode 100644 index 00000000..b9da4634 --- /dev/null +++ b/server/lib/events/events_test.go @@ -0,0 +1,613 @@ +package events + +import ( + "bytes" + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// readEnvelope is a test helper that calls Read and asserts a non-drop result. 
+func readEnvelope(t *testing.T, r *Reader, ctx context.Context) Envelope { + t.Helper() + res, err := r.Read(ctx) + require.NoError(t, err) + require.NotNil(t, res.Envelope, "expected envelope, got drop") + return *res.Envelope +} + +func TestEventSerialization(t *testing.T) { + ev := Event{ + Ts: 1234567890000, + Type: "console.log", + Category: CategoryConsole, + Source: Source{ + Kind: KindCDP, + Event: "Runtime.consoleAPICalled", + Metadata: map[string]string{ + "target_id": "target-1", + "cdp_session_id": "cdp-session-1", + "frame_id": "frame-1", + "parent_frame_id": "parent-frame-1", + }, + }, + DetailLevel: DetailStandard, + URL: "https://example.com", + Data: json.RawMessage(`{"message":"hello"}`), + } + + b, err := json.Marshal(ev) + require.NoError(t, err) + + var decoded map[string]any + require.NoError(t, json.Unmarshal(b, &decoded)) + + assert.Equal(t, "console.log", decoded["type"]) + assert.Equal(t, "console", decoded["category"]) + assert.Equal(t, "standard", decoded["detail_level"]) + assert.Equal(t, "https://example.com", decoded["url"]) + + src, ok := decoded["source"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "cdp", src["kind"]) + assert.Equal(t, "Runtime.consoleAPICalled", src["event"]) + meta, ok := src["metadata"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "target-1", meta["target_id"]) + assert.Equal(t, "cdp-session-1", meta["cdp_session_id"]) +} + +func TestEnvelopeSerialization(t *testing.T) { + env := Envelope{ + CaptureSessionID: "test-session-id", + Seq: 1, + Event: Event{ + Ts: 1000, + Type: "console.log", + Category: CategoryConsole, + Source: Source{Kind: KindCDP}, + }, + } + + b, err := json.Marshal(env) + require.NoError(t, err) + + var decoded map[string]any + require.NoError(t, json.Unmarshal(b, &decoded)) + + assert.Equal(t, "test-session-id", decoded["capture_session_id"]) + assert.Equal(t, float64(1), decoded["seq"]) + inner, ok := decoded["event"].(map[string]any) + require.True(t, ok) + 
assert.Equal(t, "console.log", inner["type"]) +} + +func TestEventData(t *testing.T) { + rawData := json.RawMessage(`{"key":"value","num":42}`) + ev := Event{ + Ts: 1000, + Type: "page.navigation", + Category: CategoryPage, + Source: Source{Kind: KindCDP}, + Data: rawData, + } + + b, err := json.Marshal(ev) + require.NoError(t, err) + + s := string(b) + assert.Contains(t, s, `"data":{"key":"value","num":42}`) + assert.NotContains(t, s, `"data":"{`) +} + +func TestEventOmitEmpty(t *testing.T) { + ev := Event{ + Ts: 1000, + Type: "console.log", + Category: CategoryConsole, + Source: Source{Kind: KindCDP}, + } + + b, err := json.Marshal(ev) + require.NoError(t, err) + + s := string(b) + assert.NotContains(t, s, `"event"`) + assert.Contains(t, s, `"detail_level"`) +} + +func mkEnv(seq uint64, ev Event) Envelope { + return Envelope{Seq: seq, Event: ev} +} + +func cdpEvent(typ string, cat EventCategory) Event { + return Event{Type: typ, Category: cat, Source: Source{Kind: KindCDP}} +} + +// TestRingBuffer: publish 3 envelopes; reader reads all 3 in order +func TestRingBuffer(t *testing.T) { + rb := NewRingBuffer(10) + reader := rb.NewReader(0) + + envelopes := []Envelope{ + mkEnv(1, cdpEvent("console.log", CategoryConsole)), + mkEnv(2, cdpEvent("network.request", CategoryNetwork)), + mkEnv(3, cdpEvent("page.navigation", CategoryPage)), + } + + for _, env := range envelopes { + rb.Publish(env) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + for i, expected := range envelopes { + got := readEnvelope(t, reader, ctx) + assert.Equal(t, expected.Event.Type, got.Event.Type, "event %d", i) + assert.Equal(t, expected.Event.Category, got.Event.Category, "event %d", i) + } +} + +// TestRingBufferOverflowNoBlock: writer never blocks even with no readers +func TestRingBufferOverflowNoBlock(t *testing.T) { + rb := NewRingBuffer(2) + + done := make(chan struct{}) + go func() { + rb.Publish(mkEnv(1, cdpEvent("console.log", 
CategoryConsole))) + rb.Publish(mkEnv(2, cdpEvent("console.log", CategoryConsole))) + rb.Publish(mkEnv(3, cdpEvent("console.log", CategoryConsole))) + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Millisecond): + t.Fatal("Publish blocked with no readers") + } + + reader := rb.NewReader(0) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + res, err := reader.Read(ctx) + require.NoError(t, err) + assert.Nil(t, res.Envelope, "expected drop, not envelope") + assert.True(t, res.Dropped > 0) +} + +func TestRingBufferOverflowExistingReader(t *testing.T) { + rb := NewRingBuffer(2) + reader := rb.NewReader(0) + + rb.Publish(mkEnv(1, cdpEvent("console.log", CategoryConsole))) + rb.Publish(mkEnv(2, cdpEvent("console.log", CategoryConsole))) + rb.Publish(mkEnv(3, cdpEvent("console.log", CategoryConsole))) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // First read should be a drop notification + res, err := reader.Read(ctx) + require.NoError(t, err) + assert.Nil(t, res.Envelope) + assert.Equal(t, uint64(1), res.Dropped) + + // After the drop the reader continues with the surviving envelopes + second := readEnvelope(t, reader, ctx) + assert.Equal(t, uint64(2), second.Seq) + + third := readEnvelope(t, reader, ctx) + assert.Equal(t, uint64(3), third.Seq) +} + +func TestNewReaderResume(t *testing.T) { + rb := NewRingBuffer(10) + for i := uint64(1); i <= 5; i++ { + rb.Publish(mkEnv(i, cdpEvent("console.log", CategoryConsole))) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + t.Run("resume_mid_stream", func(t *testing.T) { + reader := rb.NewReader(3) + env := readEnvelope(t, reader, ctx) + assert.Equal(t, uint64(4), env.Seq) + }) + + t.Run("resume_at_latest", func(t *testing.T) { + reader := rb.NewReader(5) + // Nothing to read — should block until ctx cancels + shortCtx, cancel := context.WithTimeout(ctx, 
10*time.Millisecond) + defer cancel() + _, err := reader.Read(shortCtx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("resume_before_oldest_triggers_drop", func(t *testing.T) { + small := NewRingBuffer(3) + for i := uint64(1); i <= 5; i++ { + small.Publish(mkEnv(i, cdpEvent("console.log", CategoryConsole))) + } + // oldest in ring is seq 3, requesting resume after seq 1 + reader := small.NewReader(1) + res, err := reader.Read(ctx) + require.NoError(t, err) + assert.Nil(t, res.Envelope) + assert.Equal(t, uint64(1), res.Dropped) + + env := readEnvelope(t, reader, ctx) + assert.Equal(t, uint64(3), env.Seq) + }) +} + +func TestConcurrentPublishRead(t *testing.T) { + const numEvents = 20 + rb := NewRingBuffer(32) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + reader := rb.NewReader(0) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < numEvents; i++ { + _, err := reader.Read(ctx) + if !assert.NoError(t, err) { + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 1; i <= numEvents; i++ { + rb.Publish(mkEnv(uint64(i), cdpEvent("console.log", CategoryConsole))) + } + }() + + wg.Wait() +} + +func TestConcurrentReaders(t *testing.T) { + rb := NewRingBuffer(20) + + numReaders := 3 + numEvents := 5 + + readers := make([]*Reader, numReaders) + for i := range readers { + readers[i] = rb.NewReader(0) + } + + for i := 0; i < numEvents; i++ { + rb.Publish(mkEnv(uint64(i+1), cdpEvent("console.log", CategoryConsole))) + } + + var wg sync.WaitGroup + results := make([][]Envelope, numReaders) + + for i, r := range readers { + wg.Add(1) + go func(idx int, reader *Reader) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var envs []Envelope + for j := 0; j < numEvents; j++ { + env := readEnvelope(t, reader, ctx) + envs = append(envs, env) + } + results[idx] = envs + }(i, r) + } + + 
+	wg.Wait()
+
+	for i, envs := range results {
+		assert.Len(t, envs, numEvents, "reader %d", i)
+		for j, env := range envs {
+			assert.Equal(t, uint64(j+1), env.Seq, "reader %d event %d", i, j) // seqs are 1-based
+		}
+	}
+}
+
+// TestFileWriter: per-category JSONL appender tests.
+func TestFileWriter(t *testing.T) {
+	t.Run("category_routing", func(t *testing.T) {
+		dir := t.TempDir()
+		fw := NewFileWriter(dir)
+		defer fw.Close()
+
+		envsToFile := []struct {
+			env      Envelope
+			file     string
+			category string
+		}{
+			{Envelope{Seq: 1, Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}}, "console.log", "console"},
+			{Envelope{Seq: 2, Event: Event{Type: "network.request", Category: CategoryNetwork, Source: Source{Kind: KindCDP}, Ts: 1}}, "network.log", "network"},
+			{Envelope{Seq: 3, Event: Event{Type: "liveview.click", Category: CategoryLiveview, Source: Source{Kind: KindKernelAPI}, Ts: 1}}, "liveview.log", "liveview"},
+			{Envelope{Seq: 4, Event: Event{Type: "captcha.solve", Category: CategoryCaptcha, Source: Source{Kind: KindExtension}, Ts: 1}}, "captcha.log", "captcha"},
+			{Envelope{Seq: 5, Event: Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1}}, "page.log", "page"},
+			{Envelope{Seq: 6, Event: Event{Type: "input.click", Category: CategoryInteraction, Source: Source{Kind: KindCDP}, Ts: 1}}, "interaction.log", "interaction"},
+			{Envelope{Seq: 7, Event: Event{Type: "monitor.connected", Category: CategorySystem, Source: Source{Kind: KindKernelAPI}, Ts: 1}}, "system.log", "system"},
+		}
+
+		for _, e := range envsToFile {
+			data, err := json.Marshal(e.env)
+			require.NoError(t, err)
+			require.NoError(t, fw.Write(e.env, data))
+		}
+
+		for _, e := range envsToFile {
+			data, err := os.ReadFile(filepath.Join(dir, e.file))
+			require.NoError(t, err, "missing file %s for type %s", e.file, e.env.Event.Type)
+
+			line := bytes.TrimRight(data, "\n")
+			require.True(t, json.Valid(line), "invalid JSON in %s", e.file)
+
+			var decoded map[string]any
+			require.NoError(t, json.Unmarshal(line, &decoded))
+			inner, ok := decoded["event"].(map[string]any)
+			require.True(t, ok)
+			assert.Equal(t, e.category, inner["category"], "wrong category in %s", e.file)
+			srcMap, ok := inner["source"].(map[string]any)
+			require.True(t, ok, "source should be an object in %s", e.file)
+			assert.Equal(t, string(e.env.Event.Source.Kind), srcMap["kind"], "wrong source kind in %s", e.file)
+		}
+	})
+
+	t.Run("empty_category_rejected", func(t *testing.T) {
+		dir := t.TempDir()
+		fw := NewFileWriter(dir)
+		defer fw.Close()
+
+		env := Envelope{Seq: 1, Event: Event{Type: "mystery", Category: "", Source: Source{Kind: KindCDP}, Ts: 1}}
+		data, _ := json.Marshal(env) // error deliberately ignored: Write must reject before data is used
+		err := fw.Write(env, data)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "empty category")
+	})
+
+	t.Run("concurrent_writes", func(t *testing.T) {
+		dir := t.TempDir()
+		fw := NewFileWriter(dir)
+		defer fw.Close()
+
+		const goroutines = 10
+		const eventsPerGoroutine = 100
+
+		var wg sync.WaitGroup
+		for i := 0; i < goroutines; i++ {
+			wg.Add(1)
+			go func(i int) {
+				defer wg.Done()
+				for j := 0; j < eventsPerGoroutine; j++ {
+					env := Envelope{
+						Seq:   uint64(i*eventsPerGoroutine + j),
+						Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1},
+					}
+					envData, err := json.Marshal(env)
+					require.NoError(t, err) // NOTE(review): require calls t.FailNow from a non-test goroutine, which testify forbids; prefer t.Error + return here
+					require.NoError(t, fw.Write(env, envData))
+				}
+			}(i)
+		}
+		wg.Wait()
+
+		data, err := os.ReadFile(filepath.Join(dir, "console.log"))
+		require.NoError(t, err)
+
+		lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
+		assert.Len(t, lines, goroutines*eventsPerGoroutine) // no interleaved/torn lines under concurrency
+		for _, line := range lines {
+			assert.True(t, json.Valid([]byte(line)), "invalid JSON line: %s", line)
+		}
+	})
+
+	t.Run("lazy_open", func(t *testing.T) {
+		dir := t.TempDir()
+		fw := NewFileWriter(dir)
+		defer fw.Close()
+
+		entries, err := os.ReadDir(dir)
+		require.NoError(t, err)
+		assert.Empty(t, entries, "files opened before first Write")
+
+		env := Envelope{Seq: 1, Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}}
+		envData, err := json.Marshal(env)
+		require.NoError(t, err)
+		require.NoError(t, fw.Write(env, envData))
+
+		entries, err = os.ReadDir(dir)
+		require.NoError(t, err)
+		assert.Len(t, entries, 1, "expected exactly one file after first Write")
+		assert.Equal(t, "console.log", entries[0].Name())
+	})
+}
+
+func TestPipeline(t *testing.T) {
+	newPipeline := func(t *testing.T) (*Pipeline, string) { // shared fixture: 100-slot ring + temp-dir file writer
+		t.Helper()
+		dir := t.TempDir()
+		rb := NewRingBuffer(100)
+		fw := NewFileWriter(dir)
+		p := NewPipeline(rb, fw)
+		t.Cleanup(func() { p.Close() })
+		return p, dir
+	}
+
+	t.Run("concurrent_publish_seq_order", func(t *testing.T) {
+		const goroutines = 8
+		const eventsEach = 50
+		const total = goroutines * eventsEach
+
+		rb := NewRingBuffer(total) // ring sized to hold every event so none are dropped
+		fw := NewFileWriter(t.TempDir())
+		p := NewPipeline(rb, fw)
+		t.Cleanup(func() { p.Close() })
+		reader := p.NewReader(0)
+
+		var wg sync.WaitGroup
+		for i := 0; i < goroutines; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for j := 0; j < eventsEach; j++ {
+					p.Publish(cdpEvent("console.log", CategoryConsole))
+				}
+			}()
+		}
+		wg.Wait()
+
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+
+		for want := uint64(1); want <= total; want++ {
+			env := readEnvelope(t, reader, ctx)
+			assert.Equal(t, want, env.Seq, "events must arrive in seq order")
+		}
+	})
+
+	t.Run("publish_increments_seq", func(t *testing.T) {
+		p, _ := newPipeline(t)
+		reader := p.NewReader(0)
+
+		for i := 0; i < 3; i++ {
+			p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1})
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		for want := uint64(1); want <= 3; want++ {
+			env := readEnvelope(t, reader, ctx)
+			assert.Equal(t, want, env.Seq, "expected seq %d got %d", want, env.Seq)
+		}
+	})
+
+	t.Run("publish_sets_ts", func(t *testing.T) {
+		p, _ := newPipeline(t)
+		reader := p.NewReader(0)
+
+		before := time.Now().UnixMilli()
+		p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}}) // Ts zero: pipeline must stamp it
+		after := time.Now().UnixMilli()
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		env := readEnvelope(t, reader, ctx)
+		assert.GreaterOrEqual(t, env.Event.Ts, before)
+		assert.LessOrEqual(t, env.Event.Ts, after)
+	})
+
+	t.Run("publish_writes_file", func(t *testing.T) {
+		p, dir := newPipeline(t)
+
+		p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1})
+
+		data, err := os.ReadFile(filepath.Join(dir, "console.log"))
+		require.NoError(t, err)
+
+		lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
+		require.Len(t, lines, 1)
+		assert.True(t, json.Valid([]byte(lines[0])))
+		assert.Contains(t, lines[0], `"console.log"`)
+	})
+
+	t.Run("publish_writes_ring", func(t *testing.T) {
+		p, _ := newPipeline(t)
+
+		reader := p.NewReader(0)
+		p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1})
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		env := readEnvelope(t, reader, ctx)
+		assert.Equal(t, "page.navigation", env.Event.Type)
+		assert.Equal(t, CategoryPage, env.Event.Category)
+	})
+
+	t.Run("start_sets_capture_session_id", func(t *testing.T) {
+		p, _ := newPipeline(t)
+		p.Start("test-uuid")
+
+		reader := p.NewReader(0)
+		p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1})
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		env := readEnvelope(t, reader, ctx)
+		assert.Equal(t, "test-uuid", env.CaptureSessionID)
+	})
+
+	t.Run("truncation_applied", func(t *testing.T) {
+		p, dir := newPipeline(t)
+		reader := p.NewReader(0)
+
+		largeData := strings.Repeat("x", 1_100_000) // comfortably over the per-record cap
+		rawData, err := json.Marshal(map[string]string{"payload": largeData})
+		require.NoError(t, err)
+
+		p.Publish(Event{
+			Type:     "page.navigation",
+			Category: CategoryPage,
+			Source:   Source{Kind: KindCDP},
+			Ts:       1,
+			Data:     json.RawMessage(rawData),
+		})
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		env := readEnvelope(t, reader, ctx)
+		assert.True(t, env.Event.Truncated)
+		assert.True(t, json.Valid(env.Event.Data)) // truncation must not produce broken JSON
+
+		marshaled, err := json.Marshal(env)
+		require.NoError(t, err)
+		assert.LessOrEqual(t, len(marshaled), maxS2RecordBytes)
+
+		data, err := os.ReadFile(filepath.Join(dir, "page.log"))
+		require.NoError(t, err)
+		lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
+		require.Len(t, lines, 1)
+		assert.Contains(t, lines[0], `"truncated":true`)
+	})
+
+	t.Run("defaults_detail_level", func(t *testing.T) {
+		p, _ := newPipeline(t)
+		reader := p.NewReader(0)
+
+		p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1})
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		env := readEnvelope(t, reader, ctx)
+		assert.Equal(t, DetailStandard, env.Event.DetailLevel) // empty level defaults to standard
+
+		p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1, DetailLevel: DetailVerbose})
+		env2 := readEnvelope(t, reader, ctx)
+		assert.Equal(t, DetailVerbose, env2.Event.DetailLevel) // explicit level is preserved
+	})
+}
diff --git a/server/lib/events/filewriter.go b/server/lib/events/filewriter.go
new file mode 100644
index 00000000..6ce5ff5f
--- /dev/null
+++ b/server/lib/events/filewriter.go
@@ -0,0 +1,64 @@
+package events
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+)
+
+// FileWriter is a per-category JSONL appender. It opens each log file lazily on
+// first write (O_APPEND|O_CREATE|O_WRONLY) and serialises all concurrent writes
+// with a single mutex.
+type FileWriter struct {
+	mu    sync.Mutex                    // guards files and all file writes
+	files map[EventCategory]*os.File    // lazily opened, one handle per category
+	dir   string                        // destination directory for <category>.log files
+}
+
+// NewFileWriter returns a FileWriter that writes to dir.
+func NewFileWriter(dir string) *FileWriter {
+	return &FileWriter{dir: dir, files: make(map[EventCategory]*os.File)}
+}
+
+// Write appends data as a single JSONL line to the per-category log file.
+func (fw *FileWriter) Write(env Envelope, data []byte) error {
+	cat := env.Event.Category
+	if cat == "" {
+		return fmt.Errorf("filewriter: event %q has empty category", env.Event.Type)
+	}
+
+	fw.mu.Lock()
+	defer fw.mu.Unlock()
+
+	f, ok := fw.files[cat]
+	if !ok { // lazy open on first write for this category
+		path := filepath.Join(fw.dir, string(cat)+".log")
+		var err error
+		f, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+		if err != nil {
+			return fmt.Errorf("filewriter: open %s: %w", path, err)
+		}
+		fw.files[cat] = f
+	}
+
+	if _, err := f.Write(append(data, '\n')); err != nil { // NOTE(review): append may write the '\n' into the caller's backing array when cap(data) > len(data); safe today because callers do not reuse data, but a copy would be more defensive
+		return fmt.Errorf("filewriter: write: %w", err)
+	}
+
+	return nil
+}
+
+// Close closes all open log file descriptors.
+func (fw *FileWriter) Close() error {
+	fw.mu.Lock()
+	defer fw.mu.Unlock()
+
+	var firstErr error
+	for _, f := range fw.files { // close everything; report only the first failure
+		if err := f.Close(); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	return firstErr
+}
diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go
new file mode 100644
index 00000000..e69c254f
--- /dev/null
+++ b/server/lib/events/pipeline.go
@@ -0,0 +1,70 @@
+package events
+
+import (
+	"log/slog"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// Pipeline is a single-use write path that wraps events in envelopes and fans
+// them out to a FileWriter (durable) and RingBuffer (in-memory). Call Start
+// once with a capture session ID, then Publish concurrently. Close flushes the
+// FileWriter; there is no restart or terminal event.
+type Pipeline struct {
+	mu               sync.Mutex             // serializes Publish so seq assignment and ring insertion happen as one unit (readers rely on seq order)
+	ring             *RingBuffer            // in-memory fan-out to live readers
+	files            *FileWriter            // durable per-category JSONL sink
+	seq              atomic.Uint64          // monotonically increasing envelope sequence, first value 1
+	captureSessionID atomic.Pointer[string] // atomic so Start may run concurrently with Publish
+}
+
+func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline {
+	p := &Pipeline{ring: ring, files: files}
+	empty := ""
+	p.captureSessionID.Store(&empty) // pre-store "" so Load never returns nil before Start is called
+	return p
+}
+
+// Start sets the capture session ID stamped on every subsequent envelope.
+func (p *Pipeline) Start(captureSessionID string) {
+	p.captureSessionID.Store(&captureSessionID)
+}
+
+// Publish wraps ev in an Envelope, truncates if needed, then writes to
+// FileWriter (durable) before RingBuffer (in-memory fan-out). File-write
+// failures are logged, never surfaced; the ring publish always happens.
+func (p *Pipeline) Publish(ev Event) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if ev.Ts == 0 { // caller omitted a timestamp: stamp now (unix millis)
+		ev.Ts = time.Now().UnixMilli()
+	}
+	if ev.DetailLevel == "" { // default detail level when unset
+		ev.DetailLevel = DetailStandard
+	}
+
+	env := Envelope{
+		CaptureSessionID: *p.captureSessionID.Load(),
+		Seq:              p.seq.Add(1), // under mu, so seq order == publish order
+		Event:            ev,
+	}
+	env, data := truncateIfNeeded(env) // data is the marshaled record, nil on marshal failure
+
+	if data == nil {
+		slog.Error("pipeline: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category)
+	} else if err := p.files.Write(env, data); err != nil {
+		slog.Error("pipeline: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err)
+	}
+	p.ring.Publish(env) // ring publish is unconditional: live readers still see the event
+}
+
+// NewReader returns a Reader over the ring buffer. afterSeq == 0 starts from
+// the oldest available envelope; afterSeq > 0 resumes after that seq.
+func (p *Pipeline) NewReader(afterSeq uint64) *Reader {
+	return p.ring.NewReader(afterSeq)
+}
+
+// Close flushes and releases all open file descriptors.
+func (p *Pipeline) Close() error {
+	return p.files.Close()
+}
diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go
new file mode 100644
index 00000000..41659e94
--- /dev/null
+++ b/server/lib/events/ringbuffer.go
@@ -0,0 +1,108 @@
+package events
+
+import (
+	"context"
+	"sync"
+)
+
+// RingBuffer is a fixed-capacity circular buffer with closed-channel broadcast fan-out.
+// Writers never block regardless of reader count or speed.
+type RingBuffer struct {
+	mu         sync.RWMutex  // guards every field below
+	buf        []Envelope    // slot for seq s is buf[s%cap]
+	cap        uint64        // capacity as uint64 for the modular arithmetic; shadows the builtin cap inside methods
+	latestSeq  uint64        // highest envelope.Seq published
+	readerWake chan struct{} // closed-and-replaced on each Publish to wake blocked readers
+}
+
+func NewRingBuffer(capacity int) *RingBuffer {
+	return &RingBuffer{ // NOTE(review): capacity must be >= 1; capacity == 0 would panic in Publish (integer divide by zero on Seq%cap) — TODO confirm callers, consider validating here
+		buf:        make([]Envelope, capacity),
+		cap:        uint64(capacity),
+		readerWake: make(chan struct{}),
+	}
+}
+
+// Publish adds an envelope to the ring, evicting the oldest on overflow.
+// Envelopes are expected to arrive with strictly increasing Seq (the Pipeline
+// guarantees this under its own mutex).
+func (rb *RingBuffer) Publish(env Envelope) {
+	rb.mu.Lock()
+	rb.buf[env.Seq%rb.cap] = env
+	rb.latestSeq = env.Seq
+	old := rb.readerWake
+	rb.readerWake = make(chan struct{}) // fresh channel for the next round of waiters
+	rb.mu.Unlock()
+	close(old) // broadcast: wakes every reader blocked on the previous wake channel
+}
+
+func (rb *RingBuffer) oldestSeq() uint64 { // oldest seq still held; caller must hold rb.mu
+	if rb.latestSeq <= rb.cap { // ring not yet wrapped: nothing has been evicted
+		return 1
+	}
+	return rb.latestSeq - rb.cap + 1
+}
+
+// NewReader returns a Reader. afterSeq == 0 starts from the oldest available
+// envelope; afterSeq > 0 resumes after that seq.
+func (rb *RingBuffer) NewReader(afterSeq uint64) *Reader {
+	nextSeq := afterSeq + 1
+	if afterSeq == 0 { // NOTE(review): redundant — afterSeq+1 is already 1 when afterSeq == 0; branch kept as-is
+		nextSeq = 1
+	}
+	return &Reader{rb: rb, nextSeq: nextSeq}
+}
+
+// ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is
+// set: Envelope is non-nil for a normal read, Dropped is non-zero when the
+// reader fell behind and events were lost.
+type ReadResult struct {
+	Envelope *Envelope // the next envelope, in seq order
+	Dropped  uint64    // count of envelopes evicted before this reader saw them
+}
+
+// Reader tracks an independent read position in a RingBuffer.
+// Readers never affect each other or block writers.
+type Reader struct {
+	rb      *RingBuffer
+	nextSeq uint64 // seq of the next envelope this reader wants
+}
+
+// Read blocks until the next envelope is available or ctx is cancelled.
+func (r *Reader) Read(ctx context.Context) (ReadResult, error) {
+	for {
+		r.rb.mu.RLock()
+		wake := r.rb.readerWake // captured BEFORE inspecting state: any Publish after RUnlock closes this exact channel, so a wakeup cannot be lost
+		latest := r.rb.latestSeq
+		oldest := r.rb.oldestSeq()
+
+		if latest == 0 { // nothing published yet: wait for the first Publish
+			r.rb.mu.RUnlock()
+			select {
+			case <-ctx.Done():
+				return ReadResult{}, ctx.Err()
+			case <-wake:
+				continue
+			}
+		}
+
+		if r.nextSeq < oldest { // reader fell behind; its next envelope was evicted
+			dropped := oldest - r.nextSeq
+			r.nextSeq = oldest // skip forward to the oldest survivor
+			r.rb.mu.RUnlock()
+			return ReadResult{Dropped: dropped}, nil
+		}
+
+		if r.nextSeq <= latest { // next envelope is available
+			env := r.rb.buf[r.nextSeq%r.rb.cap] // copy out under the read lock so a concurrent Publish cannot overwrite it mid-read
+			r.nextSeq++
+			r.rb.mu.RUnlock()
+			return ReadResult{Envelope: &env}, nil
+		}
+
+		r.rb.mu.RUnlock() // caught up: release the lock, then sleep until the next Publish or cancellation
+
+		select {
+		case <-ctx.Done():
+			return ReadResult{}, ctx.Err()
+		case <-wake:
+		}
+	}
+}