项目中因为要迁库,所以我要在原项目中接入我的双写逻辑,确保新旧两个库都有数据写入,假如新库写入失败,旧库数据也能写入,这就确保了重要数据不能丢失。
一开始考虑的方案是使用数据同步工具,像是canal或是DTS等,但是环境这块卡的比较死,没有其他花里胡哨的工具,只能纯靠java改写代码来实现了,期间排了不少坑,这里做个人踩坑记录
实现效果,批量双写全部报200,自测下来还算成功
首先列一下要实现类的目录,因为我们门户,任务,和开放接口都是单独一套springboot然后共用common包的形式,所以这套package每个springboot都要引入
这里整理得比较匆忙,但重点其实是这几个类
aop中TargetDataSource之前说过了,还包括DataSourceConfig,DataSourceUtil和DynamicDataSource的配置,可以看我之前写的这一篇: mybatis动态数据源配置(附自定义注解实现数据源切换)
另外一个注解没有用上,是做多数据源事务切换的作用,想看代码和使用场景的,完全参考如下:配置多个数据源的事务
现在开始讲重点,为了追踪mysql中实时变化的数据,就要用到mybatis拦截器,这是我之前参考的博客,但不能完全适用于我的业务场景: 用mybatis 拦截器 为insert update操作填充字段
说一下我的写法
进入AutoFillInterceptor类,也就是我的拦截器
因为我要引入我的DoubleWriteService做我的双写逻辑,这里会发生第一个坑,因为service加载速度是在拦截器后面的,直接Autowired启动会报错,这里用到了懒加载机制,确保启动顺利
之后进入拦截器的时候先判空,再去SpringUtils取bean,就能调用了
附上工具类SpringUtils
@Component
public class SpringUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
if (SpringUtils.applicationContext == null) {
SpringUtils.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
//根据name
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
//根据类型
public static T getBean(Class clazz) {
return getApplicationContext().getBean(clazz);
}
public static T getBean(String name, Class clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
接着往下走方法,这里遇到的第一个坑就是无限循环调用导致的stackoverflow,看过前面那篇博客的都知道,这个mybatis拦截器的作用其实就是去监控各类dao增改的操作,如果走到我的dao里,就会导致再次进入拦截器,然后再进入我的dao,循环往复导致了爆栈,所以我的操作很简单,就是取MappedStatement中的id,也就是完整的包路径+Dao类名,如果是就直接返回
接下来就是处理我的第一个arg,这个args是如何产生的呢,开头我们声明的就是args,第一个就肯定是MappedStatement了,第二个Object是我们的传参,可以是实体类或是map,这个之后会讲
取MappedStatement的用处其实有很多,自己debug的话可以看到许多mybatis分装的参数,我这里就取id就够了,一般常用的就是取sql字符串了,这里注释的就是取sql并且替换占位符?的逻辑。我看了好几篇博客,都写的一样: SpringBoot通过MyBatis拦截器打印完整SQL语句(无问号) 但我自己跑的时候却发现获取不到参数,所以我的双写不用直接sql的形式做
这里我就用到了第二个args,也就是增改他可以传entity类,可以传map,我这里做了类判断和新增修改判断,然后分别进入我相应的方法里。我的service入口传参简单明了,就是传表名和map/entity的形式
顺便说一下枚举的作用,因为我拦截器只能获得sqlID,为了获取表名tableName,这里枚举其实用到了包含,其实就是如下效果,假如表名交lzq_test1,包路径名叫com.lzq.common.dao.lzqTest1Dao
拦截器获取的sqlId一般都叫com.lzq.common.dao.lzqTest1Dao.insert,所以这里用contains去获取
枚举讲完了,之后就进入我的service做主要逻辑了
可以看到insert和update各做了map和obj类的传参,确保都能顺利执行写入,先讲一下各个方法的作用:
进入实现类,所有依赖如图所示,事务的注掉了因为发现不好用,同类调用避免依赖问题再次加上@Lazy
线程池根据应用自己配
@Data
@Configuration
@ConfigurationProperties(prefix = "executor")
@EnableConfigurationProperties(ExecutorConfig.class)
public class ExecutorConfig {
private int corePoolSize = 5;//2
private int maxPoolSize = 10;//4
private int keepAliveSeconds = 60;
private int queueCapacity = 200;
@Bean
public TraceThreadPoolExecutor traceThreadPoolExecutor() {
return new TraceThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, queueCapacity, rejectedExecutionHandler());
}
RejectedExecutionHandler rejectedExecutionHandler() {
return new ThreadPoolExecutor.CallerRunsPolicy();
}
}
双写开关根据配置文件实现
新增/更新方法如下
再往下走,根据需要自己配
public String processUpdateBinLogData(String tableName, Map param,String isUpdate) {
//, int threadCount
String result;
if (null == tableName) {
return null;
}
result = procUpdateBinLogByMultiThread(tableName,param,isUpdate);
/** 请求数据超过二十,才使用多线程,否则就是单线程 */
// int threadCount = 2;//requestList.size() > 20 ? 20 : 1;
// try {
// result = procUpdateBinLogByMultiThread(tableName,param,isUpdate);
// if (threadCount > 1) {
// /* 多线程处理 */
// result = procUpdateBinLogByMultiThread(enumByTable,param, threadCount);
// } else {
// /* 直接处理 */
// result = processUpdateBinLog(param, enumByTable);
// }
// } catch (Exception e) {
//
// }
return result;
}
private String procUpdateBinLogByMultiThread(String tableName,Map param,String isUpdate) {
//List result = new ArrayList<>();
if (!param.isEmpty()) {
processUpdateBinLogDataByThread(param, tableName,isUpdate);
}
// if (!param.isEmpty() && null != threadCount) {
// List> splitList = ListUtils.averageSplit(requestList, threadCount);
// if (CollectionUtils.isNotEmpty(splitList)){
// List>> futureList = new ArrayList<>();
// for (List r : splitList) {
// Future> future = processUpdateBinLogDataByThread(param, enumByTable);
// if (null != future){
// futureList.add(future);
// }
// }
// /** 获取多线程返回结果 */
// for (Future> r : futureList) {
// result.addAll(r.get());
// }
// }
// result.addAll(future.get());
// }
return null;
}
这里用Future类和线程池去走异步
public Future processUpdateBinLogDataByThread(Map param, String tableName,String isUpdate) {
//Future> result = null;
Future result = null;
//if (CollectionUtils.isNotEmpty(requestList)) {
if (!param.isEmpty()) {
result = threadPoolExecutor.submit(new TraceAsyncCallableTask() {
private final Map paramIn = param;
private final String tableNameIn = tableName;
private final String isUpdateIn = isUpdate;
private Object res;
@Override
public String getMethod() {
return "processUpdateBinLogDataByThread";
}
@Override
public String[] getParams() {
String[] paraArr = new String[1];
return paraArr;
}
@Override
public String getService() {
return "DoubleWriteService";
}
@Override
public Object getRes() {
return res;
}
@Override
public String call() {
//同类调用才能生效注解
return doubleWriteService.processUpdateBinLog(paramIn, tableNameIn,isUpdateIn);
}
});
}
return result;
}
注意这里,用到了切换数据源的注解
@Override
@TargetDataSource(connName = "dbTwo") //异步重新生效不注解,换成同类调用
public String processUpdateBinLog(Map map, String tableName,String isUpdate) {
//List result = new ArrayList<>();
//String result = "fail";
int isSucc = 0;
Map queryMap = new HashMap<>();
//String str;
if (!map.isEmpty()) {
try {
//报错测试
//int i = 1;i = i /0;
map = humpToUnderline(map);
queryMap.put("tableName", tableName);
queryMap.put("fieldsMap", map);
queryMap.put("queryMap", map);
if (DoubleWriteConstants.UPDATE.equals(isUpdate)){
isSucc = dataMigrationDao.updateTableListDynamic(queryMap);
//更新直接去map的主键
logger.info("{}表双写{}成功,主键->{}", tableName,isUpdate, map.get("id"));
}else {
isSucc = dataMigrationDao.insertTableListDynamic(queryMap);
//新增取mybatis返回主键
logger.info("{}表双写{}成功,主键->{}", tableName,isUpdate, queryMap.get("id"));
}
}catch (Exception e){
String jsonStr = JSONObject.toJSONString(map);
//key格式:前缀+新增/更新+表名
cacheClient.lpush( DoubleWriteConstants.REDIS_KEY_PREFIX + isUpdate + tableName,jsonStr);
logger.error("{}表双写{}错误,json->{},异常->{}" ,tableName,isUpdate,jsonStr, e.getMessage());
}
}
return null;
}
驼峰转下划线逻辑,这里用hutool的逻辑复制过来的
/**
* 把 map 中的 key 由驼峰命名转为下划线,使用LinkedHashMap确保字段顺序一致性
*/
private HashMap humpToUnderline(Map map) {
//使用LinkedHashMap确保字段顺序一致性
HashMap transitionMap = new LinkedHashMap<>(16);
map.forEach((k, v) -> transitionMap.put(toUnderlineCase(k), v));
return transitionMap;
}
public static String toUnderlineCase(CharSequence str) {
return toSymbolCase(str, '_');
}
public static String toSymbolCase(CharSequence str, char symbol) {
if (str == null) {
return null;
} else {
int length = str.length();
StringBuilder sb = new StringBuilder();
for(int i = 0; i < length; ++i) {
char c = str.charAt(i);
Character preChar = i > 0 ? str.charAt(i - 1) : null;
if (Character.isUpperCase(c)) {
Character nextChar = i < str.length() - 1 ? str.charAt(i + 1) : null;
if (null != preChar && Character.isUpperCase(preChar)) {
sb.append(c);
} else if (null != nextChar && Character.isUpperCase(nextChar)) {
if (null != preChar && symbol != preChar) {
sb.append(symbol);
}
sb.append(c);
} else {
if (null != preChar && symbol != preChar) {
sb.append(symbol);
}
sb.append(Character.toLowerCase(c));
}
} else {
if (sb.length() > 0 && Character.isUpperCase(sb.charAt(sb.length() - 1)) && symbol != c) {
sb.append(symbol);
}
sb.append(c);
}
}
return sb.toString();
}
}
最后是beanToMap逻辑,注意有时候会有转化失败问题,会影响到入mybatis的传参,自己处理
public static Map beanToMap(Object object){
Map map = null;
try {
map = new HashMap();
BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
for (PropertyDescriptor property : propertyDescriptors) {
String key = property.getName();
if (key.compareToIgnoreCase("class") == 0) {
continue;
}
Method getter = property.getReadMethod();
Object value = getter!=null ? getter.invoke(object) : null;
map.put(key, value);
}
//key 可能会把自己的class 和hashcode编进去,直接去掉
map.remove("class");
} catch (Exception e) {
e.printStackTrace();
return new HashMap<>();
}
Set set = map.keySet();
Iterator it = set.iterator();
while (it.hasNext()){
String key = it.next();
if (map.get(key)==null || map.get(key)==""){
map.remove(key);
set = map.keySet();
it = set.iterator();
}
}
if ("false".equals(map.get("emtpy"))){
logger.error("{}双写前obj转化失败",object);
}
return map;
}
最后讲一下dao,上篇博客也整理过了,就是动态新增修改
作为消费端使用很方便,只要传表名和map就完全能用,这里注意insert要加上useGeneratedKeys = “true” keyProperty = "id"来获取自增主键,这会自动注入到当前map里
insert into
${
map.tableName}
(
`${
key}`
)
values
(
#{
value}
)
update ${
map.tableName}
${
key}= #{
value}
${
key}= #{
value}
where id = #{
map.queryMap.id}
原文链接:https://blog.csdn.net/Koikoi12/article/details/125680976
留言与评论(共有 0 条评论) “” |