2015-08-03

AMAZON SQS(2)Java Client Consumer and Producer

Configuration File build.sbt
"com.amazonaws" % "aws-java-sdk" % "1.10.6",                //

First class, SQS Client builder, SQSQueue.scala
package com.sillycat.jobsconsumer.messagequeue

import com.sillycat.jobsconsumer.utilities.{IncludeLogger, IncludeConfig}

import com.amazonaws.services.sqs.buffered.{QueueBufferConfig, AmazonSQSBufferedAsyncClient}
import com.amazonaws.services.sqs.{AmazonSQSClient, AmazonSQSAsyncClient}
import scala.collection.JavaConverters._
import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest, SendMessageBatchRequestEntry}
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.auth.BasicAWSCredentials

object SQSQueue extends IncludeLogger with IncludeConfig{

private def getCredential = {
new BasicAWSCredentials(
config.getString(envStr("sqs.keyId")),
config.getString(envStr("sqs.accessKey")))
}

def getAsyncClient = {
val client = new AmazonSQSAsyncClient(getCredential)
client.setRegion(Region.getRegion(Regions.fromName(config.getString(envStr("sqs.region")))))
client
}

}

At first, I am thinking that I can have a embedded SQS server, actually I found one, but there is AKKA system conflict there. So I do not want to speed more time on that.
package com.sillycat.jobsconsumer.messagequeue

import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.{ReceiveMessageRequest}
import com.sillycat.jobsconsumer.utilities.{IncludeLogger, IncludeConfig}
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}

class SQSQueueSpec extends FunSpec with Matchers with BeforeAndAfterAll with IncludeConfig with IncludeLogger{

val queueName = config.getString(envStr("sqs.queueName.rawjobs"))

var client: AmazonSQSAsyncClient= _

var queueUrl: String = _

override def beforeAll(): Unit = {
//build client

if(config.getString("build.env").equals("test")){
logger.info("No embedded SQS, ignore the test")
}else{
client = SQSQueue.getAsyncClient
queueUrl = client.getQueueUrl(queueName).getQueueUrl
}
}

override def afterAll() {
}

describe("IncludeSQSProducer") {
describe("sendMessage"){
it("Directly Send String") {
if(config.getString("build.env").equals("test")){
logger.info("No embedded SQS, ignore the test")
}else{
val expect = "134343143"
client.sendMessage(queueUrl ,expect)
val msgs = client
.receiveMessage(new ReceiveMessageRequest((
queueUrl
)).withMaxNumberOfMessages(10)).getMessages
msgs.size() should be (1)
msgs.get(0).getBody should be (expect)
client.deleteMessage(queueUrl,msgs.get(0).getReceiptHandle)
}
}
}
}

}

Write some code in the trait to make the consumer and producer easier.
The consumer trait, IncludeSQSConsumer.scala
package com.sillycat.jobsconsumer.messagequeue.consumer

import com.amazonaws.services.sqs.model.Message

import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest}
import com.sillycat.jobsconsumer.messagequeue.SQSQueue
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}
import scala.collection.JavaConverters._

trait IncludeSQSConsumer extends IncludeLogger with IncludeConfig{

protected def queueName = "default"

while(true){
SQSQueue.getAsyncClient
.receiveMessage(new ReceiveMessageRequest((getQueueUrl)).withMaxNumberOfMessages(10))
.getMessages
.asScala
.map(handleMessage)
.foreach(deleteMessage)
}

protected def getQueueUrl:String = {
SQSQueue.getAsyncClient.getQueueUrl(queueName).getQueueUrl
}

protected def handleMessage(msg: Message): Message

protected def deleteMessage(msg: Message) = {
logger.debug("Deleting Message after operation " + msg.getReceiptHandle)
SQSQueue.getAsyncClient.deleteMessageAsync(
new DeleteMessageRequest(getQueueUrl,msg.getReceiptHandle))
}

}

Simple Consumer, RawjobConsumer.scala
package com.sillycat.jobsconsumer.messagequeue.consumer

import com.amazonaws.services.sqs.model.{Message}
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}

object RawJobConsumer extends App with IncludeSQSConsumer with IncludeLogger with IncludeConfig{

override protected def queueName = {
val queueName = config.getString(envStr("sqs.queueName.rawjobs"))
logger.debug("Polling the Raw Job Message from Queue: " + queueName)
queueName
}

protected def handleMessage(msg: Message): Message = {
logger.debug("Receiving Message from SQS " + msg.getBody)
msg
}

}

trait of producer, IncludeSQSProducer.scala
package com.sillycat.jobsconsumer.messagequeue.producer

import com.amazonaws.services.sqs.model.{SendMessageRequest, SendMessageBatchRequestEntry}
import com.sillycat.jobsconsumer.messagequeue.SQSQueue
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}

trait IncludeSQSProducer extends IncludeLogger with IncludeConfig{

protected def queueName = "default"

def sendMessage(sourceMsg:Any): Unit = {
logger.debug("Send the Message to Queue " + getQueueUrl)
val msg = new SendMessageRequest(getQueueUrl,prepareMessage(sourceMsg))
SQSQueue.getAsyncClient.sendMessage(msg)
}

protected def prepareMessage(sourceMsg:Any):String

private def getQueueUrl:String = {
SQSQueue.getAsyncClient.getQueueUrl(queueName).getQueueUrl
}

}

Simple Producer, ClassifiedJobProducer.scala
package com.sillycat.jobsconsumer.messagequeue.producer

object ClassifiedJobProducer extends IncludeSQSProducer{

override protected def queueName = {
val queueName = config.getString(envStr("sqs.queueName.classifiedjobs"))
logger.debug("Send the classified Job Message to Queue: " + queueName)
queueName
}

def prepareMessage(sourceMsg:Any):String = {
sourceMsg.toString
}

}

References:
JAVA or SCALA
http://aws.amazon.com/java/
https://github.com/adamw/mqperf/tree/master/src/main/scala/com/softwaremill/mqperf/mq

old blog
https://github.com/luohuazju/sillycat-analyzer-java
https://github.com/adamw/mqperf

Message Compress
kryo
http://blog.csdn.net/rocklee/article/details/26706145

messagepack
http://web2.0coder.com/archives/347
http://www.cnblogs.com/peiandsky/archive/2012/04/24/2467766.html

protobuf
https://github.com/google/protobuf
https://github.com/drslump/Protobuf-PHP

serializing and deserializing XML
http://alvinalexander.com/scala/serializing-deserializing-xml-scala-classes

Testing SQS Server
http://maciejb.me/2012/10/17/testing-your-java-amazon-sqs-code-with-elasticmq/
https://github.com/adamw/elasticmq

已有 0 人发表留言,猛击->>这里<<-参与讨论

ITeye推荐

—软件人才免语言低担保 赴美带薪读研!—

Show more