Spark: Encapsulate parquet objects for Comet #13786
Conversation
@huaxingao, @anuragmantri please take a look.
parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java (resolved)
}
RowGroupReader pages;
try {
  pages = cometReader.readNextRowGroup();
readNextRowGroup might return null and setRowGroupInfo will make use of pages.
In this case, advance is called only by next, which implements the Iterator interface and explicitly checks hasNext, so readNextRowGroup always returns a non-null value. This could be done better, but I'll leave it as is because:
i) it is not really clear what the behavior should be if advance silently does nothing;
ii) the code (and behavior) stays consistent with the implementation in VectorizedParquetReader.
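To illustrate the guarantee described above, here is a minimal sketch of the iterator contract, with simplified, hypothetical field and method names rather than the actual Iceberg/Comet classes: next checks hasNext before advance is reached, so the row-group read inside advance only happens while another row group remains.

import java.util.Iterator;
import java.util.NoSuchElementException;

// Simplified sketch; names are illustrative, not the actual FileIterator implementation.
abstract class RowGroupIteratorSketch<T> implements Iterator<T> {
  private final long totalValues;   // total number of values in the file
  private long valuesRead = 0;      // values consumed so far
  private long nextRowGroupStart = 0;

  RowGroupIteratorSketch(long totalValues) {
    this.totalValues = totalValues;
  }

  @Override
  public boolean hasNext() {
    return valuesRead < totalValues;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    if (valuesRead >= nextRowGroupStart) {
      // Only reached when hasNext() is true, so another row group must exist
      // and the read inside advance() is not expected to return null.
      nextRowGroupStart += advance();
    }
    valuesRead += 1;
    return readCurrent();
  }

  // Reads the next row group and returns its row count.
  abstract long advance();

  abstract T readCurrent();
}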
    caseSensitive,
    maxRecordsPerBatch);
if (isComet) {
  LOG.info("Comet enabled");
(nit) Maybe "Comet vectorized reader enabled".
changed
boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;

// ToDo: extract this into a Util method
nit: remove this line?
removed
}

return new ParquetColumnSpec(
    1, // ToDo: pass in the correct id
fix this?
Done. Thanks for pointing this out!
  throw new IllegalArgumentException(
      "Missing required parameters for DecimalLogicalTypeAnnotation: " + params);
}
int scale = Integer.parseInt(params.get("scale"));
Should we add validation for the parameter format? For example, if params.get("scale") returns null we will get a NumberFormatException. Another case is a non-numeric string.
It makes the code very verbose, but I've added the additional validation.
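As a rough illustration of the kind of validation being discussed, a helper along these lines turns a missing or non-numeric parameter into a clear IllegalArgumentException instead of a bare NumberFormatException. The class and method names are assumptions; the actual validation added in the PR may be inlined and worded differently.

import java.util.Map;

// Hypothetical helper sketch for validating logical-type annotation parameters.
final class LogicalTypeParams {
  private LogicalTypeParams() {}

  static int requiredInt(Map<String, String> params, String key) {
    String value = params.get(key);
    if (value == null) {
      throw new IllegalArgumentException(
          "Missing required parameter '" + key + "' in: " + params);
    }
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Invalid value for parameter '" + key + "': " + value, e);
    }
  }
}

With such a helper, the call site would read: int scale = LogicalTypeParams.requiredInt(params, "scale");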
  specs.add(spec);
}

fileReader.setRequestedSchemaFromSpecs(specs);
If setRequestedSchemaFromSpecs fails, will the FileReader be closed?
Good catch. In the latest update, this is accessed via a wrapper class that can throw additional exceptions. The wrapper is AutoCloseable and is explicitly closed in this method.
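A minimal sketch of the close-on-failure pattern described here, with an assumed wrapper interface standing in for the actual CometBridge wrapper and an assumed method name:

import java.util.List;

// Illustrative only: the wrapper type and method names are assumptions.
final class ReaderInit {
  interface FileReaderWrapper extends AutoCloseable {
    void setRequestedSchemaFromSpecs(List<?> specs) throws Exception;
  }

  static FileReaderWrapper withRequestedSchema(FileReaderWrapper fileReader, List<?> specs) {
    try {
      fileReader.setRequestedSchemaFromSpecs(specs);
      return fileReader;
    } catch (Exception e) {
      try {
        // Close the wrapper so the underlying Parquet file handle is not leaked.
        fileReader.close();
      } catch (Exception suppressed) {
        e.addSuppressed(suppressed);
      }
      throw new RuntimeException("Failed to set requested schema on Comet reader", e);
    }
  }

  private ReaderInit() {}
}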
@shangxinli thank you for taking a look. I found that there are more changes needed to get around Parquet shading; let me address your comments along with those.
Force-pushed from 0119c24 to a148a5b.
@huaxingao, @shangxinli, @hsiang-c this is ready for final review.
    maxRecordsPerBatch);
if (isComet) {
  LOG.info("Comet vectorized reader enabled");
  return new CometVectorizedParquetReader<>(
@pvary I’ve seen your work on the reader/writer API proposal, but I haven’t kept up with all the details. We’re adding a CometVectorizedParquetReader; does this align with your proposal?
Based on the suggestion on that PR, we will just handle Comet as an internal detail for Parquet, so it will be hidden behind the FileFormat API.
Likely this.isComet will be driven by a configuration Map for the ReadBuilder, but don't worry about that for now. It is not much complexity, and we will handle it when the time comes.
Thanks for your consideration!
@pvary thanks for verifying this. If any change is needed in future, it will be fairly easy to move code around.
@huaxingao thank you for reviewing; there is also a follow-up PR, #14062, which is in draft because it depends on this PR.
@huaxingao any comments you would like me to address?
@parthchandra Apologies for the delay. I’ve skimmed the changes and would like more time for a deeper review. Because we’re adding several classes to the Parquet module, Comet code is no longer confined to the Spark module. Given the broader scope, it would be ideal if we could get one more committer to review and sign off.
Thanks @huaxingao. Who would you recommend we ask for a second review?
cc @aokolnychyi @flyrain @szehon-ho @stevenzwu. Appreciate it if you could take a look too. Thanks a lot!
@parthchandra Do we have Comet customers on Spark 3.4? If not, we don't need the changes on v3.4, because 3.4 has been marked as deprecated and will be removed in the next release. Also, the Comet version has already been updated to 0.10.1. Could you please rebase?
Abstract RowGroup and FileReader interaction APIs into a reflection-based bridge class; update to Comet 0.10.1.
Force-pushed from a148a5b to 866e41e.
@huaxingao we are still supporting Spark 3.4.0 in Comet, so it will be useful to have the Spark 3.4 changes as well. Is the removal of Spark 3.4 part of 1.11.0 or after that?
@parthchandra There is a discussion to remove 3.4 in 1.11.0. If we need to support 3.4 in Comet for a longer time, we need to -1 in the discussion thread.
}

private static class FileIterator<T> implements CloseableIterator<T> {
  // private final ParquetFileReader reader;
nit: remove this
    ByteBuffer fileAADPrefix) {
  CometBridge.FileReaderWrapper fileReader = null;
  try {
    Object cometOptions = CometBridge.createReadOptions(new Configuration());
I can't remember how the ReadOptions works. Is it correct to have an empty Configuration here?
Do we need to verify that the ParquetReadOptions we build on the Comet side is the same as the one on the Iceberg side?
}

/** Creates a CometIOException from a general Exception. */
public static CometIOException fromException(Exception cause) {
It seems a couple of the methods are not used. Shall we remove the unused methods?
@parthchandra @hsiang-c I remember we run the Comet/Iceberg integration tests in the Comet repo. Is it possible to add a Comet integration test in
Fixes issues in #13378. This makes all the remaining changes to Iceberg to address issues with Comet/Parquet shading. The corresponding changes to Comet are already merged into Comet 0.10.1.
Additionally, the Comet dependency of the iceberg-parquet module has been removed because it created an iceberg-flink dependency on comet-spark. The code now uses a reflection-based CometBridge class to access the Comet (Parquet) FileReader. This PR also removes the commit that enables Comet execution, moving it to a follow-up PR (cc @hsiang-c).
This PR makes the changes for Spark 3.4, 3.5, and 4.0
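For readers unfamiliar with the approach, a generic sketch of a reflection-based bridge is shown below. The class name and methods are assumptions for illustration and do not reflect the exact CometBridge API in this PR; the idea is that the Comet reader class is looked up by name at runtime, so iceberg-parquet carries no compile-time dependency on comet-spark.

import java.lang.reflect.Method;

// Generic reflection-bridge sketch (hypothetical names).
final class CometBridgeSketch {
  // Assumed class name; the shaded name in an actual deployment may differ.
  private static final String FILE_READER_CLASS = "org.apache.comet.parquet.FileReader";

  private CometBridgeSketch() {}

  // Checks whether the Comet reader class is present on the classpath.
  static boolean cometAvailable() {
    try {
      Class.forName(FILE_READER_CLASS);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  // Invokes readNextRowGroup() reflectively on an already constructed Comet reader instance.
  static Object readNextRowGroup(Object cometFileReader) {
    try {
      Method m = cometFileReader.getClass().getMethod("readNextRowGroup");
      return m.invoke(cometFileReader);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Failed to call readNextRowGroup via reflection", e);
    }
  }
}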