博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
最佳实践:如何基于MNS实现一对多拉取消息消费模型
阅读量:5891 次
发布时间:2019-06-19

本文共 4196 字,大约阅读时间需要 13 分钟。

如何实现一对多拉取消息消费模型

问题背景

阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型。其中队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull模式;主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push模式。上面两种模型基本能满足我们大多数应用场景。

 

推送模式的好处是即时性能比较好,但是需要暴露客户端地址来接收服务端的消息推送。有些情况下,比如企业内网,我们无法暴露推送地址,希望改用拉取(Pull)的方式。虽然MNS不直接提供这种消费模型,但是我们可以结合主题和队列来实现一对多的拉取消息消费模型。具体方案如下:

 

解决方案:

让主题将消息先推送到队列,然后由消费者从队列拉取消息。这样既可以做到1对多的广播消息,又不需要暴露消费者的地址;如下图所示:

fd522053707462f71e44928315782ab2e1cde430

接口说明:

MNS最新的 (1.1.5)中的CloudPullTopic 默认支持上述解决方案。其中

MNSClient 提供下面两个接口来快速创建CloudPullTopic

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector
queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate) public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector
queueNameList)

其中,TopicMeta 是创建topicmeta 设置, queueNameList里指定topic消息推送的队列名列表;needCreateQueue表明queueNameList是否需要创建;queueMetaTemplate是创建queue需要的queue meta 参数设置;

Demo 代码

CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);        MNSClient client = account.getMNSClient();        // build consumer name list.        Vector
consumerNameList = new Vector
(); String consumerName1 = "consumer001"; String consumerName2 = "consumer002"; String consumerName3 = "consumer003"; consumerNameList.add(consumerName1); consumerNameList.add(consumerName2); consumerNameList.add(consumerName3); QueueMeta queueMetaTemplate = new QueueMeta(); queueMetaTemplate.setPollingWaitSeconds(30); try{ //producer code: // create pull topic which will send message to 3 queues for consumer. String topicName = "demo-topic-for-pull"; TopicMeta topicMeta = new TopicMeta(); topicMeta.setTopicName(topicName); CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate); //publish message and consume message. String messageBody = "broadcast message to all the consumers:hello the world."; // if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly. TopicMessage tMessage = new RawTopicMessage(); tMessage.setBaseMessageBody(messageBody); pullTopic.publishMessage(tMessage); // consumer code: //3 consumers receive the message. CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1); CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2); CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3); Message consumer1Msg = queueForConsumer1.popMessage(30); if(consumer1Msg != null) { System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString()); }else{ System.out.println("the queue is empty"); } Message consumer2Msg = queueForConsumer2.popMessage(30); if(consumer2Msg != null) { System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString()); }else{ System.out.println("the queue is empty"); } Message consumer3Msg = queueForConsumer3.popMessage(30); if(consumer3Msg != null) { System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString()); }else{ System.out.println("the queue is empty"); } // delete the fullTopic. pullTopic.delete(); }catch(ClientException ce) { System.out.println("Something wrong with the network connection between client and MNS service." + "Please check your network and DNS availablity."); ce.printStackTrace(); } catch(ServiceException se) { /*you can get more MNS service error code in following link. https://help.aliyun.com/document_detail/mns/api_reference/error_code/error_code.html?spm=5176.docmns/api_reference/error_code/error_response */ se.printStackTrace(); } client.close();

转载地址:http://uabsx.baihongyu.com/

你可能感兴趣的文章
lintcode 单词接龙II
查看>>
Material Design学习之 ProgreesBar
查看>>
WEB版一次选择多个文件进行批量上传(WebUploader)的解决方案
查看>>
Redis之 命令行 操作
查看>>
Jvm(46),指令集----对象创建与访问指令
查看>>
EL 表达式小结
查看>>
内部排序
查看>>
jQuery EasyUI API 中文文档 - 组合(Combo)
查看>>
10个关于 Dropbox 的另类功用(知乎问答精编)[还是转来了]
查看>>
Oracle体系结构
查看>>
用Modelsim仿真QII FFT IP核的时候出现的Error: Illegal target for defparam
查看>>
javascript Error对象详解
查看>>
nc 局域网聊天+文件传输(netcat)
查看>>
每天一个linux命令(25):linux文件属性详解
查看>>
go微服务框架go-micro深度学习(三) Registry服务的注册和发现
查看>>
python 重载方法有哪些特点 - 老王python - 博客园
查看>>
在Fedora8上安装MySQL5.0.45的过程
查看>>
TCP长连接与短连接的区别
查看>>
设计模式之命令模式
查看>>
android 测试 mondey
查看>>