Performance test improvements #394
Pull Request Overview
This PR enhances performance testing capabilities by adding comprehensive metrics collection for consumer performance, including batch processing rates, end-to-end latency measurements, memory usage tracking, and offset lag monitoring. The changes also improve cache management in the consumer implementation and fix issues with message handling during rebalancing.
Key changes:
- Added configurable batch size (`js.consumer.max.batch.size`) and cache size (`js.consumer.max.cache.size.per.worker.ms`) parameters for consumers
- Implemented dynamic cache sizing based on consumption rate rather than static increments
- Enhanced performance test infrastructure with latency percentiles, memory tracking, and lag monitoring capabilities
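
As a rough illustration of rate-based cache sizing, the sketch below derives a cache size from an observed consumption rate. The formula, the function name `computeCacheSize`, and all of its parameters are assumptions made for this example; they are not taken from `lib/kafkajs/_consumer.js`, which may size the cache differently.

```javascript
// Hypothetical sketch: size the consumer cache from the observed consumption
// rate instead of growing it by a fixed increment. Not the library's code.
function computeCacheSize({ perWorkerMessagesPerMs, maxCacheSizePerWorkerMs, workers, maxBatchSize }) {
  // Messages one worker is expected to consume within the configured window.
  const perWorker = Math.ceil(perWorkerMessagesPerMs * maxCacheSizePerWorkerMs);
  // Keep at least one full batch per worker so a slow start cannot starve workers.
  return Math.max(perWorker, maxBatchSize) * workers;
}

const cacheSize = computeCacheSize({
  perWorkerMessagesPerMs: 2,      // assumed measured rate
  maxCacheSizePerWorkerMs: 1500,  // assumed value for the cache-size setting
  workers: 4,
  maxBatchSize: 32,
});
console.log(cacheSize); // 12000
```

With a measured rate of zero, this sketch falls back to one batch per worker (128 messages for the values above).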
Reviewed Changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| test/promisified/producer/flush.spec.js | Added producer creation and cleanup for timeout test |
| test/promisified/consumer/consumerCacheTests.spec.js | Adjusted message counts and timing for more reliable cache testing |
| test/promisified/consumer/consumeMessages.spec.js | Modified concurrency tests to use dynamic batch processing expectations |
| test/promisified/admin/fetch_offsets.spec.js | Fixed timing of message push to occur after commit check |
| package.json, schemaregistry/package.json, lib/util.js | Version bump to 1.6.1 |
| lib/kafkajs/_producer.js | Refactored to avoid mutating input message objects |
| lib/kafkajs/_consumer_cache.js | Added support for returning messages to cache head |
| lib/kafkajs/_consumer.js | Replaced static cache sizing with dynamic rate-based sizing, improved message return handling during pending operations |
| examples/performance/*.js | Added comprehensive performance testing infrastructure with latency, memory, and lag metrics |
| ci/update-version.js | Fixed prerelease version formatting |
| ci/tests/run_perf_test.* | Migrated performance tests from bash to Node.js with enhanced metrics |
| MIGRATION.md, CHANGELOG.md | Updated documentation for new consumer configuration options |
| .semaphore/semaphore.yml | Updated CI to use new Node.js-based performance test runner |
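
The `_producer.js` row mentions avoiding mutation of input message objects. A minimal sketch of that idea follows, assuming a normalisation step that converts string values to Buffers. The function name `normalizeMessage` and the message shape are illustrative, not the library's internals.

```javascript
// Hypothetical normalisation step: convert a string value to a Buffer
// without touching the caller's message object.
function normalizeMessage(message) {
  const copy = { ...message }; // shallow copy; the input object is left as-is
  if (typeof copy.value === 'string') {
    copy.value = Buffer.from(copy.value);
  }
  if (copy.headers) {
    copy.headers = { ...copy.headers }; // copy headers too, so later edits stay local
  }
  return copy;
}

const input = { key: 'k1', value: 'hello', headers: { source: 'perf-test' } };
const normalized = normalizeMessage(input);
console.log(typeof input.value);                // string
console.log(Buffer.isBuffer(normalized.value)); // true
```

The caller can reuse `input` across sends because the converted value only exists on the copy.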
Comments suppressed due to low confidence (1)
ci/tests/run_perf_test.js:1
- Use `let` instead of `var` for block-scoped variable declaration, consistent with modern JavaScript practices and the rest of the codebase.
#!/usr/bin/env node
let totalMessagesSent = 0;
let totalBytesSent = 0;

let staticValueLength = Math.floor(msgSize * (1 - randomness));
Variable `staticValueBytes` is assigned without declaration. Add `let` or `const` before `staticValueBytes`.
- let staticValueLength = Math.floor(msgSize * (1 - randomness));
+ let staticValueLength = Math.floor(msgSize * (1 - randomness));
+ let staticValueBytes;
Minor change worth making.
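
For context on why the missing declaration matters, the small sketch below shows what happens to an undeclared assignment in strict mode. The function and variable names are made up for the demonstration and do not appear in the PR.

```javascript
// In strict mode, assigning to a name that was never declared throws
// instead of silently creating a global variable.
function assignWithoutDeclaration() {
  'use strict';
  try {
    demoUndeclaredValue = Buffer.alloc(4); // no let/const/var anywhere
    return 'assigned';
  } catch (err) {
    return err.name;
  }
}

console.log(assignWithoutDeclaration()); // ReferenceError
```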
5349bd4 to 1e5c310
Milind L (milindl) left a comment:
Thanks for the changes.
9dee07d to 58d611e
The base branch was changed.
edaad22 to d78d6e6
…avg and max memory usage
…essage count and partition number
…itions are consumed first
d78d6e6 to 0f83fbb
Add a Dockerfile that bakes the librdkafka source build and the perf example deps, and switch the producer/create-topics Jobs to run from it instead of doing apt/git/nvm/npm work in an init container at pod start. The producer still writes logs into the shared workspace volume so the log-keeper sidecar and scale.py can collect them.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
max in flight 10
… scale fixes

Producer metrics (performance-primitives-common.js):
- Measure per-request send latency (producer.send issue -> promise resolution) and report avg/p50/p99/p99.9/max via the existing count-sketch, plus records/s, in the final producer metrics.

Producer configuration:
- newCompatibleProducer reads extra librdkafka properties from a configuration.yaml (PRODUCER_CONFIG_FILE), applied after the higher-latency cluster tuning so they take precedence. Specified via producer.config in the chart values and mounted from a ConfigMap.
- Producer-test MESSAGE_COUNT now derives from LIMIT_RPS / TERMINATE_TIMEOUT_MS: unbounded (terminate on timeout) when LIMIT_RPS is unset, else capped to fill the timeout window at the target rate.

scale.py:
- Collect pod logs in parallel; write the global log file before copying the per-pod log files; store everything under a UTC-timestamped logs/ folder.
- Stop waiting on the Job in helm install (it deadlocks on the log-keeper sidecar); wait for the producer container instead, then release the sidecar.
- Release all sidecars before helm uninstall so pods don't get stuck Terminating for the full terminationGracePeriodSeconds.
- Detect the host architecture and pin job pods to matching nodes via affinity.
- Default Helm release name is now ckjs-perf-scale.

Chart:
- build-image.sh to build/push the prebuilt image (SOURCE_REF, --platform, --no-cache, --no-push); node-arch affinity helper applied to both jobs; producer config ConfigMap; .helmignore.
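
The MESSAGE_COUNT rule in this commit message can be sketched as below. The function name `deriveMessageCount`, the use of `Infinity` for "unbounded", and the rounding are assumptions for illustration; only the environment variable names come from the commit message.

```javascript
// Hypothetical helper: derive how many messages a producer test should send.
// Unbounded when LIMIT_RPS is unset; otherwise enough messages to fill the
// timeout window at the target rate.
function deriveMessageCount(env) {
  const limitRps = Number(env.LIMIT_RPS);
  const timeoutMs = Number(env.TERMINATE_TIMEOUT_MS);
  const rateIsSet = env.LIMIT_RPS !== undefined && limitRps > 0;
  if (!rateIsSet || !Number.isFinite(timeoutMs)) {
    return Infinity; // assumed representation of "terminate on timeout only"
  }
  return Math.ceil((limitRps * timeoutMs) / 1000);
}

console.log(deriveMessageCount({ LIMIT_RPS: '1000', TERMINATE_TIMEOUT_MS: '60000' })); // 60000
console.log(deriveMessageCount({ TERMINATE_TIMEOUT_MS: '60000' }));                    // Infinity
```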
…t them

- runProducer samples the running send-latency stats (avg/p50/p99/p99.9/max/count) every 5s and appends them as JSON lines to jsmetrics.jsonl in the producer's working dir. Writes are buffered (no per-line flush); the stream is flushed and closed only when the test completes.
- scale.py copies jsmetrics.jsonl from every pod into the run's timestamped log folder alongside the other per-pod logs.
- Add scale/.gitignore (ignore collected logs) and update .helmignore.
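
A periodic JSON-lines sampler along these lines might look like the sketch below. The names `formatSampleLine` and `startSampler`, the `timestampMs` field, and the shape of the stats object are assumptions; the actual jsmetrics.jsonl schema is not shown in this PR page.

```javascript
const fs = require('fs');

// Serialise one snapshot of the running latency stats as a single JSON line.
function formatSampleLine(stats, timestampMs) {
  return JSON.stringify({ timestampMs, ...stats }) + '\n';
}

// Hypothetical sampler: every intervalMs, append a snapshot to filePath.
// getStats is a caller-supplied function, e.g. () => ({ avg, p50, p99, max, count }).
function startSampler(filePath, getStats, intervalMs = 5000) {
  const stream = fs.createWriteStream(filePath, { flags: 'w' });
  const timer = setInterval(() => {
    // write() is buffered by the stream; nothing is flushed per line here.
    stream.write(formatSampleLine(getStats(), Date.now()));
  }, intervalMs);
  timer.unref(); // sampling alone should not keep the process alive

  // The returned function stops sampling, then flushes and closes the stream.
  return () => new Promise((resolve, reject) => {
    clearInterval(timer);
    stream.once('error', reject);
    stream.end(resolve);
  });
}

console.log(formatSampleLine({ avg: 1.5, count: 2 }, 1000));
```

A caller would keep the function returned by `startSampler` and await it once the test completes, so the file is closed exactly once.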
…rator

- runProducer now samples p90 alongside p50/p99/p99.9 and includes it in each jsmetrics.jsonl line.
- plot_metrics.py reads a jsmetrics.jsonl and writes a self-contained Markdown report with a base64-embedded matplotlib chart of avg/p50/p90/p99/p99.9/max producer send latency over the test runtime. Series missing from older data are skipped.
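
To make the percentile labels concrete, here is a small nearest-rank percentile over an in-memory sample. This is a deliberately simpler technique than the count-sketch the test code uses, swapped in only to show what p50/p90/p99 mean; the function name and sample values are made up.

```javascript
// Nearest-rank percentile over a full sample (not the count-sketch used by
// the performance tests; an exact, memory-hungry stand-in for illustration).
function percentile(values, p) {
  if (values.length === 0) return NaN;
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100); // 1-based rank
  return sorted[Math.max(rank, 1) - 1];
}

const latenciesMs = [5, 1, 9, 3, 7, 2, 10, 4, 8, 6];
console.log(percentile(latenciesMs, 50)); // 5
console.log(percentile(latenciesMs, 90)); // 9
console.log(percentile(latenciesMs, 99)); // 10
```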
Rework runProducer's send loop to model steady parallel production traffic:
- Drop the per-window `await Promise.all`. Sends are dispatched continuously and the in-flight promises are pruned each iteration (completed ones removed), keeping memory bounded to the outstanding set.
- Always yield to the event loop each iteration (LIMIT_RPS pacing sleep, or setImmediate otherwise) so delivery callbacks run, the prune sees completions, and termination can fire; no event-loop starvation.
- Surface real produce errors: QUEUE_FULL is retried; other errors are recorded on the promise and re-thrown (in-loop and after the final drain) so a genuine broker failure aborts the test instead of becoming an unhandled rejection.

Also ignore generated *.md reports under scale/.
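
The loop shape described in this commit can be sketched as follows. This is not runProducer itself: `runSendLoop`, `sendWithRetry`, and the `err.code === 'QUEUE_FULL'` check are assumptions, and completed promises remove themselves from the set here rather than being pruned in a separate pass, which bounds memory in the same way.

```javascript
// Yield to the event loop so pending callbacks (e.g. delivery reports) can run.
const yieldToEventLoop = () => new Promise((resolve) => setImmediate(resolve));

// Retry only queue-full errors; anything else propagates to the caller.
async function sendWithRetry(send, index, isQueueFull) {
  for (;;) {
    try {
      return await send(index);
    } catch (err) {
      if (!isQueueFull(err)) throw err;
      await yieldToEventLoop(); // let the queue drain before retrying
    }
  }
}

// Hypothetical send loop: `send` is any function returning a promise.
async function runSendLoop(send, totalMessages, isQueueFull = (err) => err.code === 'QUEUE_FULL') {
  const inFlight = new Set();
  let firstError = null;
  let peakInFlight = 0;

  for (let i = 0; i < totalMessages; i++) {
    const tracked = sendWithRetry(send, i, isQueueFull)
      .catch((err) => { firstError = firstError || err; }) // record, never leave unhandled
      .finally(() => inFlight.delete(tracked));            // drop completed sends
    inFlight.add(tracked);
    peakInFlight = Math.max(peakInFlight, inFlight.size);

    await yieldToEventLoop();          // no per-window Promise.all
    if (firstError) throw firstError;  // abort on a genuine produce error
  }

  await Promise.all(inFlight);         // final drain
  if (firstError) throw firstError;
  return { sent: totalMessages, peakInFlight };
}
```

A pacing sleep for a rate limit would replace `yieldToEventLoop()` in the main loop; the rest of the structure stays the same.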
Restructure the ckjs-perf-scale chart so test parameters live in separate producer/consumer sections, each with its own replicas, mode, tuning flags and librdkafka config list. When a consumer section is present, a second Job runs performance-consolidated.js with the flag matching consumerMode (--consumer-each-batch / --consumer-each-message), and scale.py collects its pod logs into a separate scale-<release>-consumer.log.

- values/example-values: per-section replicas, add producer.initialDelayMs, drop skipCTPTest/concurrentRun; consumer section is optional.
- templates: producer + consumer env ConfigMaps and extra-config ConfigMaps, new consumer-job.yaml, create-topics + consumer commands gate --produce-to-second-topic on consumer.produceToSecondTopic so the second topic is only created when something produces to it.
- primitives: add CONSUMER_CONFIG_FILE loader applied in newCompatibleConsumer; skip creating a null topic2 in runCreateTopics.
- common: extract the periodic jsmetrics writer into startMetricsLogger and use it for the consumer T0->T1 E2E latency; write per-run-type files (jsmetrics-producer/-consumer-batch/-consumer-message.jsonl).
- scale.py: generalise pod collection by component/container; copy the new jsmetrics files.
- plot_metrics.py: label graphs by run type and also plot consumer E2E latency for both modes when sibling files are present.
plot_metrics.py now accepts a run/log folder: it writes a per-pod report for every pod's jsmetrics file and a combined report that groups a per-pod latency chart under Producers/Consumers sections, each followed by a broker-RTT chart aggregated (max) across that role's pods. Refactors the single-file report into build_single_report and adds build_combined_report, aggregate_rtt_across_pods, and stats/pod-id helpers; single-file mode is unchanged.

Make the jsmetrics sampling interval configurable via JSMETRICS_INTERVAL_MS (default 5000), passed to startMetricsLogger for both producer send-latency and consumer E2E-latency. Add producer.jsmetricsIntervalMs / consumer.jsmetricsIntervalMs to values/example-values and emit JSMETRICS_INTERVAL_MS in both ConfigMaps (defaulting to 5000 so an omitted value stays valid).
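
Reading the interval with a safe default could look like the sketch below. The helper name `metricsIntervalMs` and the handling of invalid values are assumptions; only the variable name and the default of 5000 come from the commit message.

```javascript
// Hypothetical helper: read JSMETRICS_INTERVAL_MS, falling back to 5000 ms
// when the variable is missing or not a positive integer.
function metricsIntervalMs(env = process.env) {
  const parsed = Number.parseInt(env.JSMETRICS_INTERVAL_MS, 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : 5000;
}

console.log(metricsIntervalMs({}));                                // 5000
console.log(metricsIntervalMs({ JSMETRICS_INTERVAL_MS: '1000' })); // 1000
```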
Collect each pod's files repeatedly while the Jobs run instead of once at the end. Every --copy-interval minutes (default 10): run `kubectl get jobs` first to prime external re-authentication, then atomically copy every producer/consumer pod's log + jsmetrics files (kubectl cp to a .tmp then os.replace, so a failed copy never clobbers the last good snapshot) and refresh the per-component console-log summaries (also written atomically). Loop until every main container has terminated, doing one final post-completion copy, then release the log-keeper sidecars and helm uninstall, bounded by --timeout.

Replaces collect_component/process_pod/copy_and_release_pod/wait_container_done with collect_round + helpers (kubectl_get_jobs, atomic_copy_pod_logs, write_text_atomic, fetch_pod_result, build_summary_text, containers_terminated).

Also make startMetricsLogger write jsmetrics-*.jsonl through a single append-mode write stream so each sample is dispatched to the fd promptly (and the file is truncated fresh per run), making mid-run copies reflect all samples so far.
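
The write-to-temp-then-replace pattern used by scale.py (in Python, via os.replace) has a direct Node.js analogue, sketched below. `writeTextAtomic` and the file names are illustrative; the rename is atomic on POSIX filesystems when the temporary file and the target are on the same filesystem.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Write to a sibling .tmp file first, then rename over the target, so a
// failed write never clobbers the last good snapshot.
function writeTextAtomic(targetPath, text) {
  const tmpPath = `${targetPath}.tmp`;
  fs.writeFileSync(tmpPath, text);
  fs.renameSync(tmpPath, targetPath);
}

// Demo in a throwaway directory.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'atomic-demo-'));
const summaryPath = path.join(demoDir, 'summary.log');
writeTextAtomic(summaryPath, 'round 1\n');
writeTextAtomic(summaryPath, 'round 2\n');
console.log(fs.readFileSync(summaryPath, 'utf8')); // round 2
```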
The jsmetrics-*.jsonl files reported cumulative latency since the run start, so each line was a running aggregate that hid per-window changes. Add a windowStats accumulator (mirroring the cumulative stats) that is reset every metrics interval, so each jsonl line describes only that window while the end-of-run console summary stays cumulative.

In the consumer, updateLatency now takes the stats accumulator as its first parameter and uses an internal per-accumulator counter for the running average (the window count must not depend on cumulative messagesMeasured). Latency stats are grouped into T0T1 and T0T2 sub-objects with identically-named fields (percentiles/count/avg/max), collapsing the isT0T2 branching; callers in performance-consolidated.js are updated accordingly.
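
The difference between cumulative and per-window stats can be illustrated with two accumulators fed by the same samples. The class `LatencyTracker` and its field names are assumptions for this sketch; the real accumulators also carry percentiles, which are omitted here.

```javascript
// Each accumulator keeps its own count, so the window average never
// depends on how many samples the cumulative one has seen.
const createStats = () => ({ count: 0, sum: 0, max: 0 });

function record(stats, latencyMs) {
  stats.count += 1;
  stats.sum += latencyMs;
  stats.max = Math.max(stats.max, latencyMs);
}

const snapshot = ({ count, sum, max }) => ({ count, avg: count ? sum / count : 0, max });

class LatencyTracker {
  constructor() {
    this.cumulative = createStats();
    this.window = createStats();
  }
  update(latencyMs) {
    record(this.cumulative, latencyMs);
    record(this.window, latencyMs);
  }
  // Called once per metrics interval: report the window, then reset it.
  flushWindow() {
    const result = snapshot(this.window);
    this.window = createStats();
    return result;
  }
  // End-of-run summary stays cumulative.
  summary() {
    return snapshot(this.cumulative);
  }
}

const tracker = new LatencyTracker();
tracker.update(10);
tracker.update(20);
const firstWindow = tracker.flushWindow();  // { count: 2, avg: 15, max: 20 }
tracker.update(40);
const secondWindow = tracker.flushWindow(); // { count: 1, avg: 40, max: 40 }
console.log(firstWindow, secondWindow, tracker.summary().count); // summary count is 3
```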
Previously each copy round fetched every pod's full console log via
`kubectl logs --all-containers` with capture=True, holding each pod's
entire log as a Python string across 20 concurrent workers and then
concatenating all of them into one string in build_summary_text. With
~100 pods and long runs this allocated well over a gigabyte per round.
Now each pod's full console log is streamed straight to disk as
{pod}-console.log (kubectl stdout redirected to a file handle, written
atomically), and the per-component summary embeds only the last
--summary-tail-lines lines of each (read via `tail`), parsing
`=== Producer Rate:` from that tail. Peak memory is bounded to N tail
lines per worker regardless of pod count or run length.
Adds --summary-tail-lines (default 50, comfortably more than the ~29-line
gap between rate blocks). Concurrency, atomic writes, and the kubectl cp
of the .log/.jsonl files are unchanged.
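
The memory bound comes from keeping only a fixed number of trailing lines per pod. scale.py does this in Python with `tail`; the JavaScript sketch below shows the same idea with a bounded buffer. `TailBuffer`, `lastLineWithPrefix`, and the sample log lines (including the text after `=== Producer Rate:`) are made up for the example.

```javascript
// Keep at most `limit` trailing lines, regardless of how long the log is.
class TailBuffer {
  constructor(limit) {
    this.limit = limit;
    this.lines = [];
  }
  push(line) {
    this.lines.push(line);
    if (this.lines.length > this.limit) this.lines.shift(); // drop the oldest line
  }
}

// Find the most recent line that starts with the given prefix, if any.
function lastLineWithPrefix(lines, prefix) {
  for (let i = lines.length - 1; i >= 0; i--) {
    if (lines[i].startsWith(prefix)) return lines[i];
  }
  return null;
}

const tail = new TailBuffer(3);
// Hypothetical log content; the real format after the prefix is not shown here.
['=== Producer Rate: A', 'noise 1', 'noise 2', '=== Producer Rate: B', 'noise 3']
  .forEach((line) => tail.push(line));

console.log(tail.lines.length);                                  // 3
console.log(lastLineWithPrefix(tail.lines, '=== Producer Rate:')); // === Producer Rate: B
```

The tail length has to exceed the gap between rate blocks, otherwise the prefix may not appear in the retained lines at all; that is the reason given for the default of 50.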
measure eachBatch rate, time and lag, avg and max memory usage