Skip to content

Commit 4e40e74

Browse files
committed
RUBY-456 Read Semantics update for MongoDB 2.2
Updates read semantics for the Ruby driver in order to comply with the specification outlined here: http://docs.mongodb.org/manual/applications/replication/#replica-set-read-preference - Modified ReplSetConnection :read parameter to accept and respect the following read preference modes (Note :secondary behaves differently than before) :primary :primary_preferred :secondary :secondary_preferred :nearest - Added :tag_sets parameter to ReplSetConnection in order for tag sets to be specified as part of read preference. - Added :secondary_acceptable_latency_ms parameter to ReplSetConnection for setting maximum latency specification as part of read preference.
1 parent 3f4254d commit 4e40e74

27 files changed

+786
-490
lines changed

lib/mongo/collection.rb

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ class Collection
2323

2424
attr_reader :db, :name, :pk_factory, :hint, :safe
2525

26+
# Read Preference
27+
attr_accessor :read_preference, :tag_sets, :acceptable_latency
28+
2629
# Initialize a collection object.
2730
#
2831
# @param [String, Symbol] name the name of the collection.
@@ -97,6 +100,8 @@ def initialize(name, db, opts={})
97100
value = @db.read_preference
98101
end
99102
@read_preference = value.is_a?(Hash) ? value.dup : value
103+
@tag_sets = opts.fetch(:tag_sets, @db.tag_sets)
104+
@acceptable_latency = opts.fetch(:acceptable_latency, @db.acceptable_latency)
100105
end
101106
@pk_factory = pk_factory || opts[:pk] || BSON::ObjectId
102107
@hint = nil
@@ -220,6 +225,8 @@ def find(selector={}, opts={})
220225
transformer = opts.delete(:transformer)
221226
show_disk_loc = opts.delete(:show_disk_loc)
222227
read = opts.delete(:read) || @read_preference
228+
tag_sets = opts.delete(:tag_sets) || @tag_sets
229+
acceptable_latency = opts.delete(:acceptable_latency) || @acceptable_latency
223230

224231
if timeout == false && !block_given?
225232
raise ArgumentError, "Collection#find must be invoked with a block when timeout is disabled."
@@ -247,7 +254,9 @@ def find(selector={}, opts={})
247254
:max_scan => max_scan,
248255
:show_disk_loc => show_disk_loc,
249256
:return_key => return_key,
250-
:read => read
257+
:read => read,
258+
:tag_sets => tag_sets,
259+
:acceptable_latency => acceptable_latency
251260
})
252261

253262
if block_given?
@@ -763,13 +772,6 @@ def group(opts, condition={}, initial={}, reduce=nil, finalize=nil)
763772
end
764773
end
765774

766-
# The value of the read preference. This will be
767-
# either +:primary+, +:secondary+, or an object
768-
# representing the tags to be read from.
769-
def read_preference
770-
@read_preference
771-
end
772-
773775
private
774776

775777
def new_group(opts={})

lib/mongo/connection.rb

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class Connection
4242

4343
attr_reader :logger, :size, :auths, :primary, :safe, :host_to_try,
4444
:pool_size, :connect_timeout, :pool_timeout,
45-
:primary_pool, :socket_class, :op_timeout
45+
:primary_pool, :socket_class, :op_timeout, :tag_sets, :acceptable_latency
4646

4747
# Create a connection to single MongoDB instance.
4848
#
@@ -127,6 +127,10 @@ def initialize(host=nil, port=nil, opts={})
127127
@primary = nil
128128
@primary_pool = nil
129129

130+
# Not set for direct connection
131+
@tag_sets = {}
132+
@acceptable_latency = 15
133+
130134
check_opts(opts)
131135
setup(opts)
132136
end
@@ -334,6 +338,15 @@ def [](db_name)
334338
DB.new(db_name, self)
335339
end
336340

341+
def refresh
342+
end
343+
344+
def pin_pool(pool)
345+
end
346+
347+
def unpin_pool(pool)
348+
end
349+
337350
# Drop a database.
338351
#
339352
# @param [String] name name of an existing database.
@@ -471,7 +484,7 @@ def read_pool
471484
# The value of the read preference.
472485
def read_preference
473486
if slave_ok?
474-
:secondary
487+
:secondary_preferred
475488
else
476489
:primary
477490
end
@@ -492,15 +505,9 @@ def max_bson_size
492505
@max_bson_size
493506
end
494507

495-
# Prefer primary pool but fall back to secondary
496-
def checkout_best
497-
connect unless connected?
498-
@primary_pool.checkout
499-
end
500-
501508
# Checkout a socket for reading (i.e., a secondary node).
502509
# Note: this is overridden in ReplSetConnection.
503-
def checkout_reader
510+
def checkout_reader(mode=:primary, tag_sets={}, acceptable_latency=15)
504511
connect unless connected?
505512
@primary_pool.checkout
506513
end
@@ -515,24 +522,11 @@ def checkout_writer
515522
# Check a socket back into its pool.
516523
# Note: this is overridden in ReplSetConnection.
517524
def checkin(socket)
518-
if @primary_pool && socket
525+
if @primary_pool && socket && socket.pool
519526
socket.pool.checkin(socket)
520527
end
521528
end
522529

523-
# Excecutes block with the best available socket
524-
def best_available_socket
525-
socket = nil
526-
begin
527-
socket = checkout_best
528-
yield socket
529-
ensure
530-
if socket
531-
socket.pool.checkin(socket)
532-
end
533-
end
534-
end
535-
536530
protected
537531

538532
def valid_opts

lib/mongo/cursor.rb

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ def initialize(collection, opts={})
7777
value = collection.read_preference
7878
end
7979
@read_preference = value.is_a?(Hash) ? value.dup : value
80+
@tag_sets = opts.fetch(:tag_sets, @collection.tag_sets)
81+
@acceptable_latency = opts.fetch(:acceptable_latency, @collection.acceptable_latency)
82+
8083
batch_size(opts[:batch_size] || 0)
8184

8285
@full_collection_name = "#{@collection.db.name}.#{@collection.name}"
@@ -335,7 +338,7 @@ def close
335338
message.put_int(1)
336339
message.put_long(@cursor_id)
337340
log(:debug, "Cursor#close #{@cursor_id}")
338-
@connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message)
341+
@connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, :socket => @socket)
339342
end
340343
@cursor_id = 0
341344
@closed = true
@@ -458,19 +461,37 @@ def refresh
458461
end
459462
end
460463

464+
# Sends initial query -- which is always a read unless it is a command
465+
#
466+
# Upon ConnectionFailure, tries query 3 times if socket was not provided
467+
# and the query is either not a command or is a secondary_ok command.
468+
#
469+
# Pins pools upon successful read and unpins pool upon ConnectionFailure
470+
#
461471
def send_initial_query
462-
message = construct_query_message
463-
sock = @socket || checkout_socket_from_connection
472+
tries = 0
464473
instrument(:find, instrument_payload) do
465474
begin
466-
results, @n_received, @cursor_id = @connection.receive_message(
467-
Mongo::Constants::OP_QUERY, message, nil, sock, @command,
468-
nil, @options & OP_QUERY_EXHAUST != 0)
469-
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
470-
checkin_socket(sock) unless @socket
475+
tries += 1
476+
message = construct_query_message
477+
sock = @socket || checkout_socket_from_connection
478+
results, @n_received, @cursor_id = @connection.receive_message(
479+
Mongo::Constants::OP_QUERY, message, nil, sock, @command,
480+
nil, @options & OP_QUERY_EXHAUST != 0)
481+
rescue ConnectionFailure => ex
482+
if tries < 3 && !@socket && (!@command || Mongo::Support.secondary_ok?(@selector))
483+
@connection.unpin_pool(sock.pool) if sock
484+
@connection.refresh
485+
retry
486+
else
487+
raise ex
488+
end
489+
rescue OperationFailure, OperationTimeout => ex
471490
raise ex
491+
ensure
492+
checkin_socket(sock) unless @socket
472493
end
473-
checkin_socket(sock) unless @socket
494+
@connection.pin_pool(sock.pool) if !@command && !@socket
474495
@returned += @n_received
475496
@cache += results
476497
@query_run = true
@@ -498,38 +519,32 @@ def send_get_more
498519
# Cursor id.
499520
message.put_long(@cursor_id)
500521
log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger
522+
501523
sock = @socket || checkout_socket_from_connection
502524

503525
begin
504-
results, @n_received, @cursor_id = @connection.receive_message(
505-
Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil)
506-
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
526+
results, @n_received, @cursor_id = @connection.receive_message(
527+
Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil)
528+
ensure
507529
checkin_socket(sock) unless @socket
508-
raise ex
509530
end
510-
checkin_socket(sock) unless @socket
531+
511532
@returned += @n_received
512533
@cache += results
513534
close_cursor_if_query_complete
514535
end
515536

516537
def checkout_socket_from_connection
517-
socket = nil
518538
begin
519-
@checkin_connection = true
520-
if @command || @read_preference == :primary
521-
socket = @connection.checkout_writer
522-
elsif @read_preference == :secondary_only
523-
socket = @connection.checkout_secondary
539+
if @command && !Mongo::Support::secondary_ok?(@selector)
540+
@connection.checkout_reader(:primary)
524541
else
525-
socket = @connection.checkout_reader
542+
@connection.checkout_reader(@read_preference, @tag_sets, @acceptable_latency)
526543
end
527544
rescue SystemStackError, NoMemoryError, SystemCallError => ex
528545
@connection.close
529546
raise ex
530547
end
531-
532-
socket
533548
end
534549

535550
def checkin_socket(sock)

lib/mongo/db.rb

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ def strict?; @strict; end
5353
# The length of time that Collection.ensure_index should cache index calls
5454
attr_accessor :cache_time
5555

56+
# Read Preference
57+
attr_accessor :read_preference, :tag_sets, :acceptable_latency
58+
5659
# Instances of DB are normally obtained by calling Mongo#db.
5760
#
5861
# @param [String] name the database name.
@@ -72,6 +75,7 @@ def strict?; @strict; end
7275
# value is provided, the default value set on this instance's Connection object will be used. This
7376
# default can be overridden upon instantiation of any collection by explicity setting a :safe value
7477
# on initialization
78+
#
7579
# @option opts [Integer] :cache_time (300) Set the time that all ensure_index calls should cache the command.
7680
#
7781
# @core databases constructor_details
@@ -87,6 +91,8 @@ def initialize(name, connection, opts={})
8791
value = @connection.read_preference
8892
end
8993
@read_preference = value.is_a?(Hash) ? value.dup : value
94+
@tag_sets = opts.fetch(:tag_sets, @connection.tag_sets)
95+
@acceptable_latency = opts.fetch(:acceptable_latency, @connection.acceptable_latency)
9096
@cache_time = opts[:cache_time] || 300 #5 minutes.
9197
end
9298

@@ -112,8 +118,11 @@ def authenticate(username, password, save_auth=true)
112118
end
113119
end
114120

115-
@connection.best_available_socket do |socket|
121+
begin
122+
socket = @connection.checkout_reader(:primary_preferred)
116123
issue_authentication(username, password, save_auth, :socket => socket)
124+
ensure
125+
socket.pool.checkin(socket) if socket
117126
end
118127

119128
@connection.authenticate_pools
@@ -628,13 +637,6 @@ def validate_collection(name)
628637
doc
629638
end
630639

631-
# The value of the read preference. This will be
632-
# either +:primary+, +:secondary+, or an object
633-
# representing the tags to be read from.
634-
def read_preference
635-
@read_preference
636-
end
637-
638640
private
639641

640642
def system_command_collection

0 commit comments

Comments
 (0)