The main client class for connecting to Timeplus Proton.
const client = new ProtonClient(config: ProtonConfig);

| Property | Type | Required | Default | Description |
|---|---|---|---|---|
| host | string | No | "localhost" | The Proton server hostname or IP address |
| port | number | No | 3218 | The Proton server HTTP port |
| path | string | No | - | Optional path prefix for proxy scenarios (e.g., "/query") |
| username | string | No | - | Username for basic authentication |
| password | string | No | "" | Password for basic authentication |
| timeout | number | No | - | Connection timeout in milliseconds. If not set, no timeout is applied |
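
To make the defaults in the table concrete, here is a minimal sketch of how a config could be resolved into a request URL. `buildQueryUrl` and `ProtonConfigSketch` are hypothetical names, not part of the driver. The `http://host:port/path?default_format=JSONEachRow` shape is taken from the proxy-path example in this document; using `/` when `path` is unset is an assumption.

```typescript
// Hypothetical illustration only; the driver's internal URL handling may differ.
interface ProtonConfigSketch {
  host?: string;
  port?: number;
  path?: string;
}

function buildQueryUrl(config: ProtonConfigSketch = {}): string {
  const host = config.host ?? "localhost"; // documented default
  const port = config.port ?? 3218;        // documented default
  const path = config.path ?? "/";         // assumption: root path when no prefix is set
  return `http://${host}:${port}${path}?default_format=JSONEachRow`;
}

console.log(buildQueryUrl({ host: "api.example.com", port: 8080, path: "/query" }));
// http://api.example.com:8080/query?default_format=JSONEachRow
```
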
Default connection (localhost:3218):
const client = new ProtonClient();

Custom host and port:
const client = new ProtonClient({
  host: "proton.example.com",
  port: 8123,
});

With authentication:
const client = new ProtonClient({
  host: "proton.example.com",
  username: "admin",
  password: "secret",
});

With timeout:
const client = new ProtonClient({
  timeout: 30000, // 30 seconds
});

With proxy path:
const client = new ProtonClient({
  host: "api.example.com",
  port: 8080,
  path: "/query",
});
// Sends queries to: http://api.example.com:8080/query?default_format=JSONEachRow

Full configuration:
const client = new ProtonClient({
  host: "proton.example.com",
  port: 3218,
  path: "/query",
  username: "admin",
  password: "secret",
  timeout: 30000,
});

Executes a SQL query against Proton and returns a streaming result.
async query<T = RowData>(
  sql: string,
  options?: QueryOptions
): Promise<QueryResult<T>>

sql: The SQL query to execute. Cannot be empty or whitespace-only.

options (QueryOptions):

| Property | Type | Required | Description |
|---|---|---|---|
| signal | AbortSignal | No | An AbortSignal for external cancellation control |

Returns a QueryResult<T>:

| Property | Type | Description |
|---|---|---|
| rows | AsyncIterableIterator<T> | An async iterator that yields parsed rows as they arrive |
| abort | () => void | Function to cancel the query and close the connection |

T - The expected row type. Defaults to RowData (Record<string, any>).
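
Because a streaming query may never end on its own, a common pattern is to read a fixed number of rows and then cancel. The sketch below shows one way to do that against the rows/abort shape described above. `takeRows` and `fakeQuery` are hypothetical names for illustration; `fakeQuery` stands in for `client.query()` so the example runs without a Proton server, and the sketch assumes `abort()` is safe to call after the stream has already stopped.

```typescript
// Mirrors the documented shape of the query() result.
interface QueryResultSketch<T> {
  rows: AsyncIterableIterator<T>;
  abort: () => void;
}

// Hypothetical helper: collect at most `limit` rows, then cancel the query.
async function takeRows<T>(result: QueryResultSketch<T>, limit: number): Promise<T[]> {
  const collected: T[] = [];
  try {
    if (limit > 0) {
      for await (const row of result.rows) {
        collected.push(row);
        if (collected.length >= limit) break; // stop reading once we have enough
      }
    }
  } finally {
    result.abort(); // assumption: harmless if the stream has already ended
  }
  return collected;
}

// Stand-in for client.query(): an endless stream of counters.
function fakeQuery(): QueryResultSketch<{ n: number }> & { aborted: () => boolean } {
  let stopped = false;
  async function* generate(): AsyncIterableIterator<{ n: number }> {
    for (let n = 0; !stopped; n++) yield { n };
  }
  return { rows: generate(), abort: () => { stopped = true; }, aborted: () => stopped };
}

takeRows(fakeQuery(), 3).then((rows) => console.log(rows.map((r) => r.n))); // [ 0, 1, 2 ]
```
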
Basic streaming query:
const { rows } = await client.query("SELECT * FROM my_stream");
for await (const row of rows) {
  console.log(row);
}

With typed results:
interface StockTick {
  symbol: string;
  price: number;
  timestamp: string;
}
const { rows } = await client.query<StockTick>(
  "SELECT symbol, price, _tp_time as timestamp FROM stock_ticks"
);
for await (const tick of rows) {
  console.log(`${tick.symbol}: $${tick.price}`);
}

Abort after timeout:
const { rows, abort } = await client.query("SELECT * FROM my_stream");
// Abort after 10 seconds
setTimeout(() => abort(), 10000);
try {
  for await (const row of rows) {
    console.log(row);
  }
} catch (err) {
  // err is `unknown` under strict TypeScript, so narrow it before reading .message
  console.log("Stream ended:", (err as Error).message);
}

Using external AbortController:
const controller = new AbortController();
const { rows } = await client.query(
  "SELECT * FROM my_stream",
  { signal: controller.signal }
);
// Abort from elsewhere
process.on("SIGINT", () => controller.abort());
for await (const row of rows) {
  console.log(row);
}

Bounded query (non-streaming):
// Use table() function for historical queries
const { rows } = await client.query(
  "SELECT * FROM table(my_stream) LIMIT 100"
);
const results = [];
for await (const row of rows) {
  results.push(row);
}
console.log(`Got ${results.length} rows`);

Low-level utility function for parsing NDJSON (newline-delimited JSON) streams. Exported for advanced use cases where you need to parse NDJSON from custom sources.
async function* ndjsonStreamParser<T>(
  reader: ReadableStreamDefaultReader<Uint8Array>
): AsyncIterableIterator<T>

| Parameter | Type | Description |
|---|---|---|
| reader | ReadableStreamDefaultReader<Uint8Array> | A stream reader from response.body.getReader() |

Returns an AsyncIterableIterator<T> that yields parsed JSON objects. The parser:
- Handles partial lines split across chunks
- Properly decodes multi-byte UTF-8 characters
- Flushes remaining buffer when stream ends
- Releases the reader lock when done or on error
- Throws descriptive error if JSON parsing fails
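
The chunk-boundary behaviors in this list can be illustrated with a small synchronous sketch. This is not the driver's implementation; `NdjsonLineBuffer` is a hypothetical class showing one common approach, which relies on the standard `TextDecoder` in streaming mode so that a multi-byte character split across two chunks still decodes correctly.

```typescript
// Hypothetical illustration, not the driver's actual parser.
class NdjsonLineBuffer<T> {
  private decoder = new TextDecoder("utf-8");
  private pending = ""; // text received after the last newline seen so far

  // Feed one chunk; returns every object whose line is now complete.
  push(chunk: Uint8Array): T[] {
    // stream: true holds back an incomplete multi-byte sequence for the next call
    this.pending += this.decoder.decode(chunk, { stream: true });
    const lines = this.pending.split("\n");
    this.pending = lines.pop() ?? ""; // the last piece may be a partial line
    return lines.filter((line) => line.trim() !== "").map((line) => this.parse(line));
  }

  // Call once at end of stream to flush whatever is left in the buffer.
  flush(): T[] {
    const rest = this.pending + this.decoder.decode();
    this.pending = "";
    return rest.trim() === "" ? [] : [this.parse(rest)];
  }

  private parse(line: string): T {
    try {
      return JSON.parse(line) as T;
    } catch {
      throw new Error(`Failed to parse NDJSON line: ${line.slice(0, 100)}...`);
    }
  }
}

// Cut the input in the middle of "é" (two bytes in UTF-8), which is also mid-line.
const bytes = new TextEncoder().encode('{"name":"café"}\n{"name":"b"}');
const cut = bytes.indexOf(0xc3) + 1; // 0xC3 is the first byte of "é"
const buffer = new NdjsonLineBuffer<{ name: string }>();
const parsed = [
  ...buffer.push(bytes.slice(0, cut)),
  ...buffer.push(bytes.slice(cut)),
  ...buffer.flush(),
];
console.log(parsed.map((p) => p.name)); // [ 'café', 'b' ]
```
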
import { ndjsonStreamParser } from "@timeplus/proton-javascript-driver";
const response = await fetch("https://example.com/stream");
const reader = response.body!.getReader();
for await (const item of ndjsonStreamParser<MyType>(reader)) {
  console.log(item);
}

Default type for query results when no type parameter is provided.
type RowData = Record<string, any>;

Configuration object for the client constructor.
interface ProtonConfig {
  host?: string; // default: "localhost"
  port?: number; // default: 3218
  path?: string; // optional path prefix for proxy scenarios
  username?: string;
  password?: string;
  timeout?: number;
}

Options for the query() method.
interface QueryOptions {
  signal?: AbortSignal;
}

Return type of the query() method.
interface QueryResult<T> {
  rows: AsyncIterableIterator<T>;
  abort: () => void;
}

When Proton returns a non-2xx status code, the client throws an error with the format:
Proton Error (${status}): ${errorMessage}
Passing an empty or whitespace-only SQL string throws:
SQL query cannot be empty
When a query is aborted (via abort(), timeout, or external signal), the error thrown is:
Query aborted or timed out
If the server sends malformed JSON, the parser throws:
Failed to parse NDJSON line: ${first100chars}...
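
Since these errors are distinguished only by their message text, it can help to centralize the matching in one place. A minimal sketch, assuming the message formats are exactly as listed above; `classifyProtonError` and `ProtonErrorKind` are hypothetical names, not part of the driver, and the sample message "Syntax error" is made up for the demonstration.

```typescript
type ProtonErrorKind = "server" | "empty-sql" | "aborted" | "parse" | "unknown";

// Hypothetical helper: maps a caught value to one of the documented error kinds.
function classifyProtonError(err: unknown): ProtonErrorKind {
  // Catch variables are `unknown` under strict TypeScript, so narrow first.
  const message = err instanceof Error ? err.message : String(err);
  if (message.includes("Proton Error (")) return "server";
  if (message.includes("SQL query cannot be empty")) return "empty-sql";
  if (message.includes("Query aborted or timed out")) return "aborted";
  if (message.includes("Failed to parse NDJSON line:")) return "parse";
  return "unknown";
}

console.log(classifyProtonError(new Error("Proton Error (400): Syntax error"))); // server
console.log(classifyProtonError(new Error("Query aborted or timed out")));       // aborted
```
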
try {
  const { rows } = await client.query("INVALID SQL");
  for await (const row of rows) {
    console.log(row);
  }
} catch (err) {
  // err is `unknown` under strict TypeScript, so extract the message safely
  const message = err instanceof Error ? err.message : String(err);
  if (message.includes("Proton Error")) {
    console.error("Query failed:", message);
  } else if (message.includes("aborted")) {
    console.log("Query was cancelled");
  } else if (message.includes("Failed to parse")) {
    console.error("Received invalid data:", message);
  } else {
    console.error("Unexpected error:", err);
  }
}

The client supports HTTP Basic Authentication.
When username is provided in the config, the client sends an Authorization header with each request:
Authorization: Basic <base64(username:password)>
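
For reference, the header value can be reproduced with standard APIs. A minimal sketch of the format only, not the driver's code; `basicAuthHeader` is a hypothetical helper, and the empty-string default for the password follows the config table.

```typescript
// Hypothetical helper illustrating the header format.
function basicAuthHeader(username: string, password = ""): string {
  // Encode to UTF-8 bytes first so non-ASCII credentials do not break btoa().
  const bytes = new TextEncoder().encode(`${username}:${password}`);
  let binary = "";
  bytes.forEach((byte) => {
    binary += String.fromCharCode(byte);
  });
  return `Basic ${btoa(binary)}`;
}

console.log(basicAuthHeader("admin", "secret")); // Basic YWRtaW46c2VjcmV0
```

Base64 is an encoding, not encryption: anyone who can read the header can recover the password.
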
- Credentials are sent with every request
- Always use HTTPS in production to protect credentials in transit
- Do not log or serialize the ProtonConfig object, as it contains the password
- Consider using environment variables for credentials:

const client = new ProtonClient({
  host: process.env.PROTON_HOST,
  port: process.env.PROTON_PORT ? parseInt(process.env.PROTON_PORT, 10) : undefined,
  username: process.env.PROTON_USER,
  password: process.env.PROTON_PASSWORD,
});

The timeout config option applies only to the initial connection. Once the stream is established, it can run indefinitely (useful for streaming queries).
For streaming queries where you want to limit total duration, use the abort() function:
const { rows, abort } = await client.query("SELECT * FROM my_stream");
// Limit total streaming time
setTimeout(() => abort(), 60000); // 1 minute max
for await (const row of rows) {
  process.stdout.write(".");
}

The client automatically cleans up resources:
- Reader lock is released when iteration completes or errors
- Aborting a query closes the underlying HTTP connection
For proper cleanup in all scenarios, use try-finally:
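const { rows, abort } = await client.query("SELECT * FROM my_stream");
try {
  for await (const row of rows) {
    if (shouldStop(row)) {
      break; // the finally block below closes the connection
    }
    process.stdout.write(".");
  }
} catch (err) {
  console.error("Stream error:", err);
} finally {
  // Runs on normal completion, early exit, and errors.
  // Assumes abort() is safe to call after the stream has ended.
  abort();
}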
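
The total-duration limit and the guaranteed cleanup described above can be combined in one helper. A minimal sketch, assuming `abort()` may be called more than once without harm; `consumeWithDeadline`, `StreamHandle`, and `fakeStream` are hypothetical names, and `fakeStream` stands in for a real `client.query()` result by throwing the documented abort error once cancelled. Clearing the timer in `finally` keeps a finished query from leaving a pending timeout behind.

```typescript
interface StreamHandle<T> {
  rows: AsyncIterableIterator<T>;
  abort: () => void;
}

// Hypothetical helper: process rows until the stream ends or maxMs elapses.
async function consumeWithDeadline<T>(
  handle: StreamHandle<T>,
  maxMs: number,
  onRow: (row: T) => void
): Promise<number> {
  let count = 0;
  const timer = setTimeout(() => handle.abort(), maxMs);
  try {
    for await (const row of handle.rows) {
      onRow(row);
      count++;
    }
  } finally {
    clearTimeout(timer); // do not leave the timer pending after the stream ends
    handle.abort();      // assumption: safe to call even after completion
  }
  return count;
}

// Stand-in stream: yields a row every 5 ms and throws once aborted,
// mimicking the documented "Query aborted or timed out" error.
function fakeStream(): StreamHandle<number> {
  let aborted = false;
  async function* generate(): AsyncIterableIterator<number> {
    for (let n = 0; ; n++) {
      await new Promise((resolve) => setTimeout(resolve, 5));
      if (aborted) throw new Error("Query aborted or timed out");
      yield n;
    }
  }
  return { rows: generate(), abort: () => { aborted = true; } };
}

consumeWithDeadline(fakeStream(), 50, () => {})
  .catch((err: unknown) => console.log("Stopped:", (err as Error).message));
// Stopped: Query aborted or timed out
```

With this shape the deadline surfaces to the caller as a rejected promise, so the caller decides whether an abort counts as a failure.
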