Dify 工作流引擎处理流程源码解析_02_Graph篇

Dify 工作流引擎处理流程源码解析_02_Graph篇

fanz Lv3

在上一小节简单解析了一下 dify 工作流引擎处理过程中WorkflowAppRunner -> WorkflowEntry -> GraphEngine 这几层的处理过程。然后上一节说到 WorkflowAppRunner 会先根据配置文件解析出工作流的图数据结构,用以后续的执行,这一小节就详细介绍一下这个部分。我们重点讲Graph.init 这个部分,然后由于这个部分比较复杂,代码量比较多,这里挑其中几个关键部分加以说明。

首先还是使用邻接表法建立图,节点和边的信息都是保存在 edge_config 里的,这是一个 json 文件。当然这里值得注意的是 edge_mapping 是使用邻接表法建立正向图,而 reverse_edge_mapping 则是建立反向图,可以用于回溯。比较关键的部分是下面的_recursively_add_parallelsGeneratorRouter.init方法。下面逐一介绍一下这两个部分。
image.png

image.png

1. recursively_add_parallels

这个方法的作用直译过来就是 递归添加并行分支,其实本质上使用的还是 DFS 算法,遵从以下顺序:

  1. 首先从 start_node 出发,获取该节点的所有出边。如果出边数量大于 1,才进入并行分支判断。如果分支没有条件,放在 “default” 分组中;反之则按照 run_condition 进行分组。每个分组内都是并行分支;

  2. 然后我们遍历刚才的获得的那个分组 dict,对于每个分组,先获取属于该分支的所有节点(不含分支合并节点)。然后就是将这些 node_id 与并行分支 id 保存在 dict 中,这样 O(1)就可以判定节点属于哪个分支;
    image.png

  3. 之后就是要找不属于当前分支的节点(个人觉得这里有点繁琐,因为其实在_fetch_all_node_ids_in_parallels 方法中已经大致可以判断出分支结束节点了)。这里的做法是,遍历属于这个分支的所有节点,然后只看“恰好只有一条出边”的节点,如果这个点就在父分支内 / 如果目标节点就是父分支的结束节点 / 如果没有父分支,而且目标节点不在任何并行分支内(说明目标节点在当前分支外部),那个这个节点肯定在并行分支之外;

  4. 如果“不属于当前分支的节点”只有 1 个,那么它就是当前分支的结束节点;

  5. 当前点遍历完毕,继续递归当前点的所有出边节点,循环往复;

参考示例

上面这段流程看的肯定很懵,这里给出示例:

1
2
3
4
5
6
7
       ┌──(ok)──▶ B ───────────────┐
│ ▼
A ─────┼──(ok)──▶ C ──┬──▶ D ──▶ F ──▶ G (End)
│ │ ▲ ▲
│ └──▶ E ─────┘ │
│ │
└──(fail)─▶ X ─────────────────── ┘
  1. 处理 A 的出边分组

    • condition_edge_mappings = {ok:[A->B,A->C], fail:[A->X]}
    • 只对 ok 组建并行:P1(start=A, parent=None)
  2. 计算 P1 的分支内节点

    • in_branch_node_ids ≈ {B:[B], C:[C,D,E]}(合流点 F 不算“分支内”)
    • parallel_node_ids = [B,C,D,E]
    • node_parallel_mapping 写入这 4 个节点属于 P1
  3. 反推 P1 的出口

    • 扫单出边节点:B->F, D->F, E->F 都跳出并行
    • outside_parallel_target_node_ids = {F}
    • 唯一候选成立:P1.end_to_node_id = F
  4. 递归到 C,生成子并行

    • C 有两条边 C->D, C->E,建 P2(start=C, parent=P1)
    • parallel_node_ids(P2) = [D,E]
    • node_parallel_mapping[D/E] 更新为 P2(内层覆盖外层),P1P2的父分支
  5. 反推 P2 的出口

    • 候选跳出点:D->F, E->F
    • 因为 F == P1.end_to_node_id,作用域合法
    • outside_parallel_target_node_ids = {F},最终 P2.end_to_node_id = F

所以对于 ok 这个 run_condition 一共有两个分支P1P2,其中P1P2的父分支,他们的结束节点都是 F

2.  AnswerStreamGeneratorRouter.init

AnswerStreamGeneratorRouter.initEndStreamGeneratorRouter.init的原理差不多,这里只介绍其中之一——AnswerStreamGeneratorRouter
其实代码量还是不大的,可以看到首先就是找到工作流中的所有"直接回复"节点,然后解析出这个"直接回复"节点中的变量块与文本块(在AnswerStreamProcessor中有很大作用)。然后解析出每个"直接回复"节点的上游节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class AnswerStreamGeneratorRouter:
@classmethod
def init(
cls,
node_id_config_mapping: dict[str, dict],
reverse_edge_mapping: dict[str, list["GraphEdge"]], # type: ignore[name-defined]
) -> AnswerStreamGenerateRoute:
"""
Get stream generate routes.
:return:
"""
# parse stream output node value selectors of answer nodes
answer_generate_route: dict[str, list[GenerateRouteChunk]] = {}
for answer_node_id, node_config in node_id_config_mapping.items():
# 这里是过滤作用,只筛选出所有节点中的"直接回复"节点
if node_config.get("data", {}).get("type") != NodeType.ANSWER.value:
continue

# get generate route for stream output
# 这里的generate_route其实是直接回复节点解析出来的"变量块"和"文本块"
generate_route = cls._extract_generate_route_selectors(node_config)
answer_generate_route[answer_node_id] = generate_route

# fetch answer dependencies
answer_node_ids = list(answer_generate_route.keys())
# 获取所有"直接回复"节点的依赖节点,其实主要就是使用reverse_edge_mapping反向DFS
answer_dependencies = cls._fetch_answers_dependencies(
answer_node_ids=answer_node_ids,
reverse_edge_mapping=reverse_edge_mapping,
node_id_config_mapping=node_id_config_mapping,
)

return AnswerStreamGenerateRoute(
answer_generate_route=answer_generate_route,
answer_dependencies=answer_dependencies,
)

# ----------------------------------------------------------------
# 这个方法是递归调用的,其实使用的也就是DFS算法
@classmethod
def _recursive_fetch_answer_dependencies(
cls,
current_node_id: str,
answer_node_id: str,
node_id_config_mapping: dict[str, dict],
reverse_edge_mapping: dict[str, list["GraphEdge"]], # type: ignore[name-defined]
answer_dependencies: dict[str, list[str]],
) -> None:
"""
Recursive fetch answer dependencies
:param current_node_id: current node id
:param answer_node_id: answer node id
:param node_id_config_mapping: node id config mapping
:param reverse_edge_mapping: reverse edge mapping
:param answer_dependencies: answer dependencies
:return:
"""
reverse_edges = reverse_edge_mapping.get(current_node_id, [])
for edge in reverse_edges:
source_node_id = edge.source_node_id
if source_node_id not in node_id_config_mapping:
continue
# 反向DFS,获取每个"直接回复"节点的所有上游节点
source_node_type = (
node_id_config_mapping[source_node_id].get("data", {}).get("type")
)
source_node_data = node_id_config_mapping[source_node_id].get("data", {})
# 这里在遇到下面几类节点的时候停止DFS,避免重复统计上游节点,
# 比如"直接回复1"节点的上游节点如果还有"直接回复2"节点,那么这一层的DFS直接停止;避免回复1又重复统计了回复2的上游节点
if (
source_node_type
in {
NodeType.ANSWER,
NodeType.IF_ELSE,
NodeType.QUESTION_CLASSIFIER,
NodeType.ITERATION,
NodeType.LOOP,
NodeType.VARIABLE_ASSIGNER,
}
or source_node_data.get("error_strategy") == ErrorStrategy.FAIL_BRANCH
):
answer_dependencies[answer_node_id].append(source_node_id)
else:
cls._recursive_fetch_answer_dependencies(
current_node_id=source_node_id,
answer_node_id=answer_node_id,
node_id_config_mapping=node_id_config_mapping,
reverse_edge_mapping=reverse_edge_mapping,
answer_dependencies=answer_dependencies,
)

这里值得注意的是在_recursive_fetch_answer_dependencies方法里,下面这段代码比较重要:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (
source_node_type
in {
NodeType.ANSWER,
NodeType.IF_ELSE,
NodeType.QUESTION_CLASSIFIER,
NodeType.ITERATION,
NodeType.LOOP,
NodeType.VARIABLE_ASSIGNER,
}
or source_node_data.get("error_strategy") == ErrorStrategy.FAIL_BRANCH
):
answer_dependencies[answer_node_id].append(source_node_id)

我们以下面这个工作流为例:
image.png

这里 条件 02 在 Answer04 和 Answer05 的上游节点中,从 Answer04 和 Answer05 直接获取上游节点的话,条件 02 的上游节点会重复写入这两个节点的 map 中。这里根据上面的算法,从 Answer04 和 Answer05 回溯获取上游节点时到 条件02节点就会 break。这么一来,Answer04 的上游节点为code_node02,条件02,Answer05 的上游节点为code_node03,条件02。以此类推,最终直接回复节点与上游节点的对应关系如图所示:

image.png

Tip

Q:为什么需要获取上游节点?并且为什么需要 break?
A:因为按照 dify 的逻辑,每个节点执行结束之后,应该立即检测其后是否有"直接回复"节点使用其输出信息。获取到上游节点之后其实是反向使用的,就是 根据当前 node 找最近的"直接回复节点",运行结束就赶紧检查"直接回复节点的情况"。

3. run_parallel 完整处理流程

综合上述信息,以下面的图为例,我们再来看一下上一节中运行并行分支的具体流程:

1
2
3
START ──┬── NodeA ──┬── NodeC ──┐
│ └── NodeD ──┴── SubMerge ──┐
└── NodeB ───────────────────────────────┴── Merge ── END

预备数据(图初始化后,node_parallel_mapping 会记录每个节点对应的分支名,每个分支都有起始节点和结束节点):

1
2
3
4
5
6
7
8
9
10
11
12
node_parallel_mapping = {
"NodeA": "outer-p1", "NodeB": "outer-p1",
"NodeC": "inner-p2", "NodeD": "inner-p2",
"SubMerge": "outer-p1",
# Merge、END 不在里面
}

parallel_mapping = {
"outer-p1": GraphParallel(start_from_node_id="START", end_to_node_id="Merge", parent_parallel_id=None),
"inner-p2": GraphParallel(start_from_node_id="NodeA", end_to_node_id="SubMerge", parent_parallel_id="outer-p1",
parent_parallel_start_node_id="START"),
}

Phase 1:主线程运行 START

1
2
3
4
5
6
7
主线程: _run(start_node_id="START", in_parallel_id=None)
→ 运行 START 节点
→ edge_mappings = [START→NodeA, START→NodeB] # 2条边,无 run_condition
→ 进入 else 分支,调用 _run_parallel_branches(
edge_mappings=[START→NodeA, START→NodeB],
in_parallel_id=None
)

Phase 2:外层 _run_parallel_branches(主线程内)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parallel_id = node_parallel_mapping["NodeA"] = "outer-p1"
parallel = parallel_mapping["outer-p1"] # end_to_node_id = "Merge"

q_outer = Queue()

# 提交两个线程到 thread_pool
Thread-A: _run_parallel_node(q=q_outer, parallel_id="outer-p1",
parallel_start_node_id="NodeA",
parent_parallel_id=None)

Thread-B: _run_parallel_node(q=q_outer, parallel_id="outer-p1",
parallel_start_node_id="NodeB",
parent_parallel_id=None)

# 主线程阻塞在 q_outer.get(timeout=1),等待事件
1
2
3
4
时间轴:
主线程 ─────────[阻塞 q_outer.get()]──────────────────────────────────→
Thread-A ───[运行]──────────────────────────────────────────────────────→
Thread-B ───[运行]──────────────────────────────────────────────→

Phase 3:Thread-B 的执行(简单,无嵌套)

1
2
3
4
5
6
7
Thread-B: _run(start_node_id="NodeB", in_parallel_id="outer-p1")
→ q_outer.put(ParallelBranchRunStartedEvent("outer-p1", "NodeB"))
→ 运行 NodeB(产生各种事件,全部 q_outer.put)
→ edge_mappings = [NodeB→Merge],next_node_id = "Merge"
→ 停止条件检查:
node_parallel_mapping.get("Merge") = "" ≠ "outer-p1" → break
→ q_outer.put(ParallelBranchRunSucceededEvent("outer-p1", "NodeB"))

Phase 4:Thread-A 运行 NodeA,触发内层并行

1
2
3
4
5
6
7
8
9
Thread-A: _run(start_node_id="NodeA", in_parallel_id="outer-p1")
→ q_outer.put(ParallelBranchRunStartedEvent("outer-p1", "NodeA"))
→ 运行 NodeA
→ edge_mappings = [NodeA→NodeC, NodeA→NodeD] # 2条边,无 run_condition
→ 调用 _run_parallel_branches(
edge_mappings=[NodeA→NodeC, NodeA→NodeD],
in_parallel_id="outer-p1",
parallel_start_node_id="NodeA" ← 注意:传入了父并行上下文
)

Phase 5:内层 _run_parallel_branches在 Thread-A 内执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
parallel_id = node_parallel_mapping["NodeC"] = "inner-p2"
parallel = parallel_mapping["inner-p2"] # end_to_node_id = "SubMerge"

q_inner = Queue() # 新建内层队列,与 q_outer 完全独立

# 提交两个子线程(同一个 thread_pool)
Thread-C: _run_parallel_node(q=q_inner, parallel_id="inner-p2",
parallel_start_node_id="NodeC",
parent_parallel_id="outer-p1", ← 填入父层
parent_parallel_start_node_id="NodeA") ← 填入父层分叉点

Thread-D: _run_parallel_node(q=q_inner, parallel_id="inner-p2",
parallel_start_node_id="NodeD",
parent_parallel_id="outer-p1",
parent_parallel_start_node_id="NodeA")

# Thread-A 阻塞在 q_inner.get(timeout=1)
# Thread-C/D 产生的事件 → q_inner → Thread-A 读取后 yield
# → 冒泡到 Thread-A 的 _run_parallel_node → q_outer.put
# → 最终被主线程从 q_outer 读走
1
2
3
4
嵌套队列关系:
Thread-C → q_inner ─┐
Thread-D → q_inner ─┴→ Thread-A(读q_inner, 写q_outer) → q_outer → 主线程
Thread-B ─────────────────────────────────→ q_outer → 主线程

Phase 6:Thread-C 和 Thread-D 的执行

Thread-C:

1
2
3
4
5
6
7
_run(start_node_id="NodeC", in_parallel_id="inner-p2",
parent_parallel_id="outer-p1", parent_parallel_start_node_id="NodeA")
→ q_inner.put(ParallelBranchRunStartedEvent("inner-p2", "NodeC", parent="outer-p1"))
→ 运行 NodeC
→ edge_mappings = [NodeC→SubMerge],next_node_id = "SubMerge"
→ 停止检查: node_parallel_mapping.get("SubMerge") = "outer-p1" ≠ "inner-p2" → break
→ q_inner.put(ParallelBranchRunSucceededEvent("inner-p2", "NodeC"))

Thread-D: 同理,运行 NodeD 后在 SubMerge 处停止。

Phase 7:内层并行汇总,Thread-A 恢复

1
2
3
4
5
6
7
8
9
Thread-A(阻塞在 q_inner.get):
收到 ParallelBranchRunSucceededEvent("inner-p2", "NodeC") → succeeded_count = 1
收到 ParallelBranchRunSucceededEvent("inner-p2", "NodeD") → succeeded_count = 2
→ succeeded_count == len(futures)==2 → q_inner.put(None)
收到 None → break,退出循环
wait(futures) # 确保 Thread-C、D 完全结束

final_node_id = parallel.end_to_node_id = "SubMerge"
yield "SubMerge" # 作为字符串返回给 Thread-A 的 _run

Thread-A 的 _run 收到 "SubMerge"

1
2
3
4
5
6
7
for parallel_result in parallel_generator:
if isinstance(parallel_result, str):
final_node_id = parallel_result # = "SubMerge"
else:
yield parallel_result

next_node_id = "SubMerge"

Phase 8:Thread-A 继续运行 SubMerge

1
2
3
4
5
6
7
Thread-A: next_node_id = "SubMerge"
→ 停止检查: node_parallel_mapping.get("SubMerge") = "outer-p1" == "outer-p1" → 继续!
→ 运行 SubMerge
→ edge_mappings = [SubMerge→Merge],next_node_id = "Merge"
→ 停止检查: node_parallel_mapping.get("Merge") = "" ≠ "outer-p1" → break

→ q_outer.put(ParallelBranchRunSucceededEvent("outer-p1", "NodeA"))

Phase 9:外层并行汇总,主线程恢复

1
2
3
4
5
6
7
8
9
10
主线程(阻塞在 q_outer.get):
陆续收到 Thread-B、Thread-A 的各种节点事件(yield 给上层)
收到 ParallelBranchRunSucceededEvent("outer-p1", "NodeB") → succeeded_count = 1
收到 ParallelBranchRunSucceededEvent("outer-p1", "NodeA") → succeeded_count = 2
→ q_outer.put(None)
收到 None → break
wait(futures) # 等 Thread-A、B 完全结束

final_node_id = parallel.end_to_node_id = "Merge"
yield "Merge"

主线程的 _run 收到 "Merge"

1
2
3
next_node_id = "Merge"
# in_parallel_id=None,不触发 break 检查
# 继续主循环,运行 Merge → END

完整时序图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
主线程         Thread-A        Thread-B        Thread-C        Thread-D
| | | | |
|--START-----→ | | | |
| | | | |
|--submit A/B→ | | | |
| [q_outer] | | | |
| |──NodeA──────→ |──NodeB──────→ | |
| | | | |
| |─submit C/D──→ | | |
| [q_outer] | [q_inner] | | |
| | [阻塞] | |──NodeC──────→ |──NodeD──→
| | | (到Merge停) | |
| | | |──SubMerge停 → |──SubMerge停
| | | SucceededB | SuccC | SuccD
| | ←──────────────────────────── q_inner 汇总
| | (q_inner=None,恢复)
| |──SubMerge───→ |
| | (Merge停) |
| | SucceededA |
| ←──────────── q_outer 汇总
| (q_outer=None,恢复)
|──Merge──────→ |
|──END────────→ |

核心设计要点小结

要点体现
队列按层隔离外层用 q_outer,内层用 q_inner,各自独立,不会混淆
事件逐层冒泡Thread-C/D → q_inner → Thread-A 转发 → q_outer → 主线程
停止信号靠 node_parallel_mapping每个线程到达不属于本 parallel_id 的节点时自动 break
汇聚节点靠 end_to_node_id并行结束后 yield "SubMerge" / yield "Merge" 通知主循环继续
共享同一线程池所有层的并行共用 GraphEngineThreadPool,受 max_submit_count 约束防止爆炸
  • 标题: Dify 工作流引擎处理流程源码解析_02_Graph篇
  • 作者: fanz
  • 创建于 : 2025-04-30 17:41:36
  • 更新于 : 2026-03-28 18:33:18
  • 链接: https://redefine.ohevan.com/tcls9d/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。