require 'rubygems'
require 'mqtt'
require 'pp'
# MQTT::Packet에 get 메서드 추가
# topic, payload(유저 메시지)
class MQTT::Packet
def topic
return @topic
end
def payload
return @payload
end
end
# MQTT::Proxy의 process_packets 메서드 재 정의
# 원래의 process_packets 는 packet만 넘겨주는데, 이 걸로는
# 이 packet에 어떤 클라이언트의 것인지 확인할 수 없다.
# 해서 클라이언트를 구분할 수 있는 값(소켓 객체의 id)을 추가로 넘기게 했다.
class MQTT::Proxy
def process_packets(client_socket,server_socket)
loop do
selected = IO.select([client_socket,server_socket], nil, nil, @select_timeout)
if selected.nil?
# Timeout
raise "Timeout in select"
else
if selected[0].include?(client_socket)
packet = MQTT::Packet.read(client_socket)
packet = @client_filter.call(packet, client_socket.object_id) unless @client_filter.nil?
# packet이 nil 이라면 인증 오류인 걸로 한다.
if packet.nil?
raise "Authorization Error"
else
server_socket.write(packet)
end
elsif selected[0].include?(server_socket)
packet = MQTT::Packet.read(server_socket)
packet = @server_filter.call(packet) unless @server_filter.nil?
unless packet.nil?
client_socket.write(packet)
end
else
logger.error "Problem with select: socket is neither server or client"
end
end
end
end
end
class MQTTProxy
attr_reader :proxy
attr_reader :session_info
attr_reader :logger
def initialize(args)
@proxy=MQTT::Proxy.new args
@session_info = Hash.new
@logger = args[:logger]
if @logger.nil?
@logger = Logger.new(STDOUT)
@logger.level = Logger::INFO
end
end
# 유저 세션을 검사한다.
# 귀찮아서 123456이면 성공인 걸로 했다.
def validationSession? session
# session을 가져오기 위한 연산이 들어간다.
puts "Check #{session == "123456"}"
return session == "123456"
end
def run
# 클라이언트 요청을 읽어서 분석한다.
# MQTT::Proxy에서 읽은 클라이언트의 MQTT 패킷과 id를 매개변수로 연산을 한다.
# id는 현재 유저 세션을 가리키는 key로 socket의 object_id를 이용했다.
@proxy.client_filter = lambda { |packet, id|
# connect 한 후 첫번째 메시지에 대해서 세션을 검사한다.
if @session_info.has_key?(id)
if @session_info[id] == 0
return nil if !validationSession? packet.payload
logger.info "MQTT::Proxy User authentication success"
end
@session_info[id] += 1
@session_info['topic'] = "/user/#{id}"
else
@session_info[id] = 0
end
return packet
}
# server filter
@proxy.server_filter = lambda { |packet|
#puts "From server: #{packet.inspect}"
return packet
}
@proxy.run
end
end
proxy = MQTTProxy.new(
:local_host => '0.0.0.0',
:local_port => 5000,
:server_host => 'test.mosquitto.org',
:server_port => 1883
)
proxy.run
# name : client.rb
require 'rubygems'
require 'mqtt'
require 'readline'
class MqttClient
attr_reader :server_host
attr_reader :server_port
attr_reader :server
attr_reader :logger
attr_reader :client_session
attr_reader :nickname
def initialize(args={})
@server_host = args[:server_host]
@server_port = args[:server_port]
@nickname = args[:nickname]
@server = nil
@logger = args[:logger]
if @logger.nil?
@logger = Logger.new(STDOUT)
@logger.level = Logger::INFO
end
end
# Login 메서드
# 123456을 하드 코딩했다.
def login session
@server.publish "service/auth", session
end
def pub topic, message
@server.publish topic, message
end
def sub topic
puts @server.get topic
end
def run
begin
@server = MQTT::Client.connect(
:host => @server_host,
:port => @server_port
)
rescue Exception=> exp
puts "Error"
end
# User Auth
login "123456"
# Publish Thread
Thread.new do
while message = Readline.readline("", true)
pub "/private/123456", "#{@nickname} -> #{message}"
end
end
# Subscribe
loop do
sub "/private/123456"
end
end
end
nickname = ARGV[0]
client = MqttClient.new(
server_host: "localhost",
server_port: 5000,
client_session: "123456",
nickname: nickname)
client.run