Skip to content
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,51 @@ client.write_dataframe(
)
```

### Accept partial writes and inspect failed lines
`accept_partial` defaults to `True` and allows partial success when a batch contains invalid lines.
On partial failure, the client raises `InfluxDBPartialWriteError` with structured `line_errors`.

```python
from influxdb_client_3 import InfluxDBClient3
from influxdb_client_3.exceptions import InfluxDBPartialWriteError

client = InfluxDBClient3(host="http://localhost:8181", token="token", database="db")
lp = "m v=1i 1\nm v=1.2 2"

try:
client.write(lp) # accept_partial=True by default
except InfluxDBPartialWriteError as e:
for line_err in e.line_errors:
print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})")
```

Disable partial writes:
```python
from influxdb_client_3 import WriteOptions, write_client_options

client = InfluxDBClient3(
host="http://localhost:8181",
token="token",
database="db",
write_client_options=write_client_options(
write_options=WriteOptions(accept_partial=False)
),
)
```

### V2 compatibility mode (Clustered)
Set `use_v2_api=True` to route writes through `/api/v2/write` for Clustered/v2-compatible backends.

`use_v2_api` can be configured by:
- `WriteOptions(use_v2_api=True)`
- constructor kwarg: `write_use_v2_api=True`
- env var: `INFLUX_WRITE_USE_V2_API=true`

When `use_v2_api=True`:
- `accept_partial` is ignored by the backend
- `no_sync=True` is invalid and rejected before dispatch with:
`invalid write options: no_sync cannot be used with use_v2_api`

## Querying

### Querying with SQL
Expand Down
51 changes: 38 additions & 13 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
INFLUX_WRITE_ACCEPT_PARTIAL = "INFLUX_WRITE_ACCEPT_PARTIAL"
INFLUX_WRITE_USE_V2_API = "INFLUX_WRITE_USE_V2_API"
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION"
Expand Down Expand Up @@ -155,19 +157,23 @@ def _parse_gzip_threshold(threshold: str) -> int:
return threshold


def _parse_write_no_sync(write_no_sync: str):
def _parse_write_bool(value):
"""
Parses and validates the provided write no sync value.
Parses a truthy/falsy value for write options.

This function ensures that the given value is a valid boolean,
and it raises an appropriate error if the value is not valid.
The input is normalized to string and matched against common truthy values.
Any non-truthy value is treated as False.

:param write_no_sync: The input value to be parsed and validated.
:type write_no_sync: Any
:return: The validated write no sync value as an boolean.
:param value: The input value to be parsed and validated.
:type value: Any
:return: Parsed boolean value.
:rtype: bool
"""
Comment thread
alespour marked this conversation as resolved.
return write_no_sync.strip().lower() in ['true', '1', 't', 'y', 'yes']
return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes']


def _parse_write_no_sync(write_no_sync: str):
return _parse_write_bool(write_no_sync)


def _parse_timeout(to: str) -> int:
Expand Down Expand Up @@ -233,6 +239,8 @@ def __init__(
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
:key str query_timeout: int value used to set the client query API timeout in milliseconds.
:key str write_timeout: int value used to set the client write API timeout in milliseconds.
:key bool write_accept_partial: allow partial writes when some lines fail.
:key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint.
:key list[str] profilers: list of enabled Flux profilers
"""
self._org = org if org is not None else "default"
Expand All @@ -243,22 +251,34 @@ def __init__(
write_type = DefaultWriteOptions.write_type.value
write_precision = DefaultWriteOptions.write_precision.value
write_no_sync = DefaultWriteOptions.no_sync.value
write_accept_partial = DefaultWriteOptions.accept_partial.value
write_use_v2_api = DefaultWriteOptions.use_v2_api.value
write_timeout = DefaultWriteOptions.timeout.value

if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
write_opts = write_client_options['write_options']
write_type = getattr(write_opts, 'write_type', write_type)
write_precision = getattr(write_opts, 'write_precision', write_precision)
write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
write_accept_partial = getattr(write_opts, 'accept_partial', write_accept_partial)
write_use_v2_api = getattr(write_opts, 'use_v2_api', write_use_v2_api)
write_timeout = getattr(write_opts, 'timeout', write_timeout)

if kw_keys.__contains__('write_timeout'):
write_timeout = kwargs.get('write_timeout')

if kw_keys.__contains__('write_accept_partial'):
write_accept_partial = _parse_write_bool(kwargs.get('write_accept_partial'))

if kw_keys.__contains__('write_use_v2_api'):
write_use_v2_api = _parse_write_bool(kwargs.get('write_use_v2_api'))

write_options = WriteOptions(
write_type=write_type,
write_precision=write_precision,
no_sync=write_no_sync,
accept_partial=write_accept_partial,
use_v2_api=write_use_v2_api,
)

self._write_client_options = {
Expand Down Expand Up @@ -347,7 +367,15 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':

write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
if write_no_sync is not None:
write_options.no_sync = _parse_write_no_sync(write_no_sync)
write_options.no_sync = _parse_write_bool(write_no_sync)

write_accept_partial = os.getenv(INFLUX_WRITE_ACCEPT_PARTIAL)
if write_accept_partial is not None:
write_options.accept_partial = _parse_write_bool(write_accept_partial)

write_use_v2_api = os.getenv(INFLUX_WRITE_USE_V2_API)
if write_use_v2_api is not None:
write_options.use_v2_api = _parse_write_bool(write_use_v2_api)

precision = os.getenv(INFLUX_PRECISION)
if precision is not None:
Expand Down Expand Up @@ -402,10 +430,7 @@ def write(self, record=None, database=None, **kwargs):
if database is None:
database = self._database

try:
return self._write_api.write(bucket=database, record=record, **kwargs)
except InfluxDBError as e:
raise e
return self._write_api.write(bucket=database, record=record, **kwargs)

def write_dataframe(
self,
Expand Down
3 changes: 2 additions & 1 deletion influxdb_client_3/exceptions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# flake8: noqa

from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError
from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError, InfluxDBPartialWriteError, \
InfluxDBPartialWriteLineError
169 changes: 150 additions & 19 deletions influxdb_client_3/exceptions/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Exceptions utils for InfluxDB."""

import json
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple

from urllib3 import HTTPResponse

Expand Down Expand Up @@ -39,6 +42,104 @@ def __init__(self, error_message, *args, **kwargs):
self.message = error_message


def _is_partial_write_error(error_message) -> bool:
if not isinstance(error_message, str) or not error_message:
return False
normalized = error_message.lower()
return (
"partial write of line protocol occurred" in normalized or
"parsing failed for write_lp endpoint" in normalized
)


def _parse_partial_write_data_item(item) -> Optional[Tuple[str, int, str]]:
if item is None:
return None
if not isinstance(item, dict):
raise ValueError("array item is not an object")

error_message = item.get("error_message")
if not isinstance(error_message, str):
raise ValueError("error_message must be string")
if not error_message:
return None

line_number_raw = item.get("line_number")
if line_number_raw is None:
line_number = 0
elif isinstance(line_number_raw, int):
line_number = line_number_raw
else:
raise ValueError("line_number must be int")

original_line_raw = item.get("original_line")
if original_line_raw is None:
original_line = ""
elif isinstance(original_line_raw, str):
original_line = original_line_raw
else:
raise ValueError("original_line must be string")

return error_message, line_number, original_line


def _parse_typed_partial_write_array(data) -> Optional[List[Tuple[str, int, str]]]:
if not isinstance(data, list):
return None
line_errors: List[Tuple[str, int, str]] = []
try:
for item in data:
parsed = _parse_partial_write_data_item(item)
if parsed is None:
continue
line_errors.append(parsed)
except ValueError:
return None
return line_errors if len(line_errors) > 0 else None


def _parse_typed_partial_write_object_or_none(data) -> Optional[Tuple[str, int, str]]:
try:
return _parse_partial_write_data_item(data)
except ValueError:
return None


def _format_partial_write_details(line_errors: List[Tuple[str, int, str]]) -> List[str]:
details: List[str] = []
for error_message, line_number, original_line in line_errors:
if line_number != 0 and original_line != "":
details.append(f"\tline {line_number}: {error_message} ({original_line})")
elif error_message:
details.append(f"\t{error_message}")
return details


def _parse_partial_write_line_error_info(data) -> Tuple[List[Tuple[str, int, str]], List[str]]:
if data is None:
return [], []

typed_array = _parse_typed_partial_write_array(data)
if typed_array is not None:
return typed_array, _format_partial_write_details(typed_array)

if isinstance(data, list):
details: List[str] = []
for item in data:
if item is None:
continue
raw = json.dumps(item, separators=(',', ':'))
if raw and raw.lower() != "null":
details.append(raw)
return [], details

typed_single = _parse_typed_partial_write_object_or_none(data)
if typed_single is not None:
return [typed_single], _format_partial_write_details([typed_single])

return [], []


# This error is for all write operations
class InfluxDBError(InfluxDB3ClientError):
"""Raised when a server error occurs."""
Expand All @@ -56,10 +157,7 @@ def __init__(self, response: HTTPResponse = None, message: str = None):
super().__init__(self.message)

def _get_message(self, response):
# Body
if response.data:
import json

def get(d, key):
if not key or d is None:
return d
Expand All @@ -80,23 +178,15 @@ def get(d, key):
# "data": [ { "error_message": "...", "line_number": 2, "original_line": "..." }, ... ]
# }
error_text = node.get("error")
data = node.get("data")
if error_text and isinstance(data, list):
details = []
for item in data:
if not isinstance(item, dict):
continue
line_number = item.get("line_number")
error_message = item.get("error_message")
original_line = item.get("original_line")
if line_number is not None and error_message and original_line:
details.append(
f"\tline {line_number}: {error_message} ({original_line})"
)
elif error_message:
details.append(f"\t{error_message}")
if error_text and _is_partial_write_error(error_text):
_, details = _parse_partial_write_line_error_info(node.get("data"))
if details:
return error_text + ":\n" + "\n".join(details)
return error_text + ":\n" + "\n".join(
detail if detail.startswith("\t") else f"\t{detail}"
for detail in details
)
return error_text
if error_text:
return error_text
for key in [['message'], ['data', 'error_message'], ['error']]:
value = get(node, key)
Expand All @@ -119,3 +209,44 @@ def get(d, key):
def getheaders(self):
"""Helper method to make response headers more accessible."""
return self.response.getheaders()


@dataclass(frozen=True)
class InfluxDBPartialWriteLineError:
line_number: int
error_message: str
original_line: str


class InfluxDBPartialWriteError(InfluxDBError):
"""Structured partial-write error with per-line failures."""

def __init__(self, response: HTTPResponse, line_errors: List[InfluxDBPartialWriteLineError]):
super().__init__(response=response)
Comment thread
alespour marked this conversation as resolved.
self.line_errors = line_errors

@classmethod
def from_response(cls, response: HTTPResponse):
if response is None or not response.data:
return None
try:
node = json.loads(response.data)
except Exception:
return None
if not isinstance(node, dict):
return None
error_text = node.get("error")
if not _is_partial_write_error(error_text):
return None
parsed_line_errors, _ = _parse_partial_write_line_error_info(node.get("data"))
if not parsed_line_errors:
return None
line_errors = [
InfluxDBPartialWriteLineError(
line_number=line_number,
error_message=error_message,
original_line=original_line,
)
for error_message, line_number, original_line in parsed_line_errors
]
return cls(response=response, line_errors=line_errors)
Loading
Loading