RESTGrid:.NET Core下的RESTful工作流与ETL系统

在当今的软件开发领域,工作流和ETL(Extract, Transform, Load)系统是数据处理和业务流程自动化的关键组成部分。RESTGrid是一个基于.NET Core的简单工作流/ETL系统,它通过REST服务与外界交互,支持同步和异步工作流,并且可以通过REST API进行管理。此外,RESTGrid还支持Docker容器部署,使得它可以轻松地在支持Docker的云平台上运行。

RESTGrid的核心特性

RESTGrid具有以下核心特性:

  • 使用JSON格式定义工作流
  • 支持同步和异步(长运行)工作流
  • 每个工作流步骤都是对REST服务的调用
  • 支持通过REST API启动和重启工作流
  • 通过REST API进行管理
  • 支持使用JUST定义JSON转换
  • 基于接口的系统,可以连接不同的数据源
  • 提供MySql和AWS DynamoDB的Docker容器

系统架构

RESTGrid的系统架构包括以下几个主要组件:

  • 编排引擎(Orchestration Engine):系统的核心,负责排队任务、读取工作流的业务逻辑并执行任务。
  • REST API用于外部系统启动/重启工作流以及用户管理/设置工作流。
  • 后端存储(Backend):存储配置和历史数据的地方。RESTGrid是主要的项目,定义了系统的接口、对象和编排逻辑。

业务逻辑

系统的业务逻辑使用标准的JSON格式定义,与之前提到的FloatingBridge相似,但更为简化。业务逻辑的JSON包含以下属性:

  • Start:工作流的起始任务。
  • Tasks:工作流中所有后续任务的数组。
每个任务(Task)的JSON包含以下属性:
  • Identifier:唯一标识符,用于识别任务。
  • Type:任务类型。
  • Next:指向此任务之后可能运行的任务的标识符数组。
  • TaskRetries:任务失败前可以重试的次数。
  • TaskProperties:包含如何运行任务的信息的JSON对象(可以使用JUST进行转换)。
  • RunCondition:任务运行必须满足的条件。

任务类型

任务可以是以下四种类型之一:

  • Sync:同步REST调用(同步调用后执行下一步)。
  • Async:异步REST调用(执行异步调用后,工作流等待外部输入)。
  • Transformer:转换消息体(使用JUST转换)。
  • Splitter:基于JSON中的数组分割消息体(使用JUST分割)。

使用代码

RESTGrid是一个基于接口的系统。核心库提供了可以继承以实现自己的后端提供者的接口。以下是提供的一些接口: namespace RESTGrid.Interfaces { public interface IAdministration { void CreateWorkflowType(string workflowTypeName, JObject businessLogicJson); void CreateTransformer(JObject transformerJson); WorkflowHistory GetHistory(string workflowID); } } namespace RESTGrid.Interfaces { public interface IOrchestration { void PublishWorkflowStep(string workflowTypeName, Guid workflowID, JObject messageBodyJson, JObject customPropertiesJson, string stepIdentifier, bool stepSucceeded, bool workflowCompleted, int retries, bool active, string runStepIdentifier, string splitID); void SetWorkflowActive(JObject messageBodyJson, string customPropertyName, string customPropertyValue); List<RESTGrid.Models.Queue> Enqueue(); JObject GetTransformer(int transformerID); } } 实现IOrchestration接口后,可以轻松开发自己的编排/工作流引擎。以下是MySql引擎的实现方式: MySqlOrchestration orchestration = new MySqlOrchestration(connectionString); OrchestrationEngine engine = new OrchestrationEngine(orchestration); Console.WriteLine("Running orchestration engine..."); while (true) { engine.Run(); }

REST API

MySql提供者的REST API包括以下API:

  • 管理API:
    • GET {url}/api/Administration/History{workflowID} - 返回整个工作流历史。
    • POST {url}/api/Administration/WorkflowType/{workflowTypeName} - 包含业务逻辑JSON的请求体。
    • POST {url}/api/Administration/Transformer - 返回204 No Content。
  • 编排API:
    • PUT {url}/api/Orchestration/Workflow/{customPropertyName}/{customPropertyValue} - (可选)包含新输入消息的JSON。
    • POST {url}/api/Orchestration/Workflow/{workflowTypeName} - 包含JSON消息的请求体。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485