Skip to content

Commit c5616d7

Browse files
authored
feat: last copy pipe executions monitoring endpoint (#3520)
1 parent 83efdca commit c5616d7

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
DESCRIPTION >
2+
Monitors Tinybird copy job executions and exposes their status as Prometheus gauge metrics. Consumed by datadog scraper (`dd-tinybird-copy-pipe-executions-scraper`)
3+
Tracks the latest execution status for each copy pipe (starting today) and generates one-hot encoded metrics where each pipe gets a value of `1` for its current status (`'ok'`, `'error'`, `'cancelled'`, `'queued'`, `'working'`) and `0` for all other statuses.
4+
**Metric:** `copy_pipes_latest_execution_status` (gauge)
5+
**Labels:** `pipe_name`, `status`
6+
**Note:** The `'queued'` status is virtual—it's derived by detecting the error message `"You have reached the maximum number of copy jobs"` rather than being a native Tinybird status.
7+
This is because tinybird retries these again when possible but returns an error status. After 3 retries, if the copy pipe still gets the same limit error, we return an error state.
8+
Similarly, `'ok'` is mapped from the native `'done'` status for datadog color-scheme convention.
9+
10+
TAGS "Monitoring"
11+
12+
NODE copy_pipes_latest_executions
13+
SQL >
14+
WITH 'You have reached the maximum number of copy jobs%' AS max_jobs_err
15+
SELECT
16+
JSONExtract(job_metadata, 'pipe_name', 'String') AS pipe_name,
17+
multiIf(
18+
status = 'done',
19+
'ok',
20+
/* queued: current has max-jobs error AND the previous two do NOT */
21+
(error LIKE max_jobs_err)
22+
AND (coalesce(prev_error1, '') NOT LIKE max_jobs_err)
23+
AND (coalesce(prev_error2, '') NOT LIKE max_jobs_err),
24+
'queued',
25+
/* hard error: last three consecutive runs have the same max-jobs error */
26+
(error LIKE max_jobs_err)
27+
AND (coalesce(prev_error1, '') LIKE max_jobs_err)
28+
AND (coalesce(prev_error2, '') LIKE max_jobs_err),
29+
'error',
30+
/* any other error on the last execution -> error */
31+
(error != '' AND error NOT LIKE max_jobs_err),
32+
'error',
33+
/* otherwise keep original status (e.g., running/queued by system) */
34+
status
35+
) AS status,
36+
error,
37+
started_at
38+
FROM
39+
(
40+
SELECT
41+
*,
42+
lagInFrame(error, 1, '') OVER (
43+
PARTITION BY pipe_id ORDER BY created_at DESC
44+
) AS prev_error1,
45+
lagInFrame(error, 2, '') OVER (
46+
PARTITION BY pipe_id ORDER BY created_at DESC
47+
) AS prev_error2,
48+
row_number() OVER (PARTITION BY pipe_id ORDER BY created_at DESC) AS rn
49+
FROM tinybird.jobs_log
50+
WHERE
51+
job_type = 'copy'
52+
AND started_at > toStartOfDay(now())
53+
AND JSONExtract(job_metadata, 'pipe_name', 'String') <> 'members_with_location'
54+
)
55+
WHERE rn = 1
56+
ORDER BY pipe_id, created_at DESC
57+
58+
NODE errored_copy_pipes_latest_execution
59+
SQL >
60+
WITH
61+
possible_statuses AS (SELECT array('ok', 'error', 'cancelled', 'queued', 'working') AS statuses)
62+
SELECT
63+
'copy_pipes_latest_execution_status' AS name,
64+
IF(s = e.status, 1, 0) AS value,
65+
'Latest execution status per pipe (one-hot: 1 active, 0 otherwise)' AS help,
66+
'gauge' AS type,
67+
map('pipe_name', e.pipe_name, 'status', s) AS labels
68+
FROM copy_pipes_latest_executions AS e
69+
CROSS JOIN possible_statuses ps ARRAY
70+
JOIN ps.statuses AS s

0 commit comments

Comments
 (0)