Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

Commit 1caca5e

Browse files
committed
refactor
1 parent b6211b4 commit 1caca5e

File tree

1 file changed

+45
-36
lines changed

1 file changed

+45
-36
lines changed

src/influxdb-cpp-rest/influxdb2_simple_async_api.cpp

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -91,57 +91,66 @@ struct simple_db::impl
9191
{
9292
try
9393
{
94-
std::vector<concurrency::streams::istream> streams;
9594
auto c = make_compressor(algorithm::GZIP);
9695

96+
const int rawdata_size = w->size();
97+
const uint8_t* rawdata_ptr =
98+
reinterpret_cast<const uint8_t*>(w->data());
99+
int rawdata_cursor = 0;
100+
101+
std::vector<uint8_t> compression_buffer;
102+
int compressed_size = 0;
97103
bool done = false;
98-
size_t processed = 0;
99-
size_t csize = 0;
100-
std::vector<uint8_t> pre;
101-
pre.resize(w->size());
104+
105+
compression_buffer.resize(rawdata_size);
102106
while (!done)
103107
{
104108
size_t used = 0;
105-
if (csize == pre.size())
109+
if (compressed_size >= rawdata_size * 16)
110+
{
111+
throw std::runtime_error(
112+
"gzip data is 16 times larger than raw data. abort "
113+
"compression");
114+
}
115+
if (compressed_size == compression_buffer.size())
106116
{
107-
pre.resize(csize * 2);
117+
compression_buffer.resize(compressed_size * 2);
108118
}
109-
auto got = c->compress(
110-
reinterpret_cast<const uint8_t*>(w->data()) + processed,
111-
w->size() - processed, pre.data() + csize, pre.size() - csize,
119+
const int got = c->compress(
120+
rawdata_ptr + rawdata_cursor, // rawdata ptr
121+
rawdata_size - rawdata_cursor, // remaining rawdata size
122+
compression_buffer.data() +
123+
compressed_size, // buffer ptr to append
124+
compression_buffer.size() -
125+
compressed_size, // buffer available size
112126
web::http::compression::operation_hint::is_last, used, done);
113-
csize += got;
114-
processed += used;
127+
compressed_size += got;
128+
rawdata_cursor += used;
115129
}
116-
streams.emplace_back(
117-
concurrency::streams::rawptr_stream<uint8_t>::open_istream(
118-
pre.data(), csize));
119130

120-
for (auto& stream : streams)
121-
{
122-
http_request request;
123-
request.set_request_uri(uri_with_db);
124-
request.set_method(methods::POST);
125-
request.headers().add("Authorization", "Token " + token);
126-
request.headers().add(header_names::content_encoding,
127-
algorithm::GZIP);
131+
http_request request;
132+
request.set_request_uri(uri_with_db);
133+
request.set_method(methods::POST);
134+
request.headers().add("Authorization", "Token " + token);
135+
request.headers().add(header_names::content_encoding, algorithm::GZIP);
128136

129-
request.set_body(stream);
130-
auto response = client.request(request);
131-
try
132-
{
133-
response.wait();
134-
if (!(response.get().status_code() == status_codes::OK ||
135-
response.get().status_code() == status_codes::NoContent))
136-
{
137-
throw std::runtime_error(response.get().extract_string().get());
138-
}
139-
}
140-
catch (const std::exception& e)
137+
request.set_body(
138+
concurrency::streams::rawptr_stream<uint8_t>::open_istream(
139+
compression_buffer.data(), compressed_size));
140+
auto response = client.request(request);
141+
try
142+
{
143+
response.wait();
144+
if (!(response.get().status_code() == status_codes::OK ||
145+
response.get().status_code() == status_codes::NoContent))
141146
{
142-
throw std::runtime_error(e.what());
147+
throw std::runtime_error(response.get().extract_string().get());
143148
}
144149
}
150+
catch (const std::exception& e)
151+
{
152+
throw std::runtime_error(e.what());
153+
}
145154
}
146155
catch (const std::exception& e)
147156
{

0 commit comments

Comments
 (0)