最近重拾scala,由于某些原因想要尝试了rabbitmq。于是便有了这块代码。
刚才重新看了rabbitmq的官网,上面关于scala的客户端的信息貌似更新了?之前那个lift-amqp太旧了,没法使用。不过没关系,这篇就当做了记录下如何在scala的环境下用rabbitmq的Java客户端。如果你想直接用现成的scala客户端,可以参考这里。
首先你要安装rabbitmq,这样之后好测试。mac下用brew安装命令是
$ brew update $ brew install rabbitmq
第一句update是防止你用过旧的包。安装好之后应该会提示你用rabbitmq-server
即可启动服务端。另外有一个监控的web界面http://localhost:15672
。默认rabbitmq绑定在5672
端口。
接下来搭建你的sbt工程。
libraryDependencies += "com.rabbitmq" % "amqp-client" % "3.4.3" libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.3.9"
这里使用了最新的amqp的Java客户端,另外使用了akka的actor。因为我用的scala是2.10.4(sbt版本是0.13.2),scala在2.10.x开始不建议使用默认的actor并且分离为叫做scala-actor的一个包,可选的比如akka-actor。actor主要用在接受信息时。
参考rabbitmq网站上关于Java客户端的写法,这里简单构成了一个queue的模式。发送端
import com.rabbitmq.client.ConnectionFactory import java.io.{ByteArrayOutputStream, ObjectOutputStream} class SenderExample { private val QUEUE_NAME = "fruits" // setup channel // ref https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java val factory = new ConnectionFactory() factory.setHost("localhost") val conn = factory.newConnection() val channel = conn.createChannel() // durable = false, exclusive = false, autoDelete = false, arguments = null channel.queueDeclare(QUEUE_NAME, false, false, false, null) /** * Send 'Hello, world!' */ def send = { /* val bytes = new ByteArrayOutputStream val store = new ObjectOutputStream(bytes) store.writeObject("Hello, world!") store.close */ val payload = "Hello, world!" channel.basicPublish("", QUEUE_NAME, null, payload.getBytes) } }
最主要的是发送时候的basicPublish。
接收端稍微要复杂点,直接Java的客户端是while-true方式循环处理的,这就像常规的ServerSocket的处理方式。为了提高处理速率,可以采用线程的方式,直接的新建线程或者更普遍的ExecuteService的线程池。不过scala中更好的方式是actor,而且对于请求积压有mailbox等机制,更易使用而且稳定。
import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.rabbitmq.client.{ConnectionFactory, Connection, Channel, QueueingConsumer} class ReceiverExample { def receive(actor: ActorRef): Unit = { val factory = new ConnectionFactory factory.setHost("localhost") val connection = factory.newConnection val channel = connection.createChannel channel.queueDeclare("fruits", false, false, false, null) val consumer = new QueueingConsumer(channel) channel.basicConsume("fruits", true, consumer) while (true) { val delivery = consumer.nextDelivery() val payload = new String(delivery.getBody()) actor ! StringMessage(payload) } } } case class StringMessage(payload: String) class ReceiverActor extends Actor { def receive = { case StringMessage(payload) => println(s"receive string message [$payload]") } } object ReceiverExample { def main(args: Array[String]): Unit = { val example = new ReceiverExample val system = ActorSystem("DefaultSystem") val actor = system.actorOf(Props[ReceiverActor], name = "receiver") example.receive(actor) } }
注意这里使用了akka的actor,消息的识别并没有在actor,个人认为也没有特别的必要把所有逻辑都放进actor。while-true外层也可以处理部分功能无关的东西。大致来说,这里是Java的客户端适配scala的actor的方式。
最后,如果要测试的话,需要分别开发送方和接收方。可以开两个sbt console的方式。
另外因为涉及到akka-actor, rabbitmq-client等包,老实说用网页查询API也有点多,dash还是不错的,在这里帮了很多。