require 'thread'

# A thread that executes queued blocks one at a time, in the order they were
# submitted. When a database object is given, the whole work loop runs inside
# a single db.transaction block on the worker thread.
class Worker < Thread
  # Raising this error inside the worker thread ends the work loop quietly.
  class WorkerStopError < RuntimeError; end

  # Sentinel pushed onto the queue by #join. Because it travels through the
  # same FIFO queue as the jobs, every job submitted before it has finished
  # by the time the worker sees it.
  STOP = Object.new.freeze
  private_constant :STOP

  # The Queue holding the pending jobs.
  attr_reader :queue

  # Starts the worker thread.
  #
  # db - optional object responding to #transaction; when given, the work
  #      loop is wrapped in db.transaction { ... }.
  def initialize(db = nil)
    # The queue must exist before the thread starts, since the thread body
    # begins popping from it immediately.
    @queue = Queue.new
    worker = self
    if db
      super() { db.transaction { worker.work } }
    else
      super() { worker.work }
    end
  end

  # Pops jobs off the queue and calls them until the stop sentinel is
  # received or WorkerStopError is raised in this thread.
  def work
    loop do
      job = @queue.pop
      break if job.equal?(STOP)

      job.call
    end
  rescue WorkerStopError
    # Stop requested by raising into the thread; nothing to clean up here.
  end

  # Queues the given block for execution on the worker thread.
  def async(&block)
    @queue << block
  end

  # Lets every job queued so far run to completion, then stops the work loop
  # and waits for the thread to finish.
  #
  # A queued sentinel is used instead of polling for an empty queue and
  # raising into the thread: the queue is already empty while the last popped
  # job is still running, so raising at that point could abort that job
  # part-way through.
  def join
    @queue << STOP
    super
  end
end

require 'rubygems'
require 'sequel/postgres'

DB = Sequel('postgres://postgres:postgres@localhost:5432/reality_spec')

# populate
unless DB.table_exists?(:ttt)
  DB.create_table :ttt do
    primary_key :id
    text :name
  end
end

# Start from a known state: remove all rows, then insert 100 fresh records.
DB[:ttt].delete
100.times { |i| DB[:ttt] << {:name => "record_#{i}"} }

w = Worker.new(DB)

ds = DB[:ttt].order(:id).limit(10)
ds.print

# Upcase the name of each of the first ten records on the worker thread.
ds.each do |r|
  w.async { DB[:ttt].filter(:id => r[:id]).update(:name => r[:name].upcase) }
end

# Wait for all queued updates to finish before printing the result.
w.join
ds.print