feat(hive-console-sdk): async SupergraphFetcher, retry_count in SupergraphFetcher & user_agent in PersistedDocumentsManager (#7246)

This commit is contained in:
Arda TANRIKULU 2025-11-14 00:50:47 +03:00 committed by GitHub
parent c26e2e750d
commit cc6cd28eb5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 361 additions and 133 deletions

View file

@ -0,0 +1,30 @@
---
'hive-console-sdk-rs': minor
---
Breaking;
- `SupergraphFetcher` now has two different modes: async and sync. You can choose between `SupergraphFetcherAsyncClient` and `SupergraphFetcherSyncClient` based on your needs. See the examples at the bottom.
- `SupergraphFetcher` now has a new `retry_count` parameter to specify how many times to retry fetching the supergraph in case of failures.
- `PersistedDocumentsManager` new needs `user_agent` parameter to be sent to Hive Console when fetching persisted queries.
- `UsageAgent::new` is now `UsageAgent::try_new` and it returns a `Result` with `Arc`, so you can freely clone it across threads. This change was made to handle potential errors during the creation of the HTTP client. Make sure to handle the `Result` when creating a `UsageAgent`.
```rust
// Sync Mode
let fetcher = SupergraphFetcher::try_new_sync(/* params */)
.map_err(|e| anyhow!("Failed to create SupergraphFetcher: {}", e))?;
// Use the fetcher to fetch the supergraph (Sync)
let supergraph = fetcher
.fetch_supergraph()
.map_err(|e| anyhow!("Failed to fetch supergraph: {}", e))?;
// Async Mode
let fetcher = SupergraphFetcher::try_new_async(/* params */)
.map_err(|e| anyhow!("Failed to create SupergraphFetcher: {}", e))?;
// Use the fetcher to fetch the supergraph (Async)
let supergraph = fetcher
.fetch_supergraph()
.await
.map_err(|e| anyhow!("Failed to fetch supergraph: {}", e))?;
```

View file

@ -2604,7 +2604,7 @@ dependencies = [
[[package]]
name = "hive-apollo-router-plugin"
version = "2.3.1"
version = "2.3.2"
dependencies = [
"anyhow",
"apollo-router",
@ -2638,7 +2638,7 @@ dependencies = [
[[package]]
name = "hive-console-sdk"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"anyhow",
"async-trait",

View file

@ -24,6 +24,8 @@ use std::time::Duration;
use tower::{BoxError, ServiceBuilder, ServiceExt};
use tracing::{debug, info, warn};
use crate::consts::PLUGIN_VERSION;
pub static PERSISTED_DOCUMENT_HASH_KEY: &str = "hive::persisted_document_hash";
#[derive(Clone, Debug, Deserialize, JsonSchema, Default)]
@ -107,6 +109,7 @@ impl PersistedDocumentsPlugin {
Duration::from_secs(config.request_timeout.unwrap_or(15)),
config.retry_count.unwrap_or(3),
config.cache_size.unwrap_or(1000),
format!("hive-apollo-router/{}", PLUGIN_VERSION),
))),
allow_arbitrary_documents,
})

View file

@ -2,6 +2,7 @@ use crate::consts::PLUGIN_VERSION;
use crate::registry_logger::Logger;
use anyhow::{anyhow, Result};
use hive_console_sdk::supergraph_fetcher::SupergraphFetcher;
use hive_console_sdk::supergraph_fetcher::SupergraphFetcherSyncState;
use sha2::Digest;
use sha2::Sha256;
use std::env;
@ -12,7 +13,7 @@ use std::time::Duration;
#[derive(Debug)]
pub struct HiveRegistry {
file_name: String,
fetcher: SupergraphFetcher,
fetcher: SupergraphFetcher<SupergraphFetcherSyncState>,
pub logger: Logger,
}
@ -122,16 +123,19 @@ impl HiveRegistry {
env::set_var("APOLLO_ROUTER_SUPERGRAPH_PATH", file_name.clone());
env::set_var("APOLLO_ROUTER_HOT_RELOAD", "true");
let mut registry = HiveRegistry {
fetcher: SupergraphFetcher::try_new(
endpoint,
key,
format!("hive-apollo-router/{}", PLUGIN_VERSION),
Duration::from_secs(5),
Duration::from_secs(60),
accept_invalid_certs,
)
.map_err(|e| anyhow!("Failed to create SupergraphFetcher: {}", e))?,
let fetcher = SupergraphFetcher::try_new_sync(
endpoint,
&key,
format!("hive-apollo-router/{}", PLUGIN_VERSION),
Duration::from_secs(5),
Duration::from_secs(60),
accept_invalid_certs,
3,
)
.map_err(|e| anyhow!("Failed to create SupergraphFetcher: {}", e))?;
let registry = HiveRegistry {
fetcher,
file_name,
logger,
};
@ -156,9 +160,12 @@ impl HiveRegistry {
Ok(())
}
fn initial_supergraph(&mut self) -> Result<(), String> {
fn initial_supergraph(&self) -> Result<(), String> {
let mut file = std::fs::File::create(self.file_name.clone()).map_err(|e| e.to_string())?;
let resp = self.fetcher.fetch_supergraph()?;
let resp = self
.fetcher
.fetch_supergraph()
.map_err(|err| err.to_string())?;
match resp {
Some(supergraph) => {
@ -173,7 +180,7 @@ impl HiveRegistry {
Ok(())
}
fn poll(&mut self) {
fn poll(&self) {
match self.fetcher.fetch_supergraph() {
Ok(new_supergraph) => {
if let Some(new_supergraph) = new_supergraph {

View file

@ -8,6 +8,7 @@ use core::ops::Drop;
use futures::StreamExt;
use graphql_parser::parse_schema;
use graphql_parser::schema::Document;
use hive_console_sdk::agent::UsageAgentExt;
use hive_console_sdk::agent::{ExecutionReport, UsageAgent};
use http::HeaderValue;
use rand::Rng;
@ -245,8 +246,8 @@ impl Plugin for UsagePlugin {
let agent = if enabled {
let flush_interval = Duration::from_secs(flush_interval);
let agent = Arc::new(UsageAgent::new(
token.expect("token is set"),
let agent = UsageAgent::try_new(
&token.expect("token is set"),
endpoint,
target_id,
buffer_size,
@ -255,7 +256,8 @@ impl Plugin for UsagePlugin {
accept_invalid_certs,
flush_interval,
format!("hive-apollo-router/{}", PLUGIN_VERSION),
));
)
.map_err(Box::new)?;
start_flush_interval(agent.clone());
Some(agent)
} else {

View file

@ -1,5 +1,6 @@
use super::graphql::OperationProcessor;
use graphql_parser::schema::Document;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::Serialize;
@ -103,17 +104,13 @@ impl Buffer {
Ok(reports)
}
}
#[derive(Clone)]
pub struct UsageAgent {
token: String,
buffer_size: usize,
endpoint: String,
buffer: Arc<Buffer>,
processor: Arc<OperationProcessor>,
buffer: Buffer,
processor: OperationProcessor,
client: ClientWithMiddleware,
flush_interval: Duration,
user_agent: String,
}
fn non_empty_string(value: Option<String>) -> Option<String> {
@ -130,14 +127,18 @@ pub enum AgentError {
Forbidden,
#[error("unable to send report: rate limited")]
RateLimited,
#[error("invalid token provided: {0}")]
InvalidToken(String),
#[error("unable to instantiate the http client for reports sending: {0}")]
HTTPClientCreationError(reqwest::Error),
#[error("unable to send report: {0}")]
Unknown(String),
}
impl UsageAgent {
#[allow(clippy::too_many_arguments)]
pub fn new(
token: String,
pub fn try_new(
token: &str,
endpoint: String,
target_id: Option<String>,
buffer_size: usize,
@ -146,22 +147,35 @@ impl UsageAgent {
accept_invalid_certs: bool,
flush_interval: Duration,
user_agent: String,
) -> Self {
let processor = Arc::new(OperationProcessor::new());
) -> Result<Arc<Self>, AgentError> {
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let mut default_headers = HeaderMap::new();
default_headers.insert("X-Usage-API-Version", HeaderValue::from_static("2"));
let mut authorization_header = HeaderValue::from_str(&format!("Bearer {}", token))
.map_err(|_| AgentError::InvalidToken(token.to_string()))?;
authorization_header.set_sensitive(true);
default_headers.insert(reqwest::header::AUTHORIZATION, authorization_header);
default_headers.insert(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
let reqwest_agent = reqwest::Client::builder()
.danger_accept_invalid_certs(accept_invalid_certs)
.connect_timeout(connect_timeout)
.timeout(request_timeout)
.user_agent(user_agent)
.build()
.map_err(|err| err.to_string())
.expect("Couldn't instantiate the http client for reports sending!");
.map_err(AgentError::HTTPClientCreationError)?;
let client = ClientBuilder::new(reqwest_agent)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
let buffer = Arc::new(Buffer::new());
let mut endpoint = endpoint;
@ -171,16 +185,14 @@ impl UsageAgent {
}
}
UsageAgent {
buffer,
processor,
endpoint,
token,
Ok(Arc::new(Self {
buffer_size,
endpoint,
buffer: Buffer::new(),
processor: OperationProcessor::new(),
client,
flush_interval,
user_agent,
}
}))
}
fn produce_report(&self, reports: Vec<ExecutionReport>) -> Result<Report, AgentError> {
@ -256,14 +268,6 @@ impl UsageAgent {
Ok(report)
}
pub fn add_report(&self, execution_report: ExecutionReport) -> Result<(), AgentError> {
let size = self.buffer.push(execution_report)?;
self.flush_if_full(size)?;
Ok(())
}
pub async fn send_report(&self, report: Report) -> Result<(), AgentError> {
let report_body =
serde_json::to_vec(&report).map_err(|e| AgentError::Unknown(e.to_string()))?;
@ -271,13 +275,6 @@ impl UsageAgent {
let resp = self
.client
.post(&self.endpoint)
.header("X-Usage-API-Version", "2")
.header(
reqwest::header::AUTHORIZATION,
format!("Bearer {}", self.token),
)
.header(reqwest::header::USER_AGENT, self.user_agent.to_string())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(reqwest::header::CONTENT_LENGTH, report_body.len())
.body(report_body)
.send()
@ -297,17 +294,6 @@ impl UsageAgent {
}
}
pub fn flush_if_full(&self, size: usize) -> Result<(), AgentError> {
if size >= self.buffer_size {
let cloned_self = self.clone();
tokio::task::spawn(async move {
cloned_self.flush().await;
});
}
Ok(())
}
pub async fn flush(&self) {
let execution_reports = match self.buffer.drain() {
Ok(res) => res,
@ -345,3 +331,29 @@ impl UsageAgent {
}
}
}
pub trait UsageAgentExt {
fn add_report(&self, execution_report: ExecutionReport) -> Result<(), AgentError>;
fn flush_if_full(&self, size: usize) -> Result<(), AgentError>;
}
impl UsageAgentExt for Arc<UsageAgent> {
fn flush_if_full(&self, size: usize) -> Result<(), AgentError> {
if size >= self.buffer_size {
let cloned_self = self.clone();
tokio::task::spawn(async move {
cloned_self.flush().await;
});
}
Ok(())
}
fn add_report(&self, execution_report: ExecutionReport) -> Result<(), AgentError> {
let size = self.buffer.push(execution_report)?;
self.flush_if_full(size)?;
Ok(())
}
}

View file

@ -1,6 +1,8 @@
use std::time::Duration;
use moka::future::Cache;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest_middleware::ClientBuilder;
use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
@ -11,7 +13,6 @@ pub struct PersistedDocumentsManager {
agent: ClientWithMiddleware,
cache: Cache<String, String>,
endpoint: String,
key: String,
}
#[derive(Debug, thiserror::Error)]
@ -64,13 +65,18 @@ impl PersistedDocumentsManager {
request_timeout: Duration,
retry_count: u32,
cache_size: u64,
user_agent: String,
) -> Self {
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(retry_count);
let mut default_headers = HeaderMap::new();
default_headers.insert("X-Hive-CDN-Key", HeaderValue::from_str(&key).unwrap());
let reqwest_agent = reqwest::Client::builder()
.danger_accept_invalid_certs(accept_invalid_certs)
.connect_timeout(connect_timeout)
.timeout(request_timeout)
.user_agent(user_agent)
.default_headers(default_headers)
.build()
.expect("Failed to create reqwest client");
let agent = ClientBuilder::new(reqwest_agent)
@ -83,7 +89,6 @@ impl PersistedDocumentsManager {
agent,
cache,
endpoint,
key,
}
}
@ -111,12 +116,7 @@ impl PersistedDocumentsManager {
"Fetching document {} from CDN: {}",
document_id, cdn_artifact_url
);
let cdn_response = self
.agent
.get(cdn_artifact_url)
.header("X-Hive-CDN-Key", self.key.to_string())
.send()
.await;
let cdn_response = self.agent.get(cdn_artifact_url).send().await;
match cdn_response {
Ok(response) => {

View file

@ -1,86 +1,260 @@
use std::fmt::Display;
use std::sync::RwLock;
use std::time::Duration;
use std::time::SystemTime;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::header::InvalidHeaderValue;
use reqwest::header::IF_NONE_MATCH;
use reqwest_middleware::ClientBuilder;
use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryDecision;
use reqwest_retry::RetryPolicy;
use reqwest_retry::RetryTransientMiddleware;
#[derive(Debug)]
pub struct SupergraphFetcher {
client: reqwest::blocking::Client,
pub struct SupergraphFetcher<AsyncOrSync> {
client: SupergraphFetcherAsyncOrSyncClient,
endpoint: String,
key: String,
user_agent: String,
etag: Option<String>,
etag: RwLock<Option<HeaderValue>>,
state: std::marker::PhantomData<AsyncOrSync>,
}
impl SupergraphFetcher {
pub fn try_new(
#[derive(Debug)]
pub struct SupergraphFetcherAsyncState;
#[derive(Debug)]
pub struct SupergraphFetcherSyncState;
#[derive(Debug)]
enum SupergraphFetcherAsyncOrSyncClient {
Async {
reqwest_client: ClientWithMiddleware,
},
Sync {
reqwest_client: reqwest::blocking::Client,
retry_policy: ExponentialBackoff,
},
}
pub enum SupergraphFetcherError {
FetcherCreationError(reqwest::Error),
NetworkError(reqwest_middleware::Error),
NetworkResponseError(reqwest::Error),
Lock(String),
InvalidKey(InvalidHeaderValue),
}
impl Display for SupergraphFetcherError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SupergraphFetcherError::FetcherCreationError(e) => {
write!(f, "Creating fetcher failed: {}", e)
}
SupergraphFetcherError::NetworkError(e) => write!(f, "Network error: {}", e),
SupergraphFetcherError::NetworkResponseError(e) => {
write!(f, "Network response error: {}", e)
}
SupergraphFetcherError::Lock(e) => write!(f, "Lock error: {}", e),
SupergraphFetcherError::InvalidKey(e) => write!(f, "Invalid CDN key: {}", e),
}
}
}
fn prepare_client_config(
mut endpoint: String,
key: &str,
retry_count: u32,
) -> Result<(String, HeaderMap, ExponentialBackoff), SupergraphFetcherError> {
if !endpoint.ends_with("/supergraph") {
if endpoint.ends_with("/") {
endpoint.push_str("supergraph");
} else {
endpoint.push_str("/supergraph");
}
}
let mut headers = HeaderMap::new();
let mut cdn_key_header =
HeaderValue::from_str(key).map_err(SupergraphFetcherError::InvalidKey)?;
cdn_key_header.set_sensitive(true);
headers.insert("X-Hive-CDN-Key", cdn_key_header);
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(retry_count);
Ok((endpoint, headers, retry_policy))
}
impl SupergraphFetcher<SupergraphFetcherSyncState> {
#[allow(clippy::too_many_arguments)]
pub fn try_new_sync(
endpoint: String,
key: String,
key: &str,
user_agent: String,
connect_timeout: Duration,
request_timeout: Duration,
accept_invalid_certs: bool,
) -> Result<Self, String> {
let mut endpoint = endpoint;
if !endpoint.ends_with("/supergraph") {
if endpoint.ends_with("/") {
endpoint.push_str("supergraph")
} else {
endpoint.push_str("/supergraph")
}
}
let client = reqwest::blocking::Client::builder()
.danger_accept_invalid_certs(accept_invalid_certs)
.connect_timeout(connect_timeout)
.timeout(request_timeout)
.build()
.map_err(|e| e.to_string())?;
retry_count: u32,
) -> Result<Self, SupergraphFetcherError> {
let (endpoint, headers, retry_policy) = prepare_client_config(endpoint, key, retry_count)?;
Ok(Self {
client,
client: SupergraphFetcherAsyncOrSyncClient::Sync {
reqwest_client: reqwest::blocking::Client::builder()
.danger_accept_invalid_certs(accept_invalid_certs)
.connect_timeout(connect_timeout)
.timeout(request_timeout)
.user_agent(user_agent)
.default_headers(headers)
.build()
.map_err(SupergraphFetcherError::FetcherCreationError)?,
retry_policy,
},
endpoint,
key,
user_agent,
etag: None,
etag: RwLock::new(None),
state: std::marker::PhantomData,
})
}
pub fn fetch_supergraph(&mut self) -> Result<Option<String>, String> {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::USER_AGENT,
reqwest::header::HeaderValue::from_str(&self.user_agent).unwrap(),
);
headers.insert("X-Hive-CDN-Key", self.key.parse().unwrap());
if let Some(checksum) = &self.etag {
headers.insert("If-None-Match", checksum.parse().unwrap());
}
let resp = self
.client
.get(self.endpoint.as_str())
.headers(headers)
.send()
.map_err(|e| e.to_string())?;
match resp.headers().get("etag") {
Some(checksum) => {
let etag = checksum.to_str().map_err(|e| e.to_string())?;
self.update_latest_etag(Some(etag.to_string()));
pub fn fetch_supergraph(&self) -> Result<Option<String>, SupergraphFetcherError> {
let request_start_time = SystemTime::now();
// Implementing retry logic for sync client
let mut n_past_retries = 0;
let (reqwest_client, retry_policy) = match &self.client {
SupergraphFetcherAsyncOrSyncClient::Sync {
reqwest_client,
retry_policy,
} => (reqwest_client, retry_policy),
_ => unreachable!(),
};
let resp = loop {
let mut req = reqwest_client.get(&self.endpoint);
let etag = self.get_latest_etag()?;
if let Some(etag) = etag {
req = req.header(IF_NONE_MATCH, etag);
}
None => {
self.update_latest_etag(None);
let response = req.send();
match response {
Ok(resp) => break resp,
Err(e) => match retry_policy.should_retry(request_start_time, n_past_retries) {
RetryDecision::DoNotRetry => {
return Err(SupergraphFetcherError::NetworkError(
reqwest_middleware::Error::Reqwest(e),
));
}
RetryDecision::Retry { execute_after } => {
n_past_retries += 1;
if let Ok(duration) = execute_after.elapsed() {
std::thread::sleep(duration);
}
}
},
}
}
};
if resp.status().as_u16() == 304 {
return Ok(None);
}
Ok(Some(resp.text().map_err(|e| e.to_string())?))
}
let etag = resp.headers().get("etag");
self.update_latest_etag(etag)?;
fn update_latest_etag(&mut self, etag: Option<String>) {
self.etag = etag;
let text = resp
.text()
.map_err(SupergraphFetcherError::NetworkResponseError)?;
Ok(Some(text))
}
}
impl SupergraphFetcher<SupergraphFetcherAsyncState> {
#[allow(clippy::too_many_arguments)]
pub fn try_new_async(
endpoint: String,
key: &str,
user_agent: String,
connect_timeout: Duration,
request_timeout: Duration,
accept_invalid_certs: bool,
retry_count: u32,
) -> Result<Self, SupergraphFetcherError> {
let (endpoint, headers, retry_policy) = prepare_client_config(endpoint, key, retry_count)?;
let reqwest_agent = reqwest::Client::builder()
.danger_accept_invalid_certs(accept_invalid_certs)
.connect_timeout(connect_timeout)
.timeout(request_timeout)
.default_headers(headers)
.user_agent(user_agent)
.build()
.map_err(SupergraphFetcherError::FetcherCreationError)?;
let reqwest_client = ClientBuilder::new(reqwest_agent)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Ok(Self {
client: SupergraphFetcherAsyncOrSyncClient::Async { reqwest_client },
endpoint,
etag: RwLock::new(None),
state: std::marker::PhantomData,
})
}
pub async fn fetch_supergraph(&self) -> Result<Option<String>, SupergraphFetcherError> {
let reqwest_client = match &self.client {
SupergraphFetcherAsyncOrSyncClient::Async { reqwest_client } => reqwest_client,
_ => unreachable!(),
};
let mut req = reqwest_client.get(&self.endpoint);
let etag = self.get_latest_etag()?;
if let Some(etag) = etag {
req = req.header(IF_NONE_MATCH, etag);
}
let resp = req
.send()
.await
.map_err(SupergraphFetcherError::NetworkError)?;
if resp.status().as_u16() == 304 {
return Ok(None);
}
let etag = resp.headers().get("etag");
self.update_latest_etag(etag)?;
let text = resp
.text()
.await
.map_err(SupergraphFetcherError::NetworkResponseError)?;
Ok(Some(text))
}
}
impl<AsyncOrSync> SupergraphFetcher<AsyncOrSync> {
fn get_latest_etag(&self) -> Result<Option<HeaderValue>, SupergraphFetcherError> {
let guard: std::sync::RwLockReadGuard<'_, Option<HeaderValue>> =
self.etag.try_read().map_err(|e| {
SupergraphFetcherError::Lock(format!("Failed to read the etag record: {:?}", e))
})?;
Ok(guard.clone())
}
fn update_latest_etag(&self, etag: Option<&HeaderValue>) -> Result<(), SupergraphFetcherError> {
let mut guard: std::sync::RwLockWriteGuard<'_, Option<HeaderValue>> =
self.etag.try_write().map_err(|e| {
SupergraphFetcherError::Lock(format!("Failed to update the etag record: {:?}", e))
})?;
if let Some(etag_value) = etag {
*guard = Some(etag_value.clone());
} else {
*guard = None;
}
Ok(())
}
}