Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/aws/http/private/h2_frames.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ int aws_h2_encode_frame(
* body_complete will be set true if encoder reaches the end of the body_stream.
* body_stalled will be true if aws_input_stream_read() stopped early (didn't
* complete, though more space was available).
* body_failed will be true if the aws_input_stream was the cause of an error.
*
* Each call to this function encodes a complete DATA frame, or nothing at all,
* so it's always safe to encode a different frame type or the body of a different stream
Expand All @@ -223,7 +224,8 @@ int aws_h2_encode_data_frame(
size_t *connection_window_size_peer,
struct aws_byte_buf *output,
bool *body_complete,
bool *body_stalled);
bool *body_stalled,
bool *body_failed);

AWS_HTTP_API
void aws_h2_frame_destroy(struct aws_h2_frame *frame);
Expand Down
7 changes: 6 additions & 1 deletion source/h2_frames.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,15 @@ int aws_h2_encode_data_frame(
size_t *connection_window_size_peer,
struct aws_byte_buf *output,
bool *body_complete,
bool *body_stalled) {
bool *body_stalled,
bool *body_failed) {

AWS_PRECONDITION(encoder);
AWS_PRECONDITION(body_stream);
AWS_PRECONDITION(output);
AWS_PRECONDITION(body_complete);
AWS_PRECONDITION(body_stalled);
AWS_PRECONDITION(body_failed);
AWS_PRECONDITION(*stream_window_size_peer > 0);

if (aws_h2_validate_stream_id(stream_id)) {
Expand All @@ -336,6 +338,7 @@ int aws_h2_encode_data_frame(

*body_complete = false;
*body_stalled = false;
*body_failed = false;
uint8_t flags = 0;

/*
Expand Down Expand Up @@ -378,12 +381,14 @@ int aws_h2_encode_data_frame(

/* Read body into sub-buffer */
if (aws_input_stream_read(body_stream, &body_sub_buf)) {
*body_failed = true;
goto error;
}

/* Check if we've reached the end of the body */
struct aws_stream_status body_status;
if (aws_input_stream_get_status(body_stream, &body_status)) {
*body_failed = true;
goto error;
}

Expand Down
54 changes: 22 additions & 32 deletions source/h2_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ static void s_stream_data_write_destroy(

AWS_PRECONDITION(stream);
AWS_PRECONDITION(write);
AWS_PRECONDITION(!aws_linked_list_node_is_in_list(&stream->node));
if (write->on_complete) {
write->on_complete(&stream->base, error_code, write->user_data);
}
Expand Down Expand Up @@ -682,37 +683,13 @@ static inline bool s_h2_stream_has_outgoing_writes(struct aws_h2_stream *stream)
return !aws_linked_list_empty(&stream->thread_data.outgoing_writes);
}

static void s_h2_stream_write_data_complete(struct aws_h2_stream *stream, bool *waiting_writes) {
AWS_PRECONDITION(waiting_writes);
AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream));

/* finish/clean up the current write operation */
struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.outgoing_writes);
struct aws_h2_stream_data_write *write_op = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node);
const bool ending_stream = write_op->end_stream;
s_stream_data_write_destroy(stream, write_op, AWS_OP_SUCCESS);

/* check to see if there are more queued writes or stream_end was called */
*waiting_writes = !ending_stream && !s_h2_stream_has_outgoing_writes(stream);
}

static struct aws_h2_stream_data_write *s_h2_stream_get_current_write(struct aws_h2_stream *stream) {
AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream));
struct aws_linked_list_node *node = aws_linked_list_front(&stream->thread_data.outgoing_writes);
struct aws_h2_stream_data_write *write = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node);
return write;
}

static struct aws_input_stream *s_h2_stream_get_data_stream(struct aws_h2_stream *stream) {
struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream);
return write->data_stream;
}

static bool s_h2_stream_does_current_write_end_stream(struct aws_h2_stream *stream) {
struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream);
return write->end_stream;
}

int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_body_state *body_state) {
AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);

Expand Down Expand Up @@ -799,12 +776,14 @@ int aws_h2_stream_encode_data_frame(
}

*data_encode_status = AWS_H2_DATA_ENCODE_COMPLETE;
struct aws_input_stream *input_stream = s_h2_stream_get_data_stream(stream);
struct aws_h2_stream_data_write *current_write = s_h2_stream_get_current_write(stream);
struct aws_input_stream *input_stream = current_write->data_stream;
AWS_ASSERT(input_stream);

bool input_stream_complete = false;
bool input_stream_stalled = false;
bool ends_stream = s_h2_stream_does_current_write_end_stream(stream);
bool input_stream_failed = false;
bool ends_stream = current_write->end_stream;
if (aws_h2_encode_data_frame(
encoder,
stream->base.id,
Expand All @@ -815,20 +794,30 @@ int aws_h2_stream_encode_data_frame(
&connection->thread_data.window_size_peer,
output,
&input_stream_complete,
&input_stream_stalled)) {
&input_stream_stalled,
&input_stream_failed)) {

int error_code = aws_last_error();

/* If error cause caused aws_input_stream, report that specific error in its write-completion callback */
if (input_stream_failed) {
aws_linked_list_remove(&current_write->node);
s_stream_data_write_destroy(stream, current_write, error_code);
}

/* Failed to write DATA, treat it as a Stream Error */
AWS_H2_STREAM_LOGF(ERROR, stream, "Error encoding stream DATA, %s", aws_error_name(aws_last_error()));
struct aws_h2err returned_h2err = s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
AWS_H2_STREAM_LOGF(ERROR, stream, "Error encoding stream DATA, %s", aws_error_name(error_code));
struct aws_h2err returned_h2err = s_send_rst_and_close_stream(stream, aws_h2err_from_aws_code(error_code));
if (aws_h2err_failed(returned_h2err)) {
aws_h2_connection_shutdown_due_to_write_err(connection, returned_h2err.aws_code);
}
return AWS_OP_SUCCESS;
}

bool waiting_writes = false;
if (input_stream_complete) {
s_h2_stream_write_data_complete(stream, &waiting_writes);
/* finish/clean up the current write operation */
aws_linked_list_remove(&current_write->node);
s_stream_data_write_destroy(stream, current_write, AWS_ERROR_SUCCESS);
}

/*
Expand Down Expand Up @@ -867,10 +856,11 @@ int aws_h2_stream_encode_data_frame(
* from outgoing list */
*data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED;
}
if (waiting_writes) {
if (!s_h2_stream_has_outgoing_writes(stream)) {
/* if window stalled and we waiting for manual writes, we take waiting writes status, which will be handled
* properly if more writes coming, but windows is still stalled. But not the other way around. */
AWS_ASSERT(input_stream_complete);
AWS_ASSERT(!ends_stream);
*data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES;
}
}
Expand Down
4 changes: 3 additions & 1 deletion tests/fuzz/fuzz_h2_decoder_correct.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
AWS_FATAL_ASSERT(
Expand All @@ -249,7 +250,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
&connection_window_size_peer,
&frame_data,
&body_complete,
&body_stalled) == AWS_OP_SUCCESS);
&body_stalled,
&body_failed) == AWS_OP_SUCCESS);

struct aws_stream_status body_status;
aws_input_stream_get_status(body, &body_status);
Expand Down
5 changes: 4 additions & 1 deletion tests/h2_test_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ int h2_fake_peer_send_data_frame_with_padding_length(

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -606,10 +607,12 @@ int h2_fake_peer_send_data_frame_with_padding_length(
&connection_window_size_peer,
&msg->message_data,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_TRUE(body_complete);
ASSERT_FALSE(body_stalled);
ASSERT_FALSE(body_failed);
ASSERT_TRUE(msg->message_data.len != 0);

ASSERT_SUCCESS(testing_channel_push_read_message(peer->testing_channel, msg));
Expand Down
4 changes: 2 additions & 2 deletions tests/test_h2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -5473,8 +5473,8 @@ TEST_CASE(h2_client_manual_data_write_read_broken) {
ASSERT_TRUE(stream_tester.complete);
/* The stream complete will get the error code from the input stream read. */
ASSERT_UINT_EQUALS(stream_tester.on_complete_error_code, AWS_IO_STREAM_READ_FAILED);
/* The write triggers the stream to complete with error, so the write failed as the stream completes. */
ASSERT_UINT_EQUALS(error_code, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED);
/* The write triggers the error, which should be reported to the write complete */
ASSERT_UINT_EQUALS(error_code, AWS_IO_STREAM_READ_FAILED);

aws_http_message_release(request);

Expand Down
15 changes: 12 additions & 3 deletions tests/test_h2_encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ TEST_CASE(h2_encoder_data) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -98,11 +99,13 @@ TEST_CASE(h2_encoder_data) {
&connection_window_size_peer,
&output,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_BIN_ARRAYS_EQUALS(expected, sizeof(expected), output.buffer, output.len);
ASSERT_TRUE(body_complete);
ASSERT_FALSE(body_stalled);
ASSERT_FALSE(body_failed);

aws_byte_buf_clean_up(&output);
aws_input_stream_release(body);
Expand Down Expand Up @@ -140,6 +143,7 @@ TEST_CASE(h2_encoder_data_stalled) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -152,11 +156,13 @@ TEST_CASE(h2_encoder_data_stalled) {
&connection_window_size_peer,
&output,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_BIN_ARRAYS_EQUALS(expected, sizeof(expected), output.buffer, output.len);
ASSERT_FALSE(body_complete);
ASSERT_TRUE(body_stalled);
ASSERT_FALSE(body_failed);

aws_byte_buf_clean_up(&output);
aws_input_stream_release(body);
Expand All @@ -182,6 +188,7 @@ TEST_CASE(h2_encoder_data_stalled_completely) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -194,10 +201,12 @@ TEST_CASE(h2_encoder_data_stalled_completely) {
&connection_window_size_peer,
&output,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_FALSE(body_complete);
ASSERT_TRUE(body_stalled);
ASSERT_FALSE(body_failed);
ASSERT_UINT_EQUALS(0, output.len);

/* clean up */
Expand Down