Fork me on GitHub

Apresentando XMPP4R-Observable 0

Há apenas alguns dias fiz uma apresentação no FISL10 sobre a utilização de XMPP PubSub com Ruby e sobre um fork de uma biblioteca popular à qual acrescentei os rudimentos do PubSub. Naquela mesma apresentação listei uma série de problemas que aquela abordagem tem e falei sobre um roadmap para o futuro…

Acontece que acabei me convencendo de que não posso utilizar o PubSub no lado XMPP da biblioteca e uma forma de periodical pooling no lado Ruby. Resolvi, então, substituir a biblioteca que havia forkado por uma versão Observable, preservando as coisas boas do XMPP4R-Simple. O resultado chamei de XMPP4R-Observable, e acabo de publicar no GitHub.

Uma boa parte do código está coberta por testes (e “roubei” alguns dos testes da própria XMPP4R-Simple)... pretendo cobrir o restante ao longo do tempo (contribuições são bem-vindas). Por hora, chamei esse primeiro release de versão 0.5.1 e acrescentei um .gemspec para gerar um .gem automaticamente… No entanto, o GitHub ainda não publicou o .gem… Quando publicar, para instalá-lo deve ser tão simples quanto:


bash# gem sources -a http://gems.github.com
bash# gem install spectra-xmpp4r-observable

Não deixem de reportar qualquer erro. Happy hacking.

Update 2009-09-13 10:29:00: Acabo de confirmar que o .gem foi publicado pelo GitHub.

Update 2009-10-10 20:21:00: O .gem do XMPP4R-Observable vai ser mantido no GemCutter, a partir de hoje.

Refresh Machine em Ruby 3

Eventualmente todos enfrentaremos esta situação: você tem um objeto com “estados” que são lidos de uma fonte externa através de pooling. Usualmente você simplesmente adiciona uma thread no construtor do objeto e coloca dentro dela um loop infinito que verifica a cada intervalo de tempo se existe estado novo e, se for o caso, atualiza o estado do objeto.

Essa é uma construção muito simples e bastante utilizada, no entanto tem um problema de escalabilidade importante: para cada objeto você tem uma thread e um loop infinito. Se você tem poucos objetos, isso não chega a ser um problema… mas se tem muitos, aí é outra conversa.

Alguns vão argumentar que não é obrigatório usar essa construção, que dependendo da fonte dos dados, ela pode “atualizar” o objeto, em um padrão Observer, o que está mais do que correto… Mas nem sempre controlamos a fonte dos dados, e nem sempre ela é inteligente assim (na realizade, quando não a controlamos ela parece bantante burra!). Foi para isso que programei uma “Refresh Machine”: um container para objetos que precisam ser atualizados através de pooling. Ela presume duas coisas: que esse objeto tenha um método/atributo refresh que armazene o intervalo em segundos entre as atualizações, e um método do_refresh, que será executado entre os intervalos.

Vamos ao código:


require 'thread'
class RefreshMachine

  attr_accessor :wait

  def initialize(wait = false)
    @wait    = wait
    @killall = false
    @queue   = Queue.new
    @removed = Array.new
    @thread  = Thread.new do
      loop do
        ( @removed.clear; @queue.clear; @killall = false ) if @killall
        next if @queue.empty?
        object, last_refresh, thread = @queue.deq

        # Helps the garbage collection
        thread = nil if (! thread.nil?) and (! thread.alive?)

        # Three things can happen with a dequeued object
        if (Time.now < (last_refresh + object.refresh)) or
           (! thread.nil? and @wait)
          # First: It's too early to refresh it or we still have
          #        a refresh thread running and have to 'wait', so we
          #        just put it back in the queue
          @queue.enq [ object, last_refresh, thread ]
        else
          if @removed.include?(object)
            # Second: We have a "remove request" for it, so we
            #         delete the request and avoid queueing it again
            @removed.delete(object)
          else
            # Third: It's time to refresh it, so we
            #        call do_refresh and put it back in the queue
            add(object)
          end
        end
      end
    end
  end

  def add(object)
    @queue.enq [ object, Time.now, Thread.new { object.do_refresh } ]
  end

  def del(object)
    @removed << object unless @removed.include?(object)
  end

  def killall
    @killall = true
  end

end # of class RefreshMachine

O objetivo é usar o mínimo possível de threads com loops infinitos…. e ficou pequena o bastante para blogar a respeito :-)