基于SpringAI的BaseAdvisor接口实现
package com.canvas.ai.service.chat.modulerag.core;
import com.canvas.ai.service.chat.chathistory.entity.vo.ChatHistoryBO;
import com.canvas.ai.service.chat.chathistory.historycombine.entity.vo.ChatHistoryCombineBO;
import com.canvas.ai.service.chat.chathistory.historycombine.service.ChatHistoryCombineService;
import com.canvas.ai.service.chat.chathistory.service.ChatHistoryService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClientMessageAggregator;
import org.springframework.ai.chat.client.ChatClientRequest;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
import org.springframework.ai.chat.client.advisor.api.BaseAdvisor;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.SystemMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* 永久保存聊天记录到MySQL数据库的Advisor
* 参考Spring AI的MessageChatMemoryAdvisor实现
*
* @author cqry2016
*/
@Slf4j
public class PersistentChatHistoryAdvisor implements BaseAdvisor {
public static final String USER_ID = "userId";
public static final String historyCombineId = "historyCombineId";
private final ChatHistoryService chatHistoryService;
private final ChatHistoryCombineService chatHistoryCombineService;
private final int order;
/**
* 构造函数
*
* @param chatHistoryService 聊天记录服务
* @param order advisor的执行顺序
*/
public PersistentChatHistoryAdvisor(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService, int order) {
this.chatHistoryService = chatHistoryService;
this.chatHistoryCombineService = chatHistoryCombineService;
this.order = order;
}
/**
* 默认构造函数,order为0
*/
public PersistentChatHistoryAdvisor(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService) {
this(chatHistoryService, chatHistoryCombineService, 0);
}
@Override
public int getOrder() {
return this.order;
}
@Override
public ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {
// 提取并存储请求消息
try {
Long userId = (Long) chatClientRequest.context().getOrDefault(USER_ID, 1);
String conversationId = (String) chatClientRequest.context().get(ChatMemory.CONVERSATION_ID);
UserMessage userMessage = chatClientRequest.prompt().getUserMessage();
SystemMessage systemMessage = chatClientRequest.prompt().getSystemMessage();
Long historyCombineId = saveReqMessages(userMessage, systemMessage, conversationId, userId);
chatClientRequest.context().put(PersistentChatHistoryAdvisor.historyCombineId, historyCombineId);
} catch (Exception e) {
log.warn("Failed to extract user message in before method", e);
}
return chatClientRequest;
}
@Override
public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {
// 从context中获取conversationId和userId
String conversationId = null;
Long userId;
try {
conversationId = (String) chatClientResponse.context().get(ChatMemory.CONVERSATION_ID);
userId = (Long) chatClientResponse.context().get(USER_ID);
// 保存助手回复
saveAssistantMessages(chatClientResponse, conversationId, userId);
} catch (Exception e) {
log.error("Error saving chat memory for conversation: {}", conversationId, e);
}
return chatClientResponse;
}
/**
* 保存请求消息
*/
private Long saveReqMessages(UserMessage userMessage, SystemMessage systemMessage, String conversationId, Long userId) {
Long historyCombineId = null;
try {
// 保存轮次消息
if (StringUtils.hasText(userMessage.getText())) {
ChatHistoryBO chatMemoryBO = ChatHistoryBO.builder()
.conversationId(conversationId)
.userId(userId)
.content(userMessage.getText())
.type(userMessage.getMessageType().name())
.timestamp(LocalDateTime.now())
.build();
chatHistoryService.createAiChatMemory(chatMemoryBO);
// 保存完整消息
ChatHistoryCombineBO chatHistoryCombineBO = ChatHistoryCombineBO.builder()
.conversationId(conversationId)
.userId(userId)
.question(userMessage.getText())
.sysContent(systemMessage != null ? systemMessage.getText() : null)
.build();
historyCombineId = chatHistoryCombineService.createHistoryCombine(chatHistoryCombineBO);
log.debug("Saved chat message to database for conversation: {}", conversationId);
} else {
log.debug("No user message found in context for conversation: {}", conversationId);
}
} catch (Exception e) {
log.warn("Failed to save user message for conversation: {}", conversationId, e);
}
return historyCombineId;
}
/**
* 保存助手消息到数据库
*/
private void saveAssistantMessages(ChatClientResponse chatClientResponse, String conversationId, Long userId) {
List<Message> assistantMessages = new ArrayList<>();
if (chatClientResponse.chatResponse() != null) {
assistantMessages = chatClientResponse.chatResponse()
.getResults()
.stream()
.map(g -> (Message) g.getOutput())
.toList();
}
Long historyCombineId = (Long) chatClientResponse.context().get(PersistentChatHistoryAdvisor.historyCombineId);
for (Message assistantMessage : assistantMessages) {
saveMessageToDatabase(conversationId, userId, assistantMessage.getText(), assistantMessage.getMessageType().name(), historyCombineId);
}
}
/**
* 保存消息到MySQL数据库
*/
private void saveMessageToDatabase(String conversationId, Long userId, String content, String type, Long historyCombineId) {
try {
ChatHistoryBO chatMemoryBO = ChatHistoryBO.builder()
.conversationId(conversationId)
.userId(userId)
.content(content)
.type(type)
.timestamp(LocalDateTime.now())
.build();
Long id = chatHistoryService.createAiChatMemory(chatMemoryBO);
// 保存完整消息
ChatHistoryCombineBO chatHistoryCombineBO = ChatHistoryCombineBO.builder()
.id(historyCombineId)
.answer(content)
.build();
chatHistoryCombineService.updateHistoryCombine(chatHistoryCombineBO);
log.debug("Saved chat message to database with id: {} for conversation: {}", id, conversationId);
} catch (Exception e) {
log.error("Failed to save chat message to database for conversation: {}", conversationId, e);
}
}
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest,
StreamAdvisorChain streamAdvisorChain) {
// Get the scheduler from BaseAdvisor
Scheduler scheduler = this.getScheduler();
// Process the request with the before method
return Mono.just(chatClientRequest)
.publishOn(scheduler)
.map(request -> this.before(request, streamAdvisorChain))
.flatMapMany(streamAdvisorChain::nextStream)
.transform(flux -> new ChatClientMessageAggregator().aggregateChatClientResponse(flux,
response -> this.after(response, streamAdvisorChain)));
}
/**
* 静态工厂方法
*/
public static Builder builder(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService) {
return new Builder(chatHistoryService, chatHistoryCombineService);
}
/**
* Builder类
*/
public static class Builder {
private final ChatHistoryService chatHistoryService;
private final ChatHistoryCombineService chatHistoryCombineService;
private int order = 0;
private Builder(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService) {
this.chatHistoryService = chatHistoryService;
this.chatHistoryCombineService = chatHistoryCombineService;
}
public Builder order(int order) {
this.order = order;
return this;
}
public PersistentChatHistoryAdvisor build() {
return new PersistentChatHistoryAdvisor(chatHistoryService, chatHistoryCombineService, order);
}
}
}Mysql表结构
CREATE TABLE `chat_history` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
`conversation_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '对话id',
`user_id` bigint DEFAULT NULL COMMENT '用户id',
`content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '内容',
`type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '角色',
`timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '时间戳',
`state` int DEFAULT '0' COMMENT '状态',
`description` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '说明',
`label` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '标签',
`creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',
`workspace_id` bigint DEFAULT NULL COMMENT '归属工作空间ID',
`tenant_id` bigint DEFAULT '0' COMMENT '租户编号',
`sorting` int DEFAULT '0' COMMENT '排序',
`version` int DEFAULT '1' COMMENT '版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1983809822749970434 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='对话历史';
CREATE TABLE `chat_history_combine` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
`conversation_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '对话id',
`user_id` bigint DEFAULT NULL COMMENT '用户id',
`question` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '问题',
`sys_content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '提示词',
`answer` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '回答',
`type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '0' COMMENT '类型',
`state` int DEFAULT '0' COMMENT '状态',
`description` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '说明',
`label` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '标签',
`creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',
`workspace_id` bigint DEFAULT NULL COMMENT '归属工作空间ID',
`tenant_id` bigint DEFAULT '0' COMMENT '租户编号',
`sorting` int DEFAULT '0' COMMENT '排序',
`version` int DEFAULT '1' COMMENT '版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1983809785852678147 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='单次完整对话历史';
评论区