AMAZON SQS (2): Java Client Consumer and Producer
Configuration File build.sbt
"com.amazonaws" % "aws-java-sdk" % "1.10.6", //
First class, SQS Client builder, SQSQueue.scala
package com.sillycat.jobsconsumer.messagequeue
import com.sillycat.jobsconsumer.utilities.{IncludeLogger, IncludeConfig}
import com.amazonaws.services.sqs.buffered.{QueueBufferConfig, AmazonSQSBufferedAsyncClient}
import com.amazonaws.services.sqs.{AmazonSQSClient, AmazonSQSAsyncClient}
import scala.collection.JavaConverters._
import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest, SendMessageBatchRequestEntry}
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.auth.BasicAWSCredentials
/**
 * Factory for the Amazon SQS client used by this project.
 *
 * Credentials and region come from the configuration keys `sqs.keyId`,
 * `sqs.accessKey` and `sqs.region`, each resolved through `envStr`.
 */
object SQSQueue extends IncludeLogger with IncludeConfig {

  /** Builds the AWS credentials from the `sqs.keyId` / `sqs.accessKey` settings. */
  private def getCredential: BasicAWSCredentials =
    new BasicAWSCredentials(
      config.getString(envStr("sqs.keyId")),
      config.getString(envStr("sqs.accessKey")))

  // Built on first use and reused afterwards. Creating a fresh client per call
  // (as callers do once per request) piles up clients that are never shut down;
  // the AWS SDK clients are documented as thread-safe and meant to be shared.
  private lazy val sharedAsyncClient: AmazonSQSAsyncClient = {
    val sqs = new AmazonSQSAsyncClient(getCredential)
    sqs.setRegion(Region.getRegion(Regions.fromName(config.getString(envStr("sqs.region")))))
    sqs
  }

  /**
   * Returns the SQS async client configured for the region in `sqs.region`.
   *
   * The same instance is returned on every call, so callers must not shut it down.
   */
  def getAsyncClient: AmazonSQSAsyncClient = sharedAsyncClient
}
At first I thought I could use an embedded SQS server for the tests. I actually found one, but it caused an Akka system conflict, so I did not want to spend more time on that.
package com.sillycat.jobsconsumer.messagequeue
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.{ReceiveMessageRequest}
import com.sillycat.jobsconsumer.utilities.{IncludeLogger, IncludeConfig}
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
/**
 * Round-trip spec for [[SQSQueue]]: sends one message to the queue configured
 * under `sqs.queueName.rawjobs`, reads it back, checks the body and deletes it.
 *
 * When `build.env` is "test" nothing is sent; the spec only logs that it is skipped.
 */
class SQSQueueSpec extends FunSpec with Matchers with BeforeAndAfterAll with IncludeConfig with IncludeLogger {

  val queueName = config.getString(envStr("sqs.queueName.rawjobs"))
  var client: AmazonSQSAsyncClient = _
  var queueUrl: String = _

  /** True when the build environment is "test", in which case SQS is not contacted. */
  private def skipSqs: Boolean = config.getString("build.env").equals("test")

  override def beforeAll(): Unit =
    if (skipSqs) {
      logger.info("No embedded SQS, ignore the test")
    } else {
      // build client
      client = SQSQueue.getAsyncClient
      queueUrl = client.getQueueUrl(queueName).getQueueUrl
    }

  override def afterAll(): Unit = ()

  describe("IncludeSQSProducer") {
    describe("sendMessage") {
      it("Directly Send String") {
        if (skipSqs) {
          logger.info("No embedded SQS, ignore the test")
        } else {
          val expect = "134343143"
          client.sendMessage(queueUrl, expect)

          val request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)
          val received = client.receiveMessage(request).getMessages

          received.size() should be (1)
          val first = received.get(0)
          first.getBody should be (expect)

          // Remove the message so it is not redelivered to a later run.
          client.deleteMessage(queueUrl, first.getReceiptHandle)
        }
      }
    }
  }
}
Next, put the shared logic into traits to make the consumers and producers easier to write.
The consumer trait, IncludeSQSConsumer.scala
package com.sillycat.jobsconsumer.messagequeue.consumer
import com.amazonaws.services.sqs.model.Message
import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest}
import com.sillycat.jobsconsumer.messagequeue.SQSQueue
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}
import scala.collection.JavaConverters._
/**
 * Mix-in that polls an SQS queue forever: each received message is passed to
 * `handleMessage`, and the message it returns is then deleted from the queue.
 *
 * NOTE: the polling loop is part of the trait initializer, so it starts as soon
 * as a concrete implementation is instantiated and never returns.
 */
trait IncludeSQSConsumer extends IncludeLogger with IncludeConfig {

  /** Name of the queue to poll; implementations override this. */
  protected def queueName = "default"

  // Resolved once on first use instead of on every poll and every delete:
  // the queue URL lookup is a remote call and the values do not change per message.
  private lazy val sqsClient = SQSQueue.getAsyncClient
  private lazy val resolvedQueueUrl: String = getQueueUrl

  while (true) {
    sqsClient
      .receiveMessage(new ReceiveMessageRequest(resolvedQueueUrl).withMaxNumberOfMessages(10))
      .getMessages
      .asScala
      // Delete each message right after it has been handled, so a failure on a
      // later message of the batch does not leave earlier, already handled
      // messages in the queue.
      .foreach(msg => deleteMessage(handleMessage(msg)))
  }

  /** Looks up the URL of the queue named by `queueName`. */
  protected def getQueueUrl: String = {
    sqsClient.getQueueUrl(queueName).getQueueUrl
  }

  /**
   * Processes one received message.
   *
   * @param msg the message received from the queue
   * @return the message to delete from the queue afterwards
   */
  protected def handleMessage(msg: Message): Message

  /**
   * Asynchronously deletes `msg` from the queue, identified by its receipt handle.
   *
   * @param msg the message to delete
   */
  protected def deleteMessage(msg: Message) = {
    logger.debug("Deleting Message after operation " + msg.getReceiptHandle)
    sqsClient.deleteMessageAsync(
      new DeleteMessageRequest(resolvedQueueUrl, msg.getReceiptHandle))
  }
}
Simple Consumer, RawjobConsumer.scala
package com.sillycat.jobsconsumer.messagequeue.consumer
import com.amazonaws.services.sqs.model.{Message}
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}
/**
 * Runnable consumer for the raw-jobs queue configured under `sqs.queueName.rawjobs`.
 * Each received message is only logged and handed back unchanged.
 */
object RawJobConsumer extends App with IncludeSQSConsumer with IncludeLogger with IncludeConfig {

  /** Reads the raw-jobs queue name from the configuration and logs it. */
  override protected def queueName = {
    val rawJobsQueue = config.getString(envStr("sqs.queueName.rawjobs"))
    logger.debug("Polling the Raw Job Message from Queue: " + rawJobsQueue)
    rawJobsQueue
  }

  /**
   * Logs the body of the received message.
   *
   * @param msg the message received from SQS
   * @return the same message, unmodified
   */
  protected def handleMessage(msg: Message): Message = {
    logger.debug("Receiving Message from SQS " + msg.getBody)
    msg
  }
}
The producer trait, IncludeSQSProducer.scala
package com.sillycat.jobsconsumer.messagequeue.producer
import com.amazonaws.services.sqs.model.{SendMessageRequest, SendMessageBatchRequestEntry}
import com.sillycat.jobsconsumer.messagequeue.SQSQueue
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}
/**
 * Mix-in for sending messages to an SQS queue. Implementations supply the
 * queue name and the conversion of a source object into the message body.
 */
trait IncludeSQSProducer extends IncludeLogger with IncludeConfig {

  /** Name of the target queue; implementations override this. */
  protected def queueName = "default"

  // Resolved once on first use. Previously every sendMessage call looked the
  // queue URL up twice (a remote call each time) and fetched a client three times.
  private lazy val sqsClient = SQSQueue.getAsyncClient
  private lazy val queueUrl: String = sqsClient.getQueueUrl(queueName).getQueueUrl

  /**
   * Converts `sourceMsg` with `prepareMessage` and sends the result to the queue.
   *
   * @param sourceMsg the object to send
   */
  def sendMessage(sourceMsg: Any): Unit = {
    logger.debug("Send the Message to Queue " + queueUrl)
    val request = new SendMessageRequest(queueUrl, prepareMessage(sourceMsg))
    sqsClient.sendMessage(request)
  }

  /**
   * Turns a source object into the message body to send.
   *
   * @param sourceMsg the object passed to `sendMessage`
   * @return the message body
   */
  protected def prepareMessage(sourceMsg: Any): String
}
Simple Producer, ClassifiedJobProducer.scala
package com.sillycat.jobsconsumer.messagequeue.producer
/**
 * Producer for the classified-jobs queue configured under
 * `sqs.queueName.classifiedjobs`.
 */
object ClassifiedJobProducer extends IncludeSQSProducer {

  /** Reads the classified-jobs queue name from the configuration and logs it. */
  override protected def queueName = {
    val classifiedJobsQueue = config.getString(envStr("sqs.queueName.classifiedjobs"))
    logger.debug("Send the classified Job Message to Queue: " + classifiedJobsQueue)
    classifiedJobsQueue
  }

  /**
   * Uses the source object's `toString` as the message body.
   *
   * @param sourceMsg the object to send
   * @return `sourceMsg.toString`
   */
  def prepareMessage(sourceMsg: Any): String = sourceMsg.toString
}
References:
JAVA or SCALA
http://aws.amazon.com/java/
https://github.com/adamw/mqperf/tree/master/src/main/scala/com/softwaremill/mqperf/mq
old blog
https://github.com/luohuazju/sillycat-analyzer-java
https://github.com/adamw/mqperf
Message Compress
kryo
http://blog.csdn.net/rocklee/article/details/26706145
messagepack
http://web2.0coder.com/archives/347
http://www.cnblogs.com/peiandsky/archive/2012/04/24/2467766.html
protobuf
https://github.com/google/protobuf
https://github.com/drslump/Protobuf-PHP
serializing and deserializing XML
http://alvinalexander.com/scala/serializing-deserializing-xml-scala-classes
Testing SQS Server
http://maciejb.me/2012/10/17/testing-your-java-amazon-sqs-code-with-elasticmq/
https://github.com/adamw/elasticmq
已有 0 人发表留言,猛击->>这里<<-参与讨论
ITeye推荐
—软件人才免语言低担保 赴美带薪读研!—