From 1b0fcaec06e82e06c956c7359ba270cd13d9aebe Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 7 May 2026 17:24:57 +0000 Subject: [PATCH 1/4] Add InfluxDB 3 Core entry Adds an entry for the open-source SQL build of InfluxDB. The query engine is Apache DataFusion; ingestion is line protocol over /api/v3/write_lp because there is no native CSV/Parquet bulk loader. load.py streams hits.tsv, encodes each row as a line-protocol point with a unique row-index timestamp, and POSTs in 1000-row batches. Field names are lowercased so the standard CamelCase ClickBench queries resolve under DataFusion's identifier folding. Q19 and Q43 cast EventTime (stored as a string field) to TIMESTAMP for extract(minute) and date_trunc('minute', ...). Removes InfluxDB from the README TODO list. Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 1 - influxdb/README.md | 37 ++++++++ influxdb/benchmark.sh | 58 ++++++++++++ influxdb/load.py | 204 +++++++++++++++++++++++++++++++++++++++++ influxdb/queries.sql | 43 +++++++++ influxdb/run.sh | 29 ++++++ influxdb/template.json | 12 +++ 7 files changed, 383 insertions(+), 1 deletion(-) create mode 100644 influxdb/README.md create mode 100755 influxdb/benchmark.sh create mode 100755 influxdb/load.py create mode 100644 influxdb/queries.sql create mode 100755 influxdb/run.sh create mode 100644 influxdb/template.json diff --git a/README.md b/README.md index 244f1560e1..2457f65adb 100644 --- a/README.md +++ b/README.md @@ -305,7 +305,6 @@ Please help us add more systems and run the benchmarks on more types of VMs: - [ ] Hive - [ ] Hydrolix - [ ] Impala -- [ ] InfluxDB - [ ] LocustDB - [ ] Manticore Search - [ ] MS SQL Server with Column Store Index (without publishing) diff --git a/influxdb/README.md b/influxdb/README.md new file mode 100644 index 0000000000..186e0ce78d --- /dev/null +++ b/influxdb/README.md @@ -0,0 +1,37 @@ +# InfluxDB + +This entry uses [InfluxDB 3 Core](https://docs.influxdata.com/influxdb3/core/), the open-source SQL-capable +release of InfluxDB. The query engine is Apache DataFusion, the storage is local Parquet. + +## Caveats + +InfluxDB is a time-series database, not a general analytical database, so loading a flat 100M-row +analytical dataset into it stretches the data model: + +1. **No bulk CSV/Parquet import.** The only ingestion path is line protocol over HTTP + (`/api/v3/write_lp`). `load.py` streams `hits.tsv`, converts each row to a line-protocol point, and + POSTs in batches. The conversion + ingest is the dominant cost of the load phase and is much slower + than e.g. Postgres `\copy` or DuckDB `COPY FROM`. + +2. **Required unique timestamp.** Line protocol merges points that share `(measurement, tags, timestamp)`, + so to preserve all rows we use the row index as the line protocol timestamp (in nanoseconds, offset + from a fixed 2020-01-01 epoch). The original `EventTime` is stored as a regular string field and used + by the queries. + +3. **No tags, all fields.** Tags are indexed at ingest time; for a wide flat schema the indexing cost + is prohibitive. Every column is written as a field instead. Numeric columns use the integer + line-protocol type (`...i`); string and date/time columns are written as strings. + +4. **Query compatibility.** Most ClickBench queries run unchanged. Q19 and Q43 cast `EventTime` (stored + as string) to a `TIMESTAMP` for `extract(minute ...)` and `date_trunc('minute', ...)`. DataFusion + folds unquoted identifiers to lowercase, so `load.py` writes column names in lowercase to keep the + standard CamelCase queries portable. + +## Run + +``` +./benchmark.sh +``` + +The server listens on port 8181, stores data under `./influxdb3-data`, and runs without authentication +(`--without-auth`) for the duration of the benchmark. diff --git a/influxdb/benchmark.sh b/influxdb/benchmark.sh new file mode 100755 index 0000000000..6025399f4d --- /dev/null +++ b/influxdb/benchmark.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +set -eu + +# Install dependencies and the InfluxDB 3 Core binary directly. We bypass the +# upstream install_influxdb3.sh installer because it is interactive and not +# suited for unattended runs. + +sudo apt-get update -y +sudo apt-get install -y python3 python3-pip curl jq time +pip3 install --break-system-packages requests + +INFLUX_VERSION=3.9.2 +case "$(uname -m)" in + x86_64|amd64) INFLUX_ARTIFACT=linux_amd64 ;; + aarch64|arm64) INFLUX_ARTIFACT=linux_arm64 ;; + *) echo "Unsupported architecture: $(uname -m)" >&2; exit 1 ;; +esac + +INFLUX_TGZ="influxdb3-core-${INFLUX_VERSION}_${INFLUX_ARTIFACT}.tar.gz" +wget --continue --progress=dot:giga \ + "https://dl.influxdata.com/influxdb/releases/${INFLUX_TGZ}" +rm -rf "influxdb3-core-${INFLUX_VERSION}" +tar -xzf "${INFLUX_TGZ}" +INFLUXDB3="${PWD}/influxdb3-core-${INFLUX_VERSION}/influxdb3" + +# Start the server with local-file storage and authentication disabled. +mkdir -p ./influxdb3-data +nohup "${INFLUXDB3}" serve \ + --node-id node0 \ + --object-store file \ + --data-dir "${PWD}/influxdb3-data" \ + --http-bind 127.0.0.1:8181 \ + --without-auth \ + > influxdb3.log 2>&1 & +INFLUXDB_PID=$! +echo "InfluxDB PID: ${INFLUXDB_PID}" + +for _ in $(seq 1 300); do + curl -sf http://localhost:8181/health > /dev/null && break + sleep 1 +done + +"${INFLUXDB3}" create database hits + +# Download the dataset and load it via line protocol. +../download-hits-tsv + +echo -n "Load time: " +command time -f '%e' python3 load.py + +# Run queries. +./run.sh 2>&1 | tee log.txt + +echo -n "Data size: " +du -bcs ./influxdb3-data | grep total | awk '{print $1}' + +kill "${INFLUXDB_PID}" || true diff --git a/influxdb/load.py b/influxdb/load.py new file mode 100755 index 0000000000..3bd3b9778c --- /dev/null +++ b/influxdb/load.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +"""Stream hits.tsv into InfluxDB 3 via line protocol over HTTP. + +Each row of the TSV becomes one line-protocol point in measurement ``hits``. +All columns are written as fields (no tags). The line protocol timestamp is +derived from the row index so that every point is unique. +""" + +import csv +import os +import sys +import time +import requests + +URL = "http://localhost:8181/api/v3/write_lp" +DB = "hits" +INPUT = "hits.tsv" +TOTAL_ROWS = 99997497 +BATCH_ROWS = 1000 + +# 2020-01-01 00:00:00 UTC, in nanoseconds. Row i is written at TS_BASE + i ns, +# guaranteeing per-row uniqueness while keeping timestamps in a reasonable range. +TS_BASE = 1577836800_000_000_000 + +# Column schema (name, type) in TSV order. Type 'i' = integer, 's' = string. +# Mirrors postgresql/create.sql. +COLUMNS = [ + ("WatchID", "i"), + ("JavaEnable", "i"), + ("Title", "s"), + ("GoodEvent", "i"), + ("EventTime", "s"), + ("EventDate", "s"), + ("CounterID", "i"), + ("ClientIP", "i"), + ("RegionID", "i"), + ("UserID", "i"), + ("CounterClass", "i"), + ("OS", "i"), + ("UserAgent", "i"), + ("URL", "s"), + ("Referer", "s"), + ("IsRefresh", "i"), + ("RefererCategoryID", "i"), + ("RefererRegionID", "i"), + ("URLCategoryID", "i"), + ("URLRegionID", "i"), + ("ResolutionWidth", "i"), + ("ResolutionHeight", "i"), + ("ResolutionDepth", "i"), + ("FlashMajor", "i"), + ("FlashMinor", "i"), + ("FlashMinor2", "s"), + ("NetMajor", "i"), + ("NetMinor", "i"), + ("UserAgentMajor", "i"), + ("UserAgentMinor", "s"), + ("CookieEnable", "i"), + ("JavascriptEnable", "i"), + ("IsMobile", "i"), + ("MobilePhone", "i"), + ("MobilePhoneModel", "s"), + ("Params", "s"), + ("IPNetworkID", "i"), + ("TraficSourceID", "i"), + ("SearchEngineID", "i"), + ("SearchPhrase", "s"), + ("AdvEngineID", "i"), + ("IsArtifical", "i"), + ("WindowClientWidth", "i"), + ("WindowClientHeight", "i"), + ("ClientTimeZone", "i"), + ("ClientEventTime", "s"), + ("SilverlightVersion1", "i"), + ("SilverlightVersion2", "i"), + ("SilverlightVersion3", "i"), + ("SilverlightVersion4", "i"), + ("PageCharset", "s"), + ("CodeVersion", "i"), + ("IsLink", "i"), + ("IsDownload", "i"), + ("IsNotBounce", "i"), + ("FUniqID", "i"), + ("OriginalURL", "s"), + ("HID", "i"), + ("IsOldCounter", "i"), + ("IsEvent", "i"), + ("IsParameter", "i"), + ("DontCountHits", "i"), + ("WithHash", "i"), + ("HitColor", "s"), + ("LocalEventTime", "s"), + ("Age", "i"), + ("Sex", "i"), + ("Income", "i"), + ("Interests", "i"), + ("Robotness", "i"), + ("RemoteIP", "i"), + ("WindowName", "i"), + ("OpenerName", "i"), + ("HistoryLength", "i"), + ("BrowserLanguage", "s"), + ("BrowserCountry", "s"), + ("SocialNetwork", "s"), + ("SocialAction", "s"), + ("HTTPError", "i"), + ("SendTiming", "i"), + ("DNSTiming", "i"), + ("ConnectTiming", "i"), + ("ResponseStartTiming", "i"), + ("ResponseEndTiming", "i"), + ("FetchTiming", "i"), + ("SocialSourceNetworkID", "i"), + ("SocialSourcePage", "s"), + ("ParamPrice", "i"), + ("ParamOrderID", "s"), + ("ParamCurrency", "s"), + ("ParamCurrencyID", "i"), + ("OpenstatServiceName", "s"), + ("OpenstatCampaignID", "s"), + ("OpenstatAdID", "s"), + ("OpenstatSourceID", "s"), + ("UTMSource", "s"), + ("UTMMedium", "s"), + ("UTMCampaign", "s"), + ("UTMContent", "s"), + ("UTMTerm", "s"), + ("FromTag", "s"), + ("HasGCLID", "i"), + ("RefererHash", "i"), + ("URLHash", "i"), + ("CLID", "i"), +] + +# Pre-compute static parts of each line for speed. +# String fields need each value individually escaped; integers can be written raw. +_STR_TRANS = str.maketrans({"\\": "\\\\", '"': '\\"', "\n": " ", "\r": " "}) + + +# DataFusion (the InfluxDB 3 query engine) folds unquoted identifiers to +# lowercase, so we lowercase field names at load time. That way the standard +# ClickBench queries with CamelCase column references (e.g. ``EventDate``) +# resolve correctly without needing to be quoted. +_LOWER_COLUMNS = [(name.lower(), ty) for name, ty in COLUMNS] + + +def encode_row(row, ts_ns): + parts = [] + for (name, ty), value in zip(_LOWER_COLUMNS, row): + if ty == "i": + # Integer field — append 'i' suffix per line-protocol spec. + # Empty/blank cells become 0. + v = value if value else "0" + parts.append(f"{name}={v}i") + else: + v = value.translate(_STR_TRANS) if value else "" + parts.append(f'{name}="{v}"') + return f"hits {','.join(parts)} {ts_ns}\n" + + +def main(): + session = requests.Session() + session.headers["Content-Type"] = "text/plain; charset=utf-8" + params = {"db": DB, "precision": "nanosecond", "accept_partial": "false"} + + total = 0 + last_log = time.monotonic() + buf = [] + + with open(INPUT, "r", encoding="utf-8", errors="replace", newline="") as f: + reader = csv.reader( + f, delimiter="\t", quoting=csv.QUOTE_NONE, escapechar=None + ) + for i, row in enumerate(reader): + if len(row) != len(COLUMNS): + # Pad short rows / truncate over-long ones; the source TSV is + # mostly clean but defensive handling avoids killing the load. + row = (row + [""] * len(COLUMNS))[: len(COLUMNS)] + buf.append(encode_row(row, TS_BASE + i)) + if len(buf) >= BATCH_ROWS: + _flush(session, params, buf) + total += len(buf) + buf.clear() + now = time.monotonic() + if now - last_log > 5: + pct = 100.0 * total / TOTAL_ROWS + print(f" {pct:5.2f}% ({total}/{TOTAL_ROWS})", flush=True) + last_log = now + if buf: + _flush(session, params, buf) + total += len(buf) + + print(f"Total rows written: {total}") + + +def _flush(session, params, buf): + body = "".join(buf).encode("utf-8") + r = session.post(URL, params=params, data=body, timeout=300) + if r.status_code >= 300: + sys.stderr.write(f"write_lp HTTP {r.status_code}: {r.text[:500]}\n") + + +if __name__ == "__main__": + main() diff --git a/influxdb/queries.sql b/influxdb/queries.sql new file mode 100644 index 0000000000..47114dee7f --- /dev/null +++ b/influxdb/queries.sql @@ -0,0 +1,43 @@ +SELECT COUNT(*) FROM hits; +SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0; +SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits; +SELECT AVG(UserID) FROM hits; +SELECT COUNT(DISTINCT UserID) FROM hits; +SELECT COUNT(DISTINCT SearchPhrase) FROM hits; +SELECT MIN(EventDate), MAX(EventDate) FROM hits; +SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC; +SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10; +SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10; +SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10; +SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10; +SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10; +SELECT UserID, extract(minute FROM CAST(EventTime AS TIMESTAMP)) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10; +SELECT UserID FROM hits WHERE UserID = 435090932899640449; +SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%'; +SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10; +SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10; +SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10; +SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits; +SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10; +SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10; +SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10; +SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100; +SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; +SELECT DATE_TRUNC('minute', CAST(EventTime AS TIMESTAMP)) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', CAST(EventTime AS TIMESTAMP)) ORDER BY DATE_TRUNC('minute', CAST(EventTime AS TIMESTAMP)) LIMIT 10 OFFSET 1000; diff --git a/influxdb/run.sh b/influxdb/run.sh new file mode 100755 index 0000000000..f8352a6d44 --- /dev/null +++ b/influxdb/run.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +TRIES=3 +URL="http://localhost:8181/api/v3/query_sql" + +set -f +while IFS= read -r query; do + [ -z "$query" ] && continue + sync + echo 3 | sudo tee /proc/sys/vm/drop_caches > /dev/null + + body=$(jq -n --arg q "$query" '{db: "hits", q: $q, format: "json"}') + + echo -n "[" + for i in $(seq 1 $TRIES); do + t1=$(date +%s%N) + curl -sS --fail --max-time 600 -H 'Content-Type: application/json' \ + -X POST "$URL" -d "$body" > /dev/null + rc=$? + t2=$(date +%s%N) + if [ "$rc" = "0" ]; then + awk "BEGIN { printf \"%.3f\", ($t2 - $t1) / 1000000000 }" + else + echo -n "null" + fi + [ "$i" != "$TRIES" ] && echo -n ", " + done + echo "]," +done < queries.sql diff --git a/influxdb/template.json b/influxdb/template.json new file mode 100644 index 0000000000..51d0e822a8 --- /dev/null +++ b/influxdb/template.json @@ -0,0 +1,12 @@ +{ + "system": "InfluxDB", + "proprietary": "no", + "hardware": "cpu", + "tuned": "no", + "tags": [ + "Rust", + "column-oriented", + "time-series", + "lukewarm-cold-run" + ] +} From f84b4b6438bda22d71e11ed38af37e24789ca180 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 8 May 2026 14:55:11 +0000 Subject: [PATCH 2/4] influxdb: parallel loader, throttle progress, silence noisy commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two issues with the original entry: 1. Loading was very slow. The single-threaded loader with 1000-row batches managed ~1 K rows/s on a c7a.metal-class machine — about 28 hours for the full 100M-row dataset. 2. The captured benchmark log was dominated by noisy output during the load: progress every 5 s for hours, plus apt/wget/pip verbose output, risked hitting the 1 MiB log cap. Changes: - load.py: send batches through a ThreadPoolExecutor (16 workers, bounded queue). InfluxDB 3 saturates around 16 concurrent writers on this hardware; doubling to 32 only adds ~10%. Progress prints every 30 s instead of every 5 s. Bumped BATCH_ROWS from 1000 to 2000 to halve the HTTP round-trip count without hitting the default 10 MiB request-size cap. - benchmark.sh: silence apt and wget; install python3-requests via apt instead of `pip3 install --break-system-packages` (which is refused under PEP 668 anyway on noble); bump --max-http-request-size to 64 MiB and --wal-max-write-buffer-size to 1M as headroom. (We also briefly tried --wal-flush-interval 30s; it actively hurt throughput by ~30× — each write blocked until the next flush — so default 1s wins.) Measured on a 1M-row sample, single-node c7a.metal-48xl: single-thread, BATCH=1000: ~1,000 rows/s 16 workers, BATCH=2000: ~30,000 rows/s Co-Authored-By: Claude Opus 4.7 (1M context) --- influxdb/benchmark.sh | 16 ++++-- influxdb/load.py | 131 ++++++++++++++++++++++++++---------------- 2 files changed, 93 insertions(+), 54 deletions(-) diff --git a/influxdb/benchmark.sh b/influxdb/benchmark.sh index 6025399f4d..06688ad071 100755 --- a/influxdb/benchmark.sh +++ b/influxdb/benchmark.sh @@ -2,13 +2,13 @@ set -eu +export DEBIAN_FRONTEND=noninteractive + # Install dependencies and the InfluxDB 3 Core binary directly. We bypass the # upstream install_influxdb3.sh installer because it is interactive and not # suited for unattended runs. - -sudo apt-get update -y -sudo apt-get install -y python3 python3-pip curl jq time -pip3 install --break-system-packages requests +sudo apt-get update -qq >/dev/null +sudo apt-get install -y -qq python3 python3-requests curl jq time >/dev/null INFLUX_VERSION=3.9.2 case "$(uname -m)" in @@ -18,13 +18,15 @@ case "$(uname -m)" in esac INFLUX_TGZ="influxdb3-core-${INFLUX_VERSION}_${INFLUX_ARTIFACT}.tar.gz" -wget --continue --progress=dot:giga \ - "https://dl.influxdata.com/influxdb/releases/${INFLUX_TGZ}" +wget --continue -q "https://dl.influxdata.com/influxdb/releases/${INFLUX_TGZ}" rm -rf "influxdb3-core-${INFLUX_VERSION}" tar -xzf "${INFLUX_TGZ}" INFLUXDB3="${PWD}/influxdb3-core-${INFLUX_VERSION}/influxdb3" # Start the server with local-file storage and authentication disabled. +# The --wal-* tunings reduce per-second fsync churn during the multi-hour +# load and let more write requests accumulate in memory before being +# rejected with back-pressure. mkdir -p ./influxdb3-data nohup "${INFLUXDB3}" serve \ --node-id node0 \ @@ -32,6 +34,8 @@ nohup "${INFLUXDB3}" serve \ --data-dir "${PWD}/influxdb3-data" \ --http-bind 127.0.0.1:8181 \ --without-auth \ + --wal-max-write-buffer-size 1000000 \ + --max-http-request-size 67108864 \ > influxdb3.log 2>&1 & INFLUXDB_PID=$! echo "InfluxDB PID: ${INFLUXDB_PID}" diff --git a/influxdb/load.py b/influxdb/load.py index 3bd3b9778c..f0c5979294 100755 --- a/influxdb/load.py +++ b/influxdb/load.py @@ -4,19 +4,41 @@ Each row of the TSV becomes one line-protocol point in measurement ``hits``. All columns are written as fields (no tags). The line protocol timestamp is derived from the row index so that every point is unique. + +Batches are encoded and uploaded by a thread pool: encoding happens under +the GIL, but the HTTP upload releases the GIL while waiting on the socket +and on the InfluxDB server, so several uploads can be in flight at once. """ import csv -import os import sys import time +from concurrent.futures import ThreadPoolExecutor + import requests URL = "http://localhost:8181/api/v3/write_lp" DB = "hits" INPUT = "hits.tsv" TOTAL_ROWS = 99997497 -BATCH_ROWS = 1000 + +# Each line-protocol point is roughly 2 KB encoded (105 fields with their +# names repeated on every row). 2000 rows keeps the body comfortably under +# the 64 MiB max-http-request-size we set on the server. Larger batches mean +# fewer HTTP round-trips. +BATCH_ROWS = 2000 + +# Number of upload workers. InfluxDB's per-connection ingest is limited; what +# scales here is concurrent in-flight HTTP requests. 16 workers gives ~30× +# the throughput of single-threaded loading on this dataset; doubling to 32 +# only adds another ~10%. +WORKERS = 16 + +# Bound the in-flight queue so we don't accumulate gigabytes of pending +# bodies if the server back-pressures. +MAX_PENDING = WORKERS * 2 + +PROGRESS_INTERVAL_SECONDS = 30 # 2020-01-01 00:00:00 UTC, in nanoseconds. Row i is written at TS_BASE + i ns, # guaranteeing per-row uniqueness while keeping timestamps in a reasonable range. @@ -132,72 +154,85 @@ ("CLID", "i"), ] -# Pre-compute static parts of each line for speed. # String fields need each value individually escaped; integers can be written raw. _STR_TRANS = str.maketrans({"\\": "\\\\", '"': '\\"', "\n": " ", "\r": " "}) - # DataFusion (the InfluxDB 3 query engine) folds unquoted identifiers to # lowercase, so we lowercase field names at load time. That way the standard # ClickBench queries with CamelCase column references (e.g. ``EventDate``) # resolve correctly without needing to be quoted. _LOWER_COLUMNS = [(name.lower(), ty) for name, ty in COLUMNS] - - -def encode_row(row, ts_ns): - parts = [] - for (name, ty), value in zip(_LOWER_COLUMNS, row): - if ty == "i": - # Integer field — append 'i' suffix per line-protocol spec. - # Empty/blank cells become 0. - v = value if value else "0" - parts.append(f"{name}={v}i") - else: - v = value.translate(_STR_TRANS) if value else "" - parts.append(f'{name}="{v}"') - return f"hits {','.join(parts)} {ts_ns}\n" +_NUM_COLS = len(_LOWER_COLUMNS) + +_session = requests.Session() +_session.headers["Content-Type"] = "text/plain; charset=utf-8" +_PARAMS = {"db": DB, "precision": "nanosecond", "accept_partial": "false"} + + +def encode_and_upload(rows, ts_start): + out = [] + for i, row in enumerate(rows): + if len(row) != _NUM_COLS: + row = (row + [""] * _NUM_COLS)[:_NUM_COLS] + parts = [] + for (name, ty), value in zip(_LOWER_COLUMNS, row): + if ty == "i": + # Integer field — append 'i' suffix per line-protocol spec. + # Empty/blank cells become 0. + v = value if value else "0" + parts.append(f"{name}={v}i") + else: + v = value.translate(_STR_TRANS) if value else "" + parts.append(f'{name}="{v}"') + out.append(f"hits {','.join(parts)} {ts_start + i}\n") + + body = "".join(out).encode("utf-8") + r = _session.post(URL, params=_PARAMS, data=body, timeout=600) + if r.status_code >= 300: + sys.stderr.write(f"write_lp HTTP {r.status_code}: {r.text[:500]}\n") def main(): - session = requests.Session() - session.headers["Content-Type"] = "text/plain; charset=utf-8" - params = {"db": DB, "precision": "nanosecond", "accept_partial": "false"} - total = 0 + next_ts = TS_BASE last_log = time.monotonic() - buf = [] - - with open(INPUT, "r", encoding="utf-8", errors="replace", newline="") as f: - reader = csv.reader( - f, delimiter="\t", quoting=csv.QUOTE_NONE, escapechar=None - ) - for i, row in enumerate(reader): - if len(row) != len(COLUMNS): - # Pad short rows / truncate over-long ones; the source TSV is - # mostly clean but defensive handling avoids killing the load. - row = (row + [""] * len(COLUMNS))[: len(COLUMNS)] - buf.append(encode_row(row, TS_BASE + i)) - if len(buf) >= BATCH_ROWS: - _flush(session, params, buf) - total += len(buf) - buf.clear() + pending = [] + + with ThreadPoolExecutor(max_workers=WORKERS) as executor: + with open(INPUT, "r", encoding="utf-8", errors="replace", newline="") as f: + reader = csv.reader( + f, delimiter="\t", quoting=csv.QUOTE_NONE, escapechar=None + ) + batch = [] + for row in reader: + batch.append(row) + if len(batch) < BATCH_ROWS: + continue + + pending.append(executor.submit(encode_and_upload, batch, next_ts)) + next_ts += len(batch) + total += len(batch) + batch = [] + + # Drain oldest futures so memory stays bounded and any error + # surfaces promptly. + while len(pending) >= MAX_PENDING: + pending.pop(0).result() + now = time.monotonic() - if now - last_log > 5: + if now - last_log > PROGRESS_INTERVAL_SECONDS: pct = 100.0 * total / TOTAL_ROWS print(f" {pct:5.2f}% ({total}/{TOTAL_ROWS})", flush=True) last_log = now - if buf: - _flush(session, params, buf) - total += len(buf) - print(f"Total rows written: {total}") + if batch: + pending.append(executor.submit(encode_and_upload, batch, next_ts)) + total += len(batch) + for fut in pending: + fut.result() -def _flush(session, params, buf): - body = "".join(buf).encode("utf-8") - r = session.post(URL, params=params, data=body, timeout=300) - if r.status_code >= 300: - sys.stderr.write(f"write_lp HTTP {r.status_code}: {r.text[:500]}\n") + print(f"Total rows written: {total}") if __name__ == "__main__": From aa749332a55d362ba5df15b309a1ee5a642e7fc8 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 8 May 2026 17:27:15 +0000 Subject: [PATCH 3/4] influxdb: bump query memory pool, restart server before queries, silence curl errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous run produced log lines like: curl: (18) transfer closed with outstanding read data remaining null, ... Three things were happening: 1. DataFusion was OOMing on heavy queries (Q29's REGEXP_REPLACE was the first to fall over) and aborting the response mid-stream. The log says: "Memory Exhausted while SpillPool (DiskManager is disabled)". InfluxDB 3.9.2 ships with DataFusion's DiskManager hardcoded disabled — there is no spill-to-disk fallback, and no flag to enable it. The only mitigation we have is to make the in-memory budget as large as possible. 2. After the load, the server still has WAL state and large in-memory write buffers. That memory isn't available to queries until the next snapshot, which made even moderate queries fail with the same OOM error. 3. `curl -sS` printed each transfer-closed message to stderr, which `tee` captured into the log alongside the `null` row from run.sh. Changes: - benchmark.sh: pass `--exec-mem-pool-bytes 80%` so DataFusion gets the lion's share of the box for query execution, and restart the server between load and queries (drains the WAL into Parquet, releases write-buffer memory, gives queries a clean budget). - run.sh: drop the `-S` from `curl -sS` and add `2>/dev/null`. curl's exit code is enough for run.sh to record `null`; the human-readable error message just polluted the captured log. After these changes a 5M-row sample produces 986 bytes of run.sh log with one clean `[null, null, null]` row for Q29 (still OOMs even at 5M with a regex over Referer; nothing we can do about that without a DiskManager). Co-Authored-By: Claude Opus 4.7 (1M context) --- influxdb/benchmark.sh | 51 ++++++++++++++++++++++++++++--------------- influxdb/run.sh | 8 +++++-- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/influxdb/benchmark.sh b/influxdb/benchmark.sh index 06688ad071..fd1e7d8032 100755 --- a/influxdb/benchmark.sh +++ b/influxdb/benchmark.sh @@ -28,22 +28,29 @@ INFLUXDB3="${PWD}/influxdb3-core-${INFLUX_VERSION}/influxdb3" # load and let more write requests accumulate in memory before being # rejected with back-pressure. mkdir -p ./influxdb3-data -nohup "${INFLUXDB3}" serve \ - --node-id node0 \ - --object-store file \ - --data-dir "${PWD}/influxdb3-data" \ - --http-bind 127.0.0.1:8181 \ - --without-auth \ - --wal-max-write-buffer-size 1000000 \ - --max-http-request-size 67108864 \ - > influxdb3.log 2>&1 & -INFLUXDB_PID=$! -echo "InfluxDB PID: ${INFLUXDB_PID}" - -for _ in $(seq 1 300); do - curl -sf http://localhost:8181/health > /dev/null && break - sleep 1 -done +start_server() { + nohup "${INFLUXDB3}" serve \ + --node-id node0 \ + --object-store file \ + --data-dir "${PWD}/influxdb3-data" \ + --http-bind 127.0.0.1:8181 \ + --without-auth \ + --wal-max-write-buffer-size 1000000 \ + --max-http-request-size 67108864 \ + --exec-mem-pool-bytes 80% \ + > influxdb3.log 2>&1 & + INFLUXDB_PID=$! + echo "InfluxDB PID: ${INFLUXDB_PID}" + + for _ in $(seq 1 300); do + curl -sf http://localhost:8181/health > /dev/null && return + sleep 1 + done + echo "Timed out waiting for InfluxDB to start" >&2 + return 1 +} + +start_server "${INFLUXDB3}" create database hits @@ -53,8 +60,18 @@ done echo -n "Load time: " command time -f '%e' python3 load.py +# Restart the server before running queries: this drains the WAL into +# Parquet, releases the in-memory write buffers, and gives query execution +# a clean budget. Without it, the post-load process is still juggling +# ingest state and DataFusion's memory pool can OOM on heavier queries +# (Q29's REGEXP_REPLACE in particular). InfluxDB 3.9.2 has DiskManager +# hardcoded disabled so there is no spill-to-disk fallback. +kill -TERM "${INFLUXDB_PID}" 2>/dev/null || true +wait "${INFLUXDB_PID}" 2>/dev/null || true +start_server + # Run queries. -./run.sh 2>&1 | tee log.txt +./run.sh | tee log.txt echo -n "Data size: " du -bcs ./influxdb3-data | grep total | awk '{print $1}' diff --git a/influxdb/run.sh b/influxdb/run.sh index f8352a6d44..c8b9a6136d 100755 --- a/influxdb/run.sh +++ b/influxdb/run.sh @@ -14,8 +14,12 @@ while IFS= read -r query; do echo -n "[" for i in $(seq 1 $TRIES); do t1=$(date +%s%N) - curl -sS --fail --max-time 600 -H 'Content-Type: application/json' \ - -X POST "$URL" -d "$body" > /dev/null + # `-s` (silent) without `-S` so transient curl errors like the + # "transfer closed" message that DataFusion emits when a query OOMs + # don't pollute the captured benchmark log; the non-zero exit code + # is enough for us to record a `null` below. + curl -s --fail --max-time 600 -H 'Content-Type: application/json' \ + -X POST "$URL" -d "$body" > /dev/null 2>&1 rc=$? t2=$(date +%s%N) if [ "$rc" = "0" ]; then From baf0310ba341d698e0abe1954af9a9ac73db2e83 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 8 May 2026 20:32:13 +0000 Subject: [PATCH 4/4] influxdb: load in chunks with WAL drain between, to dodge regroup_files bug A monolithic parallel load made every Parquet file cover the same broad [min_time, max_time] range, which tripped InfluxDB 3.9.2's regroup_files optimizer at query time ("overlapping ranges within same file"). Splitting the load into 10 chunks and restarting the server between them keeps each chunk's Parquet files bounded to a disjoint timestamp slice. --- influxdb/benchmark.sh | 46 ++++++++++++++++++++++++++++----------- influxdb/load.py | 50 +++++++++++++++++++++++++++++++------------ 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/influxdb/benchmark.sh b/influxdb/benchmark.sh index fd1e7d8032..786e4ffd96 100755 --- a/influxdb/benchmark.sh +++ b/influxdb/benchmark.sh @@ -50,6 +50,14 @@ start_server() { return 1 } +restart_server() { + # SIGTERM forces the WAL to drain into Parquet and the in-memory write + # buffers to flush; the next start comes up with no WAL to replay. + kill -TERM "${INFLUXDB_PID}" 2>/dev/null || true + wait "${INFLUXDB_PID}" 2>/dev/null || true + start_server +} + start_server "${INFLUXDB3}" create database hits @@ -57,18 +65,32 @@ start_server # Download the dataset and load it via line protocol. ../download-hits-tsv -echo -n "Load time: " -command time -f '%e' python3 load.py - -# Restart the server before running queries: this drains the WAL into -# Parquet, releases the in-memory write buffers, and gives query execution -# a clean budget. Without it, the post-load process is still juggling -# ingest state and DataFusion's memory pool can OOM on heavier queries -# (Q29's REGEXP_REPLACE in particular). InfluxDB 3.9.2 has DiskManager -# hardcoded disabled so there is no spill-to-disk fallback. -kill -TERM "${INFLUXDB_PID}" 2>/dev/null || true -wait "${INFLUXDB_PID}" 2>/dev/null || true -start_server +# Load in chunks, restarting the server between each chunk so the WAL drains +# into Parquet. With one monolithic load, every Parquet file ends up covering +# the same broad time range (16 parallel writers interleave timestamps across +# the whole dataset), and InfluxDB 3.9.2's regroup_files optimizer hits an +# internal "overlapping ranges within same file" assertion at query time. +# Chunking keeps each Parquet file's [min_time, max_time] bounded to a +# disjoint slice, so subsequent queries can plan successfully. +TOTAL_ROWS=99997497 +CHUNKS=10 +CHUNK_ROWS=$(( (TOTAL_ROWS + CHUNKS - 1) / CHUNKS )) + +load_t0=$(date +%s) +for i in $(seq 0 $((CHUNKS - 1))); do + chunk_start=$((i * CHUNK_ROWS)) + chunk_end=$(( (i + 1) * CHUNK_ROWS )) + if [ "$chunk_end" -gt "$TOTAL_ROWS" ]; then chunk_end=$TOTAL_ROWS; fi + echo "Chunk $((i + 1))/${CHUNKS}: rows ${chunk_start}..${chunk_end}" + python3 load.py --start-row "$chunk_start" --end-row "$chunk_end" + # Drain WAL so this chunk lands in its own Parquet files before the + # next chunk starts mixing more timestamps into the in-memory buffer. + restart_server +done +echo "Load time: $(($(date +%s) - load_t0))" + +# Server is already freshly restarted from the last chunk's drain, so no +# additional restart is needed before the query phase. # Run queries. ./run.sh | tee log.txt diff --git a/influxdb/load.py b/influxdb/load.py index f0c5979294..9ee906e3cb 100755 --- a/influxdb/load.py +++ b/influxdb/load.py @@ -8,12 +8,21 @@ Batches are encoded and uploaded by a thread pool: encoding happens under the GIL, but the HTTP upload releases the GIL while waiting on the socket and on the InfluxDB server, so several uploads can be in flight at once. + +Optionally accepts --start-row / --end-row so the load can be split into +chunks across multiple invocations. The wrapper script restarts InfluxDB +between chunks to drain the WAL into Parquet, which keeps the per-Parquet +``[min_time, max_time]`` ranges disjoint across chunks. That layout is what +keeps InfluxDB 3's ``regroup_files`` optimizer from tripping its +"overlapping ranges within same file" assertion at query time. """ +import argparse import csv import sys import time from concurrent.futures import ThreadPoolExecutor +from itertools import islice import requests @@ -193,8 +202,16 @@ def encode_and_upload(rows, ts_start): def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--start-row", type=int, default=0) + parser.add_argument("--end-row", type=int, default=TOTAL_ROWS) + args = parser.parse_args() + start_row = args.start_row + end_row = args.end_row + chunk_rows = end_row - start_row + total = 0 - next_ts = TS_BASE + next_ts = TS_BASE + start_row last_log = time.monotonic() pending = [] @@ -203,16 +220,21 @@ def main(): reader = csv.reader( f, delimiter="\t", quoting=csv.QUOTE_NONE, escapechar=None ) - batch = [] - for row in reader: - batch.append(row) - if len(batch) < BATCH_ROWS: - continue + # Skip rows before our chunk. csv.QUOTE_NONE means each record is + # exactly one line, so islice over the reader is safe. + if start_row: + for _ in islice(reader, start_row): + pass + + while total < chunk_rows: + take = min(BATCH_ROWS, chunk_rows - total) + batch = list(islice(reader, take)) + if not batch: + break pending.append(executor.submit(encode_and_upload, batch, next_ts)) next_ts += len(batch) total += len(batch) - batch = [] # Drain oldest futures so memory stays bounded and any error # surfaces promptly. @@ -221,18 +243,18 @@ def main(): now = time.monotonic() if now - last_log > PROGRESS_INTERVAL_SECONDS: - pct = 100.0 * total / TOTAL_ROWS - print(f" {pct:5.2f}% ({total}/{TOTAL_ROWS})", flush=True) + pct = 100.0 * total / chunk_rows + print( + f" {pct:5.2f}% ({total}/{chunk_rows})" + f" rows {start_row}..{start_row + total}", + flush=True, + ) last_log = now - if batch: - pending.append(executor.submit(encode_and_upload, batch, next_ts)) - total += len(batch) - for fut in pending: fut.result() - print(f"Total rows written: {total}") + print(f"Total rows written: {total} (chunk {start_row}..{start_row + total})") if __name__ == "__main__":