Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*******************************************************************************

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.arrow.vector.complex;

import io.netty.buffer.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.NullableIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Dictionary;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.TransferPair;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * A dictionary-encoded vector: a vector of indices paired with the {@link Dictionary}
 * those indices refer to.
 *
 * <p>Every {@link ValueVector} operation implemented here is forwarded to the index vector;
 * the dictionary itself is only reachable through {@link #getDictionary()} and
 * {@link #getDictionaryVector()}.
 */
public class DictionaryVector implements ValueVector {

  private final ValueVector indices;
  private final Dictionary dictionary;

  /**
   * Wraps an already-encoded index vector together with its dictionary.
   *
   * @param indices    vector holding the dictionary indices
   * @param dictionary dictionary the indices refer to
   */
  public DictionaryVector(ValueVector indices, Dictionary dictionary) {
    this.indices = indices;
    this.dictionary = dictionary;
  }

  /**
   * Dictionary encodes a vector. The dictionary will be built using the values from the vector,
   * in order of first appearance. Null entries stay null in the index vector and do not get a
   * dictionary slot.
   *
   * @param vector vector to encode
   * @return dictionary encoded vector
   * @throws IllegalArgumentException if the vector's type is rejected by the type validation
   */
  public static DictionaryVector encode(ValueVector vector) {
    validateType(vector.getMinorType());

    ValueVector.Accessor accessor = vector.getAccessor();
    int count = accessor.getValueCount();

    // distinct value -> its position in the dictionary
    Map<Object, Integer> lookUps = new HashMap<>();

    NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
    indices.allocateNew(count);
    NullableIntVector.Mutator mutator = indices.getMutator();

    // the dictionary is a vector of the same kind as the input, obtained through a transfer pair
    TransferPair dictionaryTransfer = vector.getTransferPair(vector.getAllocator());
    ValueVector dictionaryVector = dictionaryTransfer.getTo();
    dictionaryVector.allocateNewSafe();

    for (int i = 0; i < count; i++) {
      Object value = accessor.getObject(i);
      if (value == null) {
        continue; // if it's null leave it null
      }
      Integer index = lookUps.get(value);
      if (index == null) {
        index = lookUps.size();
        lookUps.put(value, index);
        // Copy the value the first time it is seen, so dictionary slots are written in ascending
        // order. Collecting (row, slot) pairs in a HashMap and copying afterwards gives no
        // ordering guarantee. NOTE(review): in-order writes are presumably required by
        // variable-width vectors - confirm against the vector implementations.
        dictionaryTransfer.copyValueSafe(i, index);
      }
      mutator.set(i, index);
    }
    mutator.setValueCount(count);
    dictionaryVector.getMutator().setValueCount(lookUps.size());

    return new DictionaryVector(indices, new Dictionary(dictionaryVector, false));
  }

  /**
   * Dictionary encodes a vector with a provided dictionary. The dictionary must contain all values in the vector.
   *
   * @param vector     vector to encode
   * @param dictionary dictionary used for encoding
   * @return dictionary encoded vector
   * @throws IllegalArgumentException if the vector's type is rejected by the type validation, or if
   *                                  the vector contains a non-null value that is absent from the dictionary
   */
  public static DictionaryVector encode(ValueVector vector, Dictionary dictionary) {
    validateType(vector.getMinorType());

    // load dictionary values into a hashmap for lookup
    ValueVector.Accessor dictionaryAccessor = dictionary.getDictionary().getAccessor();
    int dictionarySize = dictionaryAccessor.getValueCount();
    Map<Object, Integer> lookUps = new HashMap<>(dictionarySize);
    for (int i = 0; i < dictionarySize; i++) {
      // for primitive array types we need a wrapper that implements equals and hashcode appropriately
      lookUps.put(dictionaryAccessor.getObject(i), i);
    }

    ValueVector.Accessor accessor = vector.getAccessor();
    int count = accessor.getValueCount();

    // vector to hold our indices (dictionary encoded values)
    NullableIntVector indices = new NullableIntVector(vector.getField().getName(), vector.getAllocator());
    NullableIntVector.Mutator mutator = indices.getMutator();
    indices.allocateNew(count);

    for (int i = 0; i < count; i++) {
      Object value = accessor.getObject(i);
      if (value == null) {
        continue; // if it's null leave it null
      }
      Integer index = lookUps.get(value);
      if (index == null) {
        // Fail with a descriptive error instead of a NullPointerException from unboxing, and
        // release the index vector allocated above since it will never be handed to the caller.
        indices.close();
        throw new IllegalArgumentException("Dictionary encoding not defined for value:" + value);
      }
      mutator.set(i, index);
    }
    mutator.setValueCount(count);

    return new DictionaryVector(indices, dictionary);
  }

  /**
   * Decodes a dictionary encoded array using the provided dictionary.
   *
   * @param indices    dictionary encoded values, must be int type
   * @param dictionary dictionary used to decode the values
   * @return vector with values restored from dictionary
   */
  public static ValueVector decode(ValueVector indices, Dictionary dictionary) {
    ValueVector.Accessor accessor = indices.getAccessor();
    int count = accessor.getValueCount();
    ValueVector dictionaryVector = dictionary.getDictionary();

    // copy the dictionary values into the decoded vector
    TransferPair transfer = dictionaryVector.getTransferPair(indices.getAllocator());
    ValueVector decoded = transfer.getTo();
    decoded.allocateNewSafe();
    for (int i = 0; i < count; i++) {
      Object index = accessor.getObject(i);
      if (index != null) { // null indices are left unset in the decoded vector
        transfer.copyValueSafe(((Number) index).intValue(), i);
      }
    }

    decoded.getMutator().setValueCount(count);
    return decoded;
  }

  /**
   * Rejects the types that {@code encode} does not handle.
   *
   * @param type minor type of the vector about to be encoded
   * @throws IllegalArgumentException for VARBINARY, LIST, MAP and UNION
   */
  private static void validateType(MinorType type) {
    // byte arrays don't work as keys in our dictionary map - we could wrap them with something to
    // implement equals and hashcode if we want that functionality
    if (type == MinorType.VARBINARY || type == MinorType.LIST || type == MinorType.MAP || type == MinorType.UNION) {
      throw new IllegalArgumentException("Dictionary encoding for complex types not implemented");
    }
  }

  /** @return the vector holding the dictionary indices */
  public ValueVector getIndexVector() {
    return indices;
  }

  /** @return the vector of values held by the dictionary */
  public ValueVector getDictionaryVector() {
    return dictionary.getDictionary();
  }

  /** @return the dictionary the indices refer to */
  public Dictionary getDictionary() {
    return dictionary;
  }

  // ---- ValueVector implementation: everything below forwards to the index vector ----

  @Override
  public MinorType getMinorType() {
    return indices.getMinorType();
  }

  @Override
  public Field getField() {
    return indices.getField();
  }

  // note: dictionary vector is not closed, as it may be shared
  @Override
  public void close() {
    indices.close();
  }

  @Override
  public void allocateNew() throws OutOfMemoryException {
    indices.allocateNew();
  }

  @Override
  public boolean allocateNewSafe() {
    return indices.allocateNewSafe();
  }

  @Override
  public BufferAllocator getAllocator() {
    return indices.getAllocator();
  }

  @Override
  public void setInitialCapacity(int numRecords) {
    indices.setInitialCapacity(numRecords);
  }

  @Override
  public int getValueCapacity() {
    return indices.getValueCapacity();
  }

  @Override
  public int getBufferSize() {
    return indices.getBufferSize();
  }

  @Override
  public int getBufferSizeFor(int valueCount) {
    return indices.getBufferSizeFor(valueCount);
  }

  @Override
  public Iterator<ValueVector> iterator() {
    return indices.iterator();
  }

  @Override
  public void clear() {
    indices.clear();
  }

  @Override
  public TransferPair getTransferPair(BufferAllocator allocator) {
    return indices.getTransferPair(allocator);
  }

  @Override
  public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
    return indices.getTransferPair(ref, allocator);
  }

  @Override
  public TransferPair makeTransferPair(ValueVector target) {
    return indices.makeTransferPair(target);
  }

  @Override
  public Accessor getAccessor() {
    return indices.getAccessor();
  }

  @Override
  public Mutator getMutator() {
    return indices.getMutator();
  }

  @Override
  public FieldReader getReader() {
    return indices.getReader();
  }

  @Override
  public ArrowBuf[] getBuffers(boolean clear) {
    return indices.getBuffers(clear);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*******************************************************************************

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.arrow.vector.types;

import org.apache.arrow.vector.ValueVector;

/**
 * Immutable pairing of a vector of dictionary values with a flag recording whether
 * the dictionary is ordered.
 */
public class Dictionary {

  // Both fields are assigned once in the constructor and never reassigned.
  private final ValueVector dictionary;
  private final boolean ordered;

  /**
   * @param dictionary vector holding the dictionary values
   * @param ordered    whether the dictionary is ordered
   */
  public Dictionary(ValueVector dictionary, boolean ordered) {
    this.dictionary = dictionary;
    this.ordered = ordered;
  }

  /** @return the vector holding the dictionary values */
  public ValueVector getDictionary() {
    return dictionary;
  }

  /** @return the ordered flag supplied at construction */
  public boolean isOrdered() {
    return ordered;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.List;
import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.arrow.flatbuf.DictionaryEncoding;
import org.apache.arrow.vector.schema.TypeLayout;
import org.apache.arrow.vector.schema.VectorLayout;

Expand All @@ -37,6 +40,7 @@ public class Field {
private final String name;
private final boolean nullable;
private final ArrowType type;
private final Long dictionary;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dictionary id is what goes in the IPC metadata, but how will this work in a strictly in-memory setting? In normal use, when you create a DictionaryVector field, the id will be null here. Then in the file/stream adapter code, you will need to either use the dictionary id here or come up with a new one.

At least in C++, I have planned for the dictionary id to be an encapsulated detail of the stream/file formats, but is otherwise not part of the public API. So I guess what I am asking is whether the dictionary id might have any use in Java outside of files and streams.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may also be a bit confused -- if these Field objects are only used for record batch disassembly and reassembly, then these changes are fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that the dictionary id has any use outside the stream/file formats. I was hesitant to mess with the stream/file encoding, so it seemed easier to add it here. I'm not sure of the repercussions either way...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With fresh eyes, I think it's OK to leave this here for now. We'll have to resolve some implementation questions in the course of handling dictionaries in the file/stream code.

private final List<Field> children;
private final TypeLayout typeLayout;

Expand All @@ -45,11 +49,13 @@ private Field(
@JsonProperty("name") String name,
@JsonProperty("nullable") boolean nullable,
@JsonProperty("type") ArrowType type,
@JsonProperty("dictionary") Long dictionary,
@JsonProperty("children") List<Field> children,
@JsonProperty("typeLayout") TypeLayout typeLayout) {
this.name = name;
this.nullable = nullable;
this.type = checkNotNull(type);
this.dictionary = dictionary;
if (children == null) {
this.children = ImmutableList.of();
} else {
Expand All @@ -59,13 +65,22 @@ private Field(
}

public Field(String name, boolean nullable, ArrowType type, List<Field> children) {
this(name, nullable, type, children, TypeLayout.getTypeLayout(checkNotNull(type)));
this(name, nullable, type, null, children, TypeLayout.getTypeLayout(checkNotNull(type)));
}

/**
 * Creates a field that carries a dictionary id. The type layout is derived from {@code type}
 * via {@code TypeLayout.getTypeLayout}.
 *
 * @param name       field name
 * @param nullable   whether the field is nullable
 * @param type       field type; passed through {@code checkNotNull}
 * @param dictionary dictionary id, or {@code null} when the field has none
 * @param children   child fields
 */
public Field(
    String name,
    boolean nullable,
    ArrowType type,
    Long dictionary,
    List<Field> children) {
  this(
      name,
      nullable,
      type,
      dictionary,
      children,
      TypeLayout.getTypeLayout(checkNotNull(type)));
}

public static Field convertField(org.apache.arrow.flatbuf.Field field) {
String name = field.name();
boolean nullable = field.nullable();
ArrowType type = getTypeForField(field);
DictionaryEncoding dictionaryEncoding = field.dictionary();
Long dictionary = null;
if (dictionaryEncoding != null) {
dictionary = dictionaryEncoding.id();
}
ImmutableList.Builder<org.apache.arrow.vector.schema.VectorLayout> layout = ImmutableList.builder();
for (int i = 0; i < field.layoutLength(); ++i) {
layout.add(new org.apache.arrow.vector.schema.VectorLayout(field.layout(i)));
Expand All @@ -75,8 +90,7 @@ public static Field convertField(org.apache.arrow.flatbuf.Field field) {
childrenBuilder.add(convertField(field.children(i)));
}
List<Field> children = childrenBuilder.build();
Field result = new Field(name, nullable, type, children, new TypeLayout(layout.build()));
return result;
return new Field(name, nullable, type, dictionary, children, new TypeLayout(layout.build()));
}

public void validate() {
Expand All @@ -89,6 +103,11 @@ public void validate() {
public int getField(FlatBufferBuilder builder) {
int nameOffset = name == null ? -1 : builder.createString(name);
int typeOffset = type.getType(builder);
int dictionaryOffset = -1;
if (dictionary != null) {
builder.addLong(dictionary);
dictionaryOffset = builder.offset();
}
int[] childrenData = new int[children.size()];
for (int i = 0; i < children.size(); i++) {
childrenData[i] = children.get(i).getField(builder);
Expand All @@ -107,6 +126,9 @@ public int getField(FlatBufferBuilder builder) {
org.apache.arrow.flatbuf.Field.addNullable(builder, nullable);
org.apache.arrow.flatbuf.Field.addTypeType(builder, type.getTypeID().getFlatbufID());
org.apache.arrow.flatbuf.Field.addType(builder, typeOffset);
if (dictionary != null) {
org.apache.arrow.flatbuf.Field.addDictionary(builder, dictionaryOffset);
}
org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset);
org.apache.arrow.flatbuf.Field.addLayout(builder, layoutOffset);
return org.apache.arrow.flatbuf.Field.endField(builder);
Expand All @@ -124,6 +146,9 @@ public ArrowType getType() {
return type;
}

/**
 * Returns the dictionary id stored on this field.
 *
 * @return the dictionary id, or {@code null} if none was set; the {@code NON_NULL} inclusion
 *         rule keeps a null id out of the JSON form
 */
@JsonInclude(Include.NON_NULL)
public Long getDictionary() {
  return dictionary;
}

/**
 * Returns the child fields of this field.
 *
 * @return the list held in {@code children}; the same list instance is returned, not a copy
 */
public List<Field> getChildren() {
return children;
}
Expand All @@ -141,6 +166,7 @@ public boolean equals(Object obj) {
return Objects.equals(this.name, that.name) &&
Objects.equals(this.nullable, that.nullable) &&
Objects.equals(this.type, that.type) &&
Objects.equals(this.dictionary, that.dictionary) &&
(Objects.equals(this.children, that.children) ||
(this.children == null && that.children.size() == 0) ||
(this.children.size() == 0 && that.children == null));
Expand All @@ -153,6 +179,9 @@ public String toString() {
sb.append(name).append(": ");
}
sb.append(type);
if (dictionary != null) {
sb.append("[dictionary: ").append(dictionary).append("]");
}
if (!children.isEmpty()) {
sb.append("<").append(Joiner.on(", ").join(children)).append(">");
}
Expand Down
Loading