Skip to content

Commit 0ac72f5

Browse files
authored
fix(reset-pagination): Fix split using cursor (#845)
1 parent 8764296 commit 0ac72f5

File tree

3 files changed

+36
-11
lines changed

3 files changed

+36
-11
lines changed

airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
from typing import Optional
24

3-
from airbyte_cdk.sources.declarative.models import FailureType
5+
from airbyte_cdk.models import FailureType
46
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
57
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
68
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -44,10 +46,21 @@ def has_reached_limit(self) -> bool:
4446
def _reset(self) -> None:
4547
self._record_count = 0
4648

47-
def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
48-
new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
49+
def reduce_slice_range_if_possible(
50+
self, previous_stream_slice: StreamSlice, original_stream_slice: StreamSlice
51+
) -> StreamSlice:
52+
"""
53+
:param previous_stream_slice: Stream slice that was just processed (It can be the same as original_stream_slice or already reduced)
54+
:param original_stream_slice: The original stream slice before any reduction
55+
:return: Reduced stream slice
56+
"""
57+
new_slice = (
58+
self._cursor.reduce_slice_range(original_stream_slice)
59+
if self._cursor
60+
else previous_stream_slice
61+
)
4962

50-
if new_slice == stream_slice:
63+
if new_slice == previous_stream_slice:
5164
self._number_of_attempt_with_same_slice += 1
5265
if (
5366
self._number_of_attempt_with_same_slice

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ def _read_pages(
345345
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
346346
stream_slice: StreamSlice,
347347
) -> Iterable[Record]:
348+
original_stream_slice = stream_slice
348349
pagination_tracker = self.pagination_tracker_factory()
349350
reset_pagination = False
350351
next_page_token = self._get_initial_next_page_token()
@@ -409,7 +410,9 @@ def _read_pages(
409410
if reset_pagination or pagination_tracker.has_reached_limit():
410411
next_page_token = self._get_initial_next_page_token()
411412
previous_slice = stream_slice
412-
stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice)
413+
stream_slice = pagination_tracker.reduce_slice_range_if_possible(
414+
stream_slice, original_stream_slice
415+
)
413416
LOGGER.info(
414417
f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}"
415418
)

unit_tests/sources/declarative/retrievers/test_pagination_tracker.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def test_given_reduce_slice_before_limit_reached_when_has_reached_limit_return_t
4848
tracker = PaginationTracker(max_number_of_records=2)
4949

5050
tracker.observe(_A_RECORD)
51-
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE)
51+
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, _A_STREAM_SLICE)
5252
tracker.observe(_A_RECORD)
5353

5454
assert not tracker.has_reached_limit()
@@ -57,31 +57,40 @@ def test_given_no_cursor_when_reduce_slice_range_then_return_same_slice(self):
5757
tracker = PaginationTracker()
5858
original_slice = StreamSlice(partition={}, cursor_slice={})
5959

60-
result_slice = tracker.reduce_slice_range_if_possible(original_slice)
60+
result_slice = tracker.reduce_slice_range_if_possible(original_slice, original_slice)
6161

6262
assert result_slice == original_slice
6363

6464
def test_given_no_cursor_when_reduce_slice_range_multiple_times_then_raise(self):
6565
tracker = PaginationTracker()
6666
original_slice = StreamSlice(partition={}, cursor_slice={})
6767

68-
tracker.reduce_slice_range_if_possible(original_slice)
68+
tracker.reduce_slice_range_if_possible(original_slice, original_slice)
6969
with pytest.raises(AirbyteTracedException):
70-
tracker.reduce_slice_range_if_possible(original_slice)
70+
tracker.reduce_slice_range_if_possible(original_slice, original_slice)
7171

7272
def test_given_cursor_when_reduce_slice_range_then_return_cursor_stream_slice(self):
7373
tracker = PaginationTracker(cursor=self._cursor)
7474
self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE
7575

7676
new_slice = tracker.reduce_slice_range_if_possible(
77-
StreamSlice(partition={}, cursor_slice={})
77+
StreamSlice(partition={}, cursor_slice={}), StreamSlice(partition={}, cursor_slice={})
7878
)
7979

8080
assert new_slice == _A_STREAM_SLICE
8181

8282
def test_given_cursor_cant_reduce_slice_when_reduce_slice_range_then_raise(self):
8383
tracker = PaginationTracker(cursor=self._cursor)
84+
original_slice = StreamSlice(partition={}, cursor_slice={})
8485
self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE
8586

8687
with pytest.raises(AirbyteTracedException):
87-
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE)
88+
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice)
89+
90+
def test_cursor_called_with_original_slice_when_reduce_slice_range(self):
91+
tracker = PaginationTracker(cursor=self._cursor)
92+
original_slice = StreamSlice(partition={}, cursor_slice={})
93+
94+
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice)
95+
96+
self._cursor.reduce_slice_range.assert_called_once_with(original_slice)

0 commit comments

Comments
 (0)