     LongType,
     NestedField,
     StringType,
-    StructType,
     TimestampType,
     TimestamptzType,
 )
@@ -258,111 +257,6 @@ def test_add_files_to_unpartitioned_table_with_field_ids( |
     assert all(df["qux"] == date(2024, 3, 7))
 
 
-@pytest.mark.integration
-def test_add_files_with_mismatched_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
-    identifier = f"default.unpartitioned_mismatched_field_ids_v{format_version}"
-    tbl = _create_table(session_catalog, identifier, format_version)
-
-    # Create schema with field IDs that don't match the table schema
-    # Table has: 1=foo, 2=bar, 3=baz, 4=qux (assigned by catalog)
-    # This file has: 1=foo, 2=bar, 5=baz, 6=qux (wrong IDs for baz and qux)
-    mismatched_schema = pa.schema(
-        [
-            pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
-            pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
-            pa.field("baz", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "5"}),  # Wrong: should be 3
-            pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "6"}),  # Wrong: should be 4
-        ]
-    )
-
-    file_path = f"s3://warehouse/default/unpartitioned_mismatched_field_ids/v{format_version}/test.parquet"
-    fo = tbl.io.new_output(file_path)
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=mismatched_schema) as writer:
-            writer.write_table(ARROW_TABLE_WITH_IDS)
-
-    # Adding files with mismatched field IDs should fail
-    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
-        tbl.add_files(file_paths=[file_path])
-
-
-@pytest.mark.integration
-def test_add_files_with_mismatched_nested_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
-    """Test that files with mismatched nested (struct) field IDs are rejected."""
-    identifier = f"default.nested_mismatched_field_ids_v{format_version}"
-
-    # Create a table with a nested struct field
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    nested_schema = Schema(
-        NestedField(1, "id", IntegerType(), required=False),
-        NestedField(
-            2,
-            "user",
-            StructType(
-                NestedField(3, "name", StringType(), required=False),
-                NestedField(4, "age", IntegerType(), required=False),
-            ),
-            required=False,
-        ),
-        schema_id=0,
-    )
-
-    tbl = session_catalog.create_table(
-        identifier=identifier,
-        schema=nested_schema,
-        properties={"format-version": str(format_version)},
-    )
-
-    # Create PyArrow schema with MISMATCHED nested field IDs
-    # The table expects: id=1, user=2, user.name=3, user.age=4
-    # This file has: id=1, user=2, user.name=99, user.age=100 (wrong nested IDs)
-    pa_schema_mismatched = pa.schema(
-        [
-            pa.field("id", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"1"}),
-            pa.field(
-                "user",
-                pa.struct(
-                    [
-                        pa.field("name", pa.string(), nullable=True, metadata={b"PARQUET:field_id": b"99"}),  # Wrong!
-                        pa.field("age", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"100"}),  # Wrong!
-                    ]
-                ),
-                nullable=True,
-                metadata={b"PARQUET:field_id": b"2"},
-            ),
-        ]
-    )
-
-    pa_table = pa.table(
-        {
-            "id": pa.array([1, 2, 3], type=pa.int32()),
-            "user": pa.array(
-                [
-                    {"name": "Alice", "age": 30},
-                    {"name": "Bob", "age": 25},
-                    {"name": "Charlie", "age": 35},
-                ],
-                type=pa_schema_mismatched.field("user").type,
-            ),
-        },
-        schema=pa_schema_mismatched,
-    )
-
-    file_path = f"s3://warehouse/default/nested_mismatched_field_ids/v{format_version}/test.parquet"
-    fo = tbl.io.new_output(file_path)
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=pa_schema_mismatched) as writer:
-            writer.write_table(pa_table)
-
-    # Adding files with mismatched nested field IDs should fail
-    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
-        tbl.add_files(file_paths=[file_path])
-
-
 @pytest.mark.integration
 def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     from pyiceberg.io.pyarrow import parquet_file_to_data_file
@@ -707,6 +601,65 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog |
         tbl.add_files(file_paths=[file_path])
 
 
+@pytest.mark.integration
+def test_add_files_with_field_ids_fails_on_schema_mismatch(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    """Test that files with mismatched field types (when field IDs match) are rejected."""
+    identifier = f"default.table_schema_mismatch_based_on_field_ids__fails_v{format_version}"
+
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    # All fields are renamed and reordered but have matching field IDs, so they should
+    # be compatible, except for 'baz', which has the wrong type
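+    # add_files matches Parquet columns to table fields by their PARQUET:field_id
+    # metadata when every column carries one, so the renames above are expected to
+    # be ignored; only the type mismatch on field ID 3 should be flagged.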
+    WRONG_SCHEMA = pa.schema(
+        [
+            pa.field("qux_", pa.date32(), metadata={"PARQUET:field_id": "4"}),
+            pa.field("baz_", pa.string(), metadata={"PARQUET:field_id": "3"}),  # Wrong type: should be int32
+            pa.field("bar_", pa.string(), metadata={"PARQUET:field_id": "2"}),
+            pa.field("foo_", pa.bool_(), metadata={"PARQUET:field_id": "1"}),
+        ]
+    )
+    file_path = f"s3://warehouse/default/table_with_field_ids_schema_mismatch_fails/v{format_version}/test.parquet"
+    # Write a Parquet file with the mismatched schema
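+    # tbl.io is the table's FileIO, so this write goes through the same object-store
+    # configuration (here the test warehouse on S3) that the catalog uses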
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=WRONG_SCHEMA) as writer:
+            writer.write_table(
+                pa.Table.from_pylist(
+                    [
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "123",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "124",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                    ],
+                    schema=WRONG_SCHEMA,
+                )
+            )
+
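+    # The expected message below is the rich-rendered comparison table from the
+    # raised ValueError; pytest.raises(match=...) applies it with re.search, and
+    # the box-drawing characters contain no regex metacharacters, so a verbatim
+    # copy of the message matches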
+    expected = """Mismatch in fields:
+┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
+┃    ┃ Table field              ┃ Dataframe field           ┃
+┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
+│ ✅ │ 1: foo: optional boolean │ 1: foo_: optional boolean │
+│ ✅ │ 2: bar: optional string  │ 2: bar_: optional string  │
+│ ❌ │ 3: baz: optional int     │ 3: baz_: optional string  │
+│ ✅ │ 4: qux: optional date    │ 4: qux_: optional date    │
+└────┴──────────────────────────┴───────────────────────────┘
+"""
+
+    with pytest.raises(ValueError, match=expected):
+        tbl.add_files(file_paths=[file_path])
+
+
 @pytest.mark.integration
 def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     identifier = f"default.unpartitioned_with_large_types{format_version}"
|