-
Notifications
You must be signed in to change notification settings - Fork 843
Closed
apache/incubator-heron
#3683Labels
Description
Using com.esotericsoftware.kryo.util.Pool with threadSafe = true and softReferences = false and a limit.
java.lang.IllegalStateException: Queue full
at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
at com.esotericsoftware.kryo.util.Pool$1.add(Pool.java:52)
at com.esotericsoftware.kryo.util.Pool$SoftReferenceQueue.offer(Pool.java:168)
at com.esotericsoftware.kryo.util.Pool.free(Pool.java:95)
at serializers.KryoSnappyRedisSerializer.serialize(KryoSnappyRedisSerializer.java:79)
I'm running following code under JMH with 8 concurrent threads. Is that correct behavior?
package tech.viacomcbs.serializers;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.SerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import com.esotericsoftware.kryo.util.Pool;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class KryoRedisSerializer implements RedisSerializer<Object> {
public static final Logger log = LoggerFactory.getLogger(KryoRedisSerializer.class);
static final byte[] EMPTY_ARRAY = new byte[0];
private static final Pool<Kryo> kryoPool = new Pool<Kryo>(true, true, 16) {
@Override
protected Kryo create() {
Kryo kryo = new Kryo();
try {
kryo.setRegistrationRequired(false);
CompatibleFieldSerializer.CompatibleFieldSerializerConfig config = new CompatibleFieldSerializer.CompatibleFieldSerializerConfig();
config.setExtendedFieldNames(false);
config.setReadUnknownFieldData(true);
kryo.setDefaultSerializer(new SerializerFactory.CompatibleFieldSerializerFactory(config));
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return kryo;
}
};
Pool<Output> outputPool = new Pool<Output>(true, false, 16) {
protected Output create () {
return new Output(1024, -1);
}
};
Pool<Input> inputPool = new Pool<Input>(true, false, 16) {
protected Input create () {
return new Input(4096);
}
};
@Override
public byte[] serialize(Object t) throws SerializationException {
if (t == null) {
return EMPTY_ARRAY;
} else {
Kryo kryo = null;
Output output = null;
try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
kryo = kryoPool.obtain();
output = outputPool.obtain();
output.setOutputStream(stream);
kryo.writeClassAndObject(output, t);
output.flush();
return stream.toByteArray();
} catch (IOException e) {
throw new SerializationException("Unable to close stream", e);
} finally {
if (kryo != null) {
kryoPool.free(kryo);
}
if (output != null) {
outputPool.free(output);
}
}
}
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length == 0) {
return null;
} else {
Kryo kryo = null;
Input input = null;
try {
kryo = kryoPool.obtain();
input = inputPool.obtain();
input.setBuffer(bytes);
return kryo.readClassAndObject(input);
} finally {
if (kryo != null) {
kryoPool.free(kryo);
}
if (input != null) {
inputPool.free(input);
}
}
}
}
}