Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,15 @@ temporal: ## Start local Temporal dev server and open Web UI
--dynamic-config-value limit.blobSize.warn=15000000

.PHONY: dev
dev: ## Run the service locally with auto-reload on code changes
dev: ## Run the service locally (auto-reload if `entr` is installed)
@if [ -f .env ]; then set -a; . ./.env; set +a; fi; \
echo "🚀 Starting Version Guard with auto-reload (Ctrl+C to stop)..."; \
find . -name '*.go' -not -path './vendor/*' | entr -r go run ./cmd/server
if command -v entr >/dev/null 2>&1; then \
echo "🚀 Starting Version Guard with auto-reload via entr (Ctrl+C to stop)..."; \
find . -name '*.go' -not -path './vendor/*' | entr -r go run ./cmd/server; \
else \
echo "🚀 Starting Version Guard (no auto-reload — install entr for that). Ctrl+C to stop..."; \
go run ./cmd/server; \
fi

.PHONY: run-locally
run-locally: build ## Run the service locally (connects to local Temporal)
Expand All @@ -167,6 +172,68 @@ run-server: build ## Run server locally
@echo "🚀 Starting server locally..."
@CONFIG_ENV=development bin/$(BINARY_NAME) --mode=server

# ── Webhook E2E (detector → emitter) ──────────────────────────────────────────
# Everything below runs in Docker, so no local `temporal` or `curl` install is required.
# Pre-reqs (run in separate terminals before invoking these targets):
# 1. make temporal-docker (Temporal dev server in Docker)
# 2. (in version-guard-emitter) make dev (emitter worker + HTTP on host :8082, via .env)
# 3. EMITTER_WEBHOOK_URL=http://localhost:8082 make dev (detector worker + admin HTTP on host :8081)
# Resource value must be a config ID (the `id:` field in pkg/config/defaults
# resources.yaml: aurora-postgresql, aurora-mysql, eks, elasticache-redis,
# elasticache-valkey, elasticache-memcached, opensearch, rds-mysql,
# rds-postgresql, lambda) — NOT a type constant like "AURORA". The detector's
# inventory map is keyed by config ID so multiple configs of the same type
# (e.g. two aurora flavors) can have independent inventory sources.
WEBHOOK_E2E_RESOURCE := aurora-postgresql
TEMPORAL_DOCKER_IMAGE := temporalio/admin-tools:latest
CURL_DOCKER_IMAGE := curlimages/curl:latest
# Inside containers we reach host-side processes via host.docker.internal (Docker Desktop on macOS/Windows).
HOST_FROM_DOCKER := host.docker.internal
# Host ports
DETECTOR_ADMIN_PORT := 8081
EMITTER_ADMIN_PORT := 8082

.PHONY: temporal-docker
temporal-docker: ## Start Temporal dev server in Docker (alternative to `make temporal`)
@echo "🕰️ Starting Temporal dev server in Docker (namespace: $(TEMPORAL_NAMESPACE))..."
@echo " Frontend: localhost:7233 Web UI: http://localhost:8233"
@open http://localhost:8233 &
@docker run --rm \
--name version-guard-temporal-dev \
-p 7233:7233 -p 8233:8233 \
$(TEMPORAL_DOCKER_IMAGE) \
temporal server start-dev \
--ip 0.0.0.0 \
--namespace $(TEMPORAL_NAMESPACE) \
--dynamic-config-value limit.blobSize.error=20000000 \
--dynamic-config-value limit.blobSize.warn=15000000

.PHONY: webhook-e2e
webhook-e2e: ## Trigger an end-to-end run via the detector's POST /scan (in Docker)
@command -v docker >/dev/null 2>&1 || { echo "❌ docker not found"; exit 1; }
@echo "🚀 POST /scan to detector at :$(DETECTOR_ADMIN_PORT) (resource=$(WEBHOOK_E2E_RESOURCE))..."
@echo " Watch: http://localhost:8233/namespaces/$(TEMPORAL_NAMESPACE)/workflows"
@docker run --rm \
--add-host=$(HOST_FROM_DOCKER):host-gateway \
$(CURL_DOCKER_IMAGE) \
-fsSi -X POST http://$(HOST_FROM_DOCKER):$(DETECTOR_ADMIN_PORT)/scan \
-H 'Content-Type: application/json' \
-d '{"resource_types":["$(WEBHOOK_E2E_RESOURCE)"]}'
@echo ""
@echo "✅ Detector orchestrator workflow started; expect a matching version-guard-act-<snapshotID> ActWorkflow on the emitter."

.PHONY: webhook-e2e-smoke
webhook-e2e-smoke: ## Hit the emitter /trigger-act webhook directly (no detector) via Docker
@command -v docker >/dev/null 2>&1 || { echo "❌ docker not found"; exit 1; }
@SID="smoke-$$(date +%s)"; \
echo "🔎 POST /trigger-act to emitter at :$(EMITTER_ADMIN_PORT) with snapshot_id=$$SID..."; \
docker run --rm \
--add-host=$(HOST_FROM_DOCKER):host-gateway \
$(CURL_DOCKER_IMAGE) \
-fsSi -X POST http://$(HOST_FROM_DOCKER):$(EMITTER_ADMIN_PORT)/trigger-act \
-H 'Content-Type: application/json' \
-d "{\"snapshot_id\":\"$$SID\"}"

# ── Docker ────────────────────────────────────────────────────────────────────

.PHONY: docker-build
Expand Down
4 changes: 3 additions & 1 deletion cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ func (c *ScanStartCmd) Run(ctx *Context) error {
// --resource-type explicitly. An empty list propagates to the
// orchestrator, which rejects it with ErrNoResourceTypes so the
// caller gets an immediate, descriptive failure.
trigger := scan.NewTrigger(temporalClient, ctx.TemporalTaskQueue, nil)
// CLI-triggered runs do not chain to the emitter webhook — operators
// using the CLI typically just want to verify the detector path.
trigger := scan.NewTrigger(temporalClient, ctx.TemporalTaskQueue, nil, "")
res, err := trigger.Run(context.Background(), scan.Input{
ScanID: c.ScanID,
ResourceTypes: resourceTypes,
Expand Down
109 changes: 77 additions & 32 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/block/Version-Guard/pkg/eol"
eolendoflife "github.com/block/Version-Guard/pkg/eol/endoflife"
"github.com/block/Version-Guard/pkg/inventory"
mockinv "github.com/block/Version-Guard/pkg/inventory/mock"
"github.com/block/Version-Guard/pkg/inventory/wiz"
"github.com/block/Version-Guard/pkg/policy"
"github.com/block/Version-Guard/pkg/registry"
Expand Down Expand Up @@ -62,6 +63,12 @@ type ServerCLI struct {
// AWS configuration (for EOL APIs)
AWSRegion string `help:"AWS region for EOL APIs" default:"us-west-2" env:"AWS_REGION"`

// Snapshot storage backend ("s3" or "memory"). "memory" is intended
// for laptop dev and CI smoke tests; it has no durability across
// restarts but lets the orchestrator's Stage 2 succeed without AWS
// credentials.
SnapshotStore string `help:"Snapshot store backend: s3 or memory" default:"s3" enum:"s3,memory" env:"SNAPSHOT_STORE"`

// S3 configuration (for snapshots)
S3Bucket string `help:"S3 bucket for snapshots" default:"version-guard-snapshots" env:"S3_BUCKET"`
S3Prefix string `help:"S3 prefix for snapshots" default:"snapshots/" env:"S3_PREFIX"`
Expand All @@ -70,6 +77,13 @@ type ServerCLI struct {
// Service configuration
HTTPPort int `help:"HTTP admin port (POST /scan)" default:"8081" env:"HTTP_PORT"`

// Emitter webhook (Stage 3). When set, OrchestratorWorkflow POSTs to
// "<url>/trigger-act" after the snapshot is persisted, kicking the
// downstream emitter immediately instead of waiting for its own cron.
// Empty disables the webhook (snapshot still lands in S3 for any
// pull-based consumer).
EmitterWebhookURL string `help:"Base URL of the emitter webhook (e.g. http://version-guard-emitter:8080)" env:"EMITTER_WEBHOOK_URL"`

// Tag configuration (comma-separated lists for AWS resource tags)
TagAppKeys string `help:"Comma-separated tag keys for application/service name" default:"app,application,service" env:"TAG_APP_KEYS"`

Expand Down Expand Up @@ -156,25 +170,32 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
st := memory.NewStore()
fmt.Println("✓ In-memory store initialized")

// Initialize S3 snapshot store
var snapshotStore *snapshot.S3Store
// Initialize snapshot store. Production runs use S3; laptop dev / CI
// smoke tests can use the in-memory store via `SNAPSHOT_STORE=memory`
// to avoid needing AWS credentials.
var snapshotStore snapshot.Store
ctx := context.Background()
configOpts := []func(*config.LoadOptions) error{config.WithRegion(s.AWSRegion)}
cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
if err != nil {
fmt.Printf("⚠️ Failed to load AWS config: %v\n", err)
fmt.Println(" Snapshots will not be persisted to S3")
if s.SnapshotStore == "memory" {
snapshotStore = snapshot.NewMemoryStore()
fmt.Println("✓ In-memory snapshot store initialized (SNAPSHOT_STORE=memory; not durable)")
} else {
s3Opts := []func(*s3.Options){}
if s.S3Endpoint != "" {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = &s.S3Endpoint
o.UsePathStyle = true
})
configOpts := []func(*config.LoadOptions) error{config.WithRegion(s.AWSRegion)}
cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
if err != nil {
fmt.Printf("⚠️ Failed to load AWS config: %v\n", err)
fmt.Println(" Snapshots will not be persisted to S3")
} else {
s3Opts := []func(*s3.Options){}
if s.S3Endpoint != "" {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = &s.S3Endpoint
o.UsePathStyle = true
})
}
s3Client := s3.NewFromConfig(cfg, s3Opts...)
snapshotStore = snapshot.NewS3Store(s3Client, s.S3Bucket, s.S3Prefix)
fmt.Printf("✓ S3 snapshot store initialized (bucket: %s)\n", s.S3Bucket)
}
s3Client := s3.NewFromConfig(cfg, s3Opts...)
snapshotStore = snapshot.NewS3Store(s3Client, s.S3Bucket, s.S3Prefix)
fmt.Printf("✓ S3 snapshot store initialized (bucket: %s)\n", s.S3Bucket)
}

// Initialize Temporal client
Expand Down Expand Up @@ -259,14 +280,32 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
var invSource inventory.InventorySource
if resourceCfg.Inventory.Source == "wiz" {
if wizClient == nil {
// Wiz client not available (no credentials)
fmt.Printf(" ⚠️ Skipping %s - Wiz credentials not configured\n", resourceCfg.ID)
continue
// No Wiz credentials — use the in-process mock inventory
// so local e2e runs (webhook smoke tests etc.) can exercise
// the full detector → snapshot → emitter wire without
// CloudSec-issued credentials. Production paths always set
// the Wiz secrets, so this branch is dev-only.
configID := types.ResourceType(resourceCfg.ID)
invSource = &mockinv.InventorySource{
Resources: []*types.Resource{
{
ID: fmt.Sprintf("mock-%s-1", resourceCfg.ID),
Service: "version-guard-mock",
Type: configID,
CurrentVersion: "1.0.0",
Engine: resourceCfg.Type,
CloudProvider: types.CloudProviderAWS,
DiscoveredAt: time.Now(),
Tags: map[string]string{"env": "local-dev"},
},
},
}
fmt.Printf(" ⚠️ %s - Wiz credentials not configured, using mock inventory (1 fake resource)\n", resourceCfg.ID)
} else {
// Create generic inventory source
invSource = wiz.NewGenericInventorySource(wizClient, resourceCfg, registryClient, logger)
fmt.Printf(" ✓ Wiz inventory source created (reads from WIZ_REPORT_IDS[%s])\n", resourceCfg.ID)
}

// Create generic inventory source
invSource = wiz.NewGenericInventorySource(wizClient, resourceCfg, registryClient, logger)
fmt.Printf(" ✓ Wiz inventory source created (reads from WIZ_REPORT_IDS[%s])\n", resourceCfg.ID)
} else {
fmt.Printf(" ⚠️ Unsupported inventory source: %s\n", resourceCfg.Inventory.Source)
continue
Expand Down Expand Up @@ -369,6 +408,7 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
if snapshotStore != nil {
orchestratorActivities := orchestrator.NewActivities(st, snapshotStore)
w.RegisterActivityWithOptions(orchestratorActivities.CreateSnapshot, activity.RegisterOptions{Name: orchestrator.CreateSnapshotActivityName})
w.RegisterActivityWithOptions(orchestratorActivities.NotifyEmitter, activity.RegisterOptions{Name: orchestrator.NotifyEmitterActivityName})
fmt.Println("✓ Orchestrator activities registered (with S3)")
} else {
fmt.Println("⚠️ Orchestrator snapshot activity not registered (no S3 store)")
Expand All @@ -380,7 +420,11 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
}

// Start HTTP admin server (POST /scan to trigger manual scans)
httpServer := startAdminHTTPServer(s.HTTPPort, temporalClient, s.TemporalTaskQueue, defaultResourceTypes)
httpServer := startAdminHTTPServer(s.HTTPPort, temporalClient, s.TemporalTaskQueue, defaultResourceTypes, s.EmitterWebhookURL)

if s.EmitterWebhookURL != "" {
fmt.Printf("✓ Emitter webhook configured: %s/trigger-act\n", strings.TrimRight(s.EmitterWebhookURL, "/"))
}

// Start worker
fmt.Printf("\n✓ Temporal worker starting on queue: %s\n", s.TemporalTaskQueue)
Expand Down Expand Up @@ -430,12 +474,13 @@ func (s *ServerCLI) ensureSchedule(ctx context.Context, temporalClient client.Cl
schedCtx, schedCancel := context.WithTimeout(ctx, 10*time.Second)
defer schedCancel()
schedErr := scheduleMgr.EnsureSchedule(schedCtx, schedule.Config{
Enabled: true,
ScheduleID: s.ScheduleID,
CronExpression: s.ScheduleCron,
Jitter: jitter,
TaskQueue: s.TemporalTaskQueue,
ResourceTypes: defaultResourceTypes,
Enabled: true,
ScheduleID: s.ScheduleID,
CronExpression: s.ScheduleCron,
Jitter: jitter,
TaskQueue: s.TemporalTaskQueue,
ResourceTypes: defaultResourceTypes,
EmitterWebhookURL: s.EmitterWebhookURL,
})
if schedErr != nil {
fmt.Printf("⚠️ Failed to create/update schedule: %v\n", schedErr)
Expand All @@ -449,8 +494,8 @@ func (s *ServerCLI) ensureSchedule(ctx context.Context, temporalClient client.Cl
// startAdminHTTPServer wires the scan trigger into an HTTP admin server and
// starts listening in a background goroutine. The returned *http.Server can be
// shut down gracefully by the caller.
func startAdminHTTPServer(port int, temporalClient client.Client, taskQueue string, defaultResourceTypes []types.ResourceType) *http.Server {
scanTrigger := scan.NewTrigger(temporalClient, taskQueue, defaultResourceTypes)
func startAdminHTTPServer(port int, temporalClient client.Client, taskQueue string, defaultResourceTypes []types.ResourceType, emitterWebhookURL string) *http.Server {
scanTrigger := scan.NewTrigger(temporalClient, taskQueue, defaultResourceTypes, emitterWebhookURL)
mux := http.NewServeMux()
mux.Handle("/scan", scan.NewHandler(scanTrigger))

Expand Down
21 changes: 17 additions & 4 deletions pkg/scan/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Starter interface {
type Trigger struct {
starter Starter
taskQueue string
emitterWebhookURL string
defaultResourceTypes []types.ResourceType
}

Expand All @@ -43,8 +44,10 @@ type Trigger struct {
// defaultResourceTypes is the list used when the caller does not specify
// any (e.g. a full-fleet scan via empty HTTP body); supply it from the
// loaded YAML config so adding a resource is a YAML-only change.
func NewTrigger(c client.Client, taskQueue string, defaultResourceTypes []types.ResourceType) *Trigger {
return &Trigger{starter: c, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes}
// emitterWebhookURL, when non-empty, is forwarded to the orchestrator so
// it can fire the Stage 3 notify activity after the snapshot is saved.
func NewTrigger(c client.Client, taskQueue string, defaultResourceTypes []types.ResourceType, emitterWebhookURL string) *Trigger {
return &Trigger{starter: c, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes, emitterWebhookURL: emitterWebhookURL}
}

// NewTriggerWithStarter returns a Trigger backed by an explicit Starter
Expand All @@ -54,6 +57,15 @@ func NewTriggerWithStarter(s Starter, taskQueue string, defaultResourceTypes []t
return &Trigger{starter: s, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes}
}

// WithEmitterWebhookURL returns a copy of the trigger configured to forward
// the given URL to every started OrchestratorWorkflow. Useful in tests so
// the existing constructor signatures stay uncluttered.
func (t *Trigger) WithEmitterWebhookURL(url string) *Trigger {
clone := *t
clone.emitterWebhookURL = url
return &clone
}

// Input controls the scope of a manual scan.
type Input struct {
// ScanID lets the caller pin a correlation ID. If empty, one is generated.
Expand Down Expand Up @@ -105,8 +117,9 @@ func (t *Trigger) Run(ctx context.Context, in Input) (Result, error) {
}

run, err := t.starter.ExecuteWorkflow(ctx, opts, orchestrator.OrchestratorWorkflow, orchestrator.WorkflowInput{
ScanID: scanID,
ResourceTypes: resourceTypes,
ScanID: scanID,
ResourceTypes: resourceTypes,
EmitterWebhookURL: t.emitterWebhookURL,
})
if err != nil {
return Result{}, fmt.Errorf("scan: execute workflow: %w", err)
Expand Down
17 changes: 16 additions & 1 deletion pkg/scan/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,27 @@ func TestNewTrigger_WiresClientAsStarter(t *testing.T) {
// constructor that stores it. Passing nil is enough to exercise the line —
// we only assert the fields are wired.
defaults := []types.ResourceType{"aurora-mysql"}
tr := NewTrigger(nil, "version-guard-orchestrator", defaults)
tr := NewTrigger(nil, "version-guard-orchestrator", defaults, "http://emitter:8080")

require.NotNil(t, tr)
assert.Equal(t, "version-guard-orchestrator", tr.taskQueue)
assert.Nil(t, tr.starter, "nil client should pass through as nil Starter")
assert.Equal(t, defaults, tr.defaultResourceTypes)
assert.Equal(t, "http://emitter:8080", tr.emitterWebhookURL)
}

func TestTrigger_Run_ForwardsEmitterWebhookURL(t *testing.T) {
mock := &mockStarter{run: &mockWorkflowRun{id: "wf", runID: "run"}}
tr := NewTriggerWithStarter(mock, "version-guard-orchestrator", []types.ResourceType{"aurora-mysql"}).
WithEmitterWebhookURL("http://emitter:8080")

_, err := tr.Run(context.Background(), Input{ScanID: "abc"})

require.NoError(t, err)
require.Len(t, mock.calledArgs, 1)
in := mock.calledArgs[0].(orchestrator.WorkflowInput)
assert.Equal(t, "http://emitter:8080", in.EmitterWebhookURL,
"orchestrator must receive the emitter webhook URL on the workflow input")
}

func TestTrigger_Run_PropagatesStarterError(t *testing.T) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@ import (
)

// Config holds configuration for the Temporal schedule.
// Field order is tuned for govet fieldalignment: all string fields
// before the slice keeps the pointer span minimal.
type Config struct {
ScheduleID string
CronExpression string
TaskQueue string
// EmitterWebhookURL, when non-empty, is forwarded into every
// scheduled OrchestratorWorkflow run so it can fire the Stage 3
// notify activity once the snapshot is persisted. Empty disables
// the webhook for scheduled runs.
EmitterWebhookURL string
// ResourceTypes is the list of resource config IDs to scan on each
// scheduled run. Sourced from the loaded YAML config at startup —
// empty is rejected by the orchestrator workflow because there is
Expand Down Expand Up @@ -72,7 +79,8 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
Action: &client.ScheduleWorkflowAction{
Workflow: orchestrator.OrchestratorWorkflow,
Args: []interface{}{orchestrator.WorkflowInput{
ResourceTypes: cfg.ResourceTypes,
ResourceTypes: cfg.ResourceTypes,
EmitterWebhookURL: cfg.EmitterWebhookURL,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Propagate webhook URL for existing schedules

EnsureSchedule only applies EmitterWebhookURL in the create-time action args, but the already-existing-schedule path does not compare or rewrite workflow args (it exits early when cron/jitter match, and update only touches spec/task queue). In an upgraded deployment where the schedule already exists, setting EMITTER_WEBHOOK_URL later will not reach scheduled orchestrator runs, so Stage 3 webhook notifications never fire unless the schedule is recreated manually.

Useful? React with 👍 / 👎.

}},
TaskQueue: cfg.TaskQueue,
WorkflowExecutionTimeout: 2 * time.Hour,
Expand Down
Loading
Loading