Skip to content

Commit

Permalink
WIP: Multiquery, Timestamp Support, Test Refactor (#6)
Browse files Browse the repository at this point in the history
* Multiquery, Timestamp Support, Much improved Integration Tests
  • Loading branch information
Empty2k12 authored Jul 14, 2019
1 parent 129e2ed commit d5e78ee
Show file tree
Hide file tree
Showing 13 changed files with 824 additions and 438 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.0.3] - 2019-07-14

### Added

- Possibility to run multiple queries in one. See the Integration Tests in `tests/integration_tests.rs` for examples.
- Ability to specify Timestamp for write queries

### Changed

- You now have to borrow a query when passing it to the `query` method

## [0.0.2] - 2019-07-23

### Changed

- URLEncode Query before sending it to InfluxDB, which caused some empty returns (#5)
- Improved Test Coverage: There's now even more tests verifying correctness of the crate (#5)
- It's no longer necessary to supply a wildcard generic when working with serde*integration: `client.json_query::<Weather>(query)` instead of `client.json_query::<Weather, *>(query)`

[unreleased]: https://github.com/Empty2k12/influxdb-rust/compare/v0.0.3...HEAD
[0.0.3]: https://github.com/Empty2k12/influxdb-rust/compare/v0.0.2...v0.0.3
[0.0.2]: https://github.com/Empty2k12/influxdb-rust/releases/tag/v0.0.2
514 changes: 284 additions & 230 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "influxdb"
version = "0.0.2"
version = "0.0.3"
authors = ["Gero Gerke <[email protected]>"]
edition = "2018"
description = "InfluxDB Driver for Rust"
Expand Down
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM xd009642/tarpaulin

RUN wget https://dl.influxdata.com/influxdb/releases/influxdb_1.7.6_amd64.deb
RUN dpkg -i influxdb_1.7.6_amd64.deb
RUN INFLUXDB_HTTP_BIND_ADDRESS=9999 influxd > $HOME/influx.log 2>&1 &

WORKDIR /volume

CMD cargo build && cargo tarpaulin
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,27 @@ Pull requests are always welcome.

- Reading and Writing to InfluxDB
- Optional Serde Support for Deserialization
- Running multiple queries in one request (e.g. `SELECT * FROM weather_berlin; SELECT * FROM weather_london`)

## Planned Features

- Running multiple queries in one request (e.g. `SELECT * FROM weather_berlin; SELECT * FROM weather_london`)
- Read Query Builder instead of supplying raw queries
- Authentication against InfluxDB
- Methods for setting time and time precision in a query
- `#[derive(InfluxDbWritable)]`

## Quickstart

Add the following to your `Cargo.toml`

```toml
influxdb = "0.0.2"
influxdb = "0.0.3"
```

For an example with using Serde deserialization, please refer to [serde_integration](crate::integrations::serde_integration)

```rust
use influxdb::query::InfluxDbQuery;
use influxdb::query::{InfluxDbQuery, Timestamp};
use influxdb::client::InfluxDbClient;
use serde::Deserialize;
use tokio::runtime::current_thread::Runtime;

// Create a InfluxDbClient with URL `http://localhost:8086`
Expand All @@ -68,7 +67,7 @@ let client = InfluxDbClient::new("http://localhost:8086", "test");
// Let's write something to InfluxDB. First we're creating a
// InfluxDbWriteQuery to write some data.
// This creates a query which writes a new measurement into a series called `weather`
let write_query = InfluxDbQuery::write_query("weather")
let write_query = InfluxDbQuery::write_query(Timestamp::NOW, "weather")
.add_field("temperature", 82);

// Since this library is async by default, we're going to need a Runtime,
Expand All @@ -78,14 +77,14 @@ let mut rt = Runtime::new().expect("Unable to create a runtime");

// To actually submit the data to InfluxDB, the `block_on` method can be used to
// halt execution of our program until it has been completed.
let write_result = rt.block_on(client.query(write_query));
let write_result = rt.block_on(client.query(&write_query));
assert!(write_result.is_ok(), "Write result was not okay");

// Reading data is as simple as writing. First we need to create a query
let read_query = InfluxDbQuery::raw_read_query("SELECT _ FROM weather");

// Again, we're blocking until the request is done
let read_result = rt.block_on(client.query(read_query));
let read_result = rt.block_on(client.query(&read_query));

assert!(read_result.is_ok(), "Read result was not ok");

Expand All @@ -101,4 +100,4 @@ in the repository.

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

@ 2019 Gero Gerke, All rights reserved.
@ 2019 Gero Gerke, All rights reserved.
60 changes: 33 additions & 27 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ use reqwest::r#async::{Client, Decoder};
use std::mem;

use crate::error::InfluxDbError;
use crate::query::{InfluxDbQuery, QueryType};
use crate::query::read_query::InfluxDbReadQuery;
use crate::query::write_query::InfluxDbWriteQuery;
use crate::query::InfluxDbQuery;

use url::form_urlencoded;

use std::any::Any;

/// Internal Representation of a Client
pub struct InfluxDbClient {
url: String,
Expand Down Expand Up @@ -108,21 +112,20 @@ impl InfluxDbClient {
///
/// ```rust
/// use influxdb::client::InfluxDbClient;
/// use influxdb::query::InfluxDbQuery;
/// use influxdb::query::{InfluxDbQuery, Timestamp};
///
/// let client = InfluxDbClient::new("http://localhost:8086", "test");
/// let _future = client.query(
/// InfluxDbQuery::write_query("weather")
/// &InfluxDbQuery::write_query(Timestamp::NOW, "weather")
/// .add_field("temperature", 82)
/// );
/// ```
pub fn query<Q>(&self, q: Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
pub fn query<Q>(&self, q: &Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
where
Q: InfluxDbQuery,
Q: Any + InfluxDbQuery,
{
use futures::future;

let q_type = q.get_type();
let query = match q.build() {
Err(err) => {
let error = InfluxDbError::InvalidQueryError {
Expand All @@ -133,35 +136,38 @@ impl InfluxDbClient {
Ok(query) => query,
};

let client = match q_type {
QueryType::ReadQuery => {
let read_query = query.get();
let encoded: String = form_urlencoded::Serializer::new(String::new())
.append_pair("db", self.database_name())
.append_pair("q", &read_query)
.finish();
let http_query_string = format!(
"{url}/query?{encoded}",
url = self.database_url(),
encoded = encoded
);

if read_query.contains("SELECT") || read_query.contains("SHOW") {
Client::new().get(http_query_string.as_str())
} else {
Client::new().post(http_query_string.as_str())
}
let any_value = q as &dyn Any;

let client = if let Some(_) = any_value.downcast_ref::<InfluxDbReadQuery>() {
let read_query = query.get();
let encoded: String = form_urlencoded::Serializer::new(String::new())
.append_pair("db", self.database_name())
.append_pair("q", &read_query)
.finish();
let http_query_string = format!(
"{url}/query?{encoded}",
url = self.database_url(),
encoded = encoded
);
if read_query.contains("SELECT") || read_query.contains("SHOW") {
Client::new().get(http_query_string.as_str())
} else {
Client::new().post(http_query_string.as_str())
}
QueryType::WriteQuery => Client::new()
} else if let Some(write_query) = any_value.downcast_ref::<InfluxDbWriteQuery>() {
Client::new()
.post(
format!(
"{url}/write?db={db}",
"{url}/write?db={db}{precision_str}",
url = self.database_url(),
db = self.database_name(),
precision_str = write_query.get_precision_modifier()
)
.as_str(),
)
.body(query.get()),
.body(query.get())
} else {
unreachable!()
};

Box::new(
Expand Down
68 changes: 46 additions & 22 deletions src/integrations/serde_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
//! `name`, InfluxDB provides alongside query results.
//!
//! ```rust,no_run
//! use influxdb::query::InfluxDbQuery;
//! use futures::prelude::*;
//! use influxdb::client::InfluxDbClient;
//! use influxdb::query::InfluxDbQuery;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize)]
//! struct WeatherWithoutCityName {
//! temperature: i32
//! temperature: i32,
//! }
//!
//! #[derive(Deserialize)]
Expand All @@ -24,16 +25,26 @@
//!
//! let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
//! let client = InfluxDbClient::new("http://localhost:8086", "test");
//! let query = InfluxDbQuery::raw_read_query("SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC");
//! let _result = rt.block_on(client.json_query::<WeatherWithoutCityName>(query))
//! let query = InfluxDbQuery::raw_read_query(
//! "SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC",
//! );
//! let _result = rt
//! .block_on(client.json_query(query))
//! .map(|mut db_result| db_result.deserialize_next::<WeatherWithoutCityName>())
//! .map(|it| {
//! it.map(|series_vec| {
//! series_vec
//! .series
//! .into_iter()
//! .map(|mut city_series| {
//! let city_name = city_series.name.split("_").collect::<Vec<&str>>().remove(2);
//! Weather { weather: city_series.values.remove(0), city_name: city_name.to_string() }
//! }).collect::<Vec<Weather>>()
//! let city_name =
//! city_series.name.split("_").collect::<Vec<&str>>().remove(2);
//! Weather {
//! weather: city_series.values.remove(0),
//! city_name: city_name.to_string(),
//! }
//! })
//! .collect::<Vec<Weather>>()
//! })
//! });
//! ```
Expand All @@ -56,6 +67,8 @@ use crate::query::InfluxDbQuery;

use url::form_urlencoded;

use futures::future::Either;

#[derive(Deserialize)]
#[doc(hidden)]
struct _DatabaseError {
Expand All @@ -64,14 +77,30 @@ struct _DatabaseError {

#[derive(Deserialize, Debug)]
#[doc(hidden)]
pub struct DatabaseQueryResult<T> {
pub results: Vec<InfluxDbReturn<T>>,
pub struct DatabaseQueryResult {
pub results: Vec<serde_json::Value>,
}

impl DatabaseQueryResult {
pub fn deserialize_next<T: 'static>(
&mut self,
) -> impl Future<Item = InfluxDbReturn<T>, Error = InfluxDbError>
where
T: DeserializeOwned,
{
match serde_json::from_value::<InfluxDbReturn<T>>(self.results.remove(0)) {
Ok(item) => futures::future::result(Ok(item)),
Err(err) => futures::future::err(InfluxDbError::DeserializationError {
error: format!("could not deserialize: {}", err),
}),
}
}
}

#[derive(Deserialize, Debug)]
#[doc(hidden)]
pub struct InfluxDbReturn<T> {
pub series: Option<Vec<InfluxDbSeries<T>>>,
pub series: Vec<InfluxDbSeries<T>>,
}

#[derive(Deserialize, Debug)]
Expand All @@ -82,13 +111,10 @@ pub struct InfluxDbSeries<T> {
}

impl InfluxDbClient {
pub fn json_query<T: 'static>(
pub fn json_query(
&self,
q: InfluxDbReadQuery,
) -> Box<dyn Future<Item = Option<Vec<InfluxDbSeries<T>>>, Error = InfluxDbError>>
where
T: DeserializeOwned,
{
) -> impl Future<Item = DatabaseQueryResult, Error = InfluxDbError> {
use futures::future;

let query = q.build().unwrap();
Expand All @@ -113,13 +139,11 @@ impl InfluxDbClient {
"Only SELECT and SHOW queries supported with JSON deserialization",
),
};
return Box::new(
future::err::<Option<Vec<InfluxDbSeries<T>>>, InfluxDbError>(error),
);
return Either::B(future::err::<DatabaseQueryResult, InfluxDbError>(error));
}
};

Box::new(
Either::A(
client
.send()
.and_then(|mut res| {
Expand All @@ -137,9 +161,9 @@ impl InfluxDbClient {
});
} else {
// Json has another structure, let's try actually parsing it to the type we're deserializing
let from_slice = serde_json::from_slice::<DatabaseQueryResult<T>>(&body);
let from_slice = serde_json::from_slice::<DatabaseQueryResult>(&body);

let mut deserialized = match from_slice {
let deserialized = match from_slice {
Ok(deserialized) => deserialized,
Err(err) => {
return futures::future::err(InfluxDbError::DeserializationError {
Expand All @@ -148,7 +172,7 @@ impl InfluxDbClient {
}
};

return futures::future::result(Ok(deserialized.results.remove(0).series));
return futures::future::result(Ok(deserialized));
}
}),
)
Expand Down
Loading

0 comments on commit d5e78ee

Please sign in to comment.