Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

Commit f7549dc

Browse files
authored
Merge pull request #10 from peroxyacyl/2022.02.24-deflate-influx1
deflate line protocol on influx1
2 parents 8889d4d + 30e6ab8 commit f7549dc

11 files changed

+131
-79
lines changed

src/influxdb-cpp-rest/deflate.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#include <cpprest/http_client.h>
2+
#include <cpprest/json.h>
3+
#include <cpprest/rawptrstream.h>
4+
#include <fmt/ostream.h>
5+
#include <rx.hpp>
6+
#include <atomic>
7+
#include <chrono>
8+
9+
using namespace web::http::client;
10+
using namespace web::http;
11+
using namespace utility;
12+
using namespace web::http::compression::builtin;
13+
14+
namespace influxdb {
15+
namespace utility {
16+
int compress(std::shared_ptr<fmt::MemoryWriter> const& w, std::vector<uint8_t> &compression_buffer) {
17+
auto c = make_compressor(algorithm::GZIP);
18+
const int rawdata_size = w->size();
19+
const uint8_t* rawdata_ptr =
20+
reinterpret_cast<const uint8_t*>(w->data());
21+
int rawdata_cursor = 0;
22+
23+
int compressed_size = 0;
24+
bool done = false;
25+
26+
compression_buffer.resize(rawdata_size);
27+
while (!done)
28+
{
29+
size_t used = 0;
30+
if (compressed_size >= rawdata_size * 16)
31+
{
32+
throw std::runtime_error(
33+
"gzip data is 16 times larger than raw data. abort "
34+
"compression");
35+
}
36+
if (compressed_size == compression_buffer.size())
37+
{
38+
compression_buffer.resize(compressed_size * 2);
39+
}
40+
const int got = c->compress(
41+
rawdata_ptr + rawdata_cursor, // rawdata ptr
42+
rawdata_size - rawdata_cursor, // remaining rawdata size
43+
compression_buffer.data() +
44+
compressed_size, // buffer ptr to append
45+
compression_buffer.size() -
46+
compressed_size, // buffer available size
47+
web::http::compression::operation_hint::is_last, used, done);
48+
compressed_size += got;
49+
rawdata_cursor += used;
50+
}
51+
compression_buffer.resize(compressed_size);
52+
return compressed_size;
53+
}
54+
}
55+
}

src/influxdb-cpp-rest/deflate.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include <vector>
4+
#include <fmt/ostream.h>
5+
6+
namespace influxdb {
7+
namespace utility {
8+
int compress(std::shared_ptr<fmt::MemoryWriter> const &w, std::vector<uint8_t> &compression_buffer);
9+
}
10+
}
11+
12+

src/influxdb-cpp-rest/influxdb2_simple_async_api.cpp

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "influxdb2_simple_async_api.h"
22
#include "influxdb_line.h"
33
#include "input_sanitizer.h"
4+
#include "deflate.h"
45

56
#include <cpprest/http_client.h>
67
#include <cpprest/json.h>
@@ -91,42 +92,8 @@ struct simple_db::impl
9192
{
9293
try
9394
{
94-
auto c = make_compressor(algorithm::GZIP);
95-
96-
const int rawdata_size = w->size();
97-
const uint8_t* rawdata_ptr =
98-
reinterpret_cast<const uint8_t*>(w->data());
99-
int rawdata_cursor = 0;
100-
10195
std::vector<uint8_t> compression_buffer;
102-
int compressed_size = 0;
103-
bool done = false;
104-
105-
compression_buffer.resize(rawdata_size);
106-
while (!done)
107-
{
108-
size_t used = 0;
109-
if (compressed_size >= rawdata_size * 16)
110-
{
111-
throw std::runtime_error(
112-
"gzip data is 16 times larger than raw data. abort "
113-
"compression");
114-
}
115-
if (compressed_size == compression_buffer.size())
116-
{
117-
compression_buffer.resize(compressed_size * 2);
118-
}
119-
const int got = c->compress(
120-
rawdata_ptr + rawdata_cursor, // rawdata ptr
121-
rawdata_size - rawdata_cursor, // remaining rawdata size
122-
compression_buffer.data() +
123-
compressed_size, // buffer ptr to append
124-
compression_buffer.size() -
125-
compressed_size, // buffer available size
126-
web::http::compression::operation_hint::is_last, used, done);
127-
compressed_size += got;
128-
rawdata_cursor += used;
129-
}
96+
compress(w, compression_buffer);
13097

13198
http_request request;
13299
request.set_request_uri(uri_with_db);
@@ -136,7 +103,7 @@ struct simple_db::impl
136103

137104
request.set_body(
138105
concurrency::streams::rawptr_stream<uint8_t>::open_istream(
139-
compression_buffer.data(), compressed_size));
106+
compression_buffer.data(), compression_buffer.size()));
140107
auto response = client.request(request);
141108
try
142109
{
@@ -304,7 +271,7 @@ simple_db::simple_db(std::string const& url, std::string const& org, std::string
304271
std::string const& token, const int duration_seconds,
305272
const int shard_duration_seconds, const int window_max_lines,
306273
const int window_max_ms)
307-
: influxdb::async_api::simple_db(url, bucket, window_max_lines, window_max_ms),
274+
: influxdb::async_api::simple_db(url, bucket, window_max_lines, window_max_ms, true),
308275
pimpl(std::make_unique<impl>(url, org, bucket, token, duration_seconds,
309276
shard_duration_seconds, window_max_lines, window_max_ms))
310277
{

src/influxdb-cpp-rest/influxdb_raw_db.cpp

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@
77
//
88

99
#include "influxdb_raw_db.h"
10+
#include "deflate.h"
1011

12+
#include <fmt/ostream.h>
1113
#include <cpprest/streams.h>
1214
#include <cpprest/http_client.h>
15+
#include <cpprest/rawptrstream.h>
1316

1417
using namespace utility;
1518
using namespace web;
1619
using namespace web::http;
20+
using namespace web::http::compression::builtin;
1721

1822
namespace {
1923
inline void throw_response(http_response const& response) {
@@ -26,9 +30,9 @@ namespace {
2630

2731
inline http_request request_from(
2832
uri const& uri_with_db,
29-
std::string const& lines,
3033
std::string const& username,
3134
std::string const& password,
35+
bool deflate,
3236
std::string const& retention_policy = "",
3337
web::http::method const& m = methods::POST
3438
) {
@@ -58,16 +62,14 @@ namespace {
5862
)
5963
;
6064
}
61-
62-
request.set_body(lines);
63-
65+
6466
return request;
6567
}
6668
}
6769

68-
influxdb::raw::db::db(string_t const & url, string_t const & name)
70+
influxdb::raw::db::db(string_t const & url, string_t const & name, bool deflate)
6971
:
70-
client(url)
72+
client(url), deflate(deflate)
7173
{
7274
uri_builder builder(client.base_uri());
7375
builder.append(U("/write"));
@@ -82,9 +84,9 @@ void influxdb::raw::db::post(string_t const & query)
8284
builder.append_query(U("q"), query);
8385

8486
// synchronous for now
85-
auto response = client.request(
86-
request_from(builder.to_string(), "", username, password, retention_policy)
87-
);
87+
auto request = request_from(builder.to_string(), username, password, deflate, retention_policy);
88+
request.set_body("");
89+
auto response = client.request(request);
8890

8991
try {
9092
response.wait();
@@ -102,10 +104,10 @@ string_t influxdb::raw::db::get(string_t const & query)
102104

103105
builder.append_query(U("q"), query);
104106

107+
auto request = request_from(builder.to_string(), username, password, deflate, retention_policy);
108+
request.set_body("");
105109
// synchronous for now
106-
auto response = client.request(
107-
request_from(builder.to_string(), "", username, password, retention_policy)
108-
);
110+
auto response = client.request(request);
109111

110112
try {
111113
response.wait();
@@ -123,9 +125,19 @@ string_t influxdb::raw::db::get(string_t const & query)
123125
}
124126
}
125127

126-
void influxdb::raw::db::insert(std::string const & lines)
128+
void influxdb::raw::db::insert(std::shared_ptr<fmt::MemoryWriter> const& lines)
127129
{
128-
auto response = client.request(request_from(uri_with_db, lines, username, password, retention_policy));
130+
auto request = request_from(uri_with_db, username, password, deflate, retention_policy);
131+
std::vector<uint8_t> buffer; // needs to live until sending the request
132+
if (lines->size() > 0 && deflate) {
133+
int size = influxdb::utility::compress(lines, buffer);
134+
request.headers().add(header_names::content_encoding, algorithm::GZIP);
135+
request.set_body(concurrency::streams::rawptr_stream<uint8_t>::open_istream(
136+
buffer.data(), size));
137+
} else {
138+
request.set_body(lines->str());
139+
}
140+
auto response = client.request(request);
129141

130142
try {
131143
response.wait();
@@ -138,7 +150,7 @@ void influxdb::raw::db::insert(std::string const & lines)
138150
}
139151

140152
// synchronous for now
141-
void influxdb::raw::db::insert_async(std::string const & lines)
153+
void influxdb::raw::db::insert_async(std::shared_ptr<fmt::MemoryWriter> const& lines)
142154
{
143155
insert(lines);
144156
}

src/influxdb-cpp-rest/influxdb_raw_db.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#pragma once
77

88
#include <cpprest/http_client.h>
9+
#include <fmt/ostream.h>
910
#include <string>
1011

1112
using utility::string_t;
@@ -21,9 +22,10 @@ namespace influxdb {
2122
std::string username;
2223
std::string password;
2324
std::string retention_policy;
25+
bool deflate;
2426

2527
public:
26-
db(string_t const& url, string_t const& name);
28+
db(string_t const& url, string_t const& name, bool deflate);
2729

2830
/// post queries
2931
void post(string_t const& query);
@@ -32,10 +34,10 @@ namespace influxdb {
3234
string_t get(string_t const& query);
3335

3436
/// post measurements
35-
void insert(std::string const& lines);
37+
void insert(std::shared_ptr<fmt::MemoryWriter> const& lines);
3638

3739
/// post measurements and do not wait
38-
void insert_async(std::string const& lines);
40+
void insert_async(std::shared_ptr<fmt::MemoryWriter> const& lines);
3941

4042
/// set username & password for basic authentication
4143
void with_authentication(std::string const& username, std::string const& password);

src/influxdb-cpp-rest/influxdb_raw_db_utf8.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@ struct influxdb::raw::db_utf8::impl {
1717
db db_utf16;
1818

1919
public:
20-
impl(std::string const& url,std::string const& name)
20+
impl(std::string const& url,std::string const& name, bool deflate)
2121
:
2222
#ifndef _MSC_VER
23-
db_utf16(url, name)
23+
db_utf16(url, name, deflate)
2424
#else
25-
db_utf16(conversions::utf8_to_utf16(url), conversions::utf8_to_utf16(name))
25+
db_utf16(conversions::utf8_to_utf16(url), conversions::utf8_to_utf16(name), deflate)
2626
#endif
2727
{}
2828
};
2929

30-
influxdb::raw::db_utf8::db_utf8(std::string const & url, std::string const& name) :
31-
pimpl(std::make_unique<impl>(url, name))
30+
influxdb::raw::db_utf8::db_utf8(std::string const & url, std::string const& name, bool deflate) :
31+
pimpl(std::make_unique<impl>(url, name, deflate))
3232
{
3333
}
3434

@@ -56,11 +56,11 @@ std::string influxdb::raw::db_utf8::get(std::string const& query) {
5656
#endif
5757
}
5858

59-
void influxdb::raw::db_utf8::insert(std::string const & lines) {
59+
void influxdb::raw::db_utf8::insert(std::shared_ptr<fmt::MemoryWriter> const& lines) {
6060
pimpl->db_utf16.insert(lines);
6161
}
6262

63-
void influxdb::raw::db_utf8::insert_async(std::string const & lines)
63+
void influxdb::raw::db_utf8::insert_async(std::shared_ptr<fmt::MemoryWriter> const& lines)
6464
{
6565
pimpl->db_utf16.insert_async(lines);
6666
}

src/influxdb-cpp-rest/influxdb_raw_db_utf8.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#pragma once
77

8+
#include <fmt/ostream.h>
89
#include <string>
910
#include <memory>
1011

@@ -15,7 +16,7 @@ namespace influxdb {
1516
std::unique_ptr<impl> pimpl;
1617

1718
public:
18-
db_utf8(std::string const& url, std::string const& name);
19+
db_utf8(std::string const& url, std::string const& name, bool deflate);
1920
~db_utf8();
2021

2122
/// post queries
@@ -25,10 +26,10 @@ namespace influxdb {
2526
std::string get(std::string const& query);
2627

2728
/// post measurements
28-
void insert(std::string const& lines);
29+
void insert(std::shared_ptr<fmt::MemoryWriter> const& lines);
2930

3031
/// post measurements without waiting for an answer
31-
void insert_async(std::string const& lines);
32+
void insert_async(std::shared_ptr<fmt::MemoryWriter> const& lines);
3233

3334
/// set username & password for basic authentication
3435
void with_authentication(std::string const& username, std::string const& password);

src/influxdb-cpp-rest/influxdb_simple_api.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@ struct influxdb::api::simple_db::impl {
1717
std::string name;
1818
influxdb::raw::db_utf8 db;
1919

20-
impl(std::string const& url, std::string const& name) :
21-
db(url, name),
20+
impl(std::string const& url, std::string const& name, bool deflate) :
21+
db(url, name, deflate),
2222
name(name)
2323
{
2424
throw_on_invalid_identifier(name);
2525
}
2626
};
2727

28-
influxdb::api::simple_db::simple_db(std::string const& url, std::string const& name) :
29-
pimpl(std::make_unique<impl>(url, name))
28+
influxdb::api::simple_db::simple_db(std::string const& url, std::string const& name, bool deflate) :
29+
pimpl(std::make_unique<impl>(url, name, deflate))
3030
{
3131
}
3232

@@ -46,7 +46,9 @@ void influxdb::api::simple_db::drop()
4646

4747
void influxdb::api::simple_db::insert(line const & lines)
4848
{
49-
pimpl->db.insert(lines.get());
49+
auto w = std::make_shared<fmt::MemoryWriter>();
50+
*w << lines.get();
51+
pimpl->db.insert(w);
5052
}
5153

5254
void influxdb::api::simple_db::with_authentication(std::string const& username, std::string const& password)

src/influxdb-cpp-rest/influxdb_simple_api.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace influxdb {
1919
std::unique_ptr<impl> pimpl;
2020

2121
public:
22-
simple_db(std::string const& url, std::string const& name);
22+
simple_db(std::string const& url, std::string const& name, bool deflate);
2323
~simple_db();
2424

2525
public:

0 commit comments

Comments
 (0)