LangChain4j从入门到精通-6-响应流(Response Streaming)
LangChain4j从入门到精通-6-响应流(Response Streaming)
本文深入解析了LangChain4j中实现响应流(Response Streaming)的核心技术,揭秘如何通过逐令牌流式传输大幅提升AI对话体验。文章详细介绍了StreamingChatModel接口及StreamingChatResponseHandler的完整事件回调机制(包括部分响应、思考过程、工具调用等),并通过代码实例演示了基础流式处理、Lambda表达式简化写法以及流式取消等高级控制技巧。针对生产环境需求,还提供了与Spring Boot SSE、WebSocket的集成方案,帮助开发者轻松构建响应迅捷、体验流畅的实时AI应用。掌握响应流技术,是优化用户感知延迟、打造类ChatGPT级交互体验的关键一步。
#Java #人工智能 #LangChain4j #响应式流 #实时AI交互
:::注意
本页介绍了如何使用底层LLM API实现响应流式传输。如需了解高层LLM API,请参阅AI Service。
:::
大语言模型(LLMs)逐令牌生成文本,因此许多LLM服务提供商提供了一种流式传输响应的方法,即逐个令牌返回结果,而无需等待整个文本生成完毕。这显著提升了用户体验,因为用户无需等待未知时长,几乎可以立即开始阅读回复内容。
对于ChatModel和LanguageModel接口,都有对应的StreamingChatModel和StreamingLanguageModel接口。 这些接口具有相似的API,但可以流式传输响应。 它们接受StreamingChatResponseHandler接口的实现作为参数。
public interface StreamingChatResponseHandler {
default void onPartialResponse(String partialResponse) {}
default void onPartialResponse(PartialResponse partialResponse, PartialResponseContext context) {}
default void onPartialThinking(PartialThinking partialThinking) {}
default void onPartialThinking(PartialThinking partialThinking, PartialThinkingContext context) {}
default void onPartialToolCall(PartialToolCall partialToolCall) {}
default void onPartialToolCall(PartialToolCall partialToolCall, PartialToolCallContext context) {}
default void onCompleteToolCall(CompleteToolCall completeToolCall) {}
void onCompleteResponse(ChatResponse completeResponse);
void onError(Throwable error);
}
通过实现StreamingChatResponseHandler,您可以为以下事件定义操作:
- 当生成下一个部分文本响应时:会调用onPartialResponse(String)或onPartialResponse(PartialResponse, PartialResponseContext)(您可以实现其中任一方法)。根据LLM提供商的不同,部分响应文本可能包含单个或多个令牌。例如,您可以立即将令牌直接发送到用户界面。
- 当生成下一个部分思考/推理文本时:会调用onPartialThinking(PartialThinking)或onPartialThinking(PartialThinking, PartialThinkingContext)(您可以实现其中任一方法)。根据LLM提供商的不同,部分思考文本可能包含单个或多个令牌。
- 当生成下一个部分工具调用时:会调用onPartialToolCall(PartialToolCall)或onPartialToolCall(PartialToolCall, PartialToolCallContext)(您可以实现其中任一方法)。
- 当LLM完成单个工具调用的流式传输时:会调用onCompleteToolCall(CompleteToolCall)。
- 当LLM完成生成时:会调用onCompleteResponse(ChatResponse)。ChatResponse对象包含完整响应(AiMessage)以及ChatResponseMetadata。
- 当发生错误时:会调用onError(Throwable error)。
以下是使用StreamingChatModel实现流式处理的示例:
StreamingChatModel model = OpenAiStreamingChatModel.builder()
.apiKey(System.getenv("OPENAI_API_KEY"))
.modelName(GPT_4_O_MINI)
.build();
String userMessage = "Tell me a joke";
model.chat(userMessage, new StreamingChatResponseHandler() {
@Override
public void onPartialResponse(String partialResponse) {
System.out.println("onPartialResponse: " + partialResponse);
}
@Override
public void onPartialThinking(PartialThinking partialThinking) {
System.out.println("onPartialThinking: " + partialThinking);
}
@Override
public void onPartialToolCall(PartialToolCall partialToolCall) {
System.out.println("onPartialToolCall: " + partialToolCall);
}
@Override
public void onCompleteToolCall(CompleteToolCall completeToolCall) {
System.out.println("onCompleteToolCall: " + completeToolCall);
}
@Override
public void onCompleteResponse(ChatResponse completeResponse) {
System.out.println("onCompleteResponse: " + completeResponse);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
一种更简洁的流式响应方式是使用LambdaStreamingResponseHandler类。 该工具类提供了静态方法,可通过lambda表达式创建StreamingChatResponseHandler。 使用lambda表达式流式传输响应的方法非常简单。 只需调用带有lambda表达式的onPartialResponse()静态方法,定义如何处理部分响应:
import static dev.langchain4j.model.LambdaStreamingResponseHandler.onPartialResponse;
model.chat("Tell me a joke", onPartialResponse(System.out::print));
onPartialResponseAndError()方法允许您为 onPartialResponse()和 onError()事件定义操作:
import static dev.langchain4j.model.LambdaStreamingResponseHandler.onPartialResponseAndError;
model.chat("Tell me a joke", onPartialResponseAndError(System.out::print, Throwable::printStackTrace));
Streaming取消
如果您想取消流服务,可以通过以下任一方式进行操作:
onPartialResponse(PartialResponse, PartialResponseContext)onPartialThinking(PartialThinking, PartialThinkingContext)onPartialToolCall(PartialToolCall, PartialToolCallContext)
上下文对象包含 StreamingHandle,可用于取消流传输
model.chat(userMessage, new StreamingChatResponseHandler() {
@Override
public void onPartialResponse(PartialResponse partialResponse, PartialResponseContext context) {
process(partialResponse);
if (shouldCancel()) {
context.streamingHandle().cancel();
}
}
@Override
public void onCompleteResponse(ChatResponse completeResponse) {
System.out.println("onCompleteResponse: " + completeResponse);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
当调用 StreamingHandle.cancel()时,LangChain4j 将关闭连接并停止流式传输。 一旦调用了 StreamingHandle.cancel(),StreamingChatResponseHandler将不再接收任何回调。
更多推荐

所有评论(0)