Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/aws/http/private/h2_frames.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ int aws_h2_encode_frame(
* body_complete will be set true if encoder reaches the end of the body_stream.
* body_stalled will be true if aws_input_stream_read() stopped early (didn't
* complete, though more space was available).
* body_failed will be true if the aws_input_stream was the cause of an error.
*
* Each call to this function encodes a complete DATA frame, or nothing at all,
* so it's always safe to encode a different frame type or the body of a different stream
Expand All @@ -223,7 +224,8 @@ int aws_h2_encode_data_frame(
size_t *connection_window_size_peer,
struct aws_byte_buf *output,
bool *body_complete,
bool *body_stalled);
bool *body_stalled,
bool *body_failed);

AWS_HTTP_API
void aws_h2_frame_destroy(struct aws_h2_frame *frame);
Expand Down
7 changes: 6 additions & 1 deletion source/h2_frames.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,15 @@ int aws_h2_encode_data_frame(
size_t *connection_window_size_peer,
struct aws_byte_buf *output,
bool *body_complete,
bool *body_stalled) {
bool *body_stalled,
bool *body_failed) {

AWS_PRECONDITION(encoder);
AWS_PRECONDITION(body_stream);
AWS_PRECONDITION(output);
AWS_PRECONDITION(body_complete);
AWS_PRECONDITION(body_stalled);
AWS_PRECONDITION(body_failed);
AWS_PRECONDITION(*stream_window_size_peer > 0);

if (aws_h2_validate_stream_id(stream_id)) {
Expand All @@ -336,6 +338,7 @@ int aws_h2_encode_data_frame(

*body_complete = false;
*body_stalled = false;
*body_failed = false;
uint8_t flags = 0;

/*
Expand Down Expand Up @@ -378,12 +381,14 @@ int aws_h2_encode_data_frame(

/* Read body into sub-buffer */
if (aws_input_stream_read(body_stream, &body_sub_buf)) {
*body_failed = true;
goto error;
}

/* Check if we've reached the end of the body */
struct aws_stream_status body_status;
if (aws_input_stream_get_status(body_stream, &body_status)) {
*body_failed = true;
goto error;
}

Expand Down
54 changes: 22 additions & 32 deletions source/h2_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ static void s_stream_data_write_destroy(

AWS_PRECONDITION(stream);
AWS_PRECONDITION(write);
AWS_PRECONDITION(!aws_linked_list_node_is_in_list(&stream->node));
if (write->on_complete) {
write->on_complete(&stream->base, error_code, write->user_data);
}
Expand Down Expand Up @@ -682,37 +683,13 @@ static inline bool s_h2_stream_has_outgoing_writes(struct aws_h2_stream *stream)
return !aws_linked_list_empty(&stream->thread_data.outgoing_writes);
}

static void s_h2_stream_write_data_complete(struct aws_h2_stream *stream, bool *waiting_writes) {
AWS_PRECONDITION(waiting_writes);
AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream));

/* finish/clean up the current write operation */
struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.outgoing_writes);
struct aws_h2_stream_data_write *write_op = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node);
const bool ending_stream = write_op->end_stream;
s_stream_data_write_destroy(stream, write_op, AWS_OP_SUCCESS);

/* check to see if there are more queued writes or stream_end was called */
*waiting_writes = !ending_stream && !s_h2_stream_has_outgoing_writes(stream);
}

static struct aws_h2_stream_data_write *s_h2_stream_get_current_write(struct aws_h2_stream *stream) {
AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream));
struct aws_linked_list_node *node = aws_linked_list_front(&stream->thread_data.outgoing_writes);
struct aws_h2_stream_data_write *write = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node);
return write;
}

static struct aws_input_stream *s_h2_stream_get_data_stream(struct aws_h2_stream *stream) {
struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream);
return write->data_stream;
}

static bool s_h2_stream_does_current_write_end_stream(struct aws_h2_stream *stream) {
struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream);
return write->end_stream;
}

int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_body_state *body_state) {
AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);

Expand Down Expand Up @@ -799,12 +776,14 @@ int aws_h2_stream_encode_data_frame(
}

*data_encode_status = AWS_H2_DATA_ENCODE_COMPLETE;
struct aws_input_stream *input_stream = s_h2_stream_get_data_stream(stream);
struct aws_h2_stream_data_write *current_write = s_h2_stream_get_current_write(stream);
struct aws_input_stream *input_stream = current_write->data_stream;
AWS_ASSERT(input_stream);

bool input_stream_complete = false;
bool input_stream_stalled = false;
bool ends_stream = s_h2_stream_does_current_write_end_stream(stream);
bool input_stream_failed = false;
bool ends_stream = current_write->end_stream;
if (aws_h2_encode_data_frame(
encoder,
stream->base.id,
Expand All @@ -815,20 +794,30 @@ int aws_h2_stream_encode_data_frame(
&connection->thread_data.window_size_peer,
output,
&input_stream_complete,
&input_stream_stalled)) {
&input_stream_stalled,
&input_stream_failed)) {

int error_code = aws_last_error();

/* If error cause caused aws_input_stream, report that specific error in its write-completion callback */
if (input_stream_failed) {
aws_linked_list_remove(&current_write->node);
s_stream_data_write_destroy(stream, current_write, error_code);
}

/* Failed to write DATA, treat it as a Stream Error */
AWS_H2_STREAM_LOGF(ERROR, stream, "Error encoding stream DATA, %s", aws_error_name(aws_last_error()));
struct aws_h2err returned_h2err = s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
AWS_H2_STREAM_LOGF(ERROR, stream, "Error encoding stream DATA, %s", aws_error_name(error_code));
struct aws_h2err returned_h2err = s_send_rst_and_close_stream(stream, aws_h2err_from_aws_code(error_code));
if (aws_h2err_failed(returned_h2err)) {
aws_h2_connection_shutdown_due_to_write_err(connection, returned_h2err.aws_code);
}
return AWS_OP_SUCCESS;
}

bool waiting_writes = false;
if (input_stream_complete) {
s_h2_stream_write_data_complete(stream, &waiting_writes);
/* finish/clean up the current write operation */
aws_linked_list_remove(&current_write->node);
s_stream_data_write_destroy(stream, current_write, AWS_ERROR_SUCCESS);
}

/*
Expand Down Expand Up @@ -867,10 +856,11 @@ int aws_h2_stream_encode_data_frame(
* from outgoing list */
*data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED;
}
if (waiting_writes) {
if (!s_h2_stream_has_outgoing_writes(stream)) {
/* if window stalled and we waiting for manual writes, we take waiting writes status, which will be handled
* properly if more writes coming, but windows is still stalled. But not the other way around. */
AWS_ASSERT(input_stream_complete);
AWS_ASSERT(!ends_stream);
*data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES;
}
}
Expand Down
4 changes: 3 additions & 1 deletion tests/fuzz/fuzz_h2_decoder_correct.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
AWS_FATAL_ASSERT(
Expand All @@ -249,7 +250,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
&connection_window_size_peer,
&frame_data,
&body_complete,
&body_stalled) == AWS_OP_SUCCESS);
&body_stalled,
&body_failed) == AWS_OP_SUCCESS);

struct aws_stream_status body_status;
aws_input_stream_get_status(body, &body_status);
Expand Down
5 changes: 4 additions & 1 deletion tests/h2_test_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ int h2_fake_peer_send_data_frame_with_padding_length(

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -606,10 +607,12 @@ int h2_fake_peer_send_data_frame_with_padding_length(
&connection_window_size_peer,
&msg->message_data,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_TRUE(body_complete);
ASSERT_FALSE(body_stalled);
ASSERT_FALSE(body_failed);
ASSERT_TRUE(msg->message_data.len != 0);

ASSERT_SUCCESS(testing_channel_push_read_message(peer->testing_channel, msg));
Expand Down
4 changes: 2 additions & 2 deletions tests/test_h2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -5473,8 +5473,8 @@ TEST_CASE(h2_client_manual_data_write_read_broken) {
ASSERT_TRUE(stream_tester.complete);
/* The stream complete will get the error code from the input stream read. */
ASSERT_UINT_EQUALS(stream_tester.on_complete_error_code, AWS_IO_STREAM_READ_FAILED);
/* The write triggers the stream to complete with error, so the write failed as the stream completes. */
ASSERT_UINT_EQUALS(error_code, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED);
/* The write triggers the error, which should be reported to the write complete */
ASSERT_UINT_EQUALS(error_code, AWS_IO_STREAM_READ_FAILED);

aws_http_message_release(request);

Expand Down
15 changes: 12 additions & 3 deletions tests/test_h2_encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ TEST_CASE(h2_encoder_data) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -98,11 +99,13 @@ TEST_CASE(h2_encoder_data) {
&connection_window_size_peer,
&output,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_BIN_ARRAYS_EQUALS(expected, sizeof(expected), output.buffer, output.len);
ASSERT_TRUE(body_complete);
ASSERT_FALSE(body_stalled);
ASSERT_FALSE(body_failed);

aws_byte_buf_clean_up(&output);
aws_input_stream_release(body);
Expand Down Expand Up @@ -140,6 +143,7 @@ TEST_CASE(h2_encoder_data_stalled) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -152,11 +156,13 @@ TEST_CASE(h2_encoder_data_stalled) {
&connection_window_size_peer,
&output,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_BIN_ARRAYS_EQUALS(expected, sizeof(expected), output.buffer, output.len);
ASSERT_FALSE(body_complete);
ASSERT_TRUE(body_stalled);
ASSERT_FALSE(body_failed);

aws_byte_buf_clean_up(&output);
aws_input_stream_release(body);
Expand All @@ -182,6 +188,7 @@ TEST_CASE(h2_encoder_data_stalled_completely) {

bool body_complete;
bool body_stalled;
bool body_failed;
int32_t stream_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
size_t connection_window_size_peer = AWS_H2_WINDOW_UPDATE_MAX;
ASSERT_SUCCESS(aws_h2_encode_data_frame(
Expand All @@ -194,10 +201,12 @@ TEST_CASE(h2_encoder_data_stalled_completely) {
&connection_window_size_peer,
&output,
&body_complete,
&body_stalled));
&body_stalled,
&body_failed));

ASSERT_FALSE(body_complete);
ASSERT_TRUE(body_stalled);
ASSERT_FALSE(body_failed);
ASSERT_UINT_EQUALS(0, output.len);

/* clean up */
Expand Down