Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
511bdbc
#23338: Enable AWS authentication for the http_client source.
johannesfloriangeiger Jul 4, 2025
3985c82
#23338: Refactor code to improve readability.
johannesfloriangeiger Aug 9, 2025
03f3b99
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Aug 10, 2025
16cfedf
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Aug 16, 2025
712f2a0
#23338: Refactor code to improve readability.
johannesfloriangeiger Aug 16, 2025
ab10d38
Merge remote-tracking branch 'origin/23338-http_client-aws-auth' into…
johannesfloriangeiger Aug 16, 2025
e6daa19
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Sep 24, 2025
049c35a
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Sep 25, 2025
1a21be9
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Sep 25, 2025
b6aa4c0
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Sep 26, 2025
02fbd6f
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Sep 28, 2025
5182b5d
Merge branch 'master' into 23338-http_client-aws-auth
pront Sep 29, 2025
7ccc235
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Nov 1, 2025
1f665a4
Update changelog.d/23338.feature.md
johannesfloriangeiger Nov 1, 2025
8590525
Update src/aws/mod.rs
johannesfloriangeiger Nov 1, 2025
c5d2bd5
#23338: PR comments.
johannesfloriangeiger Nov 1, 2025
c59e173
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Nov 3, 2025
20b230e
Merge branch 'master' into 23338-http_client-aws-auth
pront Nov 5, 2025
06e2390
Merge branch 'master' into 23338-http_client-aws-auth
johannesfloriangeiger Nov 15, 2025
ecf8750
Merge branch 'vectordotdev:master' into 23338-http_client-aws-auth
johannesfloriangeiger Nov 18, 2025
5806006
#23338: PR comments.
johannesfloriangeiger Nov 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/23338.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enables AWS authentication for the http_client source.

authors: johannesfloriangeiger
10 changes: 10 additions & 0 deletions src/aws/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,16 @@ impl AwsAuthentication {
}
}

/// Returns the region for the credentials based on the authentication mechanism chosen.
pub fn region(&self) -> Option<Region> {
match self {
AwsAuthentication::AccessKey { region, .. }
| AwsAuthentication::File { region, .. }
| AwsAuthentication::Role { region, .. }
| AwsAuthentication::Default { region, .. } => region.clone().map(Region::new),
}
}

#[cfg(test)]
/// Creates dummy authentication for tests.
pub fn test_auth() -> AwsAuthentication {
Expand Down
66 changes: 63 additions & 3 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use aws_config::{
};
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use aws_sigv4::{
http_request::{PayloadChecksumKind, SignableBody, SignableRequest, SigningSettings},
http_request::{
PayloadChecksumKind, SignableBody, SignableRequest, SigningInstructions, SigningSettings,
},
sign::v4,
};
use aws_smithy_async::rt::sleep::TokioSleep;
Expand Down Expand Up @@ -288,6 +290,65 @@ pub async fn sign_request(
SignableBody::Bytes(request.body().as_ref()),
)?;

let signing_instructions = create_signing_instructions(
service_name,
signable_request,
credentials_provider,
region,
payload_checksum_sha256,
)
.await?;
signing_instructions.apply_to_request_http0x(request);

Ok(())
}

/// Sign the empty request prior to sending to AWS.
/// The signature is added to the provided `request`.
pub async fn sign_request_with_empty_body<T>(
service_name: &str,
request: &mut http::Request<T>,
credentials_provider: &SharedCredentialsProvider,
region: Option<&Region>,
payload_checksum_sha256: bool,
) -> crate::Result<()> {
let headers = request
.headers()
.iter()
.map(|(k, v)| {
Ok((
k.as_str(),
std::str::from_utf8(v.as_bytes()).map_err(|_| SigningError::NotUTF8Header)?,
))
})
.collect::<Result<Vec<_>, SigningError>>()?;
let signable_request = SignableRequest::new(
request.method().as_str(),
request.uri().to_string(),
headers.into_iter(),
SignableBody::empty(),
)?;
let signing_instructions = create_signing_instructions(
service_name,
signable_request,
credentials_provider,
region,
payload_checksum_sha256,
)
.await?;
signing_instructions.apply_to_request_http0x(request);

Ok(())
}

/// Create the signing instructions for a request.
pub async fn create_signing_instructions(
service_name: &str,
signable_request: SignableRequest<'_>,
credentials_provider: &SharedCredentialsProvider,
region: Option<&Region>,
payload_checksum_sha256: bool,
) -> crate::Result<SigningInstructions> {
let credentials = credentials_provider.provide_credentials().await?;
let identity = Identity::new(credentials, None);

Expand All @@ -312,9 +373,8 @@ pub async fn sign_request(

let (signing_instructions, _signature) =
aws_sigv4::http_request::sign(signable_request, &signing_params.into())?.into_parts();
signing_instructions.apply_to_request_http0x(request);

Ok(())
Ok(signing_instructions)
}

#[derive(Debug)]
Expand Down
23 changes: 8 additions & 15 deletions src/sinks/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#[cfg(feature = "aws-core")]
use aws_config::meta::region::ProvideRegion;
#[cfg(feature = "aws-core")]
use aws_types::region::Region;
use http::{header::AUTHORIZATION, HeaderName, HeaderValue, Method, Request, StatusCode};
use hyper::Body;
use indexmap::IndexMap;
Expand All @@ -20,8 +18,6 @@ use super::{
sink::HttpSink,
};
#[cfg(feature = "aws-core")]
use crate::aws::AwsAuthentication;
#[cfg(feature = "aws-core")]
use crate::sinks::util::http::SigV4Config;
use crate::{
codecs::{EncodingConfigWithFraming, SinkType},
Expand Down Expand Up @@ -300,22 +296,19 @@ impl SinkConfig for HttpSinkConfig {
let default_region = crate::aws::region_provider(&ProxyConfig::default(), None)?
.region()
.await;
let region = (match &auth {
AwsAuthentication::AccessKey { region, .. } => region.clone(),
AwsAuthentication::File { .. } => None,
AwsAuthentication::Role { region, .. } => region.clone(),
AwsAuthentication::Default { region, .. } => region.clone(),
})
.map_or(default_region, |r| Some(Region::new(r.to_string())))
.expect("Region must be specified");
let region = auth
.region()
.or(default_region)
.expect("Region must be specified");
let shared_credentials_provider = auth
.credentials_provider(region.clone(), &ProxyConfig::default(), None)
.await?;

HttpService::new_with_sig_v4(
client,
http_sink_request_builder,
SigV4Config {
shared_credentials_provider: auth
.credentials_provider(region.clone(), &ProxyConfig::default(), None)
.await?,
shared_credentials_provider,
region: region.clone(),
service: service.clone(),
},
Expand Down
180 changes: 114 additions & 66 deletions src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::{collections::HashMap, future::ready};
use tokio_stream::wrappers::IntervalStream;
use vector_lib::json_size::JsonSize;

#[cfg(feature = "aws-core")]
use crate::aws::sign_request_with_empty_body;
use crate::http::{QueryParameterValue, QueryParameters};
use crate::{
http::{Auth, HttpClient},
Expand All @@ -28,6 +30,8 @@ use crate::{
tls::TlsSettings,
SourceSender,
};
#[cfg(feature = "aws-core")]
use aws_config::meta::region::ProvideRegion;
use vector_lib::shutdown::ShutdownSignal;
use vector_lib::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};

Expand Down Expand Up @@ -194,79 +198,123 @@ pub(crate) async fn call<
auth.apply(&mut request);
}

tokio::time::timeout(inputs.timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
inputs.timeout.as_secs_f64()
#[cfg(feature = "aws-core")]
let aws_auth = match &inputs.auth {
None => None,
Some(Auth::Aws { auth, service }) => Some((auth.clone(), service.clone())),
_ => None,
};

tokio::time::timeout(inputs.timeout, async move {
let request = request;

#[cfg(feature = "aws-core")]
let request = match aws_auth {
None => request,
Some((auth, service)) => {
let mut signed_request = request;
let default_region =
crate::aws::region_provider(&ProxyConfig::default(), None)
.expect("Region provider must be available")
.region()
.await;
let region = auth
.region()
.or(default_region)
.expect("Region must be specified");
let credentials_provider = &auth
.credentials_provider(region.clone(), &ProxyConfig::default(), None)
.await
.expect("Credentials provider must be available");
sign_request_with_empty_body(
service.as_str(),
&mut signed_request,
credentials_provider,
Some(&region),
false,
)
.into()),
}
})
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
emit!(EndpointBytesReceived {
byte_size: body.len(),
protocol: "http",
endpoint: endpoint.as_str(),
});
Ok((header, body))
})
.into_stream()
.filter_map(move |response| {
ready(match response {
Ok((header, body)) if header.status == hyper::StatusCode::OK => {
context.on_response(&url, &header, &body).map(|mut events| {
let byte_size = if events.is_empty() {
// We need to explicitly set the byte size
// to 0 since
// `estimated_json_encoded_size_of` returns
// at least 1 for an empty collection. For
// the purposes of the
// HttpClientEventsReceived event, we should
// emit 0 when there aren't any usable
// metrics.
JsonSize::zero()
} else {
events.estimated_json_encoded_size_of()
};
.await
.expect("Signing request failed");

emit!(HttpClientEventsReceived {
byte_size,
count: events.len(),
url: url.to_string()
});
signed_request
}
};

// We'll enrich after receiving the events so
// that the byte sizes are accurate.
context.enrich_events(&mut events);
request
})
.and_then(move |request| tokio::time::timeout(inputs.timeout, client.send(request)))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
inputs.timeout.as_secs_f64()
)
.into()),
}
})
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
emit!(EndpointBytesReceived {
byte_size: body.len(),
protocol: "http",
endpoint: endpoint.as_str(),
});
Ok((header, body))
})
.into_stream()
.filter_map(move |response| {
ready(match response {
Ok((header, body)) if header.status == hyper::StatusCode::OK => {
context.on_response(&url, &header, &body).map(|mut events| {
let byte_size = if events.is_empty() {
// We need to explicitly set the byte size
// to 0 since
// `estimated_json_encoded_size_of` returns
// at least 1 for an empty collection. For
// the purposes of the
// HttpClientEventsReceived event, we should
// emit 0 when there aren't any usable
// metrics.
JsonSize::zero()
} else {
events.estimated_json_encoded_size_of()
};

stream::iter(events)
})
}
Ok((header, _)) => {
context.on_http_response_error(&url, &header);
emit!(HttpClientHttpResponseError {
code: header.status,
url: url.to_string(),
});
None
}
Err(error) => {
emit!(HttpClientHttpError {
error,
emit!(HttpClientEventsReceived {
byte_size,
count: events.len(),
url: url.to_string()
});
None
}
})

// We'll enrich after receiving the events so
// that the byte sizes are accurate.
context.enrich_events(&mut events);

stream::iter(events)
})
}
Ok((header, _)) => {
context.on_http_response_error(&url, &header);
emit!(HttpClientHttpResponseError {
code: header.status,
url: url.to_string(),
});
None
}
Err(error) => {
emit!(HttpClientHttpError {
error,
url: url.to_string()
});
None
}
})
.flatten()
.boxed()
})
.flatten()
.boxed()
})
.flatten_unordered(None)
.boxed();
Expand Down
Loading