
exceptionfactory
Contributor

Summary

NIFI-15062 Adds a PutIcebergRecord Processor and several Controller Services that provide initial integration for storing records in Apache Iceberg tables.

The Apache Iceberg ecosystem supports a wide variety of catalogs, storage providers, and file formats. The purpose of this pull request is to provide several Controller Service abstractions that enable extensible integration, with a specific implementation of each Controller Service. With the number of potential integration options, Apache NiFi should not necessarily implement support for every possible solution, but should provide extension points that enable focused types of integration. The iceberg-api library is the foundation for this approach.

The nifi-iceberg-bundle includes multiple modules that have the following dependency hierarchy:

  • nifi-iceberg-shared-nar
    • nifi-iceberg-services-api-nar
      • nifi-iceberg-processors-nar
      • nifi-iceberg-rest-catalog-nar
      • nifi-iceberg-aws-nar
      • nifi-iceberg-parquet-writer-nar

The nifi-iceberg-shared-nar contains the iceberg-api and iceberg-core libraries along with transitive dependencies.

The nifi-iceberg-services-api-nar depends on iceberg-api and incorporates the Apache NiFi Controller Service interfaces that align with iceberg-api interfaces.

The nifi-iceberg-processors-nar contains the PutIcebergRecord Processor, which references properties for the following Controller Services:

  • IcebergCatalog
  • IcebergWriter
  • IcebergFileIOProvider

These three interfaces define the primary extension points for external integration.
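As a rough illustration of why the processor depends only on interfaces, here is a minimal sketch using plain Java stand-ins. The interface names, methods, and path layout below are hypothetical and are not the signatures in nifi-iceberg-services-api; the actual services expose iceberg-api types.

```java
final class ExtensionPointSketch {
    // Hypothetical stand-ins for the three Controller Service interfaces
    interface FileIOProvider { String storageScheme(); }
    interface CatalogService { String resolveLocation(String table); }
    interface WriterService { String fileExtension(); }

    // A processor-like method that knows only the interfaces, not the implementations
    static String plannedDataFile(CatalogService catalog, WriterService writer, String table) {
        return catalog.resolveLocation(table) + "/data/part-00000." + writer.fileExtension();
    }

    public static void main(String[] args) {
        // Assumed wiring: the catalog is configured with a FileIO provider
        FileIOProvider s3 = () -> "s3";
        CatalogService rest = table -> s3.storageScheme() + "://warehouse/" + table;
        WriterService parquet = () -> "parquet";
        System.out.println(plannedDataFile(rest, parquet, "events"));
    }
}
```

Swapping in a different storage provider or file format would only require another implementation of the corresponding interface, which is the point of the separate NAR packaging.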

The nifi-iceberg-rest-catalog-nar contains the RESTIcebergCatalog implementation of the IcebergCatalog Controller Service. This implementation configures the RESTSessionCatalog from the iceberg-core library and supports Catalog Authentication using OAuth2 with Client Credentials or Bearer Tokens. Building on the iceberg-core library provided in nifi-iceberg-shared-nar, the nifi-iceberg-rest-catalog-nar does not have any additional dependencies. The RESTIcebergCatalog defines a FileIO Provider property that supports configurable Controller Services for Iceberg FileIO implementations.
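For reference, a minimal sketch of the catalog properties that the two authentication strategies could translate to. The keys `uri`, `credential`, and `token` are the standard Iceberg REST catalog property names; the class and method names are hypothetical, and the actual RESTIcebergCatalog may assemble its configuration differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RestCatalogPropertiesSketch {
    // Standard Iceberg REST catalog property keys
    static final String URI = "uri";
    static final String CREDENTIAL = "credential";
    static final String TOKEN = "token";

    // OAuth2 Client Credentials: Iceberg expects "clientId:clientSecret"
    static Map<String, String> clientCredentials(String uri, String clientId, String clientSecret) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put(URI, uri);
        properties.put(CREDENTIAL, clientId + ":" + clientSecret);
        return properties;
    }

    // Bearer Token: the token is supplied directly instead of being requested
    static Map<String, String> bearerToken(String uri, String token) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put(URI, uri);
        properties.put(TOKEN, token);
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only
        System.out.println(clientCredentials("https://catalog.example.com", "client", "secret").keySet());
        System.out.println(bearerToken("https://catalog.example.com", "example-token").keySet());
    }
}
```

A map like this is the kind of input a catalog receives on initialization; the endpoint and credential values shown are placeholders.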

The nifi-iceberg-aws-nar contains the S3IcebergFileIOProvider which configures and returns the Iceberg S3FileIO class. Support for S3 requires a number of AWS SDK 2 libraries, which is one of the primary reasons for separate packaging of FileIOProvider implementations. The S3 implementation supports configurable authentication using Basic or Session Credentials, as well as Vended Credentials, where the REST Catalog is expected to provide the required credentials.
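The difference between the three credential strategies can be sketched as the set of S3FileIO properties each one supplies. The keys `s3.access-key-id`, `s3.secret-access-key`, and `s3.session-token` are Iceberg's S3FileIO property names; the enum and method are hypothetical, and the assumption that Vended Credentials contribute no static properties reflects the description above rather than the actual S3IcebergFileIOProvider code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class S3CredentialPropertiesSketch {
    // Hypothetical enum mirroring the strategies named in the description
    enum Strategy { BASIC, SESSION, VENDED }

    static Map<String, String> credentialProperties(
            Strategy strategy, String accessKeyId, String secretAccessKey, String sessionToken) {
        Map<String, String> properties = new LinkedHashMap<>();
        if (strategy == Strategy.VENDED) {
            // Assumption: nothing static is configured; the REST Catalog supplies credentials
            return properties;
        }
        properties.put("s3.access-key-id", accessKeyId);
        properties.put("s3.secret-access-key", secretAccessKey);
        if (strategy == Strategy.SESSION) {
            // Session Credentials add a temporary token to the basic key pair
            properties.put("s3.session-token", sessionToken);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only
        System.out.println(credentialProperties(Strategy.BASIC, "key", "secret", null).keySet());
        System.out.println(credentialProperties(Strategy.SESSION, "key", "secret", "token").keySet());
        System.out.println(credentialProperties(Strategy.VENDED, null, null, null).isEmpty());
    }
}
```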

The nifi-iceberg-parquet-writer-nar contains the ParquetIcebergWriter Controller Service, supporting Apache Parquet serialization. Apache Parquet has a number of transitive dependencies, including a dependency on the hadoop-common library. The NAR packaging excludes many unnecessary transitive dependencies and has an explicit list of dependencies required at runtime for Parquet serialization.

This implementation structure and Controller Service design strategy should serve as the basis for additional storage provider implementations. With Apache Parquet being the predominant format for Apache Iceberg, direct support for other file formats may not be necessary. The variety of Iceberg REST Catalog implementations may require additional configuration options in the future, but the core IcebergCatalog Controller Service abstraction provides a decoupled strategy for future implementation.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

- Added RESTIcebergCatalog implementation of IcebergCatalog
- Added S3FileIOProvider implementation of IcebergFileIOProvider
- Added ParquetIcebergWriter implementation of IcebergWriter
Contributor

pvillard31 left a comment


I'll try to run tests as soon as possible but I left some comments after going through the code.

    final ListIterator<RecordField> recordFields = recordSchema.getFields().listIterator();
    while (recordFields.hasNext()) {
        final RecordField recordField = recordFields.next();
        final int id = recordFields.nextIndex();
Contributor


Isn't it going to be an issue for nested records? When getStructType() calls itself for nested records we will have duplicate IDs in the same schema tree, no? Iceberg requires every ID in a schema tree to be globally unique.
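To make the concern concrete, here is a minimal sketch that is independent of the PR's code. `Field` is a hypothetical stand-in for a record field, `perLevelIds` mirrors the position-based pattern in the snippet above, and `sharedCounterIds` shows one possible alternative that threads a single counter through the recursion. Neither method is taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class FieldIdSketch {
    // Hypothetical stand-in for a record field; children is empty for primitive fields
    record Field(String name, List<Field> children) {
        static Field leaf(String name) {
            return new Field(name, List.of());
        }
    }

    // Position-based IDs restart at 1 on every nesting level, so IDs repeat across levels
    static List<Integer> perLevelIds(List<Field> fields) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
            ids.add(i + 1);
            ids.addAll(perLevelIds(fields.get(i).children()));
        }
        return ids;
    }

    // One counter shared across the whole traversal keeps every ID unique in the tree
    static List<Integer> sharedCounterIds(List<Field> fields, AtomicInteger counter) {
        List<Integer> ids = new ArrayList<>();
        for (Field field : fields) {
            ids.add(counter.incrementAndGet());
            ids.addAll(sharedCounterIds(field.children(), counter));
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Field> schema = List.of(
                Field.leaf("id"),
                new Field("address", List.of(Field.leaf("street"), Field.leaf("city"))));
        System.out.println(perLevelIds(schema));                            // prints [1, 2, 1, 2]
        System.out.println(sharedCounterIds(schema, new AtomicInteger()));  // prints [1, 2, 3, 4]
    }
}
```

With a top-level `id` field and a nested `address` record, the position-based approach assigns 1 and 2 twice, while the shared counter yields four distinct IDs.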

Contributor Author


That's a very good point; most of the initial testing did not use nested types. I will review the implementation options and implement a better strategy for nested field identification.

Contributor Author


I refactored PutIcebergRecord to use the Table Schema and Struct instead of depending on this conversion from Record Schema to Iceberg Struct Type. Although this may be useful for other scenarios such as initial table creation, it is not needed in the current implementation, so I have removed this class.

…cord Schema

- Removed StructTypeProvider and implementation