Skip to content

Commit

Permalink
Wait for query completion (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshidan authored Jun 28, 2023
1 parent fb5636a commit d17286c
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 18 deletions.
1 change: 1 addition & 0 deletions bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ arrow = { version="42.0", default-features = false, features = ["ipc"] }
base64 = "0.21"
bigdecimal = { version="0.3", features=["serde"] }
num-bigint = "0.4"
backon = "0.4"

[dev-dependencies]
tokio = { version="1.20", features=["rt-multi-thread"] }
Expand Down
130 changes: 123 additions & 7 deletions bigquery/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use backon::{ExponentialBuilder, Retryable};
use core::time::Duration;
use std::borrow::Cow;
use std::collections::VecDeque;
Expand All @@ -19,12 +20,15 @@ use crate::http::bigquery_routine_client::BigqueryRoutineClient;
use crate::http::bigquery_row_access_policy_client::BigqueryRowAccessPolicyClient;
use crate::http::bigquery_table_client::BigqueryTableClient;
use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
use crate::http::error::Error;
use crate::http::job::get_query_results::GetQueryResultsRequest;
use crate::http::job::query::QueryRequest;
use crate::http::job::JobReference;
use crate::http::table::TableReference;
use crate::query;
use crate::query::QueryOption;
use crate::storage;
use crate::{http, query};

const JOB_RETRY_REASONS: [&str; 3] = ["backendError", "rateLimitExceeded", "internalError"];

#[derive(Debug)]
pub struct ClientConfig {
Expand Down Expand Up @@ -188,25 +192,111 @@ impl Client {
/// let col2 = row.column::<Option<String>>(1);
/// }
/// }
pub async fn query(&self, project_id: &str, request: QueryRequest) -> Result<query::Iterator, Error> {
pub async fn query(&self, project_id: &str, request: QueryRequest) -> Result<query::Iterator, query::run::Error> {
let builder = ExponentialBuilder::default().with_max_times(usize::MAX);
self.query_with_option(project_id, request, QueryOption::default().with_retry(builder))
.await
}

/// Run query job and get result.
/// ```rust
/// use google_cloud_bigquery::http::job::query::QueryRequest;
/// use google_cloud_bigquery::query::row::Row;
/// use google_cloud_bigquery::client::Client;
/// use google_cloud_bigquery::query::QueryOption;
/// use google_cloud_bigquery::query::ExponentialBuilder;
///
/// async fn run(client: &Client, project_id: &str) {
/// let request = QueryRequest {
/// query: "SELECT * FROM dataset.table".to_string(),
/// ..Default::default()
/// };
/// let retry = ExponentialBuilder::default().with_max_times(10);
/// let option = QueryOption::default().with_retry(retry);
/// let mut iter = client.query_with_option(project_id, request, option).await.unwrap();
/// while let Some(row) = iter.next::<Row>().await.unwrap() {
/// let col1 = row.column::<String>(0);
/// let col2 = row.column::<Option<String>>(1);
/// }
/// }
/// Runs the query job with an explicit retry/backoff policy and returns a
/// result iterator.
///
/// If the initial `jobs.query` response reports `job_complete`, the response
/// already carries the first page of rows and the iterator starts from it.
/// Otherwise this polls the job until completion (see `wait_for_query`) and
/// the iterator performs the first page fetch itself (`force_first_fetch`).
pub async fn query_with_option(
    &self,
    project_id: &str,
    request: QueryRequest,
    option: QueryOption,
) -> Result<query::Iterator, query::run::Error> {
    let result = self.job_client.query(project_id, &request).await?;
    // (total row count, next-page token, first chunk of rows, whether the
    // iterator must fetch page 1 itself).
    let (total_rows, page_token, rows, force_first_fetch) = if result.job_complete {
        (
            result.total_rows.unwrap_or_default(),
            result.page_token,
            result.rows.unwrap_or_default(),
            false,
        )
    } else {
        // Job not finished within the request timeout: wait for completion,
        // retrying per `option.retry`; no rows are available yet.
        (
            self.wait_for_query(&result.job_reference, &option.retry, &request.timeout_ms)
                .await?,
            None,
            vec![],
            true,
        )
    };
    Ok(query::Iterator {
        client: self.job_client.clone(),
        project_id: result.job_reference.project_id,
        job_id: result.job_reference.job_id,
        request: GetQueryResultsRequest {
            start_index: 0,
            page_token,
            max_results: request.max_results,
            timeout_ms: request.timeout_ms,
            location: result.job_reference.location,
            format_options: request.format_options,
        },
        chunk: VecDeque::from(rows),
        total_size: total_rows,
        force_first_fetch,
    })
}

/// Polls `jobs.getQueryResults` until the job reports completion, retrying
/// with the supplied exponential backoff, and returns the job's total row
/// count once complete.
///
/// Retries happen when the job is still incomplete, on HTTP client errors,
/// or on API error responses whose status/reason is in `JOB_RETRY_REASONS`
/// (see `ErrorResponse::is_retryable`). Any other error aborts immediately.
async fn wait_for_query(
    &self,
    job: &JobReference,
    builder: &ExponentialBuilder,
    timeout_ms: &Option<i64>,
) -> Result<i64, query::run::Error> {
    // Use get_query_results only to wait for completion, not to read results.
    // max_results: Some(0) asks the server for zero rows — status only.
    let request = GetQueryResultsRequest {
        max_results: Some(0),
        timeout_ms: *timeout_ms,
        location: job.location.clone(),
        ..Default::default()
    };
    // One poll attempt; returning JobIncomplete makes `retry` try again.
    let action = || async {
        tracing::debug!("waiting for job completion {:?}", job);
        let result = self
            .job_client
            .get_query_results(&job.project_id, &job.job_id, &request)
            .await
            .map_err(query::run::Error::Http)?;
        if result.job_complete {
            // NOTE(review): declared return type is i64 but `total_rows` looks
            // Option-typed elsewhere in this commit — confirm the field type.
            Ok(result.total_rows)
        } else {
            Err(query::run::Error::JobIncomplete)
        }
    };
    action
        .retry(builder)
        .when(|e: &query::run::Error| match e {
            query::run::Error::JobIncomplete => true,
            // Transport-level failures are assumed transient.
            query::run::Error::Http(http::error::Error::HttpClient(_)) => true,
            // API errors retry only on 5xx or whitelisted job-error reasons.
            query::run::Error::Http(http::error::Error::Response(r)) => r.is_retryable(&JOB_RETRY_REASONS),
            _ => false,
        })
        .await
}

/// Read table data by BigQuery Storage Read API.
/// ```rust
/// use google_cloud_bigquery::storage::row::Row;
Expand Down Expand Up @@ -512,7 +602,6 @@ mod tests {
.query(
&project_id,
QueryRequest {
max_results: Some(1),
query: "SELECT * FROM rust_test_job.table_data_1686707863".to_string(),
..Default::default()
},
Expand All @@ -522,6 +611,8 @@ mod tests {
while let Some(row) = iterator_as_struct.next::<TestData>().await.unwrap() {
data_as_struct.push(row);
}
assert_eq!(iterator_as_struct.total_size, 3);
assert_eq!(iterator_as_row.total_size, 3);
assert_eq!(data_as_struct.len(), 3);
assert_eq!(data_as_row.len(), 3);

Expand Down Expand Up @@ -603,6 +694,31 @@ mod tests {
assert_data(0, data_as_row[0].clone());
}

// Integration test for the job-incomplete path: forces the initial query
// response to come back with job_complete=false so `wait_for_query` polling
// is exercised, then drains the full result set through the iterator.
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_query_job_incomplete() {
    let (client, project_id) = create_client().await;
    let mut data: Vec<TestData> = vec![];
    let mut iter = client
        .query(
            &project_id,
            QueryRequest {
                timeout_ms: Some(5), // 5ms timeout makes job_complete=false, exercising wait_for_query
                use_query_cache: Some(false), // cached results would complete instantly and skip the wait path
                max_results: Some(4999), // force multiple pages over the 10000-row table
                query: "SELECT * FROM rust_test_job.table_data_10000v2".to_string(),
                ..Default::default()
            },
        )
        .await
        .unwrap();
    while let Some(row) = iter.next::<TestData>().await.unwrap() {
        data.push(row);
    }
    assert_eq!(iter.total_size, 10000);
    assert_eq!(data.len(), 10000);
}

fn assert_data(index: usize, d: TestData) {
let now = if index == 0 {
datetime!(2023-06-14 01:57:43.438086 UTC)
Expand Down
4 changes: 2 additions & 2 deletions bigquery/src/http/bigquery_job_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ mod test {
assert!(result.page_token.is_some());
assert_eq!(result.rows.unwrap().len(), 2);
assert_eq!(result.total_rows.unwrap(), 3);
assert_eq!(result.total_bytes_processed, 0);
assert_eq!(result.total_bytes_processed.unwrap(), 0);
assert!(result.job_complete);

// query all results
Expand Down Expand Up @@ -411,7 +411,7 @@ mod test {
.unwrap();
assert!(result.job_reference.job_id.is_empty());
assert!(result.total_rows.is_none());
assert_eq!(result.total_bytes_processed, 0);
assert_eq!(result.total_bytes_processed.unwrap(), 0);
assert!(result.job_complete);

table_client
Expand Down
43 changes: 43 additions & 0 deletions bigquery/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,36 @@ pub struct ErrorResponse {
/// Example values include: `400` (Bad Request), `401` (Unauthorized), and `404` (Not Found).
pub code: u16,

/// Detailed error code & message from the Google API frontend.
pub errors: Option<Vec<ErrorResponseItem>>,

/// Description of the error. Same as `errors.message`.
pub message: String,
}

/// HTTP status codes treated as transient server-side failures.
const RETRYABLE_CODES: [u16; 4] = [500, 502, 503, 504];

impl ErrorResponse {
    /// Returns `true` when this error is worth retrying.
    ///
    /// An error is retryable when its HTTP status code is one of
    /// [`RETRYABLE_CODES`], or when any detailed error item's `reason`
    /// matches one of the caller-supplied `retryable_reasons`
    /// (e.g. `"backendError"`, `"rateLimitExceeded"`).
    pub fn is_retryable(&self, retryable_reasons: &[&str]) -> bool {
        // 5xx statuses are always considered transient.
        if RETRYABLE_CODES.contains(&self.code) {
            return true;
        }
        // Otherwise look for a matching typed reason among the error details.
        self.errors.as_ref().map_or(false, |details| {
            details
                .iter()
                .any(|detail| retryable_reasons.contains(&detail.reason.as_str()))
        })
    }
}

impl fmt::Display for ErrorResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.message.fmt(f)
Expand All @@ -35,6 +61,23 @@ impl fmt::Display for ErrorResponse {

impl std::error::Error for ErrorResponse {}

/// ErrorItem is a detailed error code & message from the Google API frontend.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ErrorResponseItem {
    /// Message is the human-readable description of the error.
    pub message: String,

    /// Reason is the typed error code. For example: "some_example".
    /// Compared against retryable-reason lists in `ErrorResponse::is_retryable`.
    pub reason: String,
}

impl fmt::Display for ErrorResponseItem {
    /// Renders only the human-readable message, mirroring `ErrorResponse`'s Display.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", self.message)
    }
}

#[derive(serde::Deserialize)]
pub(crate) struct ErrorWrapper {
pub(crate) error: ErrorResponse,
Expand Down
7 changes: 3 additions & 4 deletions bigquery/src/http/job/get_query_results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub struct GetQueryResultsResponse {
pub rows: Option<Vec<Tuple>>,
/// The total number of bytes processed for this query.
/// If this query was a dry run, this is the number of bytes that would be processed if the query were run.
#[serde(deserialize_with = "crate::http::from_str")]
pub total_bytes_processed: i64,
#[serde(default, deserialize_with = "crate::http::from_str_option")]
pub total_bytes_processed: Option<i64>,
/// Whether the query has completed or not.
/// If rows or totalRows are present, this will always be true.
/// If this is false, totalRows will not be available.
Expand All @@ -76,7 +76,7 @@ pub struct GetQueryResultsResponse {
/// For more information about error messages, see Error messages.
pub errors: Option<Vec<ErrorProto>>,
/// Whether the query result was fetched from the query cache.
pub cache_hit: bool,
pub cache_hit: Option<bool>,
/// Output only. The number of rows affected by a DML statement.
/// Present only for DML statements INSERT, UPDATE or DELETE.
#[serde(default, deserialize_with = "crate::http::from_str_option")]
Expand All @@ -91,6 +91,5 @@ pub fn build(
data: &GetQueryResultsRequest,
) -> RequestBuilder {
let url = format!("{}/projects/{}/queries/{}", base_url, project_id, job_id);
println!("{:?}", serde_json::to_string(data).unwrap());
client.get(url).query(data)
}
6 changes: 3 additions & 3 deletions bigquery/src/http/job/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ pub struct QueryResponse {
pub rows: Option<Vec<Tuple>>,
/// The total number of bytes processed for this query.
/// If this query was a dry run, this is the number of bytes that would be processed if the query were run.
#[serde(deserialize_with = "crate::http::from_str")]
pub total_bytes_processed: i64,
#[serde(default, deserialize_with = "crate::http::from_str_option")]
pub total_bytes_processed: Option<i64>,
/// Whether the query has completed or not.
/// If rows or totalRows are present, this will always be true.
/// If this is false, totalRows will not be available.
Expand All @@ -141,7 +141,7 @@ pub struct QueryResponse {
/// For more information about error messages, see Error messages.
pub errors: Option<Vec<ErrorProto>>,
/// Whether the query result was fetched from the query cache.
pub cache_hit: bool,
pub cache_hit: Option<bool>,
/// Output only. The number of rows affected by a DML statement.
/// Present only for DML statements INSERT, UPDATE or DELETE.
#[serde(default, deserialize_with = "crate::http::from_str_option")]
Expand Down
29 changes: 28 additions & 1 deletion bigquery/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub use backon::*;
use std::collections::VecDeque;

use crate::http::bigquery_job_client::BigqueryJobClient;
Expand All @@ -6,6 +7,19 @@ use crate::http::job::get_query_results::GetQueryResultsRequest;
use crate::http::tabledata::list::Tuple;
use crate::query::value::StructDecodable;

/// Options controlling how a query job's completion is awaited.
#[derive(Debug, Clone, Default)]
pub struct QueryOption {
    /// Exponential back off retry setting
    /// (used by `Client::wait_for_query` both for job-incomplete polling and
    /// for transient HTTP errors).
    pub(crate) retry: ExponentialBuilder,
}

impl QueryOption {
    /// Replaces the exponential backoff policy used while waiting for job completion.
    pub fn with_retry(self, builder: ExponentialBuilder) -> Self {
        Self { retry: builder }
    }
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
Expand All @@ -20,6 +34,7 @@ pub struct Iterator {
pub(crate) job_id: String,
pub(crate) request: GetQueryResultsRequest,
pub(crate) chunk: VecDeque<Tuple>,
pub(crate) force_first_fetch: bool,
pub total_size: i64,
}

Expand All @@ -29,7 +44,9 @@ impl Iterator {
if let Some(v) = self.chunk.pop_front() {
return Ok(T::decode(v).map(Some)?);
}
if self.request.page_token.is_none() {
if self.force_first_fetch {
self.force_first_fetch = false
} else if self.request.page_token.is_none() {
return Ok(None);
}
let response = self
Expand Down Expand Up @@ -259,3 +276,13 @@ pub mod value {
}
}
}

/// Errors produced while running a query job and waiting for its completion
/// (`Client::query` / `Client::query_with_option`).
pub mod run {
    #[derive(thiserror::Error, Debug)]
    pub enum Error {
        /// Underlying HTTP/API failure from the BigQuery REST client.
        #[error(transparent)]
        Http(#[from] crate::http::error::Error),
        /// The job never reported completion; surfaced after retries are exhausted.
        #[error("Retry exceeded with job incomplete")]
        JobIncomplete,
    }
}
3 changes: 2 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ skip = [
{ name = "syn", version = "=1.0.109" },
{ name = "regex-syntax", version = "=0.6.29" },
{ name = "webpki-roots", version = "=0.22.6" },
{ name = "hashbrown", version = "=0.14.0" }
{ name = "hashbrown", version = "=0.14.0" },
{ name = "miniz_oxide", version = "=0.6.2" }
]
# Similarly to `skip` allows you to skip certain crates during duplicate
# detection. Unlike skip, it also includes the entire tree of transitive
Expand Down

0 comments on commit d17286c

Please sign in to comment.