class Mongo::PoolManager
Attributes
Public Class Methods
Create a new set of connection pools.
By default, the pool manager uses the original seed list passed to the connection object, accessible via connection.seeds. In addition, the user may pass a list of seed nodes discovered in real time. The union of these lists will be used when attempting to connect, with the newly-discovered nodes being tried first.
# File lib/mongo/connection/pool_manager.rb, line 40
# Set up a pool manager with default (disconnected) state.
#
# client - object stored in @client.
# seeds  - list stored in @seeds (default: []).
def initialize(client, seeds=[])
  @client, @seeds = client, seeds

  initialize_immutable_state
  initialize_mutable_state

  # Assigned after initialize_immutable_state on purpose of ordering: @pools
  # ends up as this fresh, unfrozen set.
  @pools = Set.new
  @members = Set.new
  @primary = nil
  @primary_pool = nil
  @refresh_required = false

  # Size limits start at the defaults; @max_message_size is derived from
  # @max_bson_size.
  @max_bson_size = DEFAULT_MAX_BSON_SIZE
  @max_message_size = @max_bson_size * MESSAGE_SIZE_FACTOR
  @max_wire_version = 0
  @min_wire_version = 0

  @connect_mutex = Mutex.new
  thread_local[:locks][:connecting_manager] = false
end
Public Instance Methods
We're healthy if all members are pingable and if the view of the replica set returned by isMaster is equivalent to our view. If any of these isn't the case, set @refresh_required to true, and return.
# File lib/mongo/connection/pool_manager.rb, line 90
# Compare a copy of our members with the configuration reported by a freshly
# connected seed node, and set @refresh_required to true on any mismatch.
#
# Does nothing while the current thread's :connecting_manager lock flag is
# set. If no seed node can be reached (ConnectionFailure), a refresh is
# flagged as required.
#
# The seed node is always closed before returning, including when reading or
# validating its config raises; such exceptions still propagate.
#
# Returns nil.
def check_connection_health
  return if thread_local[:locks][:connecting_manager]

  members = copy_members
  begin
    seed = get_valid_seed_node
  rescue ConnectionFailure
    @refresh_required = true
    return
  end

  begin
    @refresh_required = true unless members_match_config?(seed.config, members)
  ensure
    # Runs on every path so the seed connection cannot leak.
    seed.close
  end
  nil
end

# True when current_config lists exactly as many hosts as we have members and
# every listed host maps to a member that passes validate_existing_member.
# A missing config or a config without a 'hosts' entry never matches.
def members_match_config?(current_config, members)
  return false unless current_config

  hosts = current_config['hosts']
  return false unless hosts && hosts.length == members.length

  hosts.all? do |host|
    member = members.detect { |m| m.address == host }
    member && validate_existing_member(current_config, member)
  end
end
private :members_match_config?
# File lib/mongo/connection/pool_manager.rb, line 138
# Close every pool, passing opts through to each pool's #close.
#
# Closing is best-effort: a ConnectionFailure raised by one pool is ignored
# so that the remaining pools are still closed. Any other exception
# propagates.
#
# opts - options forwarded to Pool#close (default: {}).
def close(opts={})
  pools.each do |pool|
    begin
      pool.close(opts)
    rescue ConnectionFailure
      # Deliberately ignored; keep closing the other pools.
    end
  end
end
# File lib/mongo/connection/pool_manager.rb, line 134
# True when every pool reports closed? (vacuously true with no pools).
def closed?
  pools.all?(&:closed?)
end
# File lib/mongo/connection/pool_manager.rb, line 64
# Rebuild the manager's view of the replica set, then publish it.
#
# All mutable state is rebuilt while holding @connect_mutex. The current
# thread's :connecting_manager flag is raised for the duration and is always
# lowered again, even when a step raises. Once the mutex is released the
# mutable collections are snapshotted by clone_state.
def connect
  @connect_mutex.synchronize do
    begin
      thread_local[:locks][:connecting_manager] = true
      @refresh_required = false

      # Order matters: prune, reconnect, rebuild pools, then derive limits
      # and the seed list from the resulting members.
      disconnect_old_members
      connect_to_members
      initialize_pools(@members)
      update_max_sizes
      @seeds = discovered_seeds
    ensure
      thread_local[:locks][:connecting_manager] = false
    end
  end

  clone_state
end
# File lib/mongo/connection/pool_manager.rb, line 60 def inspect "<Mongo::PoolManager:0x#{self.object_id.to_s(16)} @seeds=#{@seeds}>" end
# File lib/mongo/connection/pool_manager.rb, line 145
# Returns the host_port of the current read pool.
def read
  pool = read_pool
  pool.host_port
end
# File lib/mongo/connection/pool_manager.rb, line 81
# Merge additional_seeds into the seed list (existing seeds keep their
# position, duplicates are dropped) and reconnect.
#
# additional_seeds - list of extra seeds to union with @seeds.
#
# Returns whatever #connect returns.
def refresh!(additional_seeds)
  @seeds = @seeds | additional_seeds
  connect
end
The replica set connection should initiate a full refresh.
# File lib/mongo/connection/pool_manager.rb, line 130
# Returns the current value of the @refresh_required flag.
def refresh_required?
  @refresh_required
end
Private Instance Methods
# File lib/mongo/connection/pool_manager.rb, line 229
# Record member as the primary: mark its last_state, remember its host_port
# in @primary and point @primary_pool at its pool (reusing an existing pool
# for the same node when there is one).
def assign_primary(member)
  member.last_state = :primary
  @primary = member.host_port
  @primary_pool = pool_for_member(member)
end

# File lib/mongo/connection/pool_manager.rb, line 244
# Record member as a secondary: mark its last_state, add its host_port to
# @secondaries_mutable and append its pool to @secondary_pools_mutable
# (reusing an existing pool for the same node when there is one).
def assign_secondary(member)
  member.last_state = :secondary
  @secondaries_mutable << member.host_port
  @secondary_pools_mutable << pool_for_member(member)
end

# Return the pool in @pools_mutable whose node equals member; when none
# exists, build a new Pool from the client's pool_size / pool_timeout,
# register it in @pools_mutable and return it.
def pool_for_member(member)
  existing = @pools_mutable.detect { |pool| pool.node == member }
  return existing if existing

  pool = Pool.new(client, member.host, member.port,
                  :size => client.pool_size,
                  :timeout => client.pool_timeout,
                  :node => member)
  @pools_mutable << pool
  pool
end
private :pool_for_member
# File lib/mongo/connection/pool_manager.rb, line 306
# Publish a frozen snapshot of each mutable collection into its read-only
# counterpart. The mutable originals are left untouched and unfrozen.
def clone_state
  @hosts           = @hosts_mutable.clone.freeze
  @pools           = @pools_mutable.clone.freeze
  @secondaries     = @secondaries_mutable.clone.freeze
  @secondary_pools = @secondary_pools_mutable.clone.freeze
  @arbiters        = @arbiters_mutable.clone.freeze
end
Connect to each member of the replica set as reported by the given seed node.
# File lib/mongo/connection/pool_manager.rb, line 177
# Walk the node list reported by a valid seed node and make @members reflect
# it:
#
# * a known, healthy member has its config refreshed and is kept if it is
#   still healthy afterwards, otherwise it is dropped and reconnected;
# * a known, unhealthy member is closed, dropped and reconnected;
# * an unknown host gets a new Mongo::Node, kept only if healthy after
#   connecting.
#
# The seed node is closed on every path, including when a step in the loop
# raises (the exception still propagates).
#
# Raises ConnectionFailure when no member is left afterwards.
def connect_to_members
  seed = get_valid_seed_node

  begin
    seed.node_list.each do |host|
      existing = @members.detect { |node| node =~ host }

      if existing
        if existing.healthy?
          # Refresh this node's configuration; keep it only if it survived.
          existing.set_config
          next if existing.healthy?
        else
          existing.close
        end
        @members.delete(existing)
      end

      node = Mongo::Node.new(client, host)
      node.connect
      @members << node if node.healthy?
    end
  ensure
    seed.close
  end

  if @members.empty?
    raise ConnectionFailure, "Failed to connect to any given member."
  end
end
# File lib/mongo/connection/pool_manager.rb, line 280
# Returns a new Set holding a dup of every current member. The copy is taken
# while holding @connect_mutex.
def copy_members
  @connect_mutex.synchronize do
    Set.new(@members.map { |member| member.dup })
  end
end
Drop any existing pools and members that are no longer healthy.
# File lib/mongo/connection/pool_manager.rb, line 170
# Remove every pool and every member that does not report healthy?.
# Entries are only removed from the collections here; nothing is closed.
def disconnect_old_members
  @pools_mutable.reject! do |candidate|
    !candidate.healthy?
  end

  @members.reject! do |candidate|
    !candidate.healthy?
  end
end
# File lib/mongo/connection/pool_manager.rb, line 276
# Returns an array with the host_port of each current member.
def discovered_seeds
  @members.map { |member| member.host_port }
end
Iterate through the list of provided seed nodes until we've gotten a response from the replica set we're trying to connect to.
If we don't get a response, raise an exception.
# File lib/mongo/connection/pool_manager.rb, line 265
# Try each seed in order and return the first node that is healthy after
# connecting.
#
# Raises ConnectionFailure, listing every seed as "host:port", when none of
# them yields a healthy node.
def get_valid_seed_node
  @seeds.each do |seed|
    candidate = Mongo::Node.new(self.client, seed)
    candidate.connect
    return candidate if candidate.healthy?
  end

  seed_list = @seeds.map { |s| "#{s[0]}:#{s[1]}" }.join(', ')
  raise ConnectionFailure, "Cannot connect to a replica set using seeds #{seed_list}"
end
# File lib/mongo/connection/pool_manager.rb, line 290
# Reset the published, read-only collections to empty frozen containers:
# sets for hosts, pools and secondaries; arrays for secondary pools and
# arbiters.
def initialize_immutable_state
  # Set-backed views
  @hosts       = Set.new.freeze
  @pools       = Set.new.freeze
  @secondaries = Set.new.freeze

  # Array-backed views
  @secondary_pools = [].freeze
  @arbiters        = [].freeze
end
# File lib/mongo/connection/pool_manager.rb, line 298
# Reset the working (mutable) collections to empty, unfrozen containers:
# sets for hosts, pools and secondaries; arrays for secondary pools and
# arbiters.
def initialize_mutable_state
  # Set-backed working state
  @hosts_mutable       = Set.new
  @pools_mutable       = Set.new
  @secondaries_mutable = Set.new

  # Array-backed working state
  @secondary_pools_mutable = []
  @arbiters_mutable        = []
end
Initialize the connection pools for the primary and secondary nodes.
# File lib/mongo/connection/pool_manager.rb, line 208
# Rebuild primary/secondary bookkeeping from members.
#
# Clears the current primary, secondaries, secondary pools and hosts, then
# for each member resets its last_state, records its host_string and hands
# it to assign_primary or assign_secondary according to what it reports.
# A member that is neither primary nor secondary is only recorded as a host.
#
# The arbiter list is taken from the first member; with no members it is
# reset to an empty array instead of raising.
#
# members - enumerable of member nodes.
def initialize_pools(members)
  @primary_pool = nil
  @primary = nil
  @secondaries_mutable.clear
  @secondary_pools_mutable.clear
  @hosts_mutable.clear

  members.each do |member|
    member.last_state = nil
    @hosts_mutable << member.host_string
    if member.primary?
      assign_primary(member)
    elsif member.secondary? # a non-primary member is not necessarily a secondary
      assign_secondary(member)
    end
  end

  first_member = members.first
  @arbiters_mutable = first_member ? first_member.arbiters : []
end
# File lib/mongo/connection/pool_manager.rb, line 151
# Recompute the limits shared by all members. With no members the current
# values are left as they are.
#
# The BSON size, message size and max wire version become the smallest value
# any member reports; the min wire version becomes the largest.
def update_max_sizes
  return if @members.empty?

  @max_bson_size    = @members.map { |m| m.max_bson_size }.min
  @max_message_size = @members.map { |m| m.max_message_size }.min
  @max_wire_version = @members.map { |m| m.max_wire_version }.min
  @min_wire_version = @members.map { |m| m.min_wire_version }.max
end
# File lib/mongo/connection/pool_manager.rb, line 160
# Decide whether member's recorded last_state is acceptable given
# current_config.
#
# Returns false when current_config['ismaster'] is truthy and the member was
# not last seen as :primary; otherwise returns true only when the member's
# last_state is :other.
#
# NOTE(review): as written, a member whose last_state is :primary or
# :secondary never validates. That may not be what was intended - confirm
# against the callers before relying on it.
def validate_existing_member(current_config, member)
  state = member.last_state
  return false if current_config['ismaster'] && state != :primary

  state == :other
end