Merged
577 changes: 577 additions & 0 deletions data/managed/log_sources/aws_inspector/log_source.yml

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion infra/lib/log-puller.ts
@@ -18,6 +18,7 @@ interface ExternalLogPullerProps {

// Managed log source types that support pulling.
export const PULLER_LOG_SOURCE_TYPES: string[] = [
"aws_inspector",
"o365",
"duo",
"enrich_abusech_urlhaus",
@@ -26,6 +27,7 @@ export const PULLER_LOG_SOURCE_TYPES: string[] = [
"enrich_otx",
];
const LOG_SOURCE_RATES: Record<string, cdk.Duration> = {
aws_inspector: cdk.Duration.minutes(10),
o365: cdk.Duration.minutes(1),
duo: cdk.Duration.minutes(1),
enrich_abusech_urlhaus: cdk.Duration.minutes(5),
@@ -83,7 +85,14 @@ export class ExternalLogPuller extends Construct {

func.addEnvironment("LOG_SOURCE_TO_SECRET_ARN_MAP", JSON.stringify(logSourceSecretMap));

props.ingestionBucket.grantWrite(func);
props.ingestionBucket.grantReadWrite(func);
// Needed by the aws_inspector managed log source puller.
func.addToRolePolicy(
new iam.PolicyStatement({
actions: ["inspector2:ListFindings"],
resources: ["*"],
})
);

const dlq = new sqs.Queue(this, "DLQ", {});

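Reviewer note on the grantWrite → grantReadWrite change: the Lambda now reads a per-source marker object back out of the ingestion bucket to decide whether this is its first run (see pullers/mod.rs below), so write-only access is no longer enough. A minimal sketch of that read against aws-sdk-s3 0.21 — the helper name is ours, and the error matching assumes a missing marker surfaces as NoSuchKey:

```rust
use aws_sdk_s3::{error::GetObjectErrorKind, types::SdkError};

// Illustrative sketch only: the GetObject that requires read access, using
// the marker-key layout from pullers/mod.rs:
//   "__puller_initial_run__/<log_source_type>"
async fn initial_run_marker_exists(
    s3: &aws_sdk_s3::Client,
    bucket: &str,
    log_source_type: &str,
) -> anyhow::Result<bool> {
    let key = format!("__puller_initial_run__/{}", log_source_type);
    match s3.get_object().bucket(bucket).key(key).send().await {
        // Marker object exists: this source has already completed an initial run.
        Ok(_) => Ok(true),
        // No marker yet: this is the initial run.
        Err(SdkError::ServiceError { err, .. })
            if matches!(err.kind, GetObjectErrorKind::NoSuchKey(_)) =>
        {
            Ok(false)
        }
        Err(e) => Err(e.into()),
    }
}
```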
1 change: 1 addition & 0 deletions infra/lib/log-source.ts
@@ -98,6 +98,7 @@ const MANAGED_LOG_SOURCE_PREFIX_MAP: Record<string, string> = {
aws_route53_resolver_logs: "aws",
aws_s3access: "aws",
aws_elb: "aws",
aws_inspector: "aws",
crowdstrike: "crowdstrike",
crowdstrike_falcon: "crowdstrike",
duo: "duo",
37 changes: 37 additions & 0 deletions lib/rust/Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions lib/rust/log_puller/Cargo.toml
@@ -21,7 +21,11 @@ lambda_runtime = "0.7.1"
aws-config = "0.51.0"
aws_lambda_events = "0.7.2"
aws-sdk-s3 = "0.21.0"
aws-sdk-inspector2 = "0.21.0"
aws-sdk-secretsmanager = "0.21.0"
aws-smithy-client = "0.51.0"
aws-smithy-types = "0.51.0"
aws-smithy-types-convert = { version = "0.51.0", features = ["convert-chrono"] }
lazy_static = "1.4.0"
async_once = "0.2.6"
uuid = { version = "1.1.2", features = ["v4"] }
26 changes: 17 additions & 9 deletions lib/rust/log_puller/src/main.rs
@@ -217,12 +217,20 @@ async fn handler(event: LambdaEvent<SqsEvent>) -> Result<Option<SQSBatchResponse

let puller = ctx.log_source_type.clone();
let log_source_name = record.log_source_name.clone();
let fut = puller
.pull_logs(client.clone(), ctx, start_dt, end_dt)
.and_then(|data| async move { upload_data(data, &record.log_source_name).await })
.map(move |r| {
r.with_context(|| format!("Error for log_source: {}", log_source_name))
});
let client = client.clone();

let fut = async move {
ctx.load_is_initial_run().await?;
let data = puller
.pull_logs(client.clone(), ctx, start_dt, end_dt)
.await?;
let did_upload = upload_data(data, &record.log_source_name).await?;
if did_upload {
ctx.mark_initial_run_complete().await?;
}
anyhow::Ok(())
}
.map(move |r| r.with_context(|| format!("Error for log_source: {}", log_source_name)));
anyhow::Ok(fut)
})
.zip(msg_ids.iter())
@@ -266,10 +274,10 @@ async fn handler(event: LambdaEvent<SqsEvent>) -> Result<Option<SQSBatchResponse
}
}

async fn upload_data(data: Vec<u8>, log_source: &str) -> Result<()> {
async fn upload_data(data: Vec<u8>, log_source: &str) -> Result<bool> {
if data.is_empty() {
info!("No new data for log_source: {}", log_source);
return Ok(());
return Ok(false);
}
info!("Uploading data for {}", log_source);
let bucket = std::env::var("INGESTION_BUCKET_NAME")?;
@@ -297,5 +305,5 @@ async fn upload_data(data: Vec<u8>, log_source: &str) -> Result<()> {
e
})?;

Ok(())
Ok(true)
}
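The reworked handler future makes the initial-run bookkeeping explicit: load_is_initial_run() is awaited once per message before pulling, and mark_initial_run_complete() runs only when upload_data() reports that it actually wrote an object (Ok(true)). A condensed behavioral sketch of that ordering — run_once is a hypothetical stand-in for the real future chain:

```rust
use crate::pullers::PullLogsContext;

// Hypothetical condensation of the handler's per-message future: an initial
// run that pulls no data is NOT marked complete, so the next invocation
// still sees is_initial_run() == true and retries the wider lookback.
async fn run_once(ctx: &PullLogsContext, pulled: Vec<u8>) -> anyhow::Result<()> {
    ctx.load_is_initial_run().await?; // one S3 read; caches the flag on the context
    let did_upload = !pulled.is_empty(); // mirrors upload_data's Ok(false) on empty data
    if did_upload {
        ctx.mark_initial_run_complete().await?; // write the marker only after a real upload
    }
    Ok(())
}
```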
7 changes: 1 addition & 6 deletions lib/rust/log_puller/src/pullers/abusech.rs
@@ -121,7 +121,7 @@ impl PullLogs for AbuseChThreatfoxPuller {
end_dt: DateTime<FixedOffset>,
) -> Result<Vec<u8>> {
info!("Pulling Threatfox...");
let is_initial_run = ctx.is_initial_run().await?;
let is_initial_run = ctx.is_initial_run();

// This is a simple implementation. There's also a full export available
// that we could technically retrieve on the initial run, but keep it simple for now.
@@ -154,11 +154,6 @@
json_bytes.write(b"\n")?;
}

// TODO: should be integrated into puller to mark complete after upload succeeds.
if is_initial_run {
ctx.mark_initial_run_complete().await?;
}

Ok(json_bytes)
}
}
118 changes: 118 additions & 0 deletions lib/rust/log_puller/src/pullers/amazon_inspector.rs
@@ -0,0 +1,118 @@
use aws_sdk_inspector2::{
input::ListFindingsInput,
model::{DateFilter, FilterCriteria},
};
use std::io::Write;

use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
use aws_smithy_client::{
self,
erase::{DynConnector, DynMiddleware},
};
use aws_smithy_types::DateTime as SmithyDateTime;
use aws_smithy_types_convert::date_time::DateTimeExt;
use chrono::{DateTime, FixedOffset};
use lazy_static::lazy_static;
use log::{debug, info};

use super::{PullLogs, PullLogsContext};
use async_once::AsyncOnce;
use shared::JsonValueExt;

#[derive(Clone)]
pub struct AmazonInspectorPuller;

lazy_static! {
static ref AWS_CONFIG: AsyncOnce<aws_config::SdkConfig> =
AsyncOnce::new(async { aws_config::load_from_env().await });
static ref INSPECTOR_RAW_CLIENT: aws_smithy_client::Client = make_inspector_raw_client();
}

fn make_inspector_raw_client() -> aws_smithy_client::Client {
let middleware = DynMiddleware::new(aws_sdk_inspector2::middleware::DefaultMiddleware::new());
aws_smithy_client::Client::builder()
.dyn_https_connector(
aws_smithy_client::http_connector::ConnectorSettings::builder().build(),
)
.middleware::<DynMiddleware<DynConnector>>(middleware)
.build()
}

#[async_trait]
impl PullLogs for AmazonInspectorPuller {
async fn pull_logs(
self,
client: reqwest::Client,
ctx: &PullLogsContext,
start_dt: DateTime<FixedOffset>,
end_dt: DateTime<FixedOffset>,
) -> Result<Vec<u8>> {
info!("Pulling Amazon Inspector logs....");

let raw_client = &INSPECTOR_RAW_CLIENT;
let sdk_conf = AWS_CONFIG.get().await;
let client_conf = aws_sdk_inspector2::config::Config::new(sdk_conf);

let start_dt = if ctx.is_initial_run() {
info!("Initial run for Amazon Inspector.");
end_dt - chrono::Duration::days(1)
} else {
start_dt
};

let date_filter = DateFilter::builder()
.start_inclusive(SmithyDateTime::from_chrono_fixed(start_dt))
.end_inclusive(SmithyDateTime::from_chrono_fixed(end_dt))
.build();
let filter = FilterCriteria::builder()
.last_observed_at(date_filter)
.build();

let mut next_token: Option<String> = None;
let mut is_first = true;

let mut all_findings = vec![];

while is_first || next_token.is_some() {
let input = ListFindingsInput::builder()
.filter_criteria(filter.clone())
.set_next_token(next_token.clone())
.build()?;
let op = input.make_operation(&client_conf).await?;
let resp = raw_client.call_raw(op).await?;
let (raw_resp, _) = resp.raw.into_parts();
let raw_body = raw_resp.body();

let body_val: serde_json::Value = serde_json::from_slice(raw_body.bytes().unwrap())?;
let mut body_val = body_val.into_object().context("Must be object")?;

let findings = body_val
.remove("findings")
.and_then(|v| v.into_array())
.unwrap_or_default();

all_findings.extend(findings);

next_token = body_val.remove("nextToken").and_then(|v| v.into_str());
is_first = false;

debug!("Loaded page for Amazon Inspector");
}

info!(
"Loaded {} findings for Amazon Inspector",
all_findings.len()
);
// Serialization could be made concurrent with upload IO; kept sequential for simplicity.
let mut json_bytes = vec![];
for finding in all_findings {
json_bytes.write(serde_json::to_vec(&finding)?.as_slice())?;
json_bytes.write(b"\n")?;
}

Ok(json_bytes)
}
}
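This puller deliberately goes through a raw smithy client and call_raw, keeping findings as schemaless JSON (serde_json::Value) so that fields the typed model doesn't know about survive without an SDK upgrade; the trade-off is hand-rolled pagination and body parsing. For contrast, a rough sketch of the typed flow this avoids — the accessor names are assumptions against aws-sdk-inspector2 0.21:

```rust
use aws_sdk_inspector2::model::{FilterCriteria, Finding};

// Sketch of the typed alternative (not used by this PR): findings are
// deserialized into SDK models per page, which would later re-serialize
// through serde instead of staying close to the service's own JSON.
async fn list_findings_typed(
    conf: &aws_config::SdkConfig,
    filter: FilterCriteria,
) -> anyhow::Result<Vec<Finding>> {
    let client = aws_sdk_inspector2::Client::new(conf);
    let mut findings = Vec::new();
    let mut next_token: Option<String> = None;
    loop {
        let resp = client
            .list_findings()
            .filter_criteria(filter.clone())
            .set_next_token(next_token.clone())
            .send()
            .await?;
        findings.extend(resp.findings().unwrap_or_default().iter().cloned());
        next_token = resp.next_token().map(str::to_string);
        if next_token.is_none() {
            break;
        }
    }
    Ok(findings)
}
```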
25 changes: 17 additions & 8 deletions lib/rust/log_puller/src/pullers/mod.rs
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::atomic::AtomicBool};

use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
@@ -13,6 +13,7 @@ use tokio::sync::Mutex;
use shared::secrets::load_secret;

mod abusech;
mod amazon_inspector;
mod duo;
mod o365;
mod otx;
@@ -49,6 +50,7 @@ pub struct PullLogsContext {
tables_config: HashMap<String, config::Config>,
cache: Arc<Mutex<PullerCache>>,
s3: aws_sdk_s3::Client,
is_initial_run: AtomicBool,
}

impl PullLogsContext {
@@ -67,6 +69,7 @@
tables_config,
cache: Arc::new(Mutex::new(PullerCache::new())),
s3,
is_initial_run: AtomicBool::new(false),
}
}

@@ -86,16 +89,11 @@
}

/// Returns true if this is the first time the puller has run, and caches the result for is_initial_run(). Useful for e.g. pulling more logs on first run.
pub async fn is_initial_run(&self) -> Result<bool> {
pub async fn load_is_initial_run(&self) -> Result<bool> {
let bucket = std::env::var("INGESTION_BUCKET_NAME").context("need bucket!")?;

let initial_run_key = "__puller_initial_run__";
let s3_key = format!("{}/{}", initial_run_key, self.log_source_type.to_str());
let mut cache = self.cache.lock().await;

if cache.get(initial_run_key).is_some() {
return Ok(false);
}

let res = self
.s3
@@ -118,7 +116,8 @@
}?;

if is_initial {
cache.set(initial_run_key, "".to_string(), None);
self.is_initial_run
.swap(true, std::sync::atomic::Ordering::Relaxed);
}

Ok(is_initial)
@@ -141,6 +140,11 @@
Ok(())
}

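/// Cheap synchronous accessor for the flag populated by load_is_initial_run().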
pub fn is_initial_run(&self) -> bool {
self.is_initial_run
.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn config(&self) -> &HashMap<String, String> {
&self.config
}
@@ -169,6 +173,7 @@ pub trait PullLogs {
#[derive(Clone)]
#[enum_dispatch(PullLogs)]
pub enum LogSource {
AmazonInspectorPuller(amazon_inspector::AmazonInspectorPuller),
O365Puller(o365::O365Puller),
DuoPuller(duo::DuoPuller),
Otx(otx::OtxPuller),
@@ -180,6 +185,9 @@
impl LogSource {
pub fn from_str(s: &str) -> Option<LogSource> {
match s.to_lowercase().as_str() {
"aws_inspector" => Some(LogSource::AmazonInspectorPuller(
amazon_inspector::AmazonInspectorPuller {},
)),
"o365" => Some(LogSource::O365Puller(o365::O365Puller {})),
"duo" => Some(LogSource::DuoPuller(duo::DuoPuller {})),
"otx" => Some(LogSource::Otx(otx::OtxPuller {})),
@@ -197,6 +205,7 @@
}
pub fn to_str(&self) -> &str {
match self {
LogSource::AmazonInspectorPuller(_) => "aws_inspector",
LogSource::DuoPuller(_) => "duo",
LogSource::O365Puller(_) => "o365",
LogSource::Otx(_) => "otx",
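Net effect of the context changes, for anyone adding the next puller: the async S3 read happens once in the handler via load_is_initial_run(), and inside pull_logs() the synchronous is_initial_run() accessor is all you need — the Threatfox and Inspector pullers above follow exactly this shape. A skeleton under those assumptions; MyPuller and its body are placeholders:

```rust
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, FixedOffset};

use super::{PullLogs, PullLogsContext};

#[derive(Clone)]
pub struct MyPuller; // hypothetical log source, for illustration only

#[async_trait]
impl PullLogs for MyPuller {
    async fn pull_logs(
        self,
        _client: reqwest::Client,
        ctx: &PullLogsContext,
        start_dt: DateTime<FixedOffset>,
        end_dt: DateTime<FixedOffset>,
    ) -> Result<Vec<u8>> {
        // Widen the window on the first run; the handler marks the run
        // complete only after the resulting upload succeeds.
        let start_dt = if ctx.is_initial_run() {
            end_dt - chrono::Duration::days(1)
        } else {
            start_dt
        };
        // ...fetch events in [start_dt, end_dt) and emit them as JSON lines...
        let _ = start_dt; // placeholder for the real fetch
        Ok(Vec::new())
    }
}
```

A real source would also be registered in the LogSource enum and both from_str/to_str matches, as the aws_inspector arms above show.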