Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.arrow.vector.file.json;

import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ValueVector.Mutator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.schema.ArrowVectorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.google.common.base.Objects;

public class JsonFileReader {
private final File inputFile;
private final JsonParser parser;
private final BufferAllocator allocator;
private Schema schema;

public JsonFileReader(File inputFile, BufferAllocator allocator) throws JsonParseException, IOException {
super();
this.inputFile = inputFile;
this.allocator = allocator;
MappingJsonFactory jsonFactory = new MappingJsonFactory();
this.parser = jsonFactory.createParser(inputFile);
}

public Schema start() throws JsonParseException, IOException {
readToken(START_OBJECT);
{
this.schema = readNextField("schema", Schema.class);
nextFieldIs("batches");
readToken(START_ARRAY);
return schema;
}
}

public VectorSchemaRoot read() throws IOException {
VectorSchemaRoot recordBatch = new VectorSchemaRoot(schema, allocator);
readToken(START_OBJECT);
{
int count = readNextField("count", Integer.class);
recordBatch.setRowCount(count);
nextFieldIs("columns");
readToken(START_ARRAY);
{
for (Field field : schema.getFields()) {
FieldVector vector = recordBatch.getVector(field.getName());
readVector(field, vector);
}
}
readToken(END_ARRAY);
}
readToken(END_OBJECT);
return recordBatch;
}

private void readVector(Field field, FieldVector vector) throws JsonParseException, IOException {
List<ArrowVectorType> vectorTypes = field.getTypeLayout().getVectorTypes();
List<BufferBacked> fieldInnerVectors = vector.getFieldInnerVectors();
if (vectorTypes.size() != fieldInnerVectors.size()) {
throw new IllegalArgumentException("vector types and inner vectors are not the same size: " + vectorTypes.size() + " != " + fieldInnerVectors.size());
}
readToken(START_OBJECT);
{
String name = readNextField("name", String.class);
if (!Objects.equal(field.getName(), name)) {
throw new IllegalArgumentException("Expected field " + field.getName() + " but got " + name);
}
int count = readNextField("count", Integer.class);
for (int v = 0; v < vectorTypes.size(); v++) {
ArrowVectorType vectorType = vectorTypes.get(v);
BufferBacked innerVector = fieldInnerVectors.get(v);
nextFieldIs(vectorType.getName());
readToken(START_ARRAY);
ValueVector valueVector = (ValueVector)innerVector;
valueVector.allocateNew();
Mutator mutator = valueVector.getMutator();
mutator.setValueCount(count);
for (int i = 0; i < count; i++) {
parser.nextToken();
setValueFromParser(valueVector, i);
}
readToken(END_ARRAY);
}
// if children
List<Field> fields = field.getChildren();
if (!fields.isEmpty()) {
List<FieldVector> vectorChildren = vector.getChildrenFromFields();
if (fields.size() != vectorChildren.size()) {
throw new IllegalArgumentException("fields and children are not the same size: " + fields.size() + " != " + vectorChildren.size());
}
nextFieldIs("children");
readToken(START_ARRAY);
for (int i = 0; i < fields.size(); i++) {
Field childField = fields.get(i);
FieldVector childVector = vectorChildren.get(i);
readVector(childField, childVector);
}
readToken(END_ARRAY);
}
}
readToken(END_OBJECT);
}

private void setValueFromParser(ValueVector valueVector, int i) throws IOException {
switch (valueVector.getMinorType()) {
case BIT:
((BitVector)valueVector).getMutator().set(i, parser.readValueAs(Boolean.class) ? 1 : 0);
break;
case TINYINT:
((TinyIntVector)valueVector).getMutator().set(i, parser.readValueAs(Integer.class));
break;
case SMALLINT:
((SmallIntVector)valueVector).getMutator().set(i, parser.readValueAs(Integer.class));
break;
case INT:
((IntVector)valueVector).getMutator().set(i, parser.readValueAs(Integer.class));
break;
case BIGINT:
((BigIntVector)valueVector).getMutator().set(i, parser.readValueAs(Long.class));
break;
case UINT1:
((UInt1Vector)valueVector).getMutator().set(i, parser.readValueAs(Integer.class));
break;
case UINT2:
((UInt2Vector)valueVector).getMutator().set(i, parser.readValueAs(Integer.class));
break;
case UINT4:
((UInt4Vector)valueVector).getMutator().set(i, parser.readValueAs(Integer.class));
break;
case UINT8:
((UInt8Vector)valueVector).getMutator().set(i, parser.readValueAs(Long.class));
break;
case FLOAT4:
((Float4Vector)valueVector).getMutator().set(i, parser.readValueAs(Float.class));
break;
case FLOAT8:
((Float8Vector)valueVector).getMutator().set(i, parser.readValueAs(Double.class));
break;
case VARCHAR:
((VarCharVector)valueVector).getMutator().setSafe(i, parser.readValueAs(String.class).getBytes(UTF_8));
break;
case TIMESTAMP:
((TimeStampVector)valueVector).getMutator().set(i, parser.readValueAs(Long.class));
break;
default:
throw new UnsupportedOperationException("minor type: " + valueVector.getMinorType());
}
}

public void close() throws IOException {
readToken(END_ARRAY);
readToken(END_OBJECT);
parser.close();
}

private <T> T readNextField(String expectedFieldName, Class<T> c) throws IOException, JsonParseException {
nextFieldIs(expectedFieldName);
parser.nextToken();
return parser.readValueAs(c);
}

private void nextFieldIs(String expectedFieldName) throws IOException, JsonParseException {
String name = parser.nextFieldName();
if (name == null || !name.equals(expectedFieldName)) {
throw new IllegalStateException("Expected " + expectedFieldName + " but got " + name);
}
}

private void readToken(JsonToken expected) throws JsonParseException, IOException {
JsonToken t = parser.nextToken();
if (t != expected) {
throw new IllegalStateException("Expected " + expected + " but got " + t);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.arrow.vector.file.json;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ValueVector.Accessor;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.schema.ArrowVectorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
import com.fasterxml.jackson.databind.MappingJsonFactory;

public class JsonFileWriter {

public static final class JSONWriteConfig {
private final boolean pretty;
private JSONWriteConfig(boolean pretty) {
this.pretty = pretty;
}
private JSONWriteConfig() {
this.pretty = false;
}
public JSONWriteConfig pretty(boolean pretty) {
return new JSONWriteConfig(pretty);
}
}

public static JSONWriteConfig config() {
return new JSONWriteConfig();
}

private final JsonGenerator generator;
private Schema schema;

public JsonFileWriter(File outputFile) throws IOException {
this(outputFile, config());
}

public JsonFileWriter(File outputFile, JSONWriteConfig config) throws IOException {
MappingJsonFactory jsonFactory = new MappingJsonFactory();
this.generator = jsonFactory.createGenerator(outputFile, JsonEncoding.UTF8);
if (config.pretty) {
DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
prettyPrinter.indentArraysWith(NopIndenter.instance);
this.generator.setPrettyPrinter(prettyPrinter);
}
}

public void start(Schema schema) throws IOException {
this.schema = schema;
generator.writeStartObject();
generator.writeObjectField("schema", schema);
generator.writeArrayFieldStart("batches");
}

public void write(VectorSchemaRoot recordBatch) throws IOException {
if (!recordBatch.getSchema().equals(schema)) {
throw new IllegalArgumentException("record batches must have the same schema: " + schema);
}
generator.writeStartObject();
{
generator.writeObjectField("count", recordBatch.getRowCount());
generator.writeArrayFieldStart("columns");
for (Field field : schema.getFields()) {
FieldVector vector = recordBatch.getVector(field.getName());
writeVector(field, vector);
}
generator.writeEndArray();
}
generator.writeEndObject();
}

private void writeVector(Field field, FieldVector vector) throws IOException {
List<ArrowVectorType> vectorTypes = field.getTypeLayout().getVectorTypes();
List<BufferBacked> fieldInnerVectors = vector.getFieldInnerVectors();
if (vectorTypes.size() != fieldInnerVectors.size()) {
throw new IllegalArgumentException("vector types and inner vectors are not the same size: " + vectorTypes.size() + " != " + fieldInnerVectors.size());
}
generator.writeStartObject();
{
generator.writeObjectField("name", field.getName());
int valueCount = vector.getAccessor().getValueCount();
generator.writeObjectField("count", valueCount);
for (int v = 0; v < vectorTypes.size(); v++) {
ArrowVectorType vectorType = vectorTypes.get(v);
BufferBacked innerVector = fieldInnerVectors.get(v);
generator.writeArrayFieldStart(vectorType.getName());
ValueVector valueVector = (ValueVector)innerVector;
for (int i = 0; i < valueCount; i++) {
writeValueToGenerator(valueVector, i);
}
generator.writeEndArray();
}
List<Field> fields = field.getChildren();
List<FieldVector> children = vector.getChildrenFromFields();
if (fields.size() != children.size()) {
throw new IllegalArgumentException("fields and children are not the same size: " + fields.size() + " != " + children.size());
}
if (fields.size() > 0) {
generator.writeArrayFieldStart("children");
for (int i = 0; i < fields.size(); i++) {
Field childField = fields.get(i);
FieldVector childVector = children.get(i);
writeVector(childField, childVector);
}
generator.writeEndArray();
}
}
generator.writeEndObject();
}

private void writeValueToGenerator(ValueVector valueVector, int i) throws IOException {
switch (valueVector.getMinorType()) {
case TIMESTAMP:
generator.writeNumber(((TimeStampVector)valueVector).getAccessor().get(i));
break;
case BIT:
generator.writeNumber(((BitVector)valueVector).getAccessor().get(i));
break;
default:
// TODO: each type
Accessor accessor = valueVector.getAccessor();
Object value = accessor.getObject(i);
if (value instanceof Number || value instanceof Boolean) {
generator.writeObject(value);
} else {
generator.writeObject(value.toString());
}
break;
}
}

public void close() throws IOException {
generator.writeEndArray();
generator.writeEndObject();
generator.close();
}

}
Loading