最近重拾scala,由于某些原因想要尝试了rabbitmq。于是便有了这块代码。
刚才重新看了rabbitmq的官网,上面关于scala的客户端的信息貌似更新了?之前那个lift-amqp太旧了,没法使用。不过没关系,这篇就当做了记录下如何在scala的环境下用rabbitmq的Java客户端。如果你想直接用现成的scala客户端,可以参考这里。
首先你要安装rabbitmq,这样之后好测试。mac下用brew安装命令是
$ brew update $ brew install rabbitmq
第一句update是防止你用过旧的包。安装好之后应该会提示你用rabbitmq-server即可启动服务端。另外有一个监控的web界面http://localhost:15672。默认rabbitmq绑定在5672端口。
接下来搭建你的sbt工程。
libraryDependencies += "com.rabbitmq" % "amqp-client" % "3.4.3" libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.3.9"
这里使用了最新的amqp的Java客户端,另外使用了akka的actor。因为我用的scala是2.10.4(sbt版本是0.13.2),scala在2.10.x开始不建议使用默认的actor并且分离为叫做scala-actor的一个包,可选的比如akka-actor。actor主要用在接受信息时。
参考rabbitmq网站上关于Java客户端的写法,这里简单构成了一个queue的模式。发送端
import com.rabbitmq.client.ConnectionFactory
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
class SenderExample {
private val QUEUE_NAME = "fruits"
// setup channel
// ref https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java
val factory = new ConnectionFactory()
factory.setHost("localhost")
val conn = factory.newConnection()
val channel = conn.createChannel()
// durable = false, exclusive = false, autoDelete = false, arguments = null
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
/**
* Send 'Hello, world!'
*/
def send = {
/* val bytes = new ByteArrayOutputStream
val store = new ObjectOutputStream(bytes)
store.writeObject("Hello, world!")
store.close */
val payload = "Hello, world!"
channel.basicPublish("", QUEUE_NAME, null, payload.getBytes)
}
}
最主要的是发送时候的basicPublish。
接收端稍微要复杂点,直接Java的客户端是while-true方式循环处理的,这就像常规的ServerSocket的处理方式。为了提高处理速率,可以采用线程的方式,直接的新建线程或者更普遍的ExecuteService的线程池。不过scala中更好的方式是actor,而且对于请求积压有mailbox等机制,更易使用而且稳定。
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.rabbitmq.client.{ConnectionFactory, Connection, Channel, QueueingConsumer}
class ReceiverExample {
def receive(actor: ActorRef): Unit = {
val factory = new ConnectionFactory
factory.setHost("localhost")
val connection = factory.newConnection
val channel = connection.createChannel
channel.queueDeclare("fruits", false, false, false, null)
val consumer = new QueueingConsumer(channel)
channel.basicConsume("fruits", true, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val payload = new String(delivery.getBody())
actor ! StringMessage(payload)
}
}
}
case class StringMessage(payload: String)
class ReceiverActor extends Actor {
def receive = {
case StringMessage(payload) => println(s"receive string message [$payload]")
}
}
object ReceiverExample {
def main(args: Array[String]): Unit = {
val example = new ReceiverExample
val system = ActorSystem("DefaultSystem")
val actor = system.actorOf(Props[ReceiverActor], name = "receiver")
example.receive(actor)
}
}
注意这里使用了akka的actor,消息的识别并没有在actor,个人认为也没有特别的必要把所有逻辑都放进actor。while-true外层也可以处理部分功能无关的东西。大致来说,这里是Java的客户端适配scala的actor的方式。
最后,如果要测试的话,需要分别开发送方和接收方。可以开两个sbt console的方式。
另外因为涉及到akka-actor, rabbitmq-client等包,老实说用网页查询API也有点多,dash还是不错的,在这里帮了很多。