武汉8号线列车位置支持订阅返回消息,用户名密码的支持

This commit is contained in:
tiger_zhou 2022-09-14 14:41:05 +08:00
parent 144f6fef4a
commit b863f2c87d
15 changed files with 282 additions and 34 deletions

2
sql/20220914-zhouyin.sql Normal file
View File

@ -0,0 +1,2 @@
-- 武汉8号线添加第三方账号
INSERT INTO `joylink`.`sys_account` ( `account`, `parent_account`, `type`, `org_id`, `name`, `nickname`, `avatar_path`, `password`, `mobile`, `nationcode`, `email`, `wx_id`, `wx_union_id`, `wm_open_id`, `status`, `roles`, `source`, `create_time`, `update_user_id`, `update_time`) VALUES ('yjddzh_train_place', NULL, '2', NULL, NULL, '武汉8号线_train_place', NULL, 'train_place', 'train_place', NULL, 'yjddzh@tests.com', NULL, NULL, NULL, '1', '01', NULL, '2022-09-14 10:00:14', NULL, NULL);

View File

@ -7,7 +7,11 @@ import club.joylink.rtss.simulation.cbtc.Simulation;
import club.joylink.rtss.simulation.cbtc.build.SimulationBuildParams;
import club.joylink.rtss.simulation.cbtc.data.map.Section;
import club.joylink.rtss.util.JsonUtils;
import club.joylink.rtss.vo.client.SocketMessageVO;
import club.joylink.rtss.vo.client.WebSocketMessageType;
import club.joylink.rtss.vo.client.factory.SocketMessageFactory;
import club.joylink.rtss.vo.ws.TrainPosition;
import club.joylink.rtss.websocket.StompMessageService;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
@ -28,6 +32,9 @@ public class TrainPositionService {
private NewAtsTrainLoadService newAtsTrainLoadService;
@Resource
private SimulationManager simulationManager;
@Resource
StompMessageService messageService;
/**
* 初始化或更新列车的位置
@ -44,6 +51,8 @@ public class TrainPositionService {
return;
}
boolean d = true;
SocketMessageVO<TrainPosition> vo = SocketMessageFactory.buildBasic(WebSocketMessageType.YJDDZH_TRAIN_POSITION,tp);
this.messageService.send(vo);
for (Simulation sim : simulationList) {
List<Section> sectionList = findDirectionAllSection(sim,tp.getRoadType());
Section section = this.findSecion(sectionList,tp);
@ -56,8 +65,12 @@ public class TrainPositionService {
log.error("未找到对应的区段 param:{}",json);
return;
}
this.newAtsTrainLoadService.loadTripNumberTrain(sim,tp,section);
}
// SocketMessageVO messageVO = new SocketMessageVO(WebSocketMessageType.YJDDZH_RETURN_MSG,);
// messageService.send();
}
/**

View File

@ -17,7 +17,8 @@ public enum SimulationSubscribeTopic {
WeChatMini("/topic/simulation/assistant/{id}"),
Ctc("/queue/simulation/{id}/ctc"), //大铁CTC
Room("/queue/room/{id}"),
Wgu3d("/topic/simulation/wgu3d/{id}")
Wgu3d("/topic/simulation/wgu3d/{id}"),
YJDDZH_TRAIN_POSITION("/topic/yjddzh/trainPosition")
;
private String destPattern;

View File

@ -117,7 +117,7 @@ public class UserSimulationStatsManager {
Set<SimulationUseInfo> infos = userAndUseInfosMap.get(info.getUserId());
infos.removeIf(i -> group.equals(i.getGroup()));
});
useInfos.forEach(info -> {
useInfos.stream().filter(d->Objects.nonNull(d.prdType)).forEach(info -> {
iUserSimulationStatService.addUserSimulationStats(info);
});
}

View File

@ -7,7 +7,10 @@ import lombok.Getter;
*/
@Getter
public enum WebSocketMessageType {
/**
* 应急调度指挥系统 返回前端的数据
*/
YJDDZH_TRAIN_POSITION,
/**
* 订单支付结果
*/

View File

@ -63,6 +63,10 @@ public class SocketMessageFactory {
private static List<String> getTopicsBy(WebSocketMessageType messageType, String group) {
List<String> topicList = new ArrayList<>();
switch (messageType) {
case YJDDZH_TRAIN_POSITION:{
topicList.add(SimulationSubscribeTopic.YJDDZH_TRAIN_POSITION.getDestPattern());
break;
}
case BROADCAST:
case Order_Pay_Result:
case JointTraining_Room_Invite:

View File

@ -43,4 +43,5 @@ public class TrainPosition {
public String toString(){
return JsonUtils.writeValueNullableFieldAsString(this);
}
}

View File

@ -12,6 +12,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import javax.annotation.Resource;
@ -36,7 +37,9 @@ public class WebSocketController {
}
@MessageMapping("/{mapName}/trainPosition")
// @SendTo("/topic/trainPosition")
public void mlbs( String json,@DestinationVariable String mapName) {
this.trainPositionService.initTrainOrUpdate(mapName,json);
// return json;
}
}

View File

@ -1,7 +1,12 @@
package club.joylink.rtss.websocket;
import club.joylink.rtss.services.LoginSessionManager;
import club.joylink.rtss.services.SysUserService;
import club.joylink.rtss.vo.AccountVO;
import club.joylink.rtss.vo.LoginUserInfoVO;
import club.joylink.rtss.websocket.interceptor.PresenceChannelInterceptor;
import club.joylink.rtss.websocket.interceptor.SessionAuthHandshakeInterceptor;
import club.joylink.rtss.websocket.interceptor.VSimpleInterceptor;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@ -15,6 +20,7 @@ import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import javax.annotation.Resource;
import java.security.Principal;
import java.util.Map;
import java.util.Objects;
@ -25,7 +31,10 @@ public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private LoginSessionManager loginSessionManager;
@Resource
private SysUserService sysUserService;
@Resource
private PresenceChannelInterceptor presenceChannelInterceptor;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
@ -63,6 +72,16 @@ public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
})
.setAllowedOrigins("*");
registry.addEndpoint("/joylink-tbi-websocket")
.addInterceptors(new VSimpleInterceptor(this.sysUserService))
.setHandshakeHandler(new DefaultHandshakeHandler() {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
Map<String, Object> attributes) {
return new TrainPositionPrincipal((AccountVO) attributes.get(VSimpleInterceptor.ATT_USER_KEY));
}
})
.setAllowedOrigins("*");
}
@ -73,6 +92,7 @@ public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(2) //设置消息输入通道的线程池线程数
.maxPoolSize(2); //最大线程数
registration.interceptors(presenceChannelInterceptor);
}
/**
@ -107,4 +127,20 @@ public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
}
}
@Getter
public class TrainPositionPrincipal implements Principal {
private AccountVO user;
public TrainPositionPrincipal(AccountVO user) {
Objects.requireNonNull(user, "用户不能为null");
this.user = user;
}
@Override
public String getName() {
return this.user.getMobile();
}
}
}

View File

@ -34,7 +34,7 @@ public class StompClientManager {
WebSocketStompClient stompClient = new WebSocketStompClient(socketClient);
SimulationSessionHandler handler = new SimulationSessionHandler();
ListenableFuture<StompSession> future = stompClient
.connect("ws://127.0.0.1:9000/joylink-tbi-websocket",handler, "null");
.connect("ws://127.0.0.1:9000/joylink-tbi-websocket?user_account=train_place&account_pwd=train_place",handler, "null");
StompSession stompSession = future.get();
LogTest lt = new LogTest();
List<TrainPosition> list = lt.readFile("d:\\spring.log2");

View File

@ -0,0 +1,59 @@
package club.joylink.rtss.websocket.client;
import club.joylink.rtss.util.JsonUtils;
import club.joylink.rtss.util.test.LogTest;
import club.joylink.rtss.vo.ws.TrainPosition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class StompClientManager2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
StandardWebSocketClient socketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(socketClient);
SimulationSessionHandler handler = new SimulationSessionHandler();
ListenableFuture<StompSession> future = stompClient
.connect("ws://127.0.0.1:9000/joylink-websocket?token=6a8dbf62d3683844a6aa722b73b989f1",handler, "null");
StompSession stompSession = future.get();
stompSession.subscribe("/topic/yjddzh/trainPosition", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
@Override
public void handleFrame(StompHeaders stompHeaders, Object o) {
String recv=null;
try {
recv = new String((byte[]) o,"UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(recv);
}
});
stompSession.setAutoReceipt(true);
TimeUnit.SECONDS.sleep(100);
}
}

View File

@ -0,0 +1,46 @@
package club.joylink.rtss.websocket.interceptor;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.HashMap;
import java.util.Map;
public abstract class BaseInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
return false;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
public Map<String,String> findQueryParams( ServerHttpRequest request){
String query = request.getURI().getQuery();
Map<String, String> map = buildQueryMap(query);
return map;
}
private Map<String, String> buildQueryMap(String query) {
Map<String, String> map = new HashMap<>();
if (StringUtils.hasText(query)) {
String[] splits = query.split("&");
for (int i = 0; i < splits.length; ++i) {
String temp = splits[i];
int index = temp.indexOf("=");
if (index >= 0) {
String key = temp.substring(0, index);
String val = temp.substring(index + 1);
map.put(key, val);
} else {
map.put(temp, "");
}
}
}
return map;
}
}

View File

@ -0,0 +1,58 @@
package club.joylink.rtss.websocket.interceptor;
import club.joylink.rtss.websocket.WebsocketConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;
import java.security.Principal;
import java.util.Objects;
@Slf4j
@Component
public class PresenceChannelInterceptor implements ChannelInterceptor {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);
if(Objects.isNull(sha.getCommand())) {
return;
}
Principal principal = sha.getUser();
log.info(" STOMP " + sha.getCommand());
String wsSessionId = sha.getSessionId();
String dest = sha.getDestination();
switch(sha.getCommand()) {
// case SUBSCRIBE:
// case UNSUBSCRIBE:
case DISCONNECT:
this.unregister(wsSessionId,dest,principal);
break;
case CONNECT:
case CONNECTED:
this.register(wsSessionId,dest,principal);
default:
break;
}
}
private void register(String wsSessionId, String destination, Principal principal) {
if(Objects.equals(false,principal instanceof WebsocketConfig.TrainPositionPrincipal)){
return;
}
log.info("武汉8号线 创建stmop链接name:{}",principal.getName());
}
private void unregister(String wsSessionId, String destination, Principal principal) {
if(Objects.equals(false,principal instanceof WebsocketConfig.TrainPositionPrincipal)){
return;
}
log.info("武汉8号线 关闭stmop链接name:{}",principal.getName());
}
}

View File

@ -1,4 +1,4 @@
package club.joylink.rtss.websocket;
package club.joylink.rtss.websocket.interceptor;
import club.joylink.rtss.exception.BusinessExceptionAssertEnum;
import club.joylink.rtss.services.LoginSessionManager;
@ -15,7 +15,7 @@ import java.util.HashMap;
import java.util.Map;
@Slf4j
public class SessionAuthHandshakeInterceptor implements HandshakeInterceptor {
public class SessionAuthHandshakeInterceptor extends BaseInterceptor {
private static final String Token_Key = "token";
public static final String ATTR_USER_KEY = "user";
private LoginSessionManager loginSessionManager;
@ -39,9 +39,9 @@ public class SessionAuthHandshakeInterceptor implements HandshakeInterceptor {
}
private LoginUserInfoVO getLoginUser(ServerHttpRequest request) {
String query = request.getURI().getQuery();
Map<String, String> map = buildQueryMap(query);
log.info(String.format("webSocket handshake query: [%s]", query));
// String query = request.getURI().getQuery();
Map<String, String> map = this.findQueryParams(request);
log.info(String.format("webSocket handshake query: [%s]", map));
BusinessExceptionAssertEnum.NOT_LOGIN.assertTrue(StringUtils.hasText(map.get(Token_Key)));
return this.loginSessionManager.getLoginInfoByToken(map.get(Token_Key));
// AccountVO accountVO = loginInfoVO.getAccountVO();
@ -49,28 +49,4 @@ public class SessionAuthHandshakeInterceptor implements HandshakeInterceptor {
// return accountVO;
}
private Map<String, String> buildQueryMap(String query) {
Map<String, String> map = new HashMap<>();
if (StringUtils.hasText(query)) {
String[] splits = query.split("&");
for (int i = 0; i < splits.length; ++i) {
String temp = splits[i];
int index = temp.indexOf("=");
if (index >= 0) {
String key = temp.substring(0, index);
String val = temp.substring(index + 1);
map.put(key, val);
} else {
map.put(temp, "");
}
}
}
return map;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
}

View File

@ -0,0 +1,46 @@
package club.joylink.rtss.websocket.interceptor;
import club.joylink.rtss.constants.Project;
import club.joylink.rtss.services.SysUserService;
import club.joylink.rtss.vo.AccountVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import java.util.Map;
import java.util.Objects;
@Slf4j
public class VSimpleInterceptor extends BaseInterceptor {
private SysUserService sysUserService;
public VSimpleInterceptor(SysUserService sysUserService) {
this.sysUserService = sysUserService;
}
public final static String ATT_USER_KEY = "userVo";
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
Map<String, String> map = this.findQueryParams(request);
log.info(String.format("webSocket handshake query: [%s]", map));
try{
AccountVO vo = this.checkUserInfo(map);
if(Objects.isNull(vo)){
return false;
}
attributes.put(ATT_USER_KEY,vo);
return true;
}catch (Exception e){
return false;
}
}
private final static String USER_ACCOUNT = "user_account";
private final static String ACCOUNT_PWD = "account_pwd";
private AccountVO checkUserInfo(Map<String,String> dataMap){
AccountVO vo = this.sysUserService.findUserByAccountAndPassword(dataMap.get(USER_ACCOUNT),dataMap.get(ACCOUNT_PWD), Project.DEFAULT);
return vo;
}
}