前言
在日常项目中,我们常常在上线一个新功能时需要做生产测试,某些用户访问量比较高的系统,除了正常的业务测试,我们还需压测,不管是业务测试或者压测,最重要的一点就是测试的数据不能影响用户的数据,比如测试下单功能,把单子下到真实用户账号上去了,用户一登陆发现一些莫名其妙的单子,轻则找公司客户投诉解决,重则投诉到市场管理部门,导致公司业务严重影响,所以数据隔离非常重要。
几种方案
一般的方案有如下几种:
建立测试账号,测试数据和测试账号绑定
优点:在某些较简单场景下快速,方便,成本小;
缺点:当存在统计类的业务数据时,可能会因为测试数据导致统计数据不准,需要在编码时手动排除测试数据,容易漏掉且麻烦,无法满足压测场景。
在数据库表中建立额外测试数据标识字段
和建立测试账号本质类似,但限制更多,且需要同时考虑数据过滤,数据增删打标
通过影子库,测试数据统一落到和生产数据库结构一样的影子库中
优点:数据完美隔离,支持压测场景。
缺点:需要做项目改造,需要冗余的服务器资源,成本相对高一些。
以上方案中,可以看出影子库的方式是数据隔离性最高,且能满足压测需求,如果是公司主营业务对应的长期项目,建立一套这样的环境还是有必要的,接下来聊聊如何实现。
项目改造点
数据库;
缓存,消息队列,Feign等RPC通信中间件, 日志;
三方平台Mock,部分业务需要兼容处理,比如测试的订单不走ERP发货流程。
改造方案
数据库:
使用多数据源,发起请求的时候在请求头中添加影子库标识或者分配单独的测试域名,在Nginx解析的时候添加影子库标识,假设定义为dataSource = gray;
项目中配置多个数据源,如果ORM使用的是MybatisPlus,在yml中配置多个即可,如下
spring:
datasource:
dynamic:
strict: false
primary: master
datasource:
master:
url: jdbc:mysql
username:
password:
driver-class-name: com.mysql.cj.jdbc.Driver
type: org.apache.commons.dbcp2.BasicDataSource
dbcp2:
max-total: 15
max-idle: 8
minIdle : 8
initialSize : 8
gray:
url:
username:
password:
driver-class-name: com.mysql.cj.jdbc.Driver
type: org.apache.commons.dbcp2.BasicDataSource
dbcp2:
max-total: 15
max-idle: 8
minIdle: 8
initialSize: 8
配置好了多个数据源以后,我们需要根据请求过来的请求头控制数据源的选择,此处可以使用Aop或者拦截器来实现
public class ReqInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
response.addHeader("traceId", MDC.get("traceId"));
MDC.put("clientIp", ServletUtil.getClientIP(request, null));
String dataSource = getDataSource(request);
if(!DynamicSourceTtl.MASTER_DATASOURCE.equals(dataSource)) {
//配置使用数据源
DynamicSourceTtl.push(dataSource);
}
return HandlerInterceptor.super.preHandle(request, response, handler);
}
private String getDataSource(HttpServletRequest request) {
String dataSource = DynamicSourceTtl.MASTER_DATASOURCE;
// 从请求头里取
String requestDataSource = request.getHeader("dataSource");
// log.debug("获取到的dataSource {}", requestDataSource);
if(StringUtils.isNotBlank(requestDataSource)) {
dataSource = requestDataSource;
}
return dataSource;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
HandlerInterceptor.super.postHandle(request, response, handler, modelAndView);
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
MDC.remove("clientIp");
DynamicSourceTtl.clear();
HandlerInterceptor.super.afterCompletion(request, response, handler, ex);
}
/**
* @author: ranyang
* @Date: 2021/07/30 09:09
* @descript: 动态数据源管理
*/
@Slf4j
public class DynamicSourceTtl {
//灰度数据源
public final static String SLAVE_DATASOURCE = "gray";
//主数据源
public final static String MASTER_DATASOURCE = "master";
public static TransmittableThreadLocal<String> dataSourceContext = new TransmittableThreadLocal<>();
public static String push(String dataSource) {
String ds = StringUtils.isBlank(dataSource) ? MASTER_DATASOURCE : dataSource;
dataSourceContext.set(ds);
DynamicDataSourceContextHolder.push(ds);
return ds;
}
public static void clear() {
dataSourceContext.remove();
DynamicDataSourceContextHolder.clear();
}
public static String get() {
//判断如果ttl有参数,未找到参数。重新赋值
if (StringUtils.isNotBlank(dataSourceContext.get()) && StringUtils.isBlank(DynamicDataSourceContextHolder.peek())) {
DynamicDataSourceContextHolder.push(dataSourceContext.get());
}
//判断ttl有参数,并有参数,但参数不一致,以ttl为准重新赋值
if(StringUtils.isNotBlank(dataSourceContext.get()) && StringUtils.isNotBlank(DynamicDataSourceContextHolder.peek())) {
if(!dataSourceContext.get().equals(DynamicDataSourceContextHolder.peek())){
DynamicDataSourceContextHolder.push(dataSourceContext.get());
}
}
return dataSourceContext.get();
}
}
这里使用阿里开源的TTL组件透传gray标识,其原理可看另一篇:
这样只要发起请求的时候带上gray标识头,对应数据就会落到指定的影子库中,如果有多个子系统,则每个子系统都需要引入该拦截器。
中间件:
Redis(其他缓存中间件类似):
自定义RedisKeySerializer,注册RedisTemplate或CacheManager的时候配置使用我们自定义的KeySerializer
public class RedisKeySerializer implements RedisSerializer<String> { private final Charset charset; public static final RedisKeySerializer US_ASCII; public static final RedisKeySerializer ISO_8859_1; public static final RedisKeySerializer UTF_8; public RedisKeySerializer() { this(StandardCharsets.UTF_8); } public RedisKeySerializer(Charset charset) { Assert.notNull(charset, "Charset must not be null!"); this.charset = charset; } public String deserialize(@Nullable byte[] bytes) { String grayDataSource = DynamicSourceTtl.get(); if(bytes != null && StringUtils.isNotBlank(grayDataSource)) { String cacheValue = new String(bytes, charset); int indexOf = cacheValue.indexOf(grayDataSource); if (indexOf != -1) { cacheValue = cacheValue.substring(5); return cacheValue; } } // return (cacheKey.getBytes() == null ? null : cacheKey); return bytes == null ? null : new String(bytes, this.charset); } public byte[] serialize(@Nullable String cacheKeyOrValue) { if(!NumberUtils.isNumber(cacheKeyOrValue)) { String grayDataSource = DynamicSourceTtl.get(); if(StringUtils.isNotBlank(grayDataSource)) { cacheKeyOrValue = grayDataSource + CacheDataConstantKeys.SEPARATOR + cacheKeyOrValue; } } return cacheKeyOrValue == null ? null : cacheKeyOrValue.getBytes(this.charset); } public Class<?> getTargetType() { return String.class; } static { US_ASCII = new RedisKeySerializer(StandardCharsets.US_ASCII); ISO_8859_1 = new RedisKeySerializer(StandardCharsets.ISO_8859_1); UTF_8 = new RedisKeySerializer(StandardCharsets.UTF_8); } } @Bean public RedisTemplate<String, Object> redisTemplate(@Qualifier("redisAloneFactory") RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 redisTemplate.setValueSerializer(serializer()); // 使用自定义的StringRedisSerializer来序列化和反序列化redis的key值 RedisKeySerializer stringRedisSerializer = new RedisKeySerializer(); redisTemplate.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 redisTemplate.setHashKeySerializer(stringRedisSerializer); // hash的value序列化方式采用jackson redisTemplate.setHashValueSerializer(serializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } private Jackson2JsonRedisSerializer<Object> serializer() { // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常 objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); return jackson2JsonRedisSerializer; } @Bean public CacheManager cacheManager(@Qualifier("redisAloneFactory") RedisConnectionFactory factory) { RedisSerializer<String> redisSerializer = new RedisKeySerializer(); // 配置序列化(解决乱码的问题) RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() // 缓存有效期 .entryTtl(timeToLive) .prefixCacheNameWith(key_prefix) // 使用StringRedisSerializer来序列化和反序列化redis的key值 .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer)) // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(serializer())); return RedisCacheManager.builder(factory) .cacheDefaults(config) .build(); }
工具类统一拦截, 在key前添加对应标识前缀以区分。需要两个前提条件
没使用CacheManager管理注解方式的缓存数据
项目所有缓存读取操作都是通过同一个缓存工具类
public static void add(StringRedisTemplate stringRedisTemplate, String k, String v, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(wrappKey(k), v);
stringRedisTemplate.expire(wrappKey(k), timeout, unit);
}
public static String get(StringRedisTemplate stringRedisTemplate, String k) {
return (String)stringRedisTemplate.opsForValue().get(wrappKey(k));
}
private static String wrappKey(String key) {
String s = DynamicSourceTtl.get();
return s + ":" + key;
}
消息队列
拦截消息消费和消息读取,在消息头中添加额外字段,发送消息时读取存储在TTL中的环境标识,放入消息头中,消费消息的时候从消息头中拿出标识,放到当前系统TTL中,并切换当前系统数据源。
RabbitMq( 其他类似)
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setUseDirectReplyToContainer(false);
//发送之前作额外处理,每次发送都会调用
template.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//拦截逻辑添加环境变量
message.getMessageProperties().getHeaders().put("dataSource", DynamicSourceTtl.get());
return message;
}
});
template.setMessageConverter(messageConverter());
return template;
}
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//消息接收之前加拦截处理,每次接收消息都会调用
factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map header = message.getMessageProperties().getHeaders();
//判断是动态切换影子库
if (StringUtil.isNotBlank(String.valueOf(header.get("dataSource")))) {
DynamicSourceTtl.push(String.valueOf(header.get("dataSource")));
}
return message;
}
});
configurer.configure(factory, connectionFactory);
return factory;
}
RPC通信组件
Dubbo
// 请求 public class HeaderFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String dataSource = DynamicSourceTtl.get(); //根据业务存储数据 if (StringUtils.isNotEmpty(value)){ RpcContext.getContext().setAttachment("dataSource",dataSource); } Result invoke = invoker.invoke(invocation); return invoke; } } // 响应 String env = RpcContext.getContext().getAttachment("dataSource") DynamicSourceTtl.push(env)
Feign或者其他Http框架,拦截请求发送,在请求头中添加对应标识
@Configuration
public class FeignConfiguration implements RequestInterceptor {
private static final Logger log = LoggerFactory.getLogger(FeignConfiguration.class);
public FeignConfiguration() {
}
public void apply(RequestTemplate template) {
String dataSource = DynamicSourceTtl.get();
log.debug("RPC请求地址:{},RPC环境参数:{},当前数据库环境:{}", new Object[]{template.url(), dataSource, DynamicDataSourceContextHolder.peek()});
if (StringUtils.isNotBlank(dataSource)) {
template.header("dataSource", new String[]{dataSource});
}
}
}
日志
在日志元数据中添加额外字段,上报统一日志平台,把字段加上对应索引就可以搜索过滤了
如果是压测场景,尽量调高日志级别到ERROR或者临时关闭日志打印,不然会对日志中间件造成极大压力,如果使用了云服务,会带来高额的费用。
业务特殊处理
这块相对来说麻烦一些,需要根据实际的业务场景处理,要避免测试数据流入外部第三方平台,如果是压测,得防止把三分系统压垮,需要做MOCK。
成本控制
这种隔离方式比较明显的问题就是成本相对较高,不过只要规划合理,也能控制在一定范围内,分享以下几点经验
只针对微服务系统中的核心模块建影子库
影子库基于生产库创建,要保证表结构和生产一致,不然达不到业务测试效果,测试完成后应立即回收,下次有需求再重建,如果是使用阿里云的PolarDb, 可以从他的备份中恢复数据,几分钟内就可完成。
压测场景中,要提前考虑被压的对象会不会带来不必要的成本并做好应对策略,比如防火墙,带宽,CDN费用等。
总结
以上介绍了生产环境测试的数据隔离方案,实际项目应结合项目中用到的中间件,各模块业务全局考虑,才能做出一套合理的隔离环境。
评论区