Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions influxdb/src/query/write_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,28 @@ impl From<&str> for Type {
Type::Text(b.into())
}
}

#[cfg(feature = "chrono")]
impl<Tz: chrono::TimeZone> From<chrono::DateTime<Tz>> for Type {
fn from(dt: chrono::DateTime<Tz>) -> Self {
match dt.timestamp_nanos_opt() {
Some(nanos) => Type::SignedInteger(nanos),
None => {
// For dates before 1677-09-21, or after
// 2262-04-11, we're just going to return 0.
Type::SignedInteger(0)
}
}
}
}

#[cfg(feature = "time")]
impl From<time::UtcDateTime> for Type {
fn from(dt: time::UtcDateTime) -> Self {
Type::SignedInteger(dt.unix_timestamp_nanos().try_into().unwrap_or(0))
}
}

impl<T> From<&T> for Type
where
T: Copy + Into<Type>,
Expand Down
31 changes: 31 additions & 0 deletions influxdb/tests/derive_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ struct WeatherReading {
wind_strength: Option<u64>,
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "derive", derive(InfluxDbWriteable))]
struct WeatherReadingWithNonstandardTime {
#[influxdb(time)]
reading_time: DateTime<Utc>,
#[influxdb(ignore)]
time: DateTime<Utc>,
#[influxdb(ignore)]
humidity: i32,
pressure: i32,
#[influxdb(tag)]
wind_strength: Option<u64>,
}

#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
struct WeatherReadingWithoutIgnored {
Expand All @@ -47,6 +61,23 @@ fn test_build_query() {
);
}

#[test]
fn test_build_nonstandard_query() {
let weather_reading = WeatherReadingWithNonstandardTime {
reading_time: Timestamp::Hours(1).into(),
time: Timestamp::Hours(1).into(),
humidity: 30,
pressure: 100,
wind_strength: Some(5),
};
let query = weather_reading.into_query("weather_reading");
let query = query.build().unwrap();
assert_eq!(
query.get(),
"weather_reading,wind_strength=5 pressure=100i 3600000000000"
);
}

#[cfg(feature = "derive")]
/// INTEGRATION TEST
///
Expand Down
81 changes: 58 additions & 23 deletions influxdb_derive/src/writeable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
use quote::quote;
use syn::{
parse::{Parse, ParseStream},
punctuated::Punctuated,
Expand All @@ -9,27 +9,32 @@ use syn::{
#[derive(Debug)]
struct WriteableField {
ident: Ident,
is_time: bool,
is_tag: bool,
is_ignore: bool,
}

mod kw {
use syn::custom_keyword;

custom_keyword!(time);
custom_keyword!(tag);
custom_keyword!(ignore);
}

#[allow(dead_code)] // TODO do we need to store the keywords?
enum FieldAttr {
Time(kw::time),
Tag(kw::tag),
Ignore(kw::ignore),
}

impl Parse for FieldAttr {
fn parse(input: ParseStream<'_>) -> syn::Result<Self> {
let lookahead = input.lookahead1();
if lookahead.peek(kw::tag) {
if lookahead.peek(kw::time) {
Ok(Self::Time(input.parse()?))
} else if lookahead.peek(kw::tag) {
Ok(Self::Tag(input.parse()?))
} else if lookahead.peek(kw::ignore) {
Ok(Self::Ignore(input.parse()?))
Expand All @@ -52,6 +57,7 @@ impl TryFrom<Field> for WriteableField {

fn try_from(field: Field) -> syn::Result<WriteableField> {
let ident = field.ident.expect("fields without ident are not supported");
let mut has_time_attr = false;
let mut is_tag = false;
let mut is_ignore = false;

Expand All @@ -60,6 +66,7 @@ impl TryFrom<Field> for WriteableField {
Meta::List(list) if list.path.is_ident("influxdb") => {
for attr in syn::parse2::<FieldAttrs>(list.tokens)?.0 {
match attr {
FieldAttr::Time(_) => has_time_attr = true,
FieldAttr::Tag(_) => is_tag = true,
FieldAttr::Ignore(_) => is_ignore = true,
}
Expand All @@ -69,8 +76,23 @@ impl TryFrom<Field> for WriteableField {
}
}

if [has_time_attr, is_tag, is_ignore]
.iter()
.filter(|&&b| b)
.count()
> 1
{
panic!("only one of time, tag, or ignore can be used");
}

// A field is considered a time field if:
// 1. It has the #[influxdb(time)] attribute, OR
// 2. It's named "time" and doesn't have #[influxdb(ignore)]
let is_time = has_time_attr || (ident == "time" && !is_ignore);

Ok(WriteableField {
ident,
is_time,
is_tag,
is_ignore,
})
Expand All @@ -97,39 +119,52 @@ pub fn expand_writeable(input: DeriveInput) -> syn::Result<TokenStream> {
}
};

let time_field = format_ident!("time");
let time_field_str = time_field.to_string();
#[allow(clippy::cmp_owned)] // that's not how idents work clippy
let fields = match fields {
let writeable_fields: Vec<WriteableField> = match fields {
Fields::Named(fields) => fields
.named
.into_iter()
.filter_map(|f| {
WriteableField::try_from(f)
.map(|wf| {
if !wf.is_ignore && wf.ident.to_string() != time_field_str {
let ident = wf.ident;
Some(match wf.is_tag {
true => quote!(query.add_tag(stringify!(#ident), self.#ident)),
false => quote!(query.add_field(stringify!(#ident), self.#ident)),
})
} else {
None
}
})
.transpose()
})
.map(WriteableField::try_from)
.collect::<syn::Result<Vec<_>>>()?,
_ => panic!("a struct without named fields is not supported"),
_ => panic!("A struct without named fields is not supported!"),
};

// Find the time field
let mut time_field = None;
for wf in &writeable_fields {
if wf.is_time {
if time_field.is_some() {
panic!("multiple time fields found!");
}
time_field = Some(wf.ident.clone());
}
}

// There must be exactly one time field
let time_field = time_field.expect("no time field found");

// Generate field assignments (excluding time and ignored fields)
let field_assignments = writeable_fields
.into_iter()
.filter_map(|wf| {
if wf.is_ignore || wf.is_time {
None
} else {
let ident = wf.ident;
Some(match wf.is_tag {
true => quote!(query.add_tag(stringify!(#ident), self.#ident)),
false => quote!(query.add_field(stringify!(#ident), self.#ident)),
})
}
})
.collect::<Vec<_>>();

Ok(quote! {
impl #impl_generics ::influxdb::InfluxDbWriteable for #ident #ty_generics #where_clause {
fn into_query<I: Into<String>>(self, name: I) -> ::influxdb::WriteQuery {
let timestamp: ::influxdb::Timestamp = self.#time_field.into();
let mut query = timestamp.into_query(name);
#(
query = #fields;
query = #field_assignments;
)*
query
}
Expand Down
Loading