
Dify 工作流引擎后端源码修改记录(源码)
前言
说一下目前使用 dify 引擎搭建工作流的时候遇到的问题:把工作流打包成工具(workflow_tool)之后,工作流工具不能流式输出。这次修改的目的就是是其能够流式输出,优化体验。
改造前的问题
先讲一下目前的需求:现在我们小组主要负责的是搭建烟草领域的问数智能体,相关数据又分为销量数据、零售户数据、品牌品规数据等领域,由于每个部分的计算规则不一样,一开始想着把所有计算规则都塞到一份提示词中让大模型写 SQL,但是效果太差了(目前使用的是 qwen2.5-72b-int4)。一是每个部分的表虽然只有两三个,但是几个部分加在一起就有九个,九个表的 sql-schema 都塞到提示词里会互相干扰;二是每个部分的计算规则有些容易冲突的地方,有的规则公式比较复杂。
那么我们的解决办法就是多搭建几个 workflow,每个 workflow 只负责一个领域的数据,这样也方便分工和调试。上个月小组长看到开源项目 vanna-db 的 few-shot 思想,我们就在 milvus 数据库中建立了一些 sql 样例数据(150 条左右)。拆分成多个 workflow 之后,刚好每个 workflow 都可以专门检索每个领域的数据,接到我们的 ReAct 智能体分步进行问数任务的时候,效果还不错,只不过这样一来 sql 语句不能流式输出了,中烟那边的客户用的时候觉得是我们系统卡顿…(尝试解释过客户那边也听不懂 😕)
上周为了改成流式输出的,小组长就让我先把所有工作流重新都放在一个 chatflow 里(累 😫),临时供用户那边使用。这么一搞节点数也太多了,还不好分工(类似数据库脏读,有时候我改完了别人又改回去)。小组长是两周前就说想改改代码,我之前没方面经验,所以先跟着小组长看了下代码(前三节博客),共计耗费一周时间(一天时间布置环境,四天调试和修改代码)。
我的总体方案
这是修改的代码记录:

我把这次改造拆成了五层,逐层打通:
- 工具协议层:允许工具返回 Generator,而不是只接受单条或列表消息。
- 工具执行层:generic_invoke 直接 yield from 工具输出,保持流式语义。
- workflow_tool 适配层:开启 streaming,识别 text_chunk 并转为 RunStreamChunkEvent。
- ToolNode 节点层:区分 NodeEvent、ToolInvokeMessage、list 三类返回,避免重复流式。
- 图执行与响应层:统一流处理器并让 node finish 带上 outputs,前端可获得更完整信息。
下面按代码说明。
1) 工具协议层:Tool 支持 Generator 直返
对应文件:core/tools/__base/tool.py
核心点是把 invoke 的返回值里 Generator 放在第一优先级直接返回,保证后续层拿到的是“可迭代流”。
1 | result = self._invoke( |
这一步是整个改造的入口,没有它,下面的流式透传都无从谈起。
2) 工具执行层:ToolEngine 保留流式通道
对应文件:core/tools/tool_engine.py
在 workflow 场景里,我让 generic_invoke 在拿到工具响应后直接透传:
1 | response = tool.invoke( |
关键价值是:执行层不再把输出先吃完再回调,而是让流先走出去。
3) workflow_tool 适配层:把内部事件翻译成流片段
对应文件:core/tools/workflow_as_tool/tool.py
这里是本次改造最关键的一层。
3.1 开启内部工作流 streaming
1 | result = generator.generate( |
3.2 识别 text_chunk 并转成 RunStreamChunkEvent
1 | for message in result: |
3.3 在 workflow_finished 时组装最终消息
1 | if event_type == "workflow_finished": |
这样做的好处是:
- 中间过程可实时流式。
- 结束时仍保留原有 files/text/json 结构,兼容既有消费逻辑。
4) ToolNode 节点层:做返回类型分流与去重
对应文件:core/workflow/nodes/tool/tool_node.py
ToolNode 现在会按返回类型走不同分支:
1 | if isinstance(message_stream, Generator): |
另外,我给 _transform_message 增加了 is_workflow_tool 参数,避免重复抛流:
1 | elif message.type == ToolInvokeMessage.MessageType.TEXT: |
这一步解决的是“同一文本被流两次”的问题。
5) 基础节点与图执行层补强
5.1 BaseNode:生成器异常保护,保证一定回收成 RunCompletedEvent
对应文件:core/workflow/nodes/base/node.py
1 | try: |
意义是:即便流中途抛异常,图引擎也能收到标准完成事件,不会卡死在“节点未结束”状态。
5.2 GraphEngine:统一采用 AnswerStreamProcessor
对应文件:core/workflow/graph_engine/graph_engine.py
1 | if self.init_params.workflow_type == WorkflowType.CHAT: |
这里把非 chat 流程也统一到同一流处理器,减少处理分叉。
5.3 NodeFinishStreamResponse:把 outputs 带给前端
对应文件:core/app/entities/task_entities.py
1 | "outputs": self.data.outputs, |
这个改动很小,但对调试和前端展示非常关键。
6) 端到端效果
改造后,workflow_tool 的行为变成:
- 工作流内部 LLM 产生 text_chunk 时,前端即时收到分片。
- 工具节点结束时仍能拿到完整 outputs/files/json。
- 出错时有标准完成事件兜底,图引擎状态机保持稳定。
一句话总结:既保住了实时流式体验,也保住了原有结果结构兼容性。
7) 这次改造里的经验
这次修改算是从 Tool 协议、ToolEngine、WorkflowTool、ToolNode、BaseNode 到 GraphEngine 与响应对象,全部围绕一个目标统一了行为。
- 同时支持流式事件和最终汇总结果时,必须明确类型分流策略。
- 节点异常一定要落成标准完成事件,否则上层状态机会被破坏。
- 小改动也重要,例如 node finished 带 outputs,会极大提升可观察性。
- 也是第一次修改这个比较大的工程源码,感觉看源码首先就是得布置好环境,先保证可以调试再看代码,不能逞能(硬看太crazy了);
- 修改的时候也有用AI工具,感觉现在的AI确实强,不过我觉得是明确修改位置让它写一般没问题,像大型项目的话还是得学会看日志,这次也是在
core\app\apps\advanced_chat\generate_task_pipeline.py中添加print语句打印出事件信息才加快了改进流程。
- 标题: Dify 工作流引擎后端源码修改记录(源码)
- 作者: fanz
- 创建于 : 2025-05-10 21:21:23
- 更新于 : 2026-03-29 23:39:04
- 链接: https://redefine.ohevan.com/tcnx3o/
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。