【AFC 网关解析逻辑处理(网关部署成功后删除)】

This commit is contained in:
weizhihong 2023-03-08 09:45:20 +08:00
parent c1a36a0503
commit 6bf292df47
2 changed files with 54 additions and 20 deletions

View File

@ -2,17 +2,18 @@ package club.joylink.rtss.simulation.cbtc.device.real.afctransfer;
import club.joylink.rtss.simulation.cbtc.device.real.modbustcp.PlcGatewayConnectManager;
import club.joylink.rtss.simulation.cbtc.device.real.modbustcp.handler.ModbusMessageHandler2;
import com.digitalpetri.modbus.FunctionCode;
import com.digitalpetri.modbus.codec.ModbusTcpPayload;
import io.netty.channel.Channel;
import com.digitalpetri.modbus.requests.*;
import com.digitalpetri.modbus.responses.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
import io.netty.util.internal.shaded.org.jctools.queues.MpscChunkedArrayQueue;
import io.netty.util.internal.shaded.org.jctools.queues.atomic.MpscAtomicArrayQueue;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -54,17 +55,50 @@ public class AfcMessageHandler extends SimpleChannelInboundHandler<ModbusTcpPayl
AfcQueryQueue.wait();
}
AfcClientQuery query = AfcQueryQueue.poll();
Channel channel = plcGatewayConnectManager.getChannel(GatewayName);
if(channel == null) {
log.warn("AFC网关不在线,不发送数据");
return;
}
// 发送消息到网关
CompletableFuture<ModbusTcpPayload> future = modbusMessageHandler.sendModbusMessage(channel, query.getPayload());
// TODO 网关调试通后放开注释
// Channel channel = plcGatewayConnectManager.getChannel(GatewayName);
// if(channel == null) {
// log.warn("AFC网关不在线,不发送数据");
// return;
// }
// // 发送消息到网关
// CompletableFuture<ModbusTcpPayload> future = modbusMessageHandler.sendModbusMessage(channel, query.getPayload());
try {
// 获取网关消息并回写给AFC客户端
ModbusTcpPayload payload = future.get();
query.getChannel().writeAndFlush(payload);
// ModbusTcpPayload payload = future.get();
// query.getChannel().writeAndFlush(payload);
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", StandardCharsets.UTF_8);
ModbusResponse modbusResponse = null;
FunctionCode functionCode = query.getPayload().getModbusPdu().getFunctionCode();
switch (functionCode) {
case ReadCoils:
modbusResponse = new ReadCoilsResponse(buf);
break;
case ReadDiscreteInputs:
modbusResponse = new ReadDiscreteInputsResponse(buf);
break;
case ReadHoldingRegisters:
modbusResponse = new ReadHoldingRegistersResponse(buf);
break;
case ReadInputRegisters:
modbusResponse = new ReadInputRegistersResponse(buf);
break;
case WriteSingleCoil:
modbusResponse = new WriteSingleCoilResponse(0, 19);
break;
case WriteSingleRegister:
modbusResponse = new WriteSingleRegisterResponse(0, 19);
break;
case WriteMultipleCoils:
modbusResponse = new WriteMultipleCoilsResponse(0, 19);
break;
case WriteMultipleRegisters:
modbusResponse = new WriteMultipleRegistersResponse(0, 19);
break;
default:
modbusResponse = new ReadCoilsResponse(buf);
}
query.getChannel().writeAndFlush(modbusResponse);
} catch (Exception e) {
log.error("获取AFC网关响应异常", e);
}
@ -83,11 +117,11 @@ public class AfcMessageHandler extends SimpleChannelInboundHandler<ModbusTcpPayl
@Override
protected void channelRead0(ChannelHandlerContext ctx, ModbusTcpPayload msg) throws Exception {
Channel channel = this.plcGatewayConnectManager.getChannel(GatewayName);
if(channel == null) {
log.debug("AFC网关不在线,不记录数据");
return;
}
// Channel channel = this.plcGatewayConnectManager.getChannel(GatewayName);
// if(channel == null) {
// log.debug("AFC网关不在线,不记录数据");
// return;
// }
synchronized (AfcQueryQueue) {
// 生产客户端查询记录
AfcQueryQueue.offer(new AfcClientQuery(ctx.channel(), msg));

View File

@ -58,7 +58,7 @@ public class AfcTransferServer implements ApplicationRunner {
ChannelFuture future = b.bind(this.modbusTcpConfig.getAfcTransferPort()).sync();
if(future.isSuccess()) {
log.info(String.format("modbus-tcp server start on port [%s]", this.modbusTcpConfig.getPort()));
log.info(String.format("modbus-tcp server start on port [%s]", this.modbusTcpConfig.getAfcTransferPort()));
} else {
log.error("modbus-tcp server start failed", future.cause());
}