Redis
 sql >> Baza danych >  >> NoSQL >> Redis

Dostęp do zmiennej w wątku rails

ZAKTUALIZOWANA EDYCJA NA KONIEC:Pokazuje działający kod. Główny moduł niezmodyfikowany z wyjątkiem kodu debugowania. Uwaga:napotkałem problem, który już zauważyłem, dotyczący konieczności anulowania subskrypcji przed zakończeniem.

Kod wygląda poprawnie. Chciałbym zobaczyć, jak to tworzysz.

W config/application.rb prawdopodobnie masz co najmniej coś takiego:

require 'ws_communication'
config.middleware.use WsCommunication

Następnie w swoim kliencie JavaScript powinieneś mieć coś takiego:

var ws = new WebSocket(uri);

Czy tworzysz inną instancję WsCommunication? To ustawiłoby @clients na pustą tablicę i mogłoby wykazywać twoje objawy. Coś takiego byłoby niepoprawne:

var ws = new WsCommunication;

Pomogłoby nam, gdybyś pokazał klienta i być może plik config/application.rb, jeśli ten post nie pomoże.

Nawiasem mówiąc, zgadzam się z komentarzem, że @clients powinien być chroniony przez mutex przy każdej aktualizacji, jeśli nie jest również odczytywany. To dynamiczna struktura, która może się zmienić w dowolnym momencie w systemie sterowanym zdarzeniami. redis-mutex to dobra opcja. (Mam nadzieję, że link jest poprawny, ponieważ Github wydaje się w tej chwili zgłaszać 500 błędów we wszystkim.)

Możesz również zauważyć, że $redis.publish zwraca wartość całkowitą liczby klientów, którzy otrzymali wiadomość.

Może się też okazać, że przed zakończeniem musisz upewnić się, że Twój kanał jest wypisany z subskrypcji. Zdarzały mi się sytuacje, w których kończyło się wysyłaniem każdej wiadomości wiele, a nawet wiele razy, z powodu wcześniejszych subskrypcji tego samego kanału, które nie zostały wyczyszczone. Ponieważ subskrybujesz kanał w ramach wątku, będziesz musiał zrezygnować z subskrypcji w tym samym wątku, w przeciwnym razie proces po prostu „zawiesi się”, czekając na magicznie pojawienie się odpowiedniego wątku. Poradzę sobie z tą sytuacją, ustawiając flagę „anuluj subskrypcję”, a następnie wysyłając wiadomość. Następnie, w bloku on.message, testuję flagę anulowania subskrypcji i wystawiam tam informację o anulowaniu subskrypcji.

Dostarczony moduł, z niewielkimi modyfikacjami debugowania:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

Podany przeze mnie testowy kod subskrybenta:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

Testowy kod wydawcy, który podałem. Wydawcę i subskrybenta można łatwo połączyć, ponieważ są to tylko testy:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

Przykładowy plik config.ru, który uruchamia to wszystko w warstwie oprogramowania pośredniego szafy:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

To jest główne. Usunąłem go z mojej działającej wersji, więc może wymagać ulepszenia, jeśli go użyjesz:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Nie można pobrać zasobu z puli (SocketTimeoutException:)

  2. Jak wizualizować użycie resque za pomocą Node.js, WebSockets i Redis

  3. Jak korzystać z masowego wstawiania Redis?

  4. Błąd pola w obiekcie „target” w polu „”:wartość odrzucona []; kody [typeMismatch.target.,typeMismatch.,typeMismatch.java.util.Date,typeMismatch]

  5. TTL dla członka zbioru