diff --git a/README.md b/README.md index cbe0c21..9d6562d 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,51 @@ client.write_dataframe( ) ``` +### Accept partial writes and inspect failed lines +`accept_partial` defaults to `True` and allows partial success when a batch contains invalid lines. +On partial failure, the client raises `InfluxDBPartialWriteError` with structured `line_errors`. + +```python +from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3.exceptions import InfluxDBPartialWriteError + +client = InfluxDBClient3(host="http://localhost:8181", token="token", database="db") +lp = "m v=1i 1\nm v=1.2 2" + +try: + client.write(lp) # accept_partial=True by default +except InfluxDBPartialWriteError as e: + for line_err in e.line_errors: + print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})") +``` + +Disable partial writes: +```python +from influxdb_client_3 import WriteOptions, write_client_options + +client = InfluxDBClient3( + host="http://localhost:8181", + token="token", + database="db", + write_client_options=write_client_options( + write_options=WriteOptions(accept_partial=False) + ), +) +``` + +### V2 compatibility mode (Clustered) +Set `use_v2_api=True` to route writes through `/api/v2/write` for Clustered/v2-compatible backends. + +`use_v2_api` can be configured by: +- `WriteOptions(use_v2_api=True)` +- constructor kwarg: `write_use_v2_api=True` +- env var: `INFLUX_WRITE_USE_V2_API=true` + +When `use_v2_api=True`: +- `accept_partial` is ignored by the backend +- `no_sync=True` is invalid and rejected before dispatch with: + `invalid write options: no_sync cannot be used with use_v2_api` + ## Querying ### Querying with SQL diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 888a455..f84e74a 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -29,6 +29,8 @@ INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC" +INFLUX_WRITE_ACCEPT_PARTIAL = "INFLUX_WRITE_ACCEPT_PARTIAL" +INFLUX_WRITE_USE_V2_API = "INFLUX_WRITE_USE_V2_API" INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT" INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT" INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION" @@ -155,19 +157,23 @@ def _parse_gzip_threshold(threshold: str) -> int: return threshold -def _parse_write_no_sync(write_no_sync: str): +def _parse_write_bool(value): """ - Parses and validates the provided write no sync value. + Parses a truthy/falsy value for write options. - This function ensures that the given value is a valid boolean, - and it raises an appropriate error if the value is not valid. + The input is normalized to string and matched against common truthy values. + Any non-truthy value is treated as False. - :param write_no_sync: The input value to be parsed and validated. - :type write_no_sync: Any - :return: The validated write no sync value as an boolean. + :param value: The input value to be parsed and validated. + :type value: Any + :return: Parsed boolean value. :rtype: bool """ - return write_no_sync.strip().lower() in ['true', '1', 't', 'y', 'yes'] + return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes'] + + +def _parse_write_no_sync(write_no_sync: str): + return _parse_write_bool(write_no_sync) def _parse_timeout(to: str) -> int: @@ -233,6 +239,8 @@ def __init__( :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x :key str query_timeout: int value used to set the client query API timeout in milliseconds. :key str write_timeout: int value used to set the client write API timeout in milliseconds. + :key bool write_accept_partial: allow partial writes when some lines fail. + :key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint. :key list[str] profilers: list of enabled Flux profilers """ self._org = org if org is not None else "default" @@ -243,6 +251,8 @@ def __init__( write_type = DefaultWriteOptions.write_type.value write_precision = DefaultWriteOptions.write_precision.value write_no_sync = DefaultWriteOptions.no_sync.value + write_accept_partial = DefaultWriteOptions.accept_partial.value + write_use_v2_api = DefaultWriteOptions.use_v2_api.value write_timeout = DefaultWriteOptions.timeout.value if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None: @@ -250,15 +260,25 @@ def __init__( write_type = getattr(write_opts, 'write_type', write_type) write_precision = getattr(write_opts, 'write_precision', write_precision) write_no_sync = getattr(write_opts, 'no_sync', write_no_sync) + write_accept_partial = getattr(write_opts, 'accept_partial', write_accept_partial) + write_use_v2_api = getattr(write_opts, 'use_v2_api', write_use_v2_api) write_timeout = getattr(write_opts, 'timeout', write_timeout) if kw_keys.__contains__('write_timeout'): write_timeout = kwargs.get('write_timeout') + if kw_keys.__contains__('write_accept_partial'): + write_accept_partial = _parse_write_bool(kwargs.get('write_accept_partial')) + + if kw_keys.__contains__('write_use_v2_api'): + write_use_v2_api = _parse_write_bool(kwargs.get('write_use_v2_api')) + write_options = WriteOptions( write_type=write_type, write_precision=write_precision, no_sync=write_no_sync, + accept_partial=write_accept_partial, + use_v2_api=write_use_v2_api, ) self._write_client_options = { @@ -347,7 +367,15 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC) if write_no_sync is not None: - write_options.no_sync = _parse_write_no_sync(write_no_sync) + write_options.no_sync = _parse_write_bool(write_no_sync) + + write_accept_partial = os.getenv(INFLUX_WRITE_ACCEPT_PARTIAL) + if write_accept_partial is not None: + write_options.accept_partial = _parse_write_bool(write_accept_partial) + + write_use_v2_api = os.getenv(INFLUX_WRITE_USE_V2_API) + if write_use_v2_api is not None: + write_options.use_v2_api = _parse_write_bool(write_use_v2_api) precision = os.getenv(INFLUX_PRECISION) if precision is not None: @@ -402,10 +430,7 @@ def write(self, record=None, database=None, **kwargs): if database is None: database = self._database - try: - return self._write_api.write(bucket=database, record=record, **kwargs) - except InfluxDBError as e: - raise e + return self._write_api.write(bucket=database, record=record, **kwargs) def write_dataframe( self, diff --git a/influxdb_client_3/exceptions/__init__.py b/influxdb_client_3/exceptions/__init__.py index d725d03..eec96cb 100644 --- a/influxdb_client_3/exceptions/__init__.py +++ b/influxdb_client_3/exceptions/__init__.py @@ -1,3 +1,4 @@ # flake8: noqa -from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError +from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError, InfluxDBPartialWriteError, \ + InfluxDBPartialWriteLineError diff --git a/influxdb_client_3/exceptions/exceptions.py b/influxdb_client_3/exceptions/exceptions.py index d2b4b2d..8bb8e25 100644 --- a/influxdb_client_3/exceptions/exceptions.py +++ b/influxdb_client_3/exceptions/exceptions.py @@ -1,6 +1,9 @@ """Exceptions utils for InfluxDB.""" +import json import logging +from dataclasses import dataclass +from typing import List, Optional, Tuple from urllib3 import HTTPResponse @@ -39,6 +42,104 @@ def __init__(self, error_message, *args, **kwargs): self.message = error_message +def _is_partial_write_error(error_message) -> bool: + if not isinstance(error_message, str) or not error_message: + return False + normalized = error_message.lower() + return ( + "partial write of line protocol occurred" in normalized or + "parsing failed for write_lp endpoint" in normalized + ) + + +def _parse_partial_write_data_item(item) -> Optional[Tuple[str, int, str]]: + if item is None: + return None + if not isinstance(item, dict): + raise ValueError("array item is not an object") + + error_message = item.get("error_message") + if not isinstance(error_message, str): + raise ValueError("error_message must be string") + if not error_message: + return None + + line_number_raw = item.get("line_number") + if line_number_raw is None: + line_number = 0 + elif isinstance(line_number_raw, int): + line_number = line_number_raw + else: + raise ValueError("line_number must be int") + + original_line_raw = item.get("original_line") + if original_line_raw is None: + original_line = "" + elif isinstance(original_line_raw, str): + original_line = original_line_raw + else: + raise ValueError("original_line must be string") + + return error_message, line_number, original_line + + +def _parse_typed_partial_write_array(data) -> Optional[List[Tuple[str, int, str]]]: + if not isinstance(data, list): + return None + line_errors: List[Tuple[str, int, str]] = [] + try: + for item in data: + parsed = _parse_partial_write_data_item(item) + if parsed is None: + continue + line_errors.append(parsed) + except ValueError: + return None + return line_errors if len(line_errors) > 0 else None + + +def _parse_typed_partial_write_object_or_none(data) -> Optional[Tuple[str, int, str]]: + try: + return _parse_partial_write_data_item(data) + except ValueError: + return None + + +def _format_partial_write_details(line_errors: List[Tuple[str, int, str]]) -> List[str]: + details: List[str] = [] + for error_message, line_number, original_line in line_errors: + if line_number != 0 and original_line != "": + details.append(f"\tline {line_number}: {error_message} ({original_line})") + elif error_message: + details.append(f"\t{error_message}") + return details + + +def _parse_partial_write_line_error_info(data) -> Tuple[List[Tuple[str, int, str]], List[str]]: + if data is None: + return [], [] + + typed_array = _parse_typed_partial_write_array(data) + if typed_array is not None: + return typed_array, _format_partial_write_details(typed_array) + + if isinstance(data, list): + details: List[str] = [] + for item in data: + if item is None: + continue + raw = json.dumps(item, separators=(',', ':')) + if raw and raw.lower() != "null": + details.append(raw) + return [], details + + typed_single = _parse_typed_partial_write_object_or_none(data) + if typed_single is not None: + return [typed_single], _format_partial_write_details([typed_single]) + + return [], [] + + # This error is for all write operations class InfluxDBError(InfluxDB3ClientError): """Raised when a server error occurs.""" @@ -56,10 +157,7 @@ def __init__(self, response: HTTPResponse = None, message: str = None): super().__init__(self.message) def _get_message(self, response): - # Body if response.data: - import json - def get(d, key): if not key or d is None: return d @@ -80,23 +178,15 @@ def get(d, key): # "data": [ { "error_message": "...", "line_number": 2, "original_line": "..." }, ... ] # } error_text = node.get("error") - data = node.get("data") - if error_text and isinstance(data, list): - details = [] - for item in data: - if not isinstance(item, dict): - continue - line_number = item.get("line_number") - error_message = item.get("error_message") - original_line = item.get("original_line") - if line_number is not None and error_message and original_line: - details.append( - f"\tline {line_number}: {error_message} ({original_line})" - ) - elif error_message: - details.append(f"\t{error_message}") + if error_text and _is_partial_write_error(error_text): + _, details = _parse_partial_write_line_error_info(node.get("data")) if details: - return error_text + ":\n" + "\n".join(details) + return error_text + ":\n" + "\n".join( + detail if detail.startswith("\t") else f"\t{detail}" + for detail in details + ) + return error_text + if error_text: return error_text for key in [['message'], ['data', 'error_message'], ['error']]: value = get(node, key) @@ -119,3 +209,44 @@ def get(d, key): def getheaders(self): """Helper method to make response headers more accessible.""" return self.response.getheaders() + + +@dataclass(frozen=True) +class InfluxDBPartialWriteLineError: + line_number: int + error_message: str + original_line: str + + +class InfluxDBPartialWriteError(InfluxDBError): + """Structured partial-write error with per-line failures.""" + + def __init__(self, response: HTTPResponse, line_errors: List[InfluxDBPartialWriteLineError]): + super().__init__(response=response) + self.line_errors = line_errors + + @classmethod + def from_response(cls, response: HTTPResponse): + if response is None or not response.data: + return None + try: + node = json.loads(response.data) + except Exception: + return None + if not isinstance(node, dict): + return None + error_text = node.get("error") + if not _is_partial_write_error(error_text): + return None + parsed_line_errors, _ = _parse_partial_write_line_error_info(node.get("data")) + if not parsed_line_errors: + return None + line_errors = [ + InfluxDBPartialWriteLineError( + line_number=line_number, + error_message=error_message, + original_line=original_line, + ) + for error_message, line_number, original_line in parsed_line_errors + ] + return cls(response=response, line_errors=line_errors) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 250a07e..6f20777 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -29,6 +29,8 @@ DEFAULT_WRITE_NO_SYNC = False DEFAULT_WRITE_TIMEOUT = 10_000 +DEFAULT_WRITE_ACCEPT_PARTIAL = True +DEFAULT_WRITE_USE_V2_API = False # Kwargs consumed during serialization that should not be passed to _post_write SERIALIZER_KWARGS = { @@ -66,6 +68,8 @@ class DefaultWriteOptions(Enum): write_type = WriteType.synchronous write_precision = DEFAULT_WRITE_PRECISION no_sync = DEFAULT_WRITE_NO_SYNC + accept_partial = DEFAULT_WRITE_ACCEPT_PARTIAL + use_v2_api = DEFAULT_WRITE_USE_V2_API timeout = DEFAULT_WRITE_TIMEOUT @@ -84,6 +88,8 @@ def __init__(self, write_type: WriteType = WriteType.batching, write_precision=DEFAULT_WRITE_PRECISION, no_sync=DEFAULT_WRITE_NO_SYNC, tag_order=None, + accept_partial=DEFAULT_WRITE_ACCEPT_PARTIAL, + use_v2_api=DEFAULT_WRITE_USE_V2_API, timeout=DEFAULT_WRITE_TIMEOUT, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ @@ -103,7 +109,9 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_close_wait: the maximum time to wait for writes to be flushed if close() is called :param write_precision: precision to use when writing points to InfluxDB :param no_sync: skip waiting for WAL persistence on write + :param accept_partial: allow partial writes when some lines fail :param tag_order: optional list of tag names used to prioritize tag serialization order + :param use_v2_api: use /api/v2/write compatibility endpoint :param timeout: timeout to use when writing to the database in milliseconds. Default is 10_000 :param write_scheduler: """ @@ -121,8 +129,14 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.write_precision = write_precision self.timeout = timeout self.no_sync = no_sync + self.accept_partial = accept_partial + self.use_v2_api = use_v2_api self.tag_order = sanitize_tag_order(tag_order) + def validate(self): + if self.use_v2_api and self.no_sync: + raise ValueError("invalid write options: no_sync cannot be used with use_v2_api") + def to_retry_strategy(self, **kwargs): """ Create a Retry strategy from write options. @@ -306,6 +320,14 @@ def __init__(self, # TODO above message has link to Influxdb2 API __NOT__ Influxdb3 API !!! - illustrates different API warnings.warn(message, DeprecationWarning) + def _resolve_write_request_options(self, kwargs): + no_sync = kwargs.pop('no_sync', self._write_options.no_sync) + accept_partial = kwargs.pop('accept_partial', self._write_options.accept_partial) + use_v2_api = kwargs.pop('use_v2_api', self._write_options.use_v2_api) + if use_v2_api and no_sync: + raise ValueError("invalid write options: no_sync cannot be used with use_v2_api") + return no_sync, accept_partial, use_v2_api + def write(self, bucket: str, org: str = None, record: Union[ str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], @@ -385,17 +407,22 @@ def write(self, bucket: str, org: str = None, if write_precision is None: write_precision = self._write_options.write_precision + self._write_options.validate() + kwargs = dict(kwargs) + no_sync, accept_partial, use_v2_api = self._resolve_write_request_options(kwargs) + if 'tag_order' in kwargs: kwargs['tag_order'] = sanitize_tag_order(kwargs.get('tag_order')) else: kwargs['tag_order'] = self._write_options.tag_order if self._write_options.write_type is WriteType.batching: + kwargs['no_sync'] = no_sync + kwargs['accept_partial'] = accept_partial + kwargs['use_v2_api'] = use_v2_api return self._write_batching(bucket, org, record, write_precision, **kwargs) - no_sync = self._write_options.no_sync - payloads = defaultdict(list) self._serialize(record, write_precision, payloads, **kwargs) @@ -403,7 +430,8 @@ def write(self, bucket: str, org: str = None, def write_payload(payload): final_string = b'\n'.join(payload[1]) - return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs) + return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, + accept_partial, use_v2_api, **kwargs) results = list(map(write_payload, payloads.items())) if not _async_req: @@ -583,22 +611,26 @@ def _retry_callback_delegate(exception): else: _retry_callback_delegate = None - no_sync = self._write_options.no_sync + kwargs = dict(kwargs) + no_sync, accept_partial, use_v2_api = self._resolve_write_request_options(kwargs) retry = self._write_options.to_retry_strategy(retry_callback=_retry_callback_delegate) self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data, - batch_item.key.precision, no_sync, urlopen_kw={'retries': retry}, **kwargs) + batch_item.key.precision, no_sync, accept_partial, use_v2_api, + urlopen_kw={'retries': retry}, **kwargs) logger.debug("Write request finished %s", batch_item) return _BatchResponse(data=batch_item) - def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs): + def _post_write(self, _async_req, bucket, org, body, precision, no_sync, accept_partial, use_v2_api, **kwargs): # Filter out serializer-specific kwargs before passing to _post_write http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision, no_sync=no_sync, + accept_partial=accept_partial, + use_v2_api=use_v2_api, async_req=_async_req, content_type="text/plain; charset=utf-8", **http_kwargs) diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index b349b41..4b680b0 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -8,6 +8,7 @@ from influxdb_client_3.write_client.domain.write_precision_converter import WritePrecisionConverter from influxdb_client_3.write_client.rest import ApiException from influxdb_client_3.write_client.service._base_service import _BaseService +from influxdb_client_3.exceptions import InfluxDBPartialWriteError class WriteService(_BaseService): @@ -158,11 +159,16 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501, collection_formats={}, urlopen_kw=kwargs.get('urlopen_kw', None)) except ApiException as e: - no_sync = 'no_sync' in local_var_params and local_var_params['no_sync'] - if no_sync and e.status == HTTPStatus.METHOD_NOT_ALLOWED: - message = "Server doesn't support write with no_sync=true " \ - "(supported by InfluxDB 3 Core/Enterprise servers only)." - raise ApiException(status=0, reason=message) + use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] + if not use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: + message = "Server doesn't support v3 write API. Set use_v2_api=True for v2 compatibility endpoint." + ex = ApiException(status=0, reason=message) + ex.message = message + ex.args = (message,) + raise ex + partial = InfluxDBPartialWriteError.from_response(e.response) + if partial is not None: + raise partial raise e async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 @@ -210,7 +216,8 @@ async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D40 def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 local_var_params = dict(locals()) - all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', 'accept', 'org_id', 'precision', 'no_sync'] # noqa: E501 + all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', + 'accept', 'org_id', 'precision', 'no_sync', 'accept_partial', 'use_v2_api'] # noqa: E501 self._check_operation_params('post_write', all_params, local_var_params) # verify the required parameter 'org' is set if ('org' not in local_var_params or @@ -228,26 +235,30 @@ def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D path_params = {} query_params = [] + use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] no_sync = 'no_sync' in local_var_params and local_var_params['no_sync'] + accept_partial = local_var_params['accept_partial'] if 'accept_partial' in local_var_params else True if 'org' in local_var_params: query_params.append(('org', local_var_params['org'])) # noqa: E501 if 'org_id' in local_var_params: query_params.append(('orgID', local_var_params['org_id'])) # noqa: E501 if 'bucket' in local_var_params: - query_params.append(('db' if no_sync else 'bucket', local_var_params['bucket'])) # noqa: E501 - if no_sync: - # Setting no_sync=true is supported only in the v3 API. - path = '/api/v3/write_lp' + query_params.append(('bucket' if use_v2_api else 'db', local_var_params['bucket'])) # noqa: E501 + + if use_v2_api: + path = '/api/v2/write' if 'precision' in local_var_params: precision = local_var_params['precision'] - query_params.append(('precision', WritePrecisionConverter.to_v3_api_string(precision))) # noqa: E501 - query_params.append(('no_sync', 'true')) + query_params.append(('precision', WritePrecisionConverter.to_v2_api_string(precision))) # noqa: E501 else: - # By default, use the v2 API. - path = '/api/v2/write' + path = '/api/v3/write_lp' if 'precision' in local_var_params: precision = local_var_params['precision'] - query_params.append(('precision', WritePrecisionConverter.to_v2_api_string(precision))) # noqa: E501 + query_params.append(('precision', WritePrecisionConverter.to_v3_api_string(precision))) # noqa: E501 + if no_sync: + query_params.append(('no_sync', 'true')) + if accept_partial is False: + query_params.append(('accept_partial', 'false')) header_params = {} if 'zap_trace_span' in local_var_params: diff --git a/tests/test_api_client.py b/tests/test_api_client.py index e5600b5..90c184e 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -9,7 +9,7 @@ from influxdb_client_3.write_client._sync.api_client import ApiClient from influxdb_client_3.write_client.configuration import Configuration -from influxdb_client_3.exceptions import InfluxDBError +from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError from influxdb_client_3.write_client.service import WriteService from influxdb_client_3.version import VERSION @@ -114,7 +114,8 @@ def test_api_error_oss_with_detail(self): 'in line protocol for field \'val\' on line 1"}}') with self.assertRaises(InfluxDBError) as err: self._test_api_error(response_body) - self.assertEqual('invalid field value in line protocol for field \'val\' on line 1', err.exception.message) + self.assertEqual("parsing failed for write_lp endpoint:\n\tinvalid field value in line protocol for field " + "'val' on line 1", err.exception.message) def test_api_error_unknown(self): response_body = '{"detail":"no info"}' @@ -138,6 +139,7 @@ def test_api_error_v3_with_detail(self): "got iox::column_type::field::uinteger (**.DBG.remote_***)\n" "\tline 3: invalid column type for column 'v', expected iox::column_type::field::float, " "got iox::column_type::field::uinteger (***.INF.remote_***)", + True, ), # error_message only (no line_number/original_line) ( @@ -146,6 +148,7 @@ def test_api_error_v3_with_detail(self): '{"error_message":"only error message"}]}', "partial write of line protocol occurred:\n" "\tonly error message", + True, ), # non-dict item in data list is skipped ( @@ -154,25 +157,106 @@ def test_api_error_v3_with_detail(self): '{"error_message":"bad line","line_number":2,"original_line":"bad lp"}]}', "partial write of line protocol occurred:\n" "\tline 2: bad line (bad lp)", + True, ), # details empty -> return error_text ( "no detail fields", '{"error":"partial write of line protocol occurred","data":[{"line_number":2}]}', - "partial write of line protocol occurred", + "partial write of line protocol occurred:\n" + "\t{\"line_number\":2}", + False, + ), + # typed parse fails due line_number type -> raw fallback details + ( + "textual line_number falls back to raw", + '{"error":"partial write of line protocol occurred","data":' + '[{"error_message":"bad line","line_number":"x","original_line":"bad lp"}]}', + "partial write of line protocol occurred:\n" + "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}", + False, + ), + # mixed valid + malformed in array -> raw fallback for whole array + ( + "mixed array malformed item falls back to raw", + '{"error":"partial write of line protocol occurred","data":' + '[{"error_message":"bad line","line_number":2,"original_line":"bad lp"},1]}', + "partial write of line protocol occurred:\n" + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n" + "\t1", + False, ), # data is not a dict when resolving fallback keys ( "data not dict for fallback", '{"error":"data not list","data":"oops"}', "data not list", + False, + ), + # typed object with empty message is dropped + ( + "empty error_message in object", + '{"error":"partial write of line protocol occurred","data":' + '{"error_message":"","line_number":2,"original_line":"bad lp"}}', + "partial write of line protocol occurred", + False, + ), + # typed array parse fails, raw fallback skips null item + ( + "raw fallback skips null details", + '{"error":"partial write of line protocol occurred","data":' + '[null,{"error_message":123}]}', + "partial write of line protocol occurred:\n" + "\t{\"error_message\":123}", + False, ), ] - for name, response_body, expected in cases: + for name, response_body, expected, is_partial in cases: with self.subTest(name): with self.assertRaises(InfluxDBError) as err: self._test_api_error(response_body) self.assertEqual(expected, err.exception.message) + if is_partial: + self.assertIsInstance(err.exception, InfluxDBPartialWriteError) + self.assertGreaterEqual(len(err.exception.line_errors), 1) + else: + self.assertNotIsInstance(err.exception, InfluxDBPartialWriteError) + + def test_api_error_v3_parsing_failed_object_returns_partial_error(self): + response_body = ('{"error":"parsing failed for write_lp endpoint","data":' + '{"error_message":"invalid field value","line_number":2,"original_line":"m,t=a f=bad"}}') + with self.assertRaises(InfluxDBPartialWriteError) as err: + self._test_api_error(response_body) + self.assertEqual(1, len(err.exception.line_errors)) + self.assertEqual(2, err.exception.line_errors[0].line_number) + + def test_api_error_v3_partial_write_with_message_only_object_returns_partial_error(self): + response_body = ('{"error":"partial write of line protocol occurred","data":' + '{"error_message":"only error message"}}') + with self.assertRaises(InfluxDBPartialWriteError) as err: + self._test_api_error(response_body) + self.assertEqual(1, len(err.exception.line_errors)) + self.assertEqual(0, err.exception.line_errors[0].line_number) + self.assertEqual("", err.exception.line_errors[0].original_line) + + def test_partial_write_from_response_guards(self): + self.assertIsNone(InfluxDBPartialWriteError.from_response(None)) + + empty_body = response.HTTPResponse(status=400, reason="Bad Request", body=b"") + self.assertIsNone(InfluxDBPartialWriteError.from_response(empty_body)) + + invalid_json = response.HTTPResponse(status=400, reason="Bad Request", body=b"{") + self.assertIsNone(InfluxDBPartialWriteError.from_response(invalid_json)) + + non_dict_json = response.HTTPResponse(status=400, reason="Bad Request", body=b"[]") + self.assertIsNone(InfluxDBPartialWriteError.from_response(non_dict_json)) + + object_without_typed_line_error = response.HTTPResponse( + status=400, + reason="Bad Request", + body=b'{"error":"partial write of line protocol occurred","data":{"error_message":123}}', + ) + self.assertIsNone(InfluxDBPartialWriteError.from_response(object_without_typed_line_error)) def test_api_error_headers(self): body = '{"error": "test error"}' diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index a37339f..9e38b40 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -123,6 +123,10 @@ def test_default_write_options(self): self.assertEqual(DefaultWriteOptions.write_type.value, client._write_client_options["write_options"].write_type) self.assertEqual(DefaultWriteOptions.no_sync.value, client._write_client_options["write_options"].no_sync) + self.assertEqual(DefaultWriteOptions.accept_partial.value, + client._write_client_options["write_options"].accept_partial) + self.assertEqual(DefaultWriteOptions.use_v2_api.value, + client._write_client_options["write_options"].use_v2_api) self.assertEqual(DefaultWriteOptions.write_precision.value, client._write_client_options["write_options"].write_precision) self.assertEqual(DefaultWriteOptions.timeout.value, client._write_client_options["write_options"].timeout) @@ -215,6 +219,8 @@ def test_default_client(self): expected_precision = DefaultWriteOptions.write_precision.value expected_write_type = DefaultWriteOptions.write_type.value expected_no_sync = DefaultWriteOptions.no_sync.value + expected_accept_partial = DefaultWriteOptions.accept_partial.value + expected_use_v2_api = DefaultWriteOptions.use_v2_api.value import os try: @@ -237,11 +243,15 @@ def verify_client_write_options(c): self.assertEqual(write_options.write_precision, expected_precision) self.assertEqual(write_options.write_type, expected_write_type) self.assertEqual(write_options.no_sync, expected_no_sync) + self.assertEqual(write_options.accept_partial, expected_accept_partial) + self.assertEqual(write_options.use_v2_api, expected_use_v2_api) self.assertEqual(write_options.tag_order, []) self.assertEqual(c._write_api._write_options.write_precision, expected_precision) self.assertEqual(c._write_api._write_options.write_type, expected_write_type) self.assertEqual(c._write_api._write_options.no_sync, expected_no_sync) + self.assertEqual(c._write_api._write_options.accept_partial, expected_accept_partial) + self.assertEqual(c._write_api._write_options.use_v2_api, expected_use_v2_api) self.assertEqual(c._write_api._write_options.tag_order, []) env_client = InfluxDBClient3.from_env() @@ -254,6 +264,7 @@ def verify_client_write_options(c): 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme', 'INFLUX_GZIP_THRESHOLD': '2000', 'INFLUX_WRITE_NO_SYNC': 'true', + 'INFLUX_WRITE_ACCEPT_PARTIAL': 'false', 'INFLUX_WRITE_USE_V2_API': 'true', 'INFLUX_WRITE_TIMEOUT': '1234', 'INFLUX_QUERY_TIMEOUT': '5678'}) def test_from_env_all_env_vars_set(self): client = InfluxDBClient3.from_env() @@ -268,6 +279,8 @@ def test_from_env_all_env_vars_set(self): write_options = client._write_client_options.get("write_options") self.assertEqual(write_options.write_precision, WritePrecision.MS) self.assertEqual(write_options.no_sync, True) + self.assertEqual(write_options.accept_partial, False) + self.assertEqual(write_options.use_v2_api, True) self.assertEqual(1234, write_options.timeout) self.assertEqual(5.678, client._query_api._default_timeout) @@ -340,6 +353,22 @@ def test_parse_write_no_sync_anything_else_is_false(self): write_options = client._write_client_options.get("write_options") self.assertEqual(write_options.no_sync, False) + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_ACCEPT_PARTIAL': 'false'}) + def test_parse_write_accept_partial_false(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + write_options = client._write_client_options.get("write_options") + self.assertEqual(write_options.accept_partial, False) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_USE_V2_API': 'true'}) + def test_parse_write_use_v2_api_true(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + write_options = client._write_client_options.get("write_options") + self.assertEqual(write_options.use_v2_api, True) + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_TIMEOUT': '6789'}) def test_parse_valid_write_timeout(self): diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index df700ee..e39b88a 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -12,8 +12,7 @@ from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ WriteType, InfluxDB3ClientQueryError -from influxdb_client_3.write_client.rest import ApiException -from influxdb_client_3.exceptions import InfluxDBError +from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError from tests.util import asyncio_run, lp_to_py_object @@ -136,19 +135,12 @@ def test_v3_error(self): host=self.host, database=self.database, token=self.token, - write_client_options=write_client_options( - write_options=WriteOptions( - write_type=WriteType.synchronous, - no_sync=True - ) - ) + write_client_options=write_client_options(write_options=WriteOptions(write_type=WriteType.synchronous)) ) as client: try: client.write(lp) self.fail("Expected InfluxDBError from invalid line protocol.") - except ApiException as err: - if "Server doesn't support write with no_sync=true" in str(err): - self.skipTest("no_sync not supported by this server.") + except InfluxDBPartialWriteError as err: msg = err.message self.assertIn("partial write of line protocol occurred", msg) self.assertIn(( diff --git a/tests/test_polars.py b/tests/test_polars.py index 44350e1..4272546 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -207,6 +207,8 @@ def test_write_polars_batching(self): bucket=ANY, precision=ANY, no_sync=ANY, + accept_partial=ANY, + use_v2_api=ANY, async_req=ANY, content_type=ANY, urlopen_kw=ANY, diff --git a/tests/test_write_local_server.py b/tests/test_write_local_server.py index ab71ff2..93391eb 100644 --- a/tests/test_write_local_server.py +++ b/tests/test_write_local_server.py @@ -38,8 +38,8 @@ def test_write_default_params(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v2/write", - query_string={"org": "ORG", "bucket": "DB", "precision": "ns"})) + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "nanosecond"})) def test_write_with_write_options(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -56,8 +56,8 @@ def test_write_with_write_options(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v2/write", - query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "microsecond"})) def test_write_with_no_sync_true(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -77,23 +77,88 @@ def test_write_with_no_sync_true(self, httpserver: HTTPServer): method="POST", uri="/api/v3/write_lp", query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"})) - def test_write_with_no_sync_true_on_v2_server(self, httpserver: HTTPServer): - self.set_response_status(httpserver, HTTPStatus.METHOD_NOT_ALLOWED) + def test_write_with_no_sync_true_in_kwargs(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) - client = InfluxDBClient3( + InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.US) + ) + ).write(self.SAMPLE_RECORD, no_sync=True) + + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"})) + + def test_write_with_accept_partial_false(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) + + InfluxDBClient3( host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", write_client_options=write_client_options( write_options=WriteOptions( write_type=WriteType.synchronous, - no_sync=True))) + accept_partial=False + ) + ) + ).write(self.SAMPLE_RECORD) + + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "nanosecond", "accept_partial": "false"})) + + def test_write_with_use_v2_api_true(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) + + InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions( + write_type=WriteType.synchronous, + write_precision=WritePrecision.US, + use_v2_api=True, + accept_partial=False + ) + ) + ).write(self.SAMPLE_RECORD) - with pytest.raises(ApiException, match=r".*Server doesn't support write with no_sync=true " - r"\(supported by InfluxDB 3 Core/Enterprise servers only\)."): + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) + + def test_write_with_use_v2_api_true_in_kwargs(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) + + InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.US) + ) + ).write(self.SAMPLE_RECORD, use_v2_api=True, accept_partial=False) + + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) + + def test_write_with_v3_on_v2_server(self, httpserver: HTTPServer): + self.set_response_status(httpserver, HTTPStatus.METHOD_NOT_ALLOWED) + + client = InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN") + + expected = ("Server doesn't support v3 write API. " + "Set use_v2_api=True for v2 compatibility endpoint.") + with pytest.raises(ApiException, match=r".*Server doesn't support v3 write API\. " + r"Set use_v2_api=True for v2 compatibility endpoint\.") as err: client.write(self.SAMPLE_RECORD) + assert err.value.message == expected + assert err.value.reason == expected + assert err.value.args == (expected,) self.assert_request_made(httpserver, RequestMatcher( method="POST", uri="/api/v3/write_lp", - query_string={"org": "ORG", "db": "DB", "precision": "nanosecond", "no_sync": "true"})) + query_string={"org": "ORG", "db": "DB", "precision": "nanosecond"})) def test_write_with_no_sync_false_and_gzip(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -111,8 +176,8 @@ def test_write_with_no_sync_false_and_gzip(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v2/write", - query_string={"org": "ORG", "bucket": "DB", "precision": "us"}, + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "microsecond"}, headers={"Content-Encoding": "gzip"}, )) def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer): @@ -135,6 +200,31 @@ def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer): query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"}, headers={"Content-Encoding": "gzip"}, )) + def test_write_invalid_use_v2_api_and_no_sync(self, httpserver: HTTPServer): + client = InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions( + write_type=WriteType.synchronous, + use_v2_api=True, + no_sync=True + ) + ) + ) + with pytest.raises(ValueError, match=r".*invalid write options: no_sync cannot be used with use_v2_api.*"): + client.write(self.SAMPLE_RECORD) + + def test_write_invalid_use_v2_api_and_no_sync_in_kwargs(self, httpserver: HTTPServer): + client = InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous) + ) + ) + + with pytest.raises(ValueError, match=r".*invalid write options: no_sync cannot be used with use_v2_api.*"): + client.write(self.SAMPLE_RECORD, use_v2_api=True, no_sync=True) + def test_write_with_timeout_in_write_options(self, httpserver: HTTPServer): self.delay_response(httpserver, 0.5)