require 'rubygems' require 'mqtt' require 'pp' # MQTT::Packet에 get 메서드 추가 # topic, payload(유저 메시지) class MQTT::Packet def topic return @topic end def payload return @payload end end # MQTT::Proxy의 process_packets 메서드 재 정의 # 원래의 process_packets 는 packet만 넘겨주는데, 이 걸로는 # 이 packet에 어떤 클라이언트의 것인지 확인할 수 없다. # 해서 클라이언트를 구분할 수 있는 값(소켓 객체의 id)을 추가로 넘기게 했다. class MQTT::Proxy def process_packets(client_socket,server_socket) loop do selected = IO.select([client_socket,server_socket], nil, nil, @select_timeout) if selected.nil? # Timeout raise "Timeout in select" else if selected[0].include?(client_socket) packet = MQTT::Packet.read(client_socket) packet = @client_filter.call(packet, client_socket.object_id) unless @client_filter.nil? # packet이 nil 이라면 인증 오류인 걸로 한다. if packet.nil? raise "Authorization Error" else server_socket.write(packet) end elsif selected[0].include?(server_socket) packet = MQTT::Packet.read(server_socket) packet = @server_filter.call(packet) unless @server_filter.nil? unless packet.nil? client_socket.write(packet) end else logger.error "Problem with select: socket is neither server or client" end end end end end class MQTTProxy attr_reader :proxy attr_reader :session_info attr_reader :logger def initialize(args) @proxy=MQTT::Proxy.new args @session_info = Hash.new @logger = args[:logger] if @logger.nil? @logger = Logger.new(STDOUT) @logger.level = Logger::INFO end end # 유저 세션을 검사한다. # 귀찮아서 123456이면 성공인 걸로 했다. def validationSession? session # session을 가져오기 위한 연산이 들어간다. puts "Check #{session == "123456"}" return session == "123456" end def run # 클라이언트 요청을 읽어서 분석한다. # MQTT::Proxy에서 읽은 클라이언트의 MQTT 패킷과 id를 매개변수로 연산을 한다. # id는 현재 유저 세션을 가리키는 key로 socket의 object_id를 이용했다. @proxy.client_filter = lambda { |packet, id| # connect 한 후 첫번째 메시지에 대해서 세션을 검사한다. if @session_info.has_key?(id) if @session_info[id] == 0 return nil if !validationSession? packet.payload logger.info "MQTT::Proxy User authentication success" end @session_info[id] += 1 @session_info['topic'] = "/user/#{id}" else @session_info[id] = 0 end return packet } # server filter @proxy.server_filter = lambda { |packet| #puts "From server: #{packet.inspect}" return packet } @proxy.run end end proxy = MQTTProxy.new( :local_host => '0.0.0.0', :local_port => 5000, :server_host => 'test.mosquitto.org', :server_port => 1883 ) proxy.run
# name : client.rb require 'rubygems' require 'mqtt' require 'readline' class MqttClient attr_reader :server_host attr_reader :server_port attr_reader :server attr_reader :logger attr_reader :client_session attr_reader :nickname def initialize(args={}) @server_host = args[:server_host] @server_port = args[:server_port] @nickname = args[:nickname] @server = nil @logger = args[:logger] if @logger.nil? @logger = Logger.new(STDOUT) @logger.level = Logger::INFO end end # Login 메서드 # 123456을 하드 코딩했다. def login session @server.publish "service/auth", session end def pub topic, message @server.publish topic, message end def sub topic puts @server.get topic end def run begin @server = MQTT::Client.connect( :host => @server_host, :port => @server_port ) rescue Exception=> exp puts "Error" end # User Auth login "123456" # Publish Thread Thread.new do while message = Readline.readline("", true) pub "/private/123456", "#{@nickname} -> #{message}" end end # Subscribe loop do sub "/private/123456" end end end nickname = ARGV[0] client = MqttClient.new( server_host: "localhost", server_port: 5000, client_session: "123456", nickname: nickname) client.run