# frozen_string_literal: true

require 'thread'

module Sequel
  # A fixed-capacity, thread-safe connection pool.
  #
  # Connections are created lazily by +connection_proc+ (at most +max_size+
  # of them) and lent to one thread at a time through #hold. When every
  # connection is in use, callers block in FIFO order and a released
  # connection is handed directly to the longest-waiting thread.
  class ConnectionPool
    # @return [Mutex] lock guarding all pool state
    attr_reader :mutex

    # @return [Integer] maximum number of connections the pool will create
    attr_reader :max_size

    # @return [Proc, nil] callable that builds a new connection
    attr_accessor :connection_proc

    # @return [Array] idle connections, [Hash{Thread => Object}] connections
    #   currently lent out, and [Integer] number of connections created so far
    attr_reader :available_connections, :allocated, :created_count

    # @param max_size [Integer] upper bound on created connections
    # @yieldreturn [Object] a new connection; stored as +connection_proc+
    def initialize(max_size = 4, &block)
      @max_size = max_size
      @mutex = Mutex.new
      # Signalled whenever a released connection is handed to a waiter.
      @handoff = ConditionVariable.new
      @connection_proc = block
      @available_connections = []
      @waiting_threads = []
      @allocated = {}
      @created_count = 0
    end

    # @return [Integer] number of connections created so far
    def size
      @created_count
    end

    # Lends a connection to the calling thread for the duration of the block.
    #
    # Re-entrant: a nested call on a thread that already holds a connection
    # yields that same connection and does not release it on exit.
    #
    # @yieldparam conn [Object] the connection assigned to this thread
    # @return [Object] the block's return value
    # @raise [StandardError] any StandardError raised while acquiring or in
    #   the block is re-raised unchanged
    # @raise [RuntimeError] any other Exception is converted to a
    #   RuntimeError carrying the original message
    def hold
      thread = Thread.current
      if (conn = owned_connection(thread))
        return yield(conn)
      end

      conn = acquire(thread)
      begin
        yield conn
      ensure
        release(thread, conn)
      end
    rescue Exception => e
      # Existing contract: callers only ever see StandardError subclasses,
      # so non-standard exceptions are downgraded to a RuntimeError.
      raise e.is_a?(StandardError) ? e : e.message
    end

    private

    # @return [Object, nil] the connection currently lent to +thread+
    def owned_connection(thread)
      @mutex.synchronize { @allocated[thread] }
    end

    # Assigns a connection to +thread+, blocking until one is handed over
    # when the pool is exhausted.
    #
    # The wait happens on a condition variable while holding the mutex, so a
    # hand-off performed by #release can never slip in between "enqueue" and
    # "go to sleep" and be lost.
    def acquire(thread)
      @mutex.synchronize do
        if (conn = available)
          @allocated[thread] = conn
        else
          @waiting_threads << thread
          begin
            # Loop on the predicate: condition variables may wake spuriously,
            # and the broadcast in #release wakes every waiter.
            @handoff.wait(@mutex) until @allocated.key?(thread)
          ensure
            # No-op after a normal hand-off (release already dequeued us);
            # withdraws this thread from the queue if the wait was interrupted.
            @waiting_threads.delete(thread)
          end
          @allocated[thread]
        end
      end
    end

    # Must be called with the mutex held.
    # @return [Object, nil] an idle or freshly created connection, or nil
    #   when the pool is exhausted
    def available
      @available_connections.pop || make_new
    end

    # Must be called with the mutex held.
    # @return [Object, nil] a new connection, or nil when at capacity
    def make_new
      return unless @created_count < @max_size

      conn = @connection_proc.call
      # Count only after the proc succeeded, so a failed connect does not
      # permanently consume a pool slot.
      @created_count += 1
      conn
    end

    # Takes +conn+ back from +thread+ and either hands it straight to the
    # longest-waiting thread or returns it to the idle list.
    def release(thread, conn)
      @mutex.synchronize do
        @allocated.delete(thread)
        if (waiter = @waiting_threads.shift)
          @allocated[waiter] = conn
          # All waiters share one condition variable; broadcast so the chosen
          # waiter is guaranteed to observe its assignment.
          @handoff.broadcast
        else
          @available_connections << conn
        end
      end
    end
  end
end

# Demo: five workers contend for a pool of four connections. Runs only when
# this file is executed directly, not when it is required.
if __FILE__ == $PROGRAM_NAME
  $c = Sequel::ConnectionPool.new
  x = 0
  $c.connection_proc = proc { x += 1 }

  def test
    Thread.new { $c.hold { |c| sleep 3; puts c } }
  end

  puts "*************************"
  workers = Array.new(5) { test }
  workers.each(&:join)
end