Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator {
private final BaseAllocator parentAllocator;
private final Map<BaseAllocator, Object> childAllocators;
private final ArrowBuf empty;

// linked list to hold references to alive ledgers
private BufferLedger tail = null;

private final Object REF_LOCK = new Object();

// members used purely for debugging
private final IdentityHashMap<BufferLedger, Object> childLedgers;
private final IdentityHashMap<Reservation, Object> reservations;
Expand Down Expand Up @@ -177,13 +183,50 @@ public ArrowBuf getEmpty() {
return empty;
}

private void appendToRefList(BufferLedger ledger) {
synchronized (REF_LOCK) {
if (ledger.next != null || ledger.prev != null) {
throw new IllegalStateException("already linked");
}
if (tail == null) {
tail = ledger;
return;
}
tail.next = ledger;
ledger.prev = tail;
tail = ledger;
}
}

private void removeFromRefList(BufferLedger ledger) {
synchronized (REF_LOCK) {
if (ledger.next == ledger) {
return;
}
if (ledger.prev == ledger) {
throw new IllegalStateException();
}
if (ledger == tail) {
tail = ledger.prev;
}
if (ledger.prev != null) {
ledger.prev.next = ledger.next;
}
if (ledger.next != null) {
ledger.next.prev = ledger.prev;
}
ledger.prev = ledger;
ledger.next = ledger;
}
}

/**
* For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
* we have a new ledger
* Allows an AllocationManager to tell the allocator that we have a new ledger
* associated with this allocator.
*/
void associateLedger(BufferLedger ledger) {
assertOpen();
appendToRefList(ledger);
if (DEBUG) {
synchronized (DEBUG_LOCK) {
childLedgers.put(ledger, null);
Expand All @@ -192,12 +235,12 @@ void associateLedger(BufferLedger ledger) {
}

/**
* For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
* we are removing a
* ledger associated with this allocator
* Allows an AllocationManager to tell the allocator that we are removing a
* ledger associated with this allocator.
*/
void dissociateLedger(BufferLedger ledger) {
assertOpen();
removeFromRefList(ledger);
if (DEBUG) {
synchronized (DEBUG_LOCK) {
if (!childLedgers.containsKey(ledger)) {
Expand Down Expand Up @@ -376,8 +419,6 @@ public synchronized void close() {
return;
}

isClosed = true;

StringBuilder outstandingChildAllocators = new StringBuilder();
if (DEBUG) {
synchronized (DEBUG_LOCK) {
Expand All @@ -401,7 +442,7 @@ public synchronized void close() {
// are there outstanding buffers?
final int allocatedCount = childLedgers.size();
if (allocatedCount > 0) {
throw new IllegalStateException(
logger.error(
String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
name, allocatedCount, toString()));
}
Expand Down Expand Up @@ -434,12 +475,35 @@ public synchronized void close() {
String msg = String.format("Memory was leaked by query. Memory leaked: (%d)\n%s%s", allocated,
outstandingChildAllocators.toString(), toString());
logger.error(msg);
}

// Release outstanding buffers
synchronized (REF_LOCK) {
int releasedLedgerCount = 0;
while (tail != null) {
final BufferLedger tmp = tail.prev;
tail.destroy();
tail = tmp;
releasedLedgerCount++;
}
if (releasedLedgerCount != 0) {
String msg = String.format("Released %d outstanding buffer ledgers.", releasedLedgerCount);
logger.warn(msg);
}
}

final long remaining = getAllocatedMemory();
if (remaining > 0) {
String msg = String.format("Memory is still leaked after final clean-up: (%d)\n%s%s", remaining,
outstandingChildAllocators.toString(), toString());
throw new IllegalStateException(msg);
}

// we need to release our memory to our parent before we tell it we've closed.
super.close();

isClosed = true;

// Inform our parent allocator that we've closed
if (parentAllocator != null) {
parentAllocator.childClosed(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public class BufferLedger implements ValueWithKeyIncluded<BufferAllocator>, Refe
private final HistoricalLog historicalLog =
BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1) : null;

BufferLedger prev = null;

BufferLedger next = null;

private volatile long lDestructionTime = 0;

BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) {
Expand Down Expand Up @@ -131,6 +136,17 @@ public boolean release(int decrement) {
return refCnt == 0;
}

/**
* Forcibly release the buffer ledger by setting reference count
* to zero. Used by BaseAllocator to reclaim outstanding buffers
* when being closed.
*/
void destroy() {
synchronized (allocationManager) {
release(bufRefCnt.get());
}
}

/**
* Decrement the ledger's reference count for the associated underlying
* memory chunk. If the reference count drops to 0, it implies that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void test_privateMax() throws Exception {
}
}

@Test(expected = IllegalStateException.class)
@Test()
public void testRootAllocator_closeWithOutstanding() throws Exception {
try {
try (final RootAllocator rootAllocator =
Expand All @@ -110,6 +110,19 @@ public void testRootAllocator_closeWithOutstanding() throws Exception {
}
}

@Test()
public void testRootAllocator_closeChildWithOutstanding() throws Exception {
try (final RootAllocator parent =
new RootAllocator(MAX_ALLOCATION)) {
final ArrowBuf parentBuf = parent.buffer(512);
assertNotNull("allocation failed", parentBuf);
try (BufferAllocator child1 = parent.newChildAllocator("child1", 0L, MAX_ALLOCATION)) {
final ArrowBuf childBuf = child1.buffer(512);
assertNotNull("allocation failed", childBuf);
}
}
}

@Test
public void testRootAllocator_getEmpty() throws Exception {
try (final RootAllocator rootAllocator =
Expand Down