From a7c23615d5eae0b228906f7ad15df813e8d0383e Mon Sep 17 00:00:00 2001 From: Albert <18556863020@163.com> Date: Tue, 7 Apr 2026 18:46:07 +0800 Subject: [PATCH 1/2] Add workflow plugin and applicant menu seeds. --- backend/plugin/workflow/README.md | 3 + backend/plugin/workflow/__init__.py | 1 + backend/plugin/workflow/api/__init__.py | 1 + backend/plugin/workflow/api/router.py | 16 + backend/plugin/workflow/api/v1/__init__.py | 1 + backend/plugin/workflow/api/v1/category.py | 37 + backend/plugin/workflow/api/v1/definition.py | 87 ++ backend/plugin/workflow/api/v1/instance.py | 59 ++ backend/plugin/workflow/api/v1/message.py | 26 + backend/plugin/workflow/api/v1/task.py | 38 + backend/plugin/workflow/crud/__init__.py | 1 + backend/plugin/workflow/crud/crud_category.py | 31 + .../plugin/workflow/crud/crud_definition.py | 39 + backend/plugin/workflow/crud/crud_instance.py | 27 + backend/plugin/workflow/crud/crud_message.py | 19 + backend/plugin/workflow/crud/crud_task.py | 16 + backend/plugin/workflow/engine/__init__.py | 1 + .../plugin/workflow/engine/instance_no_gen.py | 12 + backend/plugin/workflow/enums.py | 30 + backend/plugin/workflow/exception.py | 2 + backend/plugin/workflow/model/__init__.py | 5 + backend/plugin/workflow/model/category.py | 19 + backend/plugin/workflow/model/definition.py | 22 + backend/plugin/workflow/model/instance.py | 21 + backend/plugin/workflow/model/message.py | 20 + backend/plugin/workflow/model/task.py | 22 + backend/plugin/workflow/plugin.toml | 17 + backend/plugin/workflow/schema/__init__.py | 1 + backend/plugin/workflow/schema/category.py | 30 + backend/plugin/workflow/schema/definition.py | 53 ++ backend/plugin/workflow/schema/instance.py | 35 + backend/plugin/workflow/schema/message.py | 20 + backend/plugin/workflow/schema/task.py | 30 + backend/plugin/workflow/service/__init__.py | 1 + .../workflow/service/category_service.py | 32 + .../workflow/service/definition_service.py | 358 ++++++++ .../workflow/service/instance_service.py | 807 ++++++++++++++++++ .../workflow/service/message_service.py | 55 ++ .../plugin/workflow/service/task_service.py | 93 ++ backend/plugin/workflow/sql/mysql/destroy.sql | 4 + .../workflow/sql/mysql/destroy_snowflake.sql | 4 + backend/plugin/workflow/sql/mysql/init.sql | 21 + .../workflow/sql/mysql/init_snowflake.sql | 21 + .../workflow/sql/postgresql/destroy.sql | 4 + .../sql/postgresql/destroy_snowflake.sql | 4 + .../plugin/workflow/sql/postgresql/init.sql | 17 + .../sql/postgresql/init_snowflake.sql | 17 + backend/plugin/workflow/tasks/__init__.py | 1 + .../sql/mysql/init_snowflake_test_data.sql | 9 + backend/sql/mysql/init_test_data.sql | 9 + .../postgresql/init_snowflake_test_data.sql | 9 + backend/sql/postgresql/init_test_data.sql | 9 + 52 files changed, 2217 insertions(+) create mode 100644 backend/plugin/workflow/README.md create mode 100644 backend/plugin/workflow/__init__.py create mode 100644 backend/plugin/workflow/api/__init__.py create mode 100644 backend/plugin/workflow/api/router.py create mode 100644 backend/plugin/workflow/api/v1/__init__.py create mode 100644 backend/plugin/workflow/api/v1/category.py create mode 100644 backend/plugin/workflow/api/v1/definition.py create mode 100644 backend/plugin/workflow/api/v1/instance.py create mode 100644 backend/plugin/workflow/api/v1/message.py create mode 100644 backend/plugin/workflow/api/v1/task.py create mode 100644 backend/plugin/workflow/crud/__init__.py create mode 100644 backend/plugin/workflow/crud/crud_category.py create mode 100644 backend/plugin/workflow/crud/crud_definition.py create mode 100644 backend/plugin/workflow/crud/crud_instance.py create mode 100644 backend/plugin/workflow/crud/crud_message.py create mode 100644 backend/plugin/workflow/crud/crud_task.py create mode 100644 backend/plugin/workflow/engine/__init__.py create mode 100644 backend/plugin/workflow/engine/instance_no_gen.py create mode 100644 backend/plugin/workflow/enums.py create mode 100644 backend/plugin/workflow/exception.py create mode 100644 backend/plugin/workflow/model/__init__.py create mode 100644 backend/plugin/workflow/model/category.py create mode 100644 backend/plugin/workflow/model/definition.py create mode 100644 backend/plugin/workflow/model/instance.py create mode 100644 backend/plugin/workflow/model/message.py create mode 100644 backend/plugin/workflow/model/task.py create mode 100644 backend/plugin/workflow/plugin.toml create mode 100644 backend/plugin/workflow/schema/__init__.py create mode 100644 backend/plugin/workflow/schema/category.py create mode 100644 backend/plugin/workflow/schema/definition.py create mode 100644 backend/plugin/workflow/schema/instance.py create mode 100644 backend/plugin/workflow/schema/message.py create mode 100644 backend/plugin/workflow/schema/task.py create mode 100644 backend/plugin/workflow/service/__init__.py create mode 100644 backend/plugin/workflow/service/category_service.py create mode 100644 backend/plugin/workflow/service/definition_service.py create mode 100644 backend/plugin/workflow/service/instance_service.py create mode 100644 backend/plugin/workflow/service/message_service.py create mode 100644 backend/plugin/workflow/service/task_service.py create mode 100644 backend/plugin/workflow/sql/mysql/destroy.sql create mode 100644 backend/plugin/workflow/sql/mysql/destroy_snowflake.sql create mode 100644 backend/plugin/workflow/sql/mysql/init.sql create mode 100644 backend/plugin/workflow/sql/mysql/init_snowflake.sql create mode 100644 backend/plugin/workflow/sql/postgresql/destroy.sql create mode 100644 backend/plugin/workflow/sql/postgresql/destroy_snowflake.sql create mode 100644 backend/plugin/workflow/sql/postgresql/init.sql create mode 100644 backend/plugin/workflow/sql/postgresql/init_snowflake.sql create mode 100644 backend/plugin/workflow/tasks/__init__.py diff --git a/backend/plugin/workflow/README.md b/backend/plugin/workflow/README.md new file mode 100644 index 000000000..058afdd3b --- /dev/null +++ b/backend/plugin/workflow/README.md @@ -0,0 +1,3 @@ +# Workflow Plugin + +Enterprise workflow and approval plugin for FBA. diff --git a/backend/plugin/workflow/__init__.py b/backend/plugin/workflow/__init__.py new file mode 100644 index 000000000..c9ae912a6 --- /dev/null +++ b/backend/plugin/workflow/__init__.py @@ -0,0 +1 @@ +"""Workflow plugin.""" diff --git a/backend/plugin/workflow/api/__init__.py b/backend/plugin/workflow/api/__init__.py new file mode 100644 index 000000000..ab3cebc9c --- /dev/null +++ b/backend/plugin/workflow/api/__init__.py @@ -0,0 +1 @@ +"""Workflow api.""" diff --git a/backend/plugin/workflow/api/router.py b/backend/plugin/workflow/api/router.py new file mode 100644 index 000000000..101d46d5e --- /dev/null +++ b/backend/plugin/workflow/api/router.py @@ -0,0 +1,16 @@ +from fastapi import APIRouter + +from backend.core.conf import settings +from backend.plugin.workflow.api.v1.category import router as category_router +from backend.plugin.workflow.api.v1.definition import router as definition_router +from backend.plugin.workflow.api.v1.instance import router as instance_router +from backend.plugin.workflow.api.v1.message import router as message_router +from backend.plugin.workflow.api.v1.task import router as task_router + +v1 = APIRouter(prefix=f'{settings.FASTAPI_API_V1_PATH}/workflow') + +v1.include_router(category_router, prefix='/category', tags=['审批流分类']) +v1.include_router(definition_router, prefix='/definition', tags=['审批流定义']) +v1.include_router(instance_router, prefix='/instance', tags=['审批流实例']) +v1.include_router(task_router, prefix='/task', tags=['审批流任务']) +v1.include_router(message_router, prefix='/message', tags=['审批流消息']) diff --git a/backend/plugin/workflow/api/v1/__init__.py b/backend/plugin/workflow/api/v1/__init__.py new file mode 100644 index 000000000..d9252cfd3 --- /dev/null +++ b/backend/plugin/workflow/api/v1/__init__.py @@ -0,0 +1 @@ +"""Workflow v1 api.""" diff --git a/backend/plugin/workflow/api/v1/category.py b/backend/plugin/workflow/api/v1/category.py new file mode 100644 index 000000000..19a23dec5 --- /dev/null +++ b/backend/plugin/workflow/api/v1/category.py @@ -0,0 +1,37 @@ +from fastapi import APIRouter, Depends, Path + +from backend.common.pagination import DependsPagination, PageData +from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base +from backend.common.security.jwt import DependsJwtAuth +from backend.common.security.permission import RequestPermission +from backend.common.security.rbac import DependsRBAC +from backend.database.db import CurrentSession, CurrentSessionTransaction +from backend.plugin.workflow.schema.category import ( + CreateWorkflowCategoryParam, + GetWorkflowCategoryDetail, + UpdateWorkflowCategoryParam, +) +from backend.plugin.workflow.service.category_service import workflow_category_service + +router = APIRouter() + + +@router.get('', dependencies=[DependsJwtAuth, DependsPagination]) +async def get_categories(db: CurrentSession) -> ResponseSchemaModel[PageData[GetWorkflowCategoryDetail]]: + return response_base.success(data=await workflow_category_service.get_list(db=db)) + + +@router.post('', dependencies=[Depends(RequestPermission('workflow:category:add')), DependsRBAC]) +async def create_category(db: CurrentSessionTransaction, obj: CreateWorkflowCategoryParam) -> ResponseModel: + await workflow_category_service.create(db=db, obj=obj) + return response_base.success() + + +@router.put('/{pk}', dependencies=[Depends(RequestPermission('workflow:category:edit')), DependsRBAC]) +async def update_category( + db: CurrentSessionTransaction, + obj: UpdateWorkflowCategoryParam, + pk: int = Path(), +) -> ResponseModel: + count = await workflow_category_service.update(db=db, pk=pk, obj=obj) + return response_base.success() if count > 0 else response_base.fail() diff --git a/backend/plugin/workflow/api/v1/definition.py b/backend/plugin/workflow/api/v1/definition.py new file mode 100644 index 000000000..800d4d0e5 --- /dev/null +++ b/backend/plugin/workflow/api/v1/definition.py @@ -0,0 +1,87 @@ +from fastapi import APIRouter, Depends, Path, Query, Request + +from backend.common.pagination import DependsPagination, PageData +from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base +from backend.common.security.jwt import DependsJwtAuth +from backend.common.security.permission import RequestPermission +from backend.common.security.rbac import DependsRBAC +from backend.database.db import CurrentSession, CurrentSessionTransaction +from backend.plugin.workflow.schema.definition import ( + CreateWorkflowDefinitionParam, + GetWorkflowDefinitionDetail, + PreviewWorkflowFlowResponse, + UpdateWorkflowDefinitionParam, +) +from backend.plugin.workflow.service.definition_service import workflow_definition_service + +router = APIRouter() + + +@router.get('', dependencies=[Depends(RequestPermission('workflow:definition:list')), DependsRBAC, DependsPagination]) +async def get_definitions(db: CurrentSession) -> ResponseSchemaModel[PageData[GetWorkflowDefinitionDetail]]: + return response_base.success(data=await workflow_definition_service.get_list(db=db)) + + +@router.get('/available', dependencies=[DependsJwtAuth, DependsPagination]) +async def get_available_definitions(db: CurrentSession) -> ResponseSchemaModel[PageData[GetWorkflowDefinitionDetail]]: + return response_base.success(data=await workflow_definition_service.get_available_list(db=db)) + + +@router.get('/available/{pk}', dependencies=[DependsJwtAuth]) +async def get_available_definition(db: CurrentSession, pk: int = Path()) -> ResponseSchemaModel[GetWorkflowDefinitionDetail]: + return response_base.success(data=await workflow_definition_service.get_available(db=db, pk=pk)) + + +@router.get('/available/{pk}/preview-flow', dependencies=[DependsJwtAuth]) +async def preview_available_definition_flow( + db: CurrentSession, + request: Request, + pk: int = Path(), + form_data: str | None = Query(default=None), +) -> ResponseSchemaModel[PreviewWorkflowFlowResponse]: + definition = await workflow_definition_service.get_available(db=db, pk=pk) + payload = await workflow_definition_service.preview_flow( + db=db, + definition=definition, + user_id=request.user.id, + form_data=workflow_definition_service._parse_config(form_data), + ) + return response_base.success(data=payload) + + +@router.get('/{pk}', dependencies=[Depends(RequestPermission('workflow:definition:list')), DependsRBAC]) +async def get_definition(db: CurrentSession, pk: int = Path()) -> ResponseSchemaModel[GetWorkflowDefinitionDetail]: + return response_base.success(data=await workflow_definition_service.get(db=db, pk=pk)) + + +@router.get('/{pk}/preview-flow', dependencies=[Depends(RequestPermission('workflow:definition:list')), DependsRBAC]) +async def preview_definition_flow( + db: CurrentSession, + request: Request, + pk: int = Path(), + form_data: str | None = Query(default=None), +) -> ResponseSchemaModel[PreviewWorkflowFlowResponse]: + definition = await workflow_definition_service.get(db=db, pk=pk) + payload = await workflow_definition_service.preview_flow( + db=db, + definition=definition, + user_id=request.user.id, + form_data=workflow_definition_service._parse_config(form_data), + ) + return response_base.success(data=payload) + + +@router.post('', dependencies=[Depends(RequestPermission('workflow:definition:add')), DependsRBAC]) +async def create_definition(db: CurrentSessionTransaction, obj: CreateWorkflowDefinitionParam) -> ResponseModel: + await workflow_definition_service.create(db=db, obj=obj) + return response_base.success() + + +@router.put('/{pk}', dependencies=[Depends(RequestPermission('workflow:definition:edit')), DependsRBAC]) +async def update_definition( + db: CurrentSessionTransaction, + obj: UpdateWorkflowDefinitionParam, + pk: int = Path(), +) -> ResponseModel: + count = await workflow_definition_service.update(db=db, pk=pk, obj=obj) + return response_base.success() if count > 0 else response_base.fail() diff --git a/backend/plugin/workflow/api/v1/instance.py b/backend/plugin/workflow/api/v1/instance.py new file mode 100644 index 000000000..9f1880541 --- /dev/null +++ b/backend/plugin/workflow/api/v1/instance.py @@ -0,0 +1,59 @@ +from fastapi import APIRouter, Path, Request + +from backend.common.pagination import DependsPagination, PageData +from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base +from backend.common.security.jwt import DependsJwtAuth +from backend.database.db import CurrentSession, CurrentSessionTransaction +from backend.plugin.workflow.schema.instance import GetWorkflowInstanceDetail, StartWorkflowInstanceParam +from backend.plugin.workflow.service.instance_service import workflow_instance_service + +router = APIRouter() + + +@router.post('', dependencies=[DependsJwtAuth]) +async def start_instance( + db: CurrentSessionTransaction, + request: Request, + obj: StartWorkflowInstanceParam, +) -> ResponseSchemaModel[GetWorkflowInstanceDetail]: + instance = await workflow_instance_service.start(db=db, obj=obj, user_id=request.user.id) + return response_base.success(data=instance) + + +@router.get('/my-apply', dependencies=[DependsJwtAuth, DependsPagination]) +async def get_my_apply(db: CurrentSession, request: Request) -> ResponseSchemaModel[PageData[GetWorkflowInstanceDetail]]: + return response_base.success(data=await workflow_instance_service.get_my_apply(db=db, user_id=request.user.id)) + + +@router.get('/my-todo', dependencies=[DependsJwtAuth, DependsPagination]) +async def get_my_todo(db: CurrentSession, request: Request) -> ResponseSchemaModel[PageData[GetWorkflowInstanceDetail]]: + return response_base.success(data=await workflow_instance_service.get_my_todo(db=db, user_id=request.user.id)) + + +@router.get('/todo-count', dependencies=[DependsJwtAuth]) +async def get_todo_count(db: CurrentSession, request: Request) -> ResponseSchemaModel[int]: + return response_base.success(data=await workflow_instance_service.get_todo_count(db=db, user_id=request.user.id)) + + +@router.get('/{pk}', dependencies=[DependsJwtAuth]) +async def get_instance(db: CurrentSession, request: Request, pk: int = Path()) -> ResponseSchemaModel[GetWorkflowInstanceDetail]: + return response_base.success(data=await workflow_instance_service.get_detail(db=db, pk=pk, user_id=request.user.id)) + + +@router.post('/{pk}/withdraw', dependencies=[DependsJwtAuth]) +async def withdraw_instance( + db: CurrentSessionTransaction, + request: Request, + pk: int = Path(), +) -> ResponseSchemaModel[GetWorkflowInstanceDetail]: + return response_base.success(data=await workflow_instance_service.withdraw(db=db, pk=pk, user_id=request.user.id)) + + +@router.post('/{pk}/urge', dependencies=[DependsJwtAuth]) +async def urge_instance( + db: CurrentSessionTransaction, + request: Request, + pk: int = Path(), +) -> ResponseModel: + await workflow_instance_service.urge(db=db, pk=pk, user_id=request.user.id) + return response_base.success() diff --git a/backend/plugin/workflow/api/v1/message.py b/backend/plugin/workflow/api/v1/message.py new file mode 100644 index 000000000..9afa21b70 --- /dev/null +++ b/backend/plugin/workflow/api/v1/message.py @@ -0,0 +1,26 @@ +from fastapi import APIRouter, Path, Request + +from backend.common.pagination import DependsPagination, PageData +from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base +from backend.common.security.jwt import DependsJwtAuth +from backend.database.db import CurrentSession, CurrentSessionTransaction +from backend.plugin.workflow.schema.message import GetWorkflowMessageDetail +from backend.plugin.workflow.service.message_service import workflow_message_service + +router = APIRouter() + + +@router.get('', dependencies=[DependsJwtAuth, DependsPagination]) +async def get_messages(db: CurrentSession, request: Request) -> ResponseSchemaModel[PageData[GetWorkflowMessageDetail]]: + return response_base.success(data=await workflow_message_service.get_list(db=db, user_id=request.user.id)) + + +@router.get('/unread-count', dependencies=[DependsJwtAuth]) +async def get_unread_count(db: CurrentSession, request: Request) -> ResponseSchemaModel[int]: + return response_base.success(data=await workflow_message_service.unread_count(db=db, user_id=request.user.id)) + + +@router.put('/{pk}/read', dependencies=[DependsJwtAuth]) +async def mark_read(db: CurrentSessionTransaction, request: Request, pk: int = Path()) -> ResponseModel: + count = await workflow_message_service.mark_read(db=db, pk=pk, user_id=request.user.id) + return response_base.success() if count > 0 else response_base.fail() diff --git a/backend/plugin/workflow/api/v1/task.py b/backend/plugin/workflow/api/v1/task.py new file mode 100644 index 000000000..f41c4735d --- /dev/null +++ b/backend/plugin/workflow/api/v1/task.py @@ -0,0 +1,38 @@ +from fastapi import APIRouter, Path, Request + +from backend.common.response.response_schema import ResponseSchemaModel, response_base +from backend.common.security.jwt import DependsJwtAuth +from backend.database.db import CurrentSession, CurrentSessionTransaction +from backend.plugin.workflow.schema.task import ( + ApproveWorkflowTaskParam, + GetWorkflowTaskDetail, + RejectWorkflowTaskParam, +) +from backend.plugin.workflow.service.task_service import workflow_task_service + +router = APIRouter() + + +@router.get('/{pk}', dependencies=[DependsJwtAuth]) +async def get_task(db: CurrentSession, request: Request, pk: int = Path()) -> ResponseSchemaModel[GetWorkflowTaskDetail]: + return response_base.success(data=await workflow_task_service.get(db=db, pk=pk, user_id=request.user.id)) + + +@router.post('/{pk}/approve', dependencies=[DependsJwtAuth]) +async def approve_task( + db: CurrentSessionTransaction, + request: Request, + obj: ApproveWorkflowTaskParam, + pk: int = Path(), +) -> ResponseSchemaModel[GetWorkflowTaskDetail]: + return response_base.success(data=await workflow_task_service.approve(db=db, pk=pk, user_id=request.user.id, obj=obj)) + + +@router.post('/{pk}/reject', dependencies=[DependsJwtAuth]) +async def reject_task( + db: CurrentSessionTransaction, + request: Request, + obj: RejectWorkflowTaskParam, + pk: int = Path(), +) -> ResponseSchemaModel[GetWorkflowTaskDetail]: + return response_base.success(data=await workflow_task_service.reject(db=db, pk=pk, user_id=request.user.id, obj=obj)) diff --git a/backend/plugin/workflow/crud/__init__.py b/backend/plugin/workflow/crud/__init__.py new file mode 100644 index 000000000..42e2702c9 --- /dev/null +++ b/backend/plugin/workflow/crud/__init__.py @@ -0,0 +1 @@ +"""Workflow crud package.""" diff --git a/backend/plugin/workflow/crud/crud_category.py b/backend/plugin/workflow/crud/crud_category.py new file mode 100644 index 000000000..659268de7 --- /dev/null +++ b/backend/plugin/workflow/crud/crud_category.py @@ -0,0 +1,31 @@ +from collections.abc import Sequence + +from sqlalchemy import Select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy_crud_plus import CRUDPlus + +from backend.plugin.workflow.model import WorkflowCategory +from backend.plugin.workflow.schema.category import CreateWorkflowCategoryParam, UpdateWorkflowCategoryParam + + +class CRUDWorkflowCategory(CRUDPlus[WorkflowCategory]): + async def get(self, db: AsyncSession, pk: int) -> WorkflowCategory | None: + return await self.select_model(db, pk) + + async def get_all(self, db: AsyncSession) -> Sequence[WorkflowCategory]: + return await self.select_models_order(db, 'sort', 'asc') + + async def get_select(self) -> Select: + return await self.select_order('sort', 'asc') + + async def get_by_code(self, db: AsyncSession, code: str) -> WorkflowCategory | None: + return await self.select_model_by_column(db, code=code) + + async def create(self, db: AsyncSession, obj: CreateWorkflowCategoryParam) -> None: + await self.create_model(db, obj) + + async def update(self, db: AsyncSession, pk: int, obj: UpdateWorkflowCategoryParam) -> int: + return await self.update_model(db, pk, obj) + + +workflow_category_dao: CRUDWorkflowCategory = CRUDWorkflowCategory(WorkflowCategory) diff --git a/backend/plugin/workflow/crud/crud_definition.py b/backend/plugin/workflow/crud/crud_definition.py new file mode 100644 index 000000000..76aa33748 --- /dev/null +++ b/backend/plugin/workflow/crud/crud_definition.py @@ -0,0 +1,39 @@ +from collections.abc import Sequence +from typing import Any + +from sqlalchemy import Select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy_crud_plus import CRUDPlus + +from backend.plugin.workflow.model import WorkflowDefinition +from backend.plugin.workflow.schema.definition import CreateWorkflowDefinitionParam, UpdateWorkflowDefinitionParam + + +class CRUDWorkflowDefinition(CRUDPlus[WorkflowDefinition]): + async def get(self, db: AsyncSession, pk: int) -> WorkflowDefinition | None: + return await self.select_model(db, pk) + + async def get_available(self, db: AsyncSession, pk: int) -> WorkflowDefinition | None: + return await self.select_model_by_column(db, id=pk, status=1) + + async def get_all(self, db: AsyncSession) -> Sequence[WorkflowDefinition]: + return await self.select_models_order(db, 'id', 'desc') + + async def get_select(self) -> Select: + return await self.select_order('id', 'desc') + + async def get_available_select(self) -> Select: + return await self.select_order('id', 'desc', status=1) + + async def get_by_code(self, db: AsyncSession, code: str) -> WorkflowDefinition | None: + return await self.select_model_by_column(db, code=code) + + async def create(self, db: AsyncSession, obj: CreateWorkflowDefinitionParam | dict[str, Any]) -> None: + payload = obj if isinstance(obj, dict) else obj.model_dump() + db.add(self.model(**payload)) + + async def update(self, db: AsyncSession, pk: int, obj: UpdateWorkflowDefinitionParam | dict[str, Any]) -> int: + return await self.update_model(db, pk, obj) + + +workflow_definition_dao: CRUDWorkflowDefinition = CRUDWorkflowDefinition(WorkflowDefinition) diff --git a/backend/plugin/workflow/crud/crud_instance.py b/backend/plugin/workflow/crud/crud_instance.py new file mode 100644 index 000000000..512396275 --- /dev/null +++ b/backend/plugin/workflow/crud/crud_instance.py @@ -0,0 +1,27 @@ +from sqlalchemy import Select, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy_crud_plus import CRUDPlus + +from backend.plugin.workflow.model import WorkflowInstance + + +class CRUDWorkflowInstance(CRUDPlus[WorkflowInstance]): + async def get(self, db: AsyncSession, pk: int) -> WorkflowInstance | None: + return await self.select_model(db, pk) + + async def get_select_by_initiator(self, initiator_id: int) -> Select: + return await self.select_order('id', 'desc', initiator_id=initiator_id) + + async def get_select_todo(self, assignee_id: int) -> Select: + from backend.plugin.workflow.model import WorkflowTask + + instance_ids_stmt = select(WorkflowTask.instance_id).where( + WorkflowTask.assignee_id == assignee_id, + WorkflowTask.status == 'PENDING', + ) + return select(WorkflowInstance).where( + WorkflowInstance.id.in_(instance_ids_stmt) + ).order_by(WorkflowInstance.id.desc()) + + +workflow_instance_dao: CRUDWorkflowInstance = CRUDWorkflowInstance(WorkflowInstance) diff --git a/backend/plugin/workflow/crud/crud_message.py b/backend/plugin/workflow/crud/crud_message.py new file mode 100644 index 000000000..3710df482 --- /dev/null +++ b/backend/plugin/workflow/crud/crud_message.py @@ -0,0 +1,19 @@ +from sqlalchemy import Select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy_crud_plus import CRUDPlus + +from backend.plugin.workflow.model import WorkflowMessage + + +class CRUDWorkflowMessage(CRUDPlus[WorkflowMessage]): + async def get(self, db: AsyncSession, pk: int) -> WorkflowMessage | None: + return await self.select_model(db, pk) + + async def get_select_by_receiver(self, receiver_id: int) -> Select: + return await self.select_order('id', 'desc', receiver_id=receiver_id) + + async def get_select_by_instance(self, instance_id: int) -> Select: + return await self.select_order('id', 'desc', instance_id=instance_id) + + +workflow_message_dao: CRUDWorkflowMessage = CRUDWorkflowMessage(WorkflowMessage) diff --git a/backend/plugin/workflow/crud/crud_task.py b/backend/plugin/workflow/crud/crud_task.py new file mode 100644 index 000000000..7e1a3c842 --- /dev/null +++ b/backend/plugin/workflow/crud/crud_task.py @@ -0,0 +1,16 @@ +from sqlalchemy import Select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy_crud_plus import CRUDPlus + +from backend.plugin.workflow.model import WorkflowTask + + +class CRUDWorkflowTask(CRUDPlus[WorkflowTask]): + async def get(self, db: AsyncSession, pk: int) -> WorkflowTask | None: + return await self.select_model(db, pk) + + async def get_select_by_assignee(self, assignee_id: int) -> Select: + return await self.select_order('id', 'desc', assignee_id=assignee_id) + + +workflow_task_dao: CRUDWorkflowTask = CRUDWorkflowTask(WorkflowTask) diff --git a/backend/plugin/workflow/engine/__init__.py b/backend/plugin/workflow/engine/__init__.py new file mode 100644 index 000000000..ff0c8dc88 --- /dev/null +++ b/backend/plugin/workflow/engine/__init__.py @@ -0,0 +1 @@ +"""Workflow engine package.""" diff --git a/backend/plugin/workflow/engine/instance_no_gen.py b/backend/plugin/workflow/engine/instance_no_gen.py new file mode 100644 index 000000000..d3e287feb --- /dev/null +++ b/backend/plugin/workflow/engine/instance_no_gen.py @@ -0,0 +1,12 @@ +from backend.core.conf import settings +from backend.utils.timezone import timezone + + +class WorkflowInstanceNoGenerator: + @staticmethod + def generate() -> str: + prefix = getattr(settings, 'WORKFLOW_INSTANCE_NO_PREFIX', 'WF') + return f"{prefix}{timezone.now().strftime('%Y%m%d%H%M%S%f')[:-3]}" + + +instance_no_generator = WorkflowInstanceNoGenerator() diff --git a/backend/plugin/workflow/enums.py b/backend/plugin/workflow/enums.py new file mode 100644 index 000000000..8492dbea3 --- /dev/null +++ b/backend/plugin/workflow/enums.py @@ -0,0 +1,30 @@ +from enum import Enum + + +class WorkflowDefinitionStatus(int, Enum): + draft = 0 + published = 1 + disabled = 2 + + +class WorkflowInstanceStatus(str, Enum): + draft = 'DRAFT' + running = 'RUNNING' + approved = 'APPROVED' + rejected = 'REJECTED' + withdrawn = 'WITHDRAWN' + cancelled = 'CANCELLED' + + +class WorkflowTaskStatus(str, Enum): + pending = 'PENDING' + approved = 'APPROVED' + rejected = 'REJECTED' + cancelled = 'CANCELLED' + + +class WorkflowMessageType(str, Enum): + pending_approval = 'PENDING_APPROVAL' + approved = 'APPROVED' + rejected = 'REJECTED' + withdrawn = 'WITHDRAWN' diff --git a/backend/plugin/workflow/exception.py b/backend/plugin/workflow/exception.py new file mode 100644 index 000000000..a705e012d --- /dev/null +++ b/backend/plugin/workflow/exception.py @@ -0,0 +1,2 @@ +class WorkflowError(Exception): + """Workflow base exception.""" diff --git a/backend/plugin/workflow/model/__init__.py b/backend/plugin/workflow/model/__init__.py new file mode 100644 index 000000000..0e73d6988 --- /dev/null +++ b/backend/plugin/workflow/model/__init__.py @@ -0,0 +1,5 @@ +from backend.plugin.workflow.model.category import WorkflowCategory as WorkflowCategory +from backend.plugin.workflow.model.definition import WorkflowDefinition as WorkflowDefinition +from backend.plugin.workflow.model.instance import WorkflowInstance as WorkflowInstance +from backend.plugin.workflow.model.message import WorkflowMessage as WorkflowMessage +from backend.plugin.workflow.model.task import WorkflowTask as WorkflowTask diff --git a/backend/plugin/workflow/model/category.py b/backend/plugin/workflow/model/category.py new file mode 100644 index 000000000..15abb8b9a --- /dev/null +++ b/backend/plugin/workflow/model/category.py @@ -0,0 +1,19 @@ +import sqlalchemy as sa + +from sqlalchemy.orm import Mapped, mapped_column + +from backend.common.model import Base, id_key + + +class WorkflowCategory(Base): + """审批流分类表""" + + __tablename__ = 'workflow_category' + + id: Mapped[id_key] = mapped_column(init=False) + name: Mapped[str] = mapped_column(sa.String(64), comment='分类名称') + code: Mapped[str] = mapped_column(sa.String(64), unique=True, index=True, comment='分类编码') + icon: Mapped[str | None] = mapped_column(sa.String(128), default=None, comment='分类图标') + sort: Mapped[int] = mapped_column(default=0, comment='排序') + status: Mapped[int] = mapped_column(default=1, comment='状态(0停用 1启用)') + remark: Mapped[str | None] = mapped_column(sa.String(255), default=None, comment='备注') diff --git a/backend/plugin/workflow/model/definition.py b/backend/plugin/workflow/model/definition.py new file mode 100644 index 000000000..6fd044d96 --- /dev/null +++ b/backend/plugin/workflow/model/definition.py @@ -0,0 +1,22 @@ +import sqlalchemy as sa + +from sqlalchemy.orm import Mapped, mapped_column + +from backend.common.model import Base, UniversalText, id_key + + +class WorkflowDefinition(Base): + """审批流定义表""" + + __tablename__ = 'workflow_definition' + + id: Mapped[id_key] = mapped_column(init=False) + name: Mapped[str] = mapped_column(sa.String(100), comment='流程名称') + code: Mapped[str] = mapped_column(sa.String(100), unique=True, index=True, comment='流程编码') + category_id: Mapped[int | None] = mapped_column(sa.BigInteger, index=True, default=None, comment='分类ID') + description: Mapped[str | None] = mapped_column(UniversalText, default=None, comment='流程描述') + form_config: Mapped[str | None] = mapped_column(UniversalText, default=None, comment='表单配置JSON') + flow_config: Mapped[str | None] = mapped_column(UniversalText, default=None, comment='流程配置JSON') + status: Mapped[int] = mapped_column(default=0, comment='状态(0草稿 1发布 2停用)') + allow_withdraw: Mapped[bool] = mapped_column(default=True, comment='是否允许撤回') + allow_urge: Mapped[bool] = mapped_column(default=True, comment='是否允许催办') diff --git a/backend/plugin/workflow/model/instance.py b/backend/plugin/workflow/model/instance.py new file mode 100644 index 000000000..0ba88d108 --- /dev/null +++ b/backend/plugin/workflow/model/instance.py @@ -0,0 +1,21 @@ +import sqlalchemy as sa + +from sqlalchemy.orm import Mapped, mapped_column + +from backend.common.model import Base, UniversalText, id_key + + +class WorkflowInstance(Base): + """审批流实例表""" + + __tablename__ = 'workflow_instance' + + id: Mapped[id_key] = mapped_column(init=False) + instance_no: Mapped[str] = mapped_column(sa.String(50), unique=True, index=True, comment='实例编号') + definition_id: Mapped[int] = mapped_column(sa.BigInteger, index=True, comment='流程定义ID') + title: Mapped[str] = mapped_column(sa.String(200), comment='标题') + initiator_id: Mapped[int] = mapped_column(sa.BigInteger, index=True, comment='发起人ID') + status: Mapped[str] = mapped_column(sa.String(20), default='RUNNING', comment='实例状态') + current_task_id: Mapped[int | None] = mapped_column(sa.BigInteger, index=True, default=None, comment='当前任务ID') + form_data: Mapped[str | None] = mapped_column(UniversalText, default=None, comment='表单数据JSON') + remark: Mapped[str | None] = mapped_column(UniversalText, default=None, comment='备注') diff --git a/backend/plugin/workflow/model/message.py b/backend/plugin/workflow/model/message.py new file mode 100644 index 000000000..154221918 --- /dev/null +++ b/backend/plugin/workflow/model/message.py @@ -0,0 +1,20 @@ +import sqlalchemy as sa + +from sqlalchemy.orm import Mapped, mapped_column + +from backend.common.model import Base, UniversalText, id_key + + +class WorkflowMessage(Base): + """审批流消息表""" + + __tablename__ = 'workflow_message' + + id: Mapped[id_key] = mapped_column(init=False) + receiver_id: Mapped[int] = mapped_column(sa.BigInteger, index=True, comment='接收人ID') + message_type: Mapped[str] = mapped_column(sa.String(40), comment='消息类型') + title: Mapped[str] = mapped_column(sa.String(200), comment='标题') + content: Mapped[str] = mapped_column(UniversalText, comment='内容') + instance_id: Mapped[int | None] = mapped_column(sa.BigInteger, index=True, default=None, comment='实例ID') + task_id: Mapped[int | None] = mapped_column(sa.BigInteger, index=True, default=None, comment='任务ID') + is_read: Mapped[bool] = mapped_column(default=False, comment='是否已读') diff --git a/backend/plugin/workflow/model/task.py b/backend/plugin/workflow/model/task.py new file mode 100644 index 000000000..3ba2e069a --- /dev/null +++ b/backend/plugin/workflow/model/task.py @@ -0,0 +1,22 @@ +import sqlalchemy as sa + +from sqlalchemy.orm import Mapped, mapped_column + +from backend.common.model import Base, UniversalText, id_key + + +class WorkflowTask(Base): + """审批流任务表""" + + __tablename__ = 'workflow_task' + + id: Mapped[id_key] = mapped_column(init=False) + instance_id: Mapped[int] = mapped_column(sa.BigInteger, index=True, comment='实例ID') + definition_id: Mapped[int] = mapped_column(sa.BigInteger, index=True, comment='定义ID') + task_name: Mapped[str] = mapped_column(sa.String(100), comment='任务名称') + assignee_id: Mapped[int] = mapped_column(sa.BigInteger, index=True, comment='审批人ID') + node_key: Mapped[str | None] = mapped_column(sa.String(100), default=None, index=True, comment='流程节点ID') + status: Mapped[str] = mapped_column(sa.String(20), default='PENDING', comment='任务状态') + comment: Mapped[str | None] = mapped_column(UniversalText, default=None, comment='审批意见') + sort: Mapped[int] = mapped_column(default=1, comment='节点顺序') + completed_time: Mapped[str | None] = mapped_column(sa.String(32), default=None, comment='完成时间') diff --git a/backend/plugin/workflow/plugin.toml b/backend/plugin/workflow/plugin.toml new file mode 100644 index 000000000..0d06fbecc --- /dev/null +++ b/backend/plugin/workflow/plugin.toml @@ -0,0 +1,17 @@ +[plugin] +summary = "企业审批流" +version = "0.1.0" +description = "流程定义、审批实例、审批任务与消息通知" +author = "Claude Code" +tags = ["task", "notification"] +database = ["mysql", "postgresql"] + +[app] +router = ["v1"] + +[settings] +WORKFLOW_INSTANCE_NO_PREFIX = "WF" +WORKFLOW_DEFAULT_TIMEOUT_HOURS = 72 +WORKFLOW_ALLOW_BATCH_APPROVE = true +WORKFLOW_MAX_URGE_COUNT = 3 +WORKFLOW_URGE_INTERVAL_HOURS = 24 diff --git a/backend/plugin/workflow/schema/__init__.py b/backend/plugin/workflow/schema/__init__.py new file mode 100644 index 000000000..17e2ef866 --- /dev/null +++ b/backend/plugin/workflow/schema/__init__.py @@ -0,0 +1 @@ +"""Workflow schema package.""" diff --git a/backend/plugin/workflow/schema/category.py b/backend/plugin/workflow/schema/category.py new file mode 100644 index 000000000..6a206a620 --- /dev/null +++ b/backend/plugin/workflow/schema/category.py @@ -0,0 +1,30 @@ +from datetime import datetime + +from pydantic import ConfigDict, Field + +from backend.common.schema import SchemaBase + + +class WorkflowCategorySchemaBase(SchemaBase): + name: str = Field(description='分类名称') + code: str = Field(description='分类编码') + icon: str | None = Field(None, description='图标') + sort: int = Field(0, description='排序') + status: int = Field(1, description='状态') + remark: str | None = Field(None, description='备注') + + +class CreateWorkflowCategoryParam(WorkflowCategorySchemaBase): + pass + + +class UpdateWorkflowCategoryParam(WorkflowCategorySchemaBase): + pass + + +class GetWorkflowCategoryDetail(WorkflowCategorySchemaBase): + model_config = ConfigDict(from_attributes=True) + + id: int + created_time: datetime + updated_time: datetime | None = None diff --git a/backend/plugin/workflow/schema/definition.py b/backend/plugin/workflow/schema/definition.py new file mode 100644 index 000000000..f48ebf47f --- /dev/null +++ b/backend/plugin/workflow/schema/definition.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from pydantic import ConfigDict, Field + +from backend.common.schema import SchemaBase + + +WorkflowConfigValue = dict[str, Any] | list[Any] | str | None + + +class WorkflowDefinitionSchemaBase(SchemaBase): + category_id: int | None = Field(None, description='分类ID') + name: str = Field(description='流程名称') + code: str = Field(description='流程编码') + description: str | None = Field(None, description='描述') + form_config: WorkflowConfigValue = Field(None, description='表单配置') + flow_config: WorkflowConfigValue = Field(None, description='流程配置') + status: int = Field(0, description='状态') + allow_withdraw: bool = Field(True, description='允许撤回') + allow_urge: bool = Field(True, description='允许催办') + + +class CreateWorkflowDefinitionParam(WorkflowDefinitionSchemaBase): + pass + + +class UpdateWorkflowDefinitionParam(WorkflowDefinitionSchemaBase): + pass + + +class GetWorkflowDefinitionDetail(WorkflowDefinitionSchemaBase): + model_config = ConfigDict(from_attributes=True) + + id: int + created_time: datetime + updated_time: datetime | None = None + + +class PreviewWorkflowFlowItem(SchemaBase): + node_id: str = Field(description='节点ID') + node_type: str = Field(description='节点类型') + label: str = Field(description='节点名称') + assignee_id: int | None = Field(None, description='审批人ID') + assignee_name: str | None = Field(None, description='审批人显示名') + self_select_options: list[int] = Field(default_factory=list, description='自选审批范围') + self_select_option_labels: dict[int, str] = Field(default_factory=dict, description='自选审批人名称映射') + + +class PreviewWorkflowFlowResponse(SchemaBase): + items: list[PreviewWorkflowFlowItem] = Field(default_factory=list, description='审批链预览') diff --git a/backend/plugin/workflow/schema/instance.py b/backend/plugin/workflow/schema/instance.py new file mode 100644 index 000000000..e57bfe1da --- /dev/null +++ b/backend/plugin/workflow/schema/instance.py @@ -0,0 +1,35 @@ +from datetime import datetime +from typing import Any + +from pydantic import ConfigDict, Field + +from backend.common.schema import SchemaBase +from backend.plugin.workflow.schema.message import GetWorkflowMessageDetail + + +class StartWorkflowInstanceParam(SchemaBase): + definition_id: int = Field(description='流程定义ID') + title: str = Field(description='标题') + remark: str | None = Field(None, description='备注') + form_data: dict[str, Any] = Field(default_factory=dict, description='表单数据') + self_select_assignees: dict[str, int] = Field(default_factory=dict, description='自选审批人') + + +class GetWorkflowInstanceDetail(SchemaBase): + model_config = ConfigDict(from_attributes=True) + + id: int + instance_no: str + definition_id: int + title: str + initiator_id: int + status: str + current_task_id: int | None = None + form_data: dict[str, Any] | str | None = None + remark: str | None = None + todo_count: int | None = None + allow_withdraw: bool | None = None + allow_urge: bool | None = None + messages: list[GetWorkflowMessageDetail] = Field(default_factory=list, description='流程消息列表') + created_time: datetime + updated_time: datetime | None = None diff --git a/backend/plugin/workflow/schema/message.py b/backend/plugin/workflow/schema/message.py new file mode 100644 index 000000000..b8b6588fe --- /dev/null +++ b/backend/plugin/workflow/schema/message.py @@ -0,0 +1,20 @@ +from datetime import datetime + +from pydantic import ConfigDict + +from backend.common.schema import SchemaBase + + +class GetWorkflowMessageDetail(SchemaBase): + model_config = ConfigDict(from_attributes=True) + + id: int + receiver_id: int + instance_id: int | None = None + task_id: int | None = None + message_type: str + title: str + content: str + is_read: bool + created_time: datetime + updated_time: datetime | None = None diff --git a/backend/plugin/workflow/schema/task.py b/backend/plugin/workflow/schema/task.py new file mode 100644 index 000000000..0532ae409 --- /dev/null +++ b/backend/plugin/workflow/schema/task.py @@ -0,0 +1,30 @@ +from datetime import datetime + +from pydantic import ConfigDict, Field + +from backend.common.schema import SchemaBase + + +class ApproveWorkflowTaskParam(SchemaBase): + comment: str | None = Field(None, description='审批意见') + + +class RejectWorkflowTaskParam(SchemaBase): + comment: str | None = Field(None, description='拒绝意见') + + +class GetWorkflowTaskDetail(SchemaBase): + model_config = ConfigDict(from_attributes=True) + + id: int + instance_id: int + definition_id: int + task_name: str + assignee_id: int + node_key: str | None = None + status: str + comment: str | None = None + sort: int + completed_time: str | None = None + created_time: datetime + updated_time: datetime | None = None diff --git a/backend/plugin/workflow/service/__init__.py b/backend/plugin/workflow/service/__init__.py new file mode 100644 index 000000000..ba5e0a942 --- /dev/null +++ b/backend/plugin/workflow/service/__init__.py @@ -0,0 +1 @@ +"""Workflow service package.""" diff --git a/backend/plugin/workflow/service/category_service.py b/backend/plugin/workflow/service/category_service.py new file mode 100644 index 000000000..adcc476a7 --- /dev/null +++ b/backend/plugin/workflow/service/category_service.py @@ -0,0 +1,32 @@ +from backend.common.exception import errors +from backend.common.pagination import paging_data +from backend.plugin.workflow.crud.crud_category import workflow_category_dao +from backend.plugin.workflow.schema.category import CreateWorkflowCategoryParam, UpdateWorkflowCategoryParam + + +class WorkflowCategoryService: + @staticmethod + async def get_list(*, db): + return await paging_data(db, await workflow_category_dao.get_select()) + + @staticmethod + async def get_all(*, db): + return await workflow_category_dao.get_all(db) + + @staticmethod + async def create(*, db, obj: CreateWorkflowCategoryParam) -> None: + if await workflow_category_dao.get_by_code(db, obj.code): + raise errors.ConflictError(msg='分类编码已存在') + await workflow_category_dao.create(db, obj) + + @staticmethod + async def update(*, db, pk: int, obj: UpdateWorkflowCategoryParam) -> int: + model = await workflow_category_dao.get(db, pk) + if not model: + raise errors.NotFoundError(msg='分类不存在') + if model.code != obj.code and await workflow_category_dao.get_by_code(db, obj.code): + raise errors.ConflictError(msg='分类编码已存在') + return await workflow_category_dao.update(db, pk, obj) + + +workflow_category_service = WorkflowCategoryService() diff --git a/backend/plugin/workflow/service/definition_service.py b/backend/plugin/workflow/service/definition_service.py new file mode 100644 index 000000000..93ca1d6b7 --- /dev/null +++ b/backend/plugin/workflow/service/definition_service.py @@ -0,0 +1,358 @@ +import json +from collections import deque + +from backend.common.exception import errors +from backend.common.pagination import paging_data +from backend.plugin.workflow.crud.crud_definition import workflow_definition_dao +from backend.plugin.workflow.schema.definition import CreateWorkflowDefinitionParam, UpdateWorkflowDefinitionParam +from backend.plugin.workflow.service.instance_service import _get_user_dept_id, _get_user_name, _resolve_node_assignee_runtime + + +class WorkflowDefinitionService: + @staticmethod + def _serialize_config(value): + if value is None: + return None + if isinstance(value, str): + return value + return json.dumps(value, ensure_ascii=False) + + @staticmethod + def _parse_config(value): + if not value: + return {} + if isinstance(value, (dict, list)): + return value + try: + return json.loads(value) + except json.JSONDecodeError: + return {} + + @classmethod + def _validate_flow_config(cls, flow_config) -> None: + parsed = cls._parse_config(flow_config) + nodes = parsed.get('nodes') if isinstance(parsed, dict) else None + edges = parsed.get('edges') if isinstance(parsed, dict) else None + if not isinstance(nodes, list) or not isinstance(edges, list): + raise errors.RequestError(msg='流程配置格式不正确') + + node_map = { + node.get('id'): node + for node in nodes + if isinstance(node, dict) and node.get('id') + } + node_ids = set(node_map.keys()) + start_nodes = [node for node in nodes if isinstance(node, dict) and node.get('type') == 'START'] + end_nodes = [node for node in nodes if isinstance(node, dict) and node.get('type') == 'END'] + approver_nodes = [node for node in nodes if isinstance(node, dict) and node.get('type') == 'APPROVER'] + if not start_nodes: + raise errors.RequestError(msg='流程缺少开始节点') + if len(start_nodes) != 1: + raise errors.RequestError(msg='当前仅支持一个开始节点') + if not end_nodes: + raise errors.RequestError(msg='流程缺少结束节点') + if len(end_nodes) != 1: + raise errors.RequestError(msg='当前仅支持一个结束节点') + if not approver_nodes: + raise errors.RequestError(msg='流程至少需要一个审批节点') + + edge_map: dict[str, list[str]] = {} + reverse_edge_map: dict[str, list[str]] = {} + for edge in edges: + if not isinstance(edge, dict): + raise errors.RequestError(msg='流程连线格式不正确') + source = edge.get('source') + target = edge.get('target') + if source not in node_ids or target not in node_ids: + raise errors.RequestError(msg='流程连线包含无效节点') + edge_map.setdefault(source, []).append(target) + reverse_edge_map.setdefault(target, []).append(source) + + start_id = start_nodes[0].get('id') + next_edges = edge_map.get(start_id, []) + if len(next_edges) != 1: + raise errors.RequestError(msg='开始节点必须且只能连接一个下游节点') + + visited: set[str] = set() + queue = deque([start_id]) + while queue: + node_id = queue.popleft() + if node_id in visited: + continue + visited.add(node_id) + node = node_map.get(node_id) + if not isinstance(node, dict): + continue + node_type = node.get('type') + outgoing = edge_map.get(node_id, []) + incoming = reverse_edge_map.get(node_id, []) + if node_type == 'END': + if outgoing: + raise errors.RequestError(msg='结束节点不能连接下游节点') + continue + if node_type == 'START': + if incoming: + raise errors.RequestError(msg='开始节点不能有上游节点') + + if node_type == 'PARALLEL': + if len(outgoing) < 2: + raise errors.RequestError(msg='并行节点至少需要两个下游分支') + elif node_type != 'CONDITION' and len(outgoing) > 1: + raise errors.RequestError(msg='当前运行时仅条件节点和并行节点支持多个下游节点') + if not outgoing: + raise errors.RequestError(msg=f"节点“{node.get('data', {}).get('label') or node_id}”缺少下游节点") + + if node_type == 'APPROVER': + data = node.get('data') or {} + approver_type = data.get('approverType') or data.get('approver_type') or 'DESIGNATED_USER' + approver_id = data.get('approver_id') or data.get('approverId') + approver_ids = data.get('approver_ids') or data.get('approverIds') or [] + role_ids = data.get('roleIds') or data.get('role_ids') or [] + form_field_key = data.get('formFieldKey') or data.get('form_field_key') + dept_level = data.get('deptLevel') or data.get('dept_level') + self_select_options = data.get('selfSelectOptions') or data.get('self_select_options') or [] + if approver_type == 'DESIGNATED_ROLE': + if not isinstance(role_ids, list) or not role_ids: + raise errors.RequestError(msg=f"审批节点“{data.get('label') or node.get('id')}”未配置角色") + elif approver_type == 'FORM_FIELD_USER': + if not form_field_key: + raise errors.RequestError(msg=f"审批节点“{data.get('label') or node.get('id')}”未配置表单用户字段") + elif approver_type == 'DEPT_LEADER_UP': + if not dept_level: + raise errors.RequestError(msg=f"审批节点“{data.get('label') or node.get('id')}”未配置上级层级") + elif approver_type == 'SELF_SELECT': + if not isinstance(self_select_options, list) or not self_select_options: + raise errors.RequestError(msg=f"审批节点“{data.get('label') or node.get('id')}”未配置自选范围") + elif approver_type in {'INITIATOR', 'DEPT_LEADER', 'INITIATOR_LEADER'}: + pass + elif not approver_id and not (isinstance(approver_ids, list) and approver_ids): + raise errors.RequestError(msg=f"审批节点“{data.get('label') or node.get('id')}”未配置审批人") + elif node_type == 'CONDITION': + data = node.get('data') or {} + if len(outgoing) < 2: + raise errors.RequestError(msg='条件节点至少需要两个下游分支') + condition_group = data.get('conditionGroup') + if isinstance(condition_group, dict): + conditions = condition_group.get('conditions') or [] + if condition_group.get('operator') not in {'AND', 'OR'}: + raise errors.RequestError(msg=f"条件节点“{data.get('label') or node.get('id')}”未配置条件关系") + if not isinstance(conditions, list) or not conditions: + raise errors.RequestError(msg=f"条件节点“{data.get('label') or node.get('id')}”未配置条件规则") + for condition in conditions: + if not isinstance(condition, dict) or not condition.get('field'): + raise errors.RequestError(msg=f"条件节点“{data.get('label') or node.get('id')}”未配置条件字段") + if not condition.get('operator'): + raise errors.RequestError(msg=f"条件节点“{data.get('label') or node.get('id')}”未配置比较方式") + else: + if not data.get('conditionField'): + raise errors.RequestError(msg=f"条件节点“{data.get('label') or node.get('id')}”未配置条件字段") + if not data.get('conditionOperator'): + raise errors.RequestError(msg=f"条件节点“{data.get('label') or node.get('id')}”未配置比较方式") + elif node_type == 'CC': + data = node.get('data') or {} + if len(outgoing) != 1: + raise errors.RequestError(msg='抄送节点必须且只能连接一个下游节点') + cc_user_ids = data.get('ccUserIds') or [] + if not isinstance(cc_user_ids, list) or not cc_user_ids: + raise errors.RequestError(msg=f"抄送节点“{data.get('label') or node.get('id')}”未配置抄送人") + elif node_type == 'TRIGGER': + if len(outgoing) != 1: + raise errors.RequestError(msg='触发器节点必须且只能连接一个下游节点') + elif node_type not in {'START', 'END', 'PARALLEL'}: + raise errors.RequestError(msg=f'暂不支持节点类型:{node_type}') + + for next_node_id in outgoing: + queue.append(next_node_id) + + end_id = end_nodes[0].get('id') + if end_id not in visited: + raise errors.RequestError(msg='流程未形成到结束节点的有效路径') + + @classmethod + def _normalize_definition_payload(cls, obj): + flow_config = obj.get('flow_config') if isinstance(obj, dict) else obj.flow_config + form_config = obj.get('form_config') if isinstance(obj, dict) else obj.form_config + cls._validate_flow_config(flow_config) + payload = obj.model_dump() if hasattr(obj, 'model_dump') else dict(obj) + payload['flow_config'] = cls._serialize_config(flow_config) + payload['form_config'] = cls._serialize_config(form_config) + return payload + + @staticmethod + async def get_list(*, db): + return await paging_data(db, await workflow_definition_dao.get_select()) + + @staticmethod + async def get_available_list(*, db): + return await paging_data(db, await workflow_definition_dao.get_available_select()) + + @staticmethod + async def get(*, db, pk: int): + model = await workflow_definition_dao.get(db, pk) + if not model: + raise errors.NotFoundError(msg='流程定义不存在') + return model + + @staticmethod + async def get_available(*, db, pk: int): + model = await workflow_definition_dao.get_available(db, pk) + if not model: + raise errors.NotFoundError(msg='流程定义不存在或未发布') + return model + + @classmethod + async def create(cls, *, db, obj: CreateWorkflowDefinitionParam) -> None: + if await workflow_definition_dao.get_by_code(db, obj.code): + raise errors.ConflictError(msg='流程编码已存在') + await workflow_definition_dao.create(db, cls._normalize_definition_payload(obj)) + + @classmethod + async def preview_flow(cls, *, db, definition, user_id: int, form_data: dict | None = None) -> dict: + parsed = cls._parse_config(definition.flow_config) + if not isinstance(parsed, dict): + return {'items': []} + nodes = parsed.get('nodes') or [] + edges = parsed.get('edges') or [] + if not isinstance(nodes, list) or not isinstance(edges, list): + return {'items': []} + + node_map = { + node.get('id'): node + for node in nodes + if isinstance(node, dict) and node.get('id') + } + start_nodes = [ + str(node.get('id')) + for node in nodes + if isinstance(node, dict) and node.get('type') == 'START' and node.get('id') + ] + edge_map: dict[str, list[str]] = {} + for edge in edges: + if not isinstance(edge, dict): + continue + source = edge.get('source') + target = edge.get('target') + if source and target: + edge_map.setdefault(source, []).append(target) + + runtime_form_data = form_data or {} + initiator_dept_id = await _get_user_dept_id(db, user_id) + preview_items: list[dict] = [] + visited: set[str] = set() + + async def append_node(node: dict, *, assignee_id: int | None = None, assignee_name: str | None = None): + data = node.get('data') or {} + self_select_options = data.get('selfSelectOptions') or data.get('self_select_options') or [] + normalized_options = [int(item) for item in self_select_options if str(item).isdigit()] + self_select_option_labels = { + option_id: (await _get_user_name(db, option_id)) or f'用户#{option_id}' + for option_id in normalized_options + } + preview_items.append({ + 'node_id': str(node.get('id') or ''), + 'node_type': str(node.get('type') or ''), + 'label': str((node.get('data') or {}).get('label') or node.get('id') or ''), + 'assignee_id': assignee_id, + 'assignee_name': assignee_name, + 'self_select_options': normalized_options, + 'self_select_option_labels': self_select_option_labels, + }) + + def evaluate_rule(rule: dict) -> bool: + field = rule.get('field') + operator = rule.get('operator') or 'EQ' + expected_value = rule.get('value') + if not field: + return False + current_value = runtime_form_data.get(field) + if operator == 'EQ': + return str(current_value) == str(expected_value) + if operator == 'NEQ': + return str(current_value) != str(expected_value) + if operator == 'CONTAINS': + return str(expected_value or '') in str(current_value or '') + try: + current_number = float(current_value) + expected_number = float(expected_value) + except (TypeError, ValueError): + return False + if operator == 'GT': + return current_number > expected_number + if operator == 'GTE': + return current_number >= expected_number + if operator == 'LT': + return current_number < expected_number + if operator == 'LTE': + return current_number <= expected_number + return False + + def condition_targets(node: dict) -> list[str]: + node_id = node.get('id') + if not node_id: + return [] + targets = edge_map.get(node_id, []) + if not targets: + return [] + data = node.get('data') or {} + condition_group = data.get('conditionGroup') or { + 'operator': 'AND', + 'conditions': [ + { + 'field': data.get('conditionField') or '', + 'operator': data.get('conditionOperator') or 'EQ', + 'value': data.get('conditionValue'), + } + ], + } + conditions = condition_group.get('conditions') or [] + results = [evaluate_rule(condition) for condition in conditions if isinstance(condition, dict)] + matched = any(results) if condition_group.get('operator') == 'OR' else all(results) + if matched: + return targets[:1] + return targets[1:2] if len(targets) > 1 else targets[:1] + + async def walk(node_id: str): + if node_id in visited: + return + visited.add(node_id) + node = node_map.get(node_id) + if not isinstance(node, dict): + return + node_type = node.get('type') + if node_type == 'APPROVER': + assignee_id, assignee_name = await _resolve_node_assignee_runtime( + db=db, + node=node, + initiator_id=user_id, + initiator_dept_id=initiator_dept_id, + form_data=runtime_form_data, + ) + await append_node(node, assignee_id=assignee_id, assignee_name=assignee_name) + return + await append_node(node) + if node_type == 'CONDITION': + for next_node_id in condition_targets(node): + await walk(next_node_id) + return + if node_type == 'PARALLEL': + for next_node_id in edge_map.get(node_id, []): + await walk(next_node_id) + return + for next_node_id in edge_map.get(node_id, [])[:1]: + await walk(next_node_id) + + for start_node_id in start_nodes[:1]: + await walk(start_node_id) + return {'items': preview_items} + + @classmethod + async def update(cls, *, db, pk: int, obj: UpdateWorkflowDefinitionParam) -> int: + model = await workflow_definition_dao.get(db, pk) + if not model: + raise errors.NotFoundError(msg='流程定义不存在') + if model.code != obj.code and await workflow_definition_dao.get_by_code(db, obj.code): + raise errors.ConflictError(msg='流程编码已存在') + return await workflow_definition_dao.update(db, pk, cls._normalize_definition_payload(obj)) + + +workflow_definition_service = WorkflowDefinitionService() diff --git a/backend/plugin/workflow/service/instance_service.py b/backend/plugin/workflow/service/instance_service.py new file mode 100644 index 000000000..ba3d9f8e3 --- /dev/null +++ b/backend/plugin/workflow/service/instance_service.py @@ -0,0 +1,807 @@ +import json +from collections import deque + +from sqlalchemy import func, select + +from backend.app.admin.model import Dept, User, user_role +from backend.common.exception import errors +from backend.common.pagination import paging_data +from backend.plugin.workflow.crud.crud_definition import workflow_definition_dao +from backend.plugin.workflow.crud.crud_instance import workflow_instance_dao +from backend.plugin.workflow.crud.crud_message import workflow_message_dao +from backend.plugin.workflow.crud.crud_task import workflow_task_dao +from backend.plugin.workflow.engine.instance_no_gen import instance_no_generator +from backend.plugin.workflow.model import WorkflowInstance, WorkflowMessage, WorkflowTask +from backend.plugin.workflow.schema.instance import StartWorkflowInstanceParam +from backend.plugin.workflow.service.message_service import workflow_message_service +from backend.utils.timezone import timezone + + +async def _todo_count(db, user_id: int) -> int: + stmt = select(func.count(WorkflowTask.id)).where( + WorkflowTask.assignee_id == user_id, + WorkflowTask.status == 'PENDING', + ) + return int((await db.execute(stmt)).scalar() or 0) + + +async def _has_user_pending_task_for_instance(db, instance_id: int, user_id: int) -> bool: + stmt = select(func.count(WorkflowTask.id)).where( + WorkflowTask.instance_id == instance_id, + WorkflowTask.assignee_id == user_id, + WorkflowTask.status == 'PENDING', + ) + return int((await db.execute(stmt)).scalar() or 0) > 0 + + +async def _get_pending_tasks_for_instance(db, instance_id: int) -> list[WorkflowTask]: + stmt = ( + select(WorkflowTask) + .where( + WorkflowTask.instance_id == instance_id, + WorkflowTask.status == 'PENDING', + ) + .order_by(WorkflowTask.id.asc()) + ) + return list((await db.execute(stmt)).scalars().all()) + + +async def _get_task_for_instance_node(db, instance_id: int, node_key: str) -> WorkflowTask | None: + stmt = ( + select(WorkflowTask) + .where( + WorkflowTask.instance_id == instance_id, + WorkflowTask.node_key == node_key, + ) + .order_by(WorkflowTask.id.asc()) + ) + return (await db.execute(stmt)).scalars().first() + + +async def _sync_instance_runtime_state(db, instance_id: int, *, reached_end: bool) -> list[WorkflowTask]: + pending_tasks = await _get_pending_tasks_for_instance(db, instance_id) + if pending_tasks: + await workflow_instance_dao.update_model( + db, + instance_id, + {'current_task_id': pending_tasks[0].id, 'status': 'RUNNING'}, + ) + return pending_tasks + if reached_end: + await workflow_instance_dao.update_model( + db, + instance_id, + {'status': 'APPROVED', 'current_task_id': None}, + ) + return [] + await workflow_instance_dao.update_model( + db, + instance_id, + {'status': 'RUNNING', 'current_task_id': None}, + ) + return [] + + +def _serialize_instance_detail(instance: WorkflowInstance | dict, *, todo_count: int | None = None) -> dict: + if isinstance(instance, dict): + data = dict(instance) + if todo_count is not None: + data['todo_count'] = todo_count + return data + + data = { + 'id': instance.id, + 'instance_no': instance.instance_no, + 'definition_id': instance.definition_id, + 'title': instance.title, + 'initiator_id': instance.initiator_id, + 'status': instance.status, + 'current_task_id': instance.current_task_id, + 'form_data': instance.form_data, + 'remark': instance.remark, + 'created_time': instance.created_time, + 'updated_time': instance.updated_time, + } + if todo_count is not None: + data['todo_count'] = todo_count + return data + + + +def _serialize_page_items(page_data, *, todo_count: int | None = None): + page_data['items'] = [ + _serialize_instance_detail(item, todo_count=todo_count) + for item in page_data['items'] + ] + return page_data + + + +def _serialize_single_instance(instance: WorkflowInstance, *, todo_count: int | None = None): + return _serialize_instance_detail(instance, todo_count=todo_count) + + + +def _parse_json_text(value: str | None): + if not value: + return {} + try: + return json.loads(value) + except json.JSONDecodeError: + return {'raw': value} + + + +def _build_instance_detail_payload( + instance: WorkflowInstance, + *, + todo_count: int | None = None, + messages: list[WorkflowMessage] | None = None, + definition=None, +) -> dict: + data = _serialize_instance_detail(instance, todo_count=todo_count) + data['form_data'] = _parse_json_text(instance.form_data) + data['messages'] = messages or [] + data['allow_withdraw'] = getattr(definition, 'allow_withdraw', None) if definition else None + data['allow_urge'] = getattr(definition, 'allow_urge', None) if definition else None + return data + + + +def _parse_flow_runtime( + flow_config_text: str | None, +) -> tuple[dict[str, dict], dict[str, list[str]], dict[str, list[str]]]: + if not flow_config_text: + return {}, {}, {} + try: + flow_config = json.loads(flow_config_text) + except json.JSONDecodeError: + return {}, {}, {} + if not isinstance(flow_config, dict): + return {}, {}, {} + + nodes = flow_config.get('nodes') or [] + edges = flow_config.get('edges') or [] + if not isinstance(nodes, list) or not isinstance(edges, list): + return {}, {}, {} + + node_map = { + node.get('id'): node + for node in nodes + if isinstance(node, dict) and node.get('id') + } + edge_map: dict[str, list[str]] = {} + reverse_edge_map: dict[str, list[str]] = {} + for edge in edges: + if not isinstance(edge, dict): + continue + source = edge.get('source') + target = edge.get('target') + if source and target: + edge_map.setdefault(source, []).append(target) + reverse_edge_map.setdefault(target, []).append(source) + return node_map, edge_map, reverse_edge_map + + +async def _get_user_name(db, user_id: int) -> str | None: + user = await db.get(User, user_id) + if not user: + return None + return user.nickname or user.username + + +async def _get_user_dept_id(db, user_id: int) -> int | None: + user = await db.get(User, user_id) + if not user or not user.dept_id: + return None + return int(user.dept_id) + + +async def _resolve_role_user(db, role_ids: list[int]) -> tuple[int | None, str | None]: + normalized_role_ids = [int(role_id) for role_id in role_ids if isinstance(role_id, int) or str(role_id).isdigit()] + if not normalized_role_ids: + return None, None + stmt = ( + select(User) + .join(user_role, user_role.c.user_id == User.id) + .where(user_role.c.role_id.in_(normalized_role_ids), User.status == 1) + .order_by(User.id.asc()) + ) + user = (await db.execute(stmt)).scalars().first() + if not user: + return None, None + return int(user.id), user.nickname or user.username + + +async def _resolve_dept_leader_user(db, dept_id: int | None) -> tuple[int | None, str | None]: + if not dept_id: + return None, None + dept = await db.get(Dept, dept_id) + if not dept or not dept.leader: + return None, None + leader_name = str(dept.leader).strip() + if not leader_name: + return None, None + stmt = select(User).where(User.status == 1, ((User.username == leader_name) | (User.nickname == leader_name))).order_by(User.id.asc()) + user = (await db.execute(stmt)).scalars().first() + if not user: + return None, None + return int(user.id), user.nickname or user.username + + +async def _resolve_dept_leader_up_user(db, dept_id: int | None, level: int) -> tuple[int | None, str | None]: + current_dept_id = dept_id + remaining = max(level, 1) + while current_dept_id and remaining > 0: + dept = await db.get(Dept, current_dept_id) + if not dept: + return None, None + current_dept_id = int(dept.parent_id) if dept.parent_id else None + remaining -= 1 + return await _resolve_dept_leader_user(db, current_dept_id) + + +async def _resolve_self_select_user(db, node: dict, form_data: dict) -> tuple[int | None, str | None]: + node_id = str(node.get('id') or '') + self_select_assignees = form_data.get('__self_select_assignees__') or {} + selected_user_id = self_select_assignees.get(node_id) if isinstance(self_select_assignees, dict) else None + if selected_user_id is not None and str(selected_user_id).isdigit(): + resolved_id = int(selected_user_id) + return resolved_id, await _get_user_name(db, resolved_id) + return None, None + + +async def _resolve_node_assignee_runtime(*, db, node: dict, initiator_id: int, initiator_dept_id: int | None, form_data: dict) -> tuple[int | None, str]: + data = node.get('data') or {} + approver_type = data.get('approverType') or data.get('approver_type') or 'DESIGNATED_USER' + + if approver_type == 'INITIATOR': + return initiator_id, (await _get_user_name(db, initiator_id)) or data.get('label') or '审批' + + if approver_type in {'DEPT_LEADER', 'INITIATOR_LEADER'}: + assignee_id, assignee_name = await _resolve_dept_leader_user(db, initiator_dept_id) + return assignee_id, assignee_name or data.get('label') or '审批' + + if approver_type == 'DEPT_LEADER_UP': + dept_level = data.get('deptLevel') or data.get('dept_level') or 1 + try: + normalized_level = int(dept_level) + except (TypeError, ValueError): + normalized_level = 1 + assignee_id, assignee_name = await _resolve_dept_leader_up_user(db, initiator_dept_id, normalized_level) + return assignee_id, assignee_name or data.get('label') or '审批' + + if approver_type == 'SELF_SELECT': + assignee_id, assignee_name = await _resolve_self_select_user(db, node, form_data) + return assignee_id, assignee_name or data.get('label') or '审批' + + if approver_type == 'FORM_FIELD_USER': + form_field_key = data.get('formFieldKey') or data.get('form_field_key') + form_user_id = form_data.get(form_field_key) if form_field_key else None + if form_user_id is not None and str(form_user_id).isdigit(): + resolved_id = int(form_user_id) + return resolved_id, (await _get_user_name(db, resolved_id)) or data.get('label') or '审批' + return None, data.get('label') or '审批' + + if approver_type == 'DESIGNATED_ROLE': + role_ids = data.get('roleIds') or data.get('role_ids') or [] + assignee_id, assignee_name = await _resolve_role_user(db, role_ids if isinstance(role_ids, list) else []) + return assignee_id, assignee_name or data.get('label') or '审批' + + approver_id = data.get('approver_id') or data.get('approverId') + approver_ids = data.get('approver_ids') or data.get('approverIds') or [] + resolved_assignee = approver_id + if not resolved_assignee and isinstance(approver_ids, list) and approver_ids: + resolved_assignee = approver_ids[0] + if resolved_assignee and str(resolved_assignee).isdigit(): + resolved_id = int(resolved_assignee) + return resolved_id, (await _get_user_name(db, resolved_id)) or data.get('label') or '审批' + return None, data.get('label') or '审批' + + + +def _evaluate_condition_rule(rule: dict, form_data: dict) -> bool: + field = rule.get('field') + operator = rule.get('operator') or 'EQ' + expected_value = rule.get('value') + if not field: + return False + current_value = form_data.get(field) + if operator == 'EQ': + return str(current_value) == str(expected_value) + if operator == 'NEQ': + return str(current_value) != str(expected_value) + if operator == 'CONTAINS': + return str(expected_value or '') in str(current_value or '') + try: + current_number = float(current_value) + expected_number = float(expected_value) + except (TypeError, ValueError): + return False + if operator == 'GT': + return current_number > expected_number + if operator == 'GTE': + return current_number >= expected_number + if operator == 'LT': + return current_number < expected_number + if operator == 'LTE': + return current_number <= expected_number + return False + + + +def _normalize_condition_group(data: dict) -> dict: + condition_group = data.get('conditionGroup') + if isinstance(condition_group, dict): + operator = condition_group.get('operator') or 'AND' + conditions = condition_group.get('conditions') or [] + if isinstance(conditions, list) and conditions: + normalized_conditions = [ + condition + for condition in conditions + if isinstance(condition, dict) + ] + if normalized_conditions: + return {'operator': operator, 'conditions': normalized_conditions} + return { + 'operator': 'AND', + 'conditions': [ + { + 'field': data.get('conditionField') or '', + 'operator': data.get('conditionOperator') or 'EQ', + 'value': data.get('conditionValue'), + } + ], + } + + + +def _evaluate_condition(node: dict, form_data: dict) -> bool: + data = node.get('data') or {} + condition_group = _normalize_condition_group(data) + conditions = condition_group.get('conditions') or [] + if not conditions: + return False + results = [_evaluate_condition_rule(condition, form_data) for condition in conditions] + if condition_group.get('operator') == 'OR': + return any(results) + return all(results) + + + +def _resolve_condition_targets(node: dict, edge_map: dict[str, list[str]], form_data: dict) -> list[str]: + node_id = node.get('id') + if not node_id: + return [] + targets = edge_map.get(node_id, []) + if not targets: + return [] + if _evaluate_condition(node, form_data): + return targets[:1] + return targets[1:2] if len(targets) > 1 else targets[:1] + + +async def _is_node_completed_for_join( + *, + db, + instance_id: int, + node_id: str, + node_map: dict[str, dict], + reverse_edge_map: dict[str, list[str]], + cache: dict[str, bool], + path: set[str], +) -> bool: + if node_id in cache: + return cache[node_id] + if node_id in path: + return False + node = node_map.get(node_id) + if not isinstance(node, dict): + return False + + node_type = node.get('type') + if node_type == 'START': + cache[node_id] = True + return True + + if node_type == 'APPROVER': + task = await _get_task_for_instance_node(db, instance_id, node_id) + completed = bool(task and task.status != 'PENDING') + cache[node_id] = completed + return completed + + upstream_ids = reverse_edge_map.get(node_id, []) + if not upstream_ids: + cache[node_id] = False + return False + + path.add(node_id) + completed = True + for upstream_id in upstream_ids: + if not await _is_node_completed_for_join( + db=db, + instance_id=instance_id, + node_id=upstream_id, + node_map=node_map, + reverse_edge_map=reverse_edge_map, + cache=cache, + path=path, + ): + completed = False + break + path.remove(node_id) + cache[node_id] = completed + return completed + + +async def _are_all_upstream_nodes_completed( + *, + db, + instance_id: int, + node_id: str, + node_map: dict[str, dict], + reverse_edge_map: dict[str, list[str]], +) -> bool: + upstream_ids = reverse_edge_map.get(node_id, []) + if len(upstream_ids) <= 1: + return True + cache: dict[str, bool] = {} + for upstream_id in upstream_ids: + if not await _is_node_completed_for_join( + db=db, + instance_id=instance_id, + node_id=upstream_id, + node_map=node_map, + reverse_edge_map=reverse_edge_map, + cache=cache, + path=set(), + ): + return False + return True + + + +async def _notify_task_assignees( + *, + db, + instance: WorkflowInstance, + tasks: list[WorkflowTask], + message_type: str, + title: str, + content: str, +): + notified_user_ids: set[int] = set() + for task in tasks: + assignee_id = int(task.assignee_id) + if assignee_id in notified_user_ids: + continue + notified_user_ids.add(assignee_id) + notify_message = WorkflowMessage( + receiver_id=assignee_id, + instance_id=instance.id, + task_id=task.id, + message_type=message_type, + title=title, + content=content, + is_read=False, + ) + db.add(notify_message) + await db.flush() + await workflow_message_service.push_message(db=db, message=notify_message) + + +async def _push_pending_task_message(*, db, instance: WorkflowInstance, task: WorkflowTask): + await _notify_task_assignees( + db=db, + instance=instance, + tasks=[task], + message_type='PENDING_APPROVAL', + title='您有新的待审批任务', + content=f'流程《{instance.title}》待您审批', + ) + + +async def _expand_runtime_from_nodes( + *, + db, + instance: WorkflowInstance, + definition, + node_map: dict[str, dict], + edge_map: dict[str, list[str]], + reverse_edge_map: dict[str, list[str]], + start_node_ids: list[str], + initiator_dept_id: int | None, + form_data: dict, + base_sort: int, +) -> tuple[list[WorkflowTask], bool]: + queue = deque(start_node_ids) + processed: set[str] = set() + created_tasks: list[WorkflowTask] = [] + reached_end = False + + while queue: + node_id = queue.popleft() + if node_id in processed: + continue + node = node_map.get(node_id) + if not isinstance(node, dict): + continue + if not await _are_all_upstream_nodes_completed( + db=db, + instance_id=instance.id, + node_id=node_id, + node_map=node_map, + reverse_edge_map=reverse_edge_map, + ): + continue + processed.add(node_id) + node_type = node.get('type') + + if node_type == 'APPROVER': + if await _get_task_for_instance_node(db, instance.id, str(node_id)): + continue + assignee_id, task_name = await _resolve_node_assignee_runtime( + db=db, + node=node, + initiator_id=instance.initiator_id, + initiator_dept_id=initiator_dept_id, + form_data=form_data, + ) + if not assignee_id: + node_label = str((node.get('data') or {}).get('label') or node_id) + raise errors.RequestError(msg=f'审批节点“{node_label}”未匹配到审批人,请检查审批人配置或部门负责人数据') + task = WorkflowTask( + instance_id=instance.id, + definition_id=definition.id, + task_name=task_name, + assignee_id=int(assignee_id), + node_key=str(node.get('id') or ''), + status='PENDING', + comment=None, + sort=base_sort + 1, + completed_time=None, + ) + db.add(task) + await db.flush() + created_tasks.append(task) + continue + + if node_type == 'CC': + data = node.get('data') or {} + cc_user_ids = data.get('ccUserIds') or [] + if isinstance(cc_user_ids, list): + for cc_user_id in cc_user_ids: + if not str(cc_user_id).isdigit(): + continue + message = WorkflowMessage( + receiver_id=int(cc_user_id), + instance_id=instance.id, + task_id=instance.current_task_id, + message_type='CC_NOTIFY', + title='您收到新的流程抄送', + content=f'流程《{instance.title}》抄送给您', + is_read=False, + ) + db.add(message) + await db.flush() + await workflow_message_service.push_message(db=db, message=message) + queue.extend(edge_map.get(node_id, [])) + continue + + if node_type == 'TRIGGER': + queue.extend(edge_map.get(node_id, [])) + continue + + if node_type == 'CONDITION': + queue.extend(_resolve_condition_targets(node, edge_map, form_data)) + continue + + if node_type == 'PARALLEL': + queue.extend(edge_map.get(node_id, [])) + continue + + if node_type == 'END': + reached_end = True + + return created_tasks, reached_end + + +class WorkflowInstanceService: + @staticmethod + async def start(*, db, obj: StartWorkflowInstanceParam, user_id: int) -> dict: + definition = await workflow_definition_dao.get(db, obj.definition_id) + if not definition: + raise errors.NotFoundError(msg='流程定义不存在') + if definition.status == 2: + raise errors.RequestError(msg='流程定义已停用,无法发起申请') + + runtime_form_data = obj.form_data if isinstance(obj.form_data, dict) else {} + runtime_form_data['__self_select_assignees__'] = obj.self_select_assignees if isinstance(obj.self_select_assignees, dict) else {} + initiator_dept_id = await _get_user_dept_id(db, user_id) + + instance = WorkflowInstance( + instance_no=instance_no_generator.generate(), + definition_id=definition.id, + title=obj.title, + initiator_id=user_id, + status='RUNNING', + current_task_id=None, + form_data=json.dumps(dict(runtime_form_data), ensure_ascii=False), + remark=obj.remark, + ) + db.add(instance) + await db.flush() + + node_map, edge_map, reverse_edge_map = _parse_flow_runtime(definition.flow_config) + if not node_map: + raise errors.RequestError(msg='流程配置无效') + + start_node_ids: list[str] = [] + for node_id, node in node_map.items(): + if node.get('type') == 'START': + start_node_ids.extend(edge_map.get(node_id, [])) + + created_tasks, reached_end = await _expand_runtime_from_nodes( + db=db, + instance=instance, + definition=definition, + node_map=node_map, + edge_map=edge_map, + reverse_edge_map=reverse_edge_map, + start_node_ids=start_node_ids, + initiator_dept_id=initiator_dept_id, + form_data=runtime_form_data, + base_sort=0, + ) + if not created_tasks and not reached_end: + raise errors.RequestError(msg='流程未配置首个审批人') + + for task in created_tasks: + await _push_pending_task_message(db=db, instance=instance, task=task) + pending_tasks = await _sync_instance_runtime_state(db, instance.id, reached_end=reached_end) + instance = await workflow_instance_dao.get(db, instance.id) or instance + todo_count = await _todo_count(db, int(pending_tasks[0].assignee_id)) if pending_tasks else None + return _serialize_single_instance(instance, todo_count=todo_count) + + @staticmethod + async def get_my_apply(*, db, user_id: int): + return _serialize_page_items( + await paging_data(db, await workflow_instance_dao.get_select_by_initiator(user_id)), + ) + + @staticmethod + async def get_my_todo(*, db, user_id: int): + todo_count = await _todo_count(db, user_id) + return _serialize_page_items( + await paging_data(db, await workflow_instance_dao.get_select_todo(user_id)), + todo_count=todo_count, + ) + + @staticmethod + async def get_todo_count(*, db, user_id: int) -> int: + return await _todo_count(db, user_id) + + + @staticmethod + async def get_detail(*, db, pk: int, user_id: int): + instance = await workflow_instance_dao.get(db, pk) + if not instance: + raise errors.NotFoundError(msg='流程实例不存在') + if instance.initiator_id != user_id and not await _has_user_pending_task_for_instance(db, instance.id, user_id): + raise errors.ForbiddenError(msg='无权查看该流程实例') + + definition = await workflow_definition_dao.get(db, instance.definition_id) + todo_count = await _todo_count(db, user_id) + message_stmt = await workflow_message_dao.get_select_by_instance(instance.id) + messages = list((await db.execute(message_stmt)).scalars().all()) + return _build_instance_detail_payload( + instance, + todo_count=todo_count, + messages=messages, + definition=definition, + ) + + @staticmethod + async def withdraw(*, db, pk: int, user_id: int): + instance = await workflow_instance_dao.get(db, pk) + if not instance: + raise errors.NotFoundError(msg='流程实例不存在') + if instance.initiator_id != user_id: + raise errors.ForbiddenError(msg='仅发起人可撤回该流程') + if instance.status != 'RUNNING': + raise errors.RequestError(msg='当前流程状态不允许撤回') + + definition = await workflow_definition_dao.get(db, instance.definition_id) + if not definition or not definition.allow_withdraw: + raise errors.RequestError(msg='当前流程未开启撤回') + + pending_tasks = await _get_pending_tasks_for_instance(db, instance.id) + completed_time = timezone.now().isoformat() + for task in pending_tasks: + await workflow_task_dao.update_model( + db, + task.id, + {'status': 'CANCELLED', 'comment': '发起人撤回', 'completed_time': completed_time}, + ) + + await workflow_instance_dao.update_model( + db, + instance.id, + {'status': 'WITHDRAWN', 'current_task_id': None}, + ) + + if pending_tasks: + await _notify_task_assignees( + db=db, + instance=instance, + tasks=pending_tasks, + message_type='WITHDRAWN', + title='流程已撤回', + content=f'流程《{instance.title}》已被发起人撤回', + ) + + latest_instance = await workflow_instance_dao.get(db, instance.id) or instance + message_stmt = await workflow_message_dao.get_select_by_instance(latest_instance.id) + messages = list((await db.execute(message_stmt)).scalars().all()) + return _build_instance_detail_payload( + latest_instance, + todo_count=await _todo_count(db, user_id), + messages=messages, + definition=definition, + ) + + @staticmethod + async def urge(*, db, pk: int, user_id: int) -> None: + instance = await workflow_instance_dao.get(db, pk) + if not instance: + raise errors.NotFoundError(msg='流程实例不存在') + if instance.initiator_id != user_id: + raise errors.ForbiddenError(msg='仅发起人可催办该流程') + if instance.status != 'RUNNING': + raise errors.RequestError(msg='当前流程状态不允许催办') + + definition = await workflow_definition_dao.get(db, instance.definition_id) + if not definition or not definition.allow_urge: + raise errors.RequestError(msg='当前流程未开启催办') + + pending_tasks = await _get_pending_tasks_for_instance(db, instance.id) + if not pending_tasks: + raise errors.RequestError(msg='当前没有可催办的待办任务') + + await _notify_task_assignees( + db=db, + instance=instance, + tasks=pending_tasks, + message_type='URGE_NOTIFY', + title='流程催办提醒', + content=f'流程《{instance.title}》请尽快处理', + ) + + +async def advance_workflow_runtime(*, db, instance: WorkflowInstance, definition, from_task: WorkflowTask): + node_map, edge_map, reverse_edge_map = _parse_flow_runtime(definition.flow_config) + if not node_map or not from_task.node_key: + await _sync_instance_runtime_state(db, instance.id, reached_end=True) + return None + + runtime_form_data = _parse_json_text(instance.form_data) + initiator_dept_id = await _get_user_dept_id(db, instance.initiator_id) + created_tasks, reached_end = await _expand_runtime_from_nodes( + db=db, + instance=instance, + definition=definition, + node_map=node_map, + edge_map=edge_map, + reverse_edge_map=reverse_edge_map, + start_node_ids=edge_map.get(from_task.node_key, []), + initiator_dept_id=initiator_dept_id, + form_data=runtime_form_data, + base_sort=from_task.sort or 0, + ) + for task in created_tasks: + await _push_pending_task_message(db=db, instance=instance, task=task) + await _sync_instance_runtime_state(db, instance.id, reached_end=reached_end) + return created_tasks[0] if created_tasks else None + + +workflow_instance_service = WorkflowInstanceService() diff --git a/backend/plugin/workflow/service/message_service.py b/backend/plugin/workflow/service/message_service.py new file mode 100644 index 000000000..a165957ba --- /dev/null +++ b/backend/plugin/workflow/service/message_service.py @@ -0,0 +1,55 @@ +from sqlalchemy import func, select + +from backend.common.pagination import paging_data +from backend.common.socketio.actions import workflow_message_notification +from backend.plugin.workflow.crud.crud_message import workflow_message_dao +from backend.plugin.workflow.model import WorkflowMessage + + +class WorkflowMessageService: + @staticmethod + async def get_list(*, db, user_id: int): + return await paging_data(db, await workflow_message_dao.get_select_by_receiver(user_id)) + + @staticmethod + async def unread_count(*, db, user_id: int) -> int: + stmt = select(func.count(WorkflowMessage.id)).where( + WorkflowMessage.receiver_id == user_id, + WorkflowMessage.is_read.is_(False), + ) + return int((await db.execute(stmt)).scalar() or 0) + + @staticmethod + async def push_message(*, db, message: WorkflowMessage) -> None: + unread_count = await WorkflowMessageService.unread_count(db=db, user_id=message.receiver_id) + await workflow_message_notification( + message.receiver_id, + { + 'id': message.id, + 'instance_id': message.instance_id, + 'task_id': message.task_id, + 'message_type': message.message_type, + 'title': message.title, + 'content': message.content, + 'is_read': message.is_read, + 'created_time': message.created_time.isoformat() if message.created_time else None, + 'unread_count': unread_count, + }, + ) + + @staticmethod + async def mark_read(*, db, pk: int, user_id: int) -> int: + count = await workflow_message_dao.update_model(db, pk, {'is_read': True}, receiver_id=user_id) + if count > 0: + await workflow_message_notification( + user_id, + { + 'id': pk, + 'type': 'READ', + 'unread_count': await WorkflowMessageService.unread_count(db=db, user_id=user_id), + }, + ) + return count + + +workflow_message_service = WorkflowMessageService() diff --git a/backend/plugin/workflow/service/task_service.py b/backend/plugin/workflow/service/task_service.py new file mode 100644 index 000000000..b389bc5ab --- /dev/null +++ b/backend/plugin/workflow/service/task_service.py @@ -0,0 +1,93 @@ +from backend.common.exception import errors +from backend.plugin.workflow.crud.crud_definition import workflow_definition_dao +from backend.plugin.workflow.crud.crud_instance import workflow_instance_dao +from backend.plugin.workflow.crud.crud_task import workflow_task_dao +from backend.plugin.workflow.model import WorkflowMessage +from backend.plugin.workflow.schema.task import ApproveWorkflowTaskParam, RejectWorkflowTaskParam +from backend.plugin.workflow.service.instance_service import advance_workflow_runtime +from backend.plugin.workflow.service.message_service import workflow_message_service +from backend.utils.timezone import timezone + + +class WorkflowTaskService: + @staticmethod + async def approve(*, db, pk: int, user_id: int, obj: ApproveWorkflowTaskParam): + task = await workflow_task_dao.get(db, pk) + if not task: + raise errors.NotFoundError(msg='任务不存在') + if task.assignee_id != user_id: + raise errors.ForbiddenError(msg='无权审批该任务') + if task.status != 'PENDING': + raise errors.RequestError(msg='任务已处理') + + await workflow_task_dao.update_model( + db, + pk, + {'status': 'APPROVED', 'comment': obj.comment, 'completed_time': timezone.now().isoformat()}, + ) + task = await workflow_task_dao.get(db, pk) + instance = await workflow_instance_dao.get(db, task.instance_id) + if instance: + definition = await workflow_definition_dao.get(db, task.definition_id) + next_task = None + if definition: + next_task = await advance_workflow_runtime(db=db, instance=instance, definition=definition, from_task=task) + instance = await workflow_instance_dao.get(db, task.instance_id) or instance + if next_task is None and instance.status == 'APPROVED': + message = WorkflowMessage( + receiver_id=instance.initiator_id, + instance_id=instance.id, + task_id=task.id, + message_type='APPROVED', + title='您的审批已通过', + content=f'流程《{instance.title}》已审批通过', + is_read=False, + ) + db.add(message) + await db.flush() + await workflow_message_service.push_message(db=db, message=message) + return await workflow_task_dao.get(db, pk) + + @staticmethod + async def reject(*, db, pk: int, user_id: int, obj: RejectWorkflowTaskParam): + task = await workflow_task_dao.get(db, pk) + if not task: + raise errors.NotFoundError(msg='任务不存在') + if task.assignee_id != user_id: + raise errors.ForbiddenError(msg='无权审批该任务') + if task.status != 'PENDING': + raise errors.RequestError(msg='任务已处理') + + await workflow_task_dao.update_model( + db, + pk, + {'status': 'REJECTED', 'comment': obj.comment, 'completed_time': timezone.now().isoformat()}, + ) + instance = await workflow_instance_dao.get(db, task.instance_id) + if instance: + await workflow_instance_dao.update_model(db, instance.id, {'status': 'REJECTED', 'current_task_id': None}) + message = WorkflowMessage( + receiver_id=instance.initiator_id, + instance_id=instance.id, + task_id=task.id, + message_type='REJECTED', + title='您的审批被拒绝', + content=f'流程《{instance.title}》被拒绝', + is_read=False, + ) + db.add(message) + await db.flush() + await workflow_message_service.push_message(db=db, message=message) + return await workflow_task_dao.get(db, pk) + + @staticmethod + async def get(*, db, pk: int, user_id: int): + task = await workflow_task_dao.get(db, pk) + if not task: + raise errors.NotFoundError(msg='任务不存在') + if task.assignee_id != user_id: + raise errors.ForbiddenError(msg='无权查看该任务') + return task + + +workflow_task_service = WorkflowTaskService() diff --git a/backend/plugin/workflow/sql/mysql/destroy.sql b/backend/plugin/workflow/sql/mysql/destroy.sql new file mode 100644 index 000000000..85d0c4c2c --- /dev/null +++ b/backend/plugin/workflow/sql/mysql/destroy.sql @@ -0,0 +1,4 @@ +delete from sys_menu where name in ( +'Workflow', 'WorkflowCategory', 'WorkflowStartApply', 'WorkflowDefinition', 'WorkflowTodo', 'WorkflowApply', 'WorkflowMessage', +'AddWorkflowCategory', 'EditWorkflowCategory', 'AddWorkflowDefinition', 'EditWorkflowDefinition' +); diff --git a/backend/plugin/workflow/sql/mysql/destroy_snowflake.sql b/backend/plugin/workflow/sql/mysql/destroy_snowflake.sql new file mode 100644 index 000000000..85d0c4c2c --- /dev/null +++ b/backend/plugin/workflow/sql/mysql/destroy_snowflake.sql @@ -0,0 +1,4 @@ +delete from sys_menu where name in ( +'Workflow', 'WorkflowCategory', 'WorkflowStartApply', 'WorkflowDefinition', 'WorkflowTodo', 'WorkflowApply', 'WorkflowMessage', +'AddWorkflowCategory', 'EditWorkflowCategory', 'AddWorkflowDefinition', 'EditWorkflowDefinition' +); diff --git a/backend/plugin/workflow/sql/mysql/init.sql b/backend/plugin/workflow/sql/mysql/init.sql new file mode 100644 index 000000000..8aa86c086 --- /dev/null +++ b/backend/plugin/workflow/sql/mysql/init.sql @@ -0,0 +1,21 @@ +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values ('workflow.menu', 'Workflow', '/plugins/workflow', 20, 'mdi:workflow', 0, null, null, 1, 1, 1, '', '审批流', null, now(), null); + +set @workflow_menu_id = LAST_INSERT_ID(); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('workflow.start', 'WorkflowStartApply', '/plugins/workflow/start', 1, 'mdi:play-circle-outline', 1, '/plugins/workflow/views/start-apply', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.definition', 'WorkflowDefinition', '/plugins/workflow/definition', 2, 'carbon:flow', 1, '/plugins/workflow/views/definition', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.todo', 'WorkflowTodo', '/plugins/workflow/my-todo', 3, 'mdi:clipboard-clock-outline', 1, '/plugins/workflow/views/my-todo', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.apply', 'WorkflowApply', '/plugins/workflow/my-apply', 4, 'mdi:file-document-edit-outline', 1, '/plugins/workflow/views/my-apply', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.message', 'WorkflowMessage', '/plugins/workflow/message', 5, 'mdi:message-badge-outline', 1, '/plugins/workflow/views/message', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null); + +set @workflow_definition_menu_id = (select id from sys_menu where name = 'WorkflowDefinition' limit 1); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('新增分类', 'AddWorkflowCategory', null, 0, null, 2, null, 'workflow:category:add', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null), +('修改分类', 'EditWorkflowCategory', null, 0, null, 2, null, 'workflow:category:edit', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null), +('新增流程', 'AddWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:add', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null), +('修改流程', 'EditWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:edit', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null); diff --git a/backend/plugin/workflow/sql/mysql/init_snowflake.sql b/backend/plugin/workflow/sql/mysql/init_snowflake.sql new file mode 100644 index 000000000..8aa86c086 --- /dev/null +++ b/backend/plugin/workflow/sql/mysql/init_snowflake.sql @@ -0,0 +1,21 @@ +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values ('workflow.menu', 'Workflow', '/plugins/workflow', 20, 'mdi:workflow', 0, null, null, 1, 1, 1, '', '审批流', null, now(), null); + +set @workflow_menu_id = LAST_INSERT_ID(); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('workflow.start', 'WorkflowStartApply', '/plugins/workflow/start', 1, 'mdi:play-circle-outline', 1, '/plugins/workflow/views/start-apply', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.definition', 'WorkflowDefinition', '/plugins/workflow/definition', 2, 'carbon:flow', 1, '/plugins/workflow/views/definition', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.todo', 'WorkflowTodo', '/plugins/workflow/my-todo', 3, 'mdi:clipboard-clock-outline', 1, '/plugins/workflow/views/my-todo', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.apply', 'WorkflowApply', '/plugins/workflow/my-apply', 4, 'mdi:file-document-edit-outline', 1, '/plugins/workflow/views/my-apply', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null), +('workflow.message', 'WorkflowMessage', '/plugins/workflow/message', 5, 'mdi:message-badge-outline', 1, '/plugins/workflow/views/message', null, 1, 1, 1, '', null, @workflow_menu_id, now(), null); + +set @workflow_definition_menu_id = (select id from sys_menu where name = 'WorkflowDefinition' limit 1); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('新增分类', 'AddWorkflowCategory', null, 0, null, 2, null, 'workflow:category:add', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null), +('修改分类', 'EditWorkflowCategory', null, 0, null, 2, null, 'workflow:category:edit', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null), +('新增流程', 'AddWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:add', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null), +('修改流程', 'EditWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:edit', 1, 0, 1, '', null, @workflow_definition_menu_id, now(), null); diff --git a/backend/plugin/workflow/sql/postgresql/destroy.sql b/backend/plugin/workflow/sql/postgresql/destroy.sql new file mode 100644 index 000000000..85d0c4c2c --- /dev/null +++ b/backend/plugin/workflow/sql/postgresql/destroy.sql @@ -0,0 +1,4 @@ +delete from sys_menu where name in ( +'Workflow', 'WorkflowCategory', 'WorkflowStartApply', 'WorkflowDefinition', 'WorkflowTodo', 'WorkflowApply', 'WorkflowMessage', +'AddWorkflowCategory', 'EditWorkflowCategory', 'AddWorkflowDefinition', 'EditWorkflowDefinition' +); diff --git a/backend/plugin/workflow/sql/postgresql/destroy_snowflake.sql b/backend/plugin/workflow/sql/postgresql/destroy_snowflake.sql new file mode 100644 index 000000000..85d0c4c2c --- /dev/null +++ b/backend/plugin/workflow/sql/postgresql/destroy_snowflake.sql @@ -0,0 +1,4 @@ +delete from sys_menu where name in ( +'Workflow', 'WorkflowCategory', 'WorkflowStartApply', 'WorkflowDefinition', 'WorkflowTodo', 'WorkflowApply', 'WorkflowMessage', +'AddWorkflowCategory', 'EditWorkflowCategory', 'AddWorkflowDefinition', 'EditWorkflowDefinition' +); diff --git a/backend/plugin/workflow/sql/postgresql/init.sql b/backend/plugin/workflow/sql/postgresql/init.sql new file mode 100644 index 000000000..82f9c2379 --- /dev/null +++ b/backend/plugin/workflow/sql/postgresql/init.sql @@ -0,0 +1,17 @@ +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values ('workflow.menu', 'Workflow', '/plugins/workflow', 20, 'mdi:workflow', 0, null, null, 1, 1, 1, '', '审批流', null, now(), null); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('workflow.start', 'WorkflowStartApply', '/plugins/workflow/start', 1, 'mdi:play-circle-outline', 1, '/plugins/workflow/views/start-apply', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.definition', 'WorkflowDefinition', '/plugins/workflow/definition', 2, 'carbon:flow', 1, '/plugins/workflow/views/definition', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.todo', 'WorkflowTodo', '/plugins/workflow/my-todo', 3, 'mdi:clipboard-clock-outline', 1, '/plugins/workflow/views/my-todo', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.apply', 'WorkflowApply', '/plugins/workflow/my-apply', 4, 'mdi:file-document-edit-outline', 1, '/plugins/workflow/views/my-apply', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.message', 'WorkflowMessage', '/plugins/workflow/message', 5, 'mdi:message-badge-outline', 1, '/plugins/workflow/views/message', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('新增分类', 'AddWorkflowCategory', null, 0, null, 2, null, 'workflow:category:add', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null), +('修改分类', 'EditWorkflowCategory', null, 0, null, 2, null, 'workflow:category:edit', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null), +('新增流程', 'AddWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:add', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null), +('修改流程', 'EditWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:edit', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null); diff --git a/backend/plugin/workflow/sql/postgresql/init_snowflake.sql b/backend/plugin/workflow/sql/postgresql/init_snowflake.sql new file mode 100644 index 000000000..82f9c2379 --- /dev/null +++ b/backend/plugin/workflow/sql/postgresql/init_snowflake.sql @@ -0,0 +1,17 @@ +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values ('workflow.menu', 'Workflow', '/plugins/workflow', 20, 'mdi:workflow', 0, null, null, 1, 1, 1, '', '审批流', null, now(), null); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('workflow.start', 'WorkflowStartApply', '/plugins/workflow/start', 1, 'mdi:play-circle-outline', 1, '/plugins/workflow/views/start-apply', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.definition', 'WorkflowDefinition', '/plugins/workflow/definition', 2, 'carbon:flow', 1, '/plugins/workflow/views/definition', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.todo', 'WorkflowTodo', '/plugins/workflow/my-todo', 3, 'mdi:clipboard-clock-outline', 1, '/plugins/workflow/views/my-todo', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.apply', 'WorkflowApply', '/plugins/workflow/my-apply', 4, 'mdi:file-document-edit-outline', 1, '/plugins/workflow/views/my-apply', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null), +('workflow.message', 'WorkflowMessage', '/plugins/workflow/message', 5, 'mdi:message-badge-outline', 1, '/plugins/workflow/views/message', null, 1, 1, 1, '', null, (select id from sys_menu where name = 'Workflow' limit 1), now(), null); + +insert into sys_menu (title, name, path, sort, icon, type, component, perms, status, display, cache, link, remark, parent_id, created_time, updated_time) +values +('新增分类', 'AddWorkflowCategory', null, 0, null, 2, null, 'workflow:category:add', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null), +('修改分类', 'EditWorkflowCategory', null, 0, null, 2, null, 'workflow:category:edit', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null), +('新增流程', 'AddWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:add', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null), +('修改流程', 'EditWorkflowDefinition', null, 0, null, 2, null, 'workflow:definition:edit', 1, 0, 1, '', null, (select id from sys_menu where name = 'WorkflowDefinition' limit 1), now(), null); diff --git a/backend/plugin/workflow/tasks/__init__.py b/backend/plugin/workflow/tasks/__init__.py new file mode 100644 index 000000000..902cf9daf --- /dev/null +++ b/backend/plugin/workflow/tasks/__init__.py @@ -0,0 +1 @@ +"""Workflow tasks package.""" diff --git a/backend/sql/mysql/init_snowflake_test_data.sql b/backend/sql/mysql/init_snowflake_test_data.sql index e1c929e07..52689383f 100644 --- a/backend/sql/mysql/init_snowflake_test_data.sql +++ b/backend/sql/mysql/init_snowflake_test_data.sql @@ -64,6 +64,15 @@ values (2048601263708438528, 2048601263515500544, 2049629108245233666), (2048601263775547392, 2048601263515500544, 2049629108253622282); +insert into sys_role_menu (id, role_id, menu_id) +select 2048601263775547393, 2048601263515500544, id from sys_menu where name = 'Workflow'; + +insert into sys_role_menu (id, role_id, menu_id) +select 2048601263775547394, 2048601263515500544, id from sys_menu where name = 'WorkflowStartApply'; + +insert into sys_role_menu (id, role_id, menu_id) +select 2048601263775547395, 2048601263515500544, id from sys_menu where name = 'WorkflowApply'; + insert into sys_user (id, uuid, username, nickname, password, salt, email, status, is_superuser, is_staff, is_multi_login, avatar, phone, join_time, last_login_time, last_password_changed_time, dept_id, created_time, updated_time) values (2048601263834267648, uuid(), 'admin', '用户88888', '$2b$12$8y2eNucX19VjmZ3tYhBLcOsBwy9w1IjBQE4SSqwMDL5bGQVp2wqS.', unhex('24326224313224387932654E7563583139566A6D5A33745968424C634F'), 'admin@example.com', 1, true, true, true, null, null, now(), now(), now(), 2048601258595581952, now(), null), diff --git a/backend/sql/mysql/init_test_data.sql b/backend/sql/mysql/init_test_data.sql index 2c4269e98..58ed62129 100644 --- a/backend/sql/mysql/init_test_data.sql +++ b/backend/sql/mysql/init_test_data.sql @@ -64,6 +64,15 @@ values (3, 1, 3), (4, 1, 50); +insert into sys_role_menu (id, role_id, menu_id) +select 5, 1, id from sys_menu where name = 'Workflow'; + +insert into sys_role_menu (id, role_id, menu_id) +select 6, 1, id from sys_menu where name = 'WorkflowStartApply'; + +insert into sys_role_menu (id, role_id, menu_id) +select 7, 1, id from sys_menu where name = 'WorkflowApply'; + insert into sys_user (id, uuid, username, nickname, password, salt, email, status, is_superuser, is_staff, is_multi_login, avatar, phone, join_time, last_login_time, last_password_changed_time, dept_id, created_time, updated_time) values (1, uuid(), 'admin', '用户88888', '$2b$12$8y2eNucX19VjmZ3tYhBLcOsBwy9w1IjBQE4SSqwMDL5bGQVp2wqS.', unhex('24326224313224387932654E7563583139566A6D5A33745968424C634F'), 'admin@example.com', 1, true, true, true, null, null, now(), now(), now(), 1, now(), null), diff --git a/backend/sql/postgresql/init_snowflake_test_data.sql b/backend/sql/postgresql/init_snowflake_test_data.sql index 9157da1de..16302db5b 100644 --- a/backend/sql/postgresql/init_snowflake_test_data.sql +++ b/backend/sql/postgresql/init_snowflake_test_data.sql @@ -64,6 +64,15 @@ values (2048601269546909696, 2048601269345583104, 2049629108245233666), (2048601269609824256, 2048601269345583104, 2049629108253622282); +insert into sys_role_menu (id, role_id, menu_id) +select 2048601269609824257, 2048601269345583104, id from sys_menu where name = 'Workflow'; + +insert into sys_role_menu (id, role_id, menu_id) +select 2048601269609824258, 2048601269345583104, id from sys_menu where name = 'WorkflowStartApply'; + +insert into sys_role_menu (id, role_id, menu_id) +select 2048601269609824259, 2048601269345583104, id from sys_menu where name = 'WorkflowApply'; + insert into sys_user (id, uuid, username, nickname, password, salt, email, status, is_superuser, is_staff, is_multi_login, avatar, phone, join_time, last_login_time, last_password_changed_time, dept_id, created_time, updated_time) values (2048601269672738816, gen_random_uuid(), 'admin', '用户88888', '$2b$12$8y2eNucX19VjmZ3tYhBLcOsBwy9w1IjBQE4SSqwMDL5bGQVp2wqS.', decode('24326224313224387932654E7563583139566A6D5A33745968424C634F', 'hex'), 'admin@example.com', 1, true, true, true, null, null, now(), now(), now(), 2048601264366944256, now(), null), diff --git a/backend/sql/postgresql/init_test_data.sql b/backend/sql/postgresql/init_test_data.sql index dab6a3883..fe95c5289 100644 --- a/backend/sql/postgresql/init_test_data.sql +++ b/backend/sql/postgresql/init_test_data.sql @@ -64,6 +64,15 @@ values (3, 1, 3), (4, 1, 50); +insert into sys_role_menu (id, role_id, menu_id) +select 5, 1, id from sys_menu where name = 'Workflow'; + +insert into sys_role_menu (id, role_id, menu_id) +select 6, 1, id from sys_menu where name = 'WorkflowStartApply'; + +insert into sys_role_menu (id, role_id, menu_id) +select 7, 1, id from sys_menu where name = 'WorkflowApply'; + insert into sys_user (id, uuid, username, nickname, password, salt, email, status, is_superuser, is_staff, is_multi_login, avatar, phone, join_time, last_login_time, last_password_changed_time, dept_id, created_time, updated_time) values (1, gen_random_uuid(), 'admin', '用户88888', '$2b$12$8y2eNucX19VjmZ3tYhBLcOsBwy9w1IjBQE4SSqwMDL5bGQVp2wqS.', decode('24326224313224387932654E7563583139566A6D5A33745968424C634F', 'hex'), 'admin@example.com', 1, true, true, true, null, null, now(), now(), now(), 1, now(), null), From ab59359748cf2e99dcb6120cb9a3b5f65bb54d38 Mon Sep 17 00:00:00 2001 From: Albert <18556863020@163.com> Date: Tue, 7 Apr 2026 18:55:11 +0800 Subject: [PATCH 2/2] feat: add workflow websocket room notifications --- backend/app/task/actions.py | 4 ++-- backend/common/socketio/actions.py | 5 +++++ backend/common/socketio/server.py | 9 +++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/backend/app/task/actions.py b/backend/app/task/actions.py index b6bfde7bc..a21811f50 100644 --- a/backend/app/task/actions.py +++ b/backend/app/task/actions.py @@ -4,8 +4,8 @@ from backend.common.socketio.server import sio -@sio.event +@sio.event(namespace='/ws') async def task_worker_status(sid, data) -> None: # noqa: ANN001 """任务 Worker 状态事件""" worker = await run_in_threadpool(celery_app.control.ping) - await sio.emit('task_worker_status', worker, sid) + await sio.emit('task_worker_status', worker, sid, namespace='/ws') diff --git a/backend/common/socketio/actions.py b/backend/common/socketio/actions.py index b6bcc4b9a..7a615ec5e 100644 --- a/backend/common/socketio/actions.py +++ b/backend/common/socketio/actions.py @@ -9,3 +9,8 @@ async def task_notification(msg: str) -> None: :return: """ await sio.emit('task_notification', {'msg': msg}) + + +async def workflow_message_notification(user_id: int, data: dict) -> None: + """审批流消息通知""" + await sio.emit('workflow_message', data, room=f'user:{user_id}', namespace='/ws') diff --git a/backend/common/socketio/server.py b/backend/common/socketio/server.py index 64d629682..b3bb78d4d 100644 --- a/backend/common/socketio/server.py +++ b/backend/common/socketio/server.py @@ -18,11 +18,10 @@ async_mode='asgi', cors_allowed_origins=settings.CORS_ALLOWED_ORIGINS, cors_credentials=True, - namespaces=['/ws'], ) -@sio.event +@sio.event(namespace='/ws') async def connect(sid, environ, auth) -> bool: """Socket 连接事件""" if not auth: @@ -42,16 +41,18 @@ async def connect(sid, environ, auth) -> bool: try: with request_cycle_context({settings.TRACE_ID_REQUEST_HEADER_KEY: uuid.uuid4().hex}): - await jwt_authentication(token) + user = await jwt_authentication(token) except Exception as e: log.info(f'WebSocket 连接失败:{e!s}') return False + await sio.save_session(sid, {'user_id': user.id, 'session_uuid': session_uuid}, namespace='/ws') + await sio.enter_room(sid, f'user:{user.id}', namespace='/ws') await redis_client.sadd(settings.TOKEN_ONLINE_REDIS_PREFIX, session_uuid) return True -@sio.event +@sio.event(namespace='/ws') async def disconnect(sid) -> None: """Socket 断开连接事件""" await redis_client.spop(settings.TOKEN_ONLINE_REDIS_PREFIX)