Skip to content

Commit 8bcacf7

Browse files
aws-sdk-rust-ciDavidSoutherjdisantiZelda Hesslerford-at-aws
committed
[examples] Sync SDK examples from awsdocs/aws-doc-sdk-examples
Includes commit(s): 13d6cb3ebc5edeca8161edfc8f554597650fb8f9 5458e2b9fd71abb916bca4ed53d8c1a894e4fe87 0b07df08b42023908b1d1af5d104004e562153a4 ecfe4ad1c9313d25034c12a85fb4a15a723c8c1a 3fa43aa5bf933bf38bb894e0911d64b42275a1d6 041033cde3c235183ba45c3328f3d3311cbdcc86 8dbee2f086245f2efbf34ed1f6f973e1ba9bca90 b188150c43d7ed145a88b16b7f5a4401825f40b2 ad60b13aa7241542f19492df8e81ad8d0be4888b 1311c1ff0b2deb72066f22e55fa15b38e338452d 9bfeaae36687d60c24e400076025314131de930a 12a44f94a30b759c7c4f521c60c78bdacfad0d7d 7cbb9ac3139b714a947a38c140b92809c21302fd c8e8df49083fcc4fe5a4a68897ceceac30f5e9d8 baf11a2b9716b4dc7a74885f50626555944848d8 0391e2511f641b46f38d523233dd5f251ca9fc41 bbc8e30d40970b3c901eead096ff6a640c1806c9 Co-authored-by: David Souther <[email protected]> Co-authored-by: John DiSanti <[email protected]> Co-authored-by: Zelda Hessler <[email protected]> Co-authored-by: ford prior <[email protected]> Co-authored-by: ford prior <[email protected]>
1 parent ee156e6 commit 8bcacf7

32 files changed

+612
-222
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ members = [
1212
"examples/cognitoidentity",
1313
"examples/cognitoidentityprovider",
1414
"examples/cognitosync",
15+
"examples/concurrency",
1516
"examples/config",
1617
"examples/custom-root-certificates",
1718
"examples/dynamodb",
@@ -48,6 +49,7 @@ members = [
4849
"examples/sqs",
4950
"examples/ssm",
5051
"examples/stepfunction",
52+
"examples/test-utils",
5153
"examples/testing",
5254
"examples/tls",
5355
"examples/transcribestreaming",

examples/apigatewaymanagement/src/bin/post_to_connection.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
use aws_config::meta::region::RegionProviderChain;
77
use aws_sdk_apigatewaymanagement::types::Blob;
8-
use aws_sdk_apigatewaymanagement::{config, Client, Endpoint, Error, Region, PKG_VERSION};
8+
use aws_sdk_apigatewaymanagement::{config, Client, Error, Region, PKG_VERSION};
99
use structopt::StructOpt;
1010

1111
#[derive(Debug, StructOpt)]
@@ -102,17 +102,16 @@ async fn main() -> Result<(), Error> {
102102
}
103103

104104
// snippet-start:[apigatewaymanagement.rust.post_to_connection_client]
105-
let endpoint = Endpoint::immutable(format!(
105+
let endpoint_url = format!(
106106
"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}",
107107
api_id = api_id,
108108
region = region,
109109
stage = stage
110-
))
111-
.expect("valid endpoint");
110+
);
112111

113112
let shared_config = aws_config::from_env().region(region_provider).load().await;
114113
let api_management_config = config::Builder::from(&shared_config)
115-
.endpoint_resolver(endpoint)
114+
.endpoint_url(endpoint_url)
116115
.build();
117116
let client = Client::from_conf(api_management_config);
118117
// snippet-end:[apigatewaymanagement.rust.post_to_connection_client]

examples/concurrency/Cargo.toml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
[package]
2+
name = "concurrency"
3+
version = "0.1.0"
4+
authors = ["Zelda Hessler <[email protected]>"]
5+
description = "A collection of examples that demonstrate how to run AWS SDK requests concurrently. By running these\nexamples against your own AWS account, you can test how the SDK will behave when running variable numbers of requests\nwith a configurable concurrency limit. The right concurrency limit for you depends on the resources of the machine\nrunning this app, the data that's being sent in request, and the service handling your requests. Experiment with these\nexamples to find the best settings for your use case."
6+
edition = "2021"
7+
publish = false
8+
9+
[dependencies]
10+
futures = "0.3.25"
11+
tracing = "0.1.37"
12+
13+
[dependencies.clap]
14+
version = "~4.0.26"
15+
features = ["derive"]
16+
17+
[dependencies.tokio]
18+
version = "1.20.1"
19+
features = ["full"]
20+
21+
[dependencies.tracing-subscriber]
22+
version = "0.3.15"
23+
features = ["env-filter"]
24+
25+
[dev-dependencies]
26+
fastrand = "1.8.0"
27+
28+
[dev-dependencies.aws-config]
29+
path = "../../sdk/aws-config"
30+
version = "0.53.0"
31+
32+
[dev-dependencies.aws-sdk-s3]
33+
path = "../../sdk/s3"
34+
version = "0.23.0"
35+
36+
[dev-dependencies.aws-sdk-sqs]
37+
path = "../../sdk/sqs"
38+
version = "0.23.0"
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
use aws_sdk_s3::types::ByteStream;
7+
use aws_sdk_s3::Client;
8+
use clap::Parser;
9+
use concurrency::Runtime;
10+
use std::iter::repeat_with;
11+
12+
const DEFAULT_CONCURRENCY_LIMIT: usize = 1_000;
13+
const DEFAULT_KEY_PREFIX: &str = "concurrency_test/object";
14+
const DEFAULT_OBJECT_SIZE_IN_BYTES: usize = 100_000;
15+
const DEFAULT_RUNTIME: Runtime = Runtime::MultiThreaded;
16+
const DEFAULT_TASK_COUNT: usize = 10_000;
17+
18+
#[derive(Parser, Debug)]
19+
#[command(author, version, about, long_about = None)]
20+
struct Args {
21+
/// The name of the S3 bucket that test objects will be uploaded to.
22+
#[arg(long)]
23+
bucket: String,
24+
25+
/// The prefix used to create keys for objects to be uploaded. For example, with the default
26+
/// prefix 'concurrency_test/object', the key of the first uploaded object will have the key
27+
/// 'concurrency_test/object_00000.txt'.
28+
#[arg(long, default_value_t = DEFAULT_KEY_PREFIX.to_string())]
29+
key_prefix: String,
30+
31+
/// The size of each uploaded object in bytes (100KB by default.) Each object will be a random
32+
/// alphanumeric string. The total amount of data uploaded will be equal to
33+
/// <task-count> * <object_size_in_bytes>. Larger sizes cause task creation to take more time.
34+
/// All of the strings are stored in memory while sending the requests so make sure you have
35+
/// enough RAM before setting this to a very large value.
36+
#[arg(long, default_value_t = DEFAULT_OBJECT_SIZE_IN_BYTES)]
37+
object_size_in_bytes: usize,
38+
39+
/// The total number of objects to upload to the S3 bucket.
40+
#[arg(long, default_value_t = DEFAULT_TASK_COUNT)]
41+
task_count: usize,
42+
43+
/// The maximum number of uploads running at a time.
44+
#[arg(long, default_value_t = DEFAULT_CONCURRENCY_LIMIT)]
45+
concurrency_limit: usize,
46+
47+
/// The runtime to use when running the tasks.
48+
#[arg(long, default_value_t = DEFAULT_RUNTIME)]
49+
runtime: Runtime,
50+
}
51+
52+
fn main() {
53+
let args = Args::parse();
54+
55+
let runtime = match args.runtime {
56+
Runtime::SingleThreaded => tokio::runtime::Builder::new_current_thread().build(),
57+
Runtime::MultiThreaded => tokio::runtime::Runtime::new(),
58+
}
59+
.unwrap();
60+
61+
runtime.block_on(async move { async_main(args).await })
62+
}
63+
64+
async fn async_main(args: Args) {
65+
let sdk_config = aws_config::load_from_env().await;
66+
let client = Client::new(&sdk_config);
67+
68+
let send_message_futures = (0..args.task_count).map(|i| {
69+
let key = format!("{}_{i:05}.txt", args.key_prefix);
70+
let body: Vec<_> = repeat_with(fastrand::alphanumeric)
71+
.take(args.object_size_in_bytes)
72+
.map(|c| c as u8)
73+
.collect();
74+
let fut = client
75+
.put_object()
76+
.bucket(&args.bucket)
77+
.key(key)
78+
.body(ByteStream::from(body))
79+
.send();
80+
81+
async move {
82+
// We unwrap here in order to stop running futures as soon as one of them fails.
83+
fut.await.expect("request should succeed")
84+
}
85+
});
86+
87+
let _res =
88+
concurrency::run_futures_concurrently(send_message_futures, args.concurrency_limit).await;
89+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
use aws_sdk_sqs::Client;
7+
use clap::Parser;
8+
use concurrency::Runtime;
9+
10+
const DEFAULT_CONCURRENCY_LIMIT: usize = 1_000;
11+
const DEFAULT_RUNTIME: Runtime = Runtime::MultiThreaded;
12+
const DEFAULT_TASK_COUNT: usize = 10_000;
13+
14+
#[derive(Parser, Debug)]
15+
#[command(author, version, about, long_about = None)]
16+
struct Args {
17+
/// The URL of the Message Queue to send test messages to.
18+
#[arg(long)]
19+
message_queue_url: String,
20+
21+
/// The total number of test messages to send to the Message Queue.
22+
#[arg(long, default_value_t = DEFAULT_TASK_COUNT)]
23+
task_count: usize,
24+
25+
/// The maximum number of test messages to send to the Message Queue at a time.
26+
#[arg(long, default_value_t = DEFAULT_CONCURRENCY_LIMIT)]
27+
concurrency_limit: usize,
28+
29+
/// The runtime to use when running the tasks.
30+
#[arg(long, default_value_t = DEFAULT_RUNTIME)]
31+
runtime: Runtime,
32+
}
33+
34+
fn main() {
35+
let args = Args::parse();
36+
37+
let runtime = match args.runtime {
38+
Runtime::SingleThreaded => tokio::runtime::Builder::new_current_thread().build(),
39+
Runtime::MultiThreaded => tokio::runtime::Runtime::new(),
40+
}
41+
.unwrap();
42+
43+
runtime.block_on(async move { async_main(args).await })
44+
}
45+
46+
async fn async_main(args: Args) {
47+
// If you start encountering timeout errors, increase or disable the default timeouts.
48+
let sdk_config = aws_config::load_from_env().await;
49+
let client = Client::new(&sdk_config);
50+
51+
let send_message_futures = (0..args.task_count).map(|i| {
52+
let message_body = format!("concurrency test message #{i}");
53+
let fut = client
54+
.send_message()
55+
.queue_url(&args.message_queue_url)
56+
.message_body(message_body)
57+
.send();
58+
59+
async move {
60+
// We unwrap here in order to stop running futures as soon as one of them fails.
61+
fut.await.expect("request should succeed")
62+
}
63+
});
64+
65+
let _res =
66+
concurrency::run_futures_concurrently(send_message_futures, args.concurrency_limit).await;
67+
}

examples/concurrency/src/lib.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
use clap::ValueEnum;
7+
use futures::future;
8+
use std::fmt::{self, Display, Formatter};
9+
use std::future::Future;
10+
use std::sync::Arc;
11+
use tokio::sync::Semaphore;
12+
use tokio::time::Instant;
13+
use tracing::{debug, info};
14+
15+
#[derive(Clone, Debug, ValueEnum)]
16+
pub enum Runtime {
17+
/// Use the single-threaded async runtime to run concurrent requests.
18+
SingleThreaded,
19+
/// Use the multi-threaded async runtime to run concurrent requests.
20+
MultiThreaded,
21+
}
22+
23+
impl Display for Runtime {
24+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
25+
write!(
26+
f,
27+
"{}",
28+
match self {
29+
Runtime::SingleThreaded => "single-threaded",
30+
Runtime::MultiThreaded => "multi-threaded",
31+
}
32+
)
33+
}
34+
}
35+
36+
pub async fn run_futures_concurrently<I, T, F>(task_futures: I, concurrency_limit: usize) -> Vec<T>
37+
where
38+
F: Future<Output = T>,
39+
I: Iterator<Item = F> + ExactSizeIterator,
40+
{
41+
tracing_subscriber::fmt::init();
42+
let task_count = task_futures.len();
43+
info!("Sending {task_count} messages, {concurrency_limit} at a time");
44+
45+
// a tokio semaphore can be used to ensure we only run up to <concurrency_limit> requests
46+
// at once.
47+
let semaphore = Arc::new(Semaphore::new(concurrency_limit));
48+
49+
// Marry each task future with a semaphore. `future::join_all` takes iterators as input so
50+
// there's no need to `.collect()` here.
51+
let futures = task_futures.into_iter().map(|fut| {
52+
// make a clone of the semaphore that can live in the future
53+
let semaphore = semaphore.clone();
54+
// because we wait on a permit from the semaphore, only <concurrency_limit> futures
55+
// will be run at once.
56+
async move {
57+
let permit = semaphore
58+
.acquire()
59+
.await
60+
.expect("we'll get one if we wait long enough");
61+
let res = fut.await;
62+
drop(permit);
63+
res
64+
}
65+
});
66+
67+
debug!("running futures concurrently with future::join_all");
68+
let start = Instant::now();
69+
let res: Vec<_> = future::join_all(futures).await;
70+
debug!("future_joined_successfully, asserting that all tasks were run...");
71+
assert_eq!(task_count, res.len());
72+
info!(
73+
"all {task_count} tasks completed after {:?}",
74+
start.elapsed()
75+
);
76+
77+
res
78+
}

examples/dynamodb/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ features = ["derive"]
4848

4949
[dependencies.serde_dynamo]
5050
version = "4"
51-
features = ["aws-sdk-dynamodb+0_19"]
51+
features = ["aws-sdk-dynamodb+0_22"]
5252

5353
[dependencies.structopt]
5454
version = "0.3"

examples/dynamodb/src/bin/list-tables-local.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
*/
55

66
// snippet-start:[dynamodb.rust.list-tables-local]
7-
use aws_sdk_dynamodb::{Client, Endpoint, Error};
7+
use aws_sdk_dynamodb::{Client, Error};
88
use dynamodb_code_examples::{make_config, scenario::list::list_tables, Opt};
99
use structopt::StructOpt;
1010

@@ -13,9 +13,9 @@ use structopt::StructOpt;
1313
async fn main() -> Result<(), Error> {
1414
let config = make_config(Opt::from_args()).await?;
1515
let dynamodb_local_config = aws_sdk_dynamodb::config::Builder::from(&config)
16-
.endpoint_resolver(
16+
.endpoint_url(
1717
// 8000 is the default dynamodb port
18-
Endpoint::immutable("http://localhost:8000").expect("Invalid endpoint"),
18+
"http://localhost:8000",
1919
)
2020
.build();
2121

examples/glue/Cargo.toml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ publish = false
1313
tracing = "0.1.37"
1414
async_once = "0.2.6"
1515
lazy_static = "1.4.0"
16+
clap_lex = "=0.3.0"
1617
thiserror = "1.0.37"
1718
secrecy = "0.8.0"
1819
futures = "0.3.25"
1920
tracing-bunyan-formatter = "0.3.4"
20-
mockall = "0.11.3"
2121

2222
[dependencies.aws-config]
2323
path = "../../sdk/aws-config"
@@ -27,10 +27,6 @@ version = "0.53.0"
2727
path = "../../sdk/glue"
2828
version = "0.23.0"
2929

30-
[dependencies.aws-sdk-iam]
31-
path = "../../sdk/iam"
32-
version = "0.23.0"
33-
3430
[dependencies.aws-sdk-s3]
3531
path = "../../sdk/s3"
3632
version = "0.23.0"
@@ -66,7 +62,7 @@ version = "0.3.15"
6662
features = ["env-filter"]
6763

6864
[dependencies.clap]
69-
version = "4.0.18"
65+
version = "~4.0.18"
7066
features = ["derive"]
7167

7268
[dependencies.uuid]

0 commit comments

Comments
 (0)