侧边栏壁纸
博主头像
博主等级

  • 累计撰写 23 篇文章
  • 累计创建 35 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Chat聊天记录存储

前尘一梦
2025-05-03 / 0 评论 / 0 点赞 / 39 阅读 / 12789 字

基于 Spring AI 的 BaseAdvisor 接口实现

package com.canvas.ai.service.chat.modulerag.core;

import com.canvas.ai.service.chat.chathistory.entity.vo.ChatHistoryBO;
import com.canvas.ai.service.chat.chathistory.historycombine.entity.vo.ChatHistoryCombineBO;
import com.canvas.ai.service.chat.chathistory.historycombine.service.ChatHistoryCombineService;
import com.canvas.ai.service.chat.chathistory.service.ChatHistoryService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClientMessageAggregator;
import org.springframework.ai.chat.client.ChatClientRequest;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
import org.springframework.ai.chat.client.advisor.api.BaseAdvisor;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.SystemMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/**
 * 永久保存聊天记录到MySQL数据库的Advisor
 * 参考Spring AI的MessageChatMemoryAdvisor实现
 * 
 * @author cqry2016
 */
@Slf4j
public class PersistentChatHistoryAdvisor implements BaseAdvisor {

    public static final String USER_ID = "userId";
    public static final String historyCombineId = "historyCombineId";

    private final ChatHistoryService chatHistoryService;
    private final ChatHistoryCombineService chatHistoryCombineService;
    private final int order;

    /**
     * 构造函数
     * 
     * @param chatHistoryService 聊天记录服务
     * @param order advisor的执行顺序
     */
    public PersistentChatHistoryAdvisor(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService, int order) {
        this.chatHistoryService = chatHistoryService;
        this.chatHistoryCombineService = chatHistoryCombineService;
        this.order = order;
    }

    /**
     * 默认构造函数,order为0
     */
    public PersistentChatHistoryAdvisor(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService) {
        this(chatHistoryService, chatHistoryCombineService, 0);
    }

    @Override
    public int getOrder() {
        return this.order;
    }

    @Override
    public ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {
        // 提取并存储请求消息
        try {
            Long userId = (Long) chatClientRequest.context().getOrDefault(USER_ID, 1);
            String conversationId = (String) chatClientRequest.context().get(ChatMemory.CONVERSATION_ID);
            UserMessage userMessage = chatClientRequest.prompt().getUserMessage();
            SystemMessage systemMessage = chatClientRequest.prompt().getSystemMessage();
            Long historyCombineId = saveReqMessages(userMessage, systemMessage, conversationId, userId);
            chatClientRequest.context().put(PersistentChatHistoryAdvisor.historyCombineId, historyCombineId);
        } catch (Exception e) {
            log.warn("Failed to extract user message in before method", e);
        }
        return chatClientRequest;
    }

    @Override
    public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {
        // 从context中获取conversationId和userId
        String conversationId = null;
        Long userId;
        
        try {
            conversationId = (String) chatClientResponse.context().get(ChatMemory.CONVERSATION_ID);
            userId = (Long) chatClientResponse.context().get(USER_ID);

            // 保存助手回复
            saveAssistantMessages(chatClientResponse, conversationId, userId);
            
        } catch (Exception e) {
            log.error("Error saving chat memory for conversation: {}", conversationId, e);
        }

        return chatClientResponse;
    }

    /**
     * 保存请求消息
     */
    private Long saveReqMessages(UserMessage userMessage, SystemMessage systemMessage, String conversationId, Long userId) {
        Long historyCombineId = null;
        try {
            // 保存轮次消息
            if (StringUtils.hasText(userMessage.getText())) {
                ChatHistoryBO chatMemoryBO = ChatHistoryBO.builder()
                        .conversationId(conversationId)
                        .userId(userId)
                        .content(userMessage.getText())
                        .type(userMessage.getMessageType().name())
                        .timestamp(LocalDateTime.now())
                        .build();

                chatHistoryService.createAiChatMemory(chatMemoryBO);

                //  保存完整消息
                ChatHistoryCombineBO chatHistoryCombineBO = ChatHistoryCombineBO.builder()
                        .conversationId(conversationId)
                        .userId(userId)
                        .question(userMessage.getText())
                        .sysContent(systemMessage != null ? systemMessage.getText() : null)
                        .build();
                historyCombineId = chatHistoryCombineService.createHistoryCombine(chatHistoryCombineBO);

                log.debug("Saved chat message to database for conversation: {}", conversationId);
            } else {
                log.debug("No user message found in context for conversation: {}", conversationId);
            }
        } catch (Exception e) {
            log.warn("Failed to save user message for conversation: {}", conversationId, e);
        }
        return historyCombineId;
    }

    /**
     * 保存助手消息到数据库
     */
    private void saveAssistantMessages(ChatClientResponse chatClientResponse, String conversationId, Long userId) {
        List<Message> assistantMessages = new ArrayList<>();
        if (chatClientResponse.chatResponse() != null) {
            assistantMessages = chatClientResponse.chatResponse()
                    .getResults()
                    .stream()
                    .map(g -> (Message) g.getOutput())
                    .toList();
        }

        Long historyCombineId = (Long) chatClientResponse.context().get(PersistentChatHistoryAdvisor.historyCombineId);
        for (Message assistantMessage : assistantMessages) {

            saveMessageToDatabase(conversationId, userId, assistantMessage.getText(), assistantMessage.getMessageType().name(), historyCombineId);
        }

    }

    /**
     * 保存消息到MySQL数据库
     */
    private void saveMessageToDatabase(String conversationId, Long userId, String content, String type, Long historyCombineId) {
        try {
            ChatHistoryBO chatMemoryBO = ChatHistoryBO.builder()
                .conversationId(conversationId)
                .userId(userId)
                .content(content)
                .type(type)
                .timestamp(LocalDateTime.now())
                .build();

            Long id = chatHistoryService.createAiChatMemory(chatMemoryBO);

            //  保存完整消息
            ChatHistoryCombineBO chatHistoryCombineBO = ChatHistoryCombineBO.builder()
                    .id(historyCombineId)
                    .answer(content)
                    .build();
            chatHistoryCombineService.updateHistoryCombine(chatHistoryCombineBO);

            log.debug("Saved chat message to database with id: {} for conversation: {}", id, conversationId);
        } catch (Exception e) {
            log.error("Failed to save chat message to database for conversation: {}", conversationId, e);
        }
    }

    @Override
    public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest,
                                                 StreamAdvisorChain streamAdvisorChain) {
        // Get the scheduler from BaseAdvisor
        Scheduler scheduler = this.getScheduler();

        // Process the request with the before method
        return Mono.just(chatClientRequest)
                .publishOn(scheduler)
                .map(request -> this.before(request, streamAdvisorChain))
                .flatMapMany(streamAdvisorChain::nextStream)
                .transform(flux -> new ChatClientMessageAggregator().aggregateChatClientResponse(flux,
                        response -> this.after(response, streamAdvisorChain)));
    }

    /**
     * 静态工厂方法
     */
    public static Builder builder(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService) {
        return new Builder(chatHistoryService, chatHistoryCombineService);
    }

    /**
     * Builder类
     */
    public static class Builder {
        private final ChatHistoryService chatHistoryService;
        private final ChatHistoryCombineService chatHistoryCombineService;
        private int order = 0;

        private Builder(ChatHistoryService chatHistoryService, ChatHistoryCombineService chatHistoryCombineService) {
            this.chatHistoryService = chatHistoryService;
            this.chatHistoryCombineService = chatHistoryCombineService;
        }

        public Builder order(int order) {
            this.order = order;
            return this;
        }

        public PersistentChatHistoryAdvisor build() {
            return new PersistentChatHistoryAdvisor(chatHistoryService, chatHistoryCombineService, order);
        }
    }
}

MySQL 表结构

-- One row per chat message (user or assistant) of a conversation.
CREATE TABLE `chat_history` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `conversation_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '对话id',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  `content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '内容',
  `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '角色',
  -- Time the message was produced. Deliberately no ON UPDATE clause: later row updates
  -- must not overwrite it; modifications are tracked by `update_time`.
  `timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '时间戳',
  `state` int DEFAULT '0' COMMENT '状态',
  `description` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '说明',
  `label` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '标签',
  `creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',
  `workspace_id` bigint DEFAULT NULL COMMENT '归属工作空间ID',
  `tenant_id` bigint DEFAULT '0' COMMENT '租户编号',
  `sorting` int DEFAULT '0' COMMENT '排序',
  `version` int DEFAULT '1' COMMENT '版本号',
  PRIMARY KEY (`id`)
-- The AUTO_INCREMENT seed from the original dump is omitted; it only reflected the state
-- of the exporting database.
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='对话历史';



-- One row per complete question/answer round: the row is created with the question and
-- system prompt, and `answer` is filled in by a later update.
CREATE TABLE `chat_history_combine` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `conversation_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '对话id',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  `question` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '问题',
  -- NOT NULL: writers must send an empty string when the prompt has no system message.
  `sys_content` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '提示词',
  -- Nullable because the row exists before the answer is known.
  `answer` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '回答',
  `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '0' COMMENT '类型',
  `state` int DEFAULT '0' COMMENT '状态',
  `description` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '说明',
  `label` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '标签',
  `creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '创建者',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '更新者',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否删除',
  `workspace_id` bigint DEFAULT NULL COMMENT '归属工作空间ID',
  `tenant_id` bigint DEFAULT '0' COMMENT '租户编号',
  `sorting` int DEFAULT '0' COMMENT '排序',
  `version` int DEFAULT '1' COMMENT '版本号',
  PRIMARY KEY (`id`)
-- The AUTO_INCREMENT seed from the original dump is omitted; it only reflected the state
-- of the exporting database.
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='单次完整对话历史';

0

评论区