Dify 工作流引擎处理流程源码解析_01_基础结构篇

Dify 工作流引擎处理流程源码解析_01_基础结构篇

fanz Lv3

注:当前研究的 dify 版本为 0.19.1

正好最近和小组长一起在尝试修改 dify 的后台代码,希望可以将“工作流工具”改成流式输出的格式,不过任何事都得一步步来,这两天先看了一下 dify 里最核心的部分:GraphEngine,正好记录一下阅读完源码之后的一点理解。

1. 简单梳理 Workflow 部分源码结构

按照 dify 的 Readme 文件将后端服务跑起来之后,我先根据前端 F12 窗口中的接口地址定位到后台接口位置,打好断点加上查看堆栈信息,可以确定工作流服务的代码都在services\app_generate_service.py这一层。在AppGenerateService.generate这个静态方法中,可以看到对于工作流和 chatflow,使用的 generator 有所不同:
image.png
这里因为 chatflow 中可以添加直接回复节点实现流式输出,而 workflow 原生不支持,所以下面重点研究AdvancedChatAppGenerator().generate方法。现在先用伪代码梳理一下源码的大致结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
`AppGenerateService.generate()
-> 根据App类型选择对应的Generator,如果是"chatflow"选择`AdvancedChatAppGenerator()

-----------AdvancedChatAppGenerator().generate() ----------------
-> 1. 开启一个WorkflowAppQueueManager(保持监听的消息队列);
2. 开启一个新线程执行_generate_worker任务,实际执行的是WorkflowAppRunner.run()
3. 还开启一个WorkflowAppGenerateTaskPipeline监听这个queue_manager
4. 流程执行期间_generate_worker不断向消息队列写消息,generate_task_pipeline不断监听消息队列读消息。generator跑出去的就是generate_task_pipeline读的消息

-> ----------------(Async Thread) WorkflowAppRunner.run() (构造函数传入 queue_manager)-----------------
1. 调用Graph.init(graph_config)方法,构建节点和边的关系(包括并行边,直接回复节点,结束节点等都会在这一步解析出来);
2. 创建WorkflowEntry对象,执行workflow_entry.run();
-> ---------------(Async Thread) WorkflowEntry.run()--------------------
1.调用 GraphEngine.run()
-> --------------------(Async Thread) GraphEngine.run()(最底层!)------------------
1. 构造时默认会开启一个线程池,用于处理并行分支;
2.while True循环中根据刚才加载的graph按照节点和边的关系,逐个节点运行;
3. 如果超出最大执行步数或者最大执行时间,直接抛出异常;
4. 每次节点执行开始都会根据NodeRunStartedEvent记录运行步数,节点执行结束之后会检查出边分支有没有condition,如果有,就根据节点执行状态判断分支走向;
5. 如果节点之后有多条并行分支,那么会把并行任务放在线程池中并发执行。(注意,并行分支中可能会再出现并行分支,所以这个_run_parallel_branches方法会递归执行)

<- -----(GraphEngine) 不断yield工作流执行时产生的各种事件-----------

<- -----(WorkflowEntry) 接收graph_engine的事件,封装后再抛出-------------

<- -----(WorkflowAppRunner) workflow_entry返回的所有事件,都会在handle_event()中由queue_manager.publish()发布到刚才监听的消息队列中。----------------

<-----(AdvancedChatAppGenerator) generate_task_pipeline不断监听queue_manager读消息
# 持续不断监听消息,直到chatflow运行完毕

流程的话比较复杂,我用 Excalidraw 画了一个简单的图方便理解。个人感觉 WorkflowAppRunner -> WorkflowEntry -> GraphEngine 这种层层封装的信息传递方式很像计网里 OSI 层的信息传递方式。那下面的话就按照从上到下的层次顺序逐层说明一下代码。
image.png

2. AdvancedChatAppGenerator 层 (request thread)

如果创建的应用是 chatflow,那么会在AppGenerateService.generate这一层会进入 AdvancedChatAppGenerator。这一层作为工作流启动的最高层,负责开启消息队列,记录会话信息等。
image.png
这里点开 MessageBasedAppQueueManager 的父类,可以发现这个队列是一直在持续监听的,当且仅当遇到错误 event 或者终止 event 才停止监听。这么做是为了在工作流执行报错的时候即使关闭监听任务,减少 CPU 资源的占用。
image.png

image.png
可以发现的是,消息队列监听任务消耗的是当前发起请求所占用的线程,app_generator 只额外开启了一个 generate_worker 线程用来执行工作流,并且向消息队列中发送信息。app_generator 在_handle_advanced_chat_response 方法中会不断监听这个消息队列,实际上,为了方便查看源码,可以在AdvancedChatAppGenerateTaskPipeline 的这个方法里打印日志信息:
image.png

总结

  • AdvancedChatAppGenerator 作为最高层,负责在工作流开始执行前做一些准备工作,比如开启新线程,记录日志到数据库,开启消息队列并开启监听消息队列等,本身不真正执行工作流;
  • 消息队列持续监听占用的是收到 request 的接口线程,不是主线程,可以算是“主任务线程”;
  • 消息队列收到工作流执行终止或者报错事件的时候会停止监听,避免查勇过多资源。

3.WorkflowAppRunner 层(new thread)

从这一层开始都是在开启的新线程中执行的了, WorkflowAppRunner 会先记录工作流执行时的 conv_id,变量信息等,并且在这一步会 加载 graph 的数据结构,解析工作流图中节点与边的关系这是至关重要的一步,工作流的节点执行走向都依赖于此
紧接着, WorkflowAppRunner 会创建一个 WorkflowEntry,然后就是上面说的消息封装再抛出了。
image.png
这一层的执行流程非常简约,最关键的那一步是init_graph,实际上这个方法来源于Graph.init。这一部分因为篇幅较长,会在后面的部分中详细介绍,,这里只需要知道init_graph负责将工作流的 json 配置文件加载成 graph,便于之后流程的进行和分析就可以了。

总结

  • WorkflowAppRunner是工作流 “预备执行” 的开始,负责工作流执行开始的 graph 解析工作和日志记录工作;
  • 最关键的地方就是 graph 的解析,后续工作流的执行全都依赖于此时的解析工作,包括 并行分支、直接回复节点等;

4. WorkflowEntry 层 (new thread)

我更愿意把这个类叫做 WorkflowInstance 类,其实这一层就是负责创建工作流执行实例,然后开始执行。上一层的WorkflowAppRunner完成 graph 解析之后,会将 graph 直接传递给 entry 的构造函数,这里会创建一个GraphEngine
image.png
同样的,这里也是对信息再做一次封装,然后这里的 callback 应该是个后续扩展开发方向,因为现在的 callback 只有一个 WorkflowLoggingCallback,其作用目前只有记录日志,后续可能会有其他操作?
image.png

5. GraphEngine.run()层 (new thread)

现在终于到最底层了,到达这一层才是开始执行工作流节点的一层。对于 dify 中的工作流,底层原理是 DAG(有向无环图),执行过程中需要考虑LLM 节点的流式输出问题、并行分支的执行问题、执行过程中的变量上下文问题等。并且这里查看源码发现,dify 的开发者继承 ThreadPoolExecutor 创建了一个 GraphEngineThreadPool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class GraphEngineThreadPool(ThreadPoolExecutor):
def __init__(
self,
max_workers=None,
thread_name_prefix="",
initializer=None,
initargs=(),
max_submit_count=dify_config.MAX_SUBMIT_COUNT,
) -> None:
super().__init__(max_workers, thread_name_prefix, initializer, initargs)
self.max_submit_count = max_submit_count
self.submit_count = 0

def submit(self, fn, /, *args, **kwargs):
self.submit_count += 1
self.check_is_full()

return super().submit(fn, *args, **kwargs)

def task_done_callback(self, future):
self.submit_count -= 1

def check_is_full(self) -> None:
if self.submit_count > self.max_submit_count:
raise ValueError(
f"Max submit count {self.max_submit_count} of workflow thread pool reached."
)

这个线程池后面会用来处理并行分支问题。下面还是直接看核心 run()方法:
image.png

好的,下面就直接进入_run方法,可以发现这里开启了一个 while true 循环,并且会记录工作流执行步数和执行时间,如果超出限制会及时报错,防止工作流运行陷入死循环吞 CPU 资源。
image.png
这里这个方法的代码的代码比较长,我在关键位置添加了对应的注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def _run(
self,
start_node_id: str,
in_parallel_id: Optional[str] = None,
parent_parallel_id: Optional[str] = None,
parent_parallel_start_node_id: Optional[str] = None,
handle_exceptions: list[str] = [],
) -> Generator[GraphEngineEvent, None, None]:
parallel_start_node_id = None
if in_parallel_id:
parallel_start_node_id = start_node_id

next_node_id = start_node_id
previous_route_node_state: Optional[RouteNodeState] = None
while True:
# 如果执行步数超过最大限制,那么直接抛出错误
# 如果工作流执行时间超过最大限制,那么直接抛出运行超时错误......

# 初始化当前节点的运行状态(包括当前节点的变量信息)
route_node_state = (
self.graph_runtime_state.node_run_state.create_node_state(
node_id=next_node_id
)
)
# get node config
node_id = route_node_state.node_id
node_config = self.graph.node_id_config_mapping.get(node_id)
# 获取上一个节点
previous_node_id = (
previous_route_node_state.node_id if previous_route_node_state else None
)

# 初始化工作流的运行状态,包括工作流的变量信息等
node_instance = node_cls( # type: ignore
id=route_node_state.id,
config=node_config,
graph_init_params=self.init_params,
graph=self.graph,
graph_runtime_state=self.graph_runtime_state,
previous_node_id=previous_node_id,
thread_pool_id=self.thread_pool_id,
)
node_instance = cast(BaseNode[BaseNodeData], node_instance)
try:
# 核心!运行单个节点!
generator = self._run_node(
node_instance=node_instance,
route_node_state=route_node_state,
parallel_id=in_parallel_id,
parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
handle_exceptions=handle_exceptions,
)
# 这里返回值为什么是一个 迭代器?
# 因为存在LLM Node,需要支持流式输出,并且迭代器也可以传递单个值
for item in generator:
if isinstance(item, NodeRunStartedEvent):
self.graph_runtime_state.node_run_steps += 1
item.route_node_state.index = (
self.graph_runtime_state.node_run_steps
)

yield item
# 记录当前节点的运行状态,并且添加到执行路线中
self.graph_runtime_state.node_run_state.node_state_mapping[
route_node_state.id
] = route_node_state

# append route
except Exception as e:
route_node_state.status = RouteNodeState.Status.FAILED
route_node_state.failed_reason = str(e)
# yield错误事件

# 如果遇到End节点,就直接结束
if (
self.graph.node_id_config_mapping[next_node_id]
.get("data", {})
.get("type", "")
.lower()
== NodeType.END.value
):
break

previous_route_node_state = route_node_state

# 获取当前节点的所有出边,也即当前节点指向的所有下一个节点
edge_mappings = self.graph.edge_mapping.get(next_node_id)
if not edge_mappings:
break
# 如果只有一条出边,说明是单向分支
if len(edge_mappings) == 1:
edge = edge_mappings[0]
if (
previous_route_node_state.status == RouteNodeState.Status.EXCEPTION
and node_instance.node_data.error_strategy
== ErrorStrategy.FAIL_BRANCH
and edge.run_condition is None
):
break
# 分支节点可能是有判断条件的,这里根据现在的运行状态(含变量信息)判断分支条件
if edge.run_condition:
result = ConditionManager.get_condition_handler(
init_params=self.init_params,
graph=self.graph,
run_condition=edge.run_condition,
).check(
graph_runtime_state=self.graph_runtime_state,
previous_route_node_state=previous_route_node_state,
)

if not result:
break

next_node_id = edge.target_node_id
# 如果有多条出边,那么就说明遇到了并行分支的场景
else:
final_node_id = None
# 只要有任意一条分支有判定条件,都需要逐个判定
if any(edge.run_condition for edge in edge_mappings):
condition_edge_mappings: dict[str, list[GraphEdge]] = {}
for edge in edge_mappings:
if edge.run_condition:
# 这里说一下为什么要使用 条件节点的 hash?
# 原因是可能是分支节点(比如"i<3")之后有多条分支,这里是根据分支节点的判定条件对之后的分支进行聚类
run_condition_hash = edge.run_condition.hash
if run_condition_hash not in condition_edge_mappings:
condition_edge_mappings[run_condition_hash] = []

condition_edge_mappings[run_condition_hash].append(edge)

for _, sub_edge_mappings in condition_edge_mappings.items():
if len(sub_edge_mappings) == 0:
continue

edge = cast(GraphEdge, sub_edge_mappings[0])
if edge.run_condition is None:
logger.warning(
f"Edge {edge.target_node_id} run condition is None"
)
continue
# 同上,这里也是根据现在的运行状态判断能否满足条件分支的条件
result = ConditionManager.get_condition_handler(
init_params=self.init_params,
graph=self.graph,
run_condition=edge.run_condition,
).check(
graph_runtime_state=self.graph_runtime_state,
previous_route_node_state=previous_route_node_state,
)

if not result:
continue

if len(sub_edge_mappings) == 1:
final_node_id = edge.target_node_id
else:
# 很细节,这里同上,因为可能是条件分支节点后面有多个分支,所以是
# 条件节点后运行并行分支
parallel_generator = self._run_parallel_branches(
edge_mappings=sub_edge_mappings,
in_parallel_id=in_parallel_id,
parallel_start_node_id=parallel_start_node_id,
handle_exceptions=handle_exceptions,
)
# 可以发现,并行分支方法返回的也是一个迭代器,
# 关于具体原因下面会说明,感觉设计的也很巧妙
for parallel_result in parallel_generator:
if isinstance(parallel_result, str):
final_node_id = parallel_result
else:
yield parallel_result

break

if not final_node_id:
break

next_node_id = final_node_id
# 如果运行状态出错,或者走到"异常分支",就停止运行
elif (
node_instance.node_data.error_strategy == ErrorStrategy.FAIL_BRANCH
and node_instance.should_continue_on_error
and previous_route_node_state.status
== RouteNodeState.Status.EXCEPTION
):
break
# 如果所有分支都没有额外条件,那么直接运行并行分支,和上面的步骤一样
else:
parallel_generator = self._run_parallel_branches(
edge_mappings=edge_mappings,
in_parallel_id=in_parallel_id,
parallel_start_node_id=parallel_start_node_id,
handle_exceptions=handle_exceptions,
)

for generated_item in parallel_generator:
if isinstance(generated_item, str):
final_node_id = generated_item
else:
yield generated_item

if not final_node_id:
break

next_node_id = final_node_id
# 最关键的一段代码!处理并行分支必须!
# 如果当前节点已经不属于正在运行的分支,那么及时break
if (
in_parallel_id
and self.graph.node_parallel_mapping.get(next_node_id, "")
!= in_parallel_id
):
break

简述一下执行流程,其实大概就是这样的:

  1. 开启一个 while true 循环,在循环中按照 graph 的节点顺序依次执行,如果运行超时或者超过最大步数立即终止循环;
  2. 运行单个节点返回的都是迭代器,原因是考虑到大模型节点是流式输出的;
  3. 单个节点执行完毕之后,会查找这个节点有多少出边,如果没有出边,则终止;如果仅有一条出边,则接着运行;如果有多条出边,首先判断有哪些出边满足 run_condition,并行运行这些有条件的出边;如果所有出边都没有 run_condition,那么直接并行运行所有出边
  4. 并行处理时,在运行节点结尾判定,如果当前节点已经不在当前分支(分支有自己的 id)中,立即 break;

5.1 run_condition 长什么样子

有点想看一下这个 run_condition 长什么样子,这个应该是在条件分支节点之后添加上的。为了知晓答案,我用源码启动后端代码,前端和其他组件用 docker compose 启动,前端搭建的工作流如图:
image.png

这里我在 IFElseNode 的源码位置打了断点,发现下列信息。对于条件分支这个节点,一共有两个 case_id,分别是 true、uuid(其实还有一个 false),这个就是分支的 id。
image.png
self.graph.edge_mapping 中,可以看到真正的 run_condition,其实和条件分支的 case_id 是绑定的:
image.png
这里 IF-ELSE 节点结束之后,会在返回结果中记录 edge_source_handle 字段,条件判断器也是根据这个字段来判断是否满足执行条件的:
image.png

5.2 并行分支是怎么处理的?

这个我觉得是 dify 设计的一个小亮点,相关代码在这里。其实还是使用消息队列。将并行分支的任务放在线程池中并行运行,然后监听消息队列。每个线程在运行节点的时候都会检测,如果某个节点已经不在当前分支中,那么就立即 break:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def _run_parallel_branches(
self,
edge_mappings: list[GraphEdge],
in_parallel_id: Optional[str] = None,
parallel_start_node_id: Optional[str] = None,
handle_exceptions: list[str] = [],
) -> Generator[GraphEngineEvent | str, None, None]:
# .....
# run parallel nodes, run in new thread and use queue to get results
# 依然是使用消息队列的形式收到消息
q: queue.Queue = queue.Queue()

# Create a list to store the threads
futures = []

# new thread
for edge in edge_mappings:
if (
edge.target_node_id not in self.graph.node_parallel_mapping
or self.graph.node_parallel_mapping.get(edge.target_node_id, "")
!= parallel_id
):
continue
# 在线程池中提交并行分支任务,并行运行
future = self.thread_pool.submit(
self._run_parallel_node,
**{
"flask_app": current_app._get_current_object(), # type: ignore[attr-defined]
"q": q,
"context": contextvars.copy_context(),
"parallel_id": parallel_id,
"parallel_start_node_id": edge.target_node_id,
"parent_parallel_id": in_parallel_id,
"parent_parallel_start_node_id": parallel_start_node_id,
"handle_exceptions": handle_exceptions,
},
)

future.add_done_callback(self.thread_pool.task_done_callback)

futures.append(future)

succeeded_count = 0
while True:
try:
event = q.get(timeout=1)
if event is None:
break

yield event
if (
not isinstance(event, BaseAgentEvent)
and event.parallel_id == parallel_id
):
if isinstance(event, ParallelBranchRunSucceededEvent):
succeeded_count += 1
# 记录succeeded_count,如果线程都已经完成,终止消息队列的监听任务
if succeeded_count == len(futures):
q.put(None)

continue
elif isinstance(event, ParallelBranchRunFailedEvent):
raise GraphRunFailedError(event.error)
except queue.Empty:
continue

# wait all threads
# 等待所有线程执行完毕
wait(futures)

# get final node id
final_node_id = parallel.end_to_node_id
if final_node_id:
yield final_node_id


def _run_parallel_node(
self,
flask_app: Flask,
context: contextvars.Context,
q: queue.Queue,
parallel_id: str,
parallel_start_node_id: str,
parent_parallel_id: Optional[str] = None,
parent_parallel_start_node_id: Optional[str] = None,
handle_exceptions: list[str] = [],
) -> None:
"""
Run parallel nodes
"""
for var, val in context.items():
var.set(val)

with flask_app.app_context():
try:
# 运行节点时会不断向消息队列中写信息,每个节点运行都有起始事件和结束事件
q.put(
ParallelBranchRunStartedEvent(
parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
)
)

# run node
generator = self._run(
start_node_id=parallel_start_node_id,
in_parallel_id=parallel_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
handle_exceptions=handle_exceptions,
)

for item in generator:
q.put(item)

# trigger graph run success event
q.put(
ParallelBranchRunSucceededEvent(
parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
)
)
except GraphRunFailedError as e:
q.put(
ParallelBranchRunFailedEvent(
parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
error=e.error,
)
)
except Exception as e:
logger.exception("Unknown Error when generating in parallel")
q.put(
ParallelBranchRunFailedEvent(
parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id,
error=str(e),
)
)
finally:
db.session.remove()
Info

Q:上面写到,并行分支运行的时候,会根据当前节点所属分支 id 与当前分支 id 是否匹配来判定,这个是怎么做到的?
A:在 graph.init 这一步做到的。这一步会解析节点与所有分支与子分支的从属关系,保存在map中,这个会在下一节中写。

  • 标题: Dify 工作流引擎处理流程源码解析_01_基础结构篇
  • 作者: fanz
  • 创建于 : 2025-04-26 16:30:52
  • 更新于 : 2026-03-26 16:31:26
  • 链接: https://redefine.ohevan.com/tchzng/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。