Commit 0f853f5

feat(s3): add to_deltalake_streaming for single-commit Delta writes (#3231)

* feat(s3): add to_deltalake_streaming for single-commit Delta writes
* formatting
* remove docstrings
* fix: not supported
* fix: import sort

1 parent 5b78a89 commit 0f853f5
File tree

3 files changed: +269 -3 lines changed

awswrangler/s3/__init__.py
awswrangler/s3/_write_deltalake.py
tests/unit/test_s3_deltalake.py

awswrangler/s3/__init__.py

Lines changed: 2 additions & 1 deletion

@@ -13,7 +13,7 @@
 from awswrangler.s3._select import select_query
 from awswrangler.s3._upload import upload
 from awswrangler.s3._wait import wait_objects_exist, wait_objects_not_exist
-from awswrangler.s3._write_deltalake import to_deltalake
+from awswrangler.s3._write_deltalake import to_deltalake, to_deltalake_streaming
 from awswrangler.s3._write_excel import to_excel
 from awswrangler.s3._write_orc import to_orc
 from awswrangler.s3._write_parquet import store_parquet_metadata, to_parquet
@@ -49,6 +49,7 @@
     "to_csv",
     "to_json",
     "to_deltalake",
+    "to_deltalake_streaming",
     "to_excel",
     "read_excel",
     "download",

awswrangler/s3/_write_deltalake.py

Lines changed: 92 additions & 1 deletion

@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING, Any, Iterable, Iterator, Literal
 
 import boto3
 import pandas as pd
@@ -30,6 +30,7 @@ def _set_default_storage_options_kwargs(
     defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()}
     defaults["AWS_REGION"] = defaults.pop("REGION_NAME")
     defaults["AWS_SESSION_TOKEN"] = "" if defaults["AWS_SESSION_TOKEN"] is None else defaults["AWS_SESSION_TOKEN"]
+
     s3_additional_kwargs = s3_additional_kwargs or {}
 
     s3_lock_arguments = {}
@@ -133,3 +134,93 @@ def to_deltalake(
         schema_mode=schema_mode,
         storage_options=storage_options,
     )
+
+
+def _df_iter_to_record_batch_reader(
+    df_iter: Iterable[pd.DataFrame],
+    *,
+    index: bool,
+    dtype: dict[str, str],
+    target_schema: pa.Schema | None = None,
+    batch_size: int | None = None,
+) -> tuple[pa.RecordBatchReader, pa.Schema]:
+    it = iter(df_iter)
+
+    first_df: pd.DataFrame | None = None
+    for df in it:
+        if not df.empty:
+            first_df = df
+            break
+
+    if first_df is None:
+        empty_schema = pa.schema([])
+        empty_reader = pa.RecordBatchReader.from_batches(empty_schema, [])
+        return empty_reader, empty_schema
+
+    schema = target_schema or _data_types.pyarrow_schema_from_pandas(
+        df=first_df, index=index, ignore_cols=None, dtype=dtype
+    )
+
+    def batches() -> Iterator[pa.RecordBatch]:
+        first_tbl: pa.Table = _df_to_table(first_df, schema, index, dtype)
+        for b in first_tbl.to_batches(batch_size) if batch_size is not None else first_tbl.to_batches():
+            yield b
+
+        for df in it:
+            if df.empty:
+                continue
+            tbl: pa.Table = _df_to_table(df, schema, index, dtype)
+            for b in tbl.to_batches(batch_size) if batch_size is not None else tbl.to_batches():
+                yield b
+
+    reader = pa.RecordBatchReader.from_batches(schema, batches())
+    return reader, schema
+
+
+@_utils.check_optional_dependency(deltalake, "deltalake")
+@Experimental
+def to_deltalake_streaming(
+    *,
+    dfs: Iterable[pd.DataFrame],
+    path: str,
+    index: bool = False,
+    mode: Literal["error", "append", "overwrite", "ignore"] = "append",
+    dtype: dict[str, str] | None = None,
+    partition_cols: list[str] | None = None,
+    schema_mode: Literal["overwrite", "merge"] | None = None,
+    lock_dynamodb_table: str | None = None,
+    s3_allow_unsafe_rename: bool = False,
+    boto3_session: boto3.Session | None = None,
+    s3_additional_kwargs: dict[str, str] | None = None,
+    batch_size: int | None = None,
+    target_file_size: int | None = None,
+) -> None:
+    dtype = dtype or {}
+
+    storage_options = _set_default_storage_options_kwargs(
+        boto3_session=boto3_session,
+        s3_additional_kwargs=s3_additional_kwargs,
+        s3_allow_unsafe_rename=s3_allow_unsafe_rename,
+        lock_dynamodb_table=lock_dynamodb_table,
+    )
+
+    reader, schema = _df_iter_to_record_batch_reader(
+        df_iter=dfs,
+        index=index,
+        dtype=dtype,
+        target_schema=None,
+        batch_size=batch_size,
+    )
+
+    if len(schema) == 0:
+        return
+
+    deltalake.write_deltalake(
+        table_or_uri=path,
+        data=reader,
+        partition_by=partition_cols,
+        mode=mode,
+        schema_mode=schema_mode,
+        storage_options=storage_options,
+        target_file_size=target_file_size,
+    )
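The core pattern in _df_iter_to_record_batch_reader is wrapping a lazy generator of record batches in a pyarrow.RecordBatchReader, so deltalake.write_deltalake can consume the whole stream as one input and commit once. A self-contained sketch of that pattern using pyarrow only (the DataFrames and schema below are illustrative, not taken from the change):

from typing import Iterator

import pandas as pd
import pyarrow as pa

dfs = [
    pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}),
    pd.DataFrame({"a": [3], "b": ["z"]}),
]

# Fix the schema from the first chunk; later chunks are converted to match it.
schema = pa.Schema.from_pandas(dfs[0], preserve_index=False)


def batches() -> Iterator[pa.RecordBatch]:
    for df in dfs:
        table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
        yield from table.to_batches()


# Batches are produced lazily as the reader is consumed; nothing is
# materialized into one large in-memory Table.
reader = pa.RecordBatchReader.from_batches(schema, batches())
print(reader.read_all().num_rows)  # 3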

tests/unit/test_s3_deltalake.py

Lines changed: 175 additions & 1 deletion

@@ -1,24 +1,36 @@
 from __future__ import annotations
 
-from typing import Any, Iterator
+from typing import Any, Iterable, Iterator
 
 import boto3
+import pyarrow as pa
 import pytest
+from pandas.testing import assert_frame_equal
 
 import awswrangler as wr
 import awswrangler.pandas as pd
+from awswrangler.s3._write_deltalake import _df_iter_to_record_batch_reader
 
 from .._utils import (
     get_time_str_with_random_suffix,
 )
 
 
+def assert_df_equal_unordered(left: pd.DataFrame, right: pd.DataFrame, by: list[str]) -> None:
+    """Compare two dataframes ignoring row order and dtypes."""
+    l2 = left.sort_values(by).reset_index(drop=True)
+    r2 = right.sort_values(by).reset_index(drop=True)
+
+    assert_frame_equal(l2, r2, check_dtype=False, check_like=True)
+
+
 @pytest.fixture(scope="session")
 def lock_dynamodb_table() -> Iterator[str]:
     name = f"deltalake_lock_{get_time_str_with_random_suffix()}"
     print(f"Table name: {name}")
 
     dynamodb_client = boto3.client("dynamodb")
+
     dynamodb_client.create_table(
         TableName=name,
         BillingMode="PAY_PER_REQUEST",
@@ -94,3 +106,165 @@ def test_read_deltalake_partitions(path: str, lock_settings: dict[str, Any]) ->
 
     df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")])
     assert df2.shape == (1, 1)
+
+
+@pytest.mark.parametrize("chunksize", [2, 10])
+def test_to_deltalake_streaming_single_commit_overwrite(
+    path: str,
+    lock_settings: dict[str, Any],
+    chunksize: int,
+) -> None:
+    df1 = pd.DataFrame({"c0": [1, 1], "c1": [10, 11], "v": [100, 200]})
+    df2 = pd.DataFrame({"c0": [2, 2], "c1": [12, 13], "v": [300, 400]})
+
+    def dfs() -> Iterable[pd.DataFrame]:
+        yield df1
+        yield df2
+
+    wr.s3.to_deltalake_streaming(
+        dfs=dfs(),
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    out = wr.s3.read_deltalake(path=path)
+
+    expected = pd.concat([df1, df2], ignore_index=True)
+    assert_df_equal_unordered(expected, out, by=["c0", "c1", "v"])
+
+
+def test_to_deltalake_streaming_creates_one_version_per_run(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    df_run1_a = pd.DataFrame({"c0": [1], "c1": [10], "v": [111]})
+    df_run1_b = pd.DataFrame({"c0": [1], "c1": [11], "v": [112]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df_run1_a, df_run1_b],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    run1_expected = pd.concat([df_run1_a, df_run1_b], ignore_index=True)
+    latest_v0 = wr.s3.read_deltalake(path=path)
+    assert_df_equal_unordered(run1_expected, latest_v0, by=["c0", "c1", "v"])
+
+    df_run2_a = pd.DataFrame({"c0": [2], "c1": [12], "v": [221]})
+    df_run2_b = pd.DataFrame({"c0": [2], "c1": [13], "v": [222]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df_run2_a, df_run2_b],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    v0 = wr.s3.read_deltalake(path=path, version=0)
+    v1 = wr.s3.read_deltalake(path=path, version=1)
+    run2_expected = pd.concat([df_run2_a, df_run2_b], ignore_index=True)
+
+    assert_df_equal_unordered(run1_expected, v0, by=["c0", "c1", "v"])
+    assert_df_equal_unordered(run2_expected, v1, by=["c0", "c1", "v"])
+
+
+def test_to_deltalake_streaming_partitions_and_filters(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    df1 = pd.DataFrame({"c0": [1, 1, 2], "c1": [10, 11, 12], "v": [1, 2, 3]})
+    df2 = pd.DataFrame({"c0": [2, 3, 3], "c1": [13, 14, 15], "v": [4, 5, 6]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df1, df2],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    only_c02 = wr.s3.read_deltalake(
+        path=path,
+        partitions=[("c0", "=", "2")],
+        columns=["v", "c1"],
+    )
+    assert set(only_c02["c1"].tolist()) == {12, 13}
+    assert sorted(only_c02["v"].tolist()) == [3, 4]
+
+
+def test_to_deltalake_streaming_empty_iterator_is_noop(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    wr.s3.to_deltalake_streaming(
+        dfs=[pd.DataFrame({"c0": [1], "c1": [1], "v": [1]})],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+    baseline = wr.s3.read_deltalake(path=path)
+
+    def empty() -> Iterator[pd.DataFrame]:
+        if False:
+            yield pd.DataFrame()  # pragma: no cover
+
+    wr.s3.to_deltalake_streaming(
+        dfs=empty(),
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+    after = wr.s3.read_deltalake(path=path)
+    assert after.equals(baseline)
+
+
+def test_df_iter_to_record_batch_reader_schema_and_rows() -> None:
+    df_empty = pd.DataFrame({"a": [], "b": []})
+    df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
+    df2 = pd.DataFrame({"a": [3], "b": ["z"]})
+
+    reader, schema = _df_iter_to_record_batch_reader(
+        df_iter=[df_empty, df1, df2],
+        index=False,
+        dtype={},
+        target_schema=None,
+        batch_size=None,
+    )
+
+    assert isinstance(schema, pa.Schema)
+    assert {f.name for f in schema} == {"a", "b"}
+
+    table: pa.Table = reader.read_all()
+    pdf = table.to_pandas()
+    assert len(pdf) == 3
+    assert sorted(pdf["a"].tolist()) == [1, 2, 3]
+    assert set(pdf["b"].tolist()) == {"x", "y", "z"}
+
+
+def test_df_iter_to_record_batch_reader_respects_batch_size() -> None:
+    df1 = pd.DataFrame({"a": list(range(5)), "b": ["x"] * 5})
+    df2 = pd.DataFrame({"a": list(range(5, 9)), "b": ["y"] * 4})
+
+    reader, _ = _df_iter_to_record_batch_reader(
+        df_iter=[df1, df2],
+        index=False,
+        dtype={},
+        target_schema=None,
+        batch_size=3,
+    )
+
+    batch_count = 0
+    row_count = 0
+    for batch in reader:
+        batch_count += 1
+        row_count += batch.num_rows
+
+    assert batch_count >= 3
+    assert row_count == 9
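The single-commit behaviour these tests exercise can also be observed with the deltalake package directly; a sketch against a local path (the path and data are illustrative, and this bypasses the S3 storage options and DynamoDB locking used above):

import deltalake
import pandas as pd
import pyarrow as pa

path = "/tmp/example_delta_table"  # illustrative local table location

dfs = [pd.DataFrame({"c0": [1], "v": [10]}), pd.DataFrame({"c0": [2], "v": [20]})]
schema = pa.Schema.from_pandas(dfs[0], preserve_index=False)


def batches():
    for df in dfs:
        yield from pa.Table.from_pandas(df, schema=schema, preserve_index=False).to_batches()


reader = pa.RecordBatchReader.from_batches(schema, batches())
deltalake.write_deltalake(table_or_uri=path, data=reader, mode="overwrite")

# Both chunks land in a single commit, so the table sits at version 0.
print(deltalake.DeltaTable(path).version())  # 0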
