Merged
3 changes: 3 additions & 0 deletions .github/workflows/validate.yml
@@ -33,6 +33,9 @@ jobs:
- name: Terraform validate
run: terraform -chdir=grantstack-backend/terraform validate

- name: Terraform tests
run: terraform -chdir=grantstack-backend/terraform test

- name: Set up Python
uses: actions/setup-python@v5
with:
4 changes: 3 additions & 1 deletion README.md
@@ -181,6 +181,7 @@ This repo is designed to show practical full-stack cloud engineering judgment:
- `grantstack-backend/lambda/` - Python Lambda handlers.
- `grantstack-backend/scripts/smoke_test.py` - public API workflow smoke test.
- `grantstack-backend/docs/DLQ_REPLAY_RUNBOOK.md` - dead-letter queue recovery runbook.
- `grantstack-backend/docs/SQS_WORKER_RUNBOOK.md` - intentional dev worker operation and empty-receive monitoring.
- `grantstack-landing/` - Cloudflare Pages static site.

## Backend Deploy
@@ -228,6 +229,7 @@ python3 -m py_compile grantstack-backend/scripts/sync_vector_index.py
python3 -m unittest discover -s grantstack-backend/tests
terraform -chdir=grantstack-backend/terraform fmt -check -recursive
terraform -chdir=grantstack-backend/terraform validate
terraform -chdir=grantstack-backend/terraform test
grantstack-backend/scripts/sync_vector_index.py --dry-run --limit 3
grantstack-backend/scripts/smoke_test.py --timeout 180 --interval 5
```
@@ -309,7 +311,7 @@ The infrastructure is designed for near-zero idle cost:
- API Gateway HTTP API charges only by request.
- API Gateway stage throttling reduces accidental or abusive request bursts.
- Lambda charges only during execution.
- SQS charges by request.
- SQS charges by request. The dev Lambda poller is disabled by default; set `enable_sqs_worker = true` and apply only when dev queue processing is intentional.
- DynamoDB uses `PAY_PER_REQUEST`.
- DynamoDB TTL is enabled on `expires_at` so pilot project and analytics data can age out automatically.
- CloudWatch Logs, dashboards, alarms, X-Ray traces, S3 catalog storage, and remote Terraform state may create small charges as usage grows.
4 changes: 3 additions & 1 deletion grantstack-backend/ARCHITECTURE.md
@@ -14,6 +14,8 @@ GrantStack uses a fully asynchronous, decoupled serverless workflow designed for
8. A scheduled Source Refresh Lambda verifies official source URLs, records retrieval status and content hashes, and writes the active catalog to S3.
9. The frontend posts privacy-light product events to `POST /analytics`; the Analytics Lambda validates and stores them in a separate on-demand DynamoDB table with TTL.

The SQS event source mapping is disabled by default in `dev` and requires `enable_sqs_worker = true` for intentional processing. Staging and production remain enabled by default. Messages sent while the dev mapping is disabled remain queued until processing is enabled.

```mermaid
flowchart LR
Client["Client"] --> Api["API Gateway HTTP API"]
@@ -37,7 +39,7 @@ flowchart LR

- API Gateway HTTP API has no provisioned gateway capacity.
- Lambda uses no provisioned concurrency and is not attached to a VPC, avoiding NAT gateway idle cost.
- SQS charges per request and stores messages only while work exists.
- SQS charges per request. Dev disables the Lambda event source mapping by default so managed pollers do not issue empty receives while idle.
- DynamoDB uses `PAY_PER_REQUEST`; there is no provisioned read/write capacity.
- The analytics table also uses `PAY_PER_REQUEST` and TTL, preserving the same zero-idle compute posture for product telemetry.
- CloudWatch log groups use explicit retention.
1 change: 1 addition & 0 deletions grantstack-backend/docs/DLQ_REPLAY_RUNBOOK.md
@@ -23,6 +23,7 @@ aws sqs receive-message \
--attribute-names All \
--message-attribute-names All \
--max-number-of-messages 1 \
--wait-time-seconds 20 \
--visibility-timeout 30
```

50 changes: 50 additions & 0 deletions grantstack-backend/docs/SQS_WORKER_RUNBOOK.md
@@ -0,0 +1,50 @@
# SQS Worker Runbook

GrantStack consumes `grantstack-<environment>-processing` through an AWS Lambda event source mapping. There is no application-side polling loop. Lambda owns the `ReceiveMessage` calls, long polling, scaling, and idle backoff behavior.

The processing queue uses a 20-second receive wait. The 960-second visibility timeout exceeds the processor Lambda's 900-second timeout. The mapping keeps a batch size of one because a single report can use most of that processing window; increasing the batch would risk messages becoming visible before the batch completes. Lambda deletes successfully processed messages and the handler reports individual failures through `ReportBatchItemFailures`.
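
The partial-batch contract described above can be sketched in Python. This is a minimal illustration under stated assumptions, not the project's handler: `handle_batch` and the `process` callable are hypothetical names introduced here, while the `batchItemFailures` / `itemIdentifier` response shape is the one Lambda reads when `ReportBatchItemFailures` is set on the mapping.

```python
from typing import Any, Callable, Dict, List, Mapping


def handle_batch(
    event: Mapping[str, Any],
    process: Callable[[Mapping[str, Any]], None],  # hypothetical per-record worker
) -> Dict[str, List[Dict[str, str]]]:
    """Process each SQS record and report only the message IDs that failed."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records") or []:
        message_id = record.get("messageId", "unknown-message")
        try:
            process(record)
        except Exception:
            # Reported messages stay on the queue and are retried after the
            # visibility timeout; unreported ones are deleted by Lambda.
            failures.append({"itemIdentifier": message_id})
    # An empty list means the whole batch succeeded.
    return {"batchItemFailures": failures}
```

With a batch size of one the list holds at most a single entry, but the same shape keeps working if the batch size is ever raised.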

## Dev Worker Lifecycle

The event source mapping is disabled by default when `environment = "dev"`. This stops idle SQS receives without deleting the queue, DLQ, or pending messages. Staging and production remain enabled by default.

`enable_sqs_worker` is a deployment-time control because the Lambda service poller exists outside the function process. Automation can pass it as `TF_VAR_enable_sqs_worker`; under normal Terraform precedence, a value set in a `.tfvars` file or with `-var` on the command line overrides the environment variable.
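
The default resolution described above can be sketched in Python for readers who want to reason about it outside Terraform. The function name `resolve_sqs_worker_enabled` is hypothetical; the logic mirrors the documented behavior: an explicit value wins, and otherwise only `dev` is disabled.

```python
from typing import Optional


def resolve_sqs_worker_enabled(
    environment: str, enable_sqs_worker: Optional[bool] = None
) -> bool:
    """Return whether the event source mapping should poll the queue."""
    if enable_sqs_worker is not None:
        # An explicit true/false from the operator always takes effect.
        return enable_sqs_worker
    # Unset: every environment except dev polls by default.
    return environment != "dev"
```

Leaving the variable unset is therefore different from setting it to `false`: in staging or production the unset value resolves to enabled.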

Enable dev processing intentionally:

```sh
terraform -chdir=grantstack-backend/terraform plan \
-var-file=env/dev.tfvars \
-var='enable_sqs_worker=true' \
-out=grantstack-dev-worker.tfplan
terraform -chdir=grantstack-backend/terraform apply grantstack-dev-worker.tfplan
```

After the dev work is complete, set `enable_sqs_worker = false` in the dev variable file and apply again. `terraform output -raw sqs_worker_enabled` reports the deployed intent. Each processor invocation logs the queue name, the 20-second wait, the Lambda-managed idle behavior, and the received batch size.

Lambda event source mappings do not expose a configurable exponential idle-backoff setting. AWS manages their pollers and can keep multiple long polls active even when a queue is empty. Disabling the dev mapping is therefore the reliable zero-idle-request control; adding sleep or jitter inside the Lambda handler would not affect the separate AWS-managed pollers.

## Verify Empty Receives

In CloudWatch Metrics, select `AWS/SQS`, `Queue Metrics`, `NumberOfEmptyReceives`, and the `QueueName` dimension. Use a one-day period, the `Sum` statistic, and the deployment date through the current date. The operations dashboard also includes this metric at one-minute resolution.

CLI example for UTC dates:

```sh
aws cloudwatch get-metric-statistics \
--namespace AWS/SQS \
--metric-name NumberOfEmptyReceives \
--dimensions Name=QueueName,Value=grantstack-dev-processing \
--statistics Sum \
--period 86400 \
--start-time 2026-06-01T00:00:00Z \
--end-time 2026-07-01T00:00:00Z
```

After disabling the mapping, confirm that daily sums fall to zero after any in-flight polls finish. Also verify the mapping state:

```sh
aws lambda list-event-source-mappings \
--function-name grantstack-dev-processor \
--event-source-arn "$(terraform -chdir=grantstack-backend/terraform output -raw processing_queue_arn)"
```
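
The daily-sum check above can also be scripted. The following is a minimal sketch, assuming the JSON printed by `aws cloudwatch get-metric-statistics` has been captured as a string; `daily_empty_receives` and `trailing_zero_start` are hypothetical helper names, and the timestamps are assumed to share one ISO 8601 format so that string sorting matches time order.

```python
import json
from typing import Any, Dict, List, Optional, Tuple


def daily_empty_receives(response_json: str) -> List[Tuple[str, float]]:
    """Return (timestamp, sum) pairs in time order from the CLI JSON output."""
    response: Dict[str, Any] = json.loads(response_json)
    # Datapoints are not guaranteed to arrive sorted, so order them explicitly.
    points = sorted(response.get("Datapoints", []), key=lambda p: p["Timestamp"])
    return [(p["Timestamp"], float(p["Sum"])) for p in points]


def trailing_zero_start(points: List[Tuple[str, float]]) -> Optional[str]:
    """Return the first timestamp of the final all-zero run, or None if the last sum is non-zero."""
    start: Optional[str] = None
    for timestamp, total in points:
        if total == 0:
            if start is None:
                start = timestamp
        else:
            # A later non-zero day resets the run.
            start = None
    return start
```

CloudWatch omits periods that have no data at all, so a disabled worker may show up as missing datapoints rather than explicit zeros; treat an empty result as inconclusive and confirm with the mapping state.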
11 changes: 10 additions & 1 deletion grantstack-backend/lambda/processor_handler.py
@@ -59,8 +59,17 @@ def lambda_handler(event: Mapping[str, Any], context: Any) -> Dict[str, List[Dic
config = load_config()
table = dynamodb.Table(config.table_name)
failed_items: List[Dict[str, str]] = []
records = event.get("Records", [])


medium

If the "Records" key is explicitly set to null (which parses as None in Python), event.get("Records", []) returns None rather than the default, and len(records) on line 69 or the iteration over records on line 72 then raises a TypeError. If event itself is None, the event.get call raises an AttributeError before that point.

To make this more robust, guard against both a None event and a None records value.

Suggested change
records = event.get("Records", [])
records = (event.get("Records") or []) if event else []
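
For illustration, the same guard can be written as a small helper, which makes each case easier to test in isolation. This is a sketch only; `extract_records` is a hypothetical name and is not part of the handler in this PR.

```python
from typing import Any, List, Mapping, Optional


def extract_records(event: Optional[Mapping[str, Any]]) -> List[Mapping[str, Any]]:
    """Return the SQS records, treating a missing event or null Records as empty."""
    if not event:
        # Covers both None and an empty mapping.
        return []
    # `or []` handles a "Records" key that is present but null.
    return event.get("Records") or []
```

The helper behaves the same as the one-line conditional expression; it only trades brevity for readability.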


logger.info(
"sqs_worker_started queue=%s wait_time_seconds=%s idle_backoff=%s batch_size=%s",
os.environ.get("SQS_QUEUE_NAME", "unknown"),
os.environ.get("SQS_RECEIVE_WAIT_TIME_SECS", "20"),
os.environ.get("SQS_IDLE_BACKOFF_MODE", "aws-lambda-managed"),
len(records),
)

for record in event.get("Records", []):
for record in records:
message_id = record.get("messageId", "unknown-message")
try:
asyncio.run(process_record(record, table, config))
1 change: 1 addition & 0 deletions grantstack-backend/terraform/env/dev.tfvars.example
@@ -38,6 +38,7 @@ enable_xray_tracing = true
enable_api_access_logs = true
enable_cloudwatch_dashboard = true
enable_source_refresh = true
enable_sqs_worker = false

processor_memory_size = 1024
processor_timeout_seconds = 900
1 change: 1 addition & 0 deletions grantstack-backend/terraform/env/prod.tfvars.example
@@ -40,6 +40,7 @@ enable_xray_tracing = true
enable_api_access_logs = true
enable_cloudwatch_dashboard = true
enable_source_refresh = true
enable_sqs_worker = true

processor_memory_size = 1024
processor_timeout_seconds = 900
1 change: 1 addition & 0 deletions grantstack-backend/terraform/env/staging.tfvars.example
@@ -40,6 +40,7 @@ enable_xray_tracing = true
enable_api_access_logs = true
enable_cloudwatch_dashboard = true
enable_source_refresh = true
enable_sqs_worker = true

processor_memory_size = 1024
processor_timeout_seconds = 900
29 changes: 21 additions & 8 deletions grantstack-backend/terraform/main.tf
@@ -22,10 +22,12 @@ provider "aws" {
data "aws_caller_identity" "current" {}

locals {
name_prefix = "${var.project_name}-${var.environment}"
dynamodb_table_name = coalesce(var.dynamodb_table_name, "${local.name_prefix}-projects")
analytics_table_name = coalesce(var.analytics_table_name, "${local.name_prefix}-analytics")
source_catalog_bucket_name = coalesce(var.source_catalog_bucket_name, "${local.name_prefix}-source-catalog-${data.aws_caller_identity.current.account_id}")
name_prefix = "${var.project_name}-${var.environment}"
dynamodb_table_name = coalesce(var.dynamodb_table_name, "${local.name_prefix}-projects")
analytics_table_name = coalesce(var.analytics_table_name, "${local.name_prefix}-analytics")
source_catalog_bucket_name = coalesce(var.source_catalog_bucket_name, "${local.name_prefix}-source-catalog-${data.aws_caller_identity.current.account_id}")
sqs_receive_wait_time_seconds = 20
sqs_worker_enabled = var.enable_sqs_worker != null ? var.enable_sqs_worker : var.environment != "dev"
external_secret_arns = compact([
var.vector_db_api_key_secret_arn,
var.embedding_api_key_secret_arn,
@@ -111,6 +113,7 @@ data "archive_file" "source_refresh_lambda" {
resource "aws_sqs_queue" "processing_dlq" {
name = "${local.name_prefix}-processing-dlq"
message_retention_seconds = 1209600
receive_wait_time_seconds = local.sqs_receive_wait_time_seconds
sqs_managed_sse_enabled = true

tags = local.common_tags
@@ -120,7 +123,7 @@ resource "aws_sqs_queue" "processing" {
name = "${local.name_prefix}-processing"
visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
message_retention_seconds = var.sqs_message_retention_seconds
receive_wait_time_seconds = 20
receive_wait_time_seconds = local.sqs_receive_wait_time_seconds
sqs_managed_sse_enabled = true

redrive_policy = jsonencode({
@@ -701,6 +704,9 @@ resource "aws_lambda_function" "processor" {
LLM_MODEL = var.llm_model
MOCK_EXTERNAL_CALLS = tostring(var.mock_external_calls)
HTTP_CLIENT_TIMEOUT_SECS = tostring(var.http_client_timeout_seconds)
SQS_QUEUE_NAME = aws_sqs_queue.processing.name
SQS_RECEIVE_WAIT_TIME_SECS = tostring(local.sqs_receive_wait_time_seconds)
SQS_IDLE_BACKOFF_MODE = "aws-lambda-managed"
}
}

@@ -814,12 +820,13 @@ resource "aws_lambda_function" "source_refresh" {
}

resource "aws_lambda_event_source_mapping" "processor_sqs" {
event_source_arn = aws_sqs_queue.processing.arn
function_name = aws_lambda_function.processor.arn
event_source_arn = aws_sqs_queue.processing.arn
function_name = aws_lambda_function.processor.arn
# One report can use the full processor timeout, so larger batches risk visibility expiry.
batch_size = 1
maximum_batching_window_in_seconds = 0
function_response_types = ["ReportBatchItemFailures"]
enabled = true
enabled = local.sqs_worker_enabled
}

resource "aws_cloudwatch_event_rule" "source_refresh" {
@@ -1262,6 +1269,7 @@ resource "aws_cloudwatch_dashboard" "grantstack" {
metrics = [
["AWS/SQS", "ApproximateNumberOfMessagesVisible", "QueueName", aws_sqs_queue.processing.name, { stat = "Maximum", label = "Processing visible" }],
[".", "ApproximateAgeOfOldestMessage", ".", aws_sqs_queue.processing.name, { stat = "Maximum", label = "Oldest age" }],
[".", "NumberOfEmptyReceives", ".", aws_sqs_queue.processing.name, { stat = "Sum", label = "Empty receives" }],
[".", "ApproximateNumberOfMessagesVisible", ".", aws_sqs_queue.processing_dlq.name, { stat = "Maximum", label = "DLQ visible" }]
]
period = 60
@@ -1367,6 +1375,11 @@ output "processing_queue_arn" {
value = aws_sqs_queue.processing.arn
}

output "sqs_worker_enabled" {
description = "Whether the Lambda SQS event source mapping is actively polling the processing queue."
value = local.sqs_worker_enabled
}

output "processing_dlq_url" {
description = "SQS processing DLQ URL."
value = aws_sqs_queue.processing_dlq.url
1 change: 1 addition & 0 deletions grantstack-backend/terraform/terraform.tfvars.example
@@ -45,6 +45,7 @@ enable_xray_tracing = true
enable_api_access_logs = true
enable_cloudwatch_dashboard = true
enable_source_refresh = true
enable_sqs_worker = false

processor_memory_size = 1024
processor_timeout_seconds = 900
64 changes: 64 additions & 0 deletions grantstack-backend/terraform/tests/sqs_worker.tftest.hcl
@@ -0,0 +1,64 @@
mock_provider "aws" {
mock_data "aws_caller_identity" {
defaults = {
account_id = "123456789012"
}
}

mock_data "aws_iam_policy_document" {
defaults = {
json = "{\"Version\":\"2012-10-17\",\"Statement\":[]}"
}
}
}
mock_provider "archive" {}

run "dev_worker_is_disabled_by_default" {
command = plan

variables {
environment = "dev"
}

assert {
condition = aws_lambda_event_source_mapping.processor_sqs.enabled == false
error_message = "The dev SQS worker must not poll unless it is explicitly enabled."
}

assert {
condition = aws_sqs_queue.processing.receive_wait_time_seconds == 20
error_message = "The processing queue must use 20-second long polling."
}

assert {
condition = aws_sqs_queue.processing_dlq.receive_wait_time_seconds == 20
error_message = "The processing DLQ must use 20-second long polling."
}
}

run "dev_worker_can_be_enabled_explicitly" {
command = plan

variables {
environment = "dev"
enable_sqs_worker = true
}

assert {
condition = aws_lambda_event_source_mapping.processor_sqs.enabled == true
error_message = "The dev SQS worker should run when explicitly enabled."
}
}

run "production_worker_remains_enabled_by_default" {
command = plan

variables {
environment = "prod"
}

assert {
condition = aws_lambda_event_source_mapping.processor_sqs.enabled == true
error_message = "The production SQS worker must remain enabled by default."
}
}
7 changes: 7 additions & 0 deletions grantstack-backend/terraform/variables.tf
@@ -71,6 +71,13 @@ variable "sqs_visibility_timeout_seconds" {
default = 960
}

variable "enable_sqs_worker" {
description = "Enable the Lambda SQS processor poller. Defaults to false in dev and true in other environments."
type = bool
default = null
nullable = true
}

variable "sqs_message_retention_seconds" {
description = "Processing queue message retention in seconds."
type = number