Skip to content
This repository was archived by the owner on Jan 16, 2025. It is now read-only.

Commit 74838cc

Browse files
committed
Try to fix memcached corrupted uploads
If a memcached server responds to an op before the write of the op is completed, we can no longer trust the parser read state of the server and we should both abandon the op and reconnect to the server. For example, we may be halfway through a length+blob write. If the server rejects the op before we finish writing, we cannot simply abandon the write and start writing the next op, as those bytes may end up being part of the payload of the failed op! This situation has been observed in production (i.e. corrupted memcached values with binary op headers in them) and this is a best-guess at how to prevent it, but it is difficult to reproduce without mocking a server which produces abrupt failures.
1 parent e280bef commit 74838cc

File tree

3 files changed

+24
-2
lines changed

3 files changed

+24
-2
lines changed

src/main/java/net/spy/memcached/FillWriteBufferStatus.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ public enum FillWriteBufferStatus {
5252
*/
5353
OP_STATUS_IS_WRITE_QUEUED,
5454
OP_STATUS_IS_READING,
55-
OP_STATUS_IS_RETRY
55+
OP_STATUS_IS_RETRY,
56+
/**
57+
* The server completed a write operation but the
58+
*/
59+
OP_STATUS_IS_INTERRUPTED_BY_COMPLETION
5660
;
5761

5862
public static FillWriteBufferStatus forOperationState(final OperationState opState) {
@@ -74,4 +78,7 @@ public static FillWriteBufferStatus forOperationState(final OperationState opSta
7478
public boolean isSuccess() {
7579
return this == SUCCESS;
7680
}
81+
public boolean needsReconnect() {
82+
return this == OP_STATUS_IS_INTERRUPTED_BY_COMPLETION;
83+
}
7784
}

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,13 +810,19 @@ public void complete() {
810810
*/
811811
private void handleWrites(final MemcachedNode node) throws IOException {
812812
FillWriteBufferStatus bufferFilledStatus = node.fillWriteBuffer(shouldOptimize);
813+
if (bufferFilledStatus.needsReconnect()) {
814+
throw new IOException("Reconnecting because write was interrupted by completion");
815+
}
813816
boolean canWriteMore = node.getBytesRemainingToWrite() > 0;
814817
while (canWriteMore && bufferFilledStatus.isSuccess()) {
815818
int wrote = node.writeSome();
816819
metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
817820
bufferFilledStatus = node.fillWriteBuffer(shouldOptimize);
818821
canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;
819822
}
823+
if (bufferFilledStatus.needsReconnect()) {
824+
throw new IOException("Reconnecting because write was interrupted by completion");
825+
}
820826
}
821827

822828
/**

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,17 @@ public final FillWriteBufferStatus fillWriteBuffer(boolean shouldOptimize) {
199199
final OperationState oState = o.getState();
200200
ByteBuffer obuf = o.getBuffer();
201201
// This cases may happen due to race condition. See FillWriteBufferNPETest.
202+
// If we received a response to an operation before we finished sending
203+
// the full payload we can no longer trust that the server and client
204+
// are synchronized. For example, we may be in the middle of sending
205+
// bytes of a declared length and not sent all the bytes yet,
206+
// so the server may interpret the next op as part of the payload
207+
// of the previous op.
208+
// The only safe thing to do is abandon the current write op and
209+
// close and reestablish the connection.
202210
if (oState != OperationState.WRITING || obuf == null) {
203-
return logCleanUpAndReturnStatus(FillWriteBufferStatus.forOperationState(oState), o, obuf);
211+
logCleanUpAndReturnStatus(FillWriteBufferStatus.forOperationState(oState), o, obuf);
212+
return FillWriteBufferStatus.OP_STATUS_IS_INTERRUPTED_BY_COMPLETION;
204213
}
205214

206215
int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());

0 commit comments

Comments
 (0)