版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領
文檔簡介
第SpringbootWebsocketStomp消息訂閱推送
pom文件
直接引入spring-boot-starter-websocket即可。
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-websocket/artifactId
/dependency
聲明websocketendpoint
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.web.socket.server.standard.ServerEndpointExporter;
*@ClassNameWebSocketConfig
*@Authorscott
*@Date2025/6/16
*@VersionV1.0
@Configuration
publicclassWebSocketConfig{
*注入一個ServerEndpointExporter,該Bean會自動注冊使用@ServerEndpoint注解申明的websocketendpoint
@Bean
publicServerEndpointExporterserverEndpointExporter(){
returnnewServerEndpointExporter();
websocket實現(xiàn)類,其中通過注解監(jiān)聽了各種事件,實現(xiàn)了推送消息等相關邏輯
importmon.cache.Cache;
importmon.cache.CacheBuilder;
importmon.core.domain.AjaxResult;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.stereotype.Component;
importjavax.websocket.*;
importjavax.websocket.server.PathParam;
importjavax.websocket.server.ServerEndpoint;
importjava.util.Objects;
importjava.util.Set;
importjava.util.concurrent.ConcurrentMap;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicInteger;
*@ClassName:DataTypePushWebSocket
*@Author:scott
*@Date:2025/6/16
@ServerEndpoint(value="/ws/dataType/push/{token}")
@Component
publicclassDataTypePushWebSocket{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(DataTypePushWebSocket.class);
*記錄當前在線連接數(shù)
privatestaticAtomicIntegeronlineCount=newAtomicInteger(0);
privatestaticCacheString,SessionSESSION_CACHE=CacheBuilder.newBuilder()
.initialCapacity(10)
.maximumSize(300)
.expireAfterWrite(10,TimeUnit.MINUTES)
.build();
*連接建立成功調(diào)用的方法
@OnOpen
publicvoidonOpen(Sessionsession,@PathParam("token")Stringtoken){
StringsessionId=session.getId();
onlineCount.incrementAndGet();//在線數(shù)加1
this.sendMessage("sessionId:"+sessionId+",已經(jīng)和server建立連接",session);
SESSION_CACHE.put(sessionId,session);
("有新連接加入:{},當前在線連接數(shù)為:{}",session.getId(),onlineCount.get());
*連接關閉調(diào)用的方法
@OnClose
publicvoidonClose(Sessionsession,@PathParam("token")Stringtoken){
onlineCount.decrementAndGet();//在線數(shù)減1
SESSION_CACHE.invalidate(session.getId());
("有一連接關閉:{},當前在線連接數(shù)為:{}",session.getId(),onlineCount.get());
*收到客戶端消息后調(diào)用的方法
*@parammessage客戶端發(fā)送過來的消息
@OnMessage
publicvoidonMessage(Stringmessage,Sessionsession,@PathParam("token")Stringtoken){
("服務端收到客戶端[{}]的消息:{}",session.getId(),message);
this.sendMessage("服務端已收到推送消息:"+message,session);
@OnError
publicvoidonError(Sessionsession,Throwableerror){
log.error("發(fā)生錯誤");
error.printStackTrace();
*服務端發(fā)送消息給客戶端
privatestaticvoidsendMessage(Stringmessage,SessiontoSession){
try{
("服務端給客戶端[{}]發(fā)送消息{}",toSession.getId(),message);
toSession.getBasicRemote().sendText(message);
}catch(Exceptione){
log.error("服務端發(fā)送消息給客戶端失敗:{}",e);
publicstaticAjaxResultsendMessage(Stringmessage,StringsessionId){
Sessionsession=SESSION_CACHE.getIfPresent(sessionId);
if(Objects.isNull(session)){
returnAjaxResult.error("token已失效");
sendMessage(message,session);
returnAjaxResult.success();
publicstaticAjaxResultsendBroadcast(Stringmessage){
longsize=SESSION_CACHE.size();
if(size=0){
returnAjaxResult.error("當前沒有在線客戶端,無法推送消息");
ConcurrentMapString,SessionsessionConcurrentMap=SESSION_CACHE.asMap();
SetStringkeys=sessionConcurrentMap.keySet();
for(Stringkey:keys){
Sessionsession=SESSION_CACHE.getIfPresent(key);
DataTypePushWebSocket.sendMessage(message,session);
returnAjaxResult.success();
至此websocket服務端代碼已經(jīng)完成。
stomp協(xié)議
前端代碼.這個是在某個vue工程中寫的js,各位大佬自己動手改改即可。其中Settings.wsPath是后端定義的ws地址例如ws://localhost:9003/ws
importStompfrom'stompjs'
importSettingsfrom'@/settings.js'
exportdefault{
//是否啟用日志默認啟用
debug:true,
//客戶端連接信息
stompClient:{},
//初始化
init(callBack){
this.stompClient=Stomp.client(Settings.wsPath)
this.stompClient.hasDebug=this.debug
this.stompClient.connect({},suce={
this.console("連接成功,信息如下↓")
this.console(this.stompClient)
if(callBack){
callBack()
},err={
if(err){
this.console("連接失敗,信息如下↓")
this.console(err)
//訂閱
sub(address,callBack){
if(!this.stompClient.connected){
this.console("沒有連接,無法訂閱")
return
//生成id
lettimestamp=newDate().getTime()+address
this.console("訂閱成功-"+address)
this.stompClient.subscribe(address,message={
this.console(address+"訂閱消息通知,信息如下↓")
this.console(message)
letdata=message.body
callBack(data)
id:timestamp
unSub(address){
if(!this.stompClient.connected){
this.console("沒有連接,無法取消訂閱-"+address)
return
letid=""
for(letiteminthis.stompClient.subscriptions){
if(item.endsWith(address)){
id=item
break
this.stompClient.unsubscribe(id)
this.console("取消訂閱成功-id:"+id+"address:"+address)
//斷開連接
disconnect(callBack){
if(!this.stompClient.connected){
this.console("沒有連接,無法斷開連接")
return
this.stompClient.disconnect(()={
console.log("斷開成功")
if(callBack){
callBack()
//單位秒
reconnect(time){
setInterval(()={
if(!this.stompClient.connected){
this.console("重新連接中...")
this.init()
},time*1000)
console(msg){
if(this.debug){
console.log(msg)
//向訂閱發(fā)送消息
send(address,msg){
this.stompClient.send(address,{},msg)
后端stompconfig,里面都有注釋,寫的很詳細,并且我加入了和前端的心跳pingpong。
package.scott.config;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.messaging.simp.config.MessageBrokerRegistry;
importorg.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
importorg.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
importorg.springframework.web.socket.config.annotation.StompEndpointRegistry;
importorg.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
*@ClassName:WebSocketStompConfig
*@Author:scott
*@Date:2025/7/8
@Configuration
@EnableWebSocketMessageBroker
publicclassWebSocketStompConfigimplementsWebSocketMessageBrokerConfigurer{
privatestaticlongHEART_BEAT=10000;
@Override
publicvoidregisterStompEndpoints(StompEndpointRegistryregistry){
//允許使用socketJs方式訪問,訪問點為webSocket,允許跨域
//在網(wǎng)頁上我們就可以通過這個鏈接
//ws://:port/ws來和服務器的WebSocket連接
registry.addEndpoint("/ws").setAllowedOrigins("*");
@Override
publicvoidconfigureMessageBroker(MessageBrokerRegistryregistry){
ThreadPoolTaskSchedulerte=newThreadPoolTaskScheduler();
te.setPoolSize(1);
te.setThreadNamePrefix("wss-heartbeat-thread-");
te.initialize();
//基于內(nèi)存的STOMP消息代理來代替mq的消息代理
//訂閱Broker名稱,/user代表點對點即發(fā)指定用戶,/topic代表發(fā)布廣播即群發(fā)
//setHeartbeatValue設置心跳及心跳時間
registry.enableSimpleBroker("/user","/topic").setHeartbeatValue(newlong[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
//點對點使用的訂閱前綴,不設置的話,默認也是/user/
registry.setUserDestinationPrefix("/user/");
后端stomp協(xié)議接受、訂閱等動作通知
package.scott.ws;
importcom.alibaba.fastjson.JSON;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.messaging.handler.annotation.DestinationVariable;
importorg.springframework.messaging.handler.annot
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 大秦醫(yī)院面試題及答案
- C語言基礎選擇測試題含多知識點考察及答案
- 感控護士院感防控知識試題及答案
- 新疆成人考試真題及答案
- 成都三基試題題庫附答案
- 市事業(yè)單位招聘考試公共基礎知識試題題庫附答案詳解
- 輸血三基考試試題及答案
- 三級醫(yī)院護士招聘面試題含答案
- 嵌入式開發(fā)面試題及答案
- 河南專升本試題及答案
- 起重設備安全使用指導方案
- 江蘇省揚州市區(qū)2025-2026學年五年級上學期數(shù)學期末試題一(有答案)
- 干部履歷表(中共中央組織部2015年制)
- GB/T 5657-2013離心泵技術條件(Ⅲ類)
- GB/T 3518-2008鱗片石墨
- GB/T 17622-2008帶電作業(yè)用絕緣手套
- GB/T 1041-2008塑料壓縮性能的測定
- 400份食物頻率調(diào)查問卷F表
- 滑坡地質災害治理施工
- 實驗動物從業(yè)人員上崗證考試題庫(含近年真題、典型題)
- 可口可樂-供應鏈管理
評論
0/150
提交評論