WebSocket 消息服务抽象和demo

This commit is contained in:
walker 2023-06-16 17:16:53 +08:00
parent 98a4ba7571
commit 5f6a36e933
7 changed files with 209 additions and 21 deletions

View File

@ -1,6 +1,7 @@
package club.joylink.xiannccda.configuration;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
@ -23,6 +24,7 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo
@Configuration
@EnableWebSocketMessageBroker
@Slf4j
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
@ -71,6 +73,8 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
}
} catch (AuthenticationException e) {
throw new MessagingException(String.valueOf(HttpStatus.UNAUTHORIZED.value()));
} catch (Throwable e) {
log.error("WebSocket Connect 认证逻辑异常", e);
}
}
return message;
@ -78,4 +82,5 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
});
}
}

View File

@ -0,0 +1,39 @@
package club.joylink.xiannccda.ws;
import java.util.Map;
/**
* 消息服务
*/
public interface IMessageServer {
/**
* 服务路径pattern
*
* @return
*/
String getDestinationPattern();
/**
* 订阅时发送给客户端的消息
*
* @return null为不发送
*/
Object onSubscription(String destination, Map<String, String> paramMap);
/**
* 发送消息间隔时间
*
* @return 0为不定时发送
* @throws IllegalArgumentException 小于0时
*/
int getInterval();
/**
* 定时发送的消息
*
* @return null为不发送
*/
Object onTick();
}

View File

@ -1,20 +0,0 @@
package club.joylink.xiannccda.ws;
import club.joylink.xiannccda.entity.User;
import java.time.LocalDateTime;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
@Controller
public class TestController {
@SubscribeMapping("/queue/test")
public User testSubscription() {
User user = new User();
user.setId(1);
user.setName("sheng");
user.setRegisterTime(LocalDateTime.now());
return user;
}
}

View File

@ -0,0 +1,47 @@
package club.joylink.xiannccda.ws;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.Rtu;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.Signal;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage.Builder;
import club.joylink.xiannccda.exception.BusinessExceptionAssertEnum;
import com.alibaba.fastjson2.JSON;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestMessageServer implements IMessageServer {
static final String LineIdPathKey = "lineId";
static final String PathPattern = String.format("/queue/line/{%s}", LineIdPathKey);
@Override
public String getDestinationPattern() {
return PathPattern;
}
@Override
public Object onSubscription(String destination, Map<String, String> paramMap) {
String lineId = paramMap.get(LineIdPathKey);
BusinessExceptionAssertEnum.ARGUMENT_ILLEGAL.assertNotNull(lineId);
log.info("线路lineId={}订阅,发布全量数据", lineId);
Builder builder = WsLineMessage.newBuilder();
builder.addRtu(Rtu.newBuilder().setId("rtu1").setIpRtuStusInCentralCtrl(true).build());
builder.addSignal(
Signal.newBuilder().setId("signal1").setRedOpen(true).setAutoMode(true).build());
byte[] bytes = builder.build().toByteArray();
System.out.println(JSON.toJSONString(bytes));
return bytes;
}
@Override
public int getInterval() {
return 0;
}
@Override
public Object onTick() {
return null;
}
}

View File

@ -0,0 +1,53 @@
package club.joylink.xiannccda.ws;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.Rtu;
import club.joylink.xiannccda.dto.protos.DeviceStatusProto.Signal;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage;
import club.joylink.xiannccda.dto.protos.WsMessageProto.WsLineMessage.Builder;
import club.joylink.xiannccda.exception.BusinessExceptionAssertEnum;
import com.alibaba.fastjson2.JSON;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestMessageServer2 implements IMessageServer {
// static final String LineIdPathKey = "lineId";
// static final String PathPattern = String.format("/queue/line/{%s}", LineIdPathKey);
final String lineId;
final Object dataSource;
public TestMessageServer2(String lineId, Object dataSource) {
this.lineId = lineId;
this.dataSource = dataSource;
}
@Override
public String getDestinationPattern() {
return String.format("/queue/line/%s", this.lineId);
}
@Override
public Object onSubscription(String destination, Map<String, String> paramMap) {
log.info("线路lineId={}订阅,发布全量数据", lineId);
Builder builder = WsLineMessage.newBuilder();
builder.addRtu(Rtu.newBuilder().setId("rtu1").setIpRtuStusInCentralCtrl(true).build());
builder.addSignal(
Signal.newBuilder().setId("signal1").setRedOpen(true).setAutoMode(true).build());
byte[] bytes = builder.build().toByteArray();
System.out.println(JSON.toJSONString(bytes));
return bytes;
}
@Override
public int getInterval() {
return 0;
}
@Override
public Object onTick() {
return null;
}
}

View File

@ -0,0 +1,64 @@
package club.joylink.xiannccda.ws;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
@Component
@Slf4j
public class WsMessageServerManager {
private SimpMessagingTemplate smt;
public WsMessageServerManager(SimpMessagingTemplate smt) {
this.smt = smt;
this.registerMessageServer(new TestMessageServer());
this.registerMessageServer(new TestMessageServer2("1", "data source1"));
this.registerMessageServer(new TestMessageServer2("2", "data source2"));
}
static final Map<String, IMessageServer> messageServerMap = new ConcurrentHashMap<>();
static final AntPathMatcher MATCHER = new AntPathMatcher();
void registerMessageServer(IMessageServer messageServer) {
messageServerMap.put(messageServer.getDestinationPattern(), messageServer);
}
String getDestinationFromStompSubscribeMessage(Message<?> message) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
List<String> destination = accessor.getNativeHeader("destination");
if (destination == null || destination.isEmpty()) {
throw new RuntimeException("订阅消息中没有订阅路径头: destination");
}
return destination.get(0);
}
@EventListener
public void onSubscription(SessionSubscribeEvent subscribeEvent) {
String destination = getDestinationFromStompSubscribeMessage(subscribeEvent.getMessage());
log.info("订阅事件: destination={}, user={}", destination, Objects.requireNonNull(
subscribeEvent.getUser()).getName());
messageServerMap.forEach((dest, server) -> {
if (MATCHER.match(server.getDestinationPattern(), destination)) {
Object msg = server.onSubscription(destination,
MATCHER.extractUriTemplateVariables(server.getDestinationPattern(), destination));
if (msg != null) {
smt.convertAndSend(destination, msg);
}
}
});
}
}

@ -1 +1 @@
Subproject commit de6b01d1316835220c43fceba3402de3552bb96b
Subproject commit f0cdf851ebfe03ceb073e1c55706f3ccb49e73c1