From 42dc4da24f8028dddb76b4aab01f3a5be1bbf94f Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Sun, 31 May 2026 15:47:50 +0530 Subject: [PATCH 1/5] outbound_http_policy is created --- src/alerts/outbound_http_policy.rs | 338 +++++++++++++++++++++++++++++ 1 file changed, 338 insertions(+) create mode 100644 src/alerts/outbound_http_policy.rs diff --git a/src/alerts/outbound_http_policy.rs b/src/alerts/outbound_http_policy.rs new file mode 100644 index 000000000..4ad1788a6 --- /dev/null +++ b/src/alerts/outbound_http_policy.rs @@ -0,0 +1,338 @@ +use http::{HeaderMap, HeaderValue, header::HeaderName}; +use ipnet::IpNet; +use reqwest::ClientBuilder; +use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + time::Duration, +}; +use tokio::net::lookup_host; +use url::Url; + +#[derive(Debug, Clone, Copy)] +pub enum AlertTargetKind { + Slack, + Webhook, + AlertManager, +} + +pub struct PreparedAlertTarget { + pub client: reqwest::Client, + pub headers: http::HeaderMap, +} + +#[derive(Debug, thiserror::Error)] +pub enum OutboundPolicyError { + #[error("missing URL host")] + MissingHost, + + #[error("Slack alert target must use HTTPS")] + SlackRequiresHttps, + + #[error("unsupported URL scheme:{0}")] + UnsupportedScheme(String), + + #[error("skipTlsCheck is disabled by server policy")] + InvalidTlsDisabled, + + #[error("failed to resolve target host {host} :{source}")] + ResolveFailed { + host: String, + source: std::io::Error, + }, + #[error("target host resolved to no address: {0}")] + NoResolvedAddresses(String), + + #[error("target address is denied by outbound policy: {0}")] + DeniedAddress(IpAddr), + + #[error("target domain is denied by outbound policy: {0}")] + DeniedDomain(String), + + #[error("private target requires P_ALERT_TARGET_ALLOW_PRIVATE=true and an allowlist match:{0}")] + PrivateAddressNotAllowed(IpAddr), + + #[error("invalid outbound policy CIDR{value}:{source}")] + InvalidCidr { + value: String, + source: ipnet::AddrParseError, + }, + + #[error("custom header is not allowed:{0}")] + DeniedHeader(String), + + #[error("invalid custom header name:{0}")] + InvalidHeaderName(String), + + #[error("invalid custom header value for:{0}")] + InvalidHeaderValue(String), + + #[error("invalid slack host: {0}")] + InvalidSlackHost(String), +} + +// All Alert-target outbound networking must enter here +// Policy authorizes the resolved destination, then pins exact DNS result into reqwest +pub async fn prepare_alert_target( + endpoint: &Url, + kind: AlertTargetKind, + skip_tls_check: bool, + headers: Option<&HashMap>, +) -> Result { + validate_scheme(endpoint, kind)?; + validate_tls_policy(kind, skip_tls_check)?; + let host = endpoint + .host_str() + .ok_or(OutboundPolicyError::MissingHost)? + .to_string(); + validate_domain_policy(&host, kind)?; + let port = endpoint + .port_or_known_default() + .ok_or_else(|| OutboundPolicyError::UnsupportedScheme(endpoint.scheme().to_string()))?; + let resolved_addrs = resolve_endpoint_addrs(&host, port).await?; + validate_resolved_addrs(&host, &resolved_addrs)?; + let authorization_allowed = operator_allowlist_matches(&host, &resolved_addrs)?; + let validated_headers = validate_header(headers, authorization_allowed)?; + let mut builder = default_client_builder() + .redirect(reqwest::redirect::Policy::none()) + .no_proxy() + .resolve_to_addrs(&host, &resolved_addrs); + if skip_tls_check { + builder = builder.danger_accept_invalid_certs(true); + } + let client = builder + .build() + .expect("alert target HTTP client can be constructed"); + Ok(PreparedAlertTarget { + client, + headers: validated_headers, + }) +} + +// create http client with timeouts to prevent resource exhaustion +fn default_client_builder() -> ClientBuilder { + ClientBuilder::new() + .connect_timeout(Duration::from_secs(3)) + .timeout(Duration::from_secs(30)) + .pool_idle_timeout(Duration::from_secs(90)) + .pool_max_idle_per_host(8) + .use_rustls_tls() + .http1_only() +} + +// We Only Accept http & https scheme +fn validate_scheme(endpoint: &Url, kind: AlertTargetKind) -> Result<(), OutboundPolicyError> { + match (kind, endpoint.scheme()) { + // for slack we must use https + (AlertTargetKind::Slack, "https") => Ok(()), + (AlertTargetKind::Slack, _) => Err(OutboundPolicyError::SlackRequiresHttps), + (_, "http" | "https") => Ok(()), + (_, scheme) => Err(OutboundPolicyError::UnsupportedScheme(scheme.to_string())), + } +} + +fn validate_tls_policy( + kind: AlertTargetKind, + skip_tls_check: bool, +) -> Result<(), OutboundPolicyError> { + // for slack we must ensure tls is enabled + if matches!(kind, AlertTargetKind::Slack) && skip_tls_check { + return Err(OutboundPolicyError::InvalidTlsDisabled); + } + // both tls flags need to align + if skip_tls_check && !env_bool("P_ALERT_TARGET_ALLOW_INVALID_TLS", false) { + return Err(OutboundPolicyError::InvalidTlsDisabled); + } + Ok(()) +} + +fn validate_domain_policy(host: &str, kind: AlertTargetKind) -> Result<(), OutboundPolicyError> { + // block explicitly denied domains + if matches_domain_list(host, "P_ALERT_TARGET_DENIED_DOMAINS") { + return Err(OutboundPolicyError::DeniedDomain(host.to_string())); + } + // slack target restricted to official webhook domains only + if matches!(kind, AlertTargetKind::Slack) + && host != "hooks.slack.com" + && host != "hooks.slack-gov.com" + { + return Err(OutboundPolicyError::InvalidSlackHost(host.to_string())); + } + Ok(()) +} + +// resolve hostname to ip +async fn resolve_endpoint_addrs( + host: &str, + port: u16, +) -> Result, OutboundPolicyError> { + // if host is ip, create socket directly + if let Ok(ip) = host.parse::() { + return Ok(vec![SocketAddr::new(ip, port)]); + } + // perform async dns resolution + let addrs = lookup_host((host, port)) + .await + .map_err(|source| OutboundPolicyError::ResolveFailed { + host: host.to_string(), + source, + })? + .collect::>(); + // fail if no address found + if addrs.is_empty() { + return Err(OutboundPolicyError::NoResolvedAddresses(host.to_string())); + } + Ok(addrs) +} +// check if target is explicitly allowlisted for private address access +fn operator_allowlist_matches( + host: &str, + addrs: &[SocketAddr], +) -> Result { + let allowed_cidrs = cidrs_from_env("P_ALERT_TARGET_ALLOWED_CIDRS")?; + let domain_allowed = matches_domain_list(host, "P_ALERT_TARGET_ALLOWED_DOMAINS"); + let cidrs_allowed = addrs + .iter() + .any(|addr| allowed_cidrs.iter().any(|cidr| cidr.contains(&addr.ip()))); + Ok(domain_allowed || cidrs_allowed) +} + +fn validate_resolved_addrs(host: &str, addrs: &[SocketAddr]) -> Result<(), OutboundPolicyError> { + let denied_cidrs = cidrs_from_env("P_ALERT_TARGET_DENIED_CIDRS")?; + let private_allowed = env_bool("P_ALERT_TARGET_ALLOW_PRIVATE", false); + let explicitly_allowlisted = operator_allowlist_matches(host, addrs)?; + for addr in addrs { + let ip = addr.ip(); + // deny ip addr which fall in the cidr range + if denied_cidrs.iter().any(|cidr| cidr.contains(&ip)) { + return Err(OutboundPolicyError::DeniedAddress(ip)); + } + // Private addresses require explicit opt-in plus allowlist match + if builtin_denied_ip(ip) && !(private_allowed && explicitly_allowlisted) { + return Err(OutboundPolicyError::PrivateAddressNotAllowed(ip)); + } + } + Ok(()) +} + +fn builtin_denied_ip(ip: IpAddr) -> bool { + // route to appropriate ip validation + match ip { + IpAddr::V4(ip) => denied_ipv4(ip), + IpAddr::V6(ip) => denied_ipv6(ip), + } +} + +fn denied_ipv4(ip: Ipv4Addr) -> bool { + // Check against RFC-defined special-use IPv4 address ranges + ip.is_unspecified() + || ip.is_loopback() + || ip.is_private() + || ip.is_link_local() + || ip.is_multicast() + || ip.octets()[0] == 100 && (64..=127).contains(&ip.octets()[1]) + || ip.octets()[0] >= 240 + || ip == Ipv4Addr::new(255, 255, 255, 255) +} + +fn denied_ipv6(ip: Ipv6Addr) -> bool { + // Check against RFC-defined special-use IPv6 address ranges + if ip.is_unspecified() || ip.is_loopback() || ip.is_multicast() { + return true; + } + let first = ip.segments()[0]; + + if (first & 0xfe00) == 0xfc00 { + return true; + } + if (first & 0xffc0) == 0xfe80 { + return true; + } + if let Some(mapped) = ip.to_ipv4_mapped() { + return denied_ipv4(mapped); + } + false +} + +fn validate_header( + headers: Option<&HashMap>, + authorization_allowed: bool, +) -> Result { + let mut map = HeaderMap::new(); + let Some(headers) = headers else { + return Ok(map); + }; + for (name, value) in headers { + let normalized = name.to_ascii_lowercase(); + // Block headers that can bypass security controls or proxy behavior + if denied_header(&normalized, authorization_allowed) { + return Err(OutboundPolicyError::DeniedHeader(name.clone())); + } + // Validate header name/value are syntactically correct before insertion + let header_name = HeaderName::from_bytes(name.as_bytes()) + .map_err(|_| OutboundPolicyError::InvalidHeaderName(name.clone()))?; + let header_value = HeaderValue::from_str(value) + .map_err(|_| OutboundPolicyError::InvalidHeaderValue(name.clone()))?; + map.insert(header_name, header_value); + } + Ok(map) +} + +fn denied_header(name: &str, authorization_allowed: bool) -> bool { + // These headers are always blocked because they control connection routing or proxy behavior + let always_denied = matches!( + name, + "host" + | "content-length" + | "transfer-encoding" + | "connection" + | "upgrade" + | "proxy-authorization" + | "proxy-authenticate" + | "cookie" + ); + + // Authorization header is only allowed for explicitly allowlisted destinations. + let authorization_denied = name == "authorization" && !authorization_allowed; + always_denied || authorization_denied +} + +// parse cidr blocks from env for ip allowlist/denylist +fn cidrs_from_env(name: &str) -> Result, OutboundPolicyError> { + env_list(name) + .into_iter() + .map(|value| { + value + .parse::() + .map_err(|source| OutboundPolicyError::InvalidCidr { value, source }) + }) + .collect() +} + +// check if host matches allowed domain,supporting subdomain +fn matches_domain_list(host: &str, env_name: &str) -> bool { + let host = host.trim_end_matches('.').to_ascii_lowercase(); + env_list(env_name).into_iter().any(|domain| { + let domain = domain.trim_end_matches('.').to_ascii_lowercase(); + host == domain || host.ends_with(&format!(".{domain}")) + }) +} + +// read comma-seperated values from env for config parsing +fn env_list(name: &str) -> Vec { + std::env::var(name) + .unwrap_or_default() + .split(',') + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) + .collect() +} + +// read bool values from env +fn env_bool(name: &str, default: bool) -> bool { + std::env::var(name) + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(default) +} From 84f27209644a8d4cf4abd4a1d1fc2407d5b2f73b Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Mon, 1 Jun 2026 19:06:51 +0530 Subject: [PATCH 2/5] Rewiring with existing code --- src/alerts/mod.rs | 4 + src/alerts/outbound_http_policy.rs | 1 + src/alerts/target.rs | 156 ++++++++++++++++++----------- src/handlers/http/targets.rs | 1 + 4 files changed, 106 insertions(+), 56 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 1d1b90999..abbeb3814 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -42,6 +42,7 @@ pub mod alert_structs; pub mod alert_traits; pub mod alert_types; pub mod alerts_utils; +pub mod outbound_http_policy; pub mod target; pub use crate::alerts::alert_enums::{ @@ -976,6 +977,8 @@ pub enum AlertError { Metadata(&'static str), #[error("User is not authorized to run this query")] Unauthorized, + #[error("Alert target outbound policy rejected request:{0}")] + OutboundPolicy(#[from] outbound_http_policy::OutboundPolicyError), #[error("ActixError: {0}")] Error(#[from] actix_web::Error), #[error("DataFusion Error: {0}")] @@ -1042,6 +1045,7 @@ impl actix_web::ResponseError for AlertError { Self::Unimplemented(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST, Self::MetastoreError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::OutboundPolicy(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/alerts/outbound_http_policy.rs b/src/alerts/outbound_http_policy.rs index 4ad1788a6..194adf738 100644 --- a/src/alerts/outbound_http_policy.rs +++ b/src/alerts/outbound_http_policy.rs @@ -197,6 +197,7 @@ fn operator_allowlist_matches( Ok(domain_allowed || cidrs_allowed) } +// reject the whole target if any resolved address is blocked fn validate_resolved_addrs(host: &str, addrs: &[SocketAddr]) -> Result<(), OutboundPolicyError> { let denied_cidrs = cidrs_from_env("P_ALERT_TARGET_DENIED_CIDRS")?; let private_allowed = env_bool("P_ALERT_TARGET_ALLOW_PRIVATE", false); diff --git a/src/alerts/target.rs b/src/alerts/target.rs index abe8792a5..25140db6e 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -25,16 +25,19 @@ use std::{ use async_trait::async_trait; use base64::Engine; use chrono::Utc; -use http::{HeaderMap, HeaderValue, header::AUTHORIZATION}; +use http::header::AUTHORIZATION; use itertools::Itertools; use once_cell::sync::Lazy; -use reqwest::ClientBuilder; use serde_json::{Value, json}; use tokio::sync::RwLock; use tracing::{error, trace, warn}; use ulid::Ulid; use url::Url; +use super::{ + ALERTS, + outbound_http_policy::{self, AlertTargetKind}, +}; use crate::{ alerts::{AlertError, AlertState, Context, alert_traits::CallableTarget}, metastore::metastore_traits::MetastoreObject, @@ -42,8 +45,6 @@ use crate::{ storage::object_storage::target_json_path, }; -use super::ALERTS; - pub static TARGETS: Lazy = Lazy::new(|| TargetConfigs { target_configs: RwLock::new(HashMap::new()), }); @@ -461,10 +462,38 @@ impl TargetType { TargetType::AlertManager(target) => target.call(payload).await, } } -} - -fn default_client_builder() -> ClientBuilder { - ClientBuilder::new() + pub async fn validate_outbound_policy(&self) -> Result<(), AlertError> { + match self { + TargetType::Slack(target) => { + outbound_http_policy::prepare_alert_target( + &target.endpoint, + AlertTargetKind::Slack, + false, + None, + ) + .await?; + } + TargetType::Other(target) => { + outbound_http_policy::prepare_alert_target( + &target.endpoint, + AlertTargetKind::Webhook, + target.skip_tls_check, + Some(&target.headers), + ) + .await?; + } + TargetType::AlertManager(target) => { + outbound_http_policy::prepare_alert_target( + &target.endpoint, + AlertTargetKind::AlertManager, + target.skip_tls_check, + None, + ) + .await?; + } + } + Ok(()) + } } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -475,10 +504,6 @@ pub struct SlackWebHook { #[async_trait] impl CallableTarget for SlackWebHook { async fn call(&self, payload: &Context) { - let client = default_client_builder() - .build() - .expect("Client can be constructed on this system"); - let alert = match payload.alert_info.alert_state { AlertState::Triggered => { serde_json::json!({ "text": payload.message }) @@ -490,13 +515,32 @@ impl CallableTarget for SlackWebHook { serde_json::json!({ "text": payload.default_disabled_string() }) } }; - - if let Err(e) = client.post(self.endpoint.clone()).json(&alert).send().await { - error!("Couldn't make call to webhook, error: {}", e) + // Revalidate immediately before delivery because stored targets and DNS can change. + let prepared = match outbound_http_policy::prepare_alert_target( + &self.endpoint, + AlertTargetKind::Slack, + false, + None, + ) + .await + { + Ok(prepared) => prepared, + Err(err) => { + error!("Slack alert target rejected by outbound policy:{err}"); + return; + } + }; + if let Err(e) = prepared + .client + .post(self.endpoint.clone()) + .json(&alert) + .send() + .await + { + error!("Couldn't make call to webhook, error:{}", e) } } } - #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct OtherWebHook { @@ -510,27 +554,31 @@ pub struct OtherWebHook { #[async_trait] impl CallableTarget for OtherWebHook { async fn call(&self, payload: &Context) { - let mut builder = default_client_builder(); - if self.skip_tls_check { - builder = builder.danger_accept_invalid_certs(true) - } - - let client = builder - .build() - .expect("Client can be constructed on this system"); - let alert = match payload.alert_info.alert_state { AlertState::Triggered => payload.message.clone(), AlertState::NotTriggered => payload.default_resolved_string(), AlertState::Disabled => payload.default_disabled_string(), }; - - let request = client + let prepared = match outbound_http_policy::prepare_alert_target( + &self.endpoint, + AlertTargetKind::Webhook, + self.skip_tls_check, + Some(&self.headers), + ) + .await + { + Ok(prepared) => prepared, + Err(err) => { + error!("Webhook alert target rejected by outbound policy:{err}"); + return; + } + }; + let request = prepared + .client .post(self.endpoint.clone()) - .headers((&self.headers).try_into().expect("valid_headers")); - + .headers(prepared.headers); if let Err(e) = request.body(alert).send().await { - error!("Couldn't make call to webhook, error: {}", e) + error!("Couldn't make call to webhook, error:{}", e) } } } @@ -548,26 +596,6 @@ pub struct AlertManager { #[async_trait] impl CallableTarget for AlertManager { async fn call(&self, payload: &Context) { - let mut builder = default_client_builder(); - - if self.skip_tls_check { - builder = builder.danger_accept_invalid_certs(true) - } - - if let Some(Auth { username, password }) = &self.auth { - let basic_auth_value = "Basic ".to_string() - + &base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}")); - let headers = HeaderMap::from_iter([( - AUTHORIZATION, - HeaderValue::try_from(basic_auth_value).expect("valid value"), - )]); - builder = builder.default_headers(headers) - } - - let client = builder - .build() - .expect("Client can be constructed on this system"); - let mut alerts = serde_json::json!([{ "labels": { "alertname": payload.alert_info.alert_name, @@ -598,13 +626,29 @@ impl CallableTarget for AlertManager { AlertState::Disabled => alert["labels"]["status"] = "disabled".into(), }; - if let Err(e) = client - .post(self.endpoint.clone()) - .json(&alerts) - .send() - .await + let prepared = match outbound_http_policy::prepare_alert_target( + &self.endpoint, + AlertTargetKind::AlertManager, + self.skip_tls_check, + None, + ) + .await { - error!("Couldn't make call to alertmanager, error: {}", e) + Ok(prepared) => prepared, + Err(err) => { + error!("Alertmanager target rejected by outbound policy:{err}"); + return; + } + }; + + let mut request = prepared.client.post(self.endpoint.clone()).json(&alerts); + if let Some(Auth { username, password }) = &self.auth { + let basic_auth_value = "Basic ".to_string() + + &base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}")); + request = request.header(AUTHORIZATION, basic_auth_value); + } + if let Err(e) = request.send().await { + error!("Couldn't make call to alertmanager, error:{}", e) } } } diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index db91c1e3d..c2f7664fe 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -38,6 +38,7 @@ pub async fn post( ) -> Result { let tenant_id = get_tenant_id_from_request(&req); target.tenant = tenant_id; + target.target.validate_outbound_policy().await?; // should check for duplicacy and liveness (??) // add to the map TARGETS.update(target.clone()).await?; From 55c7b6141544e45dac7f4f40bd7f94327e6fa7ac Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Tue, 2 Jun 2026 14:33:55 +0530 Subject: [PATCH 3/5] fix(alerts): enforce outbound policy for alert targets --- Cargo.toml | 2 + src/alerts/outbound_http_policy.rs | 178 ++++++++++++++--------- src/handlers/http/alert_target_policy.rs | 30 ++++ src/handlers/http/mod.rs | 1 + src/handlers/http/modal/query_server.rs | 3 +- src/handlers/http/modal/server.rs | 26 ++++ src/handlers/http/targets.rs | 1 + 7 files changed, 168 insertions(+), 73 deletions(-) create mode 100644 src/handlers/http/alert_target_policy.rs diff --git a/Cargo.toml b/Cargo.toml index 3d951e43f..b01ced5aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,9 +86,11 @@ tokio = { version = "^1.52.3", default-features = false, features = [ "macros", "fs", "rt-multi-thread", + "net" ] } tokio-stream = { version = "0.1.18", features = ["fs"] } tokio-util = { version = "0.7.18" } +ipnet = "2.12.0" # perf hotpath = { version = "0.16.0", optional = true, features = [ diff --git a/src/alerts/outbound_http_policy.rs b/src/alerts/outbound_http_policy.rs index 194adf738..297b17680 100644 --- a/src/alerts/outbound_http_policy.rs +++ b/src/alerts/outbound_http_policy.rs @@ -1,5 +1,6 @@ use http::{HeaderMap, HeaderValue, header::HeaderName}; use ipnet::IpNet; +use once_cell::sync::Lazy; use reqwest::ClientBuilder; use std::{ collections::HashMap, @@ -7,6 +8,7 @@ use std::{ time::Duration, }; use tokio::net::lookup_host; +use tokio::sync::RwLock; use url::Url; #[derive(Debug, Clone, Copy)] @@ -16,6 +18,66 @@ pub enum AlertTargetKind { AlertManager, } +// policy for alert-target egress +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AlertTargetPolicyConfig { + pub allow_private: bool, + pub allowed_domains: Vec, + pub allowed_cidrs: Vec, + pub denied_domains: Vec, + pub denied_cidrs: Vec, + pub allow_invalid_tls: bool, +} + +impl Default for AlertTargetPolicyConfig { + fn default() -> Self { + Self { + allow_private: false, + allowed_domains: vec![], + allowed_cidrs: vec![], + denied_domains: vec![], + denied_cidrs: vec![], + allow_invalid_tls: false, + } + } +} + +pub static ALERT_TARGET_POLICY: Lazy> = + Lazy::new(|| RwLock::new(AlertTargetPolicyConfig::default())); + +// Read one policy snapshot per validation +pub async fn active_policy() -> AlertTargetPolicyConfig { + ALERT_TARGET_POLICY.read().await.clone() +} + +// Replace atomically only after validation +pub async fn replace_policy(policy: AlertTargetPolicyConfig) -> Result<(), OutboundPolicyError> { + validate_policy(&policy)?; + *ALERT_TARGET_POLICY.write().await = policy; + Ok(()) +} + +// Validate admin-supplied policy before it is stored or used for target checks. +pub fn validate_policy(policy: &AlertTargetPolicyConfig) -> Result<(), OutboundPolicyError> { + parse_cidrs(&policy.allowed_cidrs)?; + parse_cidrs(&policy.denied_cidrs)?; + Ok(()) +} + +fn parse_cidrs(values: &[String]) -> Result, OutboundPolicyError> { + values + .iter() + .map(|val| { + val.parse::() + .map_err(|source| OutboundPolicyError::InvalidCidr { + value: val.clone(), + source, + }) + }) + .collect() +} + pub struct PreparedAlertTarget { pub client: reqwest::Client, pub headers: http::HeaderMap, @@ -49,7 +111,7 @@ pub enum OutboundPolicyError { #[error("target domain is denied by outbound policy: {0}")] DeniedDomain(String), - #[error("private target requires P_ALERT_TARGET_ALLOW_PRIVATE=true and an allowlist match:{0}")] + #[error("private target requires allowPrivate=true and an allowlist match:{0}")] PrivateAddressNotAllowed(IpAddr), #[error("invalid outbound policy CIDR{value}:{source}")] @@ -71,8 +133,8 @@ pub enum OutboundPolicyError { InvalidSlackHost(String), } -// All Alert-target outbound networking must enter here -// Policy authorizes the resolved destination, then pins exact DNS result into reqwest +// All alert-target outbound networking must enter here. +// The policy authorizes the resolved destination, then pins that exact DNS result into reqwest pub async fn prepare_alert_target( endpoint: &Url, kind: AlertTargetKind, @@ -80,18 +142,19 @@ pub async fn prepare_alert_target( headers: Option<&HashMap>, ) -> Result { validate_scheme(endpoint, kind)?; - validate_tls_policy(kind, skip_tls_check)?; + let policy = active_policy().await; + validate_tls_policy(kind, skip_tls_check, &policy)?; let host = endpoint .host_str() .ok_or(OutboundPolicyError::MissingHost)? .to_string(); - validate_domain_policy(&host, kind)?; + validate_domain_policy(&host, kind, &policy)?; let port = endpoint .port_or_known_default() .ok_or_else(|| OutboundPolicyError::UnsupportedScheme(endpoint.scheme().to_string()))?; let resolved_addrs = resolve_endpoint_addrs(&host, port).await?; - validate_resolved_addrs(&host, &resolved_addrs)?; - let authorization_allowed = operator_allowlist_matches(&host, &resolved_addrs)?; + validate_resolved_addrs(&host, &resolved_addrs, &policy)?; + let authorization_allowed = operator_allowlist_matches(&host, &resolved_addrs, &policy)?; let validated_headers = validate_header(headers, authorization_allowed)?; let mut builder = default_client_builder() .redirect(reqwest::redirect::Policy::none()) @@ -109,7 +172,7 @@ pub async fn prepare_alert_target( }) } -// create http client with timeouts to prevent resource exhaustion +// Keep alert delivery bounded so a slow or stuck target cannot hold workers forever. fn default_client_builder() -> ClientBuilder { ClientBuilder::new() .connect_timeout(Duration::from_secs(3)) @@ -120,10 +183,8 @@ fn default_client_builder() -> ClientBuilder { .http1_only() } -// We Only Accept http & https scheme fn validate_scheme(endpoint: &Url, kind: AlertTargetKind) -> Result<(), OutboundPolicyError> { match (kind, endpoint.scheme()) { - // for slack we must use https (AlertTargetKind::Slack, "https") => Ok(()), (AlertTargetKind::Slack, _) => Err(OutboundPolicyError::SlackRequiresHttps), (_, "http" | "https") => Ok(()), @@ -134,24 +195,29 @@ fn validate_scheme(endpoint: &Url, kind: AlertTargetKind) -> Result<(), Outbound fn validate_tls_policy( kind: AlertTargetKind, skip_tls_check: bool, + policy: &AlertTargetPolicyConfig, ) -> Result<(), OutboundPolicyError> { - // for slack we must ensure tls is enabled + // Slack webhooks are always HTTPS-only; do not let per-target config weaken that. if matches!(kind, AlertTargetKind::Slack) && skip_tls_check { return Err(OutboundPolicyError::InvalidTlsDisabled); } - // both tls flags need to align - if skip_tls_check && !env_bool("P_ALERT_TARGET_ALLOW_INVALID_TLS", false) { + // Invalid TLS is a deployment-level exception, not a user-controlled toggle. + if skip_tls_check && !policy.allow_invalid_tls { return Err(OutboundPolicyError::InvalidTlsDisabled); } Ok(()) } -fn validate_domain_policy(host: &str, kind: AlertTargetKind) -> Result<(), OutboundPolicyError> { - // block explicitly denied domains - if matches_domain_list(host, "P_ALERT_TARGET_DENIED_DOMAINS") { +fn validate_domain_policy( + host: &str, + kind: AlertTargetKind, + policy: &AlertTargetPolicyConfig, +) -> Result<(), OutboundPolicyError> { + // Denied domains win before DNS resolution, which avoids needless egress. + if matches_domain_list(host, &policy.denied_domains) { return Err(OutboundPolicyError::DeniedDomain(host.to_string())); } - // slack target restricted to official webhook domains only + // Slack targets are not generic webhooks; keep them pinned to Slack-owned hosts. if matches!(kind, AlertTargetKind::Slack) && host != "hooks.slack.com" && host != "hooks.slack-gov.com" @@ -161,16 +227,13 @@ fn validate_domain_policy(host: &str, kind: AlertTargetKind) -> Result<(), Outbo Ok(()) } -// resolve hostname to ip async fn resolve_endpoint_addrs( host: &str, port: u16, ) -> Result, OutboundPolicyError> { - // if host is ip, create socket directly if let Ok(ip) = host.parse::() { return Ok(vec![SocketAddr::new(ip, port)]); } - // perform async dns resolution let addrs = lookup_host((host, port)) .await .map_err(|source| OutboundPolicyError::ResolveFailed { @@ -178,38 +241,41 @@ async fn resolve_endpoint_addrs( source, })? .collect::>(); - // fail if no address found if addrs.is_empty() { return Err(OutboundPolicyError::NoResolvedAddresses(host.to_string())); } Ok(addrs) } -// check if target is explicitly allowlisted for private address access + +// Private/internal targets must match admin-owned allowlists before they may be used. fn operator_allowlist_matches( host: &str, addrs: &[SocketAddr], + policy: &AlertTargetPolicyConfig, ) -> Result { - let allowed_cidrs = cidrs_from_env("P_ALERT_TARGET_ALLOWED_CIDRS")?; - let domain_allowed = matches_domain_list(host, "P_ALERT_TARGET_ALLOWED_DOMAINS"); + let allowed_cidrs = parse_cidrs(&policy.allowed_cidrs)?; + let domain_allowed = matches_domain_list(host, &policy.allowed_domains); let cidrs_allowed = addrs .iter() .any(|addr| allowed_cidrs.iter().any(|cidr| cidr.contains(&addr.ip()))); Ok(domain_allowed || cidrs_allowed) } -// reject the whole target if any resolved address is blocked -fn validate_resolved_addrs(host: &str, addrs: &[SocketAddr]) -> Result<(), OutboundPolicyError> { - let denied_cidrs = cidrs_from_env("P_ALERT_TARGET_DENIED_CIDRS")?; - let private_allowed = env_bool("P_ALERT_TARGET_ALLOW_PRIVATE", false); - let explicitly_allowlisted = operator_allowlist_matches(host, addrs)?; +// Fail closed for multi-address DNS responses. If any resolved address is +// denied or non-public without an allowlist match, reject the target. +fn validate_resolved_addrs( + host: &str, + addrs: &[SocketAddr], + policy: &AlertTargetPolicyConfig, +) -> Result<(), OutboundPolicyError> { + let denied_cidrs = parse_cidrs(&policy.denied_cidrs)?; + let explicitly_allowlisted = operator_allowlist_matches(host, addrs, policy)?; for addr in addrs { let ip = addr.ip(); - // deny ip addr which fall in the cidr range if denied_cidrs.iter().any(|cidr| cidr.contains(&ip)) { return Err(OutboundPolicyError::DeniedAddress(ip)); } - // Private addresses require explicit opt-in plus allowlist match - if builtin_denied_ip(ip) && !(private_allowed && explicitly_allowlisted) { + if builtin_denied_ip(ip) && !(policy.allow_private && explicitly_allowlisted) { return Err(OutboundPolicyError::PrivateAddressNotAllowed(ip)); } } @@ -217,7 +283,6 @@ fn validate_resolved_addrs(host: &str, addrs: &[SocketAddr]) -> Result<(), Outbo } fn builtin_denied_ip(ip: IpAddr) -> bool { - // route to appropriate ip validation match ip { IpAddr::V4(ip) => denied_ipv4(ip), IpAddr::V6(ip) => denied_ipv6(ip), @@ -225,7 +290,7 @@ fn builtin_denied_ip(ip: IpAddr) -> bool { } fn denied_ipv4(ip: Ipv4Addr) -> bool { - // Check against RFC-defined special-use IPv4 address ranges + // Covers private, loopback, link-local, multicast, carrier NAT, and reserved ranges. ip.is_unspecified() || ip.is_loopback() || ip.is_private() @@ -237,7 +302,7 @@ fn denied_ipv4(ip: Ipv4Addr) -> bool { } fn denied_ipv6(ip: Ipv6Addr) -> bool { - // Check against RFC-defined special-use IPv6 address ranges + // Covers loopback, link-local, unique-local, multicast, and mapped IPv4 ranges. if ip.is_unspecified() || ip.is_loopback() || ip.is_multicast() { return true; } @@ -265,7 +330,7 @@ fn validate_header( }; for (name, value) in headers { let normalized = name.to_ascii_lowercase(); - // Block headers that can bypass security controls or proxy behavior + // Block headers that can bypass connection policy or smuggle credentials. if denied_header(&normalized, authorization_allowed) { return Err(OutboundPolicyError::DeniedHeader(name.clone())); } @@ -280,7 +345,7 @@ fn validate_header( } fn denied_header(name: &str, authorization_allowed: bool) -> bool { - // These headers are always blocked because they control connection routing or proxy behavior + // These headers are always blocked because they control routing or proxy behavior. let always_denied = matches!( name, "host" @@ -293,47 +358,16 @@ fn denied_header(name: &str, authorization_allowed: bool) -> bool { | "cookie" ); - // Authorization header is only allowed for explicitly allowlisted destinations. + // Authorization is allowed only after the destination matches admin policy. let authorization_denied = name == "authorization" && !authorization_allowed; always_denied || authorization_denied } -// parse cidr blocks from env for ip allowlist/denylist -fn cidrs_from_env(name: &str) -> Result, OutboundPolicyError> { - env_list(name) - .into_iter() - .map(|value| { - value - .parse::() - .map_err(|source| OutboundPolicyError::InvalidCidr { value, source }) - }) - .collect() -} - -// check if host matches allowed domain,supporting subdomain -fn matches_domain_list(host: &str, env_name: &str) -> bool { +// Domain entries match the exact host and its subdomains. +fn matches_domain_list(host: &str, domains: &[String]) -> bool { let host = host.trim_end_matches('.').to_ascii_lowercase(); - env_list(env_name).into_iter().any(|domain| { + domains.iter().any(|domain| { let domain = domain.trim_end_matches('.').to_ascii_lowercase(); host == domain || host.ends_with(&format!(".{domain}")) }) } - -// read comma-seperated values from env for config parsing -fn env_list(name: &str) -> Vec { - std::env::var(name) - .unwrap_or_default() - .split(',') - .map(str::trim) - .filter(|value| !value.is_empty()) - .map(ToOwned::to_owned) - .collect() -} - -// read bool values from env -fn env_bool(name: &str, default: bool) -> bool { - std::env::var(name) - .ok() - .and_then(|value| value.parse::().ok()) - .unwrap_or(default) -} diff --git a/src/handlers/http/alert_target_policy.rs b/src/handlers/http/alert_target_policy.rs new file mode 100644 index 000000000..92986524d --- /dev/null +++ b/src/handlers/http/alert_target_policy.rs @@ -0,0 +1,30 @@ +use actix_web::{ + Responder, + web::{self, Json}, +}; + +use crate::alerts::{ + AlertError, + outbound_http_policy::{ + AlertTargetPolicyConfig, active_policy, replace_policy, validate_policy, + }, +}; + +pub async fn get() -> Result { + Ok(web::Json(active_policy().await)) +} + +pub async fn put( + Json(policy): Json, +) -> Result { + // validate before replacing so a bad policy never becomes active + replace_policy(policy.clone()).await?; + Ok(web::Json(policy)) +} + +pub async fn validate( + Json(policy): Json, +) -> Result { + validate_policy(&policy)?; + Ok(web::Json(policy)) +} diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 25962701d..1723cedce 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -29,6 +29,7 @@ use crate::{INTRA_CLUSTER_CLIENT, parseable::PARSEABLE}; use self::query::Query; pub mod about; +pub mod alert_target_policy; pub mod alerts; pub mod apikeys; pub mod cluster; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 828ef1585..8b0e7ff00 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -76,7 +76,8 @@ impl ParseableServer for QueryServer { .service(Server::get_alerts_webscope()) .service(Server::get_targets_webscope()) .service(Self::get_cluster_web_scope()) - .service(Server::get_demo_data_webscope()), + .service(Server::get_demo_data_webscope()) + .service(Server::get_alert_target_policy()), ) .service( web::scope(&prism_base_path()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index cae7dd3aa..a5929cebe 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -45,6 +45,7 @@ use crate::storage::field_stats::get_dataset_stats; use crate::sync; use crate::sync::sync_start; +use crate::handlers::http::alert_target_policy; use actix_web::Resource; use actix_web::Scope; use actix_web::web; @@ -99,6 +100,7 @@ impl ParseableServer for Server { .service(Self::get_counts_webscope()) .service(Self::get_alerts_webscope()) .service(Self::get_targets_webscope()) + .service(Self::get_alert_target_policy()) .service(Self::get_metrics_webscope()) .service(Self::get_demo_data_webscope()), ) @@ -343,6 +345,30 @@ impl Server { ) } + pub fn get_alert_target_policy() -> Scope { + web::scope("/admin/alert-target-policy") + .service( + web::resource("") + .route( + web::get() + .to(alert_target_policy::get) + .authorize(Action::SuperAdmin), + ) + .route( + web::put() + .to(alert_target_policy::put) + .authorize(Action::SuperAdmin), + ), + ) + .service( + web::resource("/validate").route( + web::post() + .to(alert_target_policy::validate) + .authorize(Action::SuperAdmin), + ), + ) + } + pub fn get_targets_webscope() -> Scope { web::scope("/targets") .service( diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index c2f7664fe..7b0b4b04f 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -90,6 +90,7 @@ pub async fn update( // esnure that the supplied target id is assigned to the target config target.id = target_id; target.tenant = tenant_id; + target.target.validate_outbound_policy().await?; // should check for duplicacy and liveness (??) // add to the map TARGETS.update(target.clone()).await?; From a9559144ef70d3bf4d1042d90629759ff19414d0 Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Tue, 2 Jun 2026 17:45:43 +0530 Subject: [PATCH 4/5] addressed few issues and added tests --- Cargo.lock | 11 +- src/alerts/mod.rs | 15 ++- src/alerts/outbound_http_policy.rs | 202 +++++++++++++++++++++++++++-- src/alerts/target.rs | 17 ++- src/lib.rs | 2 +- src/metastore/metastore_traits.rs | 2 +- 6 files changed, 224 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e9912d0c..1d115a699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3011,7 +3011,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.4", "tokio", "tower-service", "tracing", @@ -3196,9 +3196,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "ipnet" -version = "2.11.0" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "iri-string" @@ -4087,6 +4087,7 @@ dependencies = [ "humantime", "humantime-serde", "indexmap", + "ipnet", "itertools 0.15.0", "lazy_static", "num_cpus", @@ -4545,7 +4546,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.4", "thiserror 2.0.17", "tokio", "tracing", @@ -4582,7 +4583,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.4", "tracing", "windows-sys 0.60.2", ] diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index abbeb3814..c4fd2060c 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -1050,9 +1050,18 @@ impl actix_web::ResponseError for AlertError { } fn error_response(&self) -> actix_web::HttpResponse { - actix_web::HttpResponse::build(self.status_code()) - .insert_header(ContentType::plaintext()) - .body(self.to_string()) + match self { + Self::OutboundPolicy(_) => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::json()) + .json(serde_json::json!({ + "error": "Alert target blocked by outbound security policy", + "message": self.to_string(), + "hint": "Ask admin to allow this destination using the alert target policy." + })), + _ => actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()), + } } } diff --git a/src/alerts/outbound_http_policy.rs b/src/alerts/outbound_http_policy.rs index 297b17680..1fd847955 100644 --- a/src/alerts/outbound_http_policy.rs +++ b/src/alerts/outbound_http_policy.rs @@ -19,7 +19,7 @@ pub enum AlertTargetKind { } // policy for alert-target egress -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct AlertTargetPolicyConfig { pub allow_private: bool, @@ -30,19 +30,6 @@ pub struct AlertTargetPolicyConfig { pub allow_invalid_tls: bool, } -impl Default for AlertTargetPolicyConfig { - fn default() -> Self { - Self { - allow_private: false, - allowed_domains: vec![], - allowed_cidrs: vec![], - denied_domains: vec![], - denied_cidrs: vec![], - allow_invalid_tls: false, - } - } -} - pub static ALERT_TARGET_POLICY: Lazy> = Lazy::new(|| RwLock::new(AlertTargetPolicyConfig::default())); @@ -81,6 +68,7 @@ fn parse_cidrs(values: &[String]) -> Result, OutboundPolicyError> { pub struct PreparedAlertTarget { pub client: reqwest::Client, pub headers: http::HeaderMap, + pub authorization_allowed: bool, } #[derive(Debug, thiserror::Error)] @@ -169,6 +157,7 @@ pub async fn prepare_alert_target( Ok(PreparedAlertTarget { client, headers: validated_headers, + authorization_allowed, }) } @@ -314,6 +303,15 @@ fn denied_ipv6(ip: Ipv6Addr) -> bool { if (first & 0xffc0) == 0xfe80 { return true; } + + // Block IPv6 transition ranges that can carry embedded IPv4 destinations. + if first == 0x2002 { + return true; + } + if first == 0x2001 && ip.segments()[1] == 0x0000 { + return true; + } + if let Some(mapped) = ip.to_ipv4_mapped() { return denied_ipv4(mapped); } @@ -371,3 +369,179 @@ fn matches_domain_list(host: &str, domains: &[String]) -> bool { host == domain || host.ends_with(&format!(".{domain}")) }) } + +#[cfg(test)] +mod tests { + use super::*; + use tokio::sync::{Mutex, MutexGuard}; + + static POLICY_TEST_LOCK: Lazy> = Lazy::new(|| Mutex::new(())); + + fn policy_with( + allow_private: bool, + allowed_cidrs: &[&str], + denied_cidrs: &[&str], + ) -> AlertTargetPolicyConfig { + AlertTargetPolicyConfig { + allow_private, + allowed_cidrs: allowed_cidrs + .iter() + .map(|value| value.to_string()) + .collect(), + denied_cidrs: denied_cidrs.iter().map(|value| value.to_string()).collect(), + ..Default::default() + } + } + + fn socket(ip: IpAddr) -> SocketAddr { + SocketAddr::new(ip, 80) + } + + async fn set_policy( + policy: AlertTargetPolicyConfig, + ) -> Result, OutboundPolicyError> { + let guard = POLICY_TEST_LOCK.lock().await; + replace_policy(policy).await?; + Ok(guard) + } + + #[test] + fn public_addresses_are_allowed_by_default() { + let policy = AlertTargetPolicyConfig::default(); + let addrs = [socket(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)))]; + + validate_resolved_addrs("example.com", &addrs, &policy).unwrap(); + } + + #[test] + fn private_addresses_are_blocked_by_default() { + let policy = AlertTargetPolicyConfig::default(); + let addrs = [socket(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]; + + let err = validate_resolved_addrs("localhost", &addrs, &policy).unwrap_err(); + + assert!(matches!( + err, + OutboundPolicyError::PrivateAddressNotAllowed(_) + )); + } + + #[test] + fn private_addresses_require_allow_private_and_allowlist_match() { + let addrs = [socket(IpAddr::V4(Ipv4Addr::new(10, 10, 0, 10)))]; + let policy = policy_with(true, &["10.10.0.0/16"], &[]); + + validate_resolved_addrs("internal.example.com", &addrs, &policy).unwrap(); + } + + #[test] + fn denied_cidrs_override_allowlists() { + let addrs = [socket(IpAddr::V4(Ipv4Addr::new(10, 10, 0, 10)))]; + let policy = policy_with(true, &["10.10.0.0/16"], &["10.10.0.10/32"]); + + let err = validate_resolved_addrs("internal.example.com", &addrs, &policy).unwrap_err(); + + assert!(matches!(err, OutboundPolicyError::DeniedAddress(_))); + } + + #[test] + fn invalid_cidr_policy_is_rejected() { + let policy = policy_with(true, &["bad-cidr"], &[]); + + let err = validate_policy(&policy).unwrap_err(); + + assert!(matches!(err, OutboundPolicyError::InvalidCidr { .. })); + } + + #[test] + fn authorization_header_requires_allowlist_decision() { + let mut headers = HashMap::new(); + headers.insert("Authorization".to_string(), "Bearer token".to_string()); + + let err = validate_header(Some(&headers), false).unwrap_err(); + assert!(matches!(err, OutboundPolicyError::DeniedHeader(name) if name == "Authorization")); + + let validated = validate_header(Some(&headers), true).unwrap(); + assert_eq!( + validated.get("authorization").unwrap(), + HeaderValue::from_static("Bearer token") + ); + } + + #[test] + fn cookie_header_is_always_blocked() { + let mut headers = HashMap::new(); + headers.insert("Cookie".to_string(), "session=secret".to_string()); + + let err = validate_header(Some(&headers), true).unwrap_err(); + + assert!(matches!(err, OutboundPolicyError::DeniedHeader(name) if name == "Cookie")); + } + + #[test] + fn slack_targets_are_https_only_and_host_pinned() { + let http_slack = Url::parse("http://hooks.slack.com/services/test").unwrap(); + let fake_slack = Url::parse("https://example.com/services/test").unwrap(); + let policy = AlertTargetPolicyConfig::default(); + + assert!(matches!( + validate_scheme(&http_slack, AlertTargetKind::Slack), + Err(OutboundPolicyError::SlackRequiresHttps) + )); + assert!(matches!( + validate_domain_policy("example.com", AlertTargetKind::Slack, &policy), + Err(OutboundPolicyError::InvalidSlackHost(_)) + )); + validate_domain_policy("hooks.slack.com", AlertTargetKind::Slack, &policy).unwrap(); + validate_domain_policy("hooks.slack-gov.com", AlertTargetKind::Slack, &policy).unwrap(); + validate_scheme(&fake_slack, AlertTargetKind::Slack).unwrap(); + } + + #[tokio::test] + async fn skip_tls_check_requires_operator_policy() { + let _guard = set_policy(AlertTargetPolicyConfig::default()) + .await + .unwrap(); + let endpoint = Url::parse("https://1.1.1.1/webhook").unwrap(); + + let result = prepare_alert_target(&endpoint, AlertTargetKind::Webhook, true, None).await; + + assert!(matches!( + result, + Err(OutboundPolicyError::InvalidTlsDisabled) + )); + } + #[test] + fn ipv6_transition_addresses_are_blocked() { + assert!(builtin_denied_ip(IpAddr::V6( + "2002:0a0a:0001::".parse().unwrap() + ))); + assert!(builtin_denied_ip(IpAddr::V6( + "2001:0000:4136:e378:8000:63bf:3fff:fdd2".parse().unwrap() + ))); + } + + #[tokio::test] + async fn prepare_alert_target_allows_authorization_for_allowlisted_destination() { + let _guard = set_policy(AlertTargetPolicyConfig { + allowed_cidrs: vec!["1.1.1.1/32".to_string()], + ..Default::default() + }) + .await + .unwrap(); + let endpoint = Url::parse("http://1.1.1.1/webhook").unwrap(); + let mut headers = HashMap::new(); + headers.insert("Authorization".to_string(), "Bearer token".to_string()); + + let prepared = + prepare_alert_target(&endpoint, AlertTargetKind::Webhook, false, Some(&headers)) + .await + .unwrap(); + + assert!(prepared.authorization_allowed); + assert_eq!( + prepared.headers.get("authorization").unwrap(), + HeaderValue::from_static("Bearer token") + ); + } +} diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 25140db6e..069a66d6f 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -483,13 +483,20 @@ impl TargetType { .await?; } TargetType::AlertManager(target) => { - outbound_http_policy::prepare_alert_target( + let prepared = outbound_http_policy::prepare_alert_target( &target.endpoint, AlertTargetKind::AlertManager, target.skip_tls_check, None, ) .await?; + // Alert Manager auth becomes an Authorization header at delivery time + if target.auth.is_some() && !prepared.authorization_allowed { + return Err(outbound_http_policy::OutboundPolicyError::DeniedHeader( + AUTHORIZATION.as_str().to_string(), + ) + .into()); + } } } Ok(()) @@ -643,6 +650,14 @@ impl CallableTarget for AlertManager { let mut request = prepared.client.post(self.endpoint.clone()).json(&alerts); if let Some(Auth { username, password }) = &self.auth { + // Credentials are only safe when the destination itself is explicitly allowlisted. + if !prepared.authorization_allowed { + error!( + "Alertmanager credentials blocked by outbound policy for destination:{}", + self.endpoint + ); + return; + } let basic_auth_value = "Basic ".to_string() + &base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}")); request = request.header(AUTHORIZATION, basic_auth_value); diff --git a/src/lib.rs b/src/lib.rs index 21030b7b0..94f65c034 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,8 +68,8 @@ use once_cell::sync::Lazy; pub use openid; use parseable::PARSEABLE; use reqwest::{Client, ClientBuilder}; +pub use {clap, tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; pub use {opentelemetry, opentelemetry_otlp, opentelemetry_proto, opentelemetry_sdk}; -pub use {tracing_actix_web, tracing_opentelemetry, tracing_subscriber, clap}; // It is very unlikely that panic will occur when dealing with locks. pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock"; diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 4000ccee5..9eb6d3190 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -229,7 +229,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { stream_name: &str, get_base: bool, tenant_id: &Option, - is_migration: bool + is_migration: bool, ) -> Result; async fn put_stream_json( &self, From a2143817c69ba4d5417abbc14cae874705ecaac3 Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Wed, 1 Jul 2026 13:02:33 +0530 Subject: [PATCH 5/5] feat(alerts): make outbound HTTP policy tenant-scoped and persistent - replace the single global policy manager with per-tenant policy state - persist outbound HTTP policy through metastore and reload it on startup - reject policies with overlapping allow/deny domain or CIDR entries - follow the existing tenant-keyed design used elsewhere in the codebase --- src/alerts/alert_traits.rs | 2 +- src/alerts/outbound_http_policy.rs | 299 ++++++++++++++---- src/alerts/target.rs | 111 ++++--- src/handlers/http/alert_target_policy.rs | 13 +- src/handlers/http/modal/mod.rs | 6 +- src/handlers/http/targets.rs | 22 +- src/metastore/metastore_traits.rs | 15 +- .../metastores/object_store_metastore.rs | 96 +++++- src/storage/localfs.rs | 2 + src/storage/object_storage.rs | 8 + src/validator.rs | 23 +- 11 files changed, 475 insertions(+), 122 deletions(-) diff --git a/src/alerts/alert_traits.rs b/src/alerts/alert_traits.rs index fdce7b26c..9899ed303 100644 --- a/src/alerts/alert_traits.rs +++ b/src/alerts/alert_traits.rs @@ -124,5 +124,5 @@ pub trait AlertManagerTrait: Send + Sync { #[async_trait] pub trait CallableTarget { - async fn call(&self, payload: &Context); + async fn call(&self, tenant_id: &Option, payload: &Context); } diff --git a/src/alerts/outbound_http_policy.rs b/src/alerts/outbound_http_policy.rs index 1fd847955..4ac8f5115 100644 --- a/src/alerts/outbound_http_policy.rs +++ b/src/alerts/outbound_http_policy.rs @@ -9,8 +9,12 @@ use std::{ }; use tokio::net::lookup_host; use tokio::sync::RwLock; +use tracing::error; use url::Url; +use crate::metastore::{MetastoreError, metastore_traits::Metastore}; +use crate::parseable::PARSEABLE; + #[derive(Debug, Clone, Copy)] pub enum AlertTargetKind { Slack, @@ -30,28 +34,108 @@ pub struct AlertTargetPolicyConfig { pub allow_invalid_tls: bool, } -pub static ALERT_TARGET_POLICY: Lazy> = - Lazy::new(|| RwLock::new(AlertTargetPolicyConfig::default())); +pub static ALERT_TARGET_POLICY: Lazy>> = + Lazy::new(|| RwLock::new(HashMap::new())); // Read one policy snapshot per validation -pub async fn active_policy() -> AlertTargetPolicyConfig { - ALERT_TARGET_POLICY.read().await.clone() +pub async fn active_policy( + tenant_id: &Option, +) -> Result { + let tenant = tenant_id + .as_deref() + .unwrap_or(crate::parseable::DEFAULT_TENANT); + let guard = ALERT_TARGET_POLICY.read().await; + Ok(guard + .get(tenant) + .cloned() + .unwrap_or_else(AlertTargetPolicyConfig::default)) } // Replace atomically only after validation -pub async fn replace_policy(policy: AlertTargetPolicyConfig) -> Result<(), OutboundPolicyError> { +pub async fn replace_policy( + tenant_id: &Option, + policy: AlertTargetPolicyConfig, +) -> Result<(), OutboundPolicyError> { + let tenant = tenant_id + .as_deref() + .unwrap_or(crate::parseable::DEFAULT_TENANT); validate_policy(&policy)?; - *ALERT_TARGET_POLICY.write().await = policy; + let storage_tenant = if tenant == crate::parseable::DEFAULT_TENANT { + None + } else { + Some(tenant.to_string()) + }; + PARSEABLE + .metastore + .put_outbound_policy(&storage_tenant, &policy) + .await?; + ALERT_TARGET_POLICY + .write() + .await + .insert(tenant.to_string(), policy); Ok(()) } // Validate admin-supplied policy before it is stored or used for target checks. pub fn validate_policy(policy: &AlertTargetPolicyConfig) -> Result<(), OutboundPolicyError> { - parse_cidrs(&policy.allowed_cidrs)?; - parse_cidrs(&policy.denied_cidrs)?; + let allowed = parse_cidrs(&policy.allowed_cidrs)?; + let denied = parse_cidrs(&policy.denied_cidrs)?; + + if let Some(conflict) = find_conflicting_cidr(&allowed, &denied) { + return Err(OutboundPolicyError::ConflictingCidrs(conflict)); + } + + if let Some(conflict) = find_conflicting_domain(&policy.allowed_domains, &policy.denied_domains) + { + return Err(OutboundPolicyError::ConflictingDomains(conflict)); + } + Ok(()) } +fn find_conflicting_cidr(allowed: &[IpNet], denied: &[IpNet]) -> Option { + allowed.iter().find_map(|allowed_cidr| { + denied + .iter() + .any(|denied_cidr| cidrs_overlap(allowed_cidr, denied_cidr)) + .then_some(allowed_cidr.to_string()) + }) +} + +fn find_conflicting_domain(allowed: &[String], denied: &[String]) -> Option { + allowed.iter().find_map(|allowed_domain| { + denied + .iter() + .any(|denied_domain| domains_overlap(allowed_domain, denied_domain)) + .then_some(normalize_domain(allowed_domain)) + }) +} + +fn normalize_domain(domain: &str) -> String { + domain.trim_end_matches('.').to_ascii_lowercase() +} + +fn domains_overlap(left: &str, right: &str) -> bool { + let left = normalize_domain(left); + let right = normalize_domain(right); + + domain_matches_or_contains(&left, &right) || domain_matches_or_contains(&right, &left) +} + +fn domain_matches_or_contains(candidate: &str, parent: &str) -> bool { + if candidate == parent { + return true; + } + + candidate + .strip_suffix(parent) + .is_some_and(|prefix| prefix.ends_with('.')) +} + +fn cidrs_overlap(left: &IpNet, right: &IpNet) -> bool { + left.contains(right) || right.contains(left) +} + fn parse_cidrs(values: &[String]) -> Result, OutboundPolicyError> { values .iter() @@ -119,18 +203,41 @@ pub enum OutboundPolicyError { #[error("invalid slack host: {0}")] InvalidSlackHost(String), + + #[error("tenant id is required for outbound policy")] + MissingTenant, + + #[error("outbound policy not found for tenant: {0}")] + PolicyNotFound(String), + + #[error("allow and deny CIDR lists conflict on: {0}")] + ConflictingCidrs(String), + + #[error("allow and deny domain lists conflict on: {0}")] + ConflictingDomains(String), + + #[error("failed to load outbound HTTP policy for tenant {tenant_id}: {source}")] + PolicyValidationFailed { + tenant_id: String, + #[source] + source: Box, + }, + + #[error("metastore error: {0}")] + Metastore(#[from] MetastoreError), } // All alert-target outbound networking must enter here. // The policy authorizes the resolved destination, then pins that exact DNS result into reqwest pub async fn prepare_alert_target( + tenant_id: &Option, endpoint: &Url, kind: AlertTargetKind, skip_tls_check: bool, headers: Option<&HashMap>, ) -> Result { validate_scheme(endpoint, kind)?; - let policy = active_policy().await; + let policy = active_policy(tenant_id).await?; validate_tls_policy(kind, skip_tls_check, &policy)?; let host = endpoint .host_str() @@ -161,6 +268,45 @@ pub async fn prepare_alert_target( }) } +pub async fn load_policy_for_tenant( + metastore: &dyn Metastore, + tenant_id: &str, +) -> Result<(), OutboundPolicyError> { + let policy = metastore.get_outbound_policy(tenant_id).await?; + validate_policy(&policy)?; + ALERT_TARGET_POLICY + .write() + .await + .insert(tenant_id.to_string(), policy); + Ok(()) +} + +pub async fn load_all_policies(metastore: &dyn Metastore) -> Result<(), OutboundPolicyError> { + let policies = metastore.get_outbound_policies().await?; + let loaded = validate_loaded_policies(policies)?; + + *ALERT_TARGET_POLICY.write().await = loaded; + + Ok(()) +} + +fn validate_loaded_policies( + policies: HashMap, +) -> Result, OutboundPolicyError> { + let mut loaded = HashMap::new(); + for (tenant_id, policy) in policies { + if let Err(err) = validate_policy(&policy) { + error!("Failed to load outbound HTTP policy for tenant {tenant_id}: {err}"); + return Err(OutboundPolicyError::PolicyValidationFailed { + tenant_id, + source: Box::new(err), + }); + } + loaded.insert(tenant_id, policy); + } + Ok(loaded) +} + // Keep alert delivery bounded so a slow or stuck target cannot hold workers forever. fn default_client_builder() -> ClientBuilder { ClientBuilder::new() @@ -363,9 +509,9 @@ fn denied_header(name: &str, authorization_allowed: bool) -> bool { // Domain entries match the exact host and its subdomains. fn matches_domain_list(host: &str, domains: &[String]) -> bool { - let host = host.trim_end_matches('.').to_ascii_lowercase(); + let host = normalize_domain(host); domains.iter().any(|domain| { - let domain = domain.trim_end_matches('.').to_ascii_lowercase(); + let domain = normalize_domain(domain); host == domain || host.ends_with(&format!(".{domain}")) }) } @@ -373,9 +519,6 @@ fn matches_domain_list(host: &str, domains: &[String]) -> bool { #[cfg(test)] mod tests { use super::*; - use tokio::sync::{Mutex, MutexGuard}; - - static POLICY_TEST_LOCK: Lazy> = Lazy::new(|| Mutex::new(())); fn policy_with( allow_private: bool, @@ -397,14 +540,6 @@ mod tests { SocketAddr::new(ip, 80) } - async fn set_policy( - policy: AlertTargetPolicyConfig, - ) -> Result, OutboundPolicyError> { - let guard = POLICY_TEST_LOCK.lock().await; - replace_policy(policy).await?; - Ok(guard) - } - #[test] fn public_addresses_are_allowed_by_default() { let policy = AlertTargetPolicyConfig::default(); @@ -497,20 +632,6 @@ mod tests { validate_scheme(&fake_slack, AlertTargetKind::Slack).unwrap(); } - #[tokio::test] - async fn skip_tls_check_requires_operator_policy() { - let _guard = set_policy(AlertTargetPolicyConfig::default()) - .await - .unwrap(); - let endpoint = Url::parse("https://1.1.1.1/webhook").unwrap(); - - let result = prepare_alert_target(&endpoint, AlertTargetKind::Webhook, true, None).await; - - assert!(matches!( - result, - Err(OutboundPolicyError::InvalidTlsDisabled) - )); - } #[test] fn ipv6_transition_addresses_are_blocked() { assert!(builtin_denied_ip(IpAddr::V6( @@ -521,27 +642,97 @@ mod tests { ))); } - #[tokio::test] - async fn prepare_alert_target_allows_authorization_for_allowlisted_destination() { - let _guard = set_policy(AlertTargetPolicyConfig { - allowed_cidrs: vec!["1.1.1.1/32".to_string()], + #[test] + fn conflicting_allow_and_deny_cidrs_are_rejected() { + let policy = policy_with(true, &["10.0.0.0/8"], &["10.1.0.0/16"]); + let err = validate_policy(&policy).unwrap_err(); + assert!(matches!(err, OutboundPolicyError::ConflictingCidrs(_))); + } + + #[test] + fn conflicting_allow_and_deny_cidrs_are_rejected_regardless_of_order() { + let policy = policy_with(true, &["10.1.0.0/16"], &["10.0.0.0/8"]); + let err = validate_policy(&policy).unwrap_err(); + assert!(matches!(err, OutboundPolicyError::ConflictingCidrs(_))); + } + + #[test] + fn non_overlapping_cidrs_are_allowed() { + let policy = policy_with(true, &["10.0.0.0/24"], &["10.0.1.0/24"]); + validate_policy(&policy).unwrap(); + } + + #[test] + fn conflicting_allow_and_deny_domains_are_rejected() { + let policy = AlertTargetPolicyConfig { + allowed_domains: vec!["Example.com.".to_string()], + denied_domains: vec!["sub.example.com".to_string()], ..Default::default() - }) - .await - .unwrap(); - let endpoint = Url::parse("http://1.1.1.1/webhook").unwrap(); - let mut headers = HashMap::new(); - headers.insert("Authorization".to_string(), "Bearer token".to_string()); + }; - let prepared = - prepare_alert_target(&endpoint, AlertTargetKind::Webhook, false, Some(&headers)) - .await - .unwrap(); + let err = validate_policy(&policy).unwrap_err(); - assert!(prepared.authorization_allowed); - assert_eq!( - prepared.headers.get("authorization").unwrap(), - HeaderValue::from_static("Bearer token") + assert!( + matches!(err, OutboundPolicyError::ConflictingDomains(domain) if domain == "example.com") ); } + + #[test] + fn conflicting_allow_and_deny_domains_are_rejected_regardless_of_order() { + let policy = AlertTargetPolicyConfig { + allowed_domains: vec!["sub.example.com".to_string()], + denied_domains: vec!["example.com".to_string()], + ..Default::default() + }; + + let err = validate_policy(&policy).unwrap_err(); + + assert!( + matches!(err, OutboundPolicyError::ConflictingDomains(domain) if domain == "sub.example.com") + ); + } + + #[test] + fn similar_but_unrelated_domains_are_allowed() { + let policy = AlertTargetPolicyConfig { + allowed_domains: vec!["example.com".to_string()], + denied_domains: vec!["notexample.com".to_string()], + ..Default::default() + }; + + validate_policy(&policy).unwrap(); + } + + #[tokio::test] + async fn active_policy_defaults_for_missing_tenant() { + let tenant_id = Some("tenant-without-policy".to_string()); + + let policy = active_policy(&tenant_id).await.unwrap(); + + assert!(!policy.allow_private); + assert!(policy.allowed_domains.is_empty()); + assert!(policy.allowed_cidrs.is_empty()); + assert!(policy.denied_domains.is_empty()); + assert!(policy.denied_cidrs.is_empty()); + assert!(!policy.allow_invalid_tls); + } + + #[test] + fn validate_loaded_policies_rejects_invalid_entries() { + let mut policies = HashMap::new(); + policies.insert( + "invalid".to_string(), + AlertTargetPolicyConfig { + allowed_cidrs: vec!["bad-cidr".to_string()], + ..Default::default() + }, + ); + + let err = validate_loaded_policies(policies).unwrap_err(); + + assert!(matches!( + err, + OutboundPolicyError::PolicyValidationFailed { tenant_id, .. } if tenant_id == "invalid" + )); + } } diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 069a66d6f..4fb529f76 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -196,6 +196,49 @@ pub struct Target { } impl Target { + pub async fn validate_outbound_policy(&self) -> Result<(), AlertError> { + let tenant_id = &self.tenant; + match &self.target { + TargetType::Slack(target) => { + outbound_http_policy::prepare_alert_target( + tenant_id, + &target.endpoint, + AlertTargetKind::Slack, + false, + None, + ) + .await?; + } + TargetType::Other(target) => { + outbound_http_policy::prepare_alert_target( + tenant_id, + &target.endpoint, + AlertTargetKind::Webhook, + target.skip_tls_check, + Some(&target.headers), + ) + .await?; + } + TargetType::AlertManager(target) => { + let prepared = outbound_http_policy::prepare_alert_target( + tenant_id, + &target.endpoint, + AlertTargetKind::AlertManager, + target.skip_tls_check, + None, + ) + .await?; + if target.auth.is_some() && !prepared.authorization_allowed { + return Err(outbound_http_policy::OutboundPolicyError::DeniedHeader( + AUTHORIZATION.as_str().to_string(), + ) + .into()); + } + } + } + + Ok(()) + } pub fn mask(self) -> Value { match self.target { TargetType::Slack(slack_web_hook) => { @@ -262,7 +305,7 @@ impl Target { if !state.timed_out { // call once and then start sleeping // reduce repeats by 1 - call_target(self.target.clone(), context.clone()); + call_target(self.target.clone(), self.tenant.clone(), context.clone()); // set state state.timed_out = true; state.awaiting_resolve = true; @@ -282,7 +325,7 @@ impl Target { } } - call_target(self.target.clone(), context); + call_target(self.target.clone(), self.tenant.clone(), context); } // do not send out any notifs // (an eval should not have run!) @@ -345,7 +388,7 @@ impl Target { let should_call = sleep_and_check_if_call(Arc::clone(&state), current_state).await; if should_call { - call_target(target.clone(), alert_context.clone()) + call_target(target.clone(), tenant_id.clone(), alert_context.clone()) } }, Retry::Finite(times) => { @@ -365,7 +408,7 @@ impl Target { let should_call = sleep_and_check_if_call(Arc::clone(&state), current_state).await; if should_call { - call_target(target.clone(), alert_context.clone()) + call_target(target.clone(), tenant_id.clone(), alert_context.clone()) } } } @@ -385,9 +428,9 @@ impl MetastoreObject for Target { } } -fn call_target(target: TargetType, context: Context) { +fn call_target(target: TargetType, tenant_id: Option, context: Context) { trace!("Calling target with context- {context:?}"); - tokio::spawn(async move { target.call(&context).await }); + tokio::spawn(async move { target.call(&tenant_id, &context).await }); } #[derive(Debug, serde::Deserialize)] @@ -455,52 +498,13 @@ pub enum TargetType { } impl TargetType { - pub async fn call(&self, payload: &Context) { + pub async fn call(&self, tenant_id: &Option, payload: &Context) { match self { - TargetType::Slack(target) => target.call(payload).await, - TargetType::Other(target) => target.call(payload).await, - TargetType::AlertManager(target) => target.call(payload).await, + TargetType::Slack(target) => target.call(tenant_id, payload).await, + TargetType::Other(target) => target.call(tenant_id, payload).await, + TargetType::AlertManager(target) => target.call(tenant_id, payload).await, } } - pub async fn validate_outbound_policy(&self) -> Result<(), AlertError> { - match self { - TargetType::Slack(target) => { - outbound_http_policy::prepare_alert_target( - &target.endpoint, - AlertTargetKind::Slack, - false, - None, - ) - .await?; - } - TargetType::Other(target) => { - outbound_http_policy::prepare_alert_target( - &target.endpoint, - AlertTargetKind::Webhook, - target.skip_tls_check, - Some(&target.headers), - ) - .await?; - } - TargetType::AlertManager(target) => { - let prepared = outbound_http_policy::prepare_alert_target( - &target.endpoint, - AlertTargetKind::AlertManager, - target.skip_tls_check, - None, - ) - .await?; - // Alert Manager auth becomes an Authorization header at delivery time - if target.auth.is_some() && !prepared.authorization_allowed { - return Err(outbound_http_policy::OutboundPolicyError::DeniedHeader( - AUTHORIZATION.as_str().to_string(), - ) - .into()); - } - } - } - Ok(()) - } } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -510,7 +514,7 @@ pub struct SlackWebHook { #[async_trait] impl CallableTarget for SlackWebHook { - async fn call(&self, payload: &Context) { + async fn call(&self, tenant_id: &Option, payload: &Context) { let alert = match payload.alert_info.alert_state { AlertState::Triggered => { serde_json::json!({ "text": payload.message }) @@ -524,6 +528,7 @@ impl CallableTarget for SlackWebHook { }; // Revalidate immediately before delivery because stored targets and DNS can change. let prepared = match outbound_http_policy::prepare_alert_target( + tenant_id, &self.endpoint, AlertTargetKind::Slack, false, @@ -560,13 +565,14 @@ pub struct OtherWebHook { #[async_trait] impl CallableTarget for OtherWebHook { - async fn call(&self, payload: &Context) { + async fn call(&self, tenant_id: &Option, payload: &Context) { let alert = match payload.alert_info.alert_state { AlertState::Triggered => payload.message.clone(), AlertState::NotTriggered => payload.default_resolved_string(), AlertState::Disabled => payload.default_disabled_string(), }; let prepared = match outbound_http_policy::prepare_alert_target( + tenant_id, &self.endpoint, AlertTargetKind::Webhook, self.skip_tls_check, @@ -602,7 +608,7 @@ pub struct AlertManager { #[async_trait] impl CallableTarget for AlertManager { - async fn call(&self, payload: &Context) { + async fn call(&self, tenant_id: &Option, payload: &Context) { let mut alerts = serde_json::json!([{ "labels": { "alertname": payload.alert_info.alert_name, @@ -634,6 +640,7 @@ impl CallableTarget for AlertManager { }; let prepared = match outbound_http_policy::prepare_alert_target( + tenant_id, &self.endpoint, AlertTargetKind::AlertManager, self.skip_tls_check, diff --git a/src/handlers/http/alert_target_policy.rs b/src/handlers/http/alert_target_policy.rs index 92986524d..ed2af8b85 100644 --- a/src/handlers/http/alert_target_policy.rs +++ b/src/handlers/http/alert_target_policy.rs @@ -9,16 +9,23 @@ use crate::alerts::{ AlertTargetPolicyConfig, active_policy, replace_policy, validate_policy, }, }; +use crate::utils::get_tenant_id_from_request; +use actix_web::HttpRequest; -pub async fn get() -> Result { - Ok(web::Json(active_policy().await)) +pub async fn get(req: HttpRequest) -> Result { + let tenant_id = get_tenant_id_from_request(&req); + let policy = active_policy(&tenant_id).await?; + Ok(web::Json(policy)) } pub async fn put( + req: HttpRequest, Json(policy): Json, ) -> Result { // validate before replacing so a bad policy never becomes active - replace_policy(policy.clone()).await?; + let tenant_id = get_tenant_id_from_request(&req); + validate_policy(&policy)?; + replace_policy(&tenant_id, policy.clone()).await?; Ok(web::Json(policy)) } diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 176d4cefe..e6da58a01 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -35,7 +35,7 @@ use tokio::sync::{RwLock, oneshot}; use tracing::{error, info, warn}; use crate::{ - alerts::{ALERTS, get_alert_manager, target::TARGETS}, + alerts::{ALERTS, get_alert_manager, outbound_http_policy, target::TARGETS}, cli::Options, correlation::CORRELATIONS, hottier::{GLOBAL_HOTTIER, HotTierManager, StreamHotTier}, @@ -234,6 +234,10 @@ pub async fn load_on_init() -> anyhow::Result<()> { error!("{err}"); } + if let Err(err) = outbound_http_policy::load_all_policies(PARSEABLE.metastore.as_ref()).await { + error!("Failed to load outbound policies: {err}"); + } + Ok(()) } diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index 7b0b4b04f..3eef3a830 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -28,17 +28,23 @@ use crate::{ AlertError, target::{TARGETS, Target}, }, - utils::get_tenant_id_from_request, + utils::get_user_and_tenant_from_request, }; +fn tenant_from_request(req: &HttpRequest) -> Result, AlertError> { + get_user_and_tenant_from_request(req) + .map(|(_, tenant)| tenant) + .map_err(|err| AlertError::CustomError(err.to_string())) +} + // POST /targets pub async fn post( req: HttpRequest, Json(mut target): Json, ) -> Result { - let tenant_id = get_tenant_id_from_request(&req); + let tenant_id = tenant_from_request(&req)?; target.tenant = tenant_id; - target.target.validate_outbound_policy().await?; + target.validate_outbound_policy().await?; // should check for duplicacy and liveness (??) // add to the map TARGETS.update(target.clone()).await?; @@ -48,7 +54,7 @@ pub async fn post( // GET /targets pub async fn list(req: HttpRequest) -> Result { - let tenant_id = get_tenant_id_from_request(&req); + let tenant_id = tenant_from_request(&req)?; // add to the map let list = TARGETS .list(&tenant_id) @@ -63,7 +69,7 @@ pub async fn list(req: HttpRequest) -> Result { // GET /targets/{target_id} pub async fn get(req: HttpRequest, target_id: Path) -> Result { let target_id = target_id.into_inner(); - let tenant_id = get_tenant_id_from_request(&req); + let tenant_id = tenant_from_request(&req)?; let target = TARGETS.get_target_by_id(&target_id, &tenant_id).await?; Ok(web::Json(target.mask())) @@ -76,7 +82,7 @@ pub async fn update( Json(mut target): Json, ) -> Result { let target_id = target_id.into_inner(); - let tenant_id = get_tenant_id_from_request(&req); + let tenant_id = tenant_from_request(&req)?; // if target_id does not exist, error let old_target = TARGETS.get_target_by_id(&target_id, &tenant_id).await?; @@ -90,7 +96,7 @@ pub async fn update( // esnure that the supplied target id is assigned to the target config target.id = target_id; target.tenant = tenant_id; - target.target.validate_outbound_policy().await?; + target.validate_outbound_policy().await?; // should check for duplicacy and liveness (??) // add to the map TARGETS.update(target.clone()).await?; @@ -101,7 +107,7 @@ pub async fn update( // DELETE /targets/{target_id} pub async fn delete(req: HttpRequest, target_id: Path) -> Result { let target_id = target_id.into_inner(); - let tenant_id = get_tenant_id_from_request(&req); + let tenant_id = tenant_from_request(&req)?; let target = TARGETS.delete(&target_id, &tenant_id).await?; Ok(web::Json(target.mask())) diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 9eb6d3190..a890cfa21 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -29,6 +29,7 @@ use ulid::Ulid; use crate::{ alerts::{ alert_structs::{AlertStateEntry, MTTRHistory}, + outbound_http_policy::AlertTargetPolicyConfig, target::Target, }, catalog::manifest::Manifest, @@ -164,7 +165,19 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { obj: &dyn MetastoreObject, tenant_id: &Option, ) -> Result<(), MetastoreError>; - + /// outbound_policy + async fn get_outbound_policies( + &self, + ) -> Result, MetastoreError>; + async fn get_outbound_policy( + &self, + tenant_id: &str, + ) -> Result; + async fn put_outbound_policy( + &self, + tenant_id: &Option, + policy: &AlertTargetPolicyConfig, + ) -> Result<(), MetastoreError>; /// dashboards async fn get_dashboards(&self) -> Result>, MetastoreError>; async fn put_dashboard( diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 475278676..e9eeeae4a 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -34,6 +34,7 @@ use ulid::Ulid; use crate::{ alerts::{ alert_structs::{AlertStateEntry, MTTRHistory}, + outbound_http_policy::AlertTargetPolicyConfig, target::Target, }, catalog::{manifest::Manifest, partition_path}, @@ -53,7 +54,8 @@ use crate::{ TARGETS_ROOT_DIRECTORY, object_storage::{ alert_json_path, alert_state_json_path, filter_path, manifest_path, mttr_json_path, - parseable_json_path, schema_path, stream_json_path, to_bytes, + outbound_http_policy_json_path, parseable_json_path, schema_path, stream_json_path, + to_bytes, }, }, users::filters::{Filter, migrate_v1_v2}, @@ -1193,6 +1195,98 @@ impl Metastore for ObjectStoreMetastore { .await?) } + async fn get_outbound_policies( + &self, + ) -> Result, MetastoreError> { + let mut all_policies = HashMap::new(); + + // Preserve the legacy/default-tenant policy at the object-store root. + let default_path = outbound_http_policy_json_path(&None); + match self.storage.get_object(&default_path, &None).await { + Ok(bytes) => match serde_json::from_slice::(&bytes) { + Ok(policy) => { + all_policies.insert(DEFAULT_TENANT.to_string(), policy); + } + Err(err) => { + return Err(MetastoreError::Error { + status_code: StatusCode::INTERNAL_SERVER_ERROR, + message: format!( + "Failed to deserialize outbound HTTP policy for tenant {DEFAULT_TENANT}: {err}" + ), + flow: "get_outbound_policies".to_string(), + }); + } + }, + Err(err) if is_missing_optional_dir(&err) => {} + Err(err) => return Err(MetastoreError::ObjectStorageError(err)), + } + + let base_paths = self + .storage + .list_dirs_relative(&RelativePathBuf::from_iter([""]), &None) + .await? + .into_iter() + .filter(|tenant| !tenant.starts_with('.')) + .collect::>(); + + for tenant in base_paths { + let tenant_id = Some(tenant.clone()); + let path = outbound_http_policy_json_path(&tenant_id); + let policy = match self.storage.get_object(&path, &tenant_id).await { + Ok(bytes) => match serde_json::from_slice::(&bytes) { + Ok(policy) => policy, + Err(err) => { + return Err(MetastoreError::Error { + status_code: StatusCode::INTERNAL_SERVER_ERROR, + message: format!( + "Failed to deserialize outbound HTTP policy for tenant {tenant}: {err}" + ), + flow: "get_outbound_policies".to_string(), + }); + } + }, + Err(err) if is_missing_optional_dir(&err) => continue, + Err(err) => return Err(MetastoreError::ObjectStorageError(err)), + }; + + all_policies.insert(tenant, policy); + } + + Ok(all_policies) + } + + async fn get_outbound_policy( + &self, + tenant_id: &str, + ) -> Result { + let tenant = if tenant_id == DEFAULT_TENANT { + None + } else { + Some(tenant_id.to_string()) + }; + let path = outbound_http_policy_json_path(&tenant); + let bytes = match self.storage.get_object(&path, &tenant).await { + Ok(bytes) => bytes, + Err(err) if is_missing_optional_dir(&err) => { + return Ok(AlertTargetPolicyConfig::default()); + } + Err(err) => return Err(err.into()), + }; + Ok(serde_json::from_slice::(&bytes)?) + } + + async fn put_outbound_policy( + &self, + tenant_id: &Option, + policy: &AlertTargetPolicyConfig, + ) -> Result<(), MetastoreError> { + let path = outbound_http_policy_json_path(tenant_id); + Ok(self + .storage + .put_object(&path, to_bytes(policy), tenant_id) + .await?) + } + async fn get_all_schemas( &self, stream_name: &str, diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 97d909eb6..c05cef2fa 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -504,6 +504,7 @@ impl ObjectStorage for LocalFS { async fn list_streams(&self) -> Result, ObjectStorageError> { let ignore_dir = &[ "lost+found", + DEFAULT_TENANT, PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR, ALERTS_ROOT_DIRECTORY, @@ -541,6 +542,7 @@ impl ObjectStorage for LocalFS { async fn list_old_streams(&self) -> Result, ObjectStorageError> { let ignore_dir = &[ "lost+found", + DEFAULT_TENANT, PARSEABLE_ROOT_DIRECTORY, ALERTS_ROOT_DIRECTORY, SETTINGS_ROOT_DIRECTORY, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index aa3ca9730..05c919fa6 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -1482,6 +1482,14 @@ pub fn target_json_path(target_id: &Ulid, tenant_id: &Option) -> Relativ ]) } +/// Constructs the path for storing outbound HTTP policy JSON files +/// Format: "/settings/outbound_http_policy.json" +#[inline(always)] +pub fn outbound_http_policy_json_path(tenant_id: &Option) -> RelativePathBuf { + let root = tenant_id.as_deref().unwrap_or(""); + RelativePathBuf::from_iter([root, SETTINGS_ROOT_DIRECTORY, "outbound_http_policy.json"]) +} + /// Constructs the path for storing alert state JSON files /// Format: ".alerts/alert_state_{alert_id}.json" #[inline(always)] diff --git a/src/validator.rs b/src/validator.rs index 0410e396a..f5bcb7bcf 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -22,9 +22,9 @@ use error::HotTierValidationError; use once_cell::sync::Lazy; use self::error::{StreamNameValidationError, UsernameValidationError}; -use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES; use crate::storage::StreamType; use crate::utils::human_size::bytes_to_human_size; +use crate::{hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES, parseable::DEFAULT_TENANT}; // Add more sql keywords here in lower case const DENIED_NAMES: &[&str] = &[ @@ -61,6 +61,12 @@ pub fn stream_name( )); } + if stream_name == DEFAULT_TENANT { + return Err(StreamNameValidationError::ReservedName( + stream_name.to_owned(), + )); + } + if stream_type == StreamType::Internal { return Err(StreamNameValidationError::InternalStream( stream_name.to_owned(), @@ -176,6 +182,8 @@ pub mod error { NameSpecialChar { c: char }, #[error("SQL keyword cannot be used as stream name")] SQLKeyword(String), + #[error("The stream name {0} is reserved for internal use")] + ReservedName(String), #[error( "The stream {0} is reserved for internal use and cannot be used for user defined streams" )] @@ -222,3 +230,16 @@ pub mod error { NotFound(String), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn stream_name_rejects_default_tenant() { + let err = stream_name(DEFAULT_TENANT, StreamType::UserDefined).unwrap_err(); + assert!( + matches!(err, StreamNameValidationError::ReservedName(name) if name == DEFAULT_TENANT) + ); + } +}