Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package io.netty.buffer;

import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;

import org.apache.arrow.memory.OutOfMemoryException;

Expand All @@ -37,7 +39,109 @@ public class PooledByteBufAllocatorL {
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow" +
".allocator");

private static final int DEFAULT_NUM_HEAP_ARENA;
private static final int DEFAULT_NUM_DIRECT_ARENA;

private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
private static final int DEFAULT_TINY_CACHE_SIZE;
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = 64;

private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);

static {
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
Throwable pageSizeFallbackCause = null;
try {
validateAndCalculatePageShifts(defaultPageSize);
} catch (Throwable t) {
pageSizeFallbackCause = t;
defaultPageSize = 8192;
}
DEFAULT_PAGE_SIZE = defaultPageSize;

int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
Throwable maxOrderFallbackCause = null;
try {
validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
} catch (Throwable t) {
maxOrderFallbackCause = t;
defaultMaxOrder = 11;
}
DEFAULT_MAX_ORDER = defaultMaxOrder;

// Determine reasonable default for nHeapArena and nDirectArena.
// Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
final Runtime runtime = Runtime.getRuntime();

// Use 2 * cores by default to reduce condition as we use 2 * cores for the number of EventLoops
// in NIO and EPOLL as well. If we choose a smaller number we will run into hotspots as allocation and
// deallocation needs to be synchronized on the PoolArena.
// See https://github.com/netty/netty/issues/3888
final int defaultMinNumArena = runtime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
defaultMinNumArena,
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

// cache sizes
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);

// the number of threshold of allocations when cached entries will be freed up if not frequently used
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);

DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", true);


if (memoryLogger.isDebugEnabled()) {
memoryLogger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
memoryLogger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
if (pageSizeFallbackCause == null) {
memoryLogger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
} else {
memoryLogger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
}
if (maxOrderFallbackCause == null) {
memoryLogger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
} else {
memoryLogger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
}
memoryLogger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
memoryLogger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
memoryLogger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
memoryLogger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
memoryLogger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
memoryLogger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
memoryLogger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
}
}

private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;

public final UnsafeDirectLittleEndian empty;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
Expand Down Expand Up @@ -79,6 +183,38 @@ public long getNormalBufferCount() {
return normalBufferSize.get();
}

private static int validateAndCalculatePageShifts(int pageSize) {
if (pageSize < MIN_PAGE_SIZE) {
throw new IllegalArgumentException(
"pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")");
}

if ((pageSize & pageSize - 1) != 0) {
throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
}

// Logarithm base 2. At this point we know that pageSize is a power of two.
return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
}

private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
if (maxOrder > 14) {
throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
}

// Ensure the resulting chunkSize does not overflow.
int chunkSize = pageSize;
for (int i = maxOrder; i > 0; i--) {
if (chunkSize > MAX_CHUNK_SIZE / 2) {
throw new IllegalArgumentException(
String.format("pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder,
MAX_CHUNK_SIZE));
}
chunkSize <<= 1;
}
return chunkSize;
}

private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {

private final long initialCapacity;
Expand Down Expand Up @@ -129,7 +265,7 @@ private class InnerAllocator extends PooledByteBufAllocator {
private final int chunkSize;

public InnerAllocator() {
super(true);
super(true, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER, DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);

try {
Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TestBaseAllocator {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);

private final static int MAX_ALLOCATION = 8 * 1024;
private static final long CACHE_ALIGN = 64;

/*
// ---------------------------------------- DEBUG -----------------------------------
Expand Down Expand Up @@ -657,4 +658,29 @@ public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
assertEquals(origBuf.readerIndex(), newBuf.readerIndex());
assertEquals(origBuf.writerIndex(), newBuf.writerIndex());
}

@Test
public void testMemoryOffset() {
try (final BufferAllocator allocator = new RootAllocator(10000)) {
int[] reqCapacities = { 0, 5, 15, 67, 130, 510, 1024, 1023, 1025 };
int[] expectedResult = { 0, 8, 16, 128, 256, 512, 1024, 1024, 2048 };

for (int i = 0; i < reqCapacities.length; i++) {
ArrowBuf buf = allocator.buffer(reqCapacities[i]);
if (buf.capacity() > 0) {
long memory = buf.memoryAddress();
int delta = (int) (memory % CACHE_ALIGN);
assertTrue("memory is not align to 64-bytes", delta == 0);
assertEquals(expectedResult[i], buf.capacity());
int size = buf.capacity();
if (size >= CACHE_ALIGN) {
delta = (int) (size % CACHE_ALIGN);
assertTrue(" size is not align to 64-bytes ", delta == 0);
}
}
buf.release();
}
}
}

}
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.0.41.Final</version>
<version>4.0.45.Final</version>
</dependency>

<dependency>
Expand Down