Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.DictionaryUtility;
import org.apache.arrow.vector.util.TransferPair;

/**
* Abstract class to read Schema and ArrowRecordBatches.
Expand Down Expand Up @@ -219,6 +220,18 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
throw new IllegalArgumentException("Dictionary ID " + id + " not defined in schema");
}
FieldVector vector = dictionary.getVector();
// if is deltaVector, concat it with non-delta vector with the same ID.
if (dictionaryBatch.isDelta()) {
FieldVector deltaVector = vector.getField().createVector(allocator);
load(dictionaryBatch, deltaVector);
concatDeltaDictionary(vector, deltaVector);
return;
}

load(dictionaryBatch, vector);
}

private void load(ArrowDictionaryBatch dictionaryBatch, FieldVector vector) {
VectorSchemaRoot root = new VectorSchemaRoot(
Collections.singletonList(vector.getField()),
Collections.singletonList(vector), 0);
Expand All @@ -229,4 +242,18 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
dictionaryBatch.close();
}
}

/**
* Concat dictionary vector and delta dictionary vector.
*/
private void concatDeltaDictionary(FieldVector vector, FieldVector deltaVector) {
final int valueCount = vector.getValueCount();
final int deltaValueCount = deltaVector.getValueCount();
final TransferPair transferPair = deltaVector.makeTransferPair(vector);
for (int i = 0; i < deltaValueCount; i++) {
transferPair.copyValueSafe(i, valueCount + i);
}
deltaVector.close();
vector.setValueCount(valueCount + deltaValueCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,24 @@ public class ArrowDictionaryBatch implements ArrowMessage {

private final long dictionaryId;
private final ArrowRecordBatch dictionary;
private final boolean isDelta;

@Deprecated
public ArrowDictionaryBatch(long dictionaryId, ArrowRecordBatch dictionary) {
this (dictionaryId, dictionary, false);
}

/**
* Constructs new instance.
*/
public ArrowDictionaryBatch(long dictionaryId, ArrowRecordBatch dictionary, boolean isDelta) {
this.dictionaryId = dictionaryId;
this.dictionary = dictionary;
this.isDelta = isDelta;
}

public boolean isDelta() {
return isDelta;
}

public byte getMessageType() {
Expand All @@ -54,6 +68,7 @@ public int writeTo(FlatBufferBuilder builder) {
DictionaryBatch.startDictionaryBatch(builder);
DictionaryBatch.addId(builder, dictionaryId);
DictionaryBatch.addData(builder, dataOffset);
DictionaryBatch.addIsDelta(builder, isDelta);
return DictionaryBatch.endDictionaryBatch(builder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public static ArrowDictionaryBatch deserializeDictionaryBatch(Message message, A
throws IOException {
DictionaryBatch dictionaryBatchFB = (DictionaryBatch) message.header(new DictionaryBatch());
ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), bodyBuffer);
return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch, dictionaryBatchFB.isDelta());
}

/**
Expand Down Expand Up @@ -570,7 +570,7 @@ public static ArrowDictionaryBatch deserializeDictionaryBatch(
final ArrowBuf body = buffer.slice(block.getMetadataLength(),
(int) totalLen - block.getMetadataLength());
ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), body);
return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch, dictionaryBatchFB.isDelta());
}

/**
Expand Down
Loading