dify源码_20250420_源码笔记

dify源码_20250420_源码笔记

fanz Lv3

1.dify工作流运行流程

我发现调用的流程大概是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
workflowAppRunner.run()
-> workflow_entry.run()
-> GraphEngine.run()
{
if 流程类型 is CHAT: 使用AnswerStreamProcessor
else: 使用EndStreamProcessor

Then:
while True {
1.判断最大执行step是否达到限制
2.判断是否超时
3.初始化节点状态
4.运行单个节点(所有节点的信息存在map中,通过node_id取信息)
5.通过edge_map(边信息)找到下一个节点
6.循环1~5步的步骤,直到整个graph执行完毕
}
}

当使用AnswerStreamProcessor的时候,会启动一个异步线程(generate_task_pipeline.py),这个线程负责持续监听一个内置的消息队列。
image.png

每次LLM_Node输出的时候,会同时输出两个类型的事件:QueueTextChunkEventRunStreamChunkEvent,其中RunStreamChunkEvent这个由直接回复节点捕捉并累加,(也就是说直接回复节点本身不具备流式输出的功能);而QueueTextChunkEvent会被消息队列接收并流式输出。
image.png

image.png

在我调试的时候发现,其实在刚好走到直接回复节点之前流式输出的内容就已经展示在页面上了。

2.遇到的问题

这个问题是周五(0418)的时候遇到的,那天上午遇到的问题是,如果直接回复节点中 llm.text 不是第一个变量,那么endStreamProfressor 中会报错。当时检查了一下,发现错误是在 endStreamProfressor 我搬运的这段代码里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class EndStreamProcessor(StreamProcessor):
def __init__(self, graph: Graph, variable_pool: VariablePool) -> None:
super().__init__(graph, variable_pool)
# 获取图中end节点的信息,注入self.route_position
self.end_stream_param = graph.end_stream_param
self.route_position = {}
for (
end_node_id,
_,
) in self.end_stream_param.end_stream_variable_selector_mapping.items():
self.route_position[end_node_id] = 0
# Add: 获取图中Answer节点的信息,注入self.extra_route_position
# (不要动用route_position, _generate_stream_outputs_when_node_finished会对其进行遍历)
self.extra_route_position = {}
self.generate_routes = graph.answer_stream_generate_routes
for answer_node_id in self.generate_routes.answer_generate_route:
self.extra_route_position[answer_node_id] = 0
self.current_stream_chunk_generating_node_ids: dict[str, list[str]] = {}
# Add: 增加current_stream_answer_chunk_generating_node_ids缓存
# (不要动用current_stream_chunk_generating_node_ids,这个专门用于缓存end节点信息
# 否则process中answer与end节点共用这个dict可能出错)
self.current_stream_answer_chunk_generating_node_ids: dict[str, list[str]] = {}

这里我是直接搬运的 AnswerStreamProfessor ,直接把 extra_route_position 初始化为 0。但是直接回复节点会把变量和文本分离开,封装成不同的文本变量,如果是变量就需要从 variable_pool 中获取,如果直接是文本就直接取值。大模型流式输出的原理就是查看大模型输出的那个 llm.text 有没有被后面的直接回复节点引用,如果引用了,才会流式输出结果,否则就不会输出出来。而且每个 workflow 都是有一个 text 变量和一个 json 变量的,text 在我们更改之后会记录 workflow 内部输出的文本(可以流式)

如果直接回复节点中 llm.text 不在开头,那么其 route_position 不是 0,此时下面这段搬运的代码就会报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def _get_stream_out_answer_node_ids(
self, event: NodeRunStreamChunkEvent
) -> list[str]:
"""
Is stream out support
:param event: queue text chunk event
:return:
"""
if not event.from_variable_selector:
return []

stream_output_value_selector = event.from_variable_selector
if not stream_output_value_selector:
return []

stream_out_answer_node_ids = []
for answer_node_id, route_position in self.extra_route_position.items():
if answer_node_id not in self.rest_node_ids:
continue

# all depends on answer node id not in rest node ids
if all(
dep_id not in self.rest_node_ids
for dep_id in self.generate_routes.answer_dependencies[answer_node_id]
):
if route_position >= len(
self.generate_routes.answer_generate_route[answer_node_id]
):
continue

route_chunk = self.generate_routes.answer_generate_route[
answer_node_id
][route_position]

if route_chunk.type != GenerateRouteChunk.ChunkType.VAR:
continue

route_chunk = cast(VarGenerateRouteChunk, route_chunk)
value_selector = route_chunk.value_selector

# check chunk node id is before current node id or equal to current node id
if value_selector != stream_output_value_selector:
continue

stream_out_answer_node_ids.append(answer_node_id)

return stream_out_answer_node_ids

原因是 llm.text 对应的 route_position 并不是 0,但是初始化的时候又是 0,这里取值得到的是文本类型,于是就会报错。当时我又看了一会儿觉得,endStreamProfessor 中会处理 graph_engine.run()的所有返回的迭代值,同时我觉得所有值应该都是 graph_engine.run() 返回出来的。

3.奇怪的发现

于是我就开始调试 graph_engine.run()这个方法,我发现了一个非常有意思的情况,开始结点执行完毕之后还是正常的,但是执行到大模型节点的时候,哦搜敖德萨这几个字先于流式结果输出出来,或者说,我感觉这几个字在还没走到 llm 节点之后就已经输出出来了。

image.png

可是在调试的时候又遇到了问题,因为 graph_engine.run 本质上就是按顺序执行每个节点的 node.run,进而执行每个节点的 _run,但是我执行 llm_node 这个节点的时候发现刚好执行到_invoke_llm 这个方法之后出现了哦搜敖德萨。可是这里不应该是直接调用大模型吗?而且为什么会在流式之前出现所谓的 “哦搜敖德萨”?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class LLMNode(BaseNode[LLMNodeData]):
_node_data_cls = LLMNodeData
_node_type = NodeType.LLM

def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
node_inputs: Optional[dict[str, Any]] = None
process_data = None

try:
# init messages template
self.node_data.prompt_template = self._transform_chat_messages(self.node_data.prompt_template)

# fetch variables and fetch values from variable pool
inputs = self._fetch_inputs(node_data=self.node_data)
# ......
# handle invoke result
# 就是在执行完invoke_llm方法之后出现了 哦搜敖德萨
generator = self._invoke_llm(
node_data_model=self.node_data.model,
model_instance=model_instance,
prompt_messages=prompt_messages,
stop=stop,
)

对这个_invoke_llm 方法进行单步调试之后我发现有个奇怪的地方,执行 db.session.close() 之后会进入到 WorkflowAppGenerateTaskPipeline 中,这个类的作用就是记录工作流的状态,读取消息队列的内容返回给前端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    def _invoke_llm(
self,
node_data_model: ModelConfig,
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
stop: Optional[Sequence[str]] = None,
) -> Generator[NodeEvent, None, None]:
# 关闭会话之后会读取消息队列 MessageBasedAppQueueManager 中现在存在的消息
db.session.close()
# 后面没什么奇怪的
invoke_result = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters=node_data_model.completion_params,
stop=stop,
stream=True,
user=self.user_id,
)

return self._handle_invoke_result(invoke_result=invoke_result)


# ...跳转到此
class AdvancedChatAppGenerateTaskPipeline:
"""
AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
"""
Note

这个消息队列是干什么的?是在哪定义的?

workflowAppRunner 会使用新线程启动 workflow_entry,同时主线程会开启一个 taskpipeline,这两个共用一个 MessageBasedAppQueueManager(消息队列管理器)。
workflow_entry 得到的 graph_engine.run 结果最后都会通过 handle_event 写入消息队列,taskpipeline 则不断监听消息队列,将其中的消息抛出。

4.问题解决

我先是尝试在 invoke_llm 那个函数尝试单步调试,但是调到 db.session.close 感觉已经到极限了,这里最多就只能看到 taskpipeline 会从消息队列中取消息并抛出,可是这个消息是什么时候放在队列中的呢?,这个问题显然更加重要。

于是看了眼调试窗口的堆栈,发现最高层也就是 workflow_entry 中的 handle_event 了,我在这里打断点调试,终于是能在放入消息队列之前拦截到这个消息。但是仔细看看这个 event 会发现一个很有意思的情况,这个哦搜敖德萨明明是在直接回复节点里的,可是这里的 node_data 却是开始节点的信息。那么也就是说,其实在走到 llm 节点之前这个消息就已经放在消息队列里了,这样一来刚才的推断也符合逻辑了。

image.png

调试 graph_engine.run 没看出啥,只能抱着试一试的心态单步调试 AnswerStreamProfessor 了,没想到还真发现了源头处,就是 AnswerStreamProfessor.generate_stream_outputs_when_node_finished 这个方法。

可以看下面这段代码,这个方法是在每个节点成功执行之后触发的,这里的逻辑是对于每个节点,在执行成功之后就检查节点之后是否有直接回复节点。刚才说过,直接回复节点中对文本和引用变量进行了分块,这里就按分块顺序遍历,如果是文本直接抛出;如果是变量先从 variable_pool 获取,如果获取不到就终止循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def _generate_stream_outputs_when_node_finished(
self, event: NodeRunSucceededEvent
) -> Generator[GraphEngineEvent, None, None]:
"""
Generate stream outputs.
:param event: node run succeeded event
:return:
"""
for answer_node_id, position in self.route_position.items():
# all depends on answer node id not in rest node ids
if event.route_node_state.node_id != answer_node_id and (
answer_node_id not in self.rest_node_ids
or not all(
dep_id not in self.rest_node_ids
for dep_id in self.generate_routes.answer_dependencies[answer_node_id]
)
):
continue

route_position = self.route_position[answer_node_id]
route_chunks = self.generate_routes.answer_generate_route[answer_node_id][route_position:]

for route_chunk in route_chunks:
if route_chunk.type == GenerateRouteChunk.ChunkType.TEXT:
route_chunk = cast(TextGenerateRouteChunk, route_chunk)
yield NodeRunStreamChunkEvent(
id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_data=event.node_data,
chunk_content=route_chunk.text,
route_node_state=event.route_node_state,
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
from_variable_selector=[answer_node_id, "answer"],
)
else:
route_chunk = cast(VarGenerateRouteChunk, route_chunk)
# 从当前的varible_tool获取值,获取不到则终止循环
value_selector = route_chunk.value_selector
if not value_selector:
break

value = self.variable_pool.get(value_selector)

if value is None:
break

text = value.markdown

if text:
yield NodeRunStreamChunkEvent(
id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_data=event.node_data,
chunk_content=text,
from_variable_selector=list(value_selector),
route_node_state=event.route_node_state,
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
)

self.route_position[answer_node_id] += 1
  • 标题: dify源码_20250420_源码笔记
  • 作者: fanz
  • 创建于 : 2025-04-20 17:06:03
  • 更新于 : 2025-04-25 20:17:03
  • 链接: https://redefine.ohevan.com/sv9wa4/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。