RabbitMQ Basic usage
Hello World
最基本用法, 用默认 Exchange 向 具名 Queue queue_name
里发布消息.
如果没有消费者, 消息缓存在 Queue 中, 待消费者上线之后再来取.
这种情况下, 消费者会一直监听该 Queue ,消费者并不会考虑自己的处理能力, 但凡有新的消息就会拉下来处理, 来不及处理的话就在客户端堆积. 如果存在处理不过来的消息, 消费者又crash了, 消息就丢了.
publisher:
require 'bunny'
url = "amqp://guest:guest@127.0.0.1:5672"
connection = Bunny.new(url).tap(&:start)
channel = connection.create_channel
channel.default_exchange.publish('Message Body', routing_key: 'queue_name')
connection.close
listener:
require 'bunny'
url = "amqp://guest:guest@127.0.0.1:5672"
connection = Bunny.new(url).tap(&:start)
channel = connection.create_channel
queue = channel.queue('queue_name')
begin
queue.subscribe(block: true) do |_delivery_info, _properties, body|
p body
sleep 5
end
rescue => _e
connection.close
exit(0)
end
Work Queue (Task Queue)
如果我们启动多个消费者, 消息会以 Round-robin
模式被处理 – 按顺序轮流发给各个消费者, 负载平摊到每个消费者上.
默认情况下, RabbitMQ 把发给消费者的消息自动删除.
如果消息是耗时的易错的, 我们可以设置为手动处理, 先接收消息处理消息, 等处理完成之后再发 ack
.
当 channel 关闭 / connection 关闭 / TCP 连接断开, 会被认为是该消费者下线, RabbitMQ 将把这个消费者还没有 ack 的消息重新入 Queue , 再发给其他在线的消费者.
选择手动处理 ack 后, 有一个很重要的特性需要知道, RabbitMQ 没有处理超时, 消费者没有发 ack 就会被认为一直在处理这个消息, 并不会超时自动结束.
也就是说, 如果一个消费者忘记发 ack 后, 他就不会再被派发任何新消息了. 更可拍的是, 重启消费者也不能解决这个问题, 连接断开后, 消息会重新回到 Queue, 然后再分给一个可用消费者, 它也会遇到相同的问题. 所以无论业务逻辑是成功还是失败, 都要发送 ack 来告知 RabbitMQ 你处理完这个消息了, 不然这个消费者就废掉了.
ack 是 channel 级别的.
Queue 的名字不能重复, 已存在的 Queue 不能直接修改它的配置.
默认情况下, RabbitQM 的 queues 和 messages 会在 RabbitMQ 重启之后自动清除, 可以手动开启持久化设置.
对 Queue 使用 durable
来限定 Queue 是持久的, 即使 RabbitMQ 重启 (生产者和消费者都需要在创建Queue的时候设置这个选项);
对 message publish 使用 persistent
来命令消息是落盘的 (只需要生产者配置) .
默认情况下, RabbitMQ 总是会把第n个消息发给第n个的消费者, 不管该消费者是不是忙碌, 也不管其他消费者是不是空闲.
我们可以在 Channel 上设置 prefetch
来限定, 该消费者最多可以同时处理几个消息.
如果 prefetch
设为 1, 如果一个消费者在忙, 那就不会再发给他一个新消息, 去发给下一个空闲的消费者.
publisher:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
message = ARGV.join(' ')
channel.default_exchange.publish(message, routing_key: 'task_queue', durable: true, persistent: true)
puts "published: #{message}"
conn.close
listener:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
channel.prefetch(1)
queue = channel.queue('task_queue', durable: true)
begin
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
p body
sleep 5 if body.include?('sleep')
puts '[DONE]'
channel.ack(delivery_info.delivery_tag)
end
rescue Interrupt => _
conn.close
end
Publish and Subscribe (fanout exchange)
生产者生产消息; 队列存贮消息; 消费者消费消息.
生产者不会直接发消息给消费者.
生产者把消息给 Exchange, Exchange 把消息给 Queue.
Exchange 分下面几个类型: direct, topic, headers, fanout.
新建一个独占的 Queue, 名字由RabbitMQ随机生成: queue = channel.queue('', exclusive: true)
.
当该连接断开的时候, Queue 自动回收, 因为他是被独占的.
对于一个fanout的 Exchange, 如果没人听, 也就是没有 Queue 绑定到这个 Exchange上, 那消息就直接丢弃掉了.
生成者需要确定好 Exchange 的模式和名字, 就往里面发消息就好了;
消费者需要知道 Exchange 的名字, 然后自己创建一个queue 来跟该 Exchange 绑定, 这样就能听到该 Exchange 的消息了.
publisher:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
exchange = channel.fanout('logs_exchange')
message = ARGV.join(' ')
exchange.publish(message)
puts "published: #{message}"
conn.close
listener:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
exchange = channel.fanout('logs_exchange')
queue = channel.queue('', exclusive: true)
queue.bind(exchange)
puts 'Waiting for logs:'
begin
queue.subscribe(block: true) do |_delivery_info, _properties, body|
puts "MSG: #{body}"
end
rescue
channel.close
conn.close
end
Routing (direct exchange)
fanout
是一对多无差别的广播; direct
可以向不同的 routing_key
广播, 消费者自己选择收听哪几个 routing_key
下的消息.
消息选择收听某个 routing_key
, 就需要 queue.bind(exchange, routing_key: routing_key)
.
这样有一个限制就是, routing_key
只能是确定的字符串.
publisher:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
exchange = channel.direct('logs_direct_exchange')
routing_key = ARGV[0] # level
message = ARGV[1..-1].join(' ')
exchange.publish(message, routing_key: routing_key)
puts "routing_key:#{routing_key}, message:#{message}"
conn.close
listener:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
exchange = channel.direct('logs_direct_exchange')
queue = channel.queue('', exclusive: true)
ARGV.each do |routing_key|
queue.bind(exchange, routing_key: routing_key)
end
begin
queue.subscribe(block: true) do |delivery_info, _properties, body|
puts "routing_key:#{delivery_info.routing_key}, message:#{body}"
end
rescue
channel.close
conn.close
exit 0
end
Topic (topic exchange)
* 至少一个
# 零个或多个
包含 hi 的任意 routing_key: "#.hi.#"
.
直接用 hi
就要求生产者只能用 hi
.
publisher:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
exchange = channel.topic('logs_topic_exchange')
routing_key = ARGV[0]
message = ARGV[1..-1].join(' ')
exchange.publish(message, routing_key: routing_key)
puts "routing_key:#{routing_key}, message:#{message}"
conn.close
listener:
require 'bunny'
conn = Bunny.new.tap(&:start)
channel = conn.create_channel
exchange = channel.topic('logs_topic_exchange')
queue = channel.queue('', exclusive: true)
ARGV.each do |routing_key|
p "bind routing_key:#{routing_key}"
queue.bind(exchange, routing_key: routing_key)
end
begin
queue.subscribe(block: true) do |delivery_info, _properties, body|
puts "routing_key:#{delivery_info.routing_key}, message:#{body}"
end
rescue
channel.close
conn.close
exit 0
end