rabbitmq在scala下的使用尝试


最近重拾scala,由于某些原因想要尝试了rabbitmq。于是便有了这块代码
刚才重新看了rabbitmq的官网,上面关于scala的客户端的信息貌似更新了?之前那个lift-amqp太旧了,没法使用。不过没关系,这篇就当做了记录下如何在scala的环境下用rabbitmq的Java客户端。如果你想直接用现成的scala客户端,可以参考这里

首先你要安装rabbitmq,这样之后好测试。mac下用brew安装命令是

$ brew update
$ brew install rabbitmq

第一句update是防止你用过旧的包。安装好之后应该会提示你用rabbitmq-server即可启动服务端。另外有一个监控的web界面http://localhost:15672。默认rabbitmq绑定在5672端口。

接下来搭建你的sbt工程。

libraryDependencies += "com.rabbitmq" % "amqp-client" % "3.4.3"

libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.3.9"

这里使用了最新的amqp的Java客户端,另外使用了akka的actor。因为我用的scala是2.10.4(sbt版本是0.13.2),scala在2.10.x开始不建议使用默认的actor并且分离为叫做scala-actor的一个包,可选的比如akka-actor。actor主要用在接受信息时。

参考rabbitmq网站上关于Java客户端的写法,这里简单构成了一个queue的模式。发送端

import com.rabbitmq.client.ConnectionFactory
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class SenderExample {

  private val QUEUE_NAME = "fruits"

  // setup channel
  // ref https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java
  val factory = new ConnectionFactory()
  factory.setHost("localhost")
  val conn = factory.newConnection()
  val channel = conn.createChannel()
  // durable = false, exclusive = false, autoDelete = false, arguments = null
  channel.queueDeclare(QUEUE_NAME, false, false, false, null)

  /**
   * Send 'Hello, world!'
   */
  def send = {
    /* val bytes = new ByteArrayOutputStream
    val store = new ObjectOutputStream(bytes)
    store.writeObject("Hello, world!")
    store.close */
    val payload = "Hello, world!"
    channel.basicPublish("", QUEUE_NAME, null, payload.getBytes)
  }

}

最主要的是发送时候的basicPublish。
接收端稍微要复杂点,直接Java的客户端是while-true方式循环处理的,这就像常规的ServerSocket的处理方式。为了提高处理速率,可以采用线程的方式,直接的新建线程或者更普遍的ExecuteService的线程池。不过scala中更好的方式是actor,而且对于请求积压有mailbox等机制,更易使用而且稳定。

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.rabbitmq.client.{ConnectionFactory, Connection, Channel, QueueingConsumer}

class ReceiverExample {
  
  def receive(actor: ActorRef): Unit = {
    val factory = new ConnectionFactory
    factory.setHost("localhost")
    val connection = factory.newConnection
    val channel = connection.createChannel
    channel.queueDeclare("fruits", false, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("fruits", true, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val payload = new String(delivery.getBody())
      actor ! StringMessage(payload)
    }
  }
}

case class StringMessage(payload: String)

class ReceiverActor extends Actor {
  def receive = {
    case StringMessage(payload) => println(s"receive string message [$payload]")
  }
}

object ReceiverExample {
  def main(args: Array[String]): Unit = {
    val example = new ReceiverExample
    val system = ActorSystem("DefaultSystem")
    val actor = system.actorOf(Props[ReceiverActor], name = "receiver")
    example.receive(actor)
  }
}

注意这里使用了akka的actor,消息的识别并没有在actor,个人认为也没有特别的必要把所有逻辑都放进actor。while-true外层也可以处理部分功能无关的东西。大致来说,这里是Java的客户端适配scala的actor的方式。

最后,如果要测试的话,需要分别开发送方和接收方。可以开两个sbt console的方式。
另外因为涉及到akka-actor, rabbitmq-client等包,老实说用网页查询API也有点多,dash还是不错的,在这里帮了很多。