Skip to content

Commit 609f033

Browse files
authored
Hybrid Java buffer release: Support both manual and auto release at the same time (apache#34)
* Revert "Add AutoBufferLedger (apache#31)" This reverts commit e48da37. * Commit 1 * Commit 2 * Fix config builder visibility in Scala * Commit 2 Fixup * Commit 3 * Commit 3 Fixup
1 parent a11e628 commit 609f033

File tree

42 files changed

+2228
-1673
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2228
-1673
lines changed

java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public void retain(int increment) {
7878
bufRefCnt.addAndGet(increment);
7979
}
8080

81+
@Override
82+
public boolean isOpen() {
83+
return getRefCount() > 0;
84+
}
85+
8186
@Override
8287
public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
8388
retain();

java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.lang.reflect.Method;
2222
import java.util.concurrent.atomic.AtomicLong;
2323

24-
import org.apache.arrow.memory.util.MemoryUtil;
2524
import org.apache.arrow.util.VisibleForTesting;
2625

2726
/**
@@ -30,6 +29,20 @@
3029
* "-XX:MaxDirectMemorySize".
3130
*/
3231
public class DirectReservationListener implements ReservationListener {
32+
private final Method methodReserve;
33+
private final Method methodUnreserve;
34+
35+
private DirectReservationListener() {
36+
try {
37+
final Class<?> classBits = Class.forName("java.nio.Bits");
38+
methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
39+
methodReserve.setAccessible(true);
40+
methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
41+
methodUnreserve.setAccessible(true);
42+
} catch (Exception e) {
43+
throw new RuntimeException(e);
44+
}
45+
}
3346

3447
private static final DirectReservationListener INSTANCE = new DirectReservationListener();
3548

@@ -42,22 +55,43 @@ public static DirectReservationListener instance() {
4255
*/
4356
@Override
4457
public void reserve(long size) {
45-
MemoryUtil.reserveDirectMemory(size);
58+
try {
59+
if (size > Integer.MAX_VALUE) {
60+
throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
61+
}
62+
methodReserve.invoke(null, (int) size, (int) size);
63+
} catch (Exception e) {
64+
throw new RuntimeException(e);
65+
}
4666
}
4767

4868
/**
4969
* Unreserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#unreserveMemory.
5070
*/
5171
@Override
5272
public void unreserve(long size) {
53-
MemoryUtil.unreserveDirectMemory(size);
73+
try {
74+
if (size > Integer.MAX_VALUE) {
75+
throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
76+
}
77+
methodUnreserve.invoke(null, (int) size, (int) size);
78+
} catch (Exception e) {
79+
throw new RuntimeException(e);
80+
}
5481
}
5582

5683
/**
5784
* Get current reservation of jVM direct memory. Visible for testing.
5885
*/
5986
@VisibleForTesting
6087
public long getCurrentDirectMemReservation() {
61-
return MemoryUtil.getCurrentDirectMemReservation();
88+
try {
89+
final Class<?> classBits = Class.forName("java.nio.Bits");
90+
final Field f = classBits.getDeclaredField("reservedMemory");
91+
f.setAccessible(true);
92+
return ((AtomicLong) f.get(null)).get();
93+
} catch (Exception e) {
94+
throw new RuntimeException(e);
95+
}
6296
}
6397
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.dataset.jni;
19+
20+
import org.apache.arrow.memory.*;
21+
22+
/**
23+
* MemoryChunkManager implementation for native allocated memory.
24+
*/
25+
public class NativeUnderlyingMemory implements MemoryChunk {
26+
27+
private final int size;
28+
private final long nativeInstanceId;
29+
private final long address;
30+
31+
/**
32+
* Constructor.
33+
*
34+
* @param size Size of underlying memory (in bytes)
35+
* @param nativeInstanceId ID of the native instance
36+
* @param address Address of underlying memory
37+
*/
38+
NativeUnderlyingMemory(int size, long nativeInstanceId, long address) {
39+
this.size = size;
40+
this.nativeInstanceId = nativeInstanceId;
41+
this.address = address;
42+
}
43+
44+
@Override
45+
public long size() {
46+
return size;
47+
}
48+
49+
@Override
50+
public long memoryAddress() {
51+
return address;
52+
}
53+
54+
@Override
55+
public void destroy() {
56+
JniWrapper.get().releaseBuffer(nativeInstanceId);
57+
}
58+
}

java/dataset/src/main/java/org/apache/arrow/dataset/jni/UnsafeRecordBatchSerializer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
import org.apache.arrow.flatbuf.Message;
3636
import org.apache.arrow.flatbuf.MessageHeader;
3737
import org.apache.arrow.flatbuf.RecordBatch;
38-
import org.apache.arrow.memory.*;
38+
import org.apache.arrow.memory.ArrowBuf;
39+
import org.apache.arrow.memory.BufferAllocator;
3940
import org.apache.arrow.memory.util.LargeMemoryUtil;
4041
import org.apache.arrow.util.Preconditions;
4142
import org.apache.arrow.vector.compression.NoCompressionCodec;
@@ -126,10 +127,9 @@ public static ArrowRecordBatch deserializeUnsafe(
126127
final byte[] refDecoded = Base64.getDecoder().decode(keyValue.value());
127128
final long nativeBufferRef = ByteBuffer.wrap(refDecoded).order(ByteOrder.LITTLE_ENDIAN).getLong();
128129
final int size = LargeMemoryUtil.checkedCastToInt(bufferMeta.length());
129-
final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
130-
size, nativeBufferRef, bufferMeta.offset());
131-
ReferenceManager rm = am.createReferenceManager(allocator);
132-
ArrowBuf buf = new ArrowBuf(rm, null, size, bufferMeta.offset());
130+
final NativeUnderlyingMemory chunk = new NativeUnderlyingMemory(size, nativeBufferRef,
131+
bufferMeta.offset());
132+
ArrowBuf buf = allocator.buffer(chunk);
133133
buffers.add(buf);
134134
}
135135

java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java

Lines changed: 0 additions & 81 deletions
This file was deleted.

java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java renamed to java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeUnderlyingMemory.java

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.arrow.memory;
18+
package org.apache.arrow.dataset.jni;
1919

2020
import static org.junit.Assert.*;
2121

22+
import org.apache.arrow.memory.*;
2223
import org.junit.After;
2324
import org.junit.Before;
2425
import org.junit.Test;
@@ -46,41 +47,41 @@ public void testReservation() {
4647
final RootAllocator root = rootAllocator();
4748

4849
final int size = 512;
49-
final AllocationManager am = new MockUnderlyingMemory(root, size);
50-
final BufferLedger ledger = am.associate(root);
50+
final MemoryChunk chunk = new MockUnderlyingMemory(size);
51+
final ArrowBuf buffer = root.buffer(chunk);
5152

5253
assertEquals(size, root.getAllocatedMemory());
5354

54-
ledger.release();
55+
buffer.close();
5556
}
5657

5758
@Test
5859
public void testBufferTransfer() {
5960
final RootAllocator root = rootAllocator();
6061

61-
ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
62-
ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
62+
BufferAllocator allocator1 = root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
63+
BufferAllocator allocator2 = root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
6364
assertEquals(0, allocator1.getAllocatedMemory());
6465
assertEquals(0, allocator2.getAllocatedMemory());
6566

6667
final int size = 512;
67-
final AllocationManager am = new MockUnderlyingMemory(allocator1, size);
68+
final MemoryChunk chunk = new MockUnderlyingMemory(size);
69+
final ArrowBuf buffer = allocator1.buffer(chunk);
6870

69-
final BufferLedger owningLedger = am.associate(allocator1);
70-
assertEquals(size, owningLedger.getAccountedSize());
71-
assertEquals(size, owningLedger.getSize());
71+
assertEquals(size, buffer.getActualMemoryConsumed());
72+
assertEquals(size, buffer.getPossibleMemoryConsumed());
7273
assertEquals(size, allocator1.getAllocatedMemory());
7374

74-
final BufferLedger transferredLedger = am.associate(allocator2);
75-
owningLedger.release(); // release previous owner
76-
assertEquals(0, owningLedger.getAccountedSize());
77-
assertEquals(size, owningLedger.getSize());
78-
assertEquals(size, transferredLedger.getAccountedSize());
79-
assertEquals(size, transferredLedger.getSize());
75+
final ArrowBuf transferredBuffer = buffer.getReferenceManager().retain(buffer, allocator2);
76+
buffer.close(); // release previous owner
77+
assertEquals(0, buffer.getActualMemoryConsumed());
78+
assertEquals(size, buffer.getPossibleMemoryConsumed());
79+
assertEquals(size, transferredBuffer.getActualMemoryConsumed());
80+
assertEquals(size, transferredBuffer.getPossibleMemoryConsumed());
8081
assertEquals(0, allocator1.getAllocatedMemory());
8182
assertEquals(size, allocator2.getAllocatedMemory());
8283

83-
transferredLedger.release();
84+
transferredBuffer.close();
8485
allocator1.close();
8586
allocator2.close();
8687
}
@@ -93,18 +94,18 @@ private static class MockUnderlyingMemory extends NativeUnderlyingMemory {
9394
/**
9495
* Constructor.
9596
*/
96-
MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) {
97-
super(accountingAllocator, size, -1L, -1L);
97+
MockUnderlyingMemory(int size) {
98+
super(size, -1L, -1L);
9899
}
99100

100101
@Override
101-
protected void release0() {
102-
System.out.println("Underlying memory released. Size: " + getSize());
102+
public void destroy() {
103+
System.out.println("Underlying memory released. Size: " + size());
103104
}
104105

105106
@Override
106-
protected long memoryAddress() {
107-
throw new UnsupportedOperationException();
107+
public long memoryAddress() {
108+
return -1L;
108109
}
109110
}
110111
}

java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ AllocationOutcome allocateBytes(long size) {
9797
} else {
9898
// Try again, but with details this time.
9999
// Populating details only on failures avoids performance overhead in the common case (success case).
100-
AllocationOutcomeDetails details = new AllocationOutcomeDetails();
100+
AllocationOutcomeDetails details = new AllocationOutcomeDetails(this);
101101
status = allocateBytesInternal(size, details);
102102
return new AllocationOutcome(status, details);
103103
}

0 commit comments

Comments
 (0)