20 changes: 20 additions & 0 deletions plumber/AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Plumber Repository Guidelines

## Layout
- `ppl/` hosts the pipeline orchestrator (Elixir/OTP application) together with its gRPC endpoints and database migrations.
- `block/` implements block/task execution logic and is started as a sibling OTP app inside the same release.
- `proto/`, `spec/`, and other top-level folders expose shared protobuf contracts, YAML schemas, and helper libraries.

## Build & Test
- Run `mix deps.get` inside both `ppl/` and `block/` after cloning; each app keeps its own `mix.lock`.
- Use the provided `Makefile` targets (e.g., `make unit-test`) to spin up Postgres/RabbitMQ containers and execute the umbrella test suites.
- Database migrations live under `ppl/priv/ecto_repo/migrations` and `block/priv/ecto_repo/migrations`; run them with `mix ecto.migrate -r <Repo>`.
- All new code must pass `mix credo --strict`, `mix format --check-formatted`, and the relevant `mix test` suites before submitting.

## Development Notes
- Services rely on RabbitMQ (`RABBITMQ_URL`) for event streams; keep it running locally when exercising background workers (e.g., the retention policy consumer).
- Watchman (StatsD) is the default metrics sink; configure `METRICS_HOST`/`METRICS_PORT` for local debugging if needed.

## Documentation
- Service-specific guidance lives under `ppl/AGENTS.md` and `block/AGENTS.md`.
- Architectural notes, including retention policies, are documented in `docs/` (see `docs/pipeline_retention.md` for the event-driven marking flow).
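
The event-driven marking flow mentioned above can be sketched roughly as a validation chain that runs before any rows are touched. This is only an illustration under stated assumptions: `RetentionFlowSketch`, the plain-map event, and the `cutoff_seconds` key are hypothetical stand-ins for the decoded protobuf message, not the service's actual API.

```elixir
defmodule RetentionFlowSketch do
  # Hypothetical sketch: validates an already-decoded policy event.
  # The event is a plain map here (an assumption), not the protobuf struct.
  def validate(%{org_id: org_id, cutoff_seconds: seconds}) do
    with {:ok, org_id} <- non_empty(org_id),
         {:ok, cutoff} <- to_naive(seconds) do
      {:ok, {org_id, cutoff}}
    end
  end

  # An empty or missing organization id is rejected.
  defp non_empty(value) when value in [nil, ""], do: {:error, :missing_org_id}
  defp non_empty(value), do: {:ok, value}

  # A zero or missing timestamp is treated as "no cutoff supplied".
  defp to_naive(seconds) when seconds in [nil, 0], do: {:error, :missing_cutoff}

  defp to_naive(seconds) when is_integer(seconds) do
    with {:ok, datetime} <- DateTime.from_unix(seconds, :second) do
      {:ok, DateTime.to_naive(datetime)}
    end
  end
end

# Usage: a well-formed event yields the org id and a naive UTC cutoff.
{:ok, {org_id, cutoff}} =
  RetentionFlowSketch.validate(%{org_id: "org-1", cutoff_seconds: 86_400})
```

Returning tagged tuples from each step keeps the failure reason (`:missing_org_id`, `:missing_cutoff`) available for logging without raising.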
19 changes: 19 additions & 0 deletions plumber/block/AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Block Service Guidelines

## Project Layout
- `lib/block/` contains block request processing, task orchestration, and API clients.
- `priv/ecto_repo/migrations/` stores migrations for `Block.EctoRepo`.
- `config/` defines repo settings, looper timings, and runtime overrides.
- Tests live under `test/`; shared helpers reside in `test/support`.

## Setup & Commands
- Install deps: `mix deps.get`.
- Migrate DB: `mix ecto.migrate -r Block.EctoRepo`.
- Run unit tests: `mix test` (requires Postgres running; use the root Makefile to spin up containers).
- Format/lint: `mix format`, `mix credo --strict`.

## Integration Notes
- Block is started as part of the Plumber release; ensure both repos (`Ppl.EctoRepo` and `Block.EctoRepo`) are migrated.
- RabbitMQ (`RABBITMQ_URL`) is required for task lifecycle events; keep it reachable during development.
- When pipelines are deleted (e.g., via the retention worker), `Block.delete_blocks_from_ppl/1` is invoked to purge block-state tables; keep this coupling in mind when making schema changes.

22 changes: 22 additions & 0 deletions plumber/ppl/AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Ppl Service Guidelines

## Project Layout
- `lib/ppl/` contains the OTP application code (workers, gRPC servers, caches, etc.).
- `priv/ecto_repo/migrations/` holds database migrations for `Ppl.EctoRepo`.
- `config/` hosts the environment-specific config (runtime DB credentials, Watchman, RabbitMQ consumers).
- `test/` mirrors `lib/` with ExUnit suites; helpers live in `test/support`.

## Common Tasks
- Install deps: `mix deps.get`.
- Run migrations: `mix ecto.migrate -r Ppl.EctoRepo`.
- Launch tests: `mix test` (set `MIX_ENV=test`, and start Postgres/RabbitMQ via the repo `Makefile` if not already running).
- Format & lint: `mix format`, `mix credo --strict`.

## Background Workers
- Looper STMs and Beholders are supervised via `Ppl.Application`.
- `Ppl.Retention.PolicyConsumer` (Tackle) listens for `usage.OrganizationPolicyApply` events and marks pipelines by setting `expires_at`.
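
As a rough illustration of the marking rule, the predicate below mirrors the conditions the retention query applies per row. It is a sketch, not the service code: `MarkingRuleSketch` and the plain-map pipeline with an `org_id` key are hypothetical (the real query reads the organization from the request arguments and runs in the database).

```elixir
defmodule MarkingRuleSketch do
  # Hypothetical in-memory version of the marking rule.
  # A pipeline is marked when it belongs to the organization, was inserted
  # before the cutoff, and is either unmarked or marked with a later expiration.
  def mark?(%{org_id: pipeline_org, inserted_at: inserted_at, expires_at: expires_at}, org_id, cutoff) do
    pipeline_org == org_id and
      NaiveDateTime.compare(inserted_at, cutoff) == :lt and
      (is_nil(expires_at) or NaiveDateTime.compare(expires_at, cutoff) == :gt)
  end
end

# Usage: an old, unmarked pipeline qualifies for marking.
cutoff = ~N[2025-06-01 12:00:00]
old = %{org_id: "org-1", inserted_at: ~N[2025-05-01 10:00:00], expires_at: nil}
MarkingRuleSketch.mark?(old, "org-1", cutoff)
```

Because an existing earlier or equal `expires_at` fails the last condition, replaying the same policy event does not touch rows that were already marked.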

## Configuration Tips
- StatsD via Watchman: set `METRICS_HOST`, `METRICS_PORT`, `METRICS_NAMESPACE`.
- DB settings: `POSTGRES_DB_*` env vars control `Ppl.EctoRepo`; separate vars govern the `block` repo.
- Retention consumer: `USAGE_POLICY_EXCHANGE` / `USAGE_POLICY_ROUTING_KEY` configure which RabbitMQ route the policy consumer listens to.
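
One pitfall when wiring these variables: an unset variable yields `nil`, and a keyword entry whose value is `nil` still counts as present for `Keyword.get/3`, so a default passed there is skipped. The sketch below shows one possible fallback that treats `nil` as missing. `RetentionConfigSketch` is a hypothetical helper, and the default values are assumed to match the ones used by the consumer.

```elixir
defmodule RetentionConfigSketch do
  # Assumed defaults for the retention consumer route.
  @defaults [
    exchange: "usage_internal_api",
    routing_key: "usage.apply_organization_policy"
  ]

  # Merges configured options over the defaults, keeping the default
  # whenever the configured value is nil (e.g. the env var was unset).
  def resolve(opts) do
    Keyword.merge(@defaults, opts, fn _key, default, value -> value || default end)
  end
end

# Usage: the nil exchange falls back, the explicit routing key wins.
resolved = RetentionConfigSketch.resolve(exchange: nil, routing_key: "custom.key")
```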
5 changes: 5 additions & 0 deletions plumber/ppl/config/config.exs
@@ -43,6 +43,11 @@ config :gofer_client, gofer_grpc_timeout: 4_567
# Time in hours before delete requests are processed
config :ppl, deletion_offset_h: 24

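# Retention policy event consumer.
# Defaults are supplied here because an unset variable would otherwise store nil,
# and a nil entry bypasses the Keyword.get/3 defaults in the consumer.
config :ppl, Ppl.Retention.PolicyConsumer,
  exchange: System.get_env("USAGE_POLICY_EXCHANGE", "usage_internal_api"),
  routing_key: System.get_env("USAGE_POLICY_ROUTING_KEY", "usage.apply_organization_policy")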

# How many times should wormhole retry to publish pipeline events to RabbitMQ
config :ppl, publish_retry_count: 3
# Timeout for publishing pipeline events to RabbitMQ
4 changes: 4 additions & 0 deletions plumber/ppl/config/test.exs
@@ -47,6 +47,10 @@ config :ppl, dr_pending_sp: 100
config :ppl, dr_deleting_sp: 100
config :ppl, dr_queue_deleting_sp: 100

config :ppl, Ppl.Retention.PolicyConsumer,
  exchange: "usage_internal_api_test",
  routing_key: "usage.apply_organization_policy.test"

config :watchman,
host: "localhost",
port: 8125,
1 change: 1 addition & 0 deletions plumber/ppl/lib/ppl/application.ex
@@ -52,6 +52,7 @@ defmodule Ppl.Application do
[
Ppl.Sup.STM,
worker(Ppl.OrgEventsConsumer, []),
worker(Ppl.Retention.PolicyConsumer, [])
]
end

1 change: 1 addition & 0 deletions plumber/ppl/lib/ppl/ppl_requests/model/ppl_requests.ex
@@ -23,6 +23,7 @@ defmodule Ppl.PplRequests.Model.PplRequests do
field :wf_id, :string
field :source_args, :map
field :pre_flight_checks, :map
field :expires_at, :naive_datetime_usec

timestamps(type: :naive_datetime_usec)
end
28 changes: 28 additions & 0 deletions plumber/ppl/lib/ppl/retention/policy_applier.ex
@@ -0,0 +1,28 @@
defmodule Ppl.Retention.PolicyApplier do
  @moduledoc """
  Marks pipelines for expiration when an organization policy is applied.
  """

  import Ecto.Query

  alias Ppl.EctoRepo
  alias Ppl.PplRequests.Model.PplRequests

  @doc """
  Sets `expires_at` on all pipelines for `org_id` inserted before `cutoff`.
  Pipelines already marked with an earlier or equal expiration are left untouched.
  Returns the number of rows affected.
  """
  @spec mark_expiring(String.t(), NaiveDateTime.t()) :: non_neg_integer()
  def mark_expiring(org_id, cutoff) do
    query =
      from(pr in PplRequests,
        where: fragment("?->>?", pr.request_args, "organization_id") == ^org_id,
        where: pr.inserted_at < ^cutoff,
        where: is_nil(pr.expires_at) or pr.expires_at > ^cutoff
      )

    {count, _} = EctoRepo.update_all(query, set: [expires_at: cutoff])
    count
  end
end
55 changes: 55 additions & 0 deletions plumber/ppl/lib/ppl/retention/policy_consumer.ex
@@ -0,0 +1,55 @@
defmodule Ppl.Retention.PolicyConsumer do
  @moduledoc """
  Subscribes to `InternalApi.Usage.OrganizationPolicyApply` events and marks pipelines for expiration.
  """

  @consumer_opts Application.compile_env(:ppl, __MODULE__, [])

  use Tackle.Consumer,
    url: System.get_env("RABBITMQ_URL"),
    exchange: Keyword.get(@consumer_opts, :exchange, "usage_internal_api"),
    routing_key: Keyword.get(@consumer_opts, :routing_key, "usage.apply_organization_policy"),
    service: "plumber-retention"

  require Logger

  alias Google.Protobuf.Timestamp
  alias InternalApi.Usage.OrganizationPolicyApply
  alias Ppl.Retention.PolicyApplier

  # Decodes the event, validates its fields, and marks matching pipelines.
  # Invalid events are logged and dropped rather than raised.
  def handle_message(message) do
    with {:ok, event} <- decode(message),
         {:ok, cutoff} <- cutoff_to_naive(event.cutoff_date),
         {:ok, org_id} <- non_empty(event.org_id) do
      count = PolicyApplier.mark_expiring(org_id, cutoff)
      Watchman.increment({"retention.marked", [org_id]})
      Logger.info("[Retention] Marked #{count} pipelines for org #{org_id} until #{cutoff}")
    else
      {:error, reason} ->
        Logger.error("[Retention] Failed to apply policy event: #{inspect(reason)}")
    end
  end

  defp decode(message) do
    message
    |> OrganizationPolicyApply.decode()
    |> case do
      %OrganizationPolicyApply{} = event -> {:ok, event}
      other -> {:error, {:unexpected_payload, other}}
    end
  rescue
    e -> {:error, e}
  end

  # A zero timestamp is the protobuf default, so it is treated as missing.
  defp cutoff_to_naive(%Timestamp{seconds: 0, nanos: 0}), do: {:error, :missing_cutoff}

  defp cutoff_to_naive(%Timestamp{seconds: seconds, nanos: nanos}) do
    {:ok, datetime} = DateTime.from_unix(seconds, :second)
    micros = div(nanos, 1_000)
    {:ok, datetime |> DateTime.add(micros, :microsecond) |> DateTime.to_naive()}
  end

  defp cutoff_to_naive(nil), do: {:error, :missing_cutoff}

  defp non_empty(value) when value in [nil, ""], do: {:error, :missing_org_id}
  defp non_empty(value), do: {:ok, value}
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Ppl.EctoRepo.Migrations.AddExpiresAtToPipelineRequests do
  use Ecto.Migration

  @disable_ddl_transaction true
  @disable_migration_lock true

  def change do
    alter table(:pipeline_requests) do
      add :expires_at, :naive_datetime_usec
    end

    create index(:pipeline_requests, [:inserted_at, :expires_at],
             name: :idx_pipeline_requests_created_at_expires_at_not_null,
             concurrently: true,
             where: "expires_at IS NOT NULL"
           )
  end
end
136 changes: 136 additions & 0 deletions plumber/ppl/test/retention/policy_applier_test.exs
@@ -0,0 +1,136 @@
defmodule Ppl.Retention.PolicyApplierTest do
  use ExUnit.Case

  import Ecto.Query
  alias Ppl.EctoRepo
  alias Ppl.PplRequests.Model.PplRequests
  alias Ppl.Retention.PolicyApplier

  setup do
    Test.Helpers.truncate_db()
    :ok
  end

  describe "mark_expiring/2" do
    test "marks pipelines inserted before cutoff with expires_at" do
      org_id = UUID.uuid4()
      cutoff = ~N[2025-06-01 12:00:00.000000]

      old_pipeline = insert_pipeline(org_id, ~N[2025-05-01 10:00:00.000000])
      recent_pipeline = insert_pipeline(org_id, ~N[2025-06-02 10:00:00.000000])

      count = PolicyApplier.mark_expiring(org_id, cutoff)

      assert count == 1
      assert get_expires_at(old_pipeline.id) == cutoff
      assert get_expires_at(recent_pipeline.id) == nil
    end

    test "only marks pipelines for specified organization" do
      org_1 = UUID.uuid4()
      org_2 = UUID.uuid4()
      cutoff = ~N[2025-06-01 12:00:00.000000]

      pipeline_1 = insert_pipeline(org_1, ~N[2025-05-01 10:00:00.000000])
      pipeline_2 = insert_pipeline(org_2, ~N[2025-05-01 10:00:00.000000])

      count = PolicyApplier.mark_expiring(org_1, cutoff)

      assert count == 1
      assert get_expires_at(pipeline_1.id) == cutoff
      assert get_expires_at(pipeline_2.id) == nil
    end

    test "does not update pipelines already marked with earlier expiration" do
      org_id = UUID.uuid4()
      earlier_cutoff = ~N[2025-05-15 12:00:00.000000]
      later_cutoff = ~N[2025-06-01 12:00:00.000000]

      pipeline = insert_pipeline(org_id, ~N[2025-05-01 10:00:00.000000])
      set_expires_at(pipeline.id, earlier_cutoff)

      count = PolicyApplier.mark_expiring(org_id, later_cutoff)

      assert count == 0
      assert get_expires_at(pipeline.id) == earlier_cutoff
    end

    test "updates pipelines marked with later expiration to earlier cutoff" do
      org_id = UUID.uuid4()
      later_cutoff = ~N[2025-06-01 12:00:00.000000]
      earlier_cutoff = ~N[2025-05-15 12:00:00.000000]

      pipeline = insert_pipeline(org_id, ~N[2025-05-01 10:00:00.000000])
      set_expires_at(pipeline.id, later_cutoff)

      count = PolicyApplier.mark_expiring(org_id, earlier_cutoff)

      assert count == 1
      assert get_expires_at(pipeline.id) == earlier_cutoff
    end

    test "handles multiple pipelines in batch" do
      org_id = UUID.uuid4()
      cutoff = ~N[2025-06-01 12:00:00.000000]

      Enum.each(1..10, fn _ ->
        insert_pipeline(org_id, ~N[2025-05-01 10:00:00.000000])
      end)

      Enum.each(1..5, fn _ ->
        insert_pipeline(org_id, ~N[2025-06-02 10:00:00.000000])
      end)

      count = PolicyApplier.mark_expiring(org_id, cutoff)

      assert count == 10
    end

    test "returns 0 when no pipelines match criteria" do
      org_id = UUID.uuid4()
      cutoff = ~N[2025-06-01 12:00:00.000000]

      insert_pipeline(org_id, ~N[2025-06-02 10:00:00.000000])

      count = PolicyApplier.mark_expiring(org_id, cutoff)

      assert count == 0
    end
  end

  defp insert_pipeline(org_id, inserted_at) do
    id = UUID.uuid4()
    ppl_id = UUID.uuid4()
    wf_id = UUID.uuid4()

    request_args = %{
      "organization_id" => org_id,
      "project_id" => UUID.uuid4(),
      "service" => "local"
    }

    %PplRequests{
      id: id,
      ppl_artefact_id: ppl_id,
      wf_id: wf_id,
      request_args: request_args,
      request_token: UUID.uuid1(),
      definition: %{"version" => "v1.0", "blocks" => []},
      top_level: true,
      initial_request: true,
      prev_ppl_artefact_ids: [],
      inserted_at: inserted_at
    }
    |> EctoRepo.insert!()
  end

  defp get_expires_at(pipeline_id) do
    from(pr in PplRequests, where: pr.id == ^pipeline_id, select: pr.expires_at)
    |> EctoRepo.one()
  end

  defp set_expires_at(pipeline_id, expires_at) do
    from(pr in PplRequests, where: pr.id == ^pipeline_id)
    |> EctoRepo.update_all(set: [expires_at: expires_at])
  end
end