
Commit 58003dd

chore: Clean up unit tests (#2099)
* chore: Clean up unit tests
1 parent c610b6f commit 58003dd


12 files changed: +92, -190 lines


awswrangler/athena/_read.py

Lines changed: 26 additions & 22 deletions
@@ -1357,7 +1357,7 @@ def read_sql_table(
 - `Global Configurations <https://aws-sdk-pandas.readthedocs.io/en/3.0.0rc2/
 tutorials/021%20-%20Global%20Configurations.html>`_

-**There are two approaches to be defined through ctas_approach parameter:**
+**There are three approaches available through ctas_approach and unload_approach parameters:**

 **1** - ctas_approach=True (Default):

@@ -1375,8 +1375,26 @@ def read_sql_table(
 - Does not support columns with repeated names.
 - Does not support columns with undefined data types.
 - A temporary table will be created and then deleted immediately.
+- Does not support custom data_source/catalog_id.
+
+**2** - unload_approach=True and ctas_approach=False:
+
+Does an UNLOAD query on Athena and parse the Parquet result on s3.
+
+PROS:
+
+- Faster for mid and big result sizes.
+- Can handle some level of nested types.
+- Does not modify Glue Data Catalog
+
+CONS:

-**2** - ctas_approach=False:
+- Output S3 path must be empty.
+- Does not support timestamp with time zone.
+- Does not support columns with repeated names.
+- Does not support columns with undefined data types.
+
+**3** - ctas_approach=False:

 Does a regular query on Athena and parse the regular CSV result on s3.

@@ -1385,6 +1403,7 @@ def read_sql_table(
 - Faster for small result sizes (less latency).
 - Does not require create/delete table permissions on Glue
 - Supports timestamp with time zone.
+- Support custom data_source/catalog_id.

 CONS:

@@ -1423,16 +1442,15 @@ def read_sql_table(

 There are two batching strategies:

-- If **chunksize=True**, depending on the size of the data, one or more data frames will be
-returned per each file in the query result.
-Unlike **chunksize=INTEGER**, rows from different files will not be mixed in the resulting data frames.
+- If **chunksize=True**, depending on the size of the data, one or more data frames are returned per file in the query result.
+Unlike **chunksize=INTEGER**, rows from different files are not mixed in the resulting data frames.

-- If **chunksize=INTEGER**, awswrangler will iterate on the data by number of rows egual the received INTEGER.
+- If **chunksize=INTEGER**, awswrangler iterates on the data by number of rows equal to the received INTEGER.

 `P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
-in number of rows for each Dataframe.
+in number of rows for each data frame.

-`P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an interador with a
+`P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an iterator with a
 single DataFrame because regular Athena queries only produces a single output file.

 Note
@@ -1473,20 +1491,6 @@ def read_sql_table(
 For SSE-KMS, this is the KMS key ARN or ID.
 keep_files : bool
 Should awswrangler delete or keep the staging files produced by Athena?
-ctas_database : str, optional
-The name of the alternative database where the CTAS temporary table is stored.
-If None, the default `database` is used.
-ctas_temp_table_name : str, optional
-The name of the temporary table and also the directory name on S3 where the CTAS result is stored.
-If None, it will use the follow random pattern: `f"temp_table_{uuid.uuid4().hex}"`.
-On S3 this directory will be under under the pattern: `f"{s3_output}/{ctas_temp_table_name}/"`.
-ctas_bucketing_info: Tuple[List[str], int], optional
-Tuple consisting of the column names used for bucketing as the first element and the number of buckets as the
-second element.
-Only `str`, `int` and `bool` are supported as column data types for bucketing.
-ctas_write_compression: str, optional
-Write compression for the temporary table where the CTAS result is stored.
-Corresponds to the `write_compression` parameters for CREATE TABLE AS statement in Athena.
 use_threads : bool, int
 True to enable concurrent requests, False to disable multiple threads.
 If enabled os.cpu_count() will be used as the max number of threads.
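For orientation, a minimal sketch of how the three approaches described in the updated docstring are selected from user code. The database, table, and S3 prefix names are placeholders, and the keyword arguments shown are assumed from the docstring above rather than the full signature.

```python
import awswrangler as wr

database = "my_glue_db"  # placeholder Glue database
table = "my_table"       # placeholder Glue table

# 1 - CTAS approach (default): stages the result as a temporary Glue table.
df_ctas = wr.athena.read_sql_table(table=table, database=database, ctas_approach=True)

# 2 - UNLOAD approach: writes Parquet to an empty S3 prefix, leaves the Glue Data Catalog untouched.
df_unload = wr.athena.read_sql_table(
    table=table,
    database=database,
    ctas_approach=False,
    unload_approach=True,
    s3_output="s3://my-bucket/athena-unload/",  # placeholder; the prefix must be empty
)

# 3 - plain approach: parses the regular CSV result; supports custom data_source/catalog_id.
df_plain = wr.athena.read_sql_table(table=table, database=database, ctas_approach=False)

# chunksize=True returns an iterator yielding one or more DataFrames per result file.
for chunk in wr.athena.read_sql_table(table=table, database=database, chunksize=True):
    print(chunk.shape)
```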

awswrangler/distributed/ray/modin/s3/_write_text.py

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ def _to_text_distributed( # pylint: disable=unused-argument
     # Create Ray Dataset
     ds = _ray_dataset_from_df(df)

-    # Repartition into a single block if or writing into a single key or if bucketing is enabled
+    # Repartition into a single block if writing into a single key or if bucketing is enabled
     if ds.count() > 0 and path:
         ds = ds.repartition(1)
         _logger.warning(
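For context on the corrected comment, here is a rough standalone sketch of the same repartition logic using the public Ray Data API instead of the module's private `_ray_dataset_from_df` helper; `writing_single_key` stands in for the real `path`/bucketing condition.

```python
import pandas as pd
import ray

# Build a Ray Dataset from a pandas DataFrame (the module uses an internal helper for this step).
ray.init(ignore_reinit_error=True)
ds = ray.data.from_pandas(pd.DataFrame({"c0": range(1_000)}))

# Collapse the dataset into a single block when the output targets a single key
# (or when bucketing is enabled), so the write produces exactly one object
# instead of one object per block.
writing_single_key = True  # placeholder for the real condition on `path`/bucketing
if ds.count() > 0 and writing_single_key:
    ds = ds.repartition(1)
```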

tests/load/test_s3.py

Lines changed: 9 additions & 17 deletions
@@ -20,7 +20,6 @@ def _modin_repartition(df: pd.DataFrame, num_blocks: int) -> pd.DataFrame:
     return dataset.to_modin()


-@pytest.mark.repeat(1)
 @pytest.mark.parametrize("benchmark_time", [150])
 def test_s3_select(benchmark_time: float, request: pytest.FixtureRequest) -> None:
     paths = [f"s3://ursa-labs-taxi-data/2018/{i}/data.parquet" for i in range(10, 13)]
@@ -96,7 +95,7 @@ def test_s3_read_parquet_many_files(
     assert timer.elapsed_time < benchmark_time


-@pytest.mark.parametrize("benchmark_time", [30])
+@pytest.mark.parametrize("benchmark_time", [40])
 def test_s3_read_parquet_partition_filter(benchmark_time: float, request: pytest.FixtureRequest) -> None:
     path = "s3://amazon-reviews-pds/parquet/"
     with ExecutionTimer(request, data_paths=path) as timer:
@@ -167,18 +166,15 @@ def test_s3_write_parquet_blocks(

 @pytest.mark.parametrize("benchmark_time", [5])
 def test_s3_delete_objects(path: str, path2: str, benchmark_time: float, request: pytest.FixtureRequest) -> None:
-    df = pd.DataFrame({"id": [1, 2, 3]})
-    objects_per_bucket = 505
-    paths1 = [f"{path}delete-test{i}.json" for i in range(objects_per_bucket)]
-    paths2 = [f"{path2}delete-test{i}.json" for i in range(objects_per_bucket)]
+    df = pd.DataFrame({"id": range(0, 505)})
+    paths1 = wr.s3.to_parquet(df=df, path=path, max_rows_by_file=1)["paths"]
+    paths2 = wr.s3.to_parquet(df=df, path=path2, max_rows_by_file=1)["paths"]
     paths = paths1 + paths2
-    for path in paths:
-        wr.s3.to_csv(df, path)
     with ExecutionTimer(request) as timer:
         wr.s3.delete_objects(path=paths)
     assert timer.elapsed_time < benchmark_time
-    assert len(wr.s3.list_objects(f"{path}delete-test*")) == 0
-    assert len(wr.s3.list_objects(f"{path2}delete-test*")) == 0
+    assert len(wr.s3.list_objects(path)) == 0
+    assert len(wr.s3.list_objects(path2)) == 0


 @pytest.mark.parametrize("benchmark_time", [20])
@@ -226,16 +222,12 @@ def test_s3_write_json(
 @pytest.mark.timeout(300)
 @pytest.mark.parametrize("benchmark_time", [15])
 def test_wait_object_exists(path: str, benchmark_time: int, request: pytest.FixtureRequest) -> None:
-    df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]})
-
-    num_objects = 200
-    file_paths = [f"{path}{i}.txt" for i in range(num_objects)]
+    df = pd.DataFrame({"c0": range(0, 200)})

-    for file_path in file_paths:
-        wr.s3.to_csv(df, file_path, index=True)
+    paths = wr.s3.to_parquet(df=df, path=path, max_rows_by_file=1)["paths"]

     with ExecutionTimer(request) as timer:
-        wr.s3.wait_objects_exist(file_paths, parallelism=16)
+        wr.s3.wait_objects_exist(paths, parallelism=16)

     assert timer.elapsed_time < benchmark_time

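The rewritten load tests lean on `max_rows_by_file=1` to fan a small frame out into many S3 objects in a single call and to get the generated keys back for the wait/delete assertions. A rough standalone sketch of that pattern, assuming a writable placeholder prefix:

```python
import pandas as pd
import awswrangler as wr

path = "s3://my-test-bucket/cleanup-demo/"  # placeholder prefix you can write to

# One Parquet object per row; the returned dict exposes the generated keys under "paths".
df = pd.DataFrame({"id": range(0, 505)})
paths = wr.s3.to_parquet(df=df, path=path, max_rows_by_file=1)["paths"]

wr.s3.wait_objects_exist(paths, parallelism=16)  # block until every object is visible
wr.s3.delete_objects(path=paths)                 # bulk delete by explicit key list
assert len(wr.s3.list_objects(path)) == 0        # the prefix is empty again
```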

tests/unit/test_athena.py

Lines changed: 1 addition & 9 deletions
@@ -224,7 +224,6 @@ def test_athena_create_ctas(path, glue_table, glue_table2, glue_database, glue_c

 @pytest.mark.xfail(is_ray_modin, raises=AssertionError, reason="Index equality regression")
 def test_athena(path, glue_database, glue_table, kms_key, workgroup0, workgroup1):
-    wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
     wr.s3.to_parquet(
         df=get_df(),
         path=path,
@@ -301,7 +300,6 @@ def test_athena(path, glue_database, glue_table, kms_key, workgroup0, workgroup1


 def test_read_sql_query_parameter_formatting_respects_prefixes(path, glue_database, glue_table, workgroup0):
-    wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
     wr.s3.to_parquet(
         df=get_df(),
         path=path,
@@ -329,7 +327,6 @@ def test_read_sql_query_parameter_formatting_respects_prefixes(path, glue_databa
     [("string", "Seattle"), ("date", datetime.date(2020, 1, 1)), ("bool", True), ("category", 1.0)],
 )
 def test_read_sql_query_parameter_formatting(path, glue_database, glue_table, workgroup0, col_name, col_value):
-    wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
     wr.s3.to_parquet(
         df=get_df(),
         path=path,
@@ -354,7 +351,6 @@ def test_read_sql_query_parameter_formatting(path, glue_database, glue_table, wo

 @pytest.mark.parametrize("col_name", [("string"), ("date"), ("bool"), ("category")])
 def test_read_sql_query_parameter_formatting_null(path, glue_database, glue_table, workgroup0, col_name):
-    wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
     wr.s3.to_parquet(
         df=get_df(),
         path=path,
@@ -377,6 +373,7 @@ def test_read_sql_query_parameter_formatting_null(path, glue_database, glue_tabl
     assert len(df.index) == 1


+@pytest.mark.xfail(raises=botocore.exceptions.ClientError, reason="QueryId not found.")
 def test_athena_query_cancelled(glue_database):
     query_execution_id = wr.athena.start_query_execution(
         sql="SELECT " + "rand(), " * 10000 + "rand()", database=glue_database
@@ -551,7 +548,6 @@ def test_category(path, glue_table, glue_database):
     )
     for df2 in dfs:
         ensure_data_types_category(df2)
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


 @pytest.mark.parametrize("workgroup", [None, 0, 1, 2, 3])
@@ -784,7 +780,6 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table):
     table = next(wr.catalog.get_tables(name_contains=glue_table))
     assert table["StorageDescriptor"]["NumberOfBuckets"] == nb_of_buckets
     assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)


 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@@ -873,7 +868,6 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table):
     table = next(wr.catalog.get_tables(name_contains=glue_table))
     assert table["StorageDescriptor"]["NumberOfBuckets"] == nb_of_buckets
     assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)


 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@@ -1196,7 +1190,6 @@ def test_bucketing_combined_csv_saving(path, glue_database, glue_table):


 def test_start_query_execution_wait(path, glue_database, glue_table):
-    wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
     wr.s3.to_parquet(
         df=get_df(),
         path=path,
@@ -1358,7 +1351,6 @@ def test_get_query_execution(workgroup0, workgroup1):

 @pytest.mark.parametrize("compression", [None, "snappy", "gzip"])
 def test_read_sql_query_ctas_write_compression(path, glue_database, glue_table, compression):
-    wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
     wr.s3.to_parquet(
         df=get_df(),
         path=path,
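The deleted `wr.catalog.delete_table_if_exists` calls are redundant if table cleanup is owned by the test fixtures. The real `glue_table` fixture lives in the suite's conftest.py (not shown in this diff); the following is only a hypothetical sketch of such a fixture with teardown:

```python
import uuid

import pytest

import awswrangler as wr


@pytest.fixture()
def glue_table(glue_database: str):
    # Hypothetical fixture: hand the test a unique table name, then drop the table on teardown.
    name = f"tbl_{uuid.uuid4().hex}"
    yield name
    wr.catalog.delete_table_if_exists(database=glue_database, table=name)
```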

tests/unit/test_athena_csv.py

Lines changed: 20 additions & 44 deletions
@@ -1,6 +1,5 @@
 import csv
 import logging
-from sys import version_info

 import boto3
 import pyarrow as pa
@@ -290,7 +289,6 @@ def test_csv_catalog(path, glue_table, glue_database, use_threads, concurrent_pa
     assert len(df2.columns) == 11
     assert df2["id"].sum() == 6
     ensure_data_types_csv(df2)
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


 @pytest.mark.parametrize("use_threads", [True, False])
@@ -377,7 +375,6 @@ def test_athena_csv_types(path, glue_database, glue_table):
     assert len(df2.columns) == 10
     assert df2["id"].sum() == 6
     ensure_data_types_csv(df2)
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


 @pytest.mark.parametrize("use_threads", [True, False])
@@ -457,47 +454,26 @@ def test_failing_catalog(path, glue_table, use_threads):
 @pytest.mark.parametrize("concurrent_partitioning", [True, False])
 @pytest.mark.parametrize("compression", ["gzip", "bz2", None])
 def test_csv_compressed(path, glue_table, glue_database, use_threads, concurrent_partitioning, compression):
-    df = get_df_csv()
-    if version_info < (3, 7) and compression:
-        with pytest.raises(wr.exceptions.InvalidArgument):
-            wr.s3.to_csv(
-                df=df,
-                path=path,
-                sep="\t",
-                index=True,
-                use_threads=use_threads,
-                boto3_session=None,
-                s3_additional_kwargs=None,
-                dataset=True,
-                partition_cols=["par0", "par1"],
-                mode="overwrite",
-                table=glue_table,
-                database=glue_database,
-                concurrent_partitioning=concurrent_partitioning,
-                compression=compression,
-            )
-    else:
-        wr.s3.to_csv(
-            df=df,
-            path=path,
-            sep="\t",
-            index=True,
-            use_threads=use_threads,
-            boto3_session=None,
-            s3_additional_kwargs=None,
-            dataset=True,
-            partition_cols=["par0", "par1"],
-            mode="overwrite",
-            table=glue_table,
-            database=glue_database,
-            concurrent_partitioning=concurrent_partitioning,
-            compression=compression,
-        )
-        df2 = wr.athena.read_sql_table(glue_table, glue_database)
-        assert df2.shape == (3, 11)
-        assert df2["id"].sum() == 6
-        ensure_data_types_csv(df2)
-        assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True
+    wr.s3.to_csv(
+        df=get_df_csv(),
+        path=path,
+        sep="\t",
+        index=True,
+        use_threads=use_threads,
+        boto3_session=None,
+        s3_additional_kwargs=None,
+        dataset=True,
+        partition_cols=["par0", "par1"],
+        mode="overwrite",
+        table=glue_table,
+        database=glue_database,
+        concurrent_partitioning=concurrent_partitioning,
+        compression=compression,
+    )
+    df2 = wr.athena.read_sql_table(glue_table, glue_database)
+    assert df2.shape == (3, 11)
+    assert df2["id"].sum() == 6
+    ensure_data_types_csv(df2)


 @pytest.mark.parametrize("use_threads", [True, False])

tests/unit/test_athena_parquet.py

Lines changed: 1 addition & 14 deletions
@@ -74,8 +74,6 @@ def test_parquet_catalog(path, path2, glue_table, glue_table2, glue_database):
     assert len(columns_types) == 18
     assert len(partitions_types) == 2
     assert len(partitions_values) == 2
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table2) is True


 @pytest.mark.parametrize("use_threads", [True, False])
@@ -163,8 +161,6 @@ def test_parquet_catalog_casting(path, glue_database, glue_table):
     df = wr.athena.read_sql_table(table=glue_table, database=glue_database, ctas_approach=False)
     assert df.shape == (3, 16)
     ensure_data_types(df=df, has_list=False)
-    wr.s3.delete_objects(path=path)
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True


 def test_parquet_catalog_casting_to_string_with_null(path, glue_table, glue_database):
@@ -208,8 +204,6 @@ def test_parquet_compress(path, glue_table, glue_database, compression):
     df2 = wr.athena.read_sql_table(glue_table, glue_database)
     ensure_data_types(df2)
     df2 = wr.s3.read_parquet(path=path)
-    wr.s3.delete_objects(path=path)
-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True
     ensure_data_types(df2)


@@ -242,7 +236,6 @@ def test_parquet_char_length(path, glue_database, glue_table):
 @pytest.mark.parametrize("col2", [[1, 1, 1, 1, 1], [1, 2, 3, 4, 5], [1, 1, 1, 1, 2], [1, 2, 2, 2, 2]])
 @pytest.mark.parametrize("chunked", [True, 1, 2, 100])
 def test_parquet_chunked(path, glue_database, glue_table, col2, chunked):
-    wr.s3.delete_objects(path=path)
     values = list(range(5))
     df = pd.DataFrame({"col1": values, "col2": col2})
     wr.s3.to_parquet(
@@ -274,12 +267,9 @@ def test_parquet_chunked(path, glue_database, glue_table, col2, chunked):
     assert chunked == len(df2)
     assert chunked >= len(dfs[-1])

-    assert wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table) is True

-
-@pytest.mark.xfail(is_ray_modin, raises=AssertionError, reason="Issue since upgrading to Ray 2.3")
+@pytest.mark.xfail(is_ray_modin, raises=AssertionError, reason="Issue since upgrading to PyArrow 11.0")
 def test_unsigned_parquet(path, glue_database, glue_table):
-    wr.s3.delete_objects(path=path)
     df = pd.DataFrame({"c0": [0, 0, (2**8) - 1], "c1": [0, 0, (2**16) - 1], "c2": [0, 0, (2**32) - 1]})
     df["c0"] = df.c0.astype("uint8")
     df["c1"] = df.c1.astype("uint16")
@@ -317,9 +307,6 @@ def test_unsigned_parquet(path, glue_database, glue_table):
         mode="overwrite",
     )

-    wr.s3.delete_objects(path=path)
-    wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
-

 def test_parquet_overwrite_partition_cols(path, glue_database, glue_table):
     df = pd.DataFrame({"c0": [1, 2, 1, 2], "c1": [1, 2, 1, 2], "c2": [2, 1, 2, 1]})