Spring Boot & STOMP: random client disconnects
I have a Spring Boot WebSocket server application that a STOMP client connects to. There is one client connected to the server, and it needs to stay connected for a long session lifetime (one week or more). I notice random WebSocket connection interruptions or "freezes". After each interruption I need to reset the client and reconnect it to the server manually. I didn't notice any physical internet interruption, etc.
Can you look at the code and give me advice: - Is anything wrong? - Did I miss something that could be the reason for the random interruptions?
websocket server
configuration
@configuration @enablewebsocketmessagebroker public class websocketconfig extends abstractwebsocketmessagebrokerconfigurer { public static final string ws_commands_main_queue = "/topic"; @override public void configuremessagebroker(messagebrokerregistry config) { config.enablesimplebroker(ws_commands_main_queue); config.setapplicationdestinationprefixes("/app"); } @override public void registerstompendpoints(stompendpointregistry registry) { registry.addendpoint("/ws-commands-broadcaster") .withsockjs(); } }
REST API that sends requests to the WebSocket client
/**
 * REST entry point that turns an HTTP request into a command broadcast to every WebSocket
 * client subscribed to {@link #WS_COMMANDS_QUEUE}.
 */
@RestController
@RequestMapping(WebSocketCommandService.WS_COMMAND_SERVICE)
public class WebSocketCommandService extends GenericController {

    public static final String WS_COMMAND_SERVICE = "ws-command-service";
    public static final String WS_COMMANDS_QUEUE = WS_COMMANDS_MAIN_QUEUE + "/ws-commands-queue";

    private static final Logger logger = Logger.getLogger(WebSocketCommandService.class);

    // Constructor-injected and never reassigned, hence final.
    private final SimpMessagingTemplate wsTemplate;
    private final SyncApiManager syncApiManager;

    /**
     * @param wsTemplate     template used to publish the command to the broker destination
     * @param syncApiManager collaborator notified of the sync start before the broadcast
     */
    @Autowired
    public WebSocketCommandService(SimpMessagingTemplate wsTemplate, SyncApiManager syncApiManager) {
        this.wsTemplate = wsTemplate;
        this.syncApiManager = syncApiManager;
    }

    /**
     * Records the sync start time for the house/command pair and publishes the command,
     * serialised as JSON, to {@link #WS_COMMANDS_QUEUE}.
     *
     * <p>The path-variable names are given explicitly so that binding does not depend on
     * parameter names being retained at compile time and always matches the URI template.
     *
     * <p>NOTE(review): the method is also annotated with {@code @MessageMapping}, but its
     * arguments are {@code @PathVariable}s, which are resolved for HTTP requests only; STOMP
     * handlers use {@code @DestinationVariable}. Confirm whether the message mapping is
     * actually used, and drop or adapt it if not.
     *
     * @param houseId     identifier of the house the command targets
     * @param commandType kind of command to broadcast
     */
    @GetMapping(value = "/{houseid}/{commandtype}")
    @MessageMapping(WS_COMMAND_SERVICE)
    public void broadcastNewCommand(@PathVariable("houseid") Integer houseId,
                                    @PathVariable("commandtype") CommandType commandType) {
        logger.info("broadcasting command: " + commandType.name() + " websocket clients houseid=" + houseId);
        CommandDto newCommand = new CommandDto(houseId, commandType);
        syncApiManager.updateStartSyncTime(houseId, commandType);
        wsTemplate.convertAndSend(WS_COMMANDS_QUEUE, JsonUtil.toJson(newCommand));
    }
}
client configuration
@component public class websocketcommandlistener { private static final long ten_seconds = 10 * 1000; //10sec private static final logger logger = logger.getlogger(websocketcommandlistener.class); private static final string ws_endpoint_name = "ws-commands-broadcaster"; private static final string ws_commands_queue = "/topic/ws-commands-queue"; @value("${server.api.address}") private string serverapiaddress; @value("${server.api.suffix}") private string serverapisuffix; private commanddispatcher commanddispatcher; private websocketstompclient stompclient; private stompsession stompsession; private final taskscheduler defaultheartbeat; private final timer reconnecttimer; @autowired public websocketcommandlistener(commanddispatcher commanddispatcher) { this.commanddispatcher = commanddispatcher; this.defaultheartbeat = new threadpooltaskscheduler(); this.reconnecttimer = new timer("reconnecttimer"); } @postconstruct public void setupwebsockethandler() { createclient(); doconnect(); } private void createclient() { transport websockettransport = new websockettransport(new standardwebsocketclient()); list<transport> transports = collections.singletonlist(websockettransport); sockjsclient sockjsclient = new sockjsclient(transports); sockjsclient.setmessagecodec(new jackson2sockjsmessagecodec()); stompclient = new websocketstompclient(sockjsclient); } private void disconnect() { if (stompsession != null) { stompsession.disconnect(); stompsession = null; } if (stompclient != null) { stompclient.stop(); stompclient = null; } } private void reconnect() { disconnect(); createclient(); doconnect(); } private void doconnect() { final string wsuri = getwebsocketendpointaddr(); listenablefuture<stompsession> future = stompclient.connect(wsuri, new stompsessionhandleradapter() { @override public void handletransporterror(stompsession session, throwable exception) { if (exception instanceof connectionlostexception) { reconnect(); } } }); future.addcallback( newstompsession -> { 
stompsession = newstompsession; registercommandhandler(stompsession); }, (throwable throwable) -> { reconnectwebsocket(ten_seconds); } ); } private void registercommandhandler(stompsession stompsession) { stompsession.subscribe(ws_commands_queue, new stompframehandler() { public type getpayloadtype(stompheaders stompheaders) { return byte[].class; } public void handleframe(stompheaders stompheaders, object requestbody) { string commandasjson = new string((byte[]) requestbody); commanddto commandtype = jsonutil.fromjson(commandasjson, commanddto.class); logger.info("received websocket command: " + commandtype.tostring()); commanddispatcher.dispatch(commandtype); } }); } private void reconnectwebsocket(final long reconnectdelay) { reconnecttimer.schedule(new timertask() { @override public void run() { reconnect(); } }, reconnectdelay); } private string getwebsocketendpointaddr() { return string.format("ws://%s:%s/%s", serverapiaddress, serverapisuffix, ws_endpoint_name); } }
Comments
Post a Comment