
NIFI-15568: Fix Iceberg timestamp handling and add S3 storage class support #10877

Open
NirYanay2005 wants to merge 2 commits into apache:main from NirYanay2005:NIFI-15568

Conversation

@NirYanay2005

Summary

NIFI-15568

This change improves Apache Iceberg integration in NiFi by addressing two related issues:

  1. Adds support for configuring the S3 storage class in S3IcebergFileIOProvider, which is required for certain on-prem or S3-compatible object stores.
  2. Fixes timestamp type compatibility issues when writing Parquet-backed Iceberg tables by converting java.sql.Timestamp values to java.time.LocalDateTime, including correct handling for nested records, collections, and partitioned tables.

The timestamp fix ensures compatibility with Iceberg’s internal expectations and allows writes to succeed when timestamp columns are used as partition keys.
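The core of the conversion described above can be sketched as follows. This is a minimal illustration, not the actual NiFi code; the class and method names are assumptions:

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Minimal sketch of the conversion described in the summary: Iceberg
// expects java.time.LocalDateTime for timestamp columns, while NiFi
// Records may carry java.sql.Timestamp. Names here are illustrative,
// not the actual NiFi API.
public class TimestampConversionSketch {

    // Convert Timestamp values; pass everything else through unchanged
    public static Object convertIfTimestamp(final Object value) {
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime();
        }
        return value;
    }
}
```

For nested records and collections, the same check would have to be applied recursively to each element.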

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-15568
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-15568
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • [ ] JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Contributor

@exceptionfactory exceptionfactory left a comment


Thanks for submitting the revised pull request @NirYanay2005.

Please review the pull request instructions and run a build with the contrib-check profile enabled to correct formatting issues such as missing license headers.

@exceptionfactory
Contributor

Please note that each commit must also be signed with a key associated with your GitHub profile for verification.

@NirYanay2005
Author

I added a key to my commits and fixed all contrib-check problems.

@exceptionfactory
Contributor

> I added a key to my commits and fixed all contrib-check problems.

Thanks @NirYanay2005, the initial commit is now signed, but the subsequent commits are not. Can you squash all commits on this branch to ensure they are all signed?

  • Removed unnecessary code that already existed in main
  • Added licenses and formatting
  • Fixed all contrib-check issues
@NirYanay2005
Author

> I added a key to my commits and fixed all contrib-check problems.
>
> Thanks @NirYanay2005, the initial commit is now signed, but the subsequent commits are not. Can you squash all commits on this branch to ensure they are all signed?

Ok, I squashed all commits

Contributor

@exceptionfactory exceptionfactory left a comment


Thanks for making the initial adjustments @NirYanay2005.

The new Storage Class property looks good. If you are interested in getting that merged more quickly, it would be worth breaking that out to a separate Jira issue and pull request.

Regarding the Record field conversion, the problem makes sense, but the proposed solution does not appear to be the best way forward. Introducing a new IcebergRecordConverter in the Parquet module does not seem like the right location for shared Record formatting, although that is worth further consideration. More importantly, running each Record through a conversion process places additional overhead on the entire process.

Instead of this approach, constructing the Iceberg Record object as needed in the PutIcebergRecord Processor seems like the optimal place for changes. The initial implementation of the delegating Record depends heavily on NiFi Record field conversion. That would be the first potential place to make a change for handling type conversion in an optimal way.

If you are willing to work through an alternative approach, I can review options. Alternatively, I may take a closer look at the problem and could propose an alternative solution.

Thanks again for working on these issues and feel free to follow up on how you would like to proceed.

@NirYanay2005
Author

About the Storage Class: I need both and can't use one without the other, so I don't mind waiting a bit more.
I want to work through an improved approach, but I need a bit more direction on where the change should be made.
I understand that introducing a separate IcebergRecordConverter is not ideal, and that the better location would be during Iceberg Record construction inside PutIcebergRecord.
However, I’m not yet fully familiar with how PutIcebergRecord constructs the Iceberg Record object and where NiFi Record field conversion is applied.
Could you point me to the specific method in PutIcebergRecord where timestamp type handling would be most appropriate?
I’m happy to iterate on the implementation once I understand the intended extension point.

@exceptionfactory
Contributor

> About the Storage Class: I need both and can't use one without the other, so I don't mind waiting a bit more. I want to work through an improved approach, but I need a bit more direction on where the change should be made. I understand that introducing a separate IcebergRecordConverter is not ideal, and that the better location would be during Iceberg Record construction inside PutIcebergRecord. However, I'm not yet fully familiar with how PutIcebergRecord constructs the Iceberg Record object and where NiFi Record field conversion is applied. Could you point me to the specific method in PutIcebergRecord where timestamp type handling would be most appropriate? I'm happy to iterate on the implementation once I understand the intended extension point.

Thanks for the reply. I recommend tracing through PutIcebergRecord and looking at DelegatedRecord as a starting point.

@NirYanay2005
Author

NirYanay2005 commented Feb 22, 2026

I changed the implementation entirely and moved it into the DelegatedRecord get methods.
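To illustrate the on-access idea: converting when a field is read avoids a separate up-front pass over every Record. This is a hedged sketch; SimpleDelegatedRecord and its get method are illustrative names, not NiFi's actual DelegatedRecord API:

```java
import java.sql.Timestamp;
import java.util.Map;

// Hypothetical sketch of converting on access: instead of rewriting
// every Record in a separate conversion pass, the delegating record
// converts Timestamp values to LocalDateTime only when a field is
// read. Names below are illustrative, not the NiFi API.
public class SimpleDelegatedRecord {
    private final Map<String, Object> values;

    public SimpleDelegatedRecord(final Map<String, Object> values) {
        this.values = values;
    }

    // Convert lazily at read time so unaffected fields carry no overhead
    public Object get(final String fieldName) {
        final Object value = values.get(fieldName);
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime();
        }
        return value;
    }
}
```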

Contributor

@exceptionfactory exceptionfactory left a comment


Thanks for adjusting the direction @NirYanay2005. This looks to be on a good track. Regarding the InternalRecordWrapper, is that still needed? Are there any options to avoid the RecordWrapper in ParquetPartitionedWriter?

@NirYanay2005
Author

I tried without it and it failed: the conversion to a long is still needed after the conversion to LocalDateTime, but at a different phase.
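For context on the two phases: Iceberg computes partition values from timestamps represented as microseconds since the epoch, which is the role a wrapper such as InternalRecordWrapper plays after the LocalDateTime conversion. A hedged sketch of that second conversion, assuming UTC and an illustrative helper name:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical illustration of the second conversion phase: the
// LocalDateTime produced for the writer still has to become a long
// (epoch microseconds) when partition values are computed. The class
// and method names are illustrative; UTC is assumed here.
public class TimestampMicros {
    public static long toEpochMicros(final LocalDateTime localDateTime) {
        final Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
        // Combine whole seconds and the sub-second nanos, both in micros
        return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
    }
}
```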

