diff --git a/github_hooks/db/migrate/20251118091829_add_expires_at_to_jobs.rb b/github_hooks/db/migrate/20251118091829_add_expires_at_to_jobs.rb
new file mode 100644
index 000000000..b0e2973d5
--- /dev/null
+++ b/github_hooks/db/migrate/20251118091829_add_expires_at_to_jobs.rb
@@ -0,0 +1,5 @@
+class AddExpiresAtToJobs < ActiveRecord::Migration[6.1] # Adds jobs.expires_at; zebra's LegacyRepo carries a migration with the same timestamp for the same column.
+  def change
+    add_column :jobs, :expires_at, :datetime # no default and no null option are given
+  end
+end
diff --git a/github_hooks/db/migrate/20251119103829_add_expires_created_index_at_jobs_table.rb b/github_hooks/db/migrate/20251119103829_add_expires_created_index_at_jobs_table.rb
new file mode 100644
index 000000000..02162cd05
--- /dev/null
+++ b/github_hooks/db/migrate/20251119103829_add_expires_created_index_at_jobs_table.rb
@@ -0,0 +1,11 @@
+class AddExpiresCreatedIndexAtJobsTable < ActiveRecord::Migration[6.1] # Partial index over jobs that already have an expiration.
+  disable_ddl_transaction! # a concurrent index build cannot run inside the migration's DDL transaction
+
+  def change
+    add_index :jobs,
+              %i[expires_at created_at],
+              name: "index_jobs_on_expires_created_not_null",
+              algorithm: :concurrently,
+              where: "expires_at IS NOT NULL" # presumably serves the expired-job lookup in Zebra.Models.Job.delete_old_jobs/1
+  end
+end
diff --git a/github_hooks/db/migrate/20251119104017_add_organization_created_index_at_jobs_table.rb b/github_hooks/db/migrate/20251119104017_add_organization_created_index_at_jobs_table.rb
new file mode 100644
index 000000000..cb69b2786
--- /dev/null
+++ b/github_hooks/db/migrate/20251119104017_add_organization_created_index_at_jobs_table.rb
@@ -0,0 +1,11 @@
+class AddOrganizationCreatedIndexAtJobsTable < ActiveRecord::Migration[6.1] # Partial index over jobs that have no expiration yet.
+  disable_ddl_transaction! # a concurrent index build cannot run inside the migration's DDL transaction
+
+  def change
+    add_index :jobs,
+              %i[organization_id created_at],
+              name: "index_jobs_on_organization_created_expires_is_null",
+              algorithm: :concurrently,
+              where: "expires_at IS NULL" # presumably serves Zebra.Models.Job.mark_jobs_for_deletion/3, which filters on these columns
+  end
+end
diff --git a/zebra/config/config.exs b/zebra/config/config.exs
index 675756cdb..a36ad8e0d 100644
--- a/zebra/config/config.exs
+++ b/zebra/config/config.exs
@@ -24,6 +24,14 @@ config :zebra, Zebra.Workers.TaskFinisher, timeout: 10_000
 config :zebra, Zebra.Workers.Dispatcher, timeout: 1_000
 config :zebra, Zebra.Workers.Monitor, timeout: 60_000
 
+config :zebra, Zebra.Workers.JobDeletionPolicyWorker,
+  naptime: 1_000, # 1 second; sleep after a tick that deleted something
+  longnaptime: 3_600_000, # 1 hour; sleep after a tick that found nothing to delete
+  limit: 100 # maximum number of jobs deleted per tick
+
+config :zebra, Zebra.Workers.JobDeletionPolicyMarker,
+  days: 14 # marked jobs get expires_at = now + this many days
+
 config :zebra, Zebra.Workers.Scheduler,
   cooldown_period: 1_000,
   batch_size: 3
diff --git a/zebra/lib/protos/internal_api/artifacthub.pb.ex b/zebra/lib/protos/internal_api/artifacthub.pb.ex
index d952ea7c3..46c16957b 100644
--- a/zebra/lib/protos/internal_api/artifacthub.pb.ex
+++ b/zebra/lib/protos/internal_api/artifacthub.pb.ex
@@ -388,12 +388,14 @@ defmodule InternalApi.Artifacthub.ListItem do
 
   @type t :: %__MODULE__{
           name: String.t(),
-          is_directory: boolean
+          is_directory: boolean,
+          size: integer
         }
-  defstruct [:name, :is_directory]
+  defstruct [:name, :is_directory, :size]
 
   field(:name, 1, type: :string)
   field(:is_directory, 2, type: :bool)
+  field(:size, 3, type: :int64) # new field on tag 3; unit is not stated here (presumably bytes — confirm against the .proto)
 end
 
 defmodule InternalApi.Artifacthub.Artifact do
diff --git a/zebra/lib/protos/internal_api/rbac.pb.ex b/zebra/lib/protos/internal_api/rbac.pb.ex
index 29fb21c76..2aa5958d1 100644
--- a/zebra/lib/protos/internal_api/rbac.pb.ex
+++ b/zebra/lib/protos/internal_api/rbac.pb.ex
@@ -508,12 +508,39 @@ defmodule InternalApi.RBAC.Permission do
   field(:scope, 4, type: InternalApi.RBAC.Scope, enum: true)
 end
 
+defmodule InternalApi.RBAC.ListSubjectsRequest do # request message of the ListSubjects rpc added further down
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          org_id: String.t(),
+          subject_ids:
[String.t()]
+        }
+  defstruct [:org_id, :subject_ids]
+
+  field(:org_id, 1, type: :string)
+  field(:subject_ids, 2, repeated: true, type: :string)
+end
+
+defmodule InternalApi.RBAC.ListSubjectsResponse do # response message of the ListSubjects rpc
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          subjects: [InternalApi.RBAC.Subject.t()]
+        }
+  defstruct [:subjects]
+
+  field(:subjects, 1, repeated: true, type: InternalApi.RBAC.Subject)
+end
+
 defmodule InternalApi.RBAC.SubjectType do
   @moduledoc false
   use Protobuf, enum: true, syntax: :proto3
 
   field(:USER, 0)
   field(:GROUP, 1)
+  field(:SERVICE_ACCOUNT, 2) # new enum value appended after the existing ones
 end
 
 defmodule InternalApi.RBAC.Scope do
@@ -588,6 +615,8 @@ defmodule InternalApi.RBAC.RBAC.Service do
     InternalApi.RBAC.RefreshCollaboratorsRequest,
     InternalApi.RBAC.RefreshCollaboratorsResponse
   )
+
+  rpc(:ListSubjects, InternalApi.RBAC.ListSubjectsRequest, InternalApi.RBAC.ListSubjectsResponse)
 end
 
 defmodule InternalApi.RBAC.RBAC.Stub do
diff --git a/zebra/lib/protos/internal_api/usage.pb.ex b/zebra/lib/protos/internal_api/usage.pb.ex
new file mode 100644
index 000000000..b72ab1d74
--- /dev/null
+++ b/zebra/lib/protos/internal_api/usage.pb.ex
@@ -0,0 +1,358 @@
+defmodule InternalApi.Usage.ListDailyUsageRequest do # NOTE(review): this file looks generated (scripts/internal_protos.sh gains a `usage` entry) — presumably regenerate rather than hand-edit
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          org_id: String.t(),
+          period_started_at: Google.Protobuf.Timestamp.t(),
+          period_finished_at: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:org_id, :period_started_at, :period_finished_at]
+
+  field(:org_id, 1, type: :string)
+  field(:period_started_at, 2, type: Google.Protobuf.Timestamp)
+  field(:period_finished_at, 3, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.ListDailyUsageResponse do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          status: Google.Rpc.Status.t(),
+          daily_usages: [InternalApi.Usage.DailyUsage.t()]
+        }
+  defstruct [:status, :daily_usages]
+
+  field(:status, 1, type: Google.Rpc.Status)
+  field(:daily_usages, 2, repeated: true, type: InternalApi.Usage.DailyUsage)
+end
+
+defmodule InternalApi.Usage.DailyUsage do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          resource_usages: [InternalApi.Usage.DailyResourceUsage.t()],
+          date: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:resource_usages, :date]
+
+  field(:resource_usages, 1, repeated: true, type: InternalApi.Usage.DailyResourceUsage)
+  field(:date, 2, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.DailyResourceUsage do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          machine_type: String.t(),
+          minutes_used: integer,
+          seconds_used: integer
+        }
+  defstruct [:machine_type, :minutes_used, :seconds_used]
+
+  field(:machine_type, 1, type: :string)
+  field(:minutes_used, 2, type: :int32)
+  field(:seconds_used, 3, type: :int32)
+end
+
+defmodule InternalApi.Usage.ProjectsUsageRequest do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          org_id: String.t(),
+          period_started_at: Google.Protobuf.Timestamp.t(),
+          period_finished_at: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:org_id, :period_started_at, :period_finished_at]
+
+  field(:org_id, 1, type: :string)
+  field(:period_started_at, 2, type: Google.Protobuf.Timestamp)
+  field(:period_finished_at, 3, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.ProjectsUsageResponse do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          status: Google.Rpc.Status.t(),
+          project_usages: [InternalApi.Usage.ProjectUsage.t()]
+        }
+  defstruct [:status, :project_usages]
+
+  field(:status, 1, type: Google.Rpc.Status)
+  field(:project_usages, 2, repeated: true, type: InternalApi.Usage.ProjectUsage)
+end
+
+defmodule InternalApi.Usage.ProjectUsage do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          project_id: String.t(),
+          resource_usages: [InternalApi.Usage.ResourceUsage.t()]
+        }
+  defstruct [:project_id, :resource_usages]
+
+  field(:project_id, 1, type: :string)
+  field(:resource_usages, 2, repeated: true, type: InternalApi.Usage.ResourceUsage)
+end
+
+defmodule InternalApi.Usage.TotalUsageRequest do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          period_started_at: Google.Protobuf.Timestamp.t(),
+          period_finished_at: Google.Protobuf.Timestamp.t(),
+          org_id: String.t()
+        }
+  defstruct [:period_started_at, :period_finished_at, :org_id]
+
+  field(:period_started_at, 1, type: Google.Protobuf.Timestamp) # note: org_id is tag 3 in this message, not tag 1 as in the other requests
+  field(:period_finished_at, 2, type: Google.Protobuf.Timestamp)
+  field(:org_id, 3, type: :string)
+end
+
+defmodule InternalApi.Usage.TotalUsageResponse do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          status: Google.Rpc.Status.t(),
+          resource_usages: [InternalApi.Usage.ResourceUsage.t()]
+        }
+  defstruct [:status, :resource_usages]
+
+  field(:status, 1, type: Google.Rpc.Status)
+  field(:resource_usages, 2, repeated: true, type: InternalApi.Usage.ResourceUsage)
+end
+
+defmodule InternalApi.Usage.TotalMembersUsageRequest do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          org_id: String.t(),
+          period_started_at: Google.Protobuf.Timestamp.t(),
+          period_finished_at: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:org_id, :period_started_at, :period_finished_at]
+
+  field(:org_id, 1, type: :string)
+  field(:period_started_at, 3, type: Google.Protobuf.Timestamp) # tag 2 is not used by this message
+  field(:period_finished_at, 4, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.TotalMembersUsageResponse do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          members: integer,
+          requesters: integer
+        }
+  defstruct [:members, :requesters]
+
+  field(:members, 1, type: :int32)
+  field(:requesters, 2, type: :int32)
+end
+
+defmodule InternalApi.Usage.ResourceUsage do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          machine_type: String.t(),
+          seconds_used: integer
+        }
+  defstruct [:machine_type, :seconds_used]
+
+  field(:machine_type, 1, type: :string)
+  field(:seconds_used, 2, type: :int32)
+end
+
+defmodule InternalApi.Usage.ListQuotaUsageRequest do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          org_id: String.t(),
+          points: integer,
+          period_started_at: Google.Protobuf.Timestamp.t(),
+          period_finished_at: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:org_id, :points, :period_started_at, :period_finished_at]
+
+  field(:org_id, 1, type: :string)
+  field(:points, 2, type: :int32)
+  field(:period_started_at, 3, type: Google.Protobuf.Timestamp)
+  field(:period_finished_at, 4, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.ListQuotaUsageResponse do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          status: Google.Rpc.Status.t(),
+          usages: [InternalApi.Usage.QuotaUsage.t()]
+        }
+  defstruct [:status, :usages]
+
+  field(:status, 1, type: Google.Rpc.Status)
+  field(:usages, 2, repeated: true, type: InternalApi.Usage.QuotaUsage)
+end
+
+defmodule InternalApi.Usage.QuotaUsage do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          machine_type: String.t(),
+          points: [InternalApi.Usage.QuotaUsage.Point.t()]
+        }
+  defstruct [:machine_type, :points]
+
+  field(:machine_type, 1, type: :string)
+  field(:points, 2, repeated: true, type: InternalApi.Usage.QuotaUsage.Point)
+end
+
+defmodule InternalApi.Usage.QuotaUsage.Point do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          usage: integer,
+          date: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:usage, :date]
+
+  field(:usage, 1, type: :int32)
+  field(:date, 2, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.ListSeatsRequest do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          org_id: String.t(),
+          from_gte: Google.Protobuf.Timestamp.t(),
+          to_lt: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:org_id, :from_gte, :to_lt]
+
+  field(:org_id, 1, type: :string)
+  field(:from_gte, 2, type: Google.Protobuf.Timestamp)
+  field(:to_lt, 3, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.ListSeatsResponse do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          seats: [InternalApi.Usage.Seat.t()]
+        }
+  defstruct [:seats]
+
+  field(:seats, 1, repeated: true, type: InternalApi.Usage.Seat)
+end
+
+defmodule InternalApi.Usage.Seat do
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          user_id: String.t(),
+          display_name: String.t(),
+          origin: integer,
+          status: integer,
+          date: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:user_id, :display_name, :origin, :status, :date]
+
+  field(:user_id, 1, type: :string)
+  field(:display_name, 2, type: :string)
+  field(:origin, 3, type: InternalApi.Usage.SeatOrigin, enum: true)
+  field(:status, 4, type: InternalApi.Usage.SeatStatus, enum: true)
+  field(:date, 5, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.OrganizationPolicyApply do # decoded by Zebra.Workers.JobDeletionPolicyMarker.handle_message/1; no rpc in the service below uses it
+  @moduledoc false
+  use Protobuf, syntax: :proto3
+
+  @type t :: %__MODULE__{
+          org_id: String.t(),
+          cutoff_date: Google.Protobuf.Timestamp.t()
+        }
+  defstruct [:org_id, :cutoff_date]
+
+  field(:org_id, 1, type: :string)
+  field(:cutoff_date, 2, type: Google.Protobuf.Timestamp)
+end
+
+defmodule InternalApi.Usage.SeatOrigin do
+  @moduledoc false
+  use Protobuf, enum: true, syntax: :proto3
+
+  field(:SEAT_ORIGIN_UNSPECIFIED, 0)
+  field(:SEAT_ORIGIN_SEMAPHORE, 1)
+  field(:SEAT_ORIGIN_GITHUB, 2)
+  field(:SEAT_ORIGIN_BITBUCKET, 3)
+  field(:SEAT_ORIGIN_GITLAB, 4)
+end
+
+defmodule InternalApi.Usage.SeatStatus do
+  @moduledoc false
+  use Protobuf, enum: true, syntax: :proto3
+
+  field(:SEAT_TYPE_UNSPECIFIED, 0) # values are prefixed SEAT_TYPE_ although the enum is named SeatStatus
+  field(:SEAT_TYPE_ACTIVE_MEMBER, 1)
+  field(:SEAT_TYPE_NON_ACTIVE_MEMBER, 2)
+  field(:SEAT_TYPE_NON_MEMBER, 3)
+end
+
+defmodule InternalApi.Usage.UsageService.Service do
+  @moduledoc false
+  use GRPC.Service, name: "InternalApi.Usage.UsageService"
+
+  rpc(
+    :ListDailyUsage,
+    InternalApi.Usage.ListDailyUsageRequest,
+    InternalApi.Usage.ListDailyUsageResponse
+  )
+
+  rpc(
+    :ProjectsUsage,
+    InternalApi.Usage.ProjectsUsageRequest,
+    InternalApi.Usage.ProjectsUsageResponse
+  )
+
+  rpc(:TotalUsage, InternalApi.Usage.TotalUsageRequest, InternalApi.Usage.TotalUsageResponse)
+
+  rpc(
+    :ListQuotaUsage,
+    InternalApi.Usage.ListQuotaUsageRequest,
+    InternalApi.Usage.ListQuotaUsageResponse
+  )
+
+  rpc(
+    :TotalMembersUsage,
+    InternalApi.Usage.TotalMembersUsageRequest,
+    InternalApi.Usage.TotalMembersUsageResponse
+  )
+
+  rpc(:ListSeats, InternalApi.Usage.ListSeatsRequest, InternalApi.Usage.ListSeatsResponse)
+end
+
+defmodule InternalApi.Usage.UsageService.Stub do
+  @moduledoc false
+  use GRPC.Stub, service: InternalApi.Usage.UsageService.Service
+end
diff --git a/zebra/lib/zebra/models/job.ex b/zebra/lib/zebra/models/job.ex
index 8a30ff760..25df4f746 100644
--- a/zebra/lib/zebra/models/job.ex
+++ b/zebra/lib/zebra/models/job.ex
@@ -43,7 +43,7 @@ defmodule Zebra.Models.Job do
   @primary_key {:id, :binary_id, autogenerate: true}
   @foreign_key_type :binary_id
   @required_fields ~w(name organization_id project_id aasm_state created_at updated_at machine_type spec)a
-  @optional_fields ~w(build_id priority execution_time_limit deployment_target_id repository_id enqueued_at scheduled_at started_at finished_at request index port name machine_os_image failure_reason result agent_id agent_name agent_ip_address agent_ctrl_port agent_auth_token private_ssh_key)a
+  @optional_fields ~w(build_id priority execution_time_limit deployment_target_id repository_id enqueued_at scheduled_at started_at finished_at request index port name machine_os_image failure_reason result agent_id agent_name agent_ip_address agent_ctrl_port agent_auth_token private_ssh_key expires_at)a
 
   schema "jobs" do
     belongs_to(:task,
Zebra.Models.Task, foreign_key: :build_id)
@@ -79,6 +79,7 @@ defmodule Zebra.Models.Job do
     field(:scheduled_at, :utc_datetime)
     field(:started_at, :utc_datetime)
     field(:finished_at, :utc_datetime)
+    field(:expires_at, :utc_datetime)
   end
 
   def create(params) do
@@ -356,6 +357,92 @@ defmodule Zebra.Models.Job do
     )
   end
 
+  @doc """
+  Schedules an organization's old jobs for deletion.
+
+  Sets `expires_at` to the database's `CURRENT_TIMESTAMP` plus `deletion_days`
+  days on every job of `org_id` that was created at or before `cutoff_date`
+  and has no `expires_at` yet. Jobs that already carry an expiration are
+  skipped, so applying a policy again does not move their deletion date.
+
+  Returns the `{updated_count, nil}` tuple from `update_all`.
+  """
+  def mark_jobs_for_deletion(org_id, cutoff_date, deletion_days) do
+    import Ecto.Query, only: [from: 2]
+
+    query =
+      from(j in Zebra.Models.Job,
+        where:
+          is_nil(j.expires_at) and
+            j.organization_id == ^org_id and
+            j.created_at <= ^cutoff_date,
+        update: [
+          set: [
+            expires_at: fragment("CURRENT_TIMESTAMP + (? * INTERVAL '1 day')", ^deletion_days)
+          ]
+        ]
+      )
+
+    Zebra.LegacyRepo.update_all(query, [])
+  end
+
+  @doc """
+  Deletes the stop requests that belong to the next batch of expired jobs.
+
+  The batch is selected by the same query `delete_old_jobs/1` uses, so call
+  this first and then delete the jobs themselves with the same `limit`.
+
+  Returns `{:ok, deleted_count}`.
+  """
+  def delete_old_job_stop_requests(limit) do
+    import Ecto.Query, only: [from: 2, subquery: 1]
+
+    expired_job_ids = expired_job_ids_query(limit)
+
+    query =
+      from(jsr in Zebra.Models.JobStopRequest,
+        where: jsr.job_id in subquery(expired_job_ids)
+      )
+
+    {deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
+
+    {:ok, deleted_count}
+  end
+
+  @doc """
+  Deletes the next batch of expired jobs, at most `limit` rows.
+
+  Returns `{:ok, deleted_count}`.
+  """
+  def delete_old_jobs(limit) do
+    import Ecto.Query, only: [from: 2, subquery: 1]
+
+    expired_job_ids = expired_job_ids_query(limit)
+
+    query =
+      from(j in Zebra.Models.Job,
+        where: j.id in subquery(expired_job_ids)
+      )
+
+    {deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
+
+    {:ok, deleted_count}
+  end
+
+  # Selects the ids of up to `limit` jobs whose `expires_at` has passed, oldest
+  # first. `id` breaks ties on `created_at`: without it, jobs sharing a creation
+  # time at the batch edge could be picked differently by the two delete calls.
+  defp expired_job_ids_query(limit) do
+    import Ecto.Query, only: [from: 2]
+
+    from(j in Zebra.Models.Job,
+      where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
+      order_by: [asc: j.created_at, asc: j.id],
+      limit: ^limit,
+      select: j.id
+    )
+  end
+
   def wait_for_agent(job) do
     if valid_transition?(job.aasm_state, state_waiting_for_agent()) do
       update(job, %{aasm_state: state_waiting_for_agent()})
diff --git a/zebra/lib/zebra/workers.ex b/zebra/lib/zebra/workers.ex
index 9dd482b28..3a0093823 100644
---
a/zebra/lib/zebra/workers.ex
+++ b/zebra/lib/zebra/workers.ex
@@ -1,5 +1,7 @@
 defmodule Zebra.Workers do
   @all [
+    %{name: Zebra.Workers.JobDeletionPolicyMarker, flag: "START_JOB_DELETION_POLICY_MARKER"},
+    %{name: Zebra.Workers.JobDeletionPolicyWorker, flag: "START_JOB_DELETION_POLICY_WORKER"},
     %{name: Zebra.Workers.JobStartedCallbackWorker, flag: "START_JOB_STARTED_CALLBACK_WORKER"},
     %{name: Zebra.Workers.JobFinishedCallbackWorker, flag: "START_JOB_FINISHED_CALLBACK_WORKER"},
     %{name: Zebra.Workers.JobTeardownCallbackWorker, flag: "START_JOB_TEARDOWN_CALLBACK_WORKER"},
diff --git a/zebra/lib/zebra/workers/job_deletion_policy_marker.ex b/zebra/lib/zebra/workers/job_deletion_policy_marker.ex
new file mode 100644
index 000000000..b9f105c85
--- /dev/null
+++ b/zebra/lib/zebra/workers/job_deletion_policy_marker.ex
@@ -0,0 +1,41 @@
+defmodule Zebra.Workers.JobDeletionPolicyMarker do # AMQP consumer that stamps expires_at on an organization's old jobs when a policy message arrives
+  require Logger
+
+  alias Zebra.Models.Job
+  alias Google.Protobuf.Timestamp
+
+  use Tackle.Consumer,
+    url: Application.get_env(:zebra, :amqp_url), # NOTE(review): evaluated while the module compiles — confirm this matches the other consumers
+    service: "zebra",
+    exchange: "policy_exchange",
+    routing_key: "policy_applied",
+    retry_limit: 10,
+    retry_delay: 10
+
+  def handle_message(message) do # message is an encoded InternalApi.Usage.OrganizationPolicyApply
+    decoded = InternalApi.Usage.OrganizationPolicyApply.decode(message)
+    org_id = decoded.org_id
+    cutoff_date = cutoff_date_from_proto(decoded.cutoff_date) # raises ArgumentError when the payload has no cutoff_date
+    days = policy_days()
+
+    {count, _} = Job.mark_jobs_for_deletion(org_id, cutoff_date, days) # only jobs without expires_at are touched
+    Logger.info("Marked #{count} jobs for deletion for org #{org_id}.")
+  end
+
+  defp policy_days do # reads :days from this module's :zebra config; raises when it is missing
+    Application.fetch_env!(:zebra, __MODULE__)
+    |> Keyword.fetch!(:days)
+  end
+
+  defp cutoff_date_from_proto(timestamp = %Timestamp{}) do # protobuf Timestamp -> DateTime with second precision
+    total_nanoseconds = timestamp.seconds * 1_000_000_000 + timestamp.nanos
+
+    total_nanoseconds
+    |> DateTime.from_unix!(:nanosecond)
+    |> DateTime.truncate(:second) # drops the sub-second part carried by nanos
+  end
+
+  defp cutoff_date_from_proto(nil) do # an unset message field decodes to nil
+    raise ArgumentError, "cutoff_date is missing in policy payload"
+  end
+end
diff --git
a/zebra/lib/zebra/workers/job_deletion_policy_worker.ex b/zebra/lib/zebra/workers/job_deletion_policy_worker.ex
new file mode 100644
index 000000000..53bccf062
--- /dev/null
+++ b/zebra/lib/zebra/workers/job_deletion_policy_worker.ex
@@ -0,0 +1,89 @@
+defmodule Zebra.Workers.JobDeletionPolicyWorker do
+  @moduledoc """
+  Periodically deletes jobs whose `expires_at` has passed, together with
+  their stop requests.
+
+  Every tick deletes at most `limit` jobs. After a tick that deleted
+  something the worker sleeps for `naptime` milliseconds; after a tick that
+  found nothing it sleeps for `longnaptime` (or `naptime` when that is unset).
+  """
+
+  require Logger
+
+  defstruct [
+    # period of sleep between worker ticks
+    :naptime,
+    # longer period of sleep when there is nothing to delete
+    :longnaptime,
+    # limit for deletions per batch
+    :limit
+  ]
+
+  @doc """
+  Builds the worker from the `:zebra` application config for this module and
+  starts its loop in a linked process. Returns `{:ok, pid}`.
+  """
+  def start_link do
+    worker_config = Application.fetch_env!(:zebra, __MODULE__)
+    worker = struct(__MODULE__, worker_config)
+
+    pid =
+      spawn_link(fn ->
+        loop(worker)
+      end)
+
+    {:ok, pid}
+  end
+
+  @doc """
+  Runs `tick/1` forever, sleeping between ticks. Never returns.
+  """
+  def loop(worker) do
+    # Perform a tick (cleaning operation)
+    deleted_any? = Task.async(fn -> tick(worker) end) |> Task.await(:infinity)
+
+    sleep_for =
+      if deleted_any? do
+        worker.naptime
+      else
+        worker.longnaptime || worker.naptime
+      end
+
+    :timer.sleep(sleep_for)
+
+    # Recursively call loop to continue periodic execution
+    loop(worker)
+  end
+
+  @doc """
+  Deletes one batch of expired jobs and their stop requests.
+
+  Returns `true` when at least one row was deleted and `false` otherwise.
+  """
+  def tick(worker) do
+    Logger.info("Starting cleanup tick...")
+
+    limit = worker.limit
+
+    # Both deletes share one transaction. On PostgreSQL CURRENT_TIMESTAMP is the
+    # transaction start time, so they judge "expired" against the same instant,
+    # and a failing job delete rolls back the stop-request delete as well.
+    {:ok, {deleted_stop_requests, deleted_jobs}} =
+      Zebra.LegacyRepo.transaction(fn ->
+        {:ok, stop_requests} = Zebra.Models.Job.delete_old_job_stop_requests(limit)
+        {:ok, jobs} = Zebra.Models.Job.delete_old_jobs(limit)
+
+        {stop_requests, jobs}
+      end)
+
+    total_deleted = deleted_stop_requests + deleted_jobs
+
+    if total_deleted == 0 do
+      Logger.info("No jobs found for deletion.")
+      false
+    else
+      Logger.info("Deleted #{deleted_stop_requests} job stop requests and #{deleted_jobs} jobs.")
+      true
+    end
+  end
+end
diff --git a/zebra/priv/legacy_repo/migrations/20251118091829_add_expires_at_to_jobs.exs b/zebra/priv/legacy_repo/migrations/20251118091829_add_expires_at_to_jobs.exs
new file mode 100644
index 000000000..de901037e
--- /dev/null
+++ b/zebra/priv/legacy_repo/migrations/20251118091829_add_expires_at_to_jobs.exs
@@ -0,0 +1,9 @@
+defmodule Zebra.LegacyRepo.Migrations.AddExpiresAtToJobs do
+  use Ecto.Migration
+
+  def change do
+    alter table(:jobs) do
+      add :expires_at, :utc_datetime
+    end
+  end
+end
diff --git a/zebra/priv/legacy_repo/migrations/20251119103829_add_expires_created_index_at_jobs_table.exs b/zebra/priv/legacy_repo/migrations/20251119103829_add_expires_created_index_at_jobs_table.exs
new file mode 100644
index 000000000..529363978
--- /dev/null
+++ b/zebra/priv/legacy_repo/migrations/20251119103829_add_expires_created_index_at_jobs_table.exs
@@ -0,0 +1,13 @@
+defmodule Zebra.LegacyRepo.Migrations.AddExpiresCreatedIndexAtJobsTable do
+  use Ecto.Migration
+  @disable_migration_lock true
+  @disable_ddl_transaction true
+
+  def change do
+    create index(:jobs, [:expires_at, :created_at],
+      name: "index_jobs_on_expires_created_not_null",
+      concurrently: true,
+      where: "expires_at IS NOT NULL"
+    )
+  end
+end
diff --git a/zebra/priv/legacy_repo/migrations/20251119104017_add_organization_created_index_at_jobs_table.exs b/zebra/priv/legacy_repo/migrations/20251119104017_add_organization_created_index_at_jobs_table.exs
new file mode 100644
index 000000000..c893f76c2
--- /dev/null
+++ b/zebra/priv/legacy_repo/migrations/20251119104017_add_organization_created_index_at_jobs_table.exs
@@ -0,0 +1,13 @@
+defmodule Zebra.LegacyRepo.Migrations.AddOrganizationCreatedIndexAtJobsTable do
+  use Ecto.Migration
+  @disable_migration_lock true
+  @disable_ddl_transaction true
+
+  def change do
+    create index(:jobs, [:organization_id, :created_at],
+      name: "index_jobs_on_organization_created_expires_is_null",
+      concurrently: true,
+      where: "expires_at IS NULL"
+    )
+  end
+end
diff --git a/zebra/scripts/internal_protos.sh b/zebra/scripts/internal_protos.sh
index 741affd71..afcbc1ee6 100755
--- a/zebra/scripts/internal_protos.sh
+++ b/zebra/scripts/internal_protos.sh
@@ -18,7 +18,8 @@ secrethub
 self_hosted
 server_farm.job
 server_farm.mq.job_state_exchange
-task'
+task
+usage'
 
 for element in $list;do
   echo "$element"
diff --git
a/zebra/test/zebra/workers/job_deletion_policy_marker_test.exs b/zebra/test/zebra/workers/job_deletion_policy_marker_test.exs
new file mode 100644
index 000000000..12f6044ab
--- /dev/null
+++ b/zebra/test/zebra/workers/job_deletion_policy_marker_test.exs
@@ -0,0 +1,87 @@
+defmodule Zebra.Workers.JobDeletionPolicyMarkerTest do # Exercises JobDeletionPolicyMarker.handle_message/1 with encoded policy messages.
+  use Zebra.DataCase
+
+  alias Google.Protobuf.Timestamp
+  alias InternalApi.Usage.OrganizationPolicyApply
+  alias Zebra.Models.Job
+  alias Zebra.Workers.JobDeletionPolicyMarker, as: Worker
+
+  describe ".handle_message" do
+    setup do
+      original_config = Application.get_env(:zebra, Worker)
+
+      on_exit(fn ->
+        Application.put_env(:zebra, Worker, original_config) # restore whatever the test overrode
+      end)
+
+      {:ok, original_config: original_config || []} # tests get a keyword list even when nothing is configured
+    end
+
+    test "marks eligible jobs for deletion", %{original_config: original_config} do
+      days = 3 # overrides the configured :days for this test only
+      Application.put_env(:zebra, Worker, Keyword.put(original_config, :days, days))
+
+      org_id = Ecto.UUID.generate()
+
+      cutoff_date =
+        DateTime.utc_now()
+        |> DateTime.add(-3600, :second)
+        |> DateTime.truncate(:second)
+
+      older_created_at = DateTime.add(cutoff_date, -3600, :second) # one hour before the cutoff: eligible
+      newer_created_at = DateTime.add(cutoff_date, 3600, :second) # one hour after the cutoff: not eligible
+
+      {:ok, job_to_mark} =
+        Support.Factories.Job.create(:finished, %{
+          organization_id: org_id,
+          created_at: older_created_at,
+          updated_at: older_created_at
+        })
+
+      {:ok, newer_job} =
+        Support.Factories.Job.create(:finished, %{
+          organization_id: org_id,
+          created_at: newer_created_at,
+          updated_at: newer_created_at
+        })
+
+      {:ok, other_org_job} = # old enough, but owned by a different organization
+        Support.Factories.Job.create(:finished, %{
+          organization_id: Ecto.UUID.generate(),
+          created_at: older_created_at,
+          updated_at: older_created_at
+        })
+
+      cutoff_timestamp = Timestamp.new(seconds: DateTime.to_unix(cutoff_date))
+
+      message =
+        %OrganizationPolicyApply{org_id: org_id, cutoff_date: cutoff_timestamp}
+        |> OrganizationPolicyApply.encode()
+
+      Worker.handle_message(message)
+
+      {:ok, updated_job} = Job.find(job_to_mark.id)
+
+      assert updated_job.expires_at
+      assert DateTime.diff(updated_job.expires_at, DateTime.utc_now()) > 0 # the expiry lies in the future
+
+      assert {:ok, newer_job} = Job.find(newer_job.id)
+      assert is_nil(newer_job.expires_at)
+
+      assert {:ok, other_org_job} = Job.find(other_org_job.id)
+      assert is_nil(other_org_job.expires_at)
+    end
+
+    test "raises when cutoff date is missing", %{original_config: original_config} do
+      Application.put_env(:zebra, Worker, original_config)
+
+      message =
+        %OrganizationPolicyApply{org_id: Ecto.UUID.generate(), cutoff_date: nil}
+        |> OrganizationPolicyApply.encode()
+
+      assert_raise ArgumentError, "cutoff_date is missing in policy payload", fn ->
+        Worker.handle_message(message)
+      end
+    end
+  end
+end
diff --git a/zebra/test/zebra/workers/job_deletion_policy_worker_test.exs b/zebra/test/zebra/workers/job_deletion_policy_worker_test.exs
new file mode 100644
index 000000000..49a7bdfc3
--- /dev/null
+++ b/zebra/test/zebra/workers/job_deletion_policy_worker_test.exs
@@ -0,0 +1,42 @@
+defmodule Zebra.Workers.JobDeletionPolicyWorkerTest do # Exercises a single JobDeletionPolicyWorker.tick/1 against the database.
+  use Zebra.DataCase
+
+  alias Zebra.Models.{Job, JobStopRequest}
+  alias Zebra.Workers.JobDeletionPolicyWorker, as: Worker
+
+  describe ".tick" do
+    test "deletes expired jobs and related stop requests" do
+      worker = %Worker{limit: 10, naptime: 0, longnaptime: 0} # nap times are unused here: tick/1 never sleeps
+
+      {:ok, job} = Support.Factories.Job.create(:finished)
+      {:ok, _} = JobStopRequest.create(job.build_id, job.id)
+
+      expired_at =
+        DateTime.utc_now()
+        |> DateTime.add(-3600, :second)
+
+      {:ok, _} = Job.update(job, %{expires_at: expired_at}) # expired one hour ago
+
+      assert Worker.tick(worker)
+
+      assert {:error, :not_found} = Job.find(job.id)
+      assert {:error, :not_found} = JobStopRequest.find_by_job_id(job.id)
+    end
+
+    test "returns false when nothing is eligible for deletion" do
+      worker = %Worker{limit: 10, naptime: 0, longnaptime: 0}
+
+      {:ok, job} = Support.Factories.Job.create(:finished)
+
+      future_expiration =
+        DateTime.utc_now()
+        |> DateTime.add(3600, :second)
+
+      {:ok, _} = Job.update(job, %{expires_at: future_expiration}) # expires in one hour, so the tick must skip it
+
+      refute Worker.tick(worker)
+
+      assert {:ok, _} = Job.find(job.id)
+    end
+  end
+end
diff --git a/zebra/test/zebra/workers_test.exs b/zebra/test/zebra/workers_test.exs
index e728ab6db..3f157cb70 100644
--- a/zebra/test/zebra/workers_test.exs
+++ b/zebra/test/zebra/workers_test.exs
@@ -1,8 +1,11 @@
 defmodule Zebra.Workers.Test do
   use ExUnit.Case, async: false
 
-  test "no environment variables set => only feature provider invalidator starts" do
-    assert Zebra.Workers.active() == [Zebra.FeatureProviderInvalidatorWorker]
+  test "no environment variables set => only default workers start" do
+    assert Zebra.Workers.active() == [
+             Zebra.Workers.JobDeletionPolicyWorker, # NOTE(review): expected without its START_ flag, while JobDeletionPolicyMarker is not — confirm against Zebra.Workers.active/0
+             Zebra.FeatureProviderInvalidatorWorker
+           ]
   end
 
   describe "with environment variables set" do
@@ -20,6 +23,7 @@ defmodule Zebra.Workers.Test do
 
     test "active workers are returned" do
       assert Zebra.Workers.active() == [
+               Zebra.Workers.JobDeletionPolicyWorker,
                Zebra.Workers.TaskFinisher,
                Zebra.Workers.TaskFailFast,
                Zebra.Workers.JobStopper,