Há apenas alguns dias fiz uma apresentação no FISL10 sobre a utilização de XMPP PubSub com Ruby e sobre um fork de uma biblioteca popular à qual acrescentei os rudimentos do PubSub. Naquela mesma apresentação listei uma série de problemas que aquela abordagem tem e falei sobre um roadmap para o futuro…
Acontece que acabei me convencendo de que não posso utilizar o PubSub no lado XMPP da biblioteca e uma forma de periodical pooling no lado Ruby. Resolvi, então, substituir a biblioteca que havia forkado por uma versão Observable, preservando as coisas boas do XMPP4R-Simple. O resultado chamei de XMPP4R-Observable, e acabo de publicar no GitHub.
Uma boa parte do código está coberta por testes (e “roubei” alguns dos testes da própria XMPP4R-Simple)... pretendo cobrir o restante ao longo do tempo (contribuições são bem-vindas). Por hora, chamei esse primeiro release de versão 0.5.1 e acrescentei um .gemspec para gerar um .gem automaticamente… No entanto, o GitHub ainda não publicou o .gem… Quando publicar, para instalá-lo deve ser tão simples quanto:
bash# gem sources -a http://gems.github.com
bash# gem install spectra-xmpp4r-observable
Não deixem de reportar qualquer erro. Happy hacking.
Update 2009-09-13 10:29:00: Acabo de confirmar que o .gem foi publicado pelo GitHub.
Update 2009-10-10 20:21:00: O .gem do XMPP4R-Observable vai ser mantido no GemCutter, a partir de hoje.
Eventualmente todos enfrentaremos esta situação: você tem um objeto com “estados” que são lidos de uma fonte externa através de pooling. Usualmente você simplesmente adiciona uma thread no construtor do objeto e coloca dentro dela um loop infinito que verifica a cada intervalo de tempo se existe estado novo e, se for o caso, atualiza o estado do objeto.
Essa é uma construção muito simples e bastante utilizada, no entanto tem um problema de escalabilidade importante: para cada objeto você tem uma thread e um loop infinito. Se você tem poucos objetos, isso não chega a ser um problema… mas se tem muitos, aí é outra conversa.
Alguns vão argumentar que não é obrigatório usar essa construção, que dependendo da fonte dos dados, ela pode “atualizar” o objeto, em um padrão Observer, o que está mais do que correto… Mas nem sempre controlamos a fonte dos dados, e nem sempre ela é inteligente assim (na realizade, quando não a controlamos ela parece bantante burra!). Foi para isso que programei uma “Refresh Machine”: um container para objetos que precisam ser atualizados através de pooling. Ela presume duas coisas: que esse objeto tenha um método/atributo refresh que armazene o intervalo em segundos entre as atualizações, e um método do_refresh, que será executado entre os intervalos.
Vamos ao código:
require 'thread'
class RefreshMachine
attr_accessor :wait
def initialize(wait = false)
@wait = wait
@killall = false
@queue = Queue.new
@removed = Array.new
@thread = Thread.new do
loop do
( @removed.clear; @queue.clear; @killall = false ) if @killall
next if @queue.empty?
object, last_refresh, thread = @queue.deq
# Helps the garbage collection
thread = nil if (! thread.nil?) and (! thread.alive?)
# Three things can happen with a dequeued object
if (Time.now < (last_refresh + object.refresh)) or
(! thread.nil? and @wait)
# First: It's too early to refresh it or we still have
# a refresh thread running and have to 'wait', so we
# just put it back in the queue
@queue.enq [ object, last_refresh, thread ]
else
if @removed.include?(object)
# Second: We have a "remove request" for it, so we
# delete the request and avoid queueing it again
@removed.delete(object)
else
# Third: It's time to refresh it, so we
# call do_refresh and put it back in the queue
add(object)
end
end
end
end
end
def add(object)
@queue.enq [ object, Time.now, Thread.new { object.do_refresh } ]
end
def del(object)
@removed << object unless @removed.include?(object)
end
def killall
@killall = true
end
end # of class RefreshMachine
O objetivo é usar o mínimo possível de threads com loops infinitos…. e ficou pequena o bastante para blogar a respeito :-)