diff --git a/Makefile b/Makefile index 743a756..c5b1250 100644 --- a/Makefile +++ b/Makefile @@ -147,10 +147,15 @@ temporal: ## Start local Temporal dev server and open Web UI --dynamic-config-value limit.blobSize.warn=15000000 .PHONY: dev -dev: ## Run the service locally with auto-reload on code changes +dev: ## Run the service locally (auto-reload if `entr` is installed) @if [ -f .env ]; then set -a; . ./.env; set +a; fi; \ - echo "šŸš€ Starting Version Guard with auto-reload (Ctrl+C to stop)..."; \ - find . -name '*.go' -not -path './vendor/*' | entr -r go run ./cmd/server + if command -v entr >/dev/null 2>&1; then \ + echo "šŸš€ Starting Version Guard with auto-reload via entr (Ctrl+C to stop)..."; \ + find . -name '*.go' -not -path './vendor/*' | entr -r go run ./cmd/server; \ + else \ + echo "šŸš€ Starting Version Guard (no auto-reload — install entr for that). Ctrl+C to stop..."; \ + go run ./cmd/server; \ + fi .PHONY: run-locally run-locally: build ## Run the service locally (connects to local Temporal) @@ -167,6 +172,68 @@ run-server: build ## Run server locally @echo "šŸš€ Starting server locally..." @CONFIG_ENV=development bin/$(BINARY_NAME) --mode=server +# ── Webhook E2E (detector → emitter) ────────────────────────────────────────── +# Everything below runs in Docker, so no local `temporal` or `curl` install is required. +# Pre-reqs (run in separate terminals before invoking these targets): +# 1. make temporal-docker (Temporal dev server in Docker) +# 2. (in version-guard-emitter) make dev (emitter worker + HTTP on host :8082, via .env) +# 3. EMITTER_WEBHOOK_URL=http://localhost:8082 make dev (detector worker + admin HTTP on host :8081) +# Resource value must be a config ID (the `id:` field in pkg/config/defaults +# resources.yaml: aurora-postgresql, aurora-mysql, eks, elasticache-redis, +# elasticache-valkey, elasticache-memcached, opensearch, rds-mysql, +# rds-postgresql, lambda) — NOT a type constant like "AURORA". 
The detector's +# inventory map is keyed by config ID so multiple configs of the same type +# (e.g. two aurora flavors) can have independent inventory sources. +WEBHOOK_E2E_RESOURCE := aurora-postgresql +TEMPORAL_DOCKER_IMAGE := temporalio/admin-tools:latest +CURL_DOCKER_IMAGE := curlimages/curl:latest +# Inside containers we reach host-side processes via host.docker.internal (Docker Desktop on macOS/Windows). +HOST_FROM_DOCKER := host.docker.internal +# Host ports +DETECTOR_ADMIN_PORT := 8081 +EMITTER_ADMIN_PORT := 8082 + +.PHONY: temporal-docker +temporal-docker: ## Start Temporal dev server in Docker (alternative to `make temporal`) + @echo "šŸ•°ļø Starting Temporal dev server in Docker (namespace: $(TEMPORAL_NAMESPACE))..." + @echo " Frontend: localhost:7233 Web UI: http://localhost:8233" + @open http://localhost:8233 & + @docker run --rm \ + --name version-guard-temporal-dev \ + -p 7233:7233 -p 8233:8233 \ + $(TEMPORAL_DOCKER_IMAGE) \ + temporal server start-dev \ + --ip 0.0.0.0 \ + --namespace $(TEMPORAL_NAMESPACE) \ + --dynamic-config-value limit.blobSize.error=20000000 \ + --dynamic-config-value limit.blobSize.warn=15000000 + +.PHONY: webhook-e2e +webhook-e2e: ## Trigger an end-to-end run via the detector's POST /scan (in Docker) + @command -v docker >/dev/null 2>&1 || { echo "āŒ docker not found"; exit 1; } + @echo "šŸš€ POST /scan to detector at :$(DETECTOR_ADMIN_PORT) (resource=$(WEBHOOK_E2E_RESOURCE))..." + @echo " Watch: http://localhost:8233/namespaces/$(TEMPORAL_NAMESPACE)/workflows" + @docker run --rm \ + --add-host=$(HOST_FROM_DOCKER):host-gateway \ + $(CURL_DOCKER_IMAGE) \ + -fsSi -X POST http://$(HOST_FROM_DOCKER):$(DETECTOR_ADMIN_PORT)/scan \ + -H 'Content-Type: application/json' \ + -d '{"resource_types":["$(WEBHOOK_E2E_RESOURCE)"]}' + @echo "" + @echo "āœ… Detector orchestrator workflow started; expect a matching version-guard-act- ActWorkflow on the emitter." 
+ +.PHONY: webhook-e2e-smoke +webhook-e2e-smoke: ## Hit the emitter /trigger-act webhook directly (no detector) via Docker + @command -v docker >/dev/null 2>&1 || { echo "āŒ docker not found"; exit 1; } + @SID="smoke-$$(date +%s)"; \ + echo "šŸ”Ž POST /trigger-act to emitter at :$(EMITTER_ADMIN_PORT) with snapshot_id=$$SID..."; \ + docker run --rm \ + --add-host=$(HOST_FROM_DOCKER):host-gateway \ + $(CURL_DOCKER_IMAGE) \ + -fsSi -X POST http://$(HOST_FROM_DOCKER):$(EMITTER_ADMIN_PORT)/trigger-act \ + -H 'Content-Type: application/json' \ + -d "{\"snapshot_id\":\"$$SID\"}" + # ── Docker ──────────────────────────────────────────────────────────────────── .PHONY: docker-build diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 0d873dc..f81a694 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -212,7 +212,9 @@ func (c *ScanStartCmd) Run(ctx *Context) error { // --resource-type explicitly. An empty list propagates to the // orchestrator, which rejects it with ErrNoResourceTypes so the // caller gets an immediate, descriptive failure. - trigger := scan.NewTrigger(temporalClient, ctx.TemporalTaskQueue, nil) + // CLI-triggered runs do not chain to the emitter webhook — operators + // using the CLI typically just want to verify the detector path. 
+ trigger := scan.NewTrigger(temporalClient, ctx.TemporalTaskQueue, nil, "") res, err := trigger.Run(context.Background(), scan.Input{ ScanID: c.ScanID, ResourceTypes: resourceTypes, diff --git a/cmd/server/main.go b/cmd/server/main.go index 909457f..533360d 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -25,6 +25,7 @@ import ( "github.com/block/Version-Guard/pkg/eol" eolendoflife "github.com/block/Version-Guard/pkg/eol/endoflife" "github.com/block/Version-Guard/pkg/inventory" + mockinv "github.com/block/Version-Guard/pkg/inventory/mock" "github.com/block/Version-Guard/pkg/inventory/wiz" "github.com/block/Version-Guard/pkg/policy" "github.com/block/Version-Guard/pkg/registry" @@ -62,6 +63,12 @@ type ServerCLI struct { // AWS configuration (for EOL APIs) AWSRegion string `help:"AWS region for EOL APIs" default:"us-west-2" env:"AWS_REGION"` + // Snapshot storage backend ("s3" or "memory"). "memory" is intended + // for laptop dev and CI smoke tests; it has no durability across + // restarts but lets the orchestrator's Stage 2 succeed without AWS + // credentials. + SnapshotStore string `help:"Snapshot store backend: s3 or memory" default:"s3" enum:"s3,memory" env:"SNAPSHOT_STORE"` + // S3 configuration (for snapshots) S3Bucket string `help:"S3 bucket for snapshots" default:"version-guard-snapshots" env:"S3_BUCKET"` S3Prefix string `help:"S3 prefix for snapshots" default:"snapshots/" env:"S3_PREFIX"` @@ -70,6 +77,13 @@ type ServerCLI struct { // Service configuration HTTPPort int `help:"HTTP admin port (POST /scan)" default:"8081" env:"HTTP_PORT"` + // Emitter webhook (Stage 3). When set, OrchestratorWorkflow POSTs to + // "/trigger-act" after the snapshot is persisted, kicking the + // downstream emitter immediately instead of waiting for its own cron. + // Empty disables the webhook (snapshot still lands in S3 for any + // pull-based consumer). + EmitterWebhookURL string `help:"Base URL of the emitter webhook (e.g. 
http://version-guard-emitter:8080)" env:"EMITTER_WEBHOOK_URL"` + // Tag configuration (comma-separated lists for AWS resource tags) TagAppKeys string `help:"Comma-separated tag keys for application/service name" default:"app,application,service" env:"TAG_APP_KEYS"` @@ -156,25 +170,32 @@ func (s *ServerCLI) Run(_ *kong.Context) error { st := memory.NewStore() fmt.Println("āœ“ In-memory store initialized") - // Initialize S3 snapshot store - var snapshotStore *snapshot.S3Store + // Initialize snapshot store. Production runs use S3; laptop dev / CI + // smoke tests can use the in-memory store via `SNAPSHOT_STORE=memory` + // to avoid needing AWS credentials. + var snapshotStore snapshot.Store ctx := context.Background() - configOpts := []func(*config.LoadOptions) error{config.WithRegion(s.AWSRegion)} - cfg, err := config.LoadDefaultConfig(ctx, configOpts...) - if err != nil { - fmt.Printf("āš ļø Failed to load AWS config: %v\n", err) - fmt.Println(" Snapshots will not be persisted to S3") + if s.SnapshotStore == "memory" { + snapshotStore = snapshot.NewMemoryStore() + fmt.Println("āœ“ In-memory snapshot store initialized (SNAPSHOT_STORE=memory; not durable)") } else { - s3Opts := []func(*s3.Options){} - if s.S3Endpoint != "" { - s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = &s.S3Endpoint - o.UsePathStyle = true - }) + configOpts := []func(*config.LoadOptions) error{config.WithRegion(s.AWSRegion)} + cfg, err := config.LoadDefaultConfig(ctx, configOpts...) + if err != nil { + fmt.Printf("āš ļø Failed to load AWS config: %v\n", err) + fmt.Println(" Snapshots will not be persisted to S3") + } else { + s3Opts := []func(*s3.Options){} + if s.S3Endpoint != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = &s.S3Endpoint + o.UsePathStyle = true + }) + } + s3Client := s3.NewFromConfig(cfg, s3Opts...) 
+ snapshotStore = snapshot.NewS3Store(s3Client, s.S3Bucket, s.S3Prefix) + fmt.Printf("āœ“ S3 snapshot store initialized (bucket: %s)\n", s.S3Bucket) } - s3Client := s3.NewFromConfig(cfg, s3Opts...) - snapshotStore = snapshot.NewS3Store(s3Client, s.S3Bucket, s.S3Prefix) - fmt.Printf("āœ“ S3 snapshot store initialized (bucket: %s)\n", s.S3Bucket) } // Initialize Temporal client @@ -259,14 +280,32 @@ func (s *ServerCLI) Run(_ *kong.Context) error { var invSource inventory.InventorySource if resourceCfg.Inventory.Source == "wiz" { if wizClient == nil { - // Wiz client not available (no credentials) - fmt.Printf(" āš ļø Skipping %s - Wiz credentials not configured\n", resourceCfg.ID) - continue + // No Wiz credentials — use the in-process mock inventory + // so local e2e runs (webhook smoke tests etc.) can exercise + // the full detector → snapshot → emitter wire without + // CloudSec-issued credentials. Production paths always set + // the Wiz secrets, so this branch is dev-only. + configID := types.ResourceType(resourceCfg.ID) + invSource = &mockinv.InventorySource{ + Resources: []*types.Resource{ + { + ID: fmt.Sprintf("mock-%s-1", resourceCfg.ID), + Service: "version-guard-mock", + Type: configID, + CurrentVersion: "1.0.0", + Engine: resourceCfg.Type, + CloudProvider: types.CloudProviderAWS, + DiscoveredAt: time.Now(), + Tags: map[string]string{"env": "local-dev"}, + }, + }, + } + fmt.Printf(" āš ļø %s - Wiz credentials not configured, using mock inventory (1 fake resource)\n", resourceCfg.ID) + } else { + // Create generic inventory source + invSource = wiz.NewGenericInventorySource(wizClient, resourceCfg, registryClient, logger) + fmt.Printf(" āœ“ Wiz inventory source created (reads from WIZ_REPORT_IDS[%s])\n", resourceCfg.ID) } - - // Create generic inventory source - invSource = wiz.NewGenericInventorySource(wizClient, resourceCfg, registryClient, logger) - fmt.Printf(" āœ“ Wiz inventory source created (reads from WIZ_REPORT_IDS[%s])\n", resourceCfg.ID) } else 
{ fmt.Printf(" āš ļø Unsupported inventory source: %s\n", resourceCfg.Inventory.Source) continue @@ -369,6 +408,7 @@ func (s *ServerCLI) Run(_ *kong.Context) error { if snapshotStore != nil { orchestratorActivities := orchestrator.NewActivities(st, snapshotStore) w.RegisterActivityWithOptions(orchestratorActivities.CreateSnapshot, activity.RegisterOptions{Name: orchestrator.CreateSnapshotActivityName}) + w.RegisterActivityWithOptions(orchestratorActivities.NotifyEmitter, activity.RegisterOptions{Name: orchestrator.NotifyEmitterActivityName}) fmt.Println("āœ“ Orchestrator activities registered (with S3)") } else { fmt.Println("āš ļø Orchestrator snapshot activity not registered (no S3 store)") @@ -380,7 +420,11 @@ func (s *ServerCLI) Run(_ *kong.Context) error { } // Start HTTP admin server (POST /scan to trigger manual scans) - httpServer := startAdminHTTPServer(s.HTTPPort, temporalClient, s.TemporalTaskQueue, defaultResourceTypes) + httpServer := startAdminHTTPServer(s.HTTPPort, temporalClient, s.TemporalTaskQueue, defaultResourceTypes, s.EmitterWebhookURL) + + if s.EmitterWebhookURL != "" { + fmt.Printf("āœ“ Emitter webhook configured: %s/trigger-act\n", strings.TrimRight(s.EmitterWebhookURL, "/")) + } // Start worker fmt.Printf("\nāœ“ Temporal worker starting on queue: %s\n", s.TemporalTaskQueue) @@ -430,12 +474,13 @@ func (s *ServerCLI) ensureSchedule(ctx context.Context, temporalClient client.Cl schedCtx, schedCancel := context.WithTimeout(ctx, 10*time.Second) defer schedCancel() schedErr := scheduleMgr.EnsureSchedule(schedCtx, schedule.Config{ - Enabled: true, - ScheduleID: s.ScheduleID, - CronExpression: s.ScheduleCron, - Jitter: jitter, - TaskQueue: s.TemporalTaskQueue, - ResourceTypes: defaultResourceTypes, + Enabled: true, + ScheduleID: s.ScheduleID, + CronExpression: s.ScheduleCron, + Jitter: jitter, + TaskQueue: s.TemporalTaskQueue, + ResourceTypes: defaultResourceTypes, + EmitterWebhookURL: s.EmitterWebhookURL, }) if schedErr != nil { fmt.Printf("āš 
ļø Failed to create/update schedule: %v\n", schedErr) @@ -449,8 +494,8 @@ func (s *ServerCLI) ensureSchedule(ctx context.Context, temporalClient client.Cl // startAdminHTTPServer wires the scan trigger into an HTTP admin server and // starts listening in a background goroutine. The returned *http.Server can be // shut down gracefully by the caller. -func startAdminHTTPServer(port int, temporalClient client.Client, taskQueue string, defaultResourceTypes []types.ResourceType) *http.Server { - scanTrigger := scan.NewTrigger(temporalClient, taskQueue, defaultResourceTypes) +func startAdminHTTPServer(port int, temporalClient client.Client, taskQueue string, defaultResourceTypes []types.ResourceType, emitterWebhookURL string) *http.Server { + scanTrigger := scan.NewTrigger(temporalClient, taskQueue, defaultResourceTypes, emitterWebhookURL) mux := http.NewServeMux() mux.Handle("/scan", scan.NewHandler(scanTrigger)) diff --git a/pkg/scan/scan.go b/pkg/scan/scan.go index e966352..3f6ea0c 100644 --- a/pkg/scan/scan.go +++ b/pkg/scan/scan.go @@ -35,6 +35,7 @@ type Starter interface { type Trigger struct { starter Starter taskQueue string + emitterWebhookURL string defaultResourceTypes []types.ResourceType } @@ -43,8 +44,10 @@ type Trigger struct { // defaultResourceTypes is the list used when the caller does not specify // any (e.g. a full-fleet scan via empty HTTP body); supply it from the // loaded YAML config so adding a resource is a YAML-only change. -func NewTrigger(c client.Client, taskQueue string, defaultResourceTypes []types.ResourceType) *Trigger { - return &Trigger{starter: c, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes} +// emitterWebhookURL, when non-empty, is forwarded to the orchestrator so +// it can fire the Stage 3 notify activity after the snapshot is saved. 
+func NewTrigger(c client.Client, taskQueue string, defaultResourceTypes []types.ResourceType, emitterWebhookURL string) *Trigger { + return &Trigger{starter: c, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes, emitterWebhookURL: emitterWebhookURL} } // NewTriggerWithStarter returns a Trigger backed by an explicit Starter @@ -54,6 +57,15 @@ func NewTriggerWithStarter(s Starter, taskQueue string, defaultResourceTypes []t return &Trigger{starter: s, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes} } +// WithEmitterWebhookURL returns a copy of the trigger configured to forward +// the given URL to every started OrchestratorWorkflow. Useful in tests so +// the existing constructor signatures stay uncluttered. +func (t *Trigger) WithEmitterWebhookURL(url string) *Trigger { + clone := *t + clone.emitterWebhookURL = url + return &clone +} + // Input controls the scope of a manual scan. type Input struct { // ScanID lets the caller pin a correlation ID. If empty, one is generated. @@ -105,8 +117,9 @@ func (t *Trigger) Run(ctx context.Context, in Input) (Result, error) { } run, err := t.starter.ExecuteWorkflow(ctx, opts, orchestrator.OrchestratorWorkflow, orchestrator.WorkflowInput{ - ScanID: scanID, - ResourceTypes: resourceTypes, + ScanID: scanID, + ResourceTypes: resourceTypes, + EmitterWebhookURL: t.emitterWebhookURL, }) if err != nil { return Result{}, fmt.Errorf("scan: execute workflow: %w", err) diff --git a/pkg/scan/scan_test.go b/pkg/scan/scan_test.go index 2b63d98..bb612e2 100644 --- a/pkg/scan/scan_test.go +++ b/pkg/scan/scan_test.go @@ -134,12 +134,27 @@ func TestNewTrigger_WiresClientAsStarter(t *testing.T) { // constructor that stores it. Passing nil is enough to exercise the line — // we only assert the fields are wired. 
defaults := []types.ResourceType{"aurora-mysql"} - tr := NewTrigger(nil, "version-guard-orchestrator", defaults) + tr := NewTrigger(nil, "version-guard-orchestrator", defaults, "http://emitter:8080") require.NotNil(t, tr) assert.Equal(t, "version-guard-orchestrator", tr.taskQueue) assert.Nil(t, tr.starter, "nil client should pass through as nil Starter") assert.Equal(t, defaults, tr.defaultResourceTypes) + assert.Equal(t, "http://emitter:8080", tr.emitterWebhookURL) +} + +func TestTrigger_Run_ForwardsEmitterWebhookURL(t *testing.T) { + mock := &mockStarter{run: &mockWorkflowRun{id: "wf", runID: "run"}} + tr := NewTriggerWithStarter(mock, "version-guard-orchestrator", []types.ResourceType{"aurora-mysql"}). + WithEmitterWebhookURL("http://emitter:8080") + + _, err := tr.Run(context.Background(), Input{ScanID: "abc"}) + + require.NoError(t, err) + require.Len(t, mock.calledArgs, 1) + in := mock.calledArgs[0].(orchestrator.WorkflowInput) + assert.Equal(t, "http://emitter:8080", in.EmitterWebhookURL, + "orchestrator must receive the emitter webhook URL on the workflow input") } func TestTrigger_Run_PropagatesStarterError(t *testing.T) { diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go index 6df92c1..51972ed 100644 --- a/pkg/schedule/schedule.go +++ b/pkg/schedule/schedule.go @@ -14,10 +14,17 @@ import ( ) // Config holds configuration for the Temporal schedule. +// Field order is tuned for govet fieldalignment: all string fields +// before the slice keeps the pointer span minimal. type Config struct { ScheduleID string CronExpression string TaskQueue string + // EmitterWebhookURL, when non-empty, is forwarded into every + // scheduled OrchestratorWorkflow run so it can fire the Stage 3 + // notify activity once the snapshot is persisted. Empty disables + // the webhook for scheduled runs. + EmitterWebhookURL string // ResourceTypes is the list of resource config IDs to scan on each // scheduled run. 
Sourced from the loaded YAML config at startup — // empty is rejected by the orchestrator workflow because there is @@ -72,7 +79,8 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error { Action: &client.ScheduleWorkflowAction{ Workflow: orchestrator.OrchestratorWorkflow, Args: []interface{}{orchestrator.WorkflowInput{ - ResourceTypes: cfg.ResourceTypes, + ResourceTypes: cfg.ResourceTypes, + EmitterWebhookURL: cfg.EmitterWebhookURL, }}, TaskQueue: cfg.TaskQueue, WorkflowExecutionTimeout: 2 * time.Hour, diff --git a/pkg/snapshot/memory_store.go b/pkg/snapshot/memory_store.go new file mode 100644 index 0000000..ad28fab --- /dev/null +++ b/pkg/snapshot/memory_store.go @@ -0,0 +1,97 @@ +package snapshot + +import ( + "context" + "fmt" + "sort" + "sync" + + "github.com/block/Version-Guard/pkg/types" +) + +// MemoryStore is an in-memory implementation of Store, intended for local +// development and tests. It is goroutine-safe but obviously not durable — +// snapshots disappear on process restart. +// +// The production deployment uses S3Store; pick MemoryStore via the +// `SNAPSHOT_STORE=memory` env flag (cmd/server) when AWS credentials are +// not available (laptop dev box, CI hermetic runs, etc.). +// +//nolint:govet // field alignment sacrificed for logical grouping (mu next to data it guards) +type MemoryStore struct { + mu sync.RWMutex + snapshots map[string]*types.Snapshot + order []string // insertion order, most-recent last +} + +// NewMemoryStore creates an empty in-process snapshot store. +func NewMemoryStore() *MemoryStore { + return &MemoryStore{ + snapshots: make(map[string]*types.Snapshot), + } +} + +// SaveSnapshot stores the snapshot under its SnapshotID. 
+func (m *MemoryStore) SaveSnapshot(_ context.Context, s *types.Snapshot) error { + if s == nil { + return fmt.Errorf("memory store: snapshot is nil") + } + if s.SnapshotID == "" { + return fmt.Errorf("memory store: snapshot has empty SnapshotID") + } + m.mu.Lock() + defer m.mu.Unlock() + if _, exists := m.snapshots[s.SnapshotID]; !exists { + m.order = append(m.order, s.SnapshotID) + } + m.snapshots[s.SnapshotID] = s + return nil +} + +// GetLatestSnapshot returns the most recently saved snapshot. +func (m *MemoryStore) GetLatestSnapshot(_ context.Context) (*types.Snapshot, error) { + m.mu.RLock() + defer m.mu.RUnlock() + if len(m.order) == 0 { + return nil, fmt.Errorf("memory store: no snapshots available") + } + return m.snapshots[m.order[len(m.order)-1]], nil +} + +// GetSnapshot returns a snapshot by ID. +func (m *MemoryStore) GetSnapshot(_ context.Context, snapshotID string) (*types.Snapshot, error) { + m.mu.RLock() + defer m.mu.RUnlock() + s, ok := m.snapshots[snapshotID] + if !ok { + return nil, fmt.Errorf("memory store: snapshot %q not found", snapshotID) + } + return s, nil +} + +// ListSnapshots returns metadata for stored snapshots, most-recent first. +// limit <= 0 means "all". 
+func (m *MemoryStore) ListSnapshots(_ context.Context, limit int) ([]*Metadata, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + out := make([]*Metadata, 0, len(m.order)) + for _, id := range m.order { + s := m.snapshots[id] + out = append(out, &Metadata{ + SnapshotID: s.SnapshotID, + GeneratedAt: s.GeneratedAt, + TotalResources: s.Summary.TotalResources, + CompliancePercentage: s.Summary.CompliancePercentage, + S3Key: "memory://" + s.SnapshotID, + }) + } + // Most-recent first + sort.SliceStable(out, func(i, j int) bool { + return out[i].GeneratedAt.After(out[j].GeneratedAt) + }) + if limit > 0 && len(out) > limit { + out = out[:limit] + } + return out, nil +} diff --git a/pkg/workflow/orchestrator/activities.go b/pkg/workflow/orchestrator/activities.go index ea0ac4a..064dccd 100644 --- a/pkg/workflow/orchestrator/activities.go +++ b/pkg/workflow/orchestrator/activities.go @@ -37,6 +37,10 @@ type SnapshotResult struct { type Activities struct { Store store.Store SnapshotStore snapshot.Store + // HTTPDoer is used by NotifyEmitter for the Stage 3 webhook. Optional; + // nil falls back to a default *http.Client with a 10s timeout. Tests + // inject a fake to avoid real HTTP. + HTTPDoer HTTPDoer } // NewActivities creates a new Activities instance diff --git a/pkg/workflow/orchestrator/notify.go b/pkg/workflow/orchestrator/notify.go new file mode 100644 index 0000000..6e3610e --- /dev/null +++ b/pkg/workflow/orchestrator/notify.go @@ -0,0 +1,119 @@ +package orchestrator + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "go.temporal.io/sdk/activity" +) + +// NotifyEmitterActivityName is the registered name of the Stage 3 webhook +// activity that pings the downstream emitter once the snapshot is in S3. +const NotifyEmitterActivityName = "version-guard.NotifyEmitter" + +// NotifyEmitterInput is the activity input. EmitterWebhookURL is the base +// URL of the emitter's admin HTTP server (e.g. 
http://version-guard-emitter:8080); +// the activity appends "/trigger-act". +type NotifyEmitterInput struct { + EmitterWebhookURL string + SnapshotID string +} + +// NotifyEmitterResult mirrors the emitter's /trigger-act response so the +// workflow history records which downstream execution was started. +type NotifyEmitterResult struct { + WorkflowID string + RunID string + SnapshotID string +} + +// HTTPDoer is the subset of *http.Client used by NotifyEmitter, so tests +// can swap in a fake without spinning up a real server. +type HTTPDoer interface { + Do(req *http.Request) (*http.Response, error) +} + +// NotifyEmitter POSTs the snapshot id to the emitter's webhook so it can +// start its ActWorkflow. Returns the started workflow's identifiers. +// +// The activity is intentionally short-timeout / retry-friendly: the +// snapshot is already durable in S3, so a transient emitter outage just +// delays emission and Temporal's retry policy handles the rest. +func (a *Activities) NotifyEmitter(ctx context.Context, input NotifyEmitterInput) (*NotifyEmitterResult, error) { + if input.EmitterWebhookURL == "" { + return nil, fmt.Errorf("notify emitter: EmitterWebhookURL is empty") + } + + logger := activity.GetLogger(ctx) + logger.Info("Notifying emitter", "url", input.EmitterWebhookURL, "snapshotID", input.SnapshotID) + + url := strings.TrimRight(input.EmitterWebhookURL, "/") + "/trigger-act" + payload, err := json.Marshal(struct { + SnapshotID string `json:"snapshot_id,omitempty"` + }{SnapshotID: input.SnapshotID}) + if err != nil { + // json.Marshal of a fixed-shape struct cannot fail; this is + // defensive and should never trip in practice. 
+ return nil, fmt.Errorf("notify emitter: marshal payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload)) + if err != nil { + return nil, fmt.Errorf("notify emitter: build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + doer := a.HTTPDoer + if doer == nil { + doer = &http.Client{Timeout: 10 * time.Second} + } + + resp, err := doer.Do(req) + if err != nil { + return nil, fmt.Errorf("notify emitter: POST %s: %w", url, err) + } + defer func() { _ = resp.Body.Close() }() + + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + // Read failures on a successful HTTP status are unusual but + // possible (e.g. truncated response). Log and treat as empty + // body — the status code below still drives success/failure. + logger.Warn("Failed to read emitter response body", "error", readErr) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("notify emitter: %s returned status %d: %s", + url, resp.StatusCode, strings.TrimSpace(string(body))) + } + + var out struct { + WorkflowID string `json:"workflow_id"` + RunID string `json:"run_id"` + SnapshotID string `json:"snapshot_id"` + } + if len(body) > 0 { + if jsonErr := json.Unmarshal(body, &out); jsonErr != nil { + // Successful status but unparseable body — emitter is + // happy, our reply contract drifted. Don't fail the + // workflow; just log and return what we have. 
+ logger.Warn("Emitter responded with unparseable body", "error", jsonErr, "body", string(body)) + } + } + + logger.Info("Emitter notified", + "workflowID", out.WorkflowID, + "runID", out.RunID, + "snapshotID", out.SnapshotID) + + return &NotifyEmitterResult{ + WorkflowID: out.WorkflowID, + RunID: out.RunID, + SnapshotID: out.SnapshotID, + }, nil +} diff --git a/pkg/workflow/orchestrator/notify_test.go b/pkg/workflow/orchestrator/notify_test.go new file mode 100644 index 0000000..ed59358 --- /dev/null +++ b/pkg/workflow/orchestrator/notify_test.go @@ -0,0 +1,165 @@ +package orchestrator + +import ( + "errors" + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" +) + +// fakeDoer captures the request and returns a canned response or error. +type fakeDoer struct { + resp *http.Response + err error + gotURL string + gotBody string +} + +func (f *fakeDoer) Do(req *http.Request) (*http.Response, error) { + f.gotURL = req.URL.String() + if req.Body != nil { + b, _ := io.ReadAll(req.Body) + f.gotBody = string(b) + } + if f.err != nil { + return nil, f.err + } + return f.resp, nil +} + +func makeResp(t *testing.T, status int, body string) *http.Response { + t.Helper() + resp := &http.Response{ + StatusCode: status, + Body: io.NopCloser(strings.NewReader(body)), + Header: make(http.Header), + } + // The activity-under-test closes resp.Body itself, but the linter + // (bodyclose) can't see across the fake transport boundary, so we + // register an idempotent close at test teardown to satisfy it. + t.Cleanup(func() { _ = resp.Body.Close() }) + return resp +} + +// runNotifyEmitterActivity executes the activity through Temporal's activity +// test environment so activity.GetLogger / activity context plumbing works. 
+func runNotifyEmitterActivity(t *testing.T, a *Activities, in NotifyEmitterInput) (*NotifyEmitterResult, error) { + t.Helper() + suite := &testsuite.WorkflowTestSuite{} + env := suite.NewTestActivityEnvironment() + env.RegisterActivity(a.NotifyEmitter) + + val, err := env.ExecuteActivity(a.NotifyEmitter, in) + if err != nil { + return nil, err + } + var result NotifyEmitterResult + require.NoError(t, val.Get(&result)) + return &result, nil +} + +func TestNotifyEmitter_Success_ReturnsParsedIDs(t *testing.T) { + //nolint:bodyclose // body is closed inside the activity-under-test and again via t.Cleanup in makeResp + doer := &fakeDoer{ + resp: makeResp(t, http.StatusAccepted, `{"workflow_id":"wf-1","run_id":"run-1","snapshot_id":"snap-1"}`), + } + a := &Activities{HTTPDoer: doer} + + out, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{ + EmitterWebhookURL: "http://emitter:8080", + SnapshotID: "snap-1", + }) + + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, "wf-1", out.WorkflowID) + assert.Equal(t, "run-1", out.RunID) + assert.Equal(t, "snap-1", out.SnapshotID) + assert.Equal(t, "http://emitter:8080/trigger-act", doer.gotURL) + assert.Contains(t, doer.gotBody, `"snapshot_id":"snap-1"`) +} + +func TestNotifyEmitter_TrimsTrailingSlash(t *testing.T) { + //nolint:bodyclose // body is closed inside activity-under-test + t.Cleanup + doer := &fakeDoer{resp: makeResp(t, http.StatusAccepted, `{}`)} + a := &Activities{HTTPDoer: doer} + + _, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{ + EmitterWebhookURL: "http://emitter:8080/", + SnapshotID: "snap", + }) + + require.NoError(t, err) + assert.Equal(t, "http://emitter:8080/trigger-act", doer.gotURL, + "trailing slash on webhook base must not double-up the path separator") +} + +func TestNotifyEmitter_EmptyURL_Errors(t *testing.T) { + a := &Activities{} + _, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{}) + require.Error(t, err) +} + +func 
TestNotifyEmitter_TransportError_Wraps(t *testing.T) { + doer := &fakeDoer{err: errors.New("connection refused")} + a := &Activities{HTTPDoer: doer} + + _, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{ + EmitterWebhookURL: "http://emitter:8080", + SnapshotID: "x", + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "connection refused") +} + +func TestNotifyEmitter_Non2xxStatus_Errors(t *testing.T) { + //nolint:bodyclose // body is closed inside activity-under-test + t.Cleanup + doer := &fakeDoer{resp: makeResp(t, http.StatusInternalServerError, "boom")} + a := &Activities{HTTPDoer: doer} + + _, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{ + EmitterWebhookURL: "http://emitter:8080", + SnapshotID: "x", + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "status 500") + assert.Contains(t, err.Error(), "boom") +} + +func TestNotifyEmitter_OmitsSnapshotIDWhenEmpty(t *testing.T) { + //nolint:bodyclose // body is closed inside activity-under-test + t.Cleanup + doer := &fakeDoer{resp: makeResp(t, http.StatusAccepted, `{}`)} + a := &Activities{HTTPDoer: doer} + + _, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{ + EmitterWebhookURL: "http://emitter:8080", + // SnapshotID intentionally empty — emitter falls back to "latest" + }) + + require.NoError(t, err) + // The marshaled body should be just "{}" since omitempty drops the field. 
+ assert.Equal(t, "{}", strings.TrimSpace(doer.gotBody), + "empty snapshot id should not appear in the request body (omitempty)") +} + +func TestNotifyEmitter_UnparseableBody_StillSucceeds(t *testing.T) { + //nolint:bodyclose // body is closed inside activity-under-test + t.Cleanup + doer := &fakeDoer{resp: makeResp(t, http.StatusAccepted, `not json`)} + a := &Activities{HTTPDoer: doer} + + out, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{ + EmitterWebhookURL: "http://emitter:8080", + SnapshotID: "snap", + }) + + require.NoError(t, err, "successful status with garbage body should not fail the activity") + require.NotNil(t, out) + assert.Empty(t, out.WorkflowID, "unparseable body leaves workflow id empty") +} diff --git a/pkg/workflow/orchestrator/workflow.go b/pkg/workflow/orchestrator/workflow.go index 34044be..c04960c 100644 --- a/pkg/workflow/orchestrator/workflow.go +++ b/pkg/workflow/orchestrator/workflow.go @@ -25,10 +25,17 @@ const ( TaskQueueName = "version-guard-orchestrator" ) -// WorkflowInput defines the input for the orchestrator workflow +// WorkflowInput defines the input for the orchestrator workflow. +// Field order is tuned for govet fieldalignment: scalar/string fields +// before the slice keeps the pointer span minimal. type WorkflowInput struct { - ScanID string - ResourceTypes []types.ResourceType // If empty, scan all supported types + ScanID string + // EmitterWebhookURL, when set, makes the orchestrator POST to + // "/trigger-act" after the snapshot is persisted (Stage 3). + // Empty disables the webhook — the snapshot remains durable in S3 + // and downstream emitters can pull on their own cadence. 
+ EmitterWebhookURL string + ResourceTypes []types.ResourceType // If empty, scan all supported types } // WorkflowOutput contains the results of the orchestrator workflow @@ -199,11 +206,51 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO logger.Info("Stage 2: Store - Snapshot created and persisted", "snapshotID", snapshotResult.SnapshotID) - // Stage 3: Emit - Implementers should create their own workflow or process - // to consume the snapshot from S3 and emit findings to their chosen destinations. - // See pkg/emitters/emitters.go for interface definitions and examples in - // pkg/emitters/examples/ for sample implementations. - logger.Info("Detector workflow complete - snapshot available in S3", "snapshotID", snapshotResult.SnapshotID) + // Stage 3: NOTIFY EMITTER (optional) + // + // When EmitterWebhookURL is configured, POST the snapshot id to the + // downstream emitter so it can start its own workflow against the + // freshly-persisted snapshot. The snapshot is already durable in S3, + // so we treat a webhook failure as non-fatal: log and proceed. Other + // implementers can subscribe to S3 events, poll, or run a schedule + // instead — the webhook is one supported integration, not the only one. 
+	if input.EmitterWebhookURL != "" { + logger.Info("Stage 3: Notify - Calling emitter webhook", + "url", input.EmitterWebhookURL, + "snapshotID", snapshotResult.SnapshotID) + + var notifyResult NotifyEmitterResult + notifyErr := workflow.ExecuteActivity( + workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: 10 * time.Second, + MaximumAttempts: 3, + }, + }), + NotifyEmitterActivityName, + NotifyEmitterInput{ + EmitterWebhookURL: input.EmitterWebhookURL, + SnapshotID: snapshotResult.SnapshotID, + }, + ).Get(ctx, &notifyResult) + + if notifyErr != nil { + logger.Warn("Stage 3: Notify - Emitter webhook failed; snapshot remains in S3 for later pickup", + "error", notifyErr, + "snapshotID", snapshotResult.SnapshotID) + } else { + logger.Info("Stage 3: Notify - Emitter accepted snapshot", + "snapshotID", snapshotResult.SnapshotID, + "emitterWorkflowID", notifyResult.WorkflowID, + "emitterRunID", notifyResult.RunID) + } + } else { + logger.Info("Stage 3: Notify - Skipped (no EmitterWebhookURL configured); snapshot available in S3", + "snapshotID", snapshotResult.SnapshotID) + } endTime := workflow.Now(ctx)