-
Notifications
You must be signed in to change notification settings - Fork 2
BQ to SquashFS download pipeline with test suite #79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
98dc4e2
df9824c
bb68b8a
f60f004
06751d7
45329ca
386f3c4
f99c58e
c029785
ce26ff3
3a03014
d7e19f2
b704aee
1e28438
0126297
d51fef7
506bb3c
8358966
5a7e437
f7cfc5e
979c3de
f127c47
f80482f
e738458
b7d3368
caa04de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,168 @@ | ||||||
| #!/bin/bash | ||||||
| # ============================================================================= | ||||||
| # job_bq_download.sh — staged BQ image download → SquashFS → object store | ||||||
| # ============================================================================= | ||||||
| # | ||||||
| # WHAT IT DOES | ||||||
| # ------------ | ||||||
| # Downloads every pending image from a BigQuery training_images table and | ||||||
| # delivers it to the Arbutus object store as one SquashFS archive per array | ||||||
| # task, using only a small, bounded amount of scratch space. | ||||||
| # | ||||||
| # Runs as a SLURM array job of NUM_JOBS tasks. Each task owns the dataset | ||||||
| # slice MOD(photo_id, NUM_JOBS) == SLURM_ARRAY_TASK_ID and performs the | ||||||
| # FULL lifecycle for that slice before releasing its scratch space: | ||||||
| # | ||||||
| # stage 1 download pending images (32 threads, BQ-checkpointed every | ||||||
| # 10k-image chunk, each chunk packed to chunk_NNNN.sqfs) | ||||||
| # stage 2 count expected images across all chunk files (unsquashfs -l) | ||||||
| # stage 3 stream-merge chunks into a single task_N.sqfs (sqfstar, zstd) | ||||||
| # stage 4 verify merged image count == stage 2 count | ||||||
| # stage 5 upload task_N.sqfs to the object store, verify byte size | ||||||
| # stage 6 delete local chunks + merged sqfs (only after stage 5 verifies) | ||||||
| # | ||||||
| # Because tasks clean up after themselves, peak scratch usage is | ||||||
| # (concurrent tasks) x (~2x final task sqfs size) | ||||||
| # and is controlled by the array throttle (%K below), NOT the dataset size. | ||||||
| # | ||||||
| # STATUS TRACKING (BigQuery) | ||||||
| # -------------------------- | ||||||
| # Per chunk, results are appended to <dataset>.training_images_downloads and | ||||||
| # MERGEd into <dataset>.training_images (fetch_status = downloaded / failed / | ||||||
| # corrupted + image dims). The table is therefore updated live as the job | ||||||
| # runs. Concurrent MERGE serialization conflicts are retried with backoff | ||||||
| # inside download_images.py. | ||||||
| # | ||||||
| # FAILURE / RESUME SEMANTICS | ||||||
| # -------------------------- | ||||||
| # Nothing local is deleted until the uploaded archive is verified, and any | ||||||
| # failure notifies (ntfy) and exits non-zero. Resubmitting a failed or | ||||||
| # timed-out task id is always safe: | ||||||
| # - if the remote task_N.sqfs already exists, the task exits 0 immediately | ||||||
| # - already-downloaded images are skipped via training_images_downloads | ||||||
| # - chunk files from a previous partial run are stashed into a resume_* | ||||||
| # subdir (the downloader restarts chunk numbering and would overwrite | ||||||
| # them; the merge step searches recursively and still includes them) | ||||||
| # | ||||||
| # USAGE | ||||||
| # ----- | ||||||
| # sbatch scripts/job_bq_download.sh # full run | ||||||
| # sbatch --array=7 scripts/job_bq_download.sh # re-run one task | ||||||
| # scontrol update JobId=<id> ArrayTaskThrottle=8 # change concurrency live | ||||||
| # | ||||||
| # Before pointing at a new dataset, update STAGING_BASE, S3_DEST and | ||||||
| # --dataset below, and check scratch headroom vs the %K throttle. | ||||||
| # ============================================================================= | ||||||
| # | ||||||
| #SBATCH --account=def-drolnick | ||||||
| #SBATCH --job-name=bq_dl_2605 | ||||||
| #SBATCH --cpus-per-task=32 | ||||||
| #SBATCH --mem=64G | ||||||
| #SBATCH --time=4:00:00 | ||||||
| #SBATCH --array=0-59%4 | ||||||
| #SBATCH --output=/project/6068129/melabbas/ami-ml/scripts/bq_download_%A_%a.out | ||||||
| #SBATCH --mail-type=END,FAIL | ||||||
| #SBATCH --mail-user=hack1996man@gmail.com | ||||||
|
|
||||||
| set -uo pipefail | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Missing The script uses For example, if 🔧 Proposed fix-set -uo pipefail
+set -euo pipefail📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||
|
|
||||||
| NUM_JOBS=60 | ||||||
| TASK_ID=${SLURM_ARRAY_TASK_ID} | ||||||
|
|
||||||
| STAGING_BASE="/scratch/melabbas/global_all_leps_2605" | ||||||
| TASK_DIR="${STAGING_BASE}/task_${TASK_ID}" | ||||||
| OUT_SQFS="${STAGING_BASE}/task_${TASK_ID}.sqfs" | ||||||
| S3_DEST="s3://ami-trainingdata/ai-for-leps/global_all_leps_2605/task_${TASK_ID}.sqfs" | ||||||
| ENDPOINT="https://object-arbutus.cloud.computecanada.ca" | ||||||
| export AWS_PROFILE=ami | ||||||
|
|
||||||
| fail() { | ||||||
| echo "ERROR: $1" | ||||||
| notify "bq_download task ${TASK_ID}: FAILED" "$1 | logs: bq_download_${SLURM_ARRAY_JOB_ID}_${TASK_ID}.out" | ||||||
| exit 1 | ||||||
| } | ||||||
|
|
||||||
| echo "=== bq_download task=${TASK_ID}/${NUM_JOBS} started $(date) on $(hostname) ===" | ||||||
|
|
||||||
| # ── Idempotence: skip if this task already completed ───────────────────────── | ||||||
| if s5cmd --endpoint-url "${ENDPOINT}" ls "${S3_DEST}" >/dev/null 2>&1; then | ||||||
| echo "Remote ${S3_DEST} already exists — task previously completed. Exiting." | ||||||
| exit 0 | ||||||
| fi | ||||||
|
|
||||||
| mkdir -p "${TASK_DIR}" | ||||||
|
|
||||||
| # ── Resume safety: stash chunks from a previous partial run ────────────────── | ||||||
| # download_images.py restarts numbering at chunk_0001 and would overwrite them; | ||||||
| # merge_sqfs_chunks.py searches recursively, so a subdir keeps them mergeable. | ||||||
| if compgen -G "${TASK_DIR}/chunk_*.sqfs" > /dev/null; then | ||||||
| RESUME_DIR="${TASK_DIR}/resume_${SLURM_ARRAY_JOB_ID}" | ||||||
| mkdir -p "${RESUME_DIR}" | ||||||
| mv "${TASK_DIR}"/chunk_*.sqfs "${RESUME_DIR}/" | ||||||
| echo "Resume: moved $(ls ${RESUME_DIR} | wc -l) existing chunks to ${RESUME_DIR}" | ||||||
| fi | ||||||
|
|
||||||
| cd /project/6068129/melabbas/ami-ml | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for directory change. If 🛡️ Proposed fix-cd /project/6068129/melabbas/ami-ml
+cd /project/6068129/melabbas/ami-ml || exit 1📝 Committable suggestion
Suggested change
🧰 Tools🪛 Shellcheck (0.11.0)[warning] 105-105: Use 'cd ... || exit' or 'cd ... || return' in case cd fails. (SC2164) 🤖 Prompt for AI Agents |
||||||
| module load StdEnv/2023 arrow/17.0.0 | ||||||
| source .venv/bin/activate | ||||||
|
|
||||||
| # ── STAGE 1: download + pack chunks ────────────────────────────────────────── | ||||||
| echo "=== STAGE 1: download (num_jobs=${NUM_JOBS}, chunk_size=10000) $(date) ===" | ||||||
| T0=$SECONDS | ||||||
| python src/dataset_tools/bq_squashfs/download_images.py \ | ||||||
| --staging-dir "${TASK_DIR}" \ | ||||||
| --num-jobs ${NUM_JOBS} \ | ||||||
| --task-id ${TASK_ID} \ | ||||||
| --num-workers 32 \ | ||||||
| --chunk-size 10000 \ | ||||||
| --dataset global_all_leps_2605 \ | ||||||
| || fail "download_images.py exited non-zero" | ||||||
| echo "STAGE 1 done in $((SECONDS - T0))s" | ||||||
|
|
||||||
| # ── STAGE 2: count expected images across all chunks (incl. resume_* dirs) ── | ||||||
| echo "=== STAGE 2: count expected images $(date) ===" | ||||||
| EXPECTED=0 | ||||||
| while IFS= read -r chunk; do | ||||||
| N=$(unsquashfs -l "${chunk}" 2>/dev/null | grep -cE '\.(jpg|jpeg|png)$' || echo 0) | ||||||
| EXPECTED=$((EXPECTED + N)) | ||||||
| done < <(find "${TASK_DIR}" -name "chunk_*.sqfs") | ||||||
| echo "Expected: ${EXPECTED} images in $(find "${TASK_DIR}" -name 'chunk_*.sqfs' | wc -l) chunks" | ||||||
| [ "${EXPECTED}" -gt 0 ] || fail "no images found in chunks" | ||||||
|
|
||||||
| # ── STAGE 3: merge chunks → task sqfs ──────────────────────────────────────── | ||||||
| echo "=== STAGE 3: merge $(date) ===" | ||||||
| T0=$SECONDS | ||||||
| rm -f "${OUT_SQFS}" | ||||||
| python src/dataset_tools/bq_squashfs/merge_sqfs_chunks.py "${TASK_DIR}" \ | ||||||
| | sqfstar -comp zstd -Xcompression-level 3 -b 131072 -no-duplicates "${OUT_SQFS}" | ||||||
| PIPE_STATUS=("${PIPESTATUS[@]}") | ||||||
| [ "${PIPE_STATUS[0]}" -eq 0 ] && [ "${PIPE_STATUS[1]}" -eq 0 ] \ | ||||||
| || fail "merge failed (stream=${PIPE_STATUS[0]} sqfstar=${PIPE_STATUS[1]}) — chunks preserved" | ||||||
| LOCAL_SIZE=$(stat -c%s "${OUT_SQFS}") | ||||||
| echo "STAGE 3 done in $((SECONDS - T0))s — $(numfmt --to=iec ${LOCAL_SIZE})" | ||||||
|
|
||||||
| # ── STAGE 4: verify merged image count ─────────────────────────────────────── | ||||||
| ACTUAL=$(unsquashfs -l "${OUT_SQFS}" 2>/dev/null | grep -cE '\.(jpg|jpeg|png)$' || echo 0) | ||||||
| echo "Merged: ${ACTUAL}/${EXPECTED} images" | ||||||
| [ "${ACTUAL}" -eq "${EXPECTED}" ] || fail "image count mismatch ${ACTUAL}/${EXPECTED} — chunks preserved" | ||||||
|
|
||||||
| # ── STAGE 5: upload + verify remote size ───────────────────────────────────── | ||||||
| echo "=== STAGE 5: upload $(numfmt --to=iec ${LOCAL_SIZE}) $(date) ===" | ||||||
| T0=$SECONDS | ||||||
| s5cmd --endpoint-url "${ENDPOINT}" cp "${OUT_SQFS}" "${S3_DEST}" \ | ||||||
| || fail "s5cmd upload failed — local sqfs + chunks preserved" | ||||||
| UP_SECS=$((SECONDS - T0)) | ||||||
| REMOTE_SIZE=$(s5cmd --endpoint-url "${ENDPOINT}" ls "${S3_DEST}" | awk '{print $3}') | ||||||
| echo "Uploaded in ${UP_SECS}s — local=${LOCAL_SIZE} remote=${REMOTE_SIZE}" | ||||||
| [ "${REMOTE_SIZE}" = "${LOCAL_SIZE}" ] || fail "remote size mismatch — local files preserved" | ||||||
|
|
||||||
| # ── STAGE 6: verified — delete local ───────────────────────────────────────── | ||||||
| rm -f "${OUT_SQFS}" | ||||||
| find "${TASK_DIR}" -name "chunk_*.sqfs" -delete | ||||||
| rmdir "${TASK_DIR}"/resume_* 2>/dev/null || true | ||||||
| echo "Local cleanup done." | ||||||
|
|
||||||
| echo "=== bq_download task=${TASK_ID} COMPLETE $(date) ===" | ||||||
| notify "bq_download task ${TASK_ID}/${NUM_JOBS}: done" \ | ||||||
| "${ACTUAL} images → ${S3_DEST} ($(numfmt --to=iec ${LOCAL_SIZE})), upload ${UP_SECS}s" | ||||||
| exit 0 | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| #!/bin/bash | ||
| # Export a BigQuery query to CSV. | ||
| # | ||
| # Usage: | ||
| # sbatch --export=QUERY_FILE=queries/global_min25occ.sql,OUTPUT=global_min25occ.csv job_bq_export.sh | ||
| # sbatch --export=QUERY_FILE=queries/global_max2000img.sql,OUTPUT=global_max2000img.csv job_bq_export.sh | ||
| # | ||
| #SBATCH --account=def-drolnick | ||
| #SBATCH --job-name=bq_export | ||
| #SBATCH --cpus-per-task=2 | ||
| #SBATCH --mem=8G | ||
| #SBATCH --time=1:00:00 | ||
| #SBATCH --output=/project/6068129/melabbas/ami-ml/scripts/bq_export_%j.out | ||
| #SBATCH --mail-type=END,FAIL | ||
| #SBATCH --mail-user=hack1996man@gmail.com | ||
|
|
||
| set -euo pipefail | ||
|
|
||
| BASE_DIR="/home/melabbas/projects/def-drolnick/melabbas/ami-ml" | ||
| DATA_DIR="/project/6068129/melabbas/ami-ml/data" | ||
|
|
||
| echo "=== bq_export started at $(date) ===" | ||
| echo "Node : $(hostname)" | ||
| echo "QUERY_FILE : ${QUERY_FILE}" | ||
| echo "OUTPUT : ${DATA_DIR}/${OUTPUT}" | ||
| echo "" | ||
|
|
||
| cd "${BASE_DIR}" | ||
| module load StdEnv/2023 arrow/17.0.0 | ||
| source .venv/bin/activate | ||
|
|
||
| python src/dataset_tools/bq_squashfs/bq_export.py \ | ||
| --query-file "src/dataset_tools/bq_squashfs/${QUERY_FILE}" \ | ||
| --output "${DATA_DIR}/${OUTPUT}" \ | ||
| --project leps-ai | ||
|
|
||
| EXIT_CODE=$? | ||
| echo "" | ||
| echo "=== bq_export done at $(date) (exit=${EXIT_CODE}) ===" | ||
|
|
||
| if [ "${EXIT_CODE}" -eq 0 ]; then | ||
| SIZE=$(du -sh "${DATA_DIR}/${OUTPUT}" | cut -f1) | ||
| ROWS=$(( $(wc -l < "${DATA_DIR}/${OUTPUT}") - 1 )) | ||
| notify "bq_export done" "${OUTPUT} ${ROWS} rows ${SIZE}" | ||
| else | ||
| notify "bq_export FAILED" "exit=${EXIT_CODE} — check bq_export_${SLURM_JOB_ID}.out" | ||
| exit 1 | ||
| fi |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| #!/bin/bash | ||
| # Split a BQ-exported CSV into train/val/test CSVs. | ||
| # Reads from DATA_DIR/<CSV>, writes train.csv/val.csv/test.csv to DATA_DIR/splits/. | ||
| # | ||
| # Must run after job_bq_export.sh completes. | ||
| # | ||
| # Usage: | ||
| # sbatch --dependency=afterok:<export_job_id> job_bq_split.sh | ||
| # sbatch --export=CSV=global_min25occ.csv --dependency=afterok:<export_job_id> job_bq_split.sh | ||
| # | ||
| # CSV defaults to global_min25occ.csv if not set via --export. | ||
| # | ||
| #SBATCH --account=def-drolnick | ||
| #SBATCH --job-name=bq_split | ||
| #SBATCH --cpus-per-task=2 | ||
| #SBATCH --mem=8G | ||
| #SBATCH --time=0:30:00 | ||
| #SBATCH --output=/project/6068129/melabbas/ami-ml/scripts/bq_split_%j.out | ||
| #SBATCH --mail-type=END,FAIL | ||
| #SBATCH --mail-user=hack1996man@gmail.com | ||
|
|
||
| set -euo pipefail | ||
|
|
||
| BASE_DIR="/project/6068129/melabbas/ami-ml" | ||
| DATA_DIR="${BASE_DIR}/data" | ||
| CSV="${CSV:-global_min25occ.csv}" | ||
| INPUT_CSV="${DATA_DIR}/${CSV}" | ||
| OUTPUT_DIR="${DATA_DIR}/splits" | ||
|
|
||
| echo "=== bq_split started at $(date) ===" | ||
| echo "Node : $(hostname)" | ||
| echo "Input CSV : ${INPUT_CSV} ($(du -sh ${INPUT_CSV} | cut -f1))" | ||
| echo "Output dir : ${OUTPUT_DIR}" | ||
| echo "" | ||
|
|
||
| cd "${BASE_DIR}" | ||
| module load StdEnv/2023 arrow/17.0.0 | ||
| source .venv/bin/activate | ||
|
|
||
| mkdir -p "${OUTPUT_DIR}" | ||
|
|
||
| python src/dataset_tools/bq_squashfs/split.py \ | ||
| --csv "${INPUT_CSV}" \ | ||
| --output-dir "${OUTPUT_DIR}" \ | ||
| --category-key species_name \ | ||
| --val-frac 0.1 \ | ||
| --test-frac 0.1 \ | ||
| --split-by-occurrence \ | ||
| --max-instances 1000 \ | ||
| --min-instances 5 \ | ||
| --seed 42 | ||
|
|
||
| EXIT_CODE=$? | ||
| echo "" | ||
| echo "=== bq_split done at $(date) (exit=${EXIT_CODE}) ===" | ||
|
|
||
| if [ "${EXIT_CODE}" -eq 0 ]; then | ||
| TRAIN_ROWS=$(( $(wc -l < "${OUTPUT_DIR}/train.csv") - 1 )) | ||
| VAL_ROWS=$(( $(wc -l < "${OUTPUT_DIR}/val.csv") - 1 )) | ||
| TEST_ROWS=$(( $(wc -l < "${OUTPUT_DIR}/test.csv") - 1 )) | ||
| echo " train : ${TRAIN_ROWS} rows" | ||
| echo " val : ${VAL_ROWS} rows" | ||
| echo " test : ${TEST_ROWS} rows" | ||
| notify "bq_split: done" \ | ||
| "train=${TRAIN_ROWS} val=${VAL_ROWS} test=${TEST_ROWS} → ${OUTPUT_DIR}" | ||
| else | ||
| notify "bq_split: FAILED" \ | ||
| "exit=${EXIT_CODE} — check bq_split_${SLURM_JOB_ID}.out" | ||
| exit 1 | ||
| fi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move BigQuery dependencies to main dependencies section.
The
google-cloud-bigqueryandpandas-gbqlibraries are imported by production pipeline scripts (clean.py,bq_export.py,create_test_table.py,create_test_tables.py), not just dev/test tools. Placing them in[tool.poetry.group.dev.dependencies]means they won't be installed in production environments, causing import failures.📦 Move to main dependencies
📝 Committable suggestion
🤖 Prompt for AI Agents