Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

Expand Down Expand Up @@ -110,7 +110,7 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
}

if (dictionaryEncoding != null) {
Dictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on export of dictionary encoded array");

data.dictionary = ArrowArray.allocateNew(allocator);
Expand Down
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
Expand Down Expand Up @@ -103,7 +103,7 @@ private void doImport(ArrowArray.Snapshot snapshot) {
DictionaryEncoding encoding = vector.getField().getDictionary();
checkNotNull(encoding, "Missing encoding on import of ArrowArray with dictionary");

Dictionary dictionary = dictionaryProvider.lookup(encoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(encoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on import of ArrowArray with dictionary");

// reset the dictionary vector to the initial state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.stream.Collectors;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

Expand All @@ -52,12 +52,12 @@ final class ArrowArrayStreamReader extends ArrowReader {
}

@Override
public Map<Long, Dictionary> getDictionaryVectors() {
public Map<Long, BaseDictionary> getDictionaryVectors() {
return provider.getDictionaryIds().stream().collect(Collectors.toMap(Function.identity(), provider::lookup));
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return provider.lookup(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import java.util.Map;
import java.util.Set;

import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.BatchedDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;

/**
Expand All @@ -39,14 +40,14 @@
*/
public class CDataDictionaryProvider implements DictionaryProvider, AutoCloseable {

private final Map<Long, Dictionary> map;
private final Map<Long, BaseDictionary> map;

public CDataDictionaryProvider() {
this.map = new HashMap<>();
}

void put(Dictionary dictionary) {
Dictionary previous = map.put(dictionary.getEncoding().getId(), dictionary);
void put(BaseDictionary dictionary) {
BaseDictionary previous = map.put(dictionary.getEncoding().getId(), dictionary);
if (previous != null) {
previous.getVector().close();
}
Expand All @@ -58,16 +59,25 @@ public final Set<Long> getDictionaryIds() {
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return map.get(id);
}

@Override
public void close() {
for (Dictionary dictionary : map.values()) {
for (BaseDictionary dictionary : map.values()) {
dictionary.getVector().close();
}
map.clear();
}

@Override
public void resetDictionaries() {
map.values().forEach( dictionary -> {
if (dictionary instanceof BatchedDictionary) {
((BatchedDictionary) dictionary).reset();
}
});
}

}
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/SchemaExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.arrow.c.jni.PrivateData;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -95,7 +95,7 @@ void export(ArrowSchema schema, Field field, DictionaryProvider dictionaryProvid
}

if (dictionaryEncoding != null) {
Dictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
BaseDictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
checkNotNull(dictionary, "Dictionary lookup failed on export of field with dictionary");

data.dictionary = ArrowSchema.allocateNew(allocator);
Expand Down
7 changes: 4 additions & 3 deletions java/c/src/test/java/org/apache/arrow/c/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.compare.Range;
import org.apache.arrow.vector.compare.RangeEqualsVisitor;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowReader;
Expand Down Expand Up @@ -244,7 +245,7 @@ void roundtrip(Schema schema, List<ArrowRecordBatch> batches, DictionaryProvider
}
assertThat(reader.loadNextBatch()).isFalse();
assertThat(reader.getDictionaryIds()).isEqualTo(provider.getDictionaryIds());
for (Map.Entry<Long, Dictionary> entry : reader.getDictionaryVectors().entrySet()) {
for (Map.Entry<Long, BaseDictionary> entry : reader.getDictionaryVectors().entrySet()) {
final FieldVector expected = provider.lookup(entry.getKey()).getVector();
final FieldVector actual = entry.getValue().getVector();
assertVectorsEqual(expected, actual);
Expand Down Expand Up @@ -286,7 +287,7 @@ static class InMemoryArrowReader extends ArrowReader {
}

@Override
public Dictionary lookup(long id) {
public BaseDictionary lookup(long id) {
return provider.lookup(id);
}

Expand All @@ -296,7 +297,7 @@ public Set<Long> getDictionaryIds() {
}

@Override
public Map<Long, Dictionary> getDictionaryVectors() {
public Map<Long, BaseDictionary> getDictionaryVectors() {
return getDictionaryIds().stream().collect(Collectors.toMap(Function.identity(), this::lookup));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected void loadRecordBatch(ArrowRecordBatch batch) {
}

@Override
protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch, boolean validateReplacements) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected void loadRecordBatch(ArrowRecordBatch batch) {
}

@Override
protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch, boolean validateReplacements) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
Expand Down Expand Up @@ -66,7 +66,7 @@ static Schema generateSchemaMessages(final Schema originalSchema, final FlightDe
}
// Create and write dictionary batches
for (Long id : dictionaryIds) {
final Dictionary dictionary = provider.lookup(id);
final BaseDictionary dictionary = provider.lookup(id);
final FieldVector vector = dictionary.getVector();
final int count = vector.getValueCount();
// Do NOT close this root, as it does not actually own the vector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.BaseDictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
Expand Down Expand Up @@ -268,7 +268,7 @@ public boolean next() {
if (dictionaries == null) {
throw new IllegalStateException("Dictionary ownership was claimed by the application.");
}
final Dictionary dictionary = dictionaries.lookup(id);
final BaseDictionary dictionary = dictionaries.lookup(id);
if (dictionary == null) {
throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
}
Expand Down Expand Up @@ -410,12 +410,12 @@ public void onNext(ArrowMessage msg) {
}

final List<Field> fields = new ArrayList<>();
final Map<Long, Dictionary> dictionaryMap = new HashMap<>();
final Map<Long, BaseDictionary> dictionaryMap = new HashMap<>();
for (final Field originalField : schema.getFields()) {
final Field updatedField = DictionaryUtility.toMemoryFormat(originalField, allocator, dictionaryMap);
fields.add(updatedField);
}
for (final Map.Entry<Long, Dictionary> entry : dictionaryMap.entrySet()) {
for (final Map.Entry<Long, BaseDictionary> entry : dictionaryMap.entrySet()) {
dictionaries.put(entry.getValue());
}
schema = new Schema(fields, schema.getCustomMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,13 @@ public interface ArrowBufHasher {
* @return the hash code.
*/
int hashCode(ArrowBuf buf, long offset, long length);

/**
* Calculates the hash code for a byte array.
* @param buf the non-null byte array.
* @param offset offset within the buffer for the memory region.
* @param length length of the memory region.
* @return the hash code.
*/
int hashCode(byte[] buf, int offset, int length);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public int hashCode(ArrowBuf buf, long offset, long length) {
return hashCode(buf.memoryAddress() + offset, length);
}

@Override
public int hashCode(byte[] buf, int offset, int length) {
return hashCode(buf, offset, length, seed);
}

/**
* Calculates the hash code for a memory region.
* @param buf the buffer for the memory region.
Expand Down Expand Up @@ -107,6 +112,36 @@ public static int hashCode(long address, long length, int seed) {
return finalizeHashCode(hash, length);
}

/**
* Calculates the hash code for a byte array.
* @param buffer the non-null buffer to read.
* @param offset an offset into the byte array.
* @param length length of the memory region.
* @param seed the seed.
* @return the hash code.
*/
public static int hashCode(byte[] buffer, int offset, int length, int seed) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we adding support for byte[] buffers instead of using ArrowBuf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for data that hasn't been written into an ArrowBuf via the Vector.set() APIs yet. In the dictionary map we can hash the original data and only copy it into an ArrowBuf if the hash wasn't present in the map.

int index = offset;
int hash = seed;
while (index + 4 <= length) {
int intValue = readInt(buffer, index, 4);
hash = combineHashCode(hash, intValue);
index += 4;
}

if (index < length) {
// process remaining data as a integer in little endian
int intValue = 0;
for (int i = length - 1; i >= index; i--) {
intValue <<= 8;
intValue |= (buffer[i] & 0x000000ff);
index += 1;
}
hash = combineHashCode(hash, intValue);
}
return finalizeHashCode(hash, length);
}

/**
* Combine the current hash code and a new int value to calculate
* a new hash code.
Expand Down Expand Up @@ -173,4 +208,13 @@ public boolean equals(@Nullable Object o) {
public int hashCode() {
return seed;
}

private static int readInt(byte[] buffer, int offset, int len) {
int result = 0;
for (int i = offset; i < offset + len; i++) {
result <<= 8;
result |= (buffer[i] & 0x000000ff);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.arrow.memory.util.hash;

import java.nio.ByteBuffer;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.util.MemoryUtil;
Expand Down Expand Up @@ -93,6 +94,34 @@ public int hashCode(ArrowBuf buf, long offset, long length) {
return hashCode(buf.memoryAddress() + offset, length);
}

@Override
public int hashCode(byte[] buf, int offset, int length) {
int hashValue = 0;
int index = 0;
while (index + 8 <= length) {
long longValue = ByteBuffer.wrap(buf, offset + index, 8).getLong();
int longHash = getLongHashCode(longValue);
hashValue = combineHashCode(hashValue, longHash);
index += 8;
}

if (index + 4 <= length) {
int intValue = ByteBuffer.wrap(buf, offset + index, 4).getInt();
int intHash = intValue;
hashValue = combineHashCode(hashValue, intHash);
index += 4;
}

while (index < length) {
byte byteValue = buf[index];
int byteHash = byteValue;
hashValue = combineHashCode(hashValue, byteHash);
index += 1;
}

return finalizeHashCode(hashValue);
}

protected int combineHashCode(int currentHashCode, int newHashCode) {
return currentHashCode * 37 + newHashCode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ public int hashCode(ArrowBuf buf, long offset, long length) {
return SimpleHasher.INSTANCE.hashCode(buf, offset, length);
}

@Override
public int hashCode(byte[] buf, int offset, int length) {
throw new UnsupportedOperationException("Not used in UT.");
}

@Override
public int hashCode() {
return super.hashCode();
Expand Down
Loading