Skip to content

Commit d6c3757

Browse files
committed
Fix async persistent connections
With the addition of persistent connections we need to make a separate InfluxDB::Client per worker as the persistent connection is attached to the client. Here this is done by copying the InfluxDB::Config object and creating a new host queue (so threads don't have to communicate to share hosts) and creating a new client without async enabled (so it will use HTTP or UDP write methods).
1 parent 9ac441e commit d6c3757

File tree

3 files changed

+29
-7
lines changed

3 files changed

+29
-7
lines changed

lib/influxdb/client.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@ class Client
4949
# +:ssl_ca_cert+:: ssl CA certificate, chainfile or CA path.
5050
# The system CA path is automatically included
5151
# +:retry+:: number of times a failed request should be retried. Defaults to infinite.
52-
def initialize(database = nil, **opts)
52+
def initialize(database = nil, config = nil, **opts)
53+
raise ArgumentError, "provide config or opts, not both" if
54+
config && !opts.empty?
55+
5356
opts[:database] = database if database.is_a? String
54-
@config = InfluxDB::Config.new(**opts)
57+
@config = config || InfluxDB::Config.new(**opts)
5558
@stopped = false
5659
@writer = find_writer
5760

@@ -82,7 +85,9 @@ def now
8285

8386
def find_writer
8487
if config.async?
85-
InfluxDB::Writer::Async.new(self, config.async)
88+
client = InfluxDB::Client.new nil, config.writer_config
89+
90+
InfluxDB::Writer::Async.new(client, config.async)
8691
elsif config.udp.is_a?(Hash)
8792
InfluxDB::Writer::UDP.new(self, **config.udp)
8893
elsif config.udp?

lib/influxdb/config.rb

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ def initialize(url: nil, **opts)
8383
configure_hosts! opts[:hosts] || opts[:host] || "localhost".freeze
8484
end
8585

86+
def initialize_copy(source)
87+
super
88+
89+
configure_hosts! source.hosts
90+
end
91+
8692
def udp?
8793
udp != false
8894
end
@@ -105,7 +111,15 @@ def hosts
105111
end
106112
end
107113

108-
private
114+
def writer_config
115+
writer_config = dup
116+
117+
writer_config.set_ivar! :async, false
118+
119+
writer_config
120+
end
121+
122+
protected
109123

110124
def set_ivar!(name, value)
111125
case name
@@ -118,6 +132,8 @@ def set_ivar!(name, value)
118132
instance_variable_set "@#{name}", value
119133
end
120134

135+
private
136+
121137
def normalize_retry_option(value)
122138
case value
123139
when Integer then value

spec/influxdb/cases/async_client_spec.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
let(:client) { described_class.new(async: async_options) }
77
let(:subject) { client }
88
let(:stub_url) { "http://localhost:8086/write?db=&p=root&precision=s&u=root" }
9-
let(:worker) { client.writer.worker }
9+
let(:writer) { client.writer }
10+
let(:worker) { writer.worker }
1011

1112
specify { expect(subject.writer).to be_a(InfluxDB::Writer::Async) }
1213

@@ -40,7 +41,7 @@
4041

4142
it "writes aggregate payload to the client" do
4243
queue = Queue.new
43-
allow(client).to receive(:write) do |*args|
44+
allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args|
4445
queue.push(args)
4546
end
4647

@@ -61,7 +62,7 @@
6162

6263
it "writes separated payloads for each {precision, retention_policy, database} set" do
6364
queue = Queue.new
64-
allow(client).to receive(:write) do |*args|
65+
allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args|
6566
queue.push(args)
6667
end
6768

0 commit comments

Comments
 (0)