仿真结构抽象(操作分发调整,用户订阅/取消订阅逻辑框架构建,添加固定频率任务接口)

This commit is contained in:
walker-sheng 2021-03-03 16:56:32 +08:00
parent fe2892dad0
commit 8876292193
10 changed files with 202 additions and 77 deletions

View File

@ -45,6 +45,7 @@ public enum BusinessExceptionAssertEnum implements BusinessExceptionAssert {
// 仿真
SIMULATION_NOT_EXIST(30001, "simulation not exist"),
SIMULATION_OPERATION_FAILED(30002, "simulation operation failed"),
//
LOGIN_INFO_ERROR(40003, "login info error"),

View File

@ -1,12 +1,11 @@
package club.joylink.rtss.simulation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.PropertyPlaceholderHelper;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -22,6 +21,8 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
* 运行速度
*/
private volatile int speed;
public static final int MIN_SPEED = 1;
public static final int MAX_SPEED = 10;
/**
* 仿真系统时间
*/
@ -63,7 +64,9 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
public static final String MESSAGE_SUB_PREFIX = "/queue/simulation/{id}";
public static final String PATH_SEPARATOR = "/";
private SimulationSubscribePathChecker subscribePathChecker;
private Map<String, String> destinationMap = new HashMap<>();
private static final PropertyPlaceholderHelper placeholderHelper = new PropertyPlaceholderHelper("{", "}");
private List<SimulationSubscribeMessageService> subscribeMessageServiceList;
private SimulationMessagePublisher publisher;
private Map<String, M> simulationMemberMap = new ConcurrentHashMap<>();
@ -174,12 +177,29 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
this.scheduledJobMap.put(name, scheduledJob);
}
/**
* 添加固定频率运行的逻辑可能主要是消息推送类逻辑
* @param name 任务名称同时也是id
* @param job 仿真逻辑
* @param rate 执行频率单位ms
*/
public void addFixedRateJob(String name, SimulationJob job, int rate) {
if (null == name) {
throw new IllegalArgumentException("name can't be null");
}
if (this.scheduledJobMap.containsKey(name)) {
throw new IllegalArgumentException(String.format("job name [%s] already exist", name));
}
SimulationScheduledJob scheduledJob = new SimulationScheduledJob(this, name, job, rate, true);
this.scheduledJobMap.put(name, scheduledJob);
}
/**
* 延时执行逻辑只执行一次
* @param job
* @param delay
*/
public void delayExecute(SimulationJob job, long delay) {
public void addDelayExecuteJob(SimulationJob job, long delay) {
SimulationDelayJob delayJob = new SimulationDelayJob(this, job, delay);
this.delayJobMap.put(delayJob.getId(), delayJob);
}
@ -232,25 +252,24 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
/**
* 更新仿真速度
* @param speed [1,8] 1<=speed<=8
* @param speed Simulation.MIN_SPEED <= speed <= Simulation.MAX_SPEED
*/
public void updateSpeed(int speed) {
if (speed < 1) {
throw new IllegalArgumentException("speed must big or equal than 1");
if (speed < MIN_SPEED) {
throw new IllegalArgumentException(String.format("speed must big or equal than [%s]", MIN_SPEED));
}
if (speed > 8) {
throw new IllegalArgumentException("speed must small or equal than 8");
if (speed > MAX_SPEED) {
throw new IllegalArgumentException(String.format("speed must small or equal than [%s]", MAX_SPEED));
}
if (this.speed == speed) { // 速度与当前相同返回
return;
if (this.speed != speed) { // 速度不同重新更改并重新按新速度运行
this.pause();
this.speed = speed;
this.runAsSpeed(speed);
for (SimulationScheduledJob job : this.scheduledJobMap.values()) {
job.updateRunPeriod(speed);
}
this.start();
}
this.pause();
this.speed = speed;
this.runAsSpeed(speed);
for (SimulationScheduledJob job : this.scheduledJobMap.values()) {
job.updateRunPeriod(speed);
}
this.start();
}
public void runError(Throwable throwable) {
@ -300,9 +319,13 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
return this.simulationMemberMap.remove(id);
}
public U querySimulationUserById(String id) {
Objects.requireNonNull(id, "id 不能为空");
return this.simulationUserMap.get(id);
public U querySimulationUserById(String uid) {
Objects.requireNonNull(uid, "uid 不能为空");
U user = this.simulationUserMap.get(uid);
if (user == null) {
log.info(String.format("仿真[%s]下id为[%s]的仿真用户不存在", this.id, uid));
}
return user;
}
public U getSimulationUserById(String id) {
@ -330,9 +353,9 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
}
public void publishMessage(String destination, Object message) {
if (this.publisher == null) {
throw new UnsupportedOperationException(String.format("仿真[%s]没有消息发布对象:publisher = null", this.id));
}
this.checkPublisherExist();
Objects.requireNonNull(destination);
destination = this.handleDestination(destination);
for (U simulationUser : this.simulationUserMap.values()) {
if (simulationUser.isSubscribe(destination)) {
this.publisher.publishToUser(simulationUser.getId(), destination, message);
@ -340,9 +363,62 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
}
}
public void publishMessageToUser(String userId, String destination, Object message) {
this.checkPublisherExist();
Objects.requireNonNull(destination);
destination = this.handleDestination(destination);
if (userId != null && message != null) {
this.publisher.publishToUser(userId, destination, message);
}
}
private void checkPublisherExist() {
if (this.publisher == null) {
throw new UnsupportedOperationException(String.format("仿真[%s]没有消息发布对象:publisher = null.无法发布消息", this.id));
}
}
public abstract String debugStr();
public void setSubscribeMessageServiceList(List<SimulationSubscribeMessageService> list) {
this.subscribeMessageServiceList = list;
}
public void addSubscribeMessageService(SimulationSubscribeMessageService subscribeMessageService) {
if (this.subscribeMessageServiceList == null) {
this.subscribeMessageServiceList = new ArrayList<>();
}
this.subscribeMessageServiceList.add(subscribeMessageService);
}
public String handleDestination(String destinationPattern) {
String destination = this.destinationMap.get(destinationPattern);
if (destination != null) {
return destination;
}
Properties properties = new Properties();
properties.put("id", this.id);
String dest = placeholderHelper.replacePlaceholders(destinationPattern, properties);
this.destinationMap.put(destinationPattern, dest);
return dest;
}
public boolean isAcceptedSubscribe(String destination) {
if (this.subscribeMessageServiceList != null) {
for (SimulationSubscribeMessageService service : this.subscribeMessageServiceList) {
if (service.acceptedSubscribePath(destination)) {
return true;
}
}
}
return false;
}
public static String tryExtractSidFromDestination(String destination) {
Objects.requireNonNull(destination);
if (!destination.startsWith(MESSAGE_SUB_PREFIX.substring(0, MESSAGE_SUB_PREFIX.length() - 5))) {
return null;
}
String[] patterns = StringUtils.tokenizeToStringArray(MESSAGE_SUB_PREFIX, PATH_SEPARATOR);
String[] dests = StringUtils.tokenizeToStringArray(destination, PATH_SEPARATOR);
for (int i = 0; i < patterns.length; i++) {
@ -356,4 +432,33 @@ public abstract class Simulation<U extends SimulationUser, M extends Simulation
}
return null;
}
public boolean handleUserSubscribe(String userId, String wsSessionId, String destination) {
if (!this.isAcceptedSubscribe(destination)) {
log.warn(String.format("仿真[%s]不支持的订阅: [%s]", this.id, destination));
}
U user = this.querySimulationUserById(userId);
if (user == null) {
return false;
}
user.subscribe(wsSessionId, destination);
if (this.subscribeMessageServiceList != null) {
for (SimulationSubscribeMessageService service : this.subscribeMessageServiceList) {
if (service.acceptedSubscribePath(destination)) {
Object msg = service.buildMessageOfSubscribe(destination);
if (msg != null) {
this.publishMessageToUser(userId, destination, msg);
}
}
}
}
return true;
}
public void handleUserUnsubscribe(String userId, String wsSessionId, String destination) {
U user = this.querySimulationUserById(userId);
if (user != null) {
user.unsubscribe(wsSessionId, destination);
}
}
}

View File

@ -1,17 +1,21 @@
package club.joylink.rtss.simulation;
import io.swagger.annotations.Api;
import club.joylink.rtss.simulation.operation.SimulationOperationDispatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@Api(tags = {"仿真公共控制接口"})
/**
* 仿真通用接口
*/
@RestController
@RequestMapping("/common/simulation")
public class SimulationCommonController {
@Autowired
private SimulationManager simulationManager;
@Autowired
private SimulationOperationDispatcher simulationOperationDispatcher;
@PutMapping("/{id}/pause")
public void pause(@PathVariable String id) {
@ -29,9 +33,9 @@ public class SimulationCommonController {
}
@PostMapping("/{id}/member/{memberId}/operate/{type}")
public void operate(@PathVariable String id, @PathVariable String memberId,
public Object operate(@PathVariable String id, @PathVariable String memberId,
@PathVariable String type, @RequestBody Map<String, Object> params) {
return this.simulationOperationDispatcher.doDispatch(id, memberId, type, params);
}
@PutMapping("/{id}/member/{memberId}/playby/{userId}")

View File

@ -42,11 +42,7 @@ public final class SimulationDelayJob implements Runnable {
@Override
public void run() {
try {
this.job.run();
} catch (Throwable e) {
log.error("仿真延时任务执行异常", e);
}
this.job.run();
}
public boolean isTimeToRun(LocalDateTime systemTime) {

View File

@ -2,5 +2,5 @@ package club.joylink.rtss.simulation;
public interface SimulationJob {
void run() throws InterruptedException;
void run();
}

View File

@ -16,14 +16,18 @@ public final class SimulationScheduledJob implements Runnable {
/**
* 原始频率单位ms
*/
private int rate;
private final int rate;
/**
* 是否固定频率
*/
private final boolean fixed;
/**
* 实际运行间隔单位ns
*/
long runPeriod;
LocalDateTime nextRunTime;
public SimulationScheduledJob(Simulation simulation, String name, SimulationJob job, int rate) {
public SimulationScheduledJob(Simulation simulation, String name, SimulationJob job, int rate, boolean fixed) {
if (null == job) {
throw new IllegalArgumentException("job must not be null");
}
@ -34,28 +38,32 @@ public final class SimulationScheduledJob implements Runnable {
this.name = name;
this.job = job;
this.rate = rate;
this.fixed = fixed;
this.updateRunPeriod(1);
}
public SimulationScheduledJob(Simulation simulation, String name, SimulationJob job, int rate) {
this(simulation, name, job, rate, false);
}
public static final int TIMEOUT = 10;
@Override
public void run() {
try {
long start = System.nanoTime();
this.job.run();
long used = System.nanoTime() - start;
if (used > TimeUnit.MILLISECONDS.toNanos(TIMEOUT)) {
log.warn(String.format("仿真任务[%s]执行耗时[%sns]超过[%sms],请检查并调优",
this.name, TimeUnit.NANOSECONDS.toMillis(used), TIMEOUT));
}
} catch (Throwable e) {
simulation.runError(e);
log.error(String.format("仿真任务[%s]执行异常,仿真停止运行", this.name), e);
long start = System.nanoTime();
this.job.run();
long used = System.nanoTime() - start;
if (used > TimeUnit.MILLISECONDS.toNanos(TIMEOUT)) {
log.warn(String.format("仿真任务[%s]执行耗时[%sns]超过[%sms],请检查并调优",
this.name, TimeUnit.NANOSECONDS.toMillis(used), TIMEOUT));
}
}
public void updateRunPeriod(int speed) {
this.runPeriod = TimeUnit.MILLISECONDS.toNanos(this.rate) / speed;
if (this.fixed) {
this.runPeriod = TimeUnit.MILLISECONDS.toNanos(this.rate);
} else {
this.runPeriod = TimeUnit.MILLISECONDS.toNanos(this.rate) / speed;
}
}
public boolean isTimeToRun(LocalDateTime systemTime) {

View File

@ -0,0 +1,8 @@
package club.joylink.rtss.simulation;
public interface SimulationSubscribeMessageService {
boolean acceptedSubscribePath(String destination);
Object buildMessageOfSubscribe(String destination);
}

View File

@ -1,6 +0,0 @@
package club.joylink.rtss.simulation;
public interface SimulationSubscribePathChecker {
boolean acceptedSubscribePath(String destination);
}

View File

@ -50,24 +50,14 @@ public class DefaultSubscribeManager {
}
wsIdDestMap.put(this.buildWsSessionSubId(wsSessionId, subId), destination);
String sid = Simulation.tryExtractSidFromDestination(destination);
SimulationUser simulationUser = this.tryGetSimulationUser(sid, user.getName());
if (simulationUser == null) {
if (sid == null) {
return;
}
simulationUser.subscribe(wsSessionId, destination);
wsSidMap.put(wsSessionId, sid);
}
private SimulationUser tryGetSimulationUser(String sid, String userId) {
if (sid == null) {
return null;
}
Simulation simulation = this.simulationManager.queryById(sid);
if (simulation == null) {
return null;
if (simulation != null) {
simulation.handleUserSubscribe(user.getName(), wsSessionId, destination);
}
SimulationUser simulationUser = simulation.querySimulationUserById(userId);
return simulationUser;
wsSidMap.put(wsSessionId, sid);
}
private String buildWsSessionSubId(String wsSessionId, String subId) {
@ -77,11 +67,16 @@ public class DefaultSubscribeManager {
public void unsubscribe(WebsocketConfig.MyPrincipal user, String wsSessionId, String subId) {
String key = this.buildWsSessionSubId(wsSessionId, subId);
String destination = wsIdDestMap.remove(key);
String sid = Simulation.tryExtractSidFromDestination(destination);
SimulationUser simulationUser = this.tryGetSimulationUser(sid, user.getName());
if (simulationUser == null) {
if (destination == null) {
return;
}
simulationUser.unsubscribe(wsSessionId, destination);
String sid = Simulation.tryExtractSidFromDestination(destination);
if (sid == null) {
return;
}
Simulation simulation = this.simulationManager.queryById(sid);
if (simulation != null) {
simulation.handleUserUnsubscribe(user.getName(), wsSessionId, destination);
}
}
}

View File

@ -1,14 +1,18 @@
package club.joylink.rtss.simulation.operation;
import club.joylink.rtss.exception.BusinessExceptionAssertEnum;
import club.joylink.rtss.simulation.Simulation;
import club.joylink.rtss.simulation.SimulationManager;
import club.joylink.rtss.simulation.SimulationMember;
import club.joylink.rtss.simulation.operation.converter.ConvertUtil;
import club.joylink.rtss.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Map;
@Slf4j
@ -29,8 +33,8 @@ public class SimulationOperationDispatcher {
return handlerMethod.getMethod().invoke(handlerMethod.getBean(), args);
} catch (Exception e) {
log.error(String.format("仿真[%s]操作[%s]分发异常", id, operation), e);
throw BusinessExceptionAssertEnum.SIMULATION_OPERATION_FAILED.exception(e);
}
return null;
}
private Object[] buildParameters(Simulation simulation, OperationHandlerMethod handlerMethod, Map<String, Object> params) {
@ -43,10 +47,20 @@ public class SimulationOperationDispatcher {
continue;
}
Object o = params.get(parameter.getName());
// 简单对象处理
args[i] = ConvertUtil.convert(o, parameter.getType());
// 复杂对象处理需要再加
// if (args[i] == null && o != null) {
// }
// 复杂对象处理
if (args[i] == null && o != null) {
if (List.class.isAssignableFrom(parameter.getType())) {
ParameterizedType parameterizedType = (ParameterizedType) parameter.getParameterizedType();
Class<?> actualClass = (Class<?>) parameterizedType.getActualTypeArguments()[0];
args[i] = JsonUtils.read(JsonUtils.writeValueAsString(params.get(parameter.getName())),
JsonUtils.getCollectionType(parameter.getType(), actualClass));
} else {
args[i] = JsonUtils.read(JsonUtils.writeValueAsString(params.get(parameter.getName())),
parameter.getType());
}
}
}
return args;
}