版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
Activemq-cpp開發(fā)手冊
丁靖
-05-06
1引言
1-1編寫目
迅速學習CMS,提高CMS開發(fā)效率,提供一種CMS開發(fā)參照手冊
詳細API手冊請參照
1.2功能簡介
Activemq-cpp是一種與ActiveMQ交互通訊C++API開發(fā)庫,為C++開發(fā)者提供了一,種訪問
ActiveMQ接口。
Winkeemq-cpp是一種在Activ^mq-cpp基本上封裝API庫,對某些重復機械初始化及銷毀清
除及某些不關懷細節(jié)進行了封裝,從而簡化了編程。
1.3術語解析
ActiveMQ:開源消息隊列服務消
Broker:消息中介,每個消息隊列服務淵中至少有一種broker,是消息隊列載體
Destination:消息在broker上目地
Queue:消息隊列
Topic:主題
Message:消息
Producer:消息產生者
Consumer:消息消費者
Client:客戶端.生產者和消費者都在客戶端上
Server:Aclivemq服務器
BrokcrUri:客戶端訪問服務器上broker時Uri
其他資料請參照
2開發(fā)前準備
在開發(fā)前必要先安裝activemq-cpp及winkeemq-cpp庫,詳細環(huán)參照《aclivemq-cpp安裝
及使用文檔.doc》
3CMS
3.1概述
CMS(standsforC++MessagingService)是一組C++應用程序接口(C++API),它提供創(chuàng)立、
發(fā)送、接受、讀取消息服務。定義了一組和Sun公司和它合伙伙伴設計CMSAPI相似公共
應用程序接口和相應語法,使得C++程序可以和其她消息組件進行通信。
CMS是一種與廠商無關APL用來訪問消息收發(fā)系統(tǒng)。它類似于JDBC(JavaDatabase
Connectivity):這里,JDBC是可以用來訪問許多不同關系數(shù)據(jù)庫API,而CMS則提供同
樣與廠商無關訪問辦法,以訪問消息收發(fā)服務。CMS使您可以通過消息收發(fā)服務(有時稱
為消息中介程序或路由器)從一種CMS客戶機向另一種客戶機發(fā)送消息。消息是CMS中
一種類型對象,由兩某些構成:報頭和消息主體、gg由路隹信息以及關于該消息元數(shù)據(jù)
構成。消息主體則攜帶著應用程序數(shù)據(jù)或有效負載。依照有效負載類型來劃分,可以將消
息分為幾種類型,它們分別攜帶:簡樸文本(TcxtMcssagc)、可序列化對象(ObjcclMcssagc)、
屬性集合(MapMessage)>字節(jié)流(ByiesMessage)、原始值流(SlreamMessage),尚有無有效
負載消息(Message)。
消息收發(fā)系統(tǒng)是異步,也就是說,CMS客戶機可以發(fā)送消息而不必等待回應。比較可
知,這完全不同于基于RPC(基于遠程過程)系統(tǒng),如EJB1.1、CORBA和JavaRMI引
用實現(xiàn)。在RPC中,客戶機調用服務器上某個分布式對象一種辦法。在辦法調用返回之前,
該客戶機被阻塞:該客戶機在可以執(zhí)行卜.一條指令之前,必要等待辦法調用結束.在CMS
中,客戶機將消息發(fā)送給一種虛擬通道(主題或隊列),而其他CMS客戶機則預訂或監(jiān)聽
這個虛擬通道。當CMS客戶機發(fā)送消息時,它并不等待回應。它執(zhí)行發(fā)送操作,然后繼
續(xù)執(zhí)行下一條指令。消息也許最后轉發(fā)到一種或許各種客戶機,這些客戶機都不需要作出回
應。
CMS通用接口集合以異步方式發(fā)送或接受消息。異步方式接受消息顯然是使用間斷網(wǎng)
絡連接客戶機,諸如移動電話和PDA最佳選取。此外,CMS采用一種寬松結合方式整合
公司系統(tǒng)辦法,其重要目就是創(chuàng)立可以使用跨平臺數(shù)據(jù)信息、可移植公司級應用程序,而把
開發(fā)人力解放出來。
CMS消息服務支持兩種消息模型:Point-to-Point消息(P2P)和發(fā)布訂閱消息(Publish
Subscribemessaging,簡稱Pub/Sub)。CMS規(guī)范并不規(guī)定供應商同步支持這兩種消息模型,
/日開發(fā)者應當熟悉這物種消息模型優(yōu)勢與缺陷.
P2P消息模型是在點對點之間傳遞消息時使用。如果應用程序開發(fā)者但愿每一條消息都
可以被解決,那么應當使用P2P消息模型。與Pub/Sub消息模型不同,P2P消息總是可以被
傳送到指定位置。
Pub/Sub模型在一到多消息廣播時使用。如果一定陽度消息傳遞不可靠性可以被接受話,
那么應用程序開發(fā)者也可以使用Pub/Sub消息模型。換句話說,它合用于所有消息消費程序
并不規(guī)定可以收到所有信息或者消息消費程序并不想接受到任訶消息狀況。
CMS通過容許創(chuàng)立持久訂閱來簡化時間有關性,雖然消息預訂者未激活也可以接受到
消息。此外,使用持久訂閱還可通過隊列提供靈活性和可靠性,而依然容許消息被發(fā)給許
多接受者。TopicSubscribertopicSubscriber=iopicSession.createDurableSubscriber(topic.
subscrip(ionNamc);Connection對象表達了到兩種消息模型中任一種消息系統(tǒng)連接。服
務器端和客戶機端對象規(guī)定管理創(chuàng)立CMS連接狀態(tài)。連接是由ConnectionFaclory創(chuàng)立并
且通過JNDI查尋定位。
〃獲得用于P2PQueueConnectionFactorj,
QneiieConnectionFartorjf=qneiieCannecti(^nF;ictory():
Contextmessaging=newInitialContext():
QucucConncciionFactory
=(QucucConncctionFactory)Messaging.lookup(*,QucucConncctionFactory^^);
〃獲得用于pub/subTopicConnectionFactorjf
TopicConnectonFactorytopicConnectionFactory;
Contextmessaging=newInitialContext();
topicConneclionFactory
=(TopicConneclionFactory)mcssaging.lookup(“TopicConncciionFaciory"):
注意:用于P2P代碼和用于PublishSubscribc代碼非常相似。
如果session被標記為transactional話,確認消息就通過確認和校正來自動地解決。如果
session沒有標記為transactioral.你有三個用干消息確認詵項°
?AUTO.ACKNOWLEDGEsession將自動地確認收到一則消息。
?CLIENT_ACKNOWLEDGE客戶端程序將確認收到一則消息,調用這則消息確認辦
法。?DUPS_OK_ACKNOWLEDGE這個選項命令session''懶散''確認消息傳遞,可
以想到,這將導致消息提供者傳遞某些復制消息也許會出錯。這種確認方式只應當用于消息
消費程序可以容忍潛在副本消息存在狀況。
queueSession
=queueConnection.createQueiieSession(false,session.AUTO-ACKNOWLEDGE);//!^?
lopicSession
=topicConnection.createTopicSession(false,session.AUTO_ACKNOWLEDGE);//Pub-Sub
注意:在本例中,一種session目從連結中創(chuàng)立,非值指出session是non-lransac【ional,
并且session將自動地確認收到一則消息。
CMS當前有兩種傳遞消息方式。標記為NON_PERSISTENT消息最多投遞?次,而標
記為PERSISTENT消息將使用暫存后再轉送機理投遞。如果一種CMS服務離線,那么持
久性消息不會丟失但是得等到這個服務恢復聯(lián)機時才會被傳遞.因此默認消息傳遞方式是
非持久性。雖然使用非持久性消息也許減少內務和需要存儲器,并且這種傳遞方式只有當你
不需要接受所有消息時才使用。
雖然CMS規(guī)范并不需要CMS供應商實現(xiàn)消息優(yōu)先級路線,但是它需要遞送加快消息
優(yōu)先于普通級別消息。CMS定義了從0到9優(yōu)先級路線級別,0是最低優(yōu)先級而9則是最
高。更特殊是。到4是正常優(yōu)先級變化幅度,而5到9是加快比先級變化幅度。舉例來說:
topicPublisher.publish(message?Deliver)fMode.PERSISTENT,8,10000)://Pub-Sub或
queueSender.send(message,DeliveryMode.PERSISTENT,8,10000);//P2P這個代碼片
斷,有兩種消息模型,映射遞送方式是持久,優(yōu)先級為加快型,生存周期是10000(以亳秒
度量)。如果生存周期設立為零,這則消息將永遠不會過期。當消息需要時間限制否則將使
其無效時,設立生存周期是有用。
CMS定義/五種不同消息正文格式,以及調用消息類型,容許你發(fā)送并接受以某些不同形
式數(shù)據(jù),提供既有消息格式某些級別兼容性。
?StreamMessage-Java原始值數(shù)據(jù)流
?MapMessage——套名稱-值對
?TextMessage-一種字符由對象
?ObjectMessage——種序列化Java對象
?BylcsMcssagc-一種未解釋字節(jié)數(shù)據(jù)流
CMS應用程序接口提供用于創(chuàng)立每種類型消息和設立荷轂辦法例如,為了在?種隊列
創(chuàng)立并發(fā)送一種TextMessage實例,你可以使用下列語句:
TexiMessagemessage=queueSession.creaieTextMessage();
incssagc.sctTcxt(tcxiMsg);
以異步方式接受消息,需要創(chuàng)立一種消息監(jiān)聽器然后注冊一種或各種使用
MessageConsunierCMSMessageListener接口。會話(主題或隊列)負貢產生某些消息,這些消
息被傳送到使用onMessage辦法監(jiān)聽者那里。
Usingnamespacecms;
classExampleLislener:publicMessageListener{
〃把消息強制轉化為TexiMessage格式
publicvoidonMessage(Messagemessage){
TextMessagetextMsg=null;
〃打開并解決這段消息
當咱們創(chuàng)立QueueReceiver和TopicSubscriber時,咱們傳遞消息選取器字符串:
//P2PQueueReceiver
QueueReceiverreceiver;
receiver=session.createReceiver(queue.selector):
//Pub-SubTopicSubscriber
TopicSubscribersubscriber;
subscriber=session.createSubscriber(topic?selector);
為了啟動消息交付,無論是Pub/Sub還是P2P,都需要調用start辦法。
TopicConnection.startO;//pub-sub
QueueConnection.start():"P2P
當一條消息被捕獲時,這條消息做為一條必要被強制轉化為恰當消息類型普通Message
對象到達。如TextMessage
voidonMcssagc(constMessage*message){
TextMessagctxtMsg=dynarnic_cast<TextMcssagc*>(message):
〃對txtMsg做某些解決
)
停止消息傳遞.無論是P而Nuh還是P2P,都調用頷甲辦法.
TopicConncction.stop();//pub-sub
QueueConnection.stop();//P2P
3.2接口描述
CMS支持兩種消息類型P2P和Pub/Sub,分別稱作:P2PDomain和Pub/SubDomain,這
兩種接口都繼承統(tǒng)一CMSParcni接口,CMS重耍接口如下所示:
CMSParent
ConnectionFactory
Connection
Destination
Session
MessageProducer
MessageConsumer
如下是對這些接口簡樸描述:
ConnectionFactory:連接二廠,CMS用它創(chuàng)立連接
Connection:CMS客戶端到CMSProvider連接
Destination:消息目地
Session:一種發(fā)送或接受消息線程
MessageProducer:由Sess沁n對象創(chuàng)立用來發(fā)送消息對象
MessageConsumer:由Session對象創(chuàng)立用來接受消息對象
3.3CMS消息模型
CMS消息由如下幾某些構成:消息頭,屬性,消息體。
消息頭(Header)?消息頭包括消息辨認信息和路由信息,消息頭包括某些原則屬性如:
CMSDcstination.CMSMcssagcID等。
消息頭由誰設立
CMSDestinationsend或publish辦法
CMSDeliverjModesend或publish辦法
CMSExpirationsend或publish辦法
CMSPrioritysend或publish辦法
CMSMessagcIDsend或publish辦法
CMSTimcstampsend或publish辦法
CMSCorrclationlD客戶
CMSRcplyTo客戶
CMSType客戶
CMSRedeliveredCMSProvider
屬性(Properties)-除了消息頭中定義好原則屬性外,CMS提供一種機制增長新屬性到
消息頭中,這種新屬性包括如下幾種:
1.應用需要用到屬性;
2.消息頭中原有某些可選屬性;
3.CMSProvider需耍用到屬性。
原則CMS消息頭包括如下屬性:
CMSDeslinalion--消息發(fā)送3地
CMSDeliveiyMode一傳遞模式,有兩種模式:PERSISTENT和NON-PERSISTENT,
PERSISTENT表達該消息一定要被送到目地,否則會導致應用錯誤。NON_PERSISTENT表
達偶爾丟失該消息是被容許,這兩種模式使開發(fā)者可以在消息傳遞可靠性和吞吐量之間找到
平衡點。
CMSMessagelD唯一辨認每個消息標記,F(xiàn)hCMSProvider產生。
CMSTimestamp一種消息被提交給CMSProvider到消息被發(fā)出時間。
CMSCorrelationlD用來連接到此外一種消息,典型應用是在回答消息中連接到原消息
CMSReplyTo提供本消息回答消息目地址。
CMSRedelivered如果一種客戶端收到一種設立了CMSRedelivered屬性消息,則表達也
許該客戶端曾經(jīng)在早些時候收到過該消息,但并沒有簽收Sckncwledged”
CMSType消息類型辨認符,
CMSExpiration消息過期時間,等于QucucScndcrsend辦法中timcToLivc值或
TopicPublisherpublish辦法中(imeToLive值加上發(fā)送時刻GMT時間值。如果limeToLive
值等于零,則CMSExpiration被設為零,表達該消息永但是期。如果發(fā)送后,在消息過期
時間之后消息還沒有被發(fā)送到目地,則該消息被清除。
CMSPriority消息優(yōu)先級,從0-9十個級別,0-4是普通消息,5-9是加急消息。CMS不
規(guī)定CMSProvider嚴格按照這卜個優(yōu)先級發(fā)送消息、,但必要保證加急消息耍先于普通消息
到達。
消息體(Body)-CMSAPI定義了4種消息體格式,也叫消息類型,你可以使用不同形式
發(fā)送接受數(shù)據(jù)并可以兼容既有消息格式,下面描述這4種類型;
消息類型消息體
TextMessagestring對象,如xml文獻內容
MapMessage名/值對集合,名是string對象,值類型可以是C++任何基本類型
BytesMessage字節(jié)流
ObjcctMcssagc對象類型
Message沒有消息體,只有消息頭和屬性。
下例演示創(chuàng)立并發(fā)送一,種TexiMessage到一種隊列:
TextMessagemessage=queueSession.createTextMessage();
message.setText(msg_text)://msg_textisaString
message.seiCMSiypeC'iexr');
qucucSender.scnd(message);
下例演示接受消息并轉換為適當消息類型:
Message*m=queueReceiver.receive();
If(m->getCMSType()==
TcxtMcssagctx(=dynamic_cast<TcxiMcssagc*>(m);
//dosomething
}else(
)
4消息生產者客戶端
消息生產者產生消息并將消息發(fā)送到broker上隊列或主題中。
要使消息生產者生產消息被消息消費者消費,必要滿足兩個條外:
生產者和消費者必要連接到同一種Broker,即BrokerUri中主機名和端口相似
生產者和消費者必要具備相似desiinaiion,即同一種隊列名或主題名
4.1使用activemq-cpp來創(chuàng)立消息生產者
4.1.1頭文獻及名字空間
#include<activemq/core/ActiveMQConnectionFactory.h>
#include<aclivemq/util/Config.h>
#include<cms/Connection.h>
#include<cms/Session.h>
#include<cms/TcxiMessage.h>
#includc<cms/ExccptionListcner.h>
#include<stdlib.h>
#include<iostream>
usingnamespaceactivemq;
usingnamespaceactivemq::core;
usingnamespacecms;
usingnamespaceski;
4.1.2創(chuàng)立一種生產者類
classSiiiiplcPioducci{
private:
Connection*connection:〃連接對象
Session*session:〃會話
Destination*destination:〃消息目地
Messageproducer*producer;〃消息生產者
booluscTopic;〃與否采用采用主題模式
boolclientAck;〃與否自動確認消息接受
unsignedintnumMessages:〃生產消息數(shù)
std::stringbrokerURl:〃連接borkeruri
std::stringdestURI:〃隊列或主題名
public:
〃構造函數(shù)
SimpleProducer(constst(J::string&brokerURI,
unsignedintmnnMcssagcs,
constsid::string&destURI,
booluseTopic=false,
boolclientAck=false){
mnnecficn=NIII.1
session=NULL;
destination=NULL;
producer=NULL;
this->numMessages=numMessages;
this->useTopic=useTopic;
this->brokerURI=brokerURI;
this->destURI=dcstlRI;
this->clicntAck=clicntAck;
initialize();
}
virtual-SimplcProduccr(){
clcanupO;
}
4.1.3初始化及銷毀
//初始化
private:
Virtualvoidinitialize(){
//創(chuàng)立連接工廠
ActiveMQConnectionFactory*connectionFactory=
newActiveMQConnectionFactory(brokerURI);
//創(chuàng)立一種到broker連接
connection=connecti(inF;K'tory->cre;iteC()nnection(>;
connection->start();
//關閉連接工廠
deleteconnectionFactory;
//創(chuàng)立一種會話
if(clientAck){〃消息接受后由消費者客戶端確認
session=connection->createSession(Session::CLIENT_ACKNOWLEDGE);
}else〃消息接受自動確認
session=conncction->crcatcScssion(Session::AUTO_ACKNOWLEDGE);
}
//創(chuàng)立一種隊列或主題(TopicorQueue)
if(useTopic){
destination=session->createTopic(destURl);
}else{
destination=session->crcateQucuc(destURl);
)
//創(chuàng)立生產者并設定消息傳送模式
producer=sessicn->createProducer(destination);
producer->setDeliveryMode(DeliveryMode::NON」,ERSISTENT);
}catch(CMSException&e){
e.prin(StackTrace();
//銷毀
voidcleanup。{
//Destroyresources.
try(
if(destination!=NULL)deletedestination;
}catch(CMSException&e){e.printStackTrace():}
destination=NULL;
try(
if(producer!=NULL)deleteproducer;
}catch(CMSExcep(ion&e){e.printStackTrace():}
producer=NULL;
//Closeopenresources.
try{
if(session!=NULL)session->closc();
if(connection!=NULL)connection->close();
}catch(CMSException&e){e.printStackTrace():}
try(
if(session!=NULL)deletesession;
}catch(CMSException&e){e.printStackTrace();}
session=NULL;
try{
if(connection!=NULL)deleteconnection;
}catch(CMSException&e){e.printStackTrace();)
connection=NULL;
4.1.4生產一種消息并發(fā)送到隊列中
public:
voidsend(){
//消息內容
stringtext=(stiing)"Helloworld!thread
for(std::size_tix=0;ix<numMessages;++ix){
//創(chuàng)立種文本類型消息
TextMessage*message=session->createTextMessage(text);
//發(fā)送消息
printf("Sentmessage#%d\n"?ix+1);
produccr->scnd(message);
//釋放消息
deletemessage;
4.1.5發(fā)送消息主程序
Intmain(void){
//brokeruri
std::stringbrokerURI=
"(cp://l27.0.0.1:61616"
"?wircFormat=opcnwirc"
"&transport.useAsyncSend=true"
//發(fā)送消息數(shù)
unsignedintnumMessages=;
//消息隊列名
std::s(ringdestURI="TEST.FOO";
//使用隊列模式
booluseTopics=false;
〃初始化一種消息生產者對象并發(fā)送消息
SimplcProducerproducer(brokerURLnumMessages,destURI,useTopics);
producer.send();
return0;
)
4.1.6總結
綜上例子可知,每次發(fā)送?種消息到消息隊列中都需要定義?種生產者類,并完畢?種機械
初始化及銷毀過程。為了提高軟件開發(fā)效率,可以模仿生產者類定義一種消息發(fā)送者類,封
裝有關細方,并編譯成共享庫以供使用,簡化編程過程。
4.2使用winkeemq-cpp來創(chuàng)立消息生產者
4.2.1頭文獻及名字空間
#include<McssageSender.h>
usingnamespacewinkeemq;
usingnamespacestd;
4.2.2發(fā)送消息主程序
intmain(intargc,char*argv[]){
//brokeruri
std::stringbrokerURI=
"tcp://192.168.1.179:61616"
"?wircFonnat=opcnwirc"
"&wireFormat.maxInactiviiyDuration=0"
"&soKeepAlive=true"
"&lianspoiL.uscAsyncSciid-l!ue";
//隊列名
stringmqName="mm.mq";
//創(chuàng)立一種消息發(fā)送對象(采用隊列模式,每次只發(fā)一種消息)
MessageSenderms(brokerURI,l,faise,mqName);
stringbody=',hellowoHd\n";
//創(chuàng)立一種文本消息
lexlMessage*msg=dynair.ic_cast<TextMessage*>
(ms.crca(cMcssag(McssagcScndcr::TEXT_MESSAGE));
//設定消息體內容
msg->sctTcx((body);
H發(fā)送消息
nis.sendMessage();
//銷毀消息
ms.dcIctcMessagcO;
}
4.2.3總結
由上述例子可看出,采用win<ccmq-cpp后裔碼量精簡了諸多,開發(fā)員不需要關懷那些機械
初始化細節(jié)。要創(chuàng)立一種消息生產者,只需要給定Brokeruri,隊列名,消息目模式,然后
調用MessageSendcrcrealeMessage。創(chuàng)立一種詳細類型消息,crealeMessage()參數(shù)是一種在
MessageSender中定義?種無名enum,指明消息類型。調用MessageSendersendMessage()
發(fā)送消息到broker中,最后銷毀消息
5消息消費者客戶端
消息消費者從Broker上隊列或主題中取出消息并做相應解決。
5.1使用activemq-cpp來創(chuàng)立消息消息者
5.1.1頭文獻及名字空間
#include<activemq/concurrent/Thread.h>
ttinclude<activemq/concurrcnt/Runnable.h>
#includc<activcmq/concurrcnt/Coun(DownLatch.h>
#include<activeniq/core/ActiveMQConnectionFactorj'.h>
#include<cms/Connection.h>
#include<cms/Session.h>
#include<cms/TextMessage.h>
#include<cms/ExceptionListener.h>
#includc<cms/McssagcListcncr.h>
#include<stdlib.h>
#include<iostream>
usingnamespaceactivemq;
usingnamespaceactiveniq::core;
usingnamespacecms;
usingniimcspaccski;
5.1.2創(chuàng)立一種生產者類
classSimpleAsyncConsumer:publicExceplionLislener,
publicMessagcListcncr{
private:
Connection*connection:〃連接對象
Session*session;〃會話
l^estinntion*destination:〃消息目地
MessageConsumer*consumer;:〃消息消費者
booluseTopic;〃與否采用采用主題模式
boolclientAck;〃與否自動確認消息接受
std::stringbrokerURI:〃連接borkeruri
std::stringdestURI:〃隊列或主題名
public:
〃構造函數(shù)
Siinp!cAsyncConsumer(conststd::string&brokerURI,
conststd::string&destURI.
booluseTopic=false,
boolclientAck=false){
connection=NULL;
session=NULL:
destination=NULL;
consumer=NULL;
(his->useTopic=useTopic;
this->brokerURI=brokerURI;
this->destURI=destURI;
this->clientAck=clientAck;
initialize();
)
virtual~SimpleAsyncConsumer(){
cleanupO,
)
5.1.3初始化及銷毀
private:
//初始化
virtualvoidinitialize(){
try{
//創(chuàng)立連接工J
ActiveMQConnectionFactory*connectionFactory=
newActiveMQConneclionFactory(brokerURI);
//創(chuàng)立一種到troker連接
connection=conncc(ionFac(ory->crcatcConncction();
connecdon->start();
//設立連接異常偵聽類
connection->setExceptionListener(this);
//關閉連接工廠
deleteconnectionFactory;
//創(chuàng)立一種會話
if(clientAck){〃消息接受后由消費者客戶端確認
session=conncction->crcatcScssion(Session::CLIENT_ACKNOWLEDGE);
}clse〃消息接受自動確認
session=co<inection->cieateSession(Session::AUTO_ACKNOWLEDGE);
)
//創(chuàng)立一種隊磯或主題(TopicorQueue)
if(useTopic){
destination=scssion->crcatcTopic(dcstURI);
}else{
destination=session->createQueue(destURI);
)
//創(chuàng)立消費者并設定消息接受偵聽類
consumer=sessi3n->crcateConsumer(destination);
consumcr->sctMessagcLis(cncr(this);
}catch(CMSException&e){
e.printStackTrace();
)
)
//銷毀
voidcleanup(){
//Destroyresources.
try{
if(destination!=NULL)deletedestination;
}catch(CMSException&e){e.printStackTrace():|
destination=NULL;
lry{
if(producer!=NULL)deleteproducer;
}catch(CMSException&e){c.printStackTraceO;!
producer=NULL;
//Closeopenresources.
try1
if(session!=NULL)session->close();
if(connection!=NULL)conncction->closc();
}catch(CMSException&e){c.printStackTraceO;}
try{
if(session!=NULL)deletesession;
}catch(CMSException&e){e.printStackTrace():}
session=NULL;
try{
if(connection!=NULL)deleteconnection;
}catch(CMSException&e){e.printStackTrace();}
connection=NULL:
)
5.1.4從消息隊列中異步接受消息
如果隊列中有消息到來,程序會自動調用onMessage函數(shù),因而只需要在onMessage()中編
寫對■消息解決。onMessage。函數(shù)中message參數(shù)在onMessage。返回后便會自動銷毀,可以
通過調用MessagecloneO辦法拷貝自身來擴展其生命周期,clone。返回是在椎上分派消息,
因而需要手動銷毀。
virtualvoidonMcssagc(constMessage*message){
staticintcount=0;
try
(
count++;
//強制轉換消息到實際所屬類型
constTcxtMcssagc*(cxtMcssagc=
dynamic_cas(<constTextMessage*>(message);
stringtext=
if(textMessage!=NULL){
〃得到消息體內容
text=textMessage->getText();
}else(
text="NOTATEXTMESSAGE!";
)
//確認已經(jīng)接受
if(clicntAck){
mcssagc->acknowlcdgc();
}
printf("Message#%dReceived:%s\n",count,text.c_str());
}catch(CMSException&e){
e.printStackTrace();
);
5.1.5接受消息主程序
Intmain(void){
//brokeruri
std::stringbrokerURI=
"tcp://l27.0.0.1:61616"
"?wireFomiat=openwire"
"&tiansport.useAsyncSend=tiue"
//消息隊列名
std::stringdestURI="TEST.FOO";
//使用隊列模式
booluseTopics=false;
〃初始化一種消息消費者對象并偵聽消息
SinipleAsyncConsumerconsumer(brokerURI,destURI,useTopics.clientAck);
//偵聽接受消息5秒鐘,如果有消息到來會自動執(zhí)行0nMessage()對消息解決
Thread::sleep(5()()0);
return0;
)
5.1.6總結
綜合上例及消息生產者例子可知國,兩者初始化及銷毀過程非常相似,因而,和消息生產者
同樣,也可以對消息消費者進行封裝并編譯成共享庫于提高開發(fā)效率。
5.2使用winkeemq-cpp來創(chuàng)立消息消息者
5.2.1頭文獻及名字空間
#include<MessageReceiver.h>
#include<MessageQueue.h>
#include<MessageParser.h>
usingnamespacewinkeemq;
usingnamespacesld;
5.2.2接受消息主程序
intmain(intargc,char*argv[]){
std::stringbrokerURI=
"tcp://192.168.10.41:61616"
"?wireFormat=openwire"
"&transport.uscAsyncScnd=truc,,
"&soKeepAlive=true"
"&wireFormat.maxInactivityDuiation=0";
//消息隊列名
std::stringdestNanie=",mm.mq";
//創(chuàng)立一種消息消費者并設立偵聽時間為30秒
MessageReceivermr(brokerURI,1,false,destName,1(XX)*30);
mr.waitUnitIReadyO;
〃偵聽消息,并把接受到消息存入到本地MessageQueue隊列中
mr.listcnO;
//創(chuàng)立一種消息解決對象
McssagcParscrinp:
whille(!MessagcQueue::instance()->empty()){
//從本地MessageQueue隊列取一,種消息并解決
mp.run();
)
//關閉本地隊列
McssagcQucuc::instancc()->shutdown();
//關閉消息接受者
mr.shutdown();
上例中有些核心代碼實現(xiàn)將在Winkeemq-cpp簡介章節(jié)中給出。
5.2.3總結
由上例可知,winkccmq-cpp對消息消費者創(chuàng)立銷毀流程遂行了封裝,劃分/模塊,使
整個過程更加清晰,將消息解決獨立成一種模塊,以便后來擴展。通過在本地創(chuàng)立一種
MessageQueue緩沖隊列,如只消息到來onMessage要做僅是將其插入到緩沖返回,人加速
了onMessage解決進度,提高程序效率。此外,采用本地緩沖為采用多線程編程提供了基本。
6消息類型
CMS支持TextMessage,MapMessage,ByteMessage,ObjectMessage四種消息類型,其都繼
承自cms::Message類型。
以上兒種消息類都是抽象類,不能直接實例化。在實際使用中普通以指針形式浮現(xiàn),指向由
調用Session類辦法創(chuàng)立詳細實類。
指向詳細消息類指針在消息發(fā)送時向上轉換成基類指針Message*,接受又將基類指針強制
向卜.轉換成詳細消息類型指針。如
發(fā)送時:
TextMessage*txt=sessior.createTextMessage();
txt->setText(4ihelloworld'n");
scndcr.send(txt)://send參數(shù)是Message*做向上轉換
接受時:
virtualvoidonMessage(constMessage*msg){
//強制向下轉換
TexiMessage*txt=dynamic_cast<consiTextMessage*>(msg);
//dosomething
I
6.1cms::Message
6.1.1概述
cms::Message是ems中所有消息抽象基類,包括三某些:消息頭,消息體,顧客定義屬性。
詳情參照
6.L2接口
6.1.2.1virtualcms::Message::-Message()[inline,virtual]
6.1.2.2virtualMessage*cms::Message::clone()const[pure
virtual]
Clonethismessageexactly,retumsanewinstancethatthecallerisrequiredtodelete.
Returns:
newcopyofthismessage
6.1.2.3virtualvoidcms::Message::acknowledge()constthrow
(CMSException)[purevirtual]
Acknowledgesallconsumedmessagesofthesessionofthisconsuncdmessage.
AllconsumedCMSmessagessupporttheacknowledgemethodforusewhenaclienthasspecified
thatitsCMSsession'sconsumedmessagesaretobeexplicitlyacknowledged.Byinvoking
acknowledgeonaconsumedmessage?aclientacknowledgesallmessagesconsumedbythe
session(ha((hemessagewasdeliveredto.
Callstoacknowledgeareignoredforbothtransactedsessionsandsessionsspecifiedtouse
implicitacknowledgementmodes.
Aclientmayindividuallyacknuwlcdgccituliu心、ageusilisuuiisumcdtuiilmayuliuuscLu
acknowledgemessagesasanapplication-definedgroup(whichisdonebycallingacknowledgeon
thelastreceivedmessageoftlicgroup,(herebyacknowledgingallmessagesconsumedbythe
session.)
Messagesthathavebeenreceivedbutnotacknowledgedmayberedelivered.
6.1.2.4virtualvoidcms::Message::clearBody()[purevirtual]
Clearsoutthebodyof(hemessage.
Thisdocsnotcleartheheadersorproperties.
6.1.2.5virtualvoidems:iMessage::clearProperties()[pure
virtual]
Clearsoutthemessagebody.
Clearingamessage'sbodydocsnotclearitsheadervaluesorpropertyentries.
Ifthismessagebodywasread-only,callingthismethodleavesthemessagebodyinthesamestate
asanemptybodyinanewlycreatedmessage.
6.126Virtualstd::vector<std::string>
cms::Message::getPropertyName()const[purevirtual]
Retrievestheproperynames.
Returns:
Thecompletesetofpropertynamescurrentlyinthismessage.
6.1.2.7virtualboolcms::Message::propertyExists(const
std::string&name)const[purevirtual]
Indicateswhetherornotagivenpropertyexists.
Parameters:
nameThenameofthepropertytolookup.
Retums:
Trueiftheproperlyexistsinthismessage.
6.1.2.8virtualboolcms::Message::getBooleanProperty(const
std::string&name)constthrow(CMSException)[pure
virtual]
Getsabooleanproperty.
Parameters:
nameThenameofthepropertytoretrieve.
Retunis:
Thevalueforthenamedproperty.
Exceptions:
CMSExccpiionifthepropertydocsnotexist.
6.1.2.9virtualunsignedcharcms::Message::getByteProperty
(conststd::string&name)constthrow(CMSException)
[purevirtual]
Getsabyteproperty.
Parameters:
nameThenameofthepropertytoretrieve.
Returns:
Thevalueforthenamedproperty.
Exceptions:
CMSExceptionifthepropertydoesnotexist.
6.1.2.10virtualdoublecms::Message::getDoubleProperty
(conststd::string&name)constthrow(CMSException)
[purevirtual]
Getsadoubleproperly.
Parameters:
nameThenameoftheproperly(oretrieve.
Returns:
Thevalueforthenamedproperty.
Exceptions:
CMSEAcepliwnifIliepiupeilyduesnutexist.
6.1.2.11virtualfloatcms::Message::getFloatProperty(const
std::string&name)constthrow(CMSException)[pure
virtual]
Getsafloatproperty.
Parameters:
nameThenameofthepropertytoretrieve.
Returns:
Thevalueforthenamedproperty.
Exceptions:
CMSExceptionifthepropertydoesnotexist.
6.1.2.12virtualintcms::Message::getlntProperty(const
std::string&name)constthrow(CMSException)[pure
virtual]
Getsaintproperty.
Parameters:
nameThenameofthepropertytoretrieve.
Returns:
Thevalueforthenamedproperty.
Exceptions;
CMSExccptionifthepropertydocsnotexist.
6.1.2.13virtuallonglongcms::Message::getLongProperty
(conststd::string&name)constthrow(CMSException)
[purevirtual]
Getsalo
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 炎癥性腸病精準診療中的個體化給藥方案
- 災難心理創(chuàng)傷的一級預防策略
- 護膚銷售培訓課件
- 農村消防安全教學方案
- 年中考生物學一輪專題復習教材梳理課件(廣東)3從細胞到生物體
- 第4單元口語交際 請你支持我
- 健身中心會員服務與管理手冊
- 酒店餐飲廚房設備維護手冊(標準版)
- 云的介紹教學課件
- 云技術與應用專業(yè)介紹
- 申論范文寶典
- 【一例擴張型心肌病合并心力衰竭患者的個案護理】5400字【論文】
- 四川橋梁工程系梁專項施工方案
- 貴州省納雍縣水東鄉(xiāng)水東鉬鎳礦采礦權評估報告
- GB.T19418-2003鋼的弧焊接頭 缺陷質量分級指南
- 污水管網(wǎng)監(jiān)理規(guī)劃
- GB/T 35273-2020信息安全技術個人信息安全規(guī)范
- 2023年杭州臨平環(huán)境科技有限公司招聘筆試題庫及答案解析
- 《看圖猜成語》課件
- LF爐機械設備安裝施工方案
- 企業(yè)三級安全生產標準化評定表(新版)
評論
0/150
提交評論