Spring AI Tool 工具方法调用源码深度解析:从流式交互到工具执行全流程
前言:为什么需要读源码?如何高效读源码?
在上一篇博客中,我们介绍了如何通过 Spring AI 快速调用本地 Tool 方法实现大模型的工具能力扩展。但对于开发者来说,仅仅会用还不够 —— 理解框架的底层逻辑,才能在遇到问题时快速定位、在定制需求时游刃有余。
博客链接:https://www.lucaju.cn/index.php/archives/131/
很多小伙伴对读源码望而却步,其实掌握方法就能事半功倍:
- 详略得当:聚焦核心业务逻辑,忽略日志、校验等辅助代码
- 从命名和注释入手:规范框架的源码命名和注释会清晰指引核心流程
- 由浅入深:先抓整体流程,再钻关键细节,避免一开始陷入代码迷宫
本文将从 Spring AI 调用大模型的业务代码出发,逐步深入源码,解析 Tool 工具方法调用的完整流程,重点剖析工具执行的核心逻辑。
1. 业务代码回顾:流式调用大模型的入口
先看一段典型的 Spring AI 流式调用大模型并启用工具的业务代码,这是我们源码解析的起点:
public Flux<String> stream(String content) {
// 创建chatModel对象,配置模型参数和工具回调管理器
OpenAiChatModel chatModel = OpenAiChatModel.builder()
.openAiApi(OpenAiApi.builder()
.baseUrl("https://api.siliconflow.cn")
.apiKey(System.getenv("SiliconFlow_API"))
.build())
.defaultOptions(OpenAiChatOptions.builder()
.model("Qwen/Qwen3-8B")
.build())
// 关键:配置工具调用管理器
.toolCallingManager(SpringUtil.getBean(ToolCallingManager.class))
.build();
// 创建prompt对象
Prompt prompt = new Prompt(content);
// 调用流式输出接口
Flux<ChatResponse> stream = chatModel.stream(prompt);
return stream.map(chunk -> {
String text = chunk.getResult() != null ?
chunk.getResult().getOutput() != null ? chunk.getResult().getOutput().getText() : ""
: "";
text = StrUtil.nullToDefault(text, "");
return text;
});
}核心逻辑很清晰:创建配置好的OpenAiChatModel,构造Prompt,调用stream方法获取流式响应。其中toolCallingManager的配置是启用工具调用的关键。
2. 入口:ChatModel 的 stream 方法
从业务代码的chatModel.stream(prompt)进入源码,这是整个流程的入口:
@Override
public Flux<ChatResponse> stream(Prompt prompt) {
// 合并运行时和默认选项,创建最终请求prompt
Prompt requestPrompt = buildRequestPrompt(prompt);
// 实际发起请求
return internalStream(requestPrompt, null);
}2.1 配置合并:buildRequestPrompt 方法
buildRequestPrompt的核心作用是合并运行时配置和默认配置,确保模型使用正确的参数(如工具列表、回调、上下文等):
Prompt buildRequestPrompt(Prompt prompt) {
// 处理运行时prompt options
OpenAiChatOptions runtimeOptions = null;
if (prompt.getOptions() != null) {
// 转换运行时选项为OpenAiChatOptions类型
if (prompt.getOptions() instanceof ToolCallingChatOptions toolCallingChatOptions) {
runtimeOptions = ModelOptionsUtils.copyToTarget(toolCallingChatOptions, ToolCallingChatOptions.class,
OpenAiChatOptions.class);
} else {
runtimeOptions = ModelOptionsUtils.copyToTarget(prompt.getOptions(), ChatOptions.class,
OpenAiChatOptions.class);
}
}
// 合并运行时选项和默认选项
OpenAiChatOptions requestOptions = ModelOptionsUtils.merge(runtimeOptions, this.defaultOptions,
OpenAiChatOptions.class);
// 显式合并特殊选项(如HTTP头、工具配置等)
if (runtimeOptions != null) {
requestOptions.setHttpHeaders(mergeHttpHeaders(runtimeOptions.getHttpHeaders(), this.defaultOptions.getHttpHeaders()));
requestOptions.setInternalToolExecutionEnabled(
ModelOptionsUtils.mergeOption(runtimeOptions.getInternalToolExecutionEnabled(),
this.defaultOptions.getInternalToolExecutionEnabled()));
// 合并工具名称、回调、上下文等关键配置
requestOptions.setToolNames(ToolCallingChatOptions.mergeToolNames(runtimeOptions.getToolNames(),
this.defaultOptions.getToolNames()));
requestOptions.setToolCallbacks(ToolCallingChatOptions.mergeToolCallbacks(runtimeOptions.getToolCallbacks(),
this.defaultOptions.getToolCallbacks()));
requestOptions.setToolContext(ToolCallingChatOptions.mergeToolContext(runtimeOptions.getToolContext(),
this.defaultOptions.getToolContext()));
} else {
// 若无可运行时选项,直接使用默认配置
requestOptions.setHttpHeaders(this.defaultOptions.getHttpHeaders());
requestOptions.setInternalToolExecutionEnabled(this.defaultOptions.getInternalToolExecutionEnabled());
requestOptions.setToolNames(this.defaultOptions.getToolNames());
requestOptions.setToolCallbacks(this.defaultOptions.getToolCallbacks());
requestOptions.setToolContext(this.defaultOptions.getToolContext());
}
// 校验工具回调配置
ToolCallingChatOptions.validateToolCallbacks(requestOptions.getToolCallbacks());
return new Prompt(prompt.getInstructions(), requestOptions);
}总结:该方法通过合并默认配置和运行时配置,生成最终的请求参数,确保工具调用相关的配置(工具列表、回调等)被正确传入。
3. 核心流程:internalStream 方法的完整解析
internalStream是实际处理流式请求的核心方法,流程可拆解为 7 个关键步骤。我们重点关注与工具调用相关的核心逻辑:
return Flux.deferContextual(contextView -> {
// 步骤一:生成请求request对象
ChatCompletionRequest request = createRequest(prompt, true);
// 步骤二:语音类型流式输出校验(非核心,略)
audioRequestCheck()...
// 步骤三:发送调用请求,获取流式响应
Flux<OpenAiApi.ChatCompletionChunk> completionChunks = this.openAiApi.chatCompletionStream(request,
getAdditionalHttpHeaders(prompt));
// 步骤四:角色缓存(非核心,略)
ConcurrentHashMap<String, String> roleMap = new ConcurrentHashMap<>();
// 步骤五:生成监控observation对象(非核心,略)
final ChatModelObservationContext observationContext = ...;
Observation observation = ...;
// 步骤六:转换响应格式(将分片转为ChatResponse)
Flux<ChatResponse> chatResponse = completionChunks.map()......
// 步骤七:处理聊天响应流(核心:工具调用逻辑在这里)
Flux<ChatResponse> flux = chatResponse.flatMap()......
return new MessageAggregator().aggregate(flux, observationContext::setResponse);
});3.1 步骤三:发送流式请求(chatCompletionStream)
chatCompletionStream负责向大模型 API 发送流式请求,并处理服务器返回的 SSE(Server-Sent Events)响应:
public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chatRequest,
MultiValueMap<String, String> additionalHttpHeader) {
// 断言校验:请求非空且流式开关为true
Assert.notNull(chatRequest, "The request body can not be null.");
Assert.isTrue(chatRequest.stream(), "Request must set the stream property to true.");
AtomicBoolean isInsideTool = new AtomicBoolean(false);
// 使用WebClient发送POST请求,处理流式响应
return this.webClient.post()
.uri(this.completionsPath)
.headers(headers -> headers.addAll(additionalHttpHeader))
.body(Mono.just(chatRequest), ChatCompletionRequest.class)
.retrieve()
// 将响应转为字符串流
.bodyToFlux(String.class)
// 终止条件:收到"[DONE]"
.takeUntil("[DONE]"::equals)
// 过滤掉终止符
.filter("[DONE]"::equals.negate())
// 转换为ChatCompletionChunk对象
.map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
// 标记工具调用片段(关键:识别工具调用的分片)
.map(chunk -> {
if (this.chunkMerger.isStreamingToolFunctionCall(chunk)) {
isInsideTool.set(true);
}
return chunk;
})
// 窗口化合并工具调用分片(核心:合并工具调用的多个分片)
.windowUntil(chunk -> {
if (isInsideTool.get() && this.chunkMerger.isStreamingToolFunctionCallFinish(chunk)) {
isInsideTool.set(false);
return true;
}
return !isInsideTool.get();
})
// 合并分片内容
.concatMapIterable(window -> {
Mono<ChatCompletionChunk> monoChunk = window.reduce(
new ChatCompletionChunk(...),
(previous, current) -> this.chunkMerger.merge(previous, current));
return List.of(monoChunk);
})
.flatMap(mono -> mono);
}为什么需要合并分片?
大模型返回工具调用时,可能会将工具名称、参数等拆分到多个 SSE 分片中(如下例)。windowUntil和reduce通过finish_reason=tool_calls标记合并分片,确保工具调用信息完整。
// 分片1:工具调用开始
{
"choices": [{"delta": {"tool_calls": [{"name": "current_date", "arguments": ""}]}}]
}
// 分片2:工具调用结束
{
"choices": [{"delta": {}, "finish_reason": "tool_calls"}]
}3.2 步骤六:响应格式转换(ChatResponse 处理)
这一步将模型返回的ChatCompletionChunk转换为 Spring AI 统一的ChatResponse格式,同时处理 token 用量统计:
Flux<ChatResponse> chatResponse = completionChunks
// 转换为ChatCompletion对象
.map(this::chunkToChatCompletion)
// 构建ChatResponse
.switchMap(chatCompletion -> Mono.just(chatCompletion).map(chatCompletion2 -> {
try {
String id = chatCompletion2.id() == null ? "NO_ID" : chatCompletion2.id();
// 转换为Generation列表(核心数据)
List<Generation> generations = chatCompletion2.choices().stream().map(choice -> {
// 缓存角色信息
if (choice.message().role() != null) {
roleMap.putIfAbsent(id, choice.message().role().name());
}
// 构建元数据(ID、角色、完成原因等)
Map<String, Object> metadata = Map.of(
"id", id,
"role", roleMap.getOrDefault(id, ""),
"index", choice.index() != null ? choice.index() : 0,
"finishReason", choice.finishReason() != null ? choice.finishReason().name() : "");
return buildGeneration(choice, metadata, request);
}).toList();
// 处理token用量统计(流式模式下用量通常在最后返回)
OpenAiApi.Usage usage = chatCompletion2.usage();
Usage currentChatResponseUsage = usage != null ? getDefaultUsage(usage) : new EmptyUsage();
Usage accumulatedUsage = UsageCalculator.getCumulativeUsage(currentChatResponseUsage,
previousChatResponse);
return new ChatResponse(generations, from(chatCompletion2, null, accumulatedUsage));
} catch (Exception e) {
log.error("Error processing chat completion", e);
return new ChatResponse(List.of());
}
}))
// 滑动窗口解决流式用量延迟问题
.buffer(2, 1)
.map(bufferList -> {
ChatResponse firstResponse = bufferList.get(0);
if (request.streamOptions() != null && request.streamOptions().includeUsage()) {
if (bufferList.size() == 2) {
ChatResponse secondResponse = bufferList.get(1);
// 用下一个响应的usage更新当前响应
Usage usage = secondResponse.getMetadata().getUsage();
if (!UsageCalculator.isEmpty(usage)) {
return new ChatResponse(firstResponse.getResults(), from(firstResponse.getMetadata(), usage));
}
}
}
return firstResponse;
});总结:该步骤完成格式转换和用量统计,为后续工具调用判断提供标准化的ChatResponse对象。
3.3 核心:Tool 工具方法的调用逻辑(步骤七详解)
步骤七是工具调用的核心触发点,通过判断响应是否需要工具执行,决定是否调用ToolCallingManager:
Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
// 判断是否需要执行工具调用(核心条件)
if (this.toolExecutionEligibilityPredicate.isToolExecutionRequired(prompt.getOptions(), response)) {
return Flux.defer(() -> {
// 执行工具调用(同步操作)
var toolExecutionResult = this.toolCallingManager.executeToolCalls(prompt, response);
// 判断是否直接返回工具结果给客户端
if (toolExecutionResult.returnDirect()) {
return Flux.just(ChatResponse.builder().from(response)
.generations(ToolExecutionResult.buildGenerations(toolExecutionResult))
.build());
} else {
// 不直接返回:将工具结果作为新输入继续请求模型
return this.internalStream(new Prompt(toolExecutionResult.conversationHistory(), prompt.getOptions()),
response,false);
}
}).subscribeOn(Schedulers.boundedElastic());
} else {
// 无需工具调用,直接返回原响应
return Flux.just(response);
}
})
// 监控相关处理(略)
.doOnError(observation::error)
.doFinally(s -> observation.stop())
.contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation));3.3.1 工具调用的执行:executeToolCalls
进入DefaultToolCallingManager的executeToolCalls方法,这是工具调用的统筹逻辑:
@Override
public ToolExecutionResult executeToolCalls(Prompt prompt, ChatResponse chatResponse) {
// 验证输入
Assert.notNull(prompt, "prompt cannot be null");
Assert.notNull(chatResponse, "chatResponse cannot be null");
// 查找包含工具调用的响应
Optional<Generation> toolCallGeneration = chatResponse.getResults()
.stream()
.filter(g -> !CollectionUtils.isEmpty(g.getOutput().getToolCalls()))
.findFirst();
if (toolCallGeneration.isEmpty()) {
throw new IllegalStateException("No tool call requested by the chat model");
}
AssistantMessage assistantMessage = toolCallGeneration.get().getOutput();
// 构建工具上下文
ToolContext toolContext = buildToolContext(prompt, assistantMessage);
// 实际执行工具调用
InternalToolExecutionResult internalToolExecutionResult = executeToolCall(prompt, assistantMessage,
toolContext);
// 构建工具执行后的对话历史
List<Message> conversationHistory = buildConversationHistoryAfterToolExecution(prompt.getInstructions(),
assistantMessage, internalToolExecutionResult.toolResponseMessage());
return ToolExecutionResult.builder()
.conversationHistory(conversationHistory)
.returnDirect(internalToolExecutionResult.returnDirect())
.build();
}3.3.2 工具调用的核心执行:executeToolCall
executeToolCall是工具方法实际被调用的地方,负责匹配工具、执行调用、收集结果:
private InternalToolExecutionResult executeToolCall(Prompt prompt, AssistantMessage assistantMessage,
ToolContext toolContext) {
// 从配置中获取工具回调列表
List<ToolCallback> toolCallbacks = List.of();
if (prompt.getOptions() instanceof ToolCallingChatOptions toolCallingChatOptions) {
toolCallbacks = toolCallingChatOptions.getToolCallbacks();
}
// 存储工具响应结果
List<ToolResponseMessage.ToolResponse> toolResponses = new ArrayList<>();
// 标记是否直接返回结果
Boolean returnDirect = null;
// 遍历执行每个工具调用
for (AssistantMessage.ToolCall toolCall : assistantMessage.getToolCalls()) {
// 提取工具名称和参数
String toolName = toolCall.name();
String toolInputArguments = toolCall.arguments();
// 匹配对应的ToolCallback(工具实现)
ToolCallback toolCallback = toolCallbacks.stream()
.filter(tool -> toolName.equals(tool.getToolDefinition().name()))
.findFirst()
.orElseGet(() -> this.toolCallbackResolver.resolve(toolName));
if (toolCallback == null) {
throw new IllegalStateException("No ToolCallback found for tool name: " + toolName);
}
// 处理returnDirect标记(所有工具都要求直接返回才为true)
if (returnDirect == null) {
returnDirect = toolCallback.getToolMetadata().returnDirect();
} else {
returnDirect = returnDirect && toolCallback.getToolMetadata().returnDirect();
}
// 构建监控上下文
ToolCallingObservationContext observationContext = ToolCallingObservationContext.builder()
.toolDefinition(toolCallback.getToolDefinition())
.toolMetadata(toolCallback.getToolMetadata())
.toolCallArguments(toolInputArguments)
.build();
// 执行工具调用(含监控)
String toolCallResult = ToolCallingObservationDocumentation.TOOL_CALL
.observation(...)
.observe(() -> {
String toolResult;
try {
// 核心:调用工具的call方法执行实际逻辑
toolResult = toolCallback.call(toolInputArguments, toolContext);
} catch (ToolExecutionException ex) {
// 处理工具执行异常
toolResult = this.toolExecutionExceptionProcessor.process(ex);
}
observationContext.setToolCallResult(toolResult);
return toolResult;
});
// 收集工具响应
toolResponses.add(new ToolResponseMessage.ToolResponse(toolCall.id(), toolName,
toolCallResult != null ? toolCallResult : ""));
}
// 返回执行结果
return new InternalToolExecutionResult(new ToolResponseMessage(toolResponses, Map.of()), returnDirect);
}总结:
- 从响应中提取工具调用信息(名称、参数);
- 通过
ToolCallback匹配对应的工具实现; - 调用工具的
call方法执行实际逻辑(如查询数据库、调用 API 等); - 收集工具执行结果,构建新的对话历史;
- 根据
returnDirect决定是否直接返回结果或继续请求模型。
最后再来看一下call方法,比较简单,就是执行我们的Tool工具方法逻辑啦
@Override
public String call(String toolInput, @Nullable ToolContext toolContext) {
Assert.hasText(toolInput, "toolInput cannot be null or empty");
logger.debug("Starting execution of tool: {}", this.toolDefinition.name());
I request = JsonParser.fromJson(toolInput, this.toolInputType);
O response = this.toolFunction.apply(request, toolContext);
logger.debug("Successful execution of tool: {}", this.toolDefinition.name());
return this.toolCallResultConverter.convert(response, null);
}4. 整体流程梳理:Tool 调用的完整链路
结合源码解析,Spring AI Tool 工具调用的完整流程可概括为:
- 配置准备:合并默认配置与运行时配置,生成包含工具信息的
Prompt; - 模型请求:通过
chatCompletionStream向大模型发送流式请求,获取 SSE 响应; - 分片处理:合并工具调用相关的分片,确保工具信息完整;
- 格式转换:将模型响应转为
ChatResponse,标准化数据格式; - 工具判断:检查响应是否包含工具调用请求;
- 工具执行:通过
ToolCallingManager匹配工具实现,执行call方法获取结果; - 结果处理:根据配置返回工具结果或用结果继续请求模型,形成对话闭环。
结语
本文从业务代码出发,逐步深入 Spring AI 的源码细节,重点解析了 Tool 工具方法调用的核心逻辑。理解这一流程后,你不仅能更清晰地排查工具调用中的问题,还能基于源码实现自定义扩展(如自定义工具匹配逻辑、增强异常处理等)。
源码阅读的关键在于 “抓大放小”,先理清整体流程,再深入核心细节。希望本文的解析方式能帮助你更高效地学习框架源码,真正做到 “知其然,更知其所以然”。
如果有疑问或补充,欢迎在评论区交流!
一篇比较干的内容,感谢能看到这里的朋友们~