Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a03a5bd
feat: add failing test stubs for events package
archandatta Mar 20, 2026
42f415f
feat: add BrowserEvent struct, CategoryFor, and truncateIfNeeded
archandatta Mar 20, 2026
fa67dff
feat: add RingBuffer with closed-channel broadcast fan-out
archandatta Mar 20, 2026
115c720
feat: add FileWriter per-category JSONL appender
archandatta Mar 20, 2026
f07e40d
feat: add Pipeline glue type sequencing truncation, file write, and r…
archandatta Mar 20, 2026
18fdb6d
review: fix truncateIfNeeded branch split, atomic.Pointer[string], Re…
archandatta Mar 27, 2026
997edb4
review: remove dead RingBuffer count field, fix FileWriter mutex doc,…
archandatta Mar 27, 2026
e5153da
chore: clean up maxS2RecordBytes comment
archandatta Mar 27, 2026
1644fe7
fix: serialise Pipeline.Publish to guarantee monotonic seq delivery o…
archandatta Mar 27, 2026
36cff2d
review
archandatta Mar 30, 2026
339d7d3
refactor: rename BrowserEvent to Event, DetailDefault to DetailStandard
archandatta Mar 31, 2026
b370416
refactor: restructure Source as nested object with Kind, Event, Metadata
archandatta Mar 31, 2026
41a7aee
refactor: extract Envelope wrapper, move seq and capture_session_id o…
archandatta Mar 31, 2026
9f4c808
refactor: unify seq as universal cursor, add NewReader(afterSeq)
archandatta Mar 31, 2026
6c82459
refactor: return ReadResult instead of synthetic drop events
archandatta Mar 31, 2026
6506ed7
test: add NewReader resume tests for mid-stream, at-latest, and evict…
archandatta Mar 31, 2026
8ecd492
review: fmt
archandatta Apr 1, 2026
e572e7b
fix: guard against nil marshal data and oversized non-data envelopes
archandatta Apr 1, 2026
6fc54c5
review: address naming & constructor feedback
archandatta Apr 1, 2026
d791a9e
fix: file rename
archandatta Apr 1, 2026
bf091f5
review: remove redundant atomic and dead branch in events package
archandatta Apr 2, 2026
5d958df
Merge branch 'main' into archand/kernel-1116/browser-logging
archandatta Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions server/lib/events/capturesession.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package events

import (
"log/slog"
"sync"
"time"
)

// CaptureSession is a single-use write path that wraps events in envelopes and
// fans them out to a FileWriter (durable) and RingBuffer (in-memory). Publish
// concurrently; Close flushes the FileWriter.
type CaptureSession struct {
mu sync.Mutex
ring *RingBuffer
files *FileWriter
seq uint64
captureSessionID string
}

func NewCaptureSession(captureSessionID string, ring *RingBuffer, files *FileWriter) *CaptureSession {
return &CaptureSession{ring: ring, files: files, captureSessionID: captureSessionID}
}

// Publish wraps ev in an Envelope, truncates if needed, then writes to
// FileWriter (durable) before RingBuffer (in-memory fan-out).
func (s *CaptureSession) Publish(ev Event) {
s.mu.Lock()
defer s.mu.Unlock()

if ev.Ts == 0 {
ev.Ts = time.Now().UnixMicro()
}
if ev.DetailLevel == "" {
ev.DetailLevel = DetailStandard
}

s.seq++
env := Envelope{
CaptureSessionID: s.captureSessionID,
Seq: s.seq,
Event: ev,
}
env, data := truncateIfNeeded(env)

if data == nil {
slog.Error("capture_session: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category)
} else if err := s.files.Write(env, data); err != nil {
slog.Error("capture_session: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err)
}
s.ring.Publish(env)
}

// NewReader returns a Reader positioned at the start of the ring buffer.
func (s *CaptureSession) NewReader(afterSeq uint64) *Reader {
return s.ring.NewReader(afterSeq)
}

// Close flushes and releases all open file descriptors.
func (s *CaptureSession) Close() error {
return s.files.Close()
}
91 changes: 91 additions & 0 deletions server/lib/events/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package events

import (
"encoding/json"
"log/slog"
)

// maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB).
const maxS2RecordBytes = 1_000_000

// EventCategory determines type of logging
type EventCategory string

const (
CategoryConsole EventCategory = "console"
CategoryNetwork EventCategory = "network"
CategoryPage EventCategory = "page"
CategoryInteraction EventCategory = "interaction"
CategoryLiveview EventCategory = "liveview"
CategoryCaptcha EventCategory = "captcha"
CategorySystem EventCategory = "system"
)

type SourceKind string

const (
KindCDP SourceKind = "cdp"
KindKernelAPI SourceKind = "kernel_api"
KindExtension SourceKind = "extension"
KindLocalProcess SourceKind = "local_process"
)

// Source captures provenance: which producer emitted the event and any
// producer-specific context (e.g. CDP target/session/frame IDs).
type Source struct {
Kind SourceKind `json:"kind"`
Event string `json:"event,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}

type DetailLevel string

const (
DetailMinimal DetailLevel = "minimal"
DetailStandard DetailLevel = "standard"
DetailVerbose DetailLevel = "verbose"
DetailRaw DetailLevel = "raw"
)

// Event is the portable event schema. It contains only producer-emitted content;
// pipeline metadata (seq, capture session) lives on the Envelope.
type Event struct {
Ts int64 `json:"ts"` // Unix microseconds (µs since epoch)
Type string `json:"type"`
Category EventCategory `json:"category"`
Source Source `json:"source"`
DetailLevel DetailLevel `json:"detail_level"`
URL string `json:"url,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Truncated bool `json:"truncated,omitempty"`
}

// Envelope wraps an Event with pipeline-assigned metadata.
type Envelope struct {
CaptureSessionID string `json:"capture_session_id"`
Seq uint64 `json:"seq"`
Event Event `json:"event"`
}

// truncateIfNeeded marshals env and returns the (possibly truncated) envelope.
// If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge
// url or source.metadata), it is returned as-is — callers must handle nil data.
func truncateIfNeeded(env Envelope) (Envelope, []byte) {
data, err := json.Marshal(env)
if err != nil {
return env, nil
}
if len(data) <= maxS2RecordBytes {
return env, data
}
env.Event.Data = json.RawMessage("null")
env.Event.Truncated = true
data, err = json.Marshal(env)
if err != nil {
return env, nil
}
if len(data) > maxS2RecordBytes {
slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "seq", env.Seq, "size", len(data))
}
return env, data
}
Loading
Loading