2 changes: 2 additions & 0 deletions java/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -45,6 +45,8 @@
public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector{
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);

public static final int TYPE_WIDTH = ${type.width};

private final Accessor accessor = new Accessor();
private final Mutator mutator = new Mutator();

62 changes: 24 additions & 38 deletions java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -37,7 +37,7 @@
import org.apache.arrow.flatbuf.Precision;

/**
* Nullable${minor.class} implements a vector of values which could be null. Elements in the vector
* ${className} implements a vector of values which could be null. Elements in the vector
* are first checked against a fixed length vector of boolean values. Then the element is retrieved
* from the base class (if not null).
*
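The Javadoc above spells out the layout: each generated nullable vector pairs a validity ("bits") vector with an inner values vector, and a read first consults the validity bit before fetching the value. A minimal usage sketch of that access pattern, assuming the NullableIntVector generated from this template and the Accessor/Mutator methods it exposes (the wrapper class name and value count below are illustrative only):

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.NullableIntVector;

    public class NullableVectorSketch {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             NullableIntVector vector = new NullableIntVector("ints", allocator)) {
          vector.allocateNew();
          vector.getMutator().set(0, 42);       // sets the validity bit and stores the value
          // index 1 is never set, so its validity bit stays 0
          vector.getMutator().setValueCount(2);

          System.out.println(vector.getAccessor().isNull(0));  // false
          System.out.println(vector.getAccessor().get(0));     // 42
          System.out.println(vector.getAccessor().isNull(1));  // true: element reads back as null
        }
      }
    }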
@@ -47,7 +47,7 @@
public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector, FieldVector {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class);

private final FieldReader reader = new ${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
private final FieldReader reader = new ${minor.class}ReaderImpl(${className}.this);

private final String bitsField = "$bits$";
private final String valuesField = "$values$";
@@ -67,7 +67,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type

public ${className}(String name, BufferAllocator allocator, int precision, int scale) {
super(name, allocator);
values = new ${minor.class}Vector(valuesField, allocator, precision, scale);
values = new ${valuesName}(valuesField, allocator, precision, scale);
this.precision = precision;
this.scale = scale;
mutator = new Mutator();
@@ -81,7 +81,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
<#else>
public ${className}(String name, BufferAllocator allocator) {
super(name, allocator);
values = new ${minor.class}Vector(valuesField, allocator);
values = new ${valuesName}(valuesField, allocator);
mutator = new Mutator();
accessor = new Accessor();
<#if minor.class == "TinyInt" ||
@@ -144,6 +144,13 @@ public List<FieldVector> getChildrenFromFields() {

@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
<#if type.major = "VarLen">
// variable width values: truncate offset vector buffer to size (#1)
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.offsetVector.getBufferSizeFor(fieldNode.getLength() + 1));
<#else>
// fixed width values truncate value vector to size (#1)
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.getBufferSizeFor(fieldNode.getLength()));
</#if>
org.apache.arrow.vector.BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers);
bits.valueCount = fieldNode.getLength();
}
@@ -229,13 +236,6 @@ public void setInitialCapacity(int numRecords) {
values.setInitialCapacity(numRecords);
}

// @Override
// public SerializedField.Builder getMetadataBuilder() {
// return super.getMetadataBuilder()
// .addChild(bits.getMetadata())
// .addChild(values.getMetadata());
// }

@Override
public void allocateNew() {
if(!allocateNewSafe()){
@@ -329,20 +329,6 @@ public void zeroVector() {
}
</#if>


// @Override
// public void load(SerializedField metadata, ArrowBuf buffer) {
// clear();
// the bits vector is the first child (the order in which the children are added in getMetadataBuilder is significant)
// final SerializedField bitsField = metadata.getChild(0);
// bits.load(bitsField, buffer);
//
// final int capacity = buffer.capacity();
// final int bitsLength = bitsField.getBufferLength();
// final SerializedField valuesField = metadata.getChild(1);
// values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength));
// }

@Override
public TransferPair getTransferPair(BufferAllocator allocator){
return new TransferImpl(name, allocator);
@@ -356,10 +342,10 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator){

@Override
public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((Nullable${minor.class}Vector) to);
return new TransferImpl((${className}) to);
}

public void transferTo(Nullable${minor.class}Vector target){
public void transferTo(${className} target){
bits.transferTo(target.bits);
values.transferTo(target.values);
<#if type.major == "VarLen">
@@ -368,7 +354,7 @@ public void transferTo(Nullable${minor.class}Vector target){
clear();
}

public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class}Vector target) {
public void splitAndTransferTo(int startIndex, int length, ${className} target) {
bits.splitAndTransferTo(startIndex, length, target.bits);
values.splitAndTransferTo(startIndex, length, target.values);
<#if type.major == "VarLen">
@@ -377,22 +363,22 @@ public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class
}

private class TransferImpl implements TransferPair {
Nullable${minor.class}Vector to;
${className} to;

public TransferImpl(String name, BufferAllocator allocator){
<#if minor.class == "Decimal">
to = new Nullable${minor.class}Vector(name, allocator, precision, scale);
to = new ${className}(name, allocator, precision, scale);
<#else>
to = new Nullable${minor.class}Vector(name, allocator);
to = new ${className}(name, allocator);
</#if>
}

public TransferImpl(Nullable${minor.class}Vector to){
public TransferImpl(${className} to){
this.to = to;
}

@Override
public Nullable${minor.class}Vector getTo(){
public ${className} getTo(){
return to;
}

@@ -408,7 +394,7 @@ public void splitAndTransfer(int startIndex, int length) {

@Override
public void copyValueSafe(int fromIndex, int toIndex) {
to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this);
to.copyFromSafe(fromIndex, toIndex, ${className}.this);
}
}

@@ -422,22 +408,22 @@ public Mutator getMutator(){
return mutator;
}

public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
public void copyFrom(int fromIndex, int thisIndex, ${className} from){
final Accessor fromAccessor = from.getAccessor();
if (!fromAccessor.isNull(fromIndex)) {
mutator.set(thisIndex, fromAccessor.get(fromIndex));
}
}

public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
public void copyFromSafe(int fromIndex, int thisIndex, ${valuesName} from){
<#if type.major == "VarLen">
mutator.fillEmpties(thisIndex);
</#if>
values.copyFromSafe(fromIndex, thisIndex, from);
bits.getMutator().setSafe(thisIndex, 1);
}

public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
public void copyFromSafe(int fromIndex, int thisIndex, ${className} from){
<#if type.major == "VarLen">
mutator.fillEmpties(thisIndex);
</#if>
@@ -640,7 +626,7 @@ public void set(int index, ${minor.class}Holder holder){
}

public boolean isSafe(int outIndex) {
return outIndex < Nullable${minor.class}Vector.this.getValueCapacity();
return outIndex < ${className}.this.getValueCapacity();
}

<#assign fields = minor.fields!type.fields />
2 changes: 2 additions & 0 deletions java/vector/src/main/codegen/templates/UnionVector.java
@@ -103,6 +103,8 @@ public List<FieldVector> getChildrenFromFields() {

@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
// truncate types vector buffer to size (#0)
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 0, typeVector.getBufferSizeFor(fieldNode.getLength()));
BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers);
this.valueCount = fieldNode.getLength();
}
@@ -30,6 +30,9 @@ public abstract class BaseDataValueVector extends BaseValueVector implements Buf

protected final static byte[] emptyByteArray = new byte[]{}; // Nullable vectors use this

/** maximum extra size at the end of the buffer */
private static final int MAX_BUFFER_PADDING = 64;

public static void load(ArrowFieldNode fieldNode, List<BufferBacked> vectors, List<ArrowBuf> buffers) {
int expectedSize = vectors.size();
if (buffers.size() != expectedSize) {
@@ -40,6 +43,20 @@ public static void load(ArrowFieldNode fieldNode, List<BufferBacked> vectors, Li
}
}

public static void truncateBufferBasedOnSize(List<ArrowBuf> buffers, int bufferIndex, int byteSize) {
if (bufferIndex >= buffers.size()) {
throw new IllegalArgumentException("no buffer at index " + bufferIndex + ": " + buffers);
}
ArrowBuf buffer = buffers.get(bufferIndex);
if (buffer.writerIndex() < byteSize) {
throw new IllegalArgumentException("can not truncate buffer to a larger size " + byteSize + ": " + buffer.writerIndex());
}
if (buffer.writerIndex() - byteSize > MAX_BUFFER_PADDING) {
throw new IllegalArgumentException("Buffer too large to resize to " + byteSize + ": " + buffer.writerIndex());
}
buffer.writerIndex(byteSize);
}

public static List<ArrowBuf> unload(List<BufferBacked> vectors) {
List<ArrowBuf> result = new ArrayList<>(vectors.size());
for (BufferBacked vector : vectors) {
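The new helper above exists because buffers arriving over IPC may be padded out to a 64-byte boundary, so loadFieldBuffers first trims each buffer back to the exact byte size its vector expects, as the template changes earlier in this diff do. A rough sketch of the arithmetic, using the truncateBufferBasedOnSize signature added here; the allocator setup, class name, and import paths (ArrowBuf still lived under io.netty.buffer in this era of the codebase) are assumptions:

    import java.util.Arrays;
    import java.util.List;

    import io.netty.buffer.ArrowBuf;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BaseDataValueVector;

    public class TruncateSketch {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
          ArrowBuf validity = allocator.getEmpty();
          ArrowBuf values = allocator.buffer(64);   // reader handed us a 64-byte padded buffer
          values.writerIndex(64);

          // 10 ints really occupy 10 * 4 = 40 bytes; trim buffer #1 back before loading it
          List<ArrowBuf> ownBuffers = Arrays.asList(validity, values);
          BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, 10 * 4);

          System.out.println(values.writerIndex()); // 40
          values.release();
        }
      }
    }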
@@ -68,7 +68,7 @@ public void load(ArrowFieldNode fieldNode, ArrowBuf data) {
int remainder = count % 8;
// set remaining bits
if (remainder > 0) {
byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7));;
byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7));
this.data.setByte(fullBytesCount, bitMask);
}
} else if (fieldNode.getNullCount() == fieldNode.getLength()) {
@@ -82,7 +82,7 @@ private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buf
vector.loadFieldBuffers(fieldNode, ownBuffers);
} catch (RuntimeException e) {
throw new IllegalArgumentException("Could not load buffers for field " +
field + " error message" + e.getMessage(), e);
field + ". error message: " + e.getMessage(), e);
}
List<Field> children = field.getChildren();
if (children.size() > 0) {
@@ -93,6 +93,8 @@ public List<FieldVector> getChildrenFromFields() {

@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
// variable width values: truncate offset vector buffer to size (#1)
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, offsets.getBufferSizeFor(fieldNode.getLength() + 1));
BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers);
}

@@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@@ -32,6 +33,7 @@
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
import org.apache.arrow.vector.complex.writer.BigIntWriter;
import org.apache.arrow.vector.complex.writer.IntWriter;
@@ -99,6 +101,79 @@ public void testUnloadLoad() throws IOException {
}
}

@Test
public void testUnloadLoadAddPadding() throws IOException {
int count = 10000;
Schema schema;
try (
BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) {

// write some data
ComplexWriter writer = new ComplexWriterImpl("root", parent);
MapWriter rootWriter = writer.rootAsMap();
ListWriter list = rootWriter.list("list");
IntWriter intWriter = list.integer();
for (int i = 0; i < count; i++) {
list.setPosition(i);
list.startList();
for (int j = 0; j < i % 4 + 1; j++) {
intWriter.writeInt(i);
}
list.endList();
}
writer.setValueCount(count);

// unload it
FieldVector root = parent.getChild("root");
schema = new Schema(root.getField().getChildren());
VectorUnloader vectorUnloader = newVectorUnloader(root);
try (
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
) {
List<ArrowBuf> oldBuffers = recordBatch.getBuffers();
List<ArrowBuf> newBuffers = new ArrayList<>();
for (ArrowBuf oldBuffer : oldBuffers) {
int l = oldBuffer.readableBytes();
if (l % 64 != 0) {
// pad
l = l + 64 - l % 64;
}
ArrowBuf newBuffer = allocator.buffer(l);
for (int i = oldBuffer.readerIndex(); i < oldBuffer.writerIndex(); i++) {
newBuffer.setByte(i - oldBuffer.readerIndex(), oldBuffer.getByte(i));
}
newBuffer.readerIndex(0);
newBuffer.writerIndex(l);
newBuffers.add(newBuffer);
}

try (ArrowRecordBatch newBatch = new ArrowRecordBatch(recordBatch.getLength(), recordBatch.getNodes(), newBuffers);) {
// load it
VectorLoader vectorLoader = new VectorLoader(newRoot);

vectorLoader.load(newBatch);

FieldReader reader = newRoot.getVector("list").getReader();
for (int i = 0; i < count; i++) {
reader.setPosition(i);
List<Integer> expected = new ArrayList<>();
for (int j = 0; j < i % 4 + 1; j++) {
expected.add(i);
}
Assert.assertEquals(expected, reader.readObject());
}
}

for (ArrowBuf newBuf : newBuffers) {
newBuf.release();
}
}
}
}

/**
* The validity buffer can be empty if:
* - all values are defined
@@ -113,12 +188,17 @@ public void testLoadEmptyValidityBuffer() throws IOException {
));
int count = 10;
ArrowBuf validity = allocator.getEmpty();
ArrowBuf values = allocator.buffer(count * 4); // integers
for (int i = 0; i < count; i++) {
values.setInt(i * 4, i);
ArrowBuf[] values = new ArrowBuf[2];
for (int i = 0; i < values.length; i++) {
ArrowBuf arrowBuf = allocator.buffer(count * 4); // integers
values[i] = arrowBuf;
for (int j = 0; j < count; j++) {
arrowBuf.setInt(j * 4, j);
}
arrowBuf.writerIndex(count * 4);
[inline review comment, Member Author] the size was wrong!
}
try (
ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values, validity, values));
ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1]));
BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
) {
@@ -153,7 +233,9 @@ public void testLoadEmptyValidityBuffer() throws IOException {
assertFalse(intDefinedVector.getAccessor().isNull(count + 10));
assertEquals(1234, intDefinedVector.getAccessor().get(count + 10));
} finally {
values.release();
for (ArrowBuf arrowBuf : values) {
arrowBuf.release();
}
}
}
