1.设计数据库表结构

2.设计项目代码结构
3.实现草稿数据和发布数据数据库访问
4.实现草稿数据graphQL接口
This commit is contained in:
soul-walker 2024-09-14 16:45:33 +08:00
parent c499b45bf4
commit 86469d6b35
39 changed files with 3676 additions and 191 deletions

1
.env Normal file
View File

@ -0,0 +1 @@
DATABASE_URL=postgresql://joylink:Joylink@0503@localhost:5432/joylink

View File

@ -1,18 +1,24 @@
{
"cSpell.words": [
"chrono",
"cpus",
"Graphi",
"graphiql",
"hashbrown",
"Hasher",
"Iden",
"Iscs",
"Joylink",
"jsonwebtoken",
"mplj",
"plpgsql",
"prost",
"proto",
"protoc",
"protos",
"repr",
"rtss",
"sqlx",
"sysinfo",
"thiserror",
"timestep",

1153
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,15 +9,29 @@ edition = "2021"
members = ["crates/*"]
[workspace.dependencies]
bevy_app = "0.14.1"
bevy_core = "0.14.1"
bevy_ecs = "0.14.1"
bevy_time = "0.14.1"
rayon = "1.10.0"
tokio = { version = "1.39.3", features = ["macros", "rt-multi-thread"] }
thiserror = "1.0.63"
bevy_app = "0.14"
bevy_core = "0.14"
bevy_ecs = "0.14"
bevy_time = "0.14"
rayon = "1.10"
tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] }
thiserror = "1.0"
sqlx = { version = "0.8", features = [
"runtime-tokio",
"postgres",
"json",
"chrono",
] }
serde = { version = "1.0", features = ["derive"] }
anyhow = "1.0"
[dependencies]
tokio = { version = "1.39.3", features = ["macros", "rt-multi-thread"] }
rtss_log = { path = "crates/rtss_log" }
rtss_api = { path = "crates/rtss_api" }
rtss_db = { path = "crates/rtss_db" }
serde = { workspace = true }
config = "0.14.0"
clap = { version = "4.5", features = ["derive"] }
enum_dispatch = "0.3"
anyhow = { workspace = true }

View File

@ -32,6 +32,8 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
- TODO Highlight: TODO 高亮
- vscode-icons: 图标优化
- YAML: YAML 文件支持
- Prettier SQL VSCode: SQL文件格式化
- vscode-proto3: proto文件支持
### 安装 pre-commit

7
conf/default.toml Normal file
View File

@ -0,0 +1,7 @@
[server]
port = 8765
[database]
[log]
level = "debug"

2
conf/dev.toml Normal file
View File

@ -0,0 +1,2 @@
[database]
url = "postgresql://joylink:Joylink@0503@localhost:5432/joylink"

View File

@ -0,0 +1,2 @@
[database]
url = "postgresql://joylink:Joylink@0503@192.168.33.233:5432/joylink"

5
conf/local_test.toml Normal file
View File

@ -0,0 +1,5 @@
[database]
url = "postgresql://joylink:Joylink@0503@192.168.33.233:5432/joylink"
[log]
level = "info"

View File

@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.208", features = ["derive"] }
serde_json = "1.0.125"
@ -21,3 +22,5 @@ bevy_ecs = { workspace = true }
rtss_log = { path = "../rtss_log" }
rtss_sim_manage = { path = "../rtss_sim_manage" }
rtss_trackside = { path = "../rtss_trackside" }
rtss_db = { path = "../rtss_db" }
rtss_dto = { path = "../rtss_dto" }

View File

@ -0,0 +1,191 @@
use async_graphql::{Context, InputObject, Object, SimpleObject};
use chrono::{DateTime, Local};
use rtss_db::DraftDataAccessor;
use rtss_db::RtssDbAccessor;
use rtss_dto::common::DataType;
use crate::pagination::PageQueryDto;
#[derive(Default)]
pub struct DraftDataQuery;
#[derive(Default)]
pub struct DraftDataMutation;
#[Object]
impl DraftDataQuery {
/// 分页查询草稿数据
async fn draft_data_paging<'ctx>(
&self,
ctx: &Context<'ctx>,
paging: PageQueryDto,
query: DraftDataFilterDto,
) -> async_graphql::Result<DraftDataPage> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let paging_result = db_accessor
.query_draft_data(query.into(), paging.into())
.await?;
Ok(paging_result.into())
}
/// 根据id获取草稿数据
async fn draft_data(&self, ctx: &Context<'_>, id: i32) -> async_graphql::Result<DraftData> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let draft_data = db_accessor.query_draft_data_by_id(id).await?;
Ok(draft_data.into())
}
/// 查询是否已经存在同一用户下的同名草稿数据
async fn draft_data_exist(
&self,
ctx: &Context<'_>,
user_id: i32,
name: String,
) -> async_graphql::Result<bool> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let exist = db_accessor.is_draft_data_exist(user_id, &name).await?;
Ok(exist)
}
}
#[Object]
impl DraftDataMutation {
/// 创建草稿数据
async fn create_draft_data(
&self,
ctx: &Context<'_>,
input: CreateDraftDataDto,
) -> async_graphql::Result<DraftData> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let draft_data = db_accessor.create_draft_data(input.into()).await?;
Ok(draft_data.into())
}
/// 更新草稿数据name
async fn update_draft_data_name(
&self,
ctx: &Context<'_>,
id: i32,
name: String,
) -> async_graphql::Result<DraftData> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let draft_data = db_accessor.update_draft_data_name(id, &name).await?;
Ok(draft_data.into())
}
/// 更新草稿数据data
async fn update_draft_data_data(
&self,
ctx: &Context<'_>,
id: i32,
data: Vec<u8>,
) -> async_graphql::Result<DraftData> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let draft_data = db_accessor
.update_draft_data_data(id, data.as_slice())
.await?;
Ok(draft_data.into())
}
/// 删除草稿数据
async fn delete_draft_data(
&self,
ctx: &Context<'_>,
id: Vec<i32>,
) -> async_graphql::Result<bool> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
db_accessor.delete_draft_data(id.as_slice()).await?;
Ok(true)
}
/// 设置草稿数据的默认发布数据
async fn set_default_release_data_id(
&self,
ctx: &Context<'_>,
id: i32,
release_data_id: i32,
) -> async_graphql::Result<DraftData> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let draft_data = db_accessor
.set_default_release_data_id(id, release_data_id)
.await?;
Ok(draft_data.into())
}
/// 草稿数据另存为
async fn save_as_new_draft_data(
&self,
ctx: &Context<'_>,
id: i32,
name: String,
user_id: i32,
) -> async_graphql::Result<DraftData> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let draft_data = db_accessor.save_as_new_draft(id, &name, user_id).await?;
Ok(draft_data.into())
}
}
#[derive(Debug, InputObject)]
pub struct CreateDraftDataDto {
pub name: String,
pub data_type: rtss_dto::common::DataType,
pub user_id: i32,
}
impl From<CreateDraftDataDto> for rtss_db::CreateDraftData {
fn from(value: CreateDraftDataDto) -> Self {
Self::new(&value.name, value.data_type, value.user_id)
}
}
/// 草稿数据查询条件
#[derive(Debug, InputObject)]
pub struct DraftDataFilterDto {
pub user_id: Option<i32>,
pub name: Option<String>,
pub data_type: Option<rtss_dto::common::DataType>,
}
impl From<DraftDataFilterDto> for rtss_db::DraftDataQuery {
fn from(value: DraftDataFilterDto) -> Self {
Self {
user_id: value.user_id,
name: value.name,
data_type: value.data_type,
}
}
}
#[derive(Debug, SimpleObject)]
pub struct DraftData {
pub id: i32,
pub name: String,
pub data_type: rtss_dto::common::DataType,
pub data: Option<Vec<u8>>,
pub user_id: i32,
pub created_at: DateTime<Local>,
pub updated_at: DateTime<Local>,
}
impl From<rtss_db::model::DraftDataModel> for DraftData {
fn from(value: rtss_db::model::DraftDataModel) -> Self {
Self {
id: value.id,
name: value.name,
data_type: DataType::try_from(value.data_type).unwrap(),
data: value.data,
user_id: value.user_id,
created_at: value.created_at,
updated_at: value.updated_at,
}
}
}
#[derive(Debug, SimpleObject)]
pub struct DraftDataPage {
pub total: i64,
pub data: Vec<DraftData>,
}
impl From<rtss_db::common::PageResult<rtss_db::model::DraftDataModel>> for DraftDataPage {
fn from(value: rtss_db::common::PageResult<rtss_db::model::DraftDataModel>) -> Self {
Self {
total: value.total,
data: value.data.into_iter().map(|m| m.into()).collect(),
}
}
}

View File

@ -1,21 +1,9 @@
mod draft_data;
mod jwt_auth;
mod pagination;
mod server;
mod simulation;
mod simulation_operation;
mod simulation_definition;
mod sys_info;
pub use server::*;
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -0,0 +1,29 @@
use async_graphql::{Enum, InputObject, SimpleObject};
#[derive(Enum, Copy, Clone, Default, Eq, PartialEq, Debug)]
#[graphql(remote = "rtss_db::common::SortOrder")]
pub enum SortOrder {
#[default]
Asc,
Desc,
}
#[derive(InputObject, Debug)]
pub struct PageQueryDto {
pub page: i32,
pub items_per_page: i32,
}
impl From<PageQueryDto> for rtss_db::common::PageQuery {
fn from(value: PageQueryDto) -> Self {
Self {
page: value.page,
items_per_page: value.items_per_page,
}
}
}
#[derive(SimpleObject)]
pub struct PageDto {
pub total: i64,
}

View File

@ -1,5 +1,3 @@
use std::ops::Deref;
use async_graphql::*;
use async_graphql::{EmptySubscription, Schema};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
@ -13,31 +11,23 @@ use axum::{
};
use http::{playground_source, GraphQLPlaygroundConfig};
use rtss_log::tracing::{debug, error, info};
use rtss_sim_manage::SimulationManager;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tower_http::cors::CorsLayer;
use crate::{jwt_auth, simulation};
use crate::simulation_definition::MutexSimulationManager;
use crate::{draft_data, jwt_auth};
pub struct ServerConfig {
pub database_url: String,
pub port: u16,
}
impl Default for ServerConfig {
fn default() -> Self {
Self { port: 8080 }
}
}
impl ServerConfig {
pub fn new(port: u16) -> Self {
Self { port }
pub fn new(database_url: &str, port: u16) -> Self {
Self {
database_url: database_url.to_string(),
port,
}
pub fn with_port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn to_socket_addr(&self) -> String {
@ -45,8 +35,8 @@ impl ServerConfig {
}
}
pub async fn serve(config: ServerConfig) {
let schema = new_schema().await;
pub async fn serve(config: ServerConfig) -> anyhow::Result<()> {
let schema = new_schema(&config).await;
let app = Router::new()
.route("/", get(graphiql).post(graphql_handler))
@ -64,8 +54,8 @@ pub async fn serve(config: ServerConfig) {
TcpListener::bind(config.to_socket_addr()).await.unwrap(),
app,
)
.await
.unwrap();
.await?;
Ok(())
}
async fn graphql_handler(
@ -93,27 +83,15 @@ async fn graphiql() -> impl IntoResponse {
pub type SimulationSchema = Schema<Query, Mutation, EmptySubscription>;
#[derive(Default, MergedObject)]
pub struct Query(simulation::SimulationQuery);
pub struct Query(draft_data::DraftDataQuery);
#[derive(Default, MergedObject)]
pub struct Mutation(simulation::SimulationMutation);
pub struct Mutation(draft_data::DraftDataMutation);
pub struct MutexSimulationManager(Mutex<SimulationManager>);
impl Default for MutexSimulationManager {
fn default() -> Self {
Self(Mutex::new(SimulationManager::default()))
}
}
impl Deref for MutexSimulationManager {
type Target = Mutex<SimulationManager>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub async fn new_schema() -> SimulationSchema {
pub async fn new_schema(config: &ServerConfig) -> SimulationSchema {
let dba = rtss_db::get_db_accessor(&config.database_url).await;
Schema::build(Query::default(), Mutation::default(), EmptySubscription)
.data(dba)
.data(MutexSimulationManager::default())
.finish()
}

View File

@ -2,7 +2,10 @@ use async_graphql::{Context, InputObject, Object};
use rtss_log::tracing::info;
use rtss_sim_manage::{AvailablePlugins, SimulationBuilder};
use crate::{jwt_auth::Claims, simulation_operation::SimulationOperation, MutexSimulationManager};
use crate::{
jwt_auth::Claims,
simulation_definition::{MutexSimulationManager, SimulationOperation},
};
#[derive(Default)]
pub struct SimulationQuery;

View File

@ -1,5 +1,23 @@
use std::ops::Deref;
use async_graphql::Enum;
use bevy_ecs::event::Event;
use rtss_sim_manage::SimulationManager;
use tokio::sync::Mutex;
pub struct MutexSimulationManager(Mutex<SimulationManager>);
impl Default for MutexSimulationManager {
fn default() -> Self {
Self(Mutex::new(SimulationManager::default()))
}
}
impl Deref for MutexSimulationManager {
type Target = Mutex<SimulationManager>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
pub enum SimulationOperation {

View File

@ -4,3 +4,17 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio",
"macros",
"chrono",
"json",
"derive",
"postgres",
"uuid",
] }
thiserror = { workspace = true }
rtss_dto = { path = "../rtss_dto" }
rtss_log = { path = "../rtss_log" }

View File

@ -0,0 +1,160 @@
use std::fmt::Display;
#[derive(Debug, Clone)]
pub struct PageQuery {
pub page: i32,
pub items_per_page: i32,
}
impl Default for PageQuery {
fn default() -> Self {
Self {
page: DEFAULT_PAGE,
items_per_page: DEFAULT_ITEMS_PER_PAGE,
}
}
}
const DEFAULT_PAGE: i32 = 1;
const DEFAULT_ITEMS_PER_PAGE: i32 = 10;
impl PageQuery {
pub fn new(page: i32, items_per_page: i32) -> Self {
Self {
page,
items_per_page,
}
}
pub(crate) fn to_limit_clause(&self) -> String {
format!(
"LIMIT {} OFFSET {}",
self.items_per_page,
(self.page - 1) * self.items_per_page
)
}
}
pub struct TableFilterClause {
pub where_clause: String,
pub params: Vec<String>,
}
/// Trait for table filter
pub trait TableFilter {
fn to_where_clause(&self) -> String;
}
pub trait TableColumn {
fn name(&self) -> &str;
}
impl TableColumn for &str {
fn name(&self) -> &str {
self
}
}
pub fn to_sort_by_clause<T>(sorts: Vec<Sort<T>>) -> String
where
T: TableColumn,
{
if sorts.is_empty() {
return "".to_string();
}
let sorts: Vec<String> = sorts
.iter()
.map(|s| format!("{} {}", s.field.name(), s.order))
.collect();
format!("ORDER BY {}", sorts.join(", "))
}
pub struct Sort<T>
where
T: TableColumn,
{
pub field: T,
pub order: SortOrder,
}
impl<T> Sort<T>
where
T: TableColumn,
{
pub fn new(field: T, order: SortOrder) -> Self {
Self { field, order }
}
pub fn to_order_by_clause(&self) -> String {
format!("ORDER BY {} {}", self.field.name(), self.order)
}
}
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub enum SortOrder {
#[default]
Asc,
Desc,
}
impl Display for SortOrder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Asc => write!(f, "ASC"),
Self::Desc => write!(f, "DESC"),
}
}
}
#[derive(Debug)]
pub struct PageResult<T> {
pub total: i64,
pub data: Vec<T>,
}
impl<T> PageResult<T> {
pub fn new(total: i64, data: Vec<T>) -> Self {
Self { total, data }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_page_query_to_limit_clause() {
let page_query = PageQuery {
page: 1,
items_per_page: 10,
};
assert_eq!(page_query.to_limit_clause(), "LIMIT 10 OFFSET 0");
let page_query = PageQuery {
page: 2,
items_per_page: 10,
};
assert_eq!(page_query.to_limit_clause(), "LIMIT 10 OFFSET 10");
let page_query = PageQuery {
page: 3,
items_per_page: 10,
};
assert_eq!(page_query.to_limit_clause(), "LIMIT 10 OFFSET 20");
}
#[test]
fn test_to_sort_by_clause() {
let sorts = vec![
Sort {
field: "id",
order: SortOrder::Asc,
},
Sort {
field: "name",
order: SortOrder::Desc,
},
];
assert_eq!(to_sort_by_clause(sorts), "ORDER BY id ASC, name DESC");
}
}

View File

@ -0,0 +1,457 @@
use std::vec;
use rtss_dto::common::DataType;
use rtss_log::tracing::debug;
use crate::{
common::{PageQuery, PageResult, Sort, SortOrder, TableColumn},
model::{DraftDataColumn, DraftDataModel},
DbAccessError,
};
use super::RtssDbAccessor;
/// 草稿数据管理
#[allow(async_fn_in_trait)]
pub trait DraftDataAccessor {
/// 查询用户草稿数据
async fn query_draft_data(
&self,
query: DraftDataQuery,
page: PageQuery,
) -> Result<PageResult<DraftDataModel>, DbAccessError>;
/// 根据id查询草稿数据
async fn query_draft_data_by_id(&self, id: i32) -> Result<DraftDataModel, DbAccessError>;
/// 是否user_id+name的数据已存在
async fn is_draft_data_exist(&self, user_id: i32, name: &str) -> Result<bool, DbAccessError>;
/// 创建草稿数据基本信息
async fn create_draft_data(
&self,
create: CreateDraftData,
) -> Result<DraftDataModel, DbAccessError>;
/// 更新草稿数据名称
async fn update_draft_data_name(
&self,
id: i32,
name: &str,
) -> Result<DraftDataModel, DbAccessError>;
/// 更新草稿数据数据
async fn update_draft_data_data(
&self,
id: i32,
data: &[u8],
) -> Result<DraftDataModel, DbAccessError>;
/// 删除草稿数据
async fn delete_draft_data(&self, id: &[i32]) -> Result<(), DbAccessError>;
/// 设置默认发布数据id
async fn set_default_release_data_id(
&self,
draft_id: i32,
release_data_id: i32,
) -> Result<DraftDataModel, DbAccessError>;
/// 草稿数据另存为新草稿数据
async fn save_as_new_draft(
&self,
draft_id: i32,
name: &str,
user_id: i32,
) -> Result<DraftDataModel, DbAccessError>;
}
#[derive(Debug, Default)]
pub struct DraftDataQuery {
pub user_id: Option<i32>,
pub name: Option<String>,
pub data_type: Option<DataType>,
}
impl DraftDataQuery {
pub fn with_user_id(mut self, user_id: i32) -> Self {
self.user_id = Some(user_id);
self
}
pub fn with_name(mut self, name: String) -> Self {
self.name = Some(name);
self
}
pub fn with_data_type(mut self, data_type: DataType) -> Self {
self.data_type = Some(data_type);
self
}
fn build_filter(&self) -> String {
let mut filters = vec![];
if let Some(user_id) = self.user_id {
filters.push(format!("{} = {}", DraftDataColumn::UserId.name(), user_id));
}
if let Some(name) = &self.name {
filters.push(format!(
"{} LIKE '%{}%'",
DraftDataColumn::Name.name(),
name
));
}
if let Some(data_type) = self.data_type {
filters.push(format!(
"{} = {}",
DraftDataColumn::DataType.name(),
data_type as i32
));
}
if filters.is_empty() {
"".to_string()
} else {
format!("WHERE {}", filters.join(" AND "))
}
}
}
pub struct CreateDraftData {
name: String,
data_type: DataType,
data: Option<Vec<u8>>,
default_release_data_id: Option<i32>,
user_id: i32,
}
impl CreateDraftData {
pub fn new(name: &str, data_type: DataType, user_id: i32) -> Self {
CreateDraftData {
name: name.to_string(),
data_type,
data: None,
default_release_data_id: None,
user_id,
}
}
pub fn with_data(mut self, data: &[u8]) -> Self {
self.data = Some(data.to_vec());
self
}
pub fn with_default_release_data_id(mut self, default_release_data_id: i32) -> Self {
self.default_release_data_id = Some(default_release_data_id);
self
}
}
impl DraftDataAccessor for RtssDbAccessor {
async fn query_draft_data(
&self,
query: DraftDataQuery,
page: PageQuery,
) -> Result<PageResult<DraftDataModel>, DbAccessError> {
let table = DraftDataColumn::Table.name();
let where_clause = query.build_filter();
let sql = format!("SELECT COUNT(*) FROM {table} {where_clause}");
// log sql
debug!("count sql: {}", sql);
let total: i64 = sqlx::query_scalar(&sql).fetch_one(&self.pool).await?;
if total == 0 {
return Ok(PageResult::new(total, vec![]));
}
let select_columns = format!(
"{}, {}, {}, {}, {}, {}",
DraftDataColumn::Id.name(),
DraftDataColumn::Name.name(),
DraftDataColumn::DataType.name(),
DraftDataColumn::UserId.name(),
DraftDataColumn::CreatedAt.name(),
DraftDataColumn::UpdatedAt.name(),
);
let sort = Sort::new(DraftDataColumn::UpdatedAt, SortOrder::Desc);
let order_by = sort.to_order_by_clause();
let limit_clause = page.to_limit_clause();
let paging_sql = format!(
"SELECT {select_columns} FROM {table} {where_clause} {order_by} {limit_clause}",
);
// log sql
debug!("paging sql: {}", paging_sql);
let list: Vec<DraftDataModel> = sqlx::query_as(&paging_sql).fetch_all(&self.pool).await?;
Ok(PageResult::new(total, list))
}
async fn query_draft_data_by_id(&self, id: i32) -> Result<DraftDataModel, DbAccessError> {
let table = DraftDataColumn::Table.name();
let sql = format!("SELECT * FROM {table} WHERE id = {id}");
let draft_data: DraftDataModel = sqlx::query_as(&sql).fetch_one(&self.pool).await?;
Ok(draft_data)
}
async fn is_draft_data_exist(&self, user_id: i32, name: &str) -> Result<bool, DbAccessError> {
let table = DraftDataColumn::Table.name();
let filter = format!(
"WHERE {} = '{}' AND {} = {}",
DraftDataColumn::Name.name(),
name,
DraftDataColumn::UserId.name(),
user_id
);
let sql = format!("SELECT COUNT(*) FROM {table} {filter}");
// log sql
debug!("draft data exist check sql: {}", sql);
let count: i64 = sqlx::query_scalar(&sql).fetch_one(&self.pool).await?;
Ok(count > 0)
}
async fn create_draft_data(
&self,
create: CreateDraftData,
) -> Result<DraftDataModel, DbAccessError> {
// 检查是否已存在
let exist = self
.is_draft_data_exist(create.user_id, &create.name)
.await?;
if exist {
return Err(DbAccessError::RowExist);
}
// 插入数据
let table = DraftDataColumn::Table.name();
let columns = format!(
"{}, {}, {}, {}, {}",
DraftDataColumn::Name.name(),
DraftDataColumn::DataType.name(),
DraftDataColumn::UserId.name(),
DraftDataColumn::Data.name(),
DraftDataColumn::DefaultReleaseDataId.name(),
);
let sql =
format!("INSERT INTO {table} ({columns}) VALUES ($1, $2, $3, $4, $5) RETURNING *",);
// log sql
debug!("create sql: {}", sql);
// 插入数据
let draft_data: DraftDataModel = sqlx::query_as(&sql)
.bind(create.name)
.bind(create.data_type as i32)
.bind(create.user_id)
.bind(create.data)
.bind(create.default_release_data_id)
.fetch_one(&self.pool)
.await?;
Ok(draft_data)
}
async fn update_draft_data_name(
&self,
id: i32,
name: &str,
) -> Result<DraftDataModel, DbAccessError> {
let table = DraftDataColumn::Table.name();
let name_column = DraftDataColumn::Name.name();
let updated_at_column = DraftDataColumn::UpdatedAt.name();
let id_column = DraftDataColumn::Id.name();
let sql = format!("UPDATE {table} SET {name_column} = $1, {updated_at_column} = 'now()' WHERE {id_column} = $2 RETURNING *",);
// log sql
debug!("update name sql: {}", sql);
let draft_data = sqlx::query_as(&sql)
.bind(name)
.bind(id)
.fetch_one(&self.pool)
.await?;
Ok(draft_data)
}
async fn update_draft_data_data(
&self,
id: i32,
data: &[u8],
) -> Result<DraftDataModel, DbAccessError> {
let table = DraftDataColumn::Table.name();
let data_column = DraftDataColumn::Data.name();
let updated_at_column = DraftDataColumn::UpdatedAt.name();
let id_column = DraftDataColumn::Id.name();
let sql = format!("UPDATE {table} SET {data_column} = $1, {updated_at_column} = 'now()' WHERE {id_column} = $2 RETURNING *",);
// log sql
debug!("update data sql: {}", sql);
let draft_data = sqlx::query_as(&sql)
.bind(data)
.bind(id)
.fetch_one(&self.pool)
.await?;
Ok(draft_data)
}
async fn delete_draft_data(&self, ids: &[i32]) -> Result<(), DbAccessError> {
let table = DraftDataColumn::Table.name();
let id_column = DraftDataColumn::Id.name();
let sql = format!("DELETE FROM {table} WHERE {id_column} = ANY($1)");
// log sql
debug!("delete sql: {}", sql);
sqlx::query(&sql).bind(ids).execute(&self.pool).await?;
Ok(())
}
async fn set_default_release_data_id(
&self,
draft_id: i32,
release_data_id: i32,
) -> Result<DraftDataModel, DbAccessError> {
let table = DraftDataColumn::Table.name();
let draft_id_column = DraftDataColumn::Id.name();
let release_data_id_column = DraftDataColumn::DefaultReleaseDataId.name();
let sql = format!(
"UPDATE {table} SET {release_data_id_column} = $1 WHERE {draft_id_column} = $2 RETURNING *",
);
// log sql
debug!("set default release data id sql: {}", sql);
let draft_data = sqlx::query_as(&sql)
.bind(release_data_id)
.bind(draft_id)
.fetch_one(&self.pool)
.await?;
Ok(draft_data)
}
async fn save_as_new_draft(
&self,
draft_id: i32,
name: &str,
user_id: i32,
) -> Result<DraftDataModel, DbAccessError> {
let draft_data = self.query_draft_data_by_id(draft_id).await?;
let create = CreateDraftData::new(
name,
DataType::try_from(draft_data.data_type).unwrap(),
user_id,
)
.with_data(draft_data.data.as_ref().unwrap());
self.create_draft_data(create).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use rtss_log::tracing::Level;
use sqlx::PgPool;
// You could also do `use foo_crate::MIGRATOR` and just refer to it as `MIGRATOR` here.
#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn basic_use_test(pool: PgPool) -> Result<(), DbAccessError> {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = crate::db_access::RtssDbAccessor::new(pool);
// 创建草稿数据测试
let res = accessor
.create_draft_data(CreateDraftData::new("test", DataType::Em, 10))
.await?;
println!("res: {:?}", res);
println!("res.created_at: {:?}", res.created_at.naive_local());
assert!(res.id > 0);
// 重复创建测试
let repeat_create_result = accessor
.create_draft_data(CreateDraftData::new("test", DataType::Em, 10))
.await;
if let Some(e) = repeat_create_result.err() {
match e {
DbAccessError::RowExist => {
println!("repeat create test pass");
}
_ => {
panic!("unexpected error: {:?}", e);
}
}
}
// query by id测试
let get_by_id = accessor.query_draft_data_by_id(res.id).await;
assert!(get_by_id.is_ok() && get_by_id.unwrap().name.eq("test"));
// update name测试
let get_by_id = accessor
.update_draft_data_name(res.id, "test update")
.await?;
assert!(get_by_id.name.eq("test update"));
// update data测试
let data = "tests".as_bytes();
let get_by_id = accessor.update_draft_data_data(res.id, data).await?;
println!("{:?}", get_by_id);
assert!(get_by_id.data.unwrap() == data);
// save as new draft测试
let new_draft = accessor.save_as_new_draft(res.id, "new draft", 11).await?;
assert_eq!(new_draft.name, "new draft");
assert_eq!(new_draft.user_id, 11);
assert_eq!(new_draft.data.unwrap(), data);
assert!(new_draft.updated_at > new_draft.created_at);
// delete测试
accessor.delete_draft_data(&[res.id, new_draft.id]).await?;
let get_by_id = accessor.query_draft_data_by_id(res.id).await;
if let Some(e) = get_by_id.err() {
match e {
DbAccessError::SqlxError(sqlx::Error::RowNotFound) => {
println!("delete test pass");
}
_ => {
panic!("delete test error: {:?}", e);
}
}
}
// 查询确认当前数据已删除
let page = accessor
.query_draft_data(
DraftDataQuery {
user_id: None,
name: None,
data_type: None,
},
PageQuery::new(1, 100),
)
.await?;
assert_eq!(page.total, 0);
// 分页查询测试
// 分四个user_id各插入5条数据
for i in 1..5 {
for j in 1..6 {
accessor
.create_draft_data(CreateDraftData::new(&format!("test{}", j), DataType::Em, i))
.await?;
}
}
let page = accessor
.query_draft_data(
DraftDataQuery {
user_id: Some(1),
name: Some("test".to_string()),
data_type: Some(DataType::Em),
},
PageQuery::new(1, 10),
)
.await?;
assert_eq!(page.total, 5);
let page = accessor
.query_draft_data(
DraftDataQuery {
user_id: None,
name: None,
data_type: None,
},
PageQuery::new(1, 100),
)
.await?;
assert_eq!(page.total, 20);
Ok(())
}
}

View File

@ -0,0 +1,19 @@
mod draft_data;
pub use draft_data::*;
mod release_data;
pub use release_data::*;
pub struct RtssDbAccessor {
pool: sqlx::PgPool,
}
impl RtssDbAccessor {
pub fn new(pool: sqlx::PgPool) -> Self {
RtssDbAccessor { pool }
}
}
pub async fn get_db_accessor(url: &str) -> RtssDbAccessor {
let pool = sqlx::PgPool::connect(url).await.expect("连接数据库失败");
RtssDbAccessor::new(pool)
}

View File

@ -0,0 +1,702 @@
use rtss_dto::common::DataType;
use sqlx::types::chrono;
use crate::{
common::{PageQuery, PageResult, Sort, SortOrder, TableColumn},
model::{
DraftDataModel, ReleaseDataColumn, ReleaseDataModel, ReleaseDataVersionColumn,
ReleaseDataVersionModel,
},
DbAccessError,
};
use super::{CreateDraftData, DraftDataAccessor, RtssDbAccessor};
#[allow(async_fn_in_trait)]
pub trait ReleaseDataAccessor {
/// 从草稿发布新release data和version并使用新version
async fn release_new_from_draft(
&self,
draft_id: i32,
name: &str,
description: &str,
) -> Result<(ReleaseDataModel, ReleaseDataVersionModel), DbAccessError>;
/// 从草稿发布到已有草稿默认的release data的新version并使用新version
async fn release_to_existing(
&self,
draft_id: i32,
description: &str,
) -> Result<(ReleaseDataModel, ReleaseDataVersionModel), DbAccessError>;
/// 分页查询发布数据列表
async fn query_release_data_list(
&self,
query: ReleaseDataQuery,
page: PageQuery,
) -> Result<PageResult<ReleaseDataModel>, DbAccessError>;
/// 检查name是否存在
async fn is_release_data_name_exist(&self, name: &str) -> Result<bool, DbAccessError>;
/// 查询发布数据
async fn query_release_data_by_id(
&self,
release_id: i32,
) -> Result<ReleaseDataModel, DbAccessError>;
/// 查询发布数据所有版本信息
async fn query_release_data_versions(
&self,
release_id: i32,
) -> Result<Vec<ReleaseDataVersionModel>, DbAccessError>;
/// 根据id查询发布版本数据
async fn query_release_data_version_by_id(
&self,
version_id: i32,
) -> Result<ReleaseDataVersionModel, DbAccessError>;
/// 查询发布数据详情
async fn query_release_data_with_used_version(
&self,
release_id: i32,
) -> Result<(ReleaseDataModel, ReleaseDataVersionModel), DbAccessError>;
/// 更新发布数据名称
async fn update_release_data_name(
&self,
release_id: i32,
name: &str,
) -> Result<ReleaseDataModel, DbAccessError>;
/// 上架/下架发布数据
async fn update_release_data_published(
&self,
release_id: i32,
is_published: bool,
) -> Result<ReleaseDataModel, DbAccessError>;
/// 设置在使用的版本
async fn set_used_version(
&self,
release_id: i32,
version_id: i32,
) -> Result<ReleaseDataModel, DbAccessError>;
/// 从指定的版本数据创建草稿
async fn create_draft_from_version(
&self,
version_id: i32,
user_id: i32,
) -> Result<DraftDataModel, DbAccessError>;
}
/// 草稿发布结果
pub struct ReleaseFromDraftResult {
pub release_id: i32,
pub version_id: i32,
}
/// 发布数据查询条件
pub struct ReleaseDataQuery {
pub name: Option<String>,
pub user_id: Option<i32>,
pub data_type: Option<rtss_dto::common::DataType>,
pub is_published: Option<bool>,
}
impl Default for ReleaseDataQuery {
fn default() -> Self {
Self::new()
}
}
impl ReleaseDataQuery {
pub fn new() -> Self {
Self {
name: None,
user_id: None,
data_type: None,
is_published: None,
}
}
pub fn with_name(mut self, name: String) -> Self {
self.name = Some(name);
self
}
pub fn with_user_id(mut self, user_id: i32) -> Self {
self.user_id = Some(user_id);
self
}
pub fn with_data_type(mut self, data_type: rtss_dto::common::DataType) -> Self {
self.data_type = Some(data_type);
self
}
pub fn with_is_published(mut self, is_published: bool) -> Self {
self.is_published = Some(is_published);
self
}
pub fn build_filter(&self) -> String {
let mut filters = vec![];
let rd_name = ReleaseDataColumn::Name.name();
let rd_user_id = ReleaseDataColumn::UserId.name();
let rd_data_type = ReleaseDataColumn::DataType.name();
let rd_is_published = ReleaseDataColumn::IsPublished.name();
if let Some(name) = &self.name {
filters.push(format!("{rd_name} LIKE '%{name}%'"));
}
if let Some(user_id) = self.user_id {
filters.push(format!("{rd_user_id} = {user_id}"));
}
if let Some(data_type) = self.data_type {
filters.push(format!("{rd_data_type} = {}", data_type as i32));
}
if let Some(is_published) = self.is_published {
filters.push(format!("{rd_is_published} = {is_published}"));
}
if filters.is_empty() {
"".to_string()
} else {
format!("WHERE {}", filters.join(" AND "))
}
}
}
impl ReleaseDataAccessor for RtssDbAccessor {
async fn release_new_from_draft(
&self,
draft_id: i32,
name: &str,
description: &str,
) -> Result<(ReleaseDataModel, ReleaseDataVersionModel), DbAccessError> {
// 判断发布数据名称是否已存在
if self.is_release_data_name_exist(name).await? {
return Err(DbAccessError::DataError("发布数据名称已存在".to_string()));
}
// 开启事务
let mut tx = self.pool.begin().await?;
// 查询草稿数据
let draft = self.query_draft_data_by_id(draft_id).await?;
// 创建发布数据
let rd_table = ReleaseDataColumn::Table.name();
let rd_insert_columns = format!(
"({}, {}, {})",
ReleaseDataColumn::Name.name(),
ReleaseDataColumn::DataType.name(),
ReleaseDataColumn::UserId.name(),
);
let rd_insert_clause =
format!("INSERT INTO {rd_table} {rd_insert_columns} VALUES ($1, $2, $3) RETURNING *");
let mut rd = sqlx::query_as::<_, ReleaseDataModel>(&rd_insert_clause)
.bind(name)
.bind(draft.data_type as i32)
.bind(draft.user_id)
.fetch_one(&mut *tx)
.await?;
// 创建发布数据版本
let rdv_table = ReleaseDataVersionColumn::Table.name();
let rdv_insert_columns = format!(
"{}, {}, {}, {}",
ReleaseDataVersionColumn::ReleaseDataId.name(),
ReleaseDataVersionColumn::Data.name(),
ReleaseDataVersionColumn::Description.name(),
ReleaseDataVersionColumn::UserId.name(),
);
let rdv_insert_clause = format!(
"INSERT INTO {rdv_table} ({rdv_insert_columns}) VALUES ($1, $2, $3, $4) RETURNING *"
);
let rdv = sqlx::query_as::<_, ReleaseDataVersionModel>(&rdv_insert_clause)
.bind(rd.id)
.bind(draft.data)
.bind(description)
.bind(draft.user_id)
.fetch_one(&mut *tx)
.await?;
// 更新发布数据使用的版本
let used_version_id = ReleaseDataColumn::UsedVersionId.name();
let rd_id = ReleaseDataColumn::Id.name();
let rd_update_clause =
format!("UPDATE {rd_table} SET {used_version_id} = $1 WHERE {rd_id} = $2");
sqlx::query(&rd_update_clause)
.bind(rdv.id)
.bind(rd.id)
.execute(&mut *tx)
.await?;
// 成功后提交事务
tx.commit().await?;
// 更新草稿数据的默认发布数据id
self.set_default_release_data_id(draft_id, rd.id).await?;
rd.used_version_id = Some(rdv.id);
Ok((rd, rdv))
}
async fn release_to_existing(
&self,
draft_id: i32,
description: &str,
) -> Result<(ReleaseDataModel, ReleaseDataVersionModel), DbAccessError> {
// 查询草稿数据
let draft = self.query_draft_data_by_id(draft_id).await?;
// 判断草稿是否设置了默认发布数据
if draft.default_release_data_id.is_none() {
return Err(DbAccessError::DataError(
"草稿未设置默认发布数据".to_string(),
));
}
// 查询默认发布数据
let mut rd = self
.query_release_data_by_id(draft.default_release_data_id.unwrap())
.await?;
// 开启事务
let mut tx = self.pool.begin().await?;
// 创建发布数据版本
let rdv_table = ReleaseDataVersionColumn::Table.name();
let rdv_insert_columns = format!(
"{}, {}, {}, {}",
ReleaseDataVersionColumn::ReleaseDataId.name(),
ReleaseDataVersionColumn::Data.name(),
ReleaseDataVersionColumn::Description.name(),
ReleaseDataVersionColumn::UserId.name(),
);
let rdv_insert_clause = format!(
"INSERT INTO {rdv_table} ({rdv_insert_columns}) VALUES ($1, $2, $3, $4) RETURNING *"
);
let rdv = sqlx::query_as::<_, ReleaseDataVersionModel>(&rdv_insert_clause)
.bind(rd.id)
.bind(draft.data)
.bind(description)
.bind(draft.user_id)
.fetch_one(&mut *tx)
.await?;
// 更新发布数据使用的版本
let rd_table = ReleaseDataColumn::Table.name();
let used_version_id = ReleaseDataColumn::UsedVersionId.name();
let rd_updated_at = ReleaseDataColumn::UpdatedAt.name();
let rd_id = ReleaseDataColumn::Id.name();
let rd_update_clause =
format!("UPDATE {rd_table} SET {used_version_id} = $1, {rd_updated_at} = 'now()' WHERE {rd_id} = $2");
sqlx::query(&rd_update_clause)
.bind(rdv.id)
.bind(rd.id)
.execute(&mut *tx)
.await?;
// 成功后提交事务
tx.commit().await?;
rd.used_version_id = Some(rdv.id);
Ok((rd, rdv))
}
async fn query_release_data_list(
&self,
query: ReleaseDataQuery,
page: PageQuery,
) -> Result<PageResult<ReleaseDataModel>, DbAccessError> {
let rd_table = ReleaseDataColumn::Table.name();
let where_clause = query.build_filter();
let count_clause = format!("SELECT COUNT(*) FROM {rd_table} {where_clause}");
// 查询总数
let total = sqlx::query_scalar(&count_clause)
.fetch_one(&self.pool)
.await?;
if total == 0 {
return Ok(PageResult::new(0, vec![]));
}
let paging_clause = page.to_limit_clause();
let order_by = Sort::new(ReleaseDataColumn::UpdatedAt, SortOrder::Desc);
let order_by_clause = order_by.to_order_by_clause();
let query_clause =
format!("SELECT * FROM {rd_table} {where_clause} {order_by_clause} {paging_clause}",);
let data = sqlx::query_as::<_, ReleaseDataModel>(&query_clause)
.fetch_all(&self.pool)
.await?;
Ok(PageResult::new(total, data))
}
async fn is_release_data_name_exist(&self, name: &str) -> Result<bool, DbAccessError> {
let rd_table = ReleaseDataColumn::Table.name();
let rd_name = ReleaseDataColumn::Name.name();
let rd_query_clause = format!(
"SELECT COUNT(*) FROM {rd_table} WHERE {rd_name} = $1",
rd_table = rd_table,
rd_name = rd_name
);
let count: i64 = sqlx::query_scalar(&rd_query_clause)
.bind(name)
.fetch_one(&self.pool)
.await?;
Ok(count > 0)
}
async fn query_release_data_by_id(
&self,
release_id: i32,
) -> Result<ReleaseDataModel, DbAccessError> {
// 查询发布数据
let rd_table = ReleaseDataColumn::Table.name();
let rd_id = ReleaseDataColumn::Id.name();
let rd_query_clause = format!(
"SELECT * FROM {rd_table} WHERE {rd_id} = $1",
rd_table = rd_table,
rd_id = rd_id
);
let rd = sqlx::query_as::<_, ReleaseDataModel>(&rd_query_clause)
.bind(release_id)
.fetch_one(&self.pool)
.await?;
Ok(rd)
}
async fn query_release_data_versions(
&self,
release_id: i32,
) -> Result<Vec<ReleaseDataVersionModel>, DbAccessError> {
// 查询发布数据版本
let rdv_table = ReleaseDataVersionColumn::Table.name();
// 查询列除了data列
let rdv_columns = format!(
"{}, {}, {}, {}, {}, {}",
ReleaseDataVersionColumn::Id.name(),
ReleaseDataVersionColumn::ReleaseDataId.name(),
ReleaseDataVersionColumn::Version.name(),
ReleaseDataVersionColumn::Description.name(),
ReleaseDataVersionColumn::UserId.name(),
ReleaseDataVersionColumn::CreatedAt.name(),
);
let rdv_release_id = ReleaseDataVersionColumn::ReleaseDataId.name();
// 按版本号倒序排序
let sort = Sort::new(ReleaseDataVersionColumn::Version, SortOrder::Desc);
let order_by_clause = sort.to_order_by_clause();
let rdv_query_clause = format!(
"SELECT {rdv_columns} FROM {rdv_table} WHERE {rdv_release_id} = $1 {order_by_clause}",
);
let rdv = sqlx::query_as::<_, ReleaseDataVersionModel>(&rdv_query_clause)
.bind(release_id)
.fetch_all(&self.pool)
.await?;
Ok(rdv)
}
async fn query_release_data_version_by_id(
&self,
version_id: i32,
) -> Result<ReleaseDataVersionModel, DbAccessError> {
// 查询发布数据版本
let rdv_table = ReleaseDataVersionColumn::Table.name();
let rdv_id = ReleaseDataVersionColumn::Id.name();
let rdv_query_clause = format!(
"SELECT * FROM {rdv_table} WHERE {rdv_id} = $1",
rdv_table = rdv_table,
rdv_id = rdv_id
);
let rdv = sqlx::query_as::<_, ReleaseDataVersionModel>(&rdv_query_clause)
.bind(version_id)
.fetch_one(&self.pool)
.await?;
Ok(rdv)
}
async fn query_release_data_with_used_version(
&self,
release_id: i32,
) -> Result<(ReleaseDataModel, ReleaseDataVersionModel), DbAccessError> {
// 查询发布数据
let rd = self.query_release_data_by_id(release_id).await?;
if let Some(version_id) = rd.used_version_id {
// 查询发布数据使用的版本
let rdv = self.query_release_data_version_by_id(version_id).await?;
return Ok((rd, rdv));
}
Err(DbAccessError::DataError(
"发布数据未设置使用的版本".to_string(),
))
}
async fn update_release_data_name(
&self,
release_id: i32,
name: &str,
) -> Result<ReleaseDataModel, DbAccessError> {
let rd_table = ReleaseDataColumn::Table.name();
let rd_name = ReleaseDataColumn::Name.name();
let rd_updated_at = ReleaseDataColumn::UpdatedAt.name();
let rd_id = ReleaseDataColumn::Id.name();
let rd_update_clause = format!(
"UPDATE {rd_table} SET {rd_name} = $1, {rd_updated_at} = 'now()' WHERE {rd_id} = $2 RETURNING *",
rd_table = rd_table,
rd_name = rd_name,
rd_id = rd_id
);
let rd = sqlx::query_as(&rd_update_clause)
.bind(name)
.bind(release_id)
.fetch_one(&self.pool)
.await?;
Ok(rd)
}
async fn update_release_data_published(
&self,
release_id: i32,
is_published: bool,
) -> Result<ReleaseDataModel, DbAccessError> {
let rd_table = ReleaseDataColumn::Table.name();
let rd_is_published = ReleaseDataColumn::IsPublished.name();
let rd_updated_at = ReleaseDataColumn::UpdatedAt.name();
let rd_id = ReleaseDataColumn::Id.name();
let rd_update_clause = format!(
"UPDATE {rd_table} SET {rd_is_published} = $1, {rd_updated_at} = 'now()' WHERE {rd_id} = $2 RETURNING *",
rd_table = rd_table,
rd_is_published = rd_is_published,
rd_id = rd_id
);
let rd = sqlx::query_as(&rd_update_clause)
.bind(is_published)
.bind(release_id)
.fetch_one(&self.pool)
.await?;
Ok(rd)
}
async fn set_used_version(
&self,
release_id: i32,
version_id: i32,
) -> Result<ReleaseDataModel, DbAccessError> {
// 确定版本存在
let _ = self.query_release_data_version_by_id(version_id).await?;
// 更新发布数据使用的版本
let rd_table = ReleaseDataColumn::Table.name();
let rd_used_version_id = ReleaseDataColumn::UsedVersionId.name();
let rd_updated_at = ReleaseDataColumn::UpdatedAt.name();
let rd_id = ReleaseDataColumn::Id.name();
let rd_update_clause = format!(
"UPDATE {rd_table} SET {rd_used_version_id} = $1, {rd_updated_at} = 'now()' WHERE {rd_id} = $2 RETURNING *",
rd_table = rd_table,
rd_used_version_id = rd_used_version_id,
rd_id = rd_id
);
let rd = sqlx::query_as(&rd_update_clause)
.bind(version_id)
.bind(release_id)
.fetch_one(&self.pool)
.await?;
Ok(rd)
}
async fn create_draft_from_version(
&self,
version_id: i32,
user_id: i32,
) -> Result<DraftDataModel, DbAccessError> {
// 查询版本数据
let rdv = self.query_release_data_version_by_id(version_id).await?;
// 查询发布数据
let rd = self.query_release_data_by_id(rdv.release_data_id).await?;
// 检查草稿名称是否已存在
let name = format!("{}_{}", rd.name, chrono::Local::now().timestamp());
// 创建草稿数据
let draft = self
.create_draft_data(
CreateDraftData::new(&name, DataType::try_from(rd.data_type).unwrap(), user_id)
.with_data(&rdv.data)
.with_default_release_data_id(rd.id),
)
.await?;
Ok(draft)
}
}
#[cfg(test)]
mod tests {
use crate::{CreateDraftData, DraftDataAccessor, RtssDbAccessor};
use super::*;
use rtss_log::tracing::Level;
use sqlx::PgPool;
#[test]
fn test_release_query() {
// 测试构造发布数据查询条件,名称过滤
let expects = "WHERE name LIKE '%test%'";
let query_with_name = ReleaseDataQuery::new().with_name("test".to_string());
assert_eq!(query_with_name.build_filter(), expects);
// 测试构造发布数据查询条件用户id过滤
let expects = "WHERE user_id = 1";
let query_with_user_id = ReleaseDataQuery::new().with_user_id(1);
assert_eq!(query_with_user_id.build_filter(), expects);
// 测试构造发布数据查询条件,数据类型过滤
let expects = "WHERE data_type = 1";
let query_with_data_type = ReleaseDataQuery::new().with_data_type(DataType::Em);
assert_eq!(query_with_data_type.build_filter(), expects);
// 测试构造发布数据查询条件,是否上架过滤
let expects = "WHERE is_published = true";
let query_with_is_published = ReleaseDataQuery::new().with_is_published(true);
assert_eq!(query_with_is_published.build_filter(), expects);
// 测试构造发布数据查询条件,多条件过滤
let expects =
"WHERE name LIKE '%test%' AND user_id = 1 AND data_type = 1 AND is_published = true";
let query_with_all = ReleaseDataQuery::new()
.with_name("test".to_string())
.with_user_id(1)
.with_data_type(DataType::Em)
.with_is_published(true);
assert_eq!(query_with_all.build_filter(), expects);
}
// You could also do `use foo_crate::MIGRATOR` and just refer to it as `MIGRATOR` here.
#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_basic_use(pool: PgPool) -> Result<(), DbAccessError> {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = RtssDbAccessor::new(pool);
// 创建草稿
let data = "test".as_bytes();
let draft = accessor
.create_draft_data(
CreateDraftData::new("test", rtss_dto::common::DataType::Em, 1).with_data(data),
)
.await?;
let name = "test_release";
let description = "test release";
// 发布到默认发布数据
let result = accessor.release_to_existing(draft.id, description).await;
assert!(result.is_err());
if let Some(e) = result.err() {
match e {
DbAccessError::DataError(_) => {
println!("发布到已存在发布数据:草稿未设置默认发布数据测试正确");
}
_ => panic!("发布到默认发布数据草稿未设置默认发布数据测试错误: {:?}", e),
}
}
// 发布新版本
let (release_data, version1) = accessor
.release_new_from_draft(draft.id, name, &description)
.await?;
assert_eq!(release_data.name, name);
// 检查使用版本是刚刚发布的版本
assert_eq!(release_data.used_version_id, Some(version1.id));
// 检查版本数据
assert_eq!(version1.data, data);
// 检查版本描述
assert_eq!(version1.description, description);
// 检查上架状态
assert_eq!(release_data.is_published, true);
// 检查草稿数据的默认发布数据id
let draft = accessor.query_draft_data_by_id(draft.id).await?;
assert_eq!(draft.default_release_data_id, Some(release_data.id));
println!("发布新版本测试成功");
// name重复检查
let exist = accessor.is_release_data_name_exist(name).await?;
assert_eq!(exist, true);
// 修改草稿数据
let data = "test2".as_bytes();
accessor.update_draft_data_data(draft.id, data).await?;
// 发布到已存在发布数据成功测试
let (release_data, version2) = accessor.release_to_existing(draft.id, description).await?;
assert_eq!(release_data.name, name);
// 检查使用版本是刚刚发布的版本
assert_eq!(release_data.used_version_id, Some(version2.id));
// 数据检查
assert_eq!(version2.data, data);
println!("发布到已存在发布数据测试成功");
// 测试更新发布数据名称
let new_name = "test_release_new";
let release_data = accessor
.update_release_data_name(release_data.id, new_name)
.await?;
assert_eq!(release_data.name, new_name);
assert!(release_data.updated_at > release_data.created_at);
println!("更新发布数据名称测试成功");
// 测试更新发布数据上架状态
let release_data = accessor
.update_release_data_published(release_data.id, false)
.await?;
assert_eq!(release_data.is_published, false);
assert!(release_data.updated_at > release_data.created_at);
println!("更新发布数据上架状态测试成功");
// 设置使用的版本
let release_data = accessor
.set_used_version(release_data.id, version1.id)
.await?;
assert_eq!(release_data.used_version_id, Some(version1.id));
assert!(release_data.updated_at > release_data.created_at);
println!("设置使用的版本测试成功");
// 查询发布数据所有版本
let versions = accessor
.query_release_data_versions(release_data.id)
.await?;
assert_eq!(versions.len(), 2);
println!("查询发布数据所有版本测试成功: {:?}", versions);
// 查询发布数据详情
let (release_data, used_version) = accessor
.query_release_data_with_used_version(release_data.id)
.await?;
assert_eq!(release_data.used_version_id.unwrap(), used_version.id);
println!("查询发布数据详情测试成功");
// 从版本创建草稿
let draft = accessor
.create_draft_from_version(used_version.id, 1)
.await?;
assert_eq!(draft.data, Some(used_version.data));
assert_eq!(draft.default_release_data_id, Some(release_data.id));
println!("从版本创建草稿测试成功");
// 构造分页查询所需发布数据4个人每人发布2个数据
for i in 2..6 {
let name = format!("test_{}", i);
let description = format!("test release {}", i);
let data = "test data".as_bytes();
let draft = accessor
.create_draft_data(
CreateDraftData::new(&name, rtss_dto::common::DataType::Em, i).with_data(data),
)
.await?;
let (release_data, _) = accessor
.release_new_from_draft(draft.id, &name, &description)
.await?;
assert_eq!(release_data.name, name);
let another_name = format!("{}_another", name);
let (release_data, _) = accessor
.release_new_from_draft(draft.id, &another_name, &description)
.await?;
assert_eq!(release_data.name, another_name);
}
// 分页查询发布数据,无条件
let query = ReleaseDataQuery::new();
let page = PageQuery::new(1, 100);
let page_result = accessor.query_release_data_list(query, page).await?;
assert_eq!(page_result.total, 9);
// 分页查询发布数据,按是否上架过滤
let query = ReleaseDataQuery::new().with_is_published(true);
let page = PageQuery::new(1, 100);
let page_result = accessor.query_release_data_list(query, page).await?;
assert_eq!(page_result.total, 8);
// 分页查询发布数据,按名称过滤
let query = ReleaseDataQuery::new().with_name("test_2".to_string());
let page = PageQuery::new(1, 10);
let page_result = accessor.query_release_data_list(query, page).await?;
assert_eq!(page_result.total, 2);
// 分页查询发布数据按用户id过滤
let query = ReleaseDataQuery::new().with_user_id(2);
let page = PageQuery::new(1, 10);
let page_result = accessor.query_release_data_list(query, page).await?;
assert_eq!(page_result.total, 2);
// 分页查询发布数据,按数据类型过滤
let query = ReleaseDataQuery::new().with_data_type(rtss_dto::common::DataType::Em);
let page = PageQuery::new(1, 10);
let page_result = accessor.query_release_data_list(query, page).await?;
assert_eq!(page_result.total, 9);
println!("分页查询发布数据测试成功");
Ok(())
}
}

View File

@ -0,0 +1,13 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum DbAccessError {
#[error("未知的数据库访问错误")]
Unknown,
#[error("sqlx 错误: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("数据已存在")]
RowExist,
#[error("数据错误:{0}")]
DataError(String),
}

View File

@ -1,14 +1,27 @@
pub fn add(left: u64, right: u64) -> u64 {
left + right
pub mod common;
mod db_access;
mod error;
pub mod model;
pub use db_access::*;
pub use error::*;
use sqlx::pool::PoolOptions;
pub use sqlx::types::chrono::*;
pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations");
pub async fn run_migrations(url: &str) -> anyhow::Result<()> {
let pool = PoolOptions::<sqlx::Postgres>::new()
.acquire_timeout(std::time::Duration::from_secs(5))
.connect(url)
.await?;
MIGRATOR.run(&pool).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
pub mod prelude {
pub use crate::common::*;
pub use crate::db_access::*;
pub use crate::model::*;
pub use crate::DbAccessError;
}

306
crates/rtss_db/src/model.rs Normal file
View File

@ -0,0 +1,306 @@
use sqlx::types::chrono::{DateTime, Local};
use crate::common::TableColumn;
/// 数据库表 rtss.draft_data 列映射
#[derive(Debug)]
pub(crate) enum DraftDataColumn {
Table,
Id,
Name,
DataType,
Data,
DefaultReleaseDataId,
UserId,
CreatedAt,
UpdatedAt,
}
#[derive(Debug, sqlx::FromRow)]
pub struct DraftDataModel {
pub id: i32,
pub name: String,
pub data_type: i32,
#[sqlx(default)]
pub data: Option<Vec<u8>>,
#[sqlx(default)]
pub default_release_data_id: Option<i32>,
pub user_id: i32,
pub created_at: DateTime<Local>,
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.release_data 列映射
#[derive(Debug)]
pub(crate) enum ReleaseDataColumn {
Table,
Id,
Name,
DataType,
UsedVersionId,
UserId,
IsPublished,
#[allow(dead_code)]
CreatedAt,
UpdatedAt,
}
#[derive(Debug, sqlx::FromRow)]
pub struct ReleaseDataModel {
pub id: i32,
pub name: String,
pub data_type: i32,
pub used_version_id: Option<i32>,
pub user_id: i32,
pub is_published: bool,
pub created_at: DateTime<Local>,
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.release_data_version 列映射
#[derive(Debug)]
pub(crate) enum ReleaseDataVersionColumn {
Table,
Id,
ReleaseDataId,
Data,
Version,
Description,
UserId,
CreatedAt,
}
#[derive(Debug, sqlx::FromRow)]
pub struct ReleaseDataVersionModel {
pub id: i32,
pub release_data_id: i32,
#[sqlx(default)]
pub data: Vec<u8>,
pub version: i32,
pub description: String,
pub user_id: i32,
pub created_at: DateTime<Local>,
}
/// 数据库表 rtss.feature 列映射
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum FeatureColumn {
Table,
Id,
FeatureType,
Name,
Description,
IsPublished,
CreatorId,
UpdaterId,
CreatedAt,
UpdatedAt,
}
#[derive(Debug, sqlx::FromRow)]
pub struct FeatureModel {
pub id: i32,
pub feature_type: i32,
pub name: String,
pub description: String,
pub is_published: bool,
pub creator_id: i32,
pub updater_id: i32,
pub created_at: DateTime<Local>,
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.feature_release_data 列映射
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum FeatureReleaseDataColumn {
Table,
FeatureId,
ReleaseDataId,
}
#[derive(Debug, sqlx::FromRow)]
pub struct FeatureReleaseDataModel {
pub feature_id: i32,
pub release_data_id: i32,
}
/// 数据库表 rtss.feature_group 列映射
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum FeatureGroupColumn {
Table,
Id,
Name,
Description,
IsPublished,
CreatorId,
UpdaterId,
CreatedAt,
UpdatedAt,
}
#[derive(Debug, sqlx::FromRow)]
pub struct FeatureGroupModel {
pub id: i32,
pub name: String,
pub description: String,
pub is_published: bool,
pub creator_id: i32,
pub updater_id: i32,
pub created_at: DateTime<Local>,
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.feature_group_feature 列映射
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum FeatureGroupFeatureColumn {
Table,
FeatureGroupId,
FeatureId,
}
#[derive(Debug, sqlx::FromRow)]
pub struct FeatureGroupFeatureModel {
pub feature_group_id: i32,
pub feature_id: i32,
}
/// 数据库表 rtss.feature_config 列映射
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum FeatureConfigColumn {
Table,
Id,
UserId,
FeatureId,
Config,
CreatedAt,
UpdatedAt,
}
#[derive(Debug, sqlx::FromRow)]
pub struct FeatureConfigModel {
pub id: i32,
pub user_id: i32,
pub feature_id: i32,
pub config: Vec<u8>,
pub created_at: DateTime<Local>,
pub updated_at: DateTime<Local>,
}
impl TableColumn for DraftDataColumn {
fn name(&self) -> &str {
match self {
DraftDataColumn::Table => "rtss.draft_data",
DraftDataColumn::Id => "id",
DraftDataColumn::Name => "name",
DraftDataColumn::DataType => "data_type",
DraftDataColumn::Data => "data",
DraftDataColumn::DefaultReleaseDataId => "default_release_data_id",
DraftDataColumn::UserId => "user_id",
DraftDataColumn::CreatedAt => "created_at",
DraftDataColumn::UpdatedAt => "updated_at",
}
}
}
impl TableColumn for ReleaseDataColumn {
fn name(&self) -> &str {
match self {
ReleaseDataColumn::Table => "rtss.release_data",
ReleaseDataColumn::Id => "id",
ReleaseDataColumn::Name => "name",
ReleaseDataColumn::DataType => "data_type",
ReleaseDataColumn::UsedVersionId => "used_version_id",
ReleaseDataColumn::UserId => "user_id",
ReleaseDataColumn::IsPublished => "is_published",
ReleaseDataColumn::CreatedAt => "created_at",
ReleaseDataColumn::UpdatedAt => "updated_at",
}
}
}
impl TableColumn for ReleaseDataVersionColumn {
fn name(&self) -> &str {
match self {
ReleaseDataVersionColumn::Table => "rtss.release_data_version",
ReleaseDataVersionColumn::Id => "id",
ReleaseDataVersionColumn::ReleaseDataId => "release_data_id",
ReleaseDataVersionColumn::Data => "data",
ReleaseDataVersionColumn::Version => "version",
ReleaseDataVersionColumn::Description => "description",
ReleaseDataVersionColumn::UserId => "user_id",
ReleaseDataVersionColumn::CreatedAt => "created_at",
}
}
}
impl TableColumn for FeatureColumn {
fn name(&self) -> &str {
match self {
FeatureColumn::Table => "rtss.feature",
FeatureColumn::Id => "id",
FeatureColumn::FeatureType => "feature_type",
FeatureColumn::Name => "name",
FeatureColumn::Description => "description",
FeatureColumn::IsPublished => "is_published",
FeatureColumn::CreatorId => "creator_id",
FeatureColumn::UpdaterId => "updater_id",
FeatureColumn::CreatedAt => "created_at",
FeatureColumn::UpdatedAt => "updated_at",
}
}
}
impl TableColumn for FeatureReleaseDataColumn {
fn name(&self) -> &str {
match self {
FeatureReleaseDataColumn::Table => "rtss.feature_release_data",
FeatureReleaseDataColumn::FeatureId => "feature_id",
FeatureReleaseDataColumn::ReleaseDataId => "release_data_id",
}
}
}
impl TableColumn for FeatureGroupColumn {
fn name(&self) -> &str {
match self {
FeatureGroupColumn::Table => "rtss.feature_group",
FeatureGroupColumn::Id => "id",
FeatureGroupColumn::Name => "name",
FeatureGroupColumn::Description => "description",
FeatureGroupColumn::IsPublished => "is_published",
FeatureGroupColumn::CreatorId => "creator_id",
FeatureGroupColumn::UpdaterId => "updater_id",
FeatureGroupColumn::CreatedAt => "created_at",
FeatureGroupColumn::UpdatedAt => "updated_at",
}
}
}
impl TableColumn for FeatureGroupFeatureColumn {
fn name(&self) -> &str {
match self {
FeatureGroupFeatureColumn::Table => "rtss.feature_group_feature",
FeatureGroupFeatureColumn::FeatureGroupId => "feature_group_id",
FeatureGroupFeatureColumn::FeatureId => "feature_id",
}
}
}
impl TableColumn for FeatureConfigColumn {
fn name(&self) -> &str {
match self {
FeatureConfigColumn::Table => "rtss.feature_config",
FeatureConfigColumn::Id => "id",
FeatureConfigColumn::UserId => "user_id",
FeatureConfigColumn::FeatureId => "feature_id",
FeatureConfigColumn::Config => "config",
FeatureConfigColumn::CreatedAt => "created_at",
FeatureConfigColumn::UpdatedAt => "updated_at",
}
}
}

View File

@ -5,6 +5,8 @@ edition = "2021"
[dependencies]
prost = "0.13"
async-graphql = { version = "7.0.7", features = ["chrono"] }
sqlx = { workspace = true }
[build-dependencies]
prost-build = "0.13"

View File

@ -1,7 +1,8 @@
use std::process::Command;
use prost_build::Config;
fn main() {
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rerun-if-changed=../../rtss-proto-msg/src/basic_signal_data.proto");
// println!("cargo:rerun-if-changed=build.rs");
#[cfg(target_os = "windows")]
{
std::env::set_var(
@ -11,9 +12,21 @@ fn main() {
}
Config::new()
.out_dir("src/pb")
.type_attribute("common.DataType", "#[derive(sqlx::Type)]")
.type_attribute("common.DataType", "#[derive(async_graphql::Enum)]")
.type_attribute("common.FeatureType", "#[derive(sqlx::Type)]")
.compile_protos(
&["../../rtss-proto-msg/src/basic_signal_data.proto"],
&[
"../../rtss-proto-msg/src/em_data.proto",
"../../rtss-proto-msg/src/common.proto",
],
&["../../rtss-proto-msg/src/"],
)
.unwrap();
// Run cargo fmt to format the generated code
Command::new("cargo")
.args(["fmt"])
.status()
.expect("Failed to run cargo fmt on rtss-dto");
}

View File

@ -14,9 +14,9 @@ mod tests {
#[test]
fn test_encode_decode() {
let point = basic_signal_data::Point { x: 1.0, y: 2.0 };
let point = common::Point { x: 1.0, y: 2.0 };
let encoded = point.encode_to_vec();
let decoded = basic_signal_data::Point::decode(encoded.as_ref()).unwrap();
let decoded = common::Point::decode(encoded.as_ref()).unwrap();
assert_eq!(point, decoded);
println!(
"point: {:?}, encoded: {:?}, decoded: {:?}",

View File

@ -1,12 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Storage {
#[prost(message, optional, tag = "1")]
pub canvas: ::core::option::Option<Canvas>,
#[prost(message, repeated, tag = "2")]
pub stations: ::prost::alloc::vec::Vec<Station>,
}
/// 画布数据
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@ -96,42 +88,91 @@ pub struct CommonInfo {
#[prost(message, repeated, tag = "4")]
pub children_transform: ::prost::alloc::vec::Vec<ChildTransform>,
}
/// 公里标
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KilometerMark {
/// 公里标坐标系
#[prost(string, tag = "1")]
pub coordinate: ::prost::alloc::string::String,
/// 公里标数值
#[prost(int64, tag = "2")]
pub value: i64,
/// 数据类型
#[derive(
sqlx::Type,
async_graphql::Enum,
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration,
)]
#[repr(i32)]
pub enum DataType {
/// 数据类型未知
Unknown = 0,
/// 电子地图数据
Em = 1,
/// IBP数据
Ibp = 2,
/// PSL数据
Psl = 3,
/// ISCS数据
Iscs = 4,
}
impl DataType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
DataType::Unknown => "DataType_Unknown",
DataType::Em => "DataType_Em",
DataType::Ibp => "DataType_Ibp",
DataType::Psl => "DataType_Psl",
DataType::Iscs => "DataType_Iscs",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"DataType_Unknown" => Some(Self::Unknown),
"DataType_Em" => Some(Self::Em),
"DataType_Ibp" => Some(Self::Ibp),
"DataType_Psl" => Some(Self::Psl),
"DataType_Iscs" => Some(Self::Iscs),
_ => None,
}
}
}
/// 功能特性类型
#[derive(
sqlx::Type, Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration,
)]
#[repr(i32)]
pub enum FeatureType {
/// 未知
Unknown = 0,
/// 仿真
Simulation = 1,
/// 运行图编制
RunPlan = 2,
}
impl FeatureType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
FeatureType::Unknown => "FeatureType_Unknown",
FeatureType::Simulation => "FeatureType_Simulation",
FeatureType::RunPlan => "FeatureType_RunPlan",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"FeatureType_Unknown" => Some(Self::Unknown),
"FeatureType_Simulation" => Some(Self::Simulation),
"FeatureType_RunPlan" => Some(Self::RunPlan),
_ => None,
}
}
/// 车站数据
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Station {
#[prost(message, optional, tag = "1")]
pub common: ::core::option::Option<CommonInfo>,
/// 车站名
#[prost(string, tag = "2")]
pub name: ::prost::alloc::string::String,
/// 车站站名
#[prost(string, tag = "3")]
pub zhan_name: ::prost::alloc::string::String,
/// 车站名拼音简写
#[prost(string, tag = "4")]
pub name_pinyin: ::prost::alloc::string::String,
/// 公里标
#[prost(message, optional, tag = "6")]
pub km: ::core::option::Option<KilometerMark>,
/// 是否集中站
#[prost(bool, tag = "10")]
pub concentration: bool,
/// 是否车辆段
#[prost(bool, tag = "11")]
pub depots: bool,
/// 集中站管理的车站-id
#[prost(uint32, repeated, tag = "13")]
pub manage_station_ids: ::prost::alloc::vec::Vec<u32>,
}

View File

@ -0,0 +1,49 @@
// This file is @generated by prost-build.
/// 电子地图数据
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Em {
#[prost(message, optional, tag = "1")]
pub canvas: ::core::option::Option<super::common::Canvas>,
#[prost(message, repeated, tag = "2")]
pub stations: ::prost::alloc::vec::Vec<Station>,
}
/// 公里标
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KilometerMark {
/// 公里标坐标系
#[prost(string, tag = "1")]
pub coordinate: ::prost::alloc::string::String,
/// 公里标数值
#[prost(int64, tag = "2")]
pub value: i64,
}
/// 车站数据
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Station {
#[prost(message, optional, tag = "1")]
pub common: ::core::option::Option<super::common::CommonInfo>,
/// 车站名
#[prost(string, tag = "2")]
pub name: ::prost::alloc::string::String,
/// 车站站名
#[prost(string, tag = "3")]
pub zhan_name: ::prost::alloc::string::String,
/// 车站名拼音简写
#[prost(string, tag = "4")]
pub name_pinyin: ::prost::alloc::string::String,
/// 公里标
#[prost(message, optional, tag = "6")]
pub km: ::core::option::Option<KilometerMark>,
/// 是否集中站
#[prost(bool, tag = "10")]
pub concentration: bool,
/// 是否车辆段
#[prost(bool, tag = "11")]
pub depots: bool,
/// 集中站管理的车站-id
#[prost(uint32, repeated, tag = "13")]
pub manage_station_ids: ::prost::alloc::vec::Vec<u32>,
}

View File

@ -1 +1,2 @@
pub mod basic_signal_data;
pub mod common;
pub mod em_data;

View File

@ -20,13 +20,19 @@ pub struct Logging {
impl Default for Logging {
fn default() -> Self {
Self {
filter: "wgpu=error,naga=warn".to_string(),
// 过滤器: like "naga=warn,axum=info"等
filter: "".to_string(),
level: Level::INFO,
}
}
}
impl Logging {
pub fn with_level(mut self, level: Level) -> Self {
self.level = level;
self
}
pub fn init(&self) {
let finished_subscriber;
let subscriber = Registry::default();

View File

@ -0,0 +1 @@
DROP SCHEMA rtss CASCADE;

View File

@ -0,0 +1,234 @@
-- 初始化数据库SCHEMA(所有轨道交通信号系统仿真的表、类型等都在rtss SCHEMA下)
CREATE SCHEMA rtss;
-- 创建草稿数据表
CREATE TABLE
rtss.draft_data (
id SERIAL PRIMARY KEY, -- id 自增主键
name VARCHAR(128) NOT NULL, -- 草稿名称
data_type INT NOT NULL, -- 数据类型
data BYTEA, -- 草稿数据
default_release_data_id INT NULL, -- 默认发布数据id
user_id INT NOT NULL, -- 创建用户id
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 更新时间
UNIQUE (name, user_id) -- 一个用户的草稿名称唯一
);
-- 创建草稿数据用户索引
CREATE INDEX ON rtss.draft_data (user_id);
-- 创建草稿数据类型索引
CREATE INDEX ON rtss.draft_data (data_type);
-- 注释草稿数据表
COMMENT ON TABLE rtss.draft_data IS '草稿数据表';
-- 注释草稿数据表字段
COMMENT ON COLUMN rtss.draft_data.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.draft_data.name IS '草稿名称';
COMMENT ON COLUMN rtss.draft_data.data_type IS '数据类型';
COMMENT ON COLUMN rtss.draft_data.data IS '草稿数据';
COMMENT ON COLUMN rtss.draft_data.user_id IS '创建用户id';
COMMENT ON COLUMN rtss.draft_data.created_at IS '创建时间';
COMMENT ON COLUMN rtss.draft_data.updated_at IS '更新时间';
-- 创建发布数据表
CREATE TABLE
rtss.release_data (
id SERIAL PRIMARY KEY, -- id 自增主键
name VARCHAR(128) NOT NULL UNIQUE, -- 发布数据名称(数据唯一标识)
data_type INT NOT NULL, -- 数据类型
used_version_id INT NULL, -- 使用的版本数据id
user_id INT NOT NULL, -- 发布/更新用户id
is_published BOOLEAN NOT NULL DEFAULT TRUE, -- 是否上架
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()' -- 更新时间
);
-- 注释发布数据表
COMMENT ON TABLE rtss.release_data IS '发布数据表';
-- 注释发布数据表字段
COMMENT ON COLUMN rtss.release_data.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.release_data.name IS '发布数据名称(数据唯一标识)';
COMMENT ON COLUMN rtss.release_data.data_type IS '数据类型';
COMMENT ON COLUMN rtss.release_data.used_version_id IS '使用的版本数据id';
COMMENT ON COLUMN rtss.release_data.user_id IS '发布/更新用户id';
COMMENT ON COLUMN rtss.release_data.is_published IS '是否上架';
COMMENT ON COLUMN rtss.release_data.created_at IS '创建时间';
COMMENT ON COLUMN rtss.release_data.updated_at IS '更新时间';
-- 创建发布数据版本表
CREATE TABLE
rtss.release_data_version (
id SERIAL PRIMARY KEY, -- id 自增主键
release_data_id INT NOT NULL, -- 发布数据id
data BYTEA NOT NULL, -- 数据
version SERIAL NOT NULL, -- 版本号
description TEXT NOT NULL, -- 版本描述
user_id INT NOT NULL, -- 发布用户id
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 创建时间
FOREIGN KEY (release_data_id) REFERENCES rtss.release_data (id) ON DELETE CASCADE
);
-- 创建发布数据当前版本外键
ALTER TABLE rtss.release_data ADD FOREIGN KEY (used_version_id) REFERENCES rtss.release_data_version (id) ON DELETE SET NULL;
-- 创建草稿数据默认发布数据外键
ALTER TABLE rtss.draft_data ADD FOREIGN KEY (default_release_data_id) REFERENCES rtss.release_data (id) ON DELETE SET NULL;
-- 注释发布数据版本表
COMMENT ON TABLE rtss.release_data_version IS '发布数据版本表';
-- 注释发布数据版本表字段
COMMENT ON COLUMN rtss.release_data_version.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.release_data_version.release_data_id IS '发布数据id';
COMMENT ON COLUMN rtss.release_data_version.data IS '数据';
COMMENT ON COLUMN rtss.release_data_version.version IS '版本号';
COMMENT ON COLUMN rtss.release_data_version.description IS '版本描述';
COMMENT ON COLUMN rtss.release_data_version.user_id IS '发布用户id';
COMMENT ON COLUMN rtss.release_data_version.created_at IS '创建时间';
-- 创建feature表
CREATE TABLE
rtss.feature (
id SERIAL PRIMARY KEY, -- id 自增主键
feature_type INT NOT NULL, -- feature类型
name VARCHAR(128) NOT NULL UNIQUE, -- feature名称
description TEXT NOT NULL, -- feature描述
is_published BOOLEAN NOT NULL DEFAULT TRUE, -- 是否上架
creator_id INT NOT NULL, -- 创建用户id
updater_id INT NOT NULL, -- 更新用户id
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()' -- 更新时间
);
-- 注释仿真feature表
COMMENT ON TABLE rtss.feature IS 'feature表';
-- 注释仿真feature表字段
COMMENT ON COLUMN rtss.feature.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.feature.feature_type IS 'feature类型';
COMMENT ON COLUMN rtss.feature.name IS 'feature名称';
COMMENT ON COLUMN rtss.feature.description IS 'feature描述';
COMMENT ON COLUMN rtss.feature.is_published IS '是否上架';
COMMENT ON COLUMN rtss.feature.creator_id IS '创建用户id';
COMMENT ON COLUMN rtss.feature.created_at IS '创建时间';
COMMENT ON COLUMN rtss.feature.updated_at IS '更新时间';
-- 创建仿真feature和发布数据关联表
CREATE TABLE
rtss.feature_release_data (
feature_id INT NOT NULL, -- 仿真feature id
release_data_id INT NOT NULL, -- 发布数据id
PRIMARY KEY (feature_id, release_data_id),
FOREIGN KEY (feature_id) REFERENCES rtss.feature (id) ON DELETE CASCADE,
FOREIGN KEY (release_data_id) REFERENCES rtss.release_data (id) ON DELETE CASCADE
);
-- 注释仿真feature和发布数据关联表
COMMENT ON TABLE rtss.feature_release_data IS '仿真feature和发布数据关联表';
-- 注释仿真feature和发布数据关联表字段
COMMENT ON COLUMN rtss.feature_release_data.feature_id IS '仿真feature id';
COMMENT ON COLUMN rtss.feature_release_data.release_data_id IS '发布数据id';
-- 创建feature group表
CREATE TABLE
rtss.feature_group (
id SERIAL PRIMARY KEY, -- id 自增主键
name VARCHAR(128) NOT NULL UNIQUE, -- feature group名称
description TEXT NOT NULL, -- feature group描述
is_published BOOLEAN NOT NULL DEFAULT TRUE, -- 是否上架
creator_id INT NOT NULL, -- 创建用户id
updater_id INT NOT NULL, -- 更新用户id
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()' -- 更新时间
);
-- 注释仿真feature group表
COMMENT ON TABLE rtss.feature_group IS 'feature group表';
-- 注释仿真feature group表字段
COMMENT ON COLUMN rtss.feature_group.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.feature_group.name IS 'feature group名称';
COMMENT ON COLUMN rtss.feature_group.description IS 'feature group描述';
COMMENT ON COLUMN rtss.feature_group.is_published IS '是否上架';
COMMENT ON COLUMN rtss.feature_group.creator_id IS '创建用户id';
COMMENT ON COLUMN rtss.feature_group.created_at IS '创建时间';
COMMENT ON COLUMN rtss.feature_group.updated_at IS '更新时间';
-- 创建feature group和feature关联表
CREATE TABLE
rtss.feature_group_feature (
feature_group_id INT NOT NULL, -- feature group id
feature_id INT NOT NULL, -- feature id
PRIMARY KEY (feature_id, feature_group_id),
FOREIGN KEY (feature_id) REFERENCES rtss.feature (id) ON DELETE CASCADE,
FOREIGN KEY (feature_group_id) REFERENCES rtss.feature_group (id) ON DELETE CASCADE
);
-- 注释仿真feature group和feature关联表
COMMENT ON TABLE rtss.feature_group_feature IS '仿真feature group和feature关联表';
-- 创建用户feature配置表
CREATE TABLE
rtss.feature_config (
id SERIAL PRIMARY KEY, -- id 自增主键
user_id INT NOT NULL, -- 用户id
feature_id INT NOT NULL, -- 仿真feature id
config BYTEA NOT NULL, -- 配置
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'now()', -- 更新时间
FOREIGN KEY (feature_id) REFERENCES rtss.feature (id) ON DELETE CASCADE
);
-- 注释用户feature配置表
COMMENT ON TABLE rtss.feature_config IS '用户feature配置表';
-- 注释用户feature配置表字段
COMMENT ON COLUMN rtss.feature_config.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.feature_config.user_id IS '用户id';
COMMENT ON COLUMN rtss.feature_config.feature_id IS '仿真feature id';
COMMENT ON COLUMN rtss.feature_config.config IS '配置';
COMMENT ON COLUMN rtss.feature_config.created_at IS '创建时间';
COMMENT ON COLUMN rtss.feature_config.updated_at IS '更新时间';

@ -1 +1 @@
Subproject commit 9dee9892e87b1dca12a1753077e64aca7fced4b8
Subproject commit 4e447beffa31c94b30bcee57a7cf229dcf01351f

84
src/app_config.rs Normal file
View File

@ -0,0 +1,84 @@
use std::env;
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct Server {
pub port: u16,
}
#[derive(Debug, Deserialize)]
#[allow(unused)]
pub struct Database {
pub url: String,
}
#[derive(Debug, Deserialize)]
#[allow(unused)]
pub struct Log {
level: String,
}
impl From<Log> for rtss_log::Logging {
fn from(log: Log) -> Self {
rtss_log::Logging {
level: log.level.parse().unwrap(),
..Default::default()
}
}
}
#[derive(Debug, Deserialize)]
#[allow(unused)]
pub struct AppConfig {
pub server: Server,
pub log: Log,
pub database: Database,
}
impl AppConfig {
pub fn new(dir: &str) -> Result<Self, ConfigError> {
let run_mode = env::var("RUN_MODE")
.unwrap_or_else(|_| "dev".into())
.trim()
.to_string();
println!("RUN_MODE: {}", run_mode);
// log 当前目录
// println!("Current dir: {:?}", std::env::current_dir().unwrap());
let s = Config::builder()
// Start off by merging in the "default" configuration file
.add_source(File::with_name(&format!("{dir}/default")))
// Add in a local configuration file
// This file shouldn't be checked in to git
.add_source(File::with_name(&format!("{dir}/local")).required(false))
// Add in the current environment file
// Default to 'dev' env
// Note that this file is _optional_
.add_source(File::with_name(&format!("{dir}/{run_mode}")).required(true))
// Add in settings from the environment (with a prefix of RTSS_SIM)
// Eg.. `APP_DEBUG=1 ./target/app` would set the `debug` key
.add_source(Environment::with_prefix("RTSS_SIM").separator("_"))
// You may also programmatically change settings
// .set_override("database.url", "postgres://")?
// build the configuration
.build()?;
// You can deserialize (and thus freeze) the entire configuration as
s.try_deserialize()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_app_config() {
std::env::set_var("RUN_MODE", "local_test");
let config = AppConfig::new("conf").unwrap();
println!("{:?}", config);
}
}

41
src/cmd.rs Normal file
View File

@ -0,0 +1,41 @@
use clap::Parser;
use enum_dispatch::enum_dispatch;
use crate::{app_config, db::DbSubCommand, CmdExecutor};
#[derive(Parser, Debug)]
#[command(name = "rtss-sim", version, author, about, long_about = None)]
pub struct Cmd {
#[command(subcommand)]
pub cmd: SubCommand,
}
#[derive(Parser, Debug)]
#[enum_dispatch(CmdExecutor)]
pub enum SubCommand {
#[command(name = "serve")]
Serve(ServerOpts),
#[command(name = "db", subcommand, about = "Database operations")]
Db(DbSubCommand),
}
#[derive(Parser, Debug)]
pub struct ServerOpts {
#[clap(short, long, required = false, default_value = "conf")]
config_path: String,
}
impl CmdExecutor for ServerOpts {
async fn execute(&self) -> anyhow::Result<()> {
println!("ServerOpts: {:?}", self);
let app_config =
app_config::AppConfig::new(&self.config_path).expect("Failed to load app config");
let log: rtss_log::Logging = app_config.log.into();
log.init();
rtss_api::serve(rtss_api::ServerConfig::new(
&app_config.database.url,
app_config.server.port,
))
.await
}
}

28
src/db.rs Normal file
View File

@ -0,0 +1,28 @@
use clap::Parser;
use enum_dispatch::enum_dispatch;
use crate::{app_config, CmdExecutor};
#[derive(Parser, Debug)]
#[enum_dispatch(CmdExecutor)]
pub enum DbSubCommand {
#[command(name = "migrate", about = "Migrate database")]
Migrate(MigrateOpts),
}
#[derive(Parser, Debug)]
pub struct MigrateOpts {
#[clap(long, required = false, default_value = "conf")]
config_path: String,
#[clap(long, required = false, default_value = "migrations")]
file_path: String,
}
impl CmdExecutor for MigrateOpts {
async fn execute(&self) -> anyhow::Result<()> {
let app_config =
app_config::AppConfig::new(&self.config_path).expect("Failed to load app config");
println!("{:?}", app_config);
rtss_db::run_migrations(&app_config.database.url).await
}
}

13
src/lib.rs Normal file
View File

@ -0,0 +1,13 @@
mod app_config;
mod cmd;
mod db;
pub use cmd::*;
use db::*;
use enum_dispatch::enum_dispatch;
#[allow(async_fn_in_trait)]
#[enum_dispatch]
pub trait CmdExecutor {
async fn execute(&self) -> anyhow::Result<()>;
}

View File

@ -1,11 +1,9 @@
use rtss_api::ServerConfig;
use clap::Parser;
use rtss_simulation::{Cmd, CmdExecutor};
#[tokio::main]
async fn main() {
rtss_log::Logging {
level: rtss_log::tracing::Level::DEBUG,
..Default::default()
}
.init();
rtss_api::serve(ServerConfig::default()).await;
async fn main() -> anyhow::Result<()> {
let cmd = Cmd::parse();
cmd.cmd.execute().await?;
Ok(())
}