Compare commits

...

4 Commits

Author SHA1 Message Date
soul-walker
6bde9b2035 代码结构调整,基于graphql的仿真控制基本流程打通 2024-08-23 16:31:49 +08:00
soul-walker
e4db3c32df 代码结构重构;
初步实现多仿真管理结构
2024-08-19 20:45:44 +08:00
soul-walker
072e6ba262 道岔仿真逻辑完善
多线程多app实验
2024-08-15 00:07:21 +08:00
soul-walker
3487b8eb1c bevy ecs app相关模块基于道岔仿真学习尝试 2024-08-13 19:44:14 +08:00
35 changed files with 3875 additions and 3 deletions

14
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,14 @@
{
"cSpell.words": [
"Graphi",
"graphiql",
"hashbrown",
"Hasher",
"Joylink",
"jsonwebtoken",
"rtss",
"thiserror",
"timestep",
"trackside"
]
}

2495
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,4 +5,19 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[workspace]
members = ["crates/*"]
[workspace.dependencies]
bevy_app = "0.14.1"
bevy_core = "0.14.1"
bevy_ecs = "0.14.1"
bevy_time = "0.14.1"
rayon = "1.10.0"
tokio = { version = "1.39.3", features = ["macros", "rt-multi-thread"] }
thiserror = "1.0.63"
[dependencies] [dependencies]
tokio = { version = "1.39.3", features = ["macros", "rt-multi-thread"] }
rtss_log = { path = "crates/rtss_log" }
rtss_api = { path = "crates/rtss_api" }

View File

@ -1,6 +1,12 @@
# rtss-simulation # 项目描述
轨道交通信号系统仿真模块 轨道交通信号系统仿真模块
## 三种架构(先按第一种实现)
1. 服务端服务进程运行多个仿真,客户端处理显示(和之前的方式一致)
2. 服务端通过网络启动多个运行单个仿真的进程(可以容器启动),客户端还是只处理显示
3. 仿真逻辑通过WASM的形式最终打包到前端客户端就既有逻辑又有显示减少仿真状态的网络传输开销后端只负责路由操作应该和一般游戏架构类似服务端核心在同步的处理
## 环境设置说明 ## 环境设置说明
### 安装 Rust ### 安装 Rust

View File

@ -0,0 +1,22 @@
[package]
name = "rtss_api"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.208", features = ["derive"] }
serde_json = "1.0.125"
chrono = { version = "0.4.38", features = ["serde"] }
axum = "0.7.5"
axum-extra = { version = "0.9.3", features = ["typed-header"] }
jsonwebtoken = "9.3.0"
tower-http = { version = "0.5.0", features = ["cors"] }
async-graphql = { version = "7.0.7", features = ["chrono"] }
async-graphql-axum = "7.0.6"
base64 = "0.22.1"
bevy_ecs = { workspace = true }
rtss_log = { path = "../rtss_log" }
rtss_sim_manage = { path = "../rtss_sim_manage" }
rtss_trackside = { path = "../rtss_trackside" }

View File

@ -0,0 +1,82 @@
use std::sync::LazyLock;
use async_graphql::Result;
use axum::http::HeaderMap;
use jsonwebtoken::{decode, DecodingKey, Validation};
use rtss_log::tracing::error;
use serde::{Deserialize, Serialize};
static KEYS: LazyLock<Keys> = LazyLock::new(|| {
// let secret = std::env::var("JWT_SECRET").expect("JWT_SECRET must be set");
let secret = "joylink".to_string();
Keys::new(secret.as_bytes())
});
struct Keys {
// encoding: EncodingKey,
decoding: DecodingKey,
}
impl Keys {
pub fn new(secret: &[u8]) -> Self {
Self {
// encoding: EncodingKey::from_secret(secret),
decoding: DecodingKey::from_secret(secret),
}
}
}
#[derive(Debug)]
pub enum AuthError {
InvalidToken,
}
pub(crate) fn get_token_from_headers(headers: HeaderMap) -> Result<Option<Claims>, AuthError> {
let option_token = headers.get("Token");
if let Some(token) = option_token {
let token_data = decode::<Claims>(
token.to_str().unwrap(),
&KEYS.decoding,
&Validation::default(),
)
.map_err(|err| {
error!("Error decoding token: {:?}", err);
AuthError::InvalidToken
})?;
Ok(Some(token_data.claims))
} else {
Ok(None)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
pub id: u32,
pub sub: String,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_token_from_headers() {
rtss_log::Logging::default().init();
let mut headers: HeaderMap = HeaderMap::new();
headers.insert("Token", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjQ2NzAyMjcsImlkIjo2LCJvcmlnX2lhdCI6MTcyNDIzODIyNywic3ViIjoiNiJ9.sSfjdW7d3OqOE6G1p47c4dcCan4evRGoNjGPUyVfWLk".parse().unwrap());
let result = get_token_from_headers(headers);
match result {
Ok(Some(claims)) => {
assert_eq!(claims.id, 6);
assert_eq!(claims.sub, "6");
}
Ok(None) => {
panic!("Expected Some(claims), got None");
}
Err(e) => {
panic!("Error: {:?}", e);
}
}
}
}

View File

@ -0,0 +1,20 @@
mod jwt_auth;
mod server;
mod simulation;
mod simulation_operation;
pub use server::*;
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -0,0 +1,119 @@
use std::ops::Deref;
use async_graphql::*;
use async_graphql::{EmptySubscription, Schema};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::extract::State;
use axum::http::HeaderMap;
use axum::{
http::{HeaderValue, Method},
response::{Html, IntoResponse},
routing::get,
Router,
};
use http::{playground_source, GraphQLPlaygroundConfig};
use rtss_log::tracing::{debug, error, info};
use rtss_sim_manage::SimulationManager;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tower_http::cors::CorsLayer;
use crate::{jwt_auth, simulation};
pub struct ServerConfig {
pub port: u16,
}
impl Default for ServerConfig {
fn default() -> Self {
Self { port: 8080 }
}
}
impl ServerConfig {
pub fn new(port: u16) -> Self {
Self { port }
}
pub fn with_port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn to_socket_addr(&self) -> String {
format!("0.0.0.0:{}", self.port)
}
}
pub async fn serve(config: ServerConfig) {
let schema = new_schema().await;
let app = Router::new()
.route("/", get(graphiql).post(graphql_handler))
.with_state(schema)
.layer(
CorsLayer::new()
.allow_origin("*".parse::<HeaderValue>().unwrap())
.allow_headers(tower_http::cors::Any)
.allow_methods([Method::GET, Method::POST]),
);
debug!("Server started at http://{}", config.to_socket_addr());
info!("GraphiQL IDE: http://localhost:{}", config.port);
axum::serve(
TcpListener::bind(config.to_socket_addr()).await.unwrap(),
app,
)
.await
.unwrap();
}
async fn graphql_handler(
State(schema): State<SimulationSchema>,
headers: HeaderMap,
req: GraphQLRequest,
) -> GraphQLResponse {
let mut req = req.into_inner();
let token = jwt_auth::get_token_from_headers(headers);
match token {
Ok(token) => {
req = req.data(token);
}
Err(e) => {
error!("Error getting token from headers: {:?}", e);
}
}
schema.execute(req).await.into()
}
async fn graphiql() -> impl IntoResponse {
Html(playground_source(GraphQLPlaygroundConfig::new("/")))
}
pub type SimulationSchema = Schema<Query, Mutation, EmptySubscription>;
#[derive(Default, MergedObject)]
pub struct Query(simulation::SimulationQuery);
#[derive(Default, MergedObject)]
pub struct Mutation(simulation::SimulationMutation);
pub struct MutexSimulationManager(Mutex<SimulationManager>);
impl Default for MutexSimulationManager {
fn default() -> Self {
Self(Mutex::new(SimulationManager::default()))
}
}
impl Deref for MutexSimulationManager {
type Target = Mutex<SimulationManager>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub async fn new_schema() -> SimulationSchema {
Schema::build(Query::default(), Mutation::default(), EmptySubscription)
.data(MutexSimulationManager::default())
.finish()
}

View File

@ -0,0 +1,106 @@
use async_graphql::{Context, InputObject, Object};
use rtss_log::tracing::info;
use rtss_sim_manage::{AvailablePlugins, SimulationBuilder};
use crate::{jwt_auth::Claims, simulation_operation::SimulationOperation, MutexSimulationManager};
#[derive(Default)]
pub struct SimulationQuery;
#[Object]
impl SimulationQuery {
async fn simulations<'ctx>(&self, ctx: &Context<'ctx>) -> usize {
let sim = ctx.data::<MutexSimulationManager>().unwrap();
sim.lock().await.count()
}
}
#[derive(Default)]
pub struct SimulationMutation;
#[Object]
impl SimulationMutation {
async fn start_simulation<'ctx>(
&self,
ctx: &Context<'ctx>,
req: StartSimulationRequest,
) -> async_graphql::Result<String> {
let claims = ctx.data::<Option<Claims>>().unwrap();
match claims {
Some(claims) => {
info!("User {claims:?} started simulation");
}
_ => return Err("Unauthorized".into()),
}
let sim = ctx.data::<MutexSimulationManager>().unwrap();
let id = sim.lock().await.start_simulation(
SimulationBuilder::default()
.id(req.user_id)
.plugins(vec![AvailablePlugins::TrackSideEquipmentPlugin]),
)?;
Ok(id)
}
async fn exit_simulation<'ctx>(
&self,
ctx: &Context<'ctx>,
id: String,
) -> async_graphql::Result<bool> {
let sim = ctx.data::<MutexSimulationManager>().unwrap();
sim.lock().await.exit_simulation(id)?;
Ok(true)
}
async fn pause_simulation<'ctx>(
&self,
ctx: &Context<'ctx>,
id: String,
) -> async_graphql::Result<bool> {
let sim = ctx.data::<MutexSimulationManager>().unwrap();
sim.lock().await.pause_simulation(id)?;
Ok(true)
}
async fn resume_simulation<'ctx>(
&self,
ctx: &Context<'ctx>,
id: String,
) -> async_graphql::Result<bool> {
let sim = ctx.data::<MutexSimulationManager>().unwrap();
sim.lock().await.resume_simulation(id)?;
Ok(true)
}
async fn update_simulation_speed<'ctx>(
&self,
ctx: &Context<'ctx>,
id: String,
speed: f32,
) -> async_graphql::Result<bool> {
let sim = ctx.data::<MutexSimulationManager>().unwrap();
sim.lock().await.update_simulation_speed(id, speed)?;
Ok(true)
}
async fn trigger_simulation_operation<'ctx>(
&self,
ctx: &Context<'ctx>,
id: String,
entity_uid: String,
operation: SimulationOperation,
) -> async_graphql::Result<bool> {
let sim = ctx.data::<MutexSimulationManager>().unwrap();
sim.lock().await.trigger_entity_operation(
id,
entity_uid,
operation.to_operation_event(),
)?;
Ok(true)
}
}
#[derive(InputObject)]
struct StartSimulationRequest {
user_id: String,
sim_def_id: String,
}

View File

@ -0,0 +1,17 @@
use async_graphql::Enum;
use bevy_ecs::event::Event;
#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
pub enum SimulationOperation {
TurnoutControlDC,
TurnoutControlFC,
}
impl SimulationOperation {
pub fn to_operation_event(self) -> impl Event + Copy {
match self {
SimulationOperation::TurnoutControlDC => rtss_trackside::TurnoutControlEvent::DC,
SimulationOperation::TurnoutControlFC => rtss_trackside::TurnoutControlEvent::FC,
}
}
}

View File

@ -0,0 +1,6 @@
[package]
name = "rtss_ci"
version = "0.1.0"
edition = "2021"
[dependencies]

14
crates/rtss_ci/src/lib.rs Normal file
View File

@ -0,0 +1,14 @@
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -0,0 +1,7 @@
[package]
name = "rtss_common"
version = "0.1.0"
edition = "2021"
[dependencies]
bevy_ecs = {workspace = true}

View File

@ -0,0 +1,76 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use bevy_ecs::{component::Component, entity::Entity, system::Resource};
/// 仿真公共资源
pub struct SimulationResource {
id: String,
uid_entity_mapping: HashMap<String, Entity>,
}
impl SimulationResource {
pub fn new(id: String) -> Self {
SimulationResource {
id,
uid_entity_mapping: HashMap::new(),
}
}
pub fn id(&self) -> &str {
&self.id
}
pub fn get_entity(&self, uid: &str) -> Option<Entity> {
self.uid_entity_mapping.get(uid).cloned()
}
pub fn insert_entity(&mut self, uid: String, entity: Entity) {
self.uid_entity_mapping.insert(uid, entity);
}
}
// 设备编号组件
#[derive(Component, Debug, Clone, PartialEq, Eq)]
pub struct Uid(pub String);
impl Default for Uid {
fn default() -> Self {
Uid("".to_string())
}
}
#[derive(Resource)]
pub struct SharedSimulationResource(pub Arc<Mutex<SimulationResource>>);
impl SharedSimulationResource {
pub fn get_entity(&self, uid: &str) -> Option<Entity> {
self.0.lock().unwrap().uid_entity_mapping.get(uid).cloned()
}
pub fn insert_entity(&self, uid: String, entity: Entity) {
self.0
.lock()
.unwrap()
.uid_entity_mapping
.insert(uid, entity);
}
}
#[cfg(test)]
mod tests {
use bevy_ecs::world;
use super::*;
#[test]
fn it_works() {
let mut simulation_resource = SimulationResource::new("1".to_string());
let mut world = world::World::default();
let uid = Uid("1".to_string());
let entity = world.spawn(uid.clone()).id();
simulation_resource.insert_entity(uid.clone().0, entity);
assert_eq!(simulation_resource.get_entity(&uid.0), Some(entity));
}
}

View File

@ -0,0 +1,6 @@
[package]
name = "rtss_iscs"
version = "0.1.0"
edition = "2021"
[dependencies]

View File

@ -0,0 +1,14 @@
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -0,0 +1,14 @@
[package]
name = "rtss_log"
version = "0.1.0"
edition = "2021"
[dependencies]
tracing = { version = "0.1", default-features = false, features = ["std"] }
tracing-subscriber = { version = "0.3.1", features = [
"registry",
"env-filter",
] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
tracing-wasm = "0.2.1"

View File

@ -0,0 +1,83 @@
use std::error::Error;
pub use tracing;
use tracing::Level;
pub use tracing_subscriber;
use tracing_subscriber::{
filter::{FromEnvError, ParseError},
layer::SubscriberExt,
registry::Registry,
EnvFilter,
};
/// 日志配置
pub struct Logging {
pub filter: String,
pub level: Level,
}
impl Default for Logging {
fn default() -> Self {
Self {
filter: "wgpu=error,naga=warn".to_string(),
level: Level::INFO,
}
}
}
impl Logging {
pub fn init(&self) {
let finished_subscriber;
let subscriber = Registry::default();
let default_filter = { format!("{},{}", self.level, self.filter) };
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|from_env_error| {
_ = from_env_error
.source()
.and_then(|source| source.downcast_ref::<ParseError>())
.map(|parse_err| {
// we cannot use the `error!` macro here because the logger is not ready yet.
eprintln!("LogPlugin failed to parse filter from env: {}", parse_err);
});
Ok::<EnvFilter, FromEnvError>(EnvFilter::builder().parse_lossy(&default_filter))
})
.unwrap();
let subscriber = subscriber.with(filter_layer);
#[cfg(not(target_arch = "wasm32"))]
{
let fmt_layer = tracing_subscriber::fmt::Layer::default().with_writer(std::io::stderr);
finished_subscriber = subscriber.with(fmt_layer);
}
#[cfg(target_arch = "wasm32")]
{
subscriber = subscriber.with(tracing_wasm::WASMLayer::new(
tracing_wasm::WASMLayerConfig::default(),
));
}
tracing::subscriber::set_global_default(finished_subscriber)
.expect("setting default subscriber failed");
}
}
#[cfg(test)]
mod tests {
use tracing::{debug, error, info, trace, warn};
use super::*;
#[test]
fn it_works() {
let logging = Logging {
level: Level::WARN,
..Default::default()
};
logging.init();
trace!("test trace");
debug!("test debug");
info!("test info");
warn!("test warn");
error!("test error");
}
}

View File

@ -0,0 +1,16 @@
[package]
name = "rtss_sim_manage"
version = "0.1.0"
edition = "2021"
[dependencies]
bevy_core = {workspace = true}
bevy_ecs = {workspace = true}
bevy_app = {workspace = true}
bevy_time = {workspace = true}
rayon = {workspace = true}
thiserror = {workspace = true}
rtss_log = { path = "../rtss_log" }
rtss_common = { path = "../rtss_common" }
rtss_trackside = { path = "../rtss_trackside" }

View File

@ -0,0 +1,17 @@
use bevy_app::App;
use rtss_trackside::TrackSideEquipmentPlugin;
#[derive(Debug)]
pub enum AvailablePlugins {
TrackSideEquipmentPlugin,
}
pub(crate) fn add_needed_plugins(app: &mut App, plugins: Vec<AvailablePlugins>) {
for plugin in plugins {
match plugin {
AvailablePlugins::TrackSideEquipmentPlugin => {
app.add_plugins(TrackSideEquipmentPlugin);
}
}
}
}

View File

@ -0,0 +1,19 @@
mod config_plugins;
mod simulation;
pub use config_plugins::*;
pub use simulation::*;
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -0,0 +1,468 @@
use std::{
cell::RefCell,
collections::HashMap,
ops::Deref,
sync::{mpsc, Arc, Mutex},
time::{Duration, Instant},
};
use bevy_app::{prelude::*, PluginsState};
use bevy_ecs::{
event::{Event, EventWriter},
observer::Trigger,
system::{Query, Res, ResMut, Resource},
world::OnAdd,
};
use bevy_time::{prelude::*, TimePlugin};
use rtss_common::{SharedSimulationResource, SimulationResource, Uid};
use rtss_log::tracing::{debug, error, warn};
use thiserror::Error;
use crate::{add_needed_plugins, AvailablePlugins};
/// 仿真管理器
/// 非线程安全,若需要线程安全请使用类似 `Arc<Mutex<SimulationManager>>` 的方式
pub struct SimulationManager {
txs: RefCell<HashMap<String, Simulation>>,
}
impl Default for SimulationManager {
fn default() -> Self {
Self::new()
}
}
#[derive(Error, Debug)]
pub enum SimulationControlError {
#[error("Unknown error")]
UnknownError,
#[error("Simulation not exist")]
SimulationNotExist,
#[error("Trigger event failed")]
TriggerEventFailed,
#[error("Simulation entity not exist")]
SimulationEntityNotExist,
}
impl SimulationManager {
fn new() -> Self {
let txs = RefCell::new(HashMap::new());
SimulationManager { txs }
}
pub fn count(&self) -> usize {
self.txs.borrow().len()
}
pub fn start_simulation(
&self,
builder: SimulationBuilder,
) -> Result<String, SimulationControlError> {
let id = builder.id.clone();
let sim = Simulation::new(builder);
self.txs.borrow_mut().insert(id.clone(), sim);
Ok(id)
}
pub fn exit_simulation(&self, id: String) -> Result<(), SimulationControlError> {
match self.txs.borrow_mut().remove(&id) {
Some(sim) => sim.exit_simulation(),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn pause_simulation(&self, id: String) -> Result<(), SimulationControlError> {
match self.txs.borrow().get(&id) {
Some(sim) => sim.pause_simulation(),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn resume_simulation(&self, id: String) -> Result<(), SimulationControlError> {
match self.txs.borrow().get(&id) {
Some(sim) => sim.resume_simulation(),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn update_simulation_speed(
&self,
id: String,
speed: f32,
) -> Result<(), SimulationControlError> {
match self.txs.borrow().get(&id) {
Some(sim) => sim.update_simulation_speed(speed),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn trigger_operation<E>(&self, id: String, event: E) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
match self.txs.borrow().get(&id) {
Some(sim) => sim.trigger_operation(event),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn trigger_entity_operation<E>(
&self,
id: String,
entity_uid: String,
event: E,
) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
match self.txs.borrow().get(&id) {
Some(sim) => sim.trigger_entity_operation(entity_uid, event),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
}
pub struct SimulationBuilder {
/// 仿真ID
pub(crate) id: String,
/// 仿真主逻辑循环间隔,详细请查看 [`Time<Fixed>`](bevy_time::fixed::Fixed)
pub(crate) loop_duration: Duration,
/// 仿真所需插件
pub(crate) plugins: Vec<AvailablePlugins>,
}
impl Default for SimulationBuilder {
fn default() -> Self {
SimulationBuilder {
id: "default".to_string(),
loop_duration: Duration::from_millis(500),
plugins: Vec::new(),
}
}
}
impl SimulationBuilder {
pub fn id(mut self, id: String) -> Self {
self.id = id;
self
}
pub fn loop_duration(mut self, loop_duration: Duration) -> Self {
self.loop_duration = loop_duration;
self
}
pub fn plugins(mut self, plugins: Vec<AvailablePlugins>) -> Self {
self.plugins = plugins;
self
}
}
#[derive(Resource, Debug)]
pub struct SimulationId(String);
impl SimulationId {
pub fn new(id: String) -> Self {
SimulationId(id)
}
}
impl Deref for SimulationId {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Resource, Debug)]
pub struct SimulationStatus {
// 仿真倍速
pub speed: f32,
// 仿真是否暂停状态
pub paused: bool,
}
impl Default for SimulationStatus {
fn default() -> Self {
SimulationStatus {
speed: 1.0,
paused: true,
}
}
}
/// 仿真控制事件
#[derive(Event, Debug, Clone, Copy)]
pub enum SimulationControlEvent {
Pause,
Unpause,
UpdateSpeed(f32),
Exit,
}
pub struct Simulation {
tx: mpsc::Sender<Box<SimulationHandle>>,
resource: Arc<Mutex<SimulationResource>>,
}
pub type SimulationHandle = dyn FnMut(&mut App) + Send;
impl Simulation {
pub fn new(builder: SimulationBuilder) -> Self {
let simulation_resource = Arc::new(Mutex::new(SimulationResource::new(builder.id.clone())));
let cloned_resource = Arc::clone(&simulation_resource);
let (tx, mut rx) = mpsc::channel();
rayon::spawn(move || {
let mut app = App::new();
let mut virtual_time =
Time::<Virtual>::from_max_delta(builder.loop_duration.mul_f32(2f32));
virtual_time.pause();
// 初始化仿真App
app.add_plugins(TimePlugin)
.insert_resource(virtual_time)
.insert_resource(Time::<Fixed>::from_duration(builder.loop_duration))
.insert_resource(SimulationId::new(builder.id))
.insert_resource(SimulationStatus::default())
.insert_resource(SharedSimulationResource(Arc::clone(&cloned_resource)))
.add_event::<SimulationControlEvent>()
.observe(simulation_status_control)
.observe(entity_observer);
// 添加仿真所需插件
add_needed_plugins(&mut app, builder.plugins);
let wait = Some(builder.loop_duration);
app.set_runner(move |mut app: App| {
let plugins_state = app.plugins_state();
if plugins_state != PluginsState::Cleaned {
app.finish();
app.cleanup();
}
loop {
match runner(&mut app, wait, &mut rx) {
Ok(Some(delay)) => std::thread::sleep(delay),
Ok(None) => continue,
Err(exit) => return exit,
}
}
});
app.run();
});
Simulation {
tx,
resource: simulation_resource,
}
}
fn trigger_event(&self, event: SimulationControlEvent) -> Result<(), SimulationControlError> {
let id = self.resource.lock().unwrap().id().to_string();
let result = self.tx.send(Box::new(move |app: &mut App| {
app.world_mut().trigger(event);
}));
match result {
Ok(_) => Ok(()),
Err(e) => {
error!(
"Failed to send event to simulation, id={}, error={:?}",
id, e
);
Err(SimulationControlError::TriggerEventFailed)
}
}
}
pub fn trigger_operation<E>(&self, event: E) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
let id = self.resource.lock().unwrap().id().to_string();
let result = self.tx.send(Box::new(move |app: &mut App| {
app.world_mut().trigger(event);
}));
match result {
Ok(_) => Ok(()),
Err(e) => {
error!(
"Failed to send event to simulation, id={}, error={:?}",
id, e
);
Err(SimulationControlError::TriggerEventFailed)
}
}
}
pub fn trigger_entity_operation<E>(
&self,
entity_uid: String,
event: E,
) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
let id = self.resource.lock().unwrap().id().to_string();
match self.resource.lock().unwrap().get_entity(&entity_uid) {
Some(entity) => {
let result = self.tx.send(Box::new(move |app: &mut App| {
app.world_mut().trigger_targets(event, entity);
}));
match result {
Ok(_) => Ok(()),
Err(e) => {
error!(
"Failed to send event to simulation, id={}, error={:?}",
id, e
);
Err(SimulationControlError::TriggerEventFailed)
}
}
}
None => {
error!("Entity not exist, id={}", entity_uid);
Err(SimulationControlError::SimulationEntityNotExist)
}
}
}
pub fn exit_simulation(&self) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::Exit)
}
pub fn pause_simulation(&self) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::Pause)
}
pub fn resume_simulation(&self) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::Unpause)
}
pub fn update_simulation_speed(&self, speed: f32) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::UpdateSpeed(speed))
}
}
pub fn entity_observer(
trigger: Trigger<OnAdd>,
query: Query<&Uid>,
shared: ResMut<SharedSimulationResource>,
) {
let entity = trigger.entity();
match query.get(entity) {
Ok(uid) => {
shared.insert_entity(uid.0.clone(), entity);
debug!("添加uid实体映射, Uid: {:?}, Entity: {:?}", uid, entity);
}
Err(_) => {
warn!("Failed to get Uid from entity: {:?}", entity);
}
}
}
pub fn simulation_status_control(
trigger: Trigger<SimulationControlEvent>,
mut time: ResMut<Time<Virtual>>,
sid: Res<SimulationId>,
mut exit: EventWriter<AppExit>,
) {
match trigger.event() {
SimulationControlEvent::Pause => {
debug!("Pausing simulation");
time.pause();
}
SimulationControlEvent::Unpause => {
debug!("Unpausing simulation");
time.unpause();
}
SimulationControlEvent::UpdateSpeed(speed) => {
debug!("Update simulation speed to {}", speed);
time.set_relative_speed(*speed);
}
SimulationControlEvent::Exit => {
debug!("Exiting simulation, id={:?}", *sid);
exit.send(AppExit::Success);
}
}
}
fn runner(
app: &mut App,
wait: Option<Duration>,
rx: &mut mpsc::Receiver<Box<SimulationHandle>>,
) -> Result<Option<Duration>, AppExit> {
let start_time = Instant::now();
if let Err(e) = rx.try_recv().map(|mut handle| handle(app)) {
match e {
mpsc::TryRecvError::Empty => {}
mpsc::TryRecvError::Disconnected => {
error!("Simulation handle channel disconnected");
}
}
}
app.update();
if let Some(exit) = app.should_exit() {
return Err(exit);
};
let end_time = Instant::now();
if let Some(wait) = wait {
let exe_time = end_time - start_time;
if exe_time < wait {
return Ok(Some(wait - exe_time));
}
}
Ok(None)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_simulation_manager() {
let manager = SimulationManager::default();
assert_eq!(manager.count(), 0);
if let Ok(_) = manager.start_simulation(SimulationBuilder::default().id("0".to_string())) {
assert_eq!(manager.count(), 1);
}
if let Ok(_) = manager.start_simulation(SimulationBuilder::default().id("1".to_string())) {
assert_eq!(manager.count(), 2);
}
if let Ok(_) = manager.exit_simulation("0".to_string()) {
assert_eq!(manager.count(), 1);
}
if let Ok(_) = manager.exit_simulation("1".to_string()) {
assert_eq!(manager.count(), 0);
}
}
}

View File

@ -0,0 +1,13 @@
[package]
name = "rtss_trackside"
version = "0.1.0"
edition = "2021"
[dependencies]
bevy_core = {workspace = true}
bevy_ecs = {workspace = true}
bevy_app = {workspace = true}
bevy_time = {workspace = true}
rtss_log = { path = "../rtss_log" }
rtss_common = { path = "../rtss_common" }

View File

@ -0,0 +1,32 @@
use bevy_ecs::{bundle::Bundle, component::Component};
use rtss_common::Uid;
// 两常态位置转换组件,用于像道岔位置,屏蔽门位置等
#[derive(Component, Debug, Clone, PartialEq, Default)]
pub struct TwoNormalPositionsTransform {
// 当前实际位置,百分比值,0-100
pub position: i32,
// 当前转换速度
pub velocity: f32,
}
// 道岔设备状态组件
#[derive(Component, Debug, Clone, PartialEq, Eq, Default)]
pub struct TurnoutState {
// 定位表示
pub db: bool,
// 反位表示
pub fb: bool,
// 定操表示
pub dc: bool,
// 反操表示
pub fc: bool,
}
// 道岔设备组件包
#[derive(Bundle, Default)]
pub struct TurnoutBundle {
pub uid: Uid,
pub turnout_state: TurnoutState,
pub two_normal_positions_conversion: TwoNormalPositionsTransform,
}

View File

@ -0,0 +1,2 @@
mod equipment;
pub use equipment::*;

View File

@ -0,0 +1,9 @@
use bevy_ecs::event::Event;
#[derive(Event, Debug, Clone, Copy, Eq, PartialEq)]
pub enum TurnoutControlEvent {
// 道岔定操
DC,
// 道岔反操
FC,
}

View File

@ -0,0 +1,2 @@
mod equipment;
pub use equipment::*;

View File

@ -0,0 +1,10 @@
mod components;
mod events;
mod plugin;
mod resources;
mod systems;
pub use components::*;
pub use events::*;
pub use plugin::*;
pub use resources::*;
pub use systems::*;

View File

@ -0,0 +1,23 @@
use bevy_app::{FixedUpdate, Plugin, Startup};
use bevy_ecs::schedule::IntoSystemConfigs;
use crate::{
handle_turnout_control, loading, turnout_state_update, two_normal_position_transform,
SimulationConfig, TurnoutControlEvent,
};
#[derive(Default)]
pub struct TrackSideEquipmentPlugin;
impl Plugin for TrackSideEquipmentPlugin {
fn build(&self, app: &mut bevy_app::App) {
app.insert_resource(SimulationConfig::default())
.add_event::<TurnoutControlEvent>()
.add_systems(Startup, loading)
.add_systems(
FixedUpdate,
(two_normal_position_transform, turnout_state_update).chain(),
)
.observe(handle_turnout_control);
}
}

View File

@ -0,0 +1,15 @@
use bevy_ecs::system::Resource;
#[derive(Resource, Debug)]
pub struct SimulationConfig {
// 道岔转换总时间,单位ms
pub turnout_transform_time: i32,
}
impl Default for SimulationConfig {
fn default() -> Self {
SimulationConfig {
turnout_transform_time: 3500,
}
}
}

View File

@ -0,0 +1,32 @@
use bevy_ecs::{entity::Entity, system::Query};
use rtss_common::Uid;
use rtss_log::tracing::debug;
use crate::TwoNormalPositionsTransform;
pub const TWO_NORMAL_POSITION_MIN: i32 = 0;
pub const TWO_NORMAL_POSITION_MAX: i32 = 100;
// 两常态位置转换系统
pub fn two_normal_position_transform(
mut query: Query<(Entity, &Uid, &mut TwoNormalPositionsTransform)>,
) {
for (entity, uid, mut transform) in &mut query {
debug!(
"Entity: {:?}, Uid: {:?}, Conversion: {:?}",
entity, uid, transform
);
if transform.velocity == 0f32 {
continue;
}
let p = transform.position + transform.velocity as i32;
if p > TWO_NORMAL_POSITION_MAX {
transform.position = TWO_NORMAL_POSITION_MAX;
transform.velocity = TWO_NORMAL_POSITION_MIN as f32;
} else if p < TWO_NORMAL_POSITION_MIN {
transform.position = TWO_NORMAL_POSITION_MIN;
transform.velocity = 0 as f32;
} else {
transform.position = p;
}
}
}

View File

@ -0,0 +1,13 @@
use bevy_ecs::{prelude::Commands, system::ResMut};
use rtss_common::{SharedSimulationResource, Uid};
use crate::components;
pub fn loading(mut commands: Commands, res_uid_mapping: ResMut<SharedSimulationResource>) {
let uid = Uid("1".to_string());
let et = commands.spawn(components::TurnoutBundle {
uid: uid.clone(),
..Default::default()
});
res_uid_mapping.insert_entity(uid.0, et.id());
}

View File

@ -0,0 +1,6 @@
mod common;
mod loading;
mod turnout;
pub use common::*;
pub use loading::*;
pub use turnout::*;

View File

@ -0,0 +1,71 @@
use bevy_ecs::{
observer::Trigger,
system::{Query, Res, ResMut},
};
use bevy_time::{Fixed, Time};
use rtss_common::Uid;
use rtss_log::tracing::debug;
use crate::{
events::TurnoutControlEvent, SimulationConfig, TurnoutState, TwoNormalPositionsTransform,
};
// 道岔控制事件处理系统
pub fn handle_turnout_control(
trigger: Trigger<TurnoutControlEvent>,
time: ResMut<Time<Fixed>>,
config: Res<SimulationConfig>,
mut query: Query<(&Uid, &mut TurnoutState, &mut TwoNormalPositionsTransform)>,
) {
let (uid, mut state, mut conversion) = query
.get_mut(trigger.entity())
.expect("通过entity获取道岔异常");
let v = calculate_avg_velocity(
crate::TWO_NORMAL_POSITION_MAX as f32,
config.turnout_transform_time as f32,
time.timestep().as_millis() as f32,
);
match trigger.event() {
TurnoutControlEvent::DC => {
state.dc = true;
state.fc = false;
conversion.velocity = -v;
debug!("道岔定操处理:{:?}", uid);
}
TurnoutControlEvent::FC => {
state.dc = false;
state.fc = true;
conversion.velocity = v;
debug!("道岔反操处理uid={:?}, conversion={:?}", uid, conversion);
}
}
}
// 道岔状态更新系统
pub fn turnout_state_update(
mut query: Query<(&Uid, &mut TurnoutState, &mut TwoNormalPositionsTransform)>,
) {
for (uid, mut state, conversion) in &mut query {
debug!(
"更新道岔状态Uid={:?}, State={:?}, Conversion={:?}",
uid, state, conversion
);
if conversion.position == 0 {
state.db = true;
state.fb = false;
state.dc = false;
} else if conversion.position == 100 {
state.db = false;
state.fb = true;
state.fc = false;
} else {
state.db = false;
state.fb = false;
}
}
}
// 计算平均速度
fn calculate_avg_velocity(total_distance: f32, total_time: f32, time_step: f32) -> f32 {
total_distance / (total_time / time_step)
}

View File

@ -1,3 +1,11 @@
fn main() { use rtss_api::ServerConfig;
println!("Hello, world!");
#[tokio::main]
async fn main() {
rtss_log::Logging {
level: rtss_log::tracing::Level::DEBUG,
..Default::default()
}
.init();
rtss_api::serve(ServerConfig::default()).await;
} }