Skip to content

Commit 152ba5c

Browse files
committed
Increase robustness of LwsApiCall implementation
- Handle partial writes by sending data in multiple iterations - Use retries when message send fails - Track and handle message receive if in parts using `receiveMessage` var
1 parent 16c40d4 commit 152ba5c

File tree

2 files changed

+108
-47
lines changed

2 files changed

+108
-47
lines changed

src/source/Signaling/LwsApiCalls.c

Lines changed: 105 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,9 @@ INT32 lwsHttpCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
251251
if (size != (INT32) pRequestInfo->bodySize) {
252252
DLOGW("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d", pRequestInfo->bodySize, size);
253253
if (size > 0) {
254-
// Schedule again
254+
// Update remainig data and schedule again
255+
pRequestInfo->bodySize -= size;
256+
pRequestInfo->body += size;
255257
lws_client_http_body_pending((struct lws*) wsi, 1);
256258
lws_callback_on_writable((struct lws*) wsi);
257259
} else {
@@ -311,6 +313,8 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
311313
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
312314
case LWS_CALLBACK_CLIENT_RECEIVE:
313315
case LWS_CALLBACK_CLIENT_WRITEABLE:
316+
case LWS_CALLBACK_TIMER:
317+
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
314318
break;
315319
default:
316320
DLOGI("WSS callback with reason %d", reason);
@@ -383,9 +387,16 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
383387
pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient);
384388
MUTEX_UNLOCK(pSignalingClient->diagnosticsLock);
385389

390+
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
386391
// Notify the listener thread
387392
CVAR_BROADCAST(pSignalingClient->connectedCvar);
388393

394+
// Keep connection alive
395+
lws_callback_on_writable(wsi);
396+
break;
397+
case LWS_CALLBACK_TIMER:
398+
lws_callback_on_writable(wsi);
399+
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
389400
break;
390401

391402
case LWS_CALLBACK_CLIENT_CLOSED:
@@ -431,79 +442,106 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
431442

432443
DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr);
433444

434-
// Store the state as the result
435-
retValue = -1;
445+
if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL) && !ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
446+
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_INTERNAL_ERROR);
447+
retValue = -1;
448+
} else {
449+
// Store normal closure status
450+
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_RESULT_OK);
451+
retValue = 0;
452+
}
436453

437-
ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status);
454+
break;
438455

456+
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
457+
DLOGV("Received PONG from server");
439458
break;
440459

441460
case LWS_CALLBACK_CLIENT_RECEIVE:
442461

462+
lwsl_info("WS receive callback, len: %zu, is_final: %d\n", dataSize, lws_is_final_fragment(wsi));
463+
443464
// Check if it's a binary data
465+
if (lws_frame_is_binary(wsi)) {
466+
DLOGW("Received binary data");
467+
}
444468
CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED);
445469

446-
// Skip if it's the first and last fragment and the size is 0
447-
CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus);
448-
449-
// Check what type of a message it is. We will set the size to 0 on first and flush on last
450-
if (lws_is_first_fragment(wsi)) {
451-
pLwsCallInfo->receiveBufferSize = 0;
452-
}
470+
// Mark as receiving a message
471+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, TRUE);
453472

454-
// Store the data in the buffer
473+
// Store the data in the receive buffer
455474
CHK(pLwsCallInfo->receiveBufferSize + (UINT32) dataSize + LWS_PRE <= SIZEOF(pLwsCallInfo->receiveBuffer),
456475
STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN);
457476
MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize);
458477
pLwsCallInfo->receiveBufferSize += (UINT32) dataSize;
459478

460-
// Flush on last
479+
// Process complete message
461480
if (lws_is_final_fragment(wsi)) {
462-
CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
481+
CHK_STATUS(receiveLwsMessage(pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
463482
pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR)));
483+
pLwsCallInfo->receiveBufferSize = 0;
484+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
464485
}
465486

466-
lws_callback_on_writable(wsi);
467-
487+
// Keep connection alive after receiving data
488+
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
489+
lws_callback_on_writable(wsi);
490+
}
468491
break;
469492

470493
case LWS_CALLBACK_CLIENT_WRITEABLE:
471494
DLOGD("Client is writable");
472495

473-
// Check if we are attempting to terminate the connection
474-
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) {
475-
retValue = 1;
476-
CHK(FALSE, retStatus);
496+
// Add buffer state check
497+
if (lws_send_pipe_choked(wsi)) {
498+
DLOGI("WS send pipe choked, retrying");
499+
lws_callback_on_writable(wsi);
500+
break;
477501
}
478502

479-
offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset);
480-
bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize);
481-
writeSize = (INT32) (bufferSize - offset);
482-
483-
// Check if we need to do anything
484-
CHK(writeSize > 0, retStatus);
485-
486-
// Send data and notify on completion
487-
size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[pLwsCallInfo->sendOffset]), (SIZE_T) writeSize, LWS_WRITE_TEXT);
503+
// Log buffer state before write
504+
DLOGD("Send buffer size before write: %zu", pLwsCallInfo->sendBufferSize);
488505

489-
if (size < 0) {
490-
DLOGW("Write failed. Returned write size is %d", size);
491-
// Quit
492-
retValue = -1;
493-
CHK(FALSE, retStatus);
506+
// Only check termination if we're not in the middle of receiving a message
507+
if (!ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
508+
CHK(!ATOMIC_LOAD_BOOL(&pRequestInfo->terminating), retStatus);
494509
}
495510

496-
if (size == writeSize) {
497-
// Notify the listener
498-
ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0);
499-
ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0);
500-
CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar);
501-
} else {
502-
// Partial write
503-
DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize);
504-
// Schedule again
505-
lws_callback_on_writable(wsi);
511+
// Send data if anything is in the buffer
512+
if (pLwsCallInfo->sendBufferSize != 0) {
513+
SIZE_T remainingSize = pLwsCallInfo->sendBufferSize - LWS_PRE;
514+
// Log write attempt
515+
DLOGD("Attempting to write %zu bytes", remainingSize);
516+
517+
retValue = (INT32) lws_write(wsi, pLwsCallInfo->sendBuffer + LWS_PRE, remainingSize, LWS_WRITE_TEXT);
518+
if (retValue < 0) {
519+
DLOGW("Write failed with %d", retValue);
520+
CHK(FALSE, retValue); // Return non-zero value to callback to indicate failure
521+
} else if ((SIZE_T) retValue < remainingSize) {
522+
DLOGW("Partial write occurred: %d of %zu bytes", retValue, remainingSize);
523+
// Move remaining data to start of buffer
524+
MEMMOVE(pLwsCallInfo->sendBuffer + LWS_PRE, pLwsCallInfo->sendBuffer + LWS_PRE + retValue, remainingSize - retValue);
525+
// Update buffer size
526+
pLwsCallInfo->sendBufferSize = (remainingSize - retValue) + LWS_PRE;
527+
528+
// Handle partial write
529+
lws_callback_on_writable(wsi);
530+
} else {
531+
// Complete write
532+
DLOGI("Write complete: %d bytes", retValue);
533+
pLwsCallInfo->sendBufferSize = 0;
534+
// Keep connection alive after write
535+
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
536+
lws_callback_on_writable(wsi);
537+
}
538+
ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK);
539+
// Signal completion immediately after successful write
540+
CVAR_BROADCAST(pSignalingClient->sendCvar);
541+
}
506542
}
543+
// Always return success from writeable callback
544+
retValue = 0;
507545

508546
break;
509547

@@ -574,8 +612,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo)
574612
// Execute the LWS REST call
575613
MEMSET(&connectInfo, 0x00, SIZEOF(struct lws_client_connect_info));
576614
connectInfo.context = pContext;
577-
connectInfo.ssl_connection = LCCSCF_USE_SSL;
615+
connectInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR; // Add flag to handle H2 flow control
578616
connectInfo.port = SIGNALING_DEFAULT_SSL_PORT;
617+
connectInfo.alpn = "http/1.1"; // Force HTTP/1.1 only
618+
connectInfo.protocol = "http/1.1"; // Force HTTP/1.1 protocol
579619

580620
CHK_STATUS(getRequestHost(pCallInfo->callInfo.pRequestInfo->url, &pHostStart, &pHostEnd));
581621
CHK(pHostEnd == NULL || *pHostEnd == '/' || *pHostEnd == '?', STATUS_INTERNAL_ERROR);
@@ -1381,6 +1421,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques
13811421
pLwsCallInfo->callInfo.pRequestInfo = pRequestInfo;
13821422
pLwsCallInfo->pSignalingClient = pSignalingClient;
13831423
pLwsCallInfo->protocolIndex = protocolIndex;
1424+
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
13841425

13851426
*ppLwsCallInfo = pLwsCallInfo;
13861427

@@ -1944,6 +1985,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19441985
SIZE_T offset, size;
19451986
SERVICE_CALL_RESULT result;
19461987

1988+
UINT32 retryCount = 0;
1989+
const UINT32 MAX_RETRY_COUNT = 3;
1990+
19471991
CHK(pSignalingClient != NULL && pSignalingClient->pOngoingCallInfo != NULL, STATUS_NULL_ARG);
19481992

19491993
// See if anything needs to be done
@@ -1957,22 +2001,36 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19572001

19582002
MUTEX_LOCK(pSignalingClient->sendLock);
19592003
sendLocked = TRUE;
1960-
while (iterate) {
2004+
while (iterate && retryCount < MAX_RETRY_COUNT) {
19612005
offset = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendOffset);
19622006
size = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendBufferSize);
19632007

19642008
result = (SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->messageResult);
19652009

19662010
if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET) {
19672011
CHK_STATUS(CVAR_WAIT(pSignalingClient->sendCvar, pSignalingClient->sendLock, SIGNALING_SEND_TIMEOUT));
2012+
retryCount++;
2013+
} else if (result == SERVICE_CALL_UNKNOWN) {
2014+
// Partial write occurred, continue waiting
2015+
retryCount = 0;
2016+
continue;
19682017
} else {
19692018
iterate = FALSE;
19702019
}
2020+
if (iterate) {
2021+
// Wake up the service event loop
2022+
CHK_STATUS(wakeLwsServiceEventLoop(pSignalingClient, PROTOCOL_INDEX_WSS));
2023+
}
19712024
}
1972-
19732025
MUTEX_UNLOCK(pSignalingClient->sendLock);
19742026
sendLocked = FALSE;
19752027

2028+
// Check if we timed out
2029+
if (retryCount >= MAX_RETRY_COUNT) {
2030+
DLOGW("Failed to send data after %d attempts", MAX_RETRY_COUNT);
2031+
CHK(FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED);
2032+
}
2033+
19762034
// Do not await for the response in case of correlation id not specified
19772035
CHK(awaitForResponse, retStatus);
19782036

src/source/Signaling/LwsApiCalls.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@ struct __LwsCallInfo {
211211
// Service exit indicator;
212212
volatile ATOMIC_BOOL cancelService;
213213

214+
// Message receiving indicator
215+
volatile ATOMIC_BOOL receiveMessage;
216+
214217
// Protocol index
215218
UINT32 protocolIndex;
216219

0 commit comments

Comments
 (0)