1.dify工作流运行流程我发现调用的流程大概是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 workflowAppRunner.run() -> workflow_entry.run() -> GraphEngine.run() { if 流程类型 is CHAT: 使用AnswerStreamProcessor else: 使用EndStreamProcessor Then: while True { 1. 判断最大执行step是否达到限制 2. 判断是否超时 3. 初始化节点状态 4. 运行单个节点(所有节点的信息存在map中,通过node_id取信息) 5. 通过edge_map(边信息)找到下一个节点 6. 循环1 ~5 步的步骤,直到整个graph执行完毕 } }
当使用AnswerStreamProcessor的时候,会启动一个异步线程(generate_task_pipeline.py
),这个线程负责持续监听一个内置的消息队列。
每次LLM_Node输出的时候,会同时输出两个类型的事件:QueueTextChunkEvent
和RunStreamChunkEvent
,其中RunStreamChunkEvent
这个由直接回复节点捕捉并累加,(也就是说直接回复节点本身不具备流式输出的功能);而QueueTextChunkEvent
会被消息队列接收并流式输出。
在我调试的时候发现,其实在刚好走到直接回复节点之前流式输出的内容就已经展示在页面上了。
2.遇到的问题这个问题是周五(0418)的时候遇到的,那天上午遇到的问题是,如果直接回复节点中 llm.text
不是第一个变量,那么endStreamProfressor
中会报错。当时检查了一下,发现错误是在 endStreamProfressor
我搬运的这段代码里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class EndStreamProcessor (StreamProcessor ): def __init__ (self, graph: Graph, variable_pool: VariablePool ) -> None : super ().__init__(graph, variable_pool) self .end_stream_param = graph.end_stream_param self .route_position = {} for ( end_node_id, _, ) in self .end_stream_param.end_stream_variable_selector_mapping.items(): self .route_position[end_node_id] = 0 self .extra_route_position = {} self .generate_routes = graph.answer_stream_generate_routes for answer_node_id in self .generate_routes.answer_generate_route: self .extra_route_position[answer_node_id] = 0 self .current_stream_chunk_generating_node_ids: dict [str , list [str ]] = {} self .current_stream_answer_chunk_generating_node_ids: dict [str , list [str ]] = {}
这里我是直接搬运的 AnswerStreamProfessor
,直接把 extra_route_position
初始化为 0。但是直接回复节点会把变量和文本分离开,封装成不同的文本变量,如果是变量就需要从 variable_pool
中获取,如果直接是文本就直接取值。大模型流式输出的原理就是查看大模型输出的那个 llm.text 有没有被后面的直接回复节点引用,如果引用了,才会流式输出结果,否则就不会输出出来。而且每个 workflow 都是有一个 text 变量和一个 json 变量的,text 在我们更改之后会记录 workflow 内部输出的文本(可以流式)
如果直接回复节点中 llm.text
不在开头,那么其 route_position
不是 0,此时下面这段搬运的代码就会报错:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 def _get_stream_out_answer_node_ids ( self, event: NodeRunStreamChunkEvent ) -> list [str ]: """ Is stream out support :param event: queue text chunk event :return: """ if not event.from_variable_selector: return [] stream_output_value_selector = event.from_variable_selector if not stream_output_value_selector: return [] stream_out_answer_node_ids = [] for answer_node_id, route_position in self .extra_route_position.items(): if answer_node_id not in self .rest_node_ids: continue if all ( dep_id not in self .rest_node_ids for dep_id in self .generate_routes.answer_dependencies[answer_node_id] ): if route_position >= len ( self .generate_routes.answer_generate_route[answer_node_id] ): continue route_chunk = self .generate_routes.answer_generate_route[ answer_node_id ][route_position] if route_chunk.type != GenerateRouteChunk.ChunkType.VAR: continue route_chunk = cast(VarGenerateRouteChunk, route_chunk) value_selector = route_chunk.value_selector if value_selector != stream_output_value_selector: continue stream_out_answer_node_ids.append(answer_node_id) return stream_out_answer_node_ids
原因是 llm.text
对应的 route_position
并不是 0,但是初始化的时候又是 0,这里取值得到的是文本类型,于是就会报错。当时我又看了一会儿觉得,endStreamProfessor
中会处理 graph_engine.run()
的所有返回的迭代值,同时我觉得所有值应该都是 graph_engine.run()
返回出来的。
3.奇怪的发现于是我就开始调试 graph_engine.run()
这个方法,我发现了一个非常有意思的情况,开始结点执行完毕之后还是正常的,但是执行到大模型节点的时候,哦搜敖德萨
这几个字先于流式结果输出出来,或者说,我感觉这几个字在还没走到 llm 节点之后就已经输出出来了。
可是在调试的时候又遇到了问题,因为 graph_engine.run 本质上就是按顺序执行每个节点的 node.run,进而执行每个节点的 _run,但是我执行 llm_node 这个节点的时候发现刚好执行到_invoke_llm 这个方法之后出现了哦搜敖德萨。可是这里不应该是直接调用大模型吗?而且为什么会在流式之前出现所谓的 “哦搜敖德萨”?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class LLMNode (BaseNode[LLMNodeData]): _node_data_cls = LLMNodeData _node_type = NodeType.LLM def _run (self ) -> Generator[NodeEvent | InNodeEvent, None , None ]: node_inputs: Optional [dict [str , Any ]] = None process_data = None try : self .node_data.prompt_template = self ._transform_chat_messages(self .node_data.prompt_template) inputs = self ._fetch_inputs(node_data=self .node_data) generator = self ._invoke_llm( node_data_model=self .node_data.model, model_instance=model_instance, prompt_messages=prompt_messages, stop=stop, )
对这个_invoke_llm 方法进行单步调试之后我发现有个奇怪的地方,执行 db.session.close() 之后会进入到 WorkflowAppGenerateTaskPipeline 中 ,这个类的作用就是记录工作流的状态,读取消息队列的内容返回给前端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def _invoke_llm ( self, node_data_model: ModelConfig, model_instance: ModelInstance, prompt_messages: Sequence [PromptMessage], stop: Optional [Sequence [str ]] = None , ) -> Generator[NodeEvent, None , None ]: db.session.close() invoke_result = model_instance.invoke_llm( prompt_messages=prompt_messages, model_parameters=node_data_model.completion_params, stop=stop, stream=True , user=self .user_id, ) return self ._handle_invoke_result(invoke_result=invoke_result) class AdvancedChatAppGenerateTaskPipeline : """ AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application. """
这个消息队列是干什么的?是在哪定义的?
workflowAppRunner 会使用新线程启动 workflow_entry,同时主线程会开启一个 taskpipeline,这两个共用一个 MessageBasedAppQueueManager(消息队列管理器)。 workflow_entry 得到的 graph_engine.run 结果最后都会通过 handle_event 写入消息队列,taskpipeline 则不断监听消息队列,将其中的消息抛出。
4.问题解决我先是尝试在 invoke_llm 那个函数尝试单步调试,但是调到 db.session.close 感觉已经到极限了,这里最多就只能看到 taskpipeline 会从消息队列中取消息并抛出,可是这个消息是什么时候放在队列中的呢? ,这个问题显然更加重要。
于是看了眼调试窗口的堆栈,发现最高层也就是 workflow_entry 中的 handle_event 了,我在这里打断点调试,终于是能在放入消息队列之前拦截到这个消息。但是仔细看看这个 event 会发现一个很有意思的情况,这个哦搜敖德萨
明明是在直接回复节点里的,可是这里的 node_data 却是开始节点的信息。那么也就是说,其实在走到 llm 节点之前这个消息就已经放在消息队列里了 ,这样一来刚才的推断也符合逻辑了。
调试 graph_engine.run 没看出啥,只能抱着试一试的心态单步调试 AnswerStreamProfessor 了,没想到还真发现了源头处,就是 AnswerStreamProfessor.generate_stream_outputs_when_node_finished 这个方法。
可以看下面这段代码,这个方法是在每个节点成功执行之后触发的,这里的逻辑是对于每个节点,在执行成功之后就检查节点之后是否有直接回复节点。刚才说过,直接回复节点中对文本和引用变量进行了分块,这里就按分块顺序遍历,如果是文本直接抛出;如果是变量先从 variable_pool 获取,如果获取不到就终止循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 def _generate_stream_outputs_when_node_finished ( self, event: NodeRunSucceededEvent ) -> Generator[GraphEngineEvent, None , None ]: """ Generate stream outputs. :param event: node run succeeded event :return: """ for answer_node_id, position in self .route_position.items(): if event.route_node_state.node_id != answer_node_id and ( answer_node_id not in self .rest_node_ids or not all ( dep_id not in self .rest_node_ids for dep_id in self .generate_routes.answer_dependencies[answer_node_id] ) ): continue route_position = self .route_position[answer_node_id] route_chunks = self .generate_routes.answer_generate_route[answer_node_id][route_position:] for route_chunk in route_chunks: if route_chunk.type == GenerateRouteChunk.ChunkType.TEXT: route_chunk = cast(TextGenerateRouteChunk, route_chunk) yield NodeRunStreamChunkEvent( id =event.id , node_id=event.node_id, node_type=event.node_type, node_data=event.node_data, chunk_content=route_chunk.text, route_node_state=event.route_node_state, parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, from_variable_selector=[answer_node_id, "answer" ], ) else : route_chunk = cast(VarGenerateRouteChunk, route_chunk) value_selector = route_chunk.value_selector if not value_selector: break value = self .variable_pool.get(value_selector) if value is None : break text = value.markdown if text: yield NodeRunStreamChunkEvent( id =event.id , node_id=event.node_id, node_type=event.node_type, node_data=event.node_data, chunk_content=text, from_variable_selector=list (value_selector), route_node_state=event.route_node_state, parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, ) self .route_position[answer_node_id] += 1