Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions java/vector/src/main/codegen/templates/UnionVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
@Override
public List<ArrowBuf> getFieldBuffers() {
List<ArrowBuf> result = new ArrayList<>(1);
setReaderAndWriterIndex();
result.add(typeBuffer);

return result;
Expand Down Expand Up @@ -534,10 +533,8 @@ public int getBufferSizeFor(final int valueCount) {
public ArrowBuf[] getBuffers(boolean clear) {
List<ArrowBuf> list = new java.util.ArrayList<>();
setReaderAndWriterIndex();
if (getBufferSize() != 0) {
list.add(typeBuffer);
list.addAll(java.util.Arrays.asList(internalStruct.getBuffers(clear)));
}
list.add(typeBuffer);
list.addAll(java.util.Arrays.asList(internalStruct.getBuffers(clear)));
if (clear) {
valueCount = 0;
typeBuffer.getReferenceManager().retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,9 @@ public Field getField() {
public ArrowBuf[] getBuffers(boolean clear) {
final ArrowBuf[] buffers;
setReaderAndWriterIndex();
if (getBufferSize() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this if check came from an earlier refactor that happened in Arrow and several tests had failed in Dremio. I think we should keep it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the old implementation, VectorUnloader.java checks the typeBufferCount with getFieldBuffers, it's ok since getFieldBuffers returns all buffers without check buffer size. If we keep this if, then the expected buffer count check in VectorUnloader is invalid, should we remove that check directly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the old implementation, VectorUnloader.java checks the typeBufferCount with getFieldBuffers, it's ok since getFieldBuffers returns all buffers without check buffer size. If we keep this if, then the expected buffer count check in VectorUnloader is invalid, should we remove that check directly?

I am okay with this change but it would be great if someone from Dremio can bless this (based on my previous comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep this 'if' here, then I guess we should make some change in VectorUnloader.
If vector.getBufferSize ==0, then no buffers would be sent via IPC, however, the VectorLoader depends on TypeBufferCount to decide how many buffers to load into a vector, and in this case, something is wrong.
To solve this, we may add a check in VectorUnloader, if vector bufferSize==0, we should append it's field buffers(by call getFieldBuffers) also, even their writerIndex/readerIndex=0. In this way, we could keep the 'if (getBufferSize() == 0)' in vectors and the IPC also works well.

Do you think we should keep this PR or update it as I suggested above?

buffers = new ArrowBuf[0];
} else {
buffers = new ArrowBuf[2];
buffers[0] = validityBuffer;
buffers[1] = valueBuffer;
}
buffers = new ArrowBuf[2];
buffers[0] = validityBuffer;
buffers[1] = valueBuffer;
if (clear) {
for (final ArrowBuf buffer : buffers) {
buffer.getReferenceManager().retain(1);
Expand Down Expand Up @@ -503,7 +499,6 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
*/
public List<ArrowBuf> getFieldBuffers() {
List<ArrowBuf> result = new ArrayList<>(2);
setReaderAndWriterIndex();
result.add(validityBuffer);
result.add(valueBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ public List<ArrowBuf> getFieldBuffers() {
fillHoles(valueCount);

List<ArrowBuf> result = new ArrayList<>(3);
setReaderAndWriterIndex();
result.add(validityBuffer);
result.add(offsetBuffer);
result.add(valueBuffer);
Expand Down Expand Up @@ -638,15 +637,12 @@ public Field getField() {
@Override
public ArrowBuf[] getBuffers(boolean clear) {
final ArrowBuf[] buffers;
fillHoles(valueCount);
setReaderAndWriterIndex();
if (getBufferSize() == 0) {
buffers = new ArrowBuf[0];
} else {
buffers = new ArrowBuf[3];
buffers[0] = validityBuffer;
buffers[1] = offsetBuffer;
buffers[2] = valueBuffer;
}
buffers = new ArrowBuf[3];
buffers[0] = validityBuffer;
buffers[1] = offsetBuffer;
buffers[2] = valueBuffer;
if (clear) {
for (final ArrowBuf buffer : buffers) {
buffer.getReferenceManager().retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.arrow.vector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
Expand Down Expand Up @@ -63,24 +64,26 @@ public ArrowRecordBatch getRecordBatch() {
List<ArrowFieldNode> nodes = new ArrayList<>();
List<ArrowBuf> buffers = new ArrayList<>();
for (FieldVector vector : root.getFieldVectors()) {
appendNodes(vector, nodes, buffers);
ArrowBuf[] fieldBuffers = vector.getBuffers(false);
buffers.addAll(Arrays.asList(fieldBuffers));
int expectedBufferCount = appendNodes(vector, nodes);
if (fieldBuffers.length != expectedBufferCount) {
throw new IllegalArgumentException(String.format(
"wrong number of buffers for field %s in vector %s. found: %s",
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
}
}
return new ArrowRecordBatch(root.getRowCount(), nodes, buffers, alignBuffers);
}

private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
private int appendNodes(FieldVector vector, List<ArrowFieldNode> nodes) {
nodes.add(new ArrowFieldNode(vector.getValueCount(), includeNullCount ? vector.getNullCount() : -1));
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
int expectedBufferCount = TypeLayout.getTypeBufferCount(vector.getField().getType());
if (fieldBuffers.size() != expectedBufferCount) {
throw new IllegalArgumentException(String.format(
"wrong number of buffers for field %s in vector %s. found: %s",
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
}
buffers.addAll(fieldBuffers);

int expected = TypeLayout.getTypeBufferCount(vector.getField().getType());

for (FieldVector child : vector.getChildrenFromFields()) {
appendNodes(child, nodes, buffers);
expected += appendNodes(child, nodes);
}
return expected;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,10 @@ public void reset() {
@Override
public ArrowBuf[] getBuffers(boolean clear) {
final ArrowBuf[] buffers;
if (getBufferSize() == 0) {
buffers = new ArrowBuf[0];
} else {
List<ArrowBuf> list = new ArrayList<>();
list.add(offsetBuffer);
list.addAll(Arrays.asList(vector.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
}
List<ArrowBuf> list = new ArrayList<>();
list.add(offsetBuffer);
list.addAll(Arrays.asList(vector.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
if (clear) {
for (ArrowBuf buffer : buffers) {
buffer.getReferenceManager().retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
@Override
public List<ArrowBuf> getFieldBuffers() {
List<ArrowBuf> result = new ArrayList<>(1);
setReaderAndWriterIndex();
result.add(validityBuffer);

return result;
Expand Down Expand Up @@ -348,14 +347,10 @@ public void reset() {
public ArrowBuf[] getBuffers(boolean clear) {
setReaderAndWriterIndex();
final ArrowBuf[] buffers;
if (getBufferSize() == 0) {
buffers = new ArrowBuf[0];
} else {
List<ArrowBuf> list = new ArrayList<>();
list.add(validityBuffer);
list.addAll(Arrays.asList(vector.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
}
List<ArrowBuf> list = new ArrayList<>();
list.add(validityBuffer);
list.addAll(Arrays.asList(vector.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
if (clear) {
for (ArrowBuf buffer : buffers) {
buffer.getReferenceManager().retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
@Override
public List<ArrowBuf> getFieldBuffers() {
List<ArrowBuf> result = new ArrayList<>(2);
setReaderAndWriterIndex();
result.add(validityBuffer);
result.add(offsetBuffer);

Expand Down Expand Up @@ -662,15 +661,11 @@ public void reset() {
public ArrowBuf[] getBuffers(boolean clear) {
setReaderAndWriterIndex();
final ArrowBuf[] buffers;
if (getBufferSize() == 0) {
buffers = new ArrowBuf[0];
} else {
List<ArrowBuf> list = new ArrayList<>();
list.add(offsetBuffer);
list.add(validityBuffer);
list.addAll(Arrays.asList(vector.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
}
List<ArrowBuf> list = new ArrayList<>();
list.add(validityBuffer);
list.add(offsetBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this change, the order of validity and offset buffers is changed in the getBuffers method for ListVector which creates problems with serialization/deserialization resulting in test failures in Dremio. This would break backward compatibility with existing serialised files.
Also keeping the existing order in #getBuffers will also break tests since this PR replaces #getFieldBuffers with #getBuffers in VectorUnloader. #getFieldBuffers and #getBuffers currently have different order of buffers while #getFieldBuffers and #loadFieldBuffers same order, and the order should be same for VectorLoader and VectorUnloader.
cc @pravindra @tianchen92

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@projjal Thanks for your testing result!
Seems like a legacy problem here, and I think validityBuffer->offsetBuffer should be the right order considering other vectors.
@jacques-n @pravindra any thoughts on how to resolve the conflicts? :)

list.addAll(Arrays.asList(vector.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
if (clear) {
for (ArrowBuf buffer : buffers) {
buffer.getReferenceManager().retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
@Override
public List<ArrowBuf> getFieldBuffers() {
List<ArrowBuf> result = new ArrayList<>(1);
setReaderAndWriterIndex();
result.add(validityBuffer);

return result;
Expand Down Expand Up @@ -296,14 +295,10 @@ public int getValueCapacity() {
public ArrowBuf[] getBuffers(boolean clear) {
setReaderAndWriterIndex();
final ArrowBuf[] buffers;
if (getBufferSize() == 0) {
buffers = new ArrowBuf[0];
} else {
List<ArrowBuf> list = new ArrayList<>();
list.add(validityBuffer);
list.addAll(Arrays.asList(super.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
}
List<ArrowBuf> list = new ArrayList<>();
list.add(validityBuffer);
list.addAll(Arrays.asList(super.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
if (clear) {
for (ArrowBuf buffer : buffers) {
buffer.getReferenceManager().retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2673,11 +2673,11 @@ public void testUnloadVariableWidthVector() {

varCharVector.set(0, "abcd".getBytes());

List<ArrowBuf> bufs = varCharVector.getFieldBuffers();
assertEquals(3, bufs.size());
ArrowBuf[] bufs = varCharVector.getBuffers(false);
assertEquals(3, bufs.length);

ArrowBuf offsetBuf = bufs.get(1);
ArrowBuf dataBuf = bufs.get(2);
ArrowBuf offsetBuf = bufs[1];
ArrowBuf dataBuf = bufs[2];

assertEquals(12, offsetBuf.writerIndex());
assertEquals(4, offsetBuf.getInt(4));
Expand Down