RabbitMQ 是一个由 Erlang 写成的 Advanced Message Queuing Protocol (AMQP) 实现,AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR 的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。反正现在这个世道通常都是小公司拥抱标准,大企业自己搞一套标准,不过公开标准总还是对大众有利的。
RabbitMQ 是由 LShift 提供的一个 AMQP 的开源实现,由以高性能、健壮以及 Scalability 出名的 Erlang 写成,因此也是继承了这些优点。不过,也许还有一些人对 Messaging 到底是干什么有些疑问,这里的消息和 XMPP 里通常是不一样的,比如一个大型系统里,各个组件之间的相互交流,因此通常 QMQP 里对实时性、健壮性、容错性等各方面的要求都要比 XMPP 要高一些。
AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型:
要使用 RabbitMQ 这个 Broker 非常简单,只要把 server 运行起来就可以了,当然 RabbitMQ 还提供了工具供服务器管理以及认证等,不过作为一个简单的示例,可以不考虑这些,下载代码,然后运行即可(当然,要事先安装好 Erlang ,并且还需要 Python 以及 Python 的 json 支持,用于生成一部分代码):
hg clone http://hg.rabbitmq.com/rabbitmq-codegen hg clone http://hg.rabbitmq.com/rabbitmq-server cd rabbitmq-server make run |
然后不管是 Producer 还是 Consumer 都是这个 Broker 的客户端,当然可以按照协议的定义手工来做这个客户端,但是大家通常都更喜欢直接用 API ,这样可以把精力集中在系统的逻辑上,而不是琐碎的网络通讯、编码解码以及同步等东西。正好 RabbitMQ 的客户端 API 是非常丰富的,除了 C/C++ ,其他流行的语言基本上都能找到好用的 API ,参见它的官方列表。我搜索了一下,发现目前官方正在开发一个 C++ 的客户端 API ,但是没有任何文档,所以也不知道完成情况如何,不过大致也可以理解,用 C/C++ 来做这样的模块,要处理网络、线程等东西的话,做到跨平台就很麻烦了,不仅库开发起来很难,大概开发出来的库也用起来不太方便。这里我们就用 Ruby 的一个客户端 API 来举例子吧。
举一个股票行情的例子,Producer 比如是交易所,它会定时(例如,每秒 3 次)公布实时行情。Producer 要产生消息必须要创建一个 Exchange ,Exchange 用于转发消息,但是它不会做存储,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息,如下所示,将股票行情信息转换为 json 格式发送到一个叫做 stocks 的 Exchange 上:
mq.topic('stocks').publish(json) |
当然如果消息总是发送过去就被直接丢弃那就没有什么意思了,一个 Consumer 想要接受消息的话,就要创建一个 Queue ,并把这个 Queue bind 到指定的 Exchange 上,然后 Exchange 会把消息转发到 Queue 那里,Queue 会负责存储消息,Consumer 可以通过主动 Pop 或者是 Subscribe 之后被动回调的方式来从 Queue 中取得消息,例如:
mq.queue("stock-reader-#{ID}").bind(mq.topic('stocks')).subscribe { |data| # ... } |
逻辑结构就是这么简单,完整的代码,Producer 随机生成一些行情数据并编码成 json 发送过来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | require 'mq' STOCK_NAMES = ('a'..'z').to_a.combination(2).map { |a,b| a+b } AMQP.start(:host => 'cad.pluskid.org') do def fake_market STOCK_NAMES.map { |name| [name, 100*rand] } end def dump_json(market) content = market.map { |name, price| "#{name.inspect}: #{price}" }.join(",") return "{" + content + "}" end mq = MQ.new EM.add_periodic_timer(0.4) do json = dump_json(fake_market) mq.topic('stocks').publish(json) end end |
Consumer 则是 subscribe 之后在回调函数中得到行情数据。这里我并没有解析 json ,我本想做一个试验,看大批量的客户端创建大量 Queue 之后服务端的性能表现,所以我在 Consumer 里选一个 1000 以内的随机数,在接受这么多个行情数据包以后自动退出,并将接收速度记录到文件中,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | require 'socket' require 'mq' ID = "#{Process.pid}@#{Socket.gethostname}" AMQP.start(:host => 'cad.pluskid.org') do mq = MQ.new cnt_max = rand(1000) cnt = 0 tstart = Time.now puts "Start to receive #{cnt_max} market data at #{tstart}..." mq.queue("stock-reader-#{ID}").bind(mq.topic('stocks')).subscribe { |data| cnt += 1 if cnt >= cnt_max tend = Time.now speed = cnt/(tend-tstart) File.open("#{ID}.txt", "w") do |f| f.write("#{speed}") end exit(0) end } end |
不过我没办法找到几千几万台机器来测,所以就找了三台机器,为了尽量让各个 Consumer 保持独立,我让它们跑在独立的 Ruby 进程里,不过当我在每台机器上都一下子启动 2.5k 个 Ruby VM 之后,服务端还没反应,客户端倒是都立即内存耗尽而死了,我才发现 Ruby 是不能当 Erlang 来使的。-,-bb 其中有一台俱乐部的机器是不能重启的,费了九牛二虎之力终于恢复过来,以后在也不敢拿 Ruby VM 开这种玩笑了。 😀
另外,如果还想了解关于 AMQP 以及 RabbitMQ 的更多东西的话,这个 Ruby 客户端 API 的项目 Readme 页面里有很多有用的链接。
囧一下,即使是Erlang,单机也起不了2.5k个 *VM* 啊……
@liancheng
呵呵,当时没有仔细想,完全去考虑服务器的承受能力了。不过对于一个普通机器通常能开多少个 VM 我倒是确实没概念。 :p 不过如果是 VM 的话,大概 Erlang 的那个比 Ruby 的还要重量级一些呢,感觉启动退出都要慢许多。
在一台机器上搞那么多RUBY VM,在服务端开几千个QUEUE是没问题的,但是我开了几万个就有问题了,但是在服务器端开几万甚至几百万,都是需要的,这样肯定不能满足我们的需求