冷热分离的demo(方案被收拢入口后的sharding分表代替)"/>
冷热分离的demo(方案被收拢入口后的sharding分表代替)
背景:
随着业务的不断发展,单量越来越大,对数据库的访问压力也越来越大,为了保证系统的稳定、减轻数据库的查询压力,针对 订单信息表、订单扩展表、 订单产品信息表和产品服务信息表做分表方案,由于oms系统在业务操作时订单查询是多种维度查询,比如通过运单号查询、客户单号查询以及平台订单号查询,故采用常规的分库分表方案无法满足现有的业务变化,考虑采用冷热数据分离方案来解决目前的现状。
方案概述:
1、四张表分表,分表需要考虑后期的查询问题,以时间维度进行冷热分离(冷热数据分界线:以新增的冷热库时间(hot_date)6个月为分界点)。
2、热数据只占全部数据的一部分,因此每次优先查询热库,以下情况才查询冷库
- 当查询条件未命中(结果集为空)时,查询冷库。 - 当查询条件部分命中时,查询冷库。
3、为了区分部分命中和全部命中,可以新建一张R表存放每次查询冷库的查询条件和查询结果数量和查询结果的主键,每次查询热库时,对比相同查询条件的查询结果数量是否一致。一致,则本次查询结束。不一致,则需要到冷库中进行查询。
4、举例说明:100条中80条还是热数据 20条变成了冷数据,其实应该只是对冷数据库发起这20条数据的请求。此时需要将R表数据拿出来比对,只查一部分冷数据。
5、热库时间=>冷库时间 : 查询和使用热数据时,将一段时间不再使用的热数据移到冷库(时间)。
6、冷库时间=>热库时间 :查询冷库时,将本次查询的结果移到热库,附上最新查询日期,并删除冷库数据。
7、数据同步(每次查询进行或达到一定量级进行)
冷热数据迁移流程说明
1.全量数据如何迁移?
2.数据冷热切换如何解决重复问题?
3.冷库数据是否需做归档?若需要,如何归档。
准备:
1:上述三张表每张表需新建对应冷热表
2:两个定时任务
1:在数据超期时后的一段时间移除热表数据到冷库
2:在数据移出后的(时间?)删除热库数据
3:mybatis插件,在插件中对所有三张表的查询做结果拦截,查询热库如结果集为空,查询冷库,如冷库查询到数据移除冷库数据,并插入到热库更新热库查询时间,
上代码demo:(joor 方便反射 实测这方案性能还可以)
package xxxxx.hera.mybatisIntercept; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.imile.hera.mapper.client.OmsOrderInfoMapper; import com.imile.hera.util.SpringUtils; import org.apachemons.lang3.ObjectUtils; import org.apache.ibatis.executor.parameter.ParameterHandler; import org.apache.ibatis.executor.resultset.DefaultResultSetHandler; import org.apache.ibatis.executor.resultset.ResultSetHandler; import org.apache.ibatis.mapping.MappedStatement; import org.apache.ibatis.plugin.Interceptor; import org.apache.ibatis.plugin.Intercepts; import org.apache.ibatis.plugin.Invocation; import org.apache.ibatis.plugin.Plugin; import org.apache.ibatis.plugin.Signature; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.joor.Reflect; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import java.sql.Statement; import java.util.*; import java.util.concurrent.TimeUnit; @Intercepts({ @Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class}) }) @Component @Lazy(value = true) public class OrderInfoPlugin implements Interceptor { public static final String mapperClassName = "OmsOrderInfoMapper"; public static final Class orderInfoMapperClass = OmsOrderInfoMapper.class; static Cache<String, Object> cache = Caffeine.newBuilder() .expireAfterWrite(5, TimeUnit.DAYS) .maximumSize(10_000) .build(); //方法拦截 @Override public Object intercept(Invocation invocation) throws Throwable { //取出查询的结果 Object resultObject = invocation.proceed(); //若热库查询为空查询冷库 if (ObjectUtils.isEmpty(resultObject)) { DefaultResultSetHandler df = (DefaultResultSetHandler) invocation.getTarget(); Reflect reflect = Reflect.on(df); MappedStatement mappedStatement = reflect.field("mappedStatement").get(); ParameterHandler parameterHandler = reflect.field("parameterHandler").get(); Map<String, String> pathMap = filterMethodById(mappedStatement.getId()); Map<Object, Object> map = (Map<Object, Object>) parameterHandler.getParameterObject(); List<Object> list = new ArrayList<>(); //查询参数组装 if (!map.isEmpty()) { Map<Object, Object> treeMap = new TreeMap<Object, Object>( new Comparator<Object>() { public int compare(Object obj1, Object obj2) { return obj1.toString()pareTo(obj2.toString()); } }); Iterator<Map.Entry<Object, Object>> it = map.entrySet().iterator(); while (it.hasNext()) { Map.Entry<Object, Object> entry = it.next(); if (entry.getKey().toString().contains("param")) { treeMap.put(entry.getKey(), entry.getValue()); } } Iterator<Map.Entry<Object, Object>> its = treeMap.entrySet().iterator(); while (its.hasNext()) { Map.Entry<Object, Object> entry = its.next(); list.add(entry.getValue()); } } Object[] params = list.toArray(new Object[0]); if (mapperClassName.equalsIgnoreCase(pathMap.get("mapper"))) { String key = "orderInfoMapperClassProxy"; GetProxy getProxy = (GetProxy) cache.get(key, k -> getInstance(orderInfoMapperClass)); String methodPath = pathMap.get("method"); resultObject = Reflect.on(getProxy.getProxy()).call(methodPath, params).get(); // todo 冷库查询到数据迁移到热库并删除冷库数据 //测插入删除同理 // OmsOrderInfoDO omsOrderInfoDO = new OmsOrderInfoDO(); // omsOrderInfoDO.setId(IdWorkerUtil.getId()); // omsOrderInfoDO.setRecordVersion("2"); // omsOrderInfoDO.setClientOrgId(101L); // SpringUtils.getBean("omsOrderInfoMapper", OmsOrderInfoMapper.class).insert(omsOrderInfoDO); // SpringUtils.getBean("omsOrderInfoMapper", OmsOrderInfoMapper.class).deleteById(12121); } } return resultObject; } private GetProxy getInstance(Class orderInfoMapperClass) { SqlSession sqlSession = SpringUtils.getBean("sqlSessionFactory", SqlSessionFactory.class).openSession(); return GetProxy.getInstance(orderInfoMapperClass, sqlSession); } /** * 根据获取到执行 id 找到对应的方法,只在 searchByQuery 方法上执行拦截 * * @param id 根据 MappedStatement 获取到的 id 属性 * @return 是否是 searchByQuery 方法 */ private Map filterMethodById(String id) { Map<String, String> resultObject = new HashMap<>(); String[] names = id.split("\\."); resultObject.put("mapper", names[names.length - 2]); resultObject.put("method", names[names.length - 1]); return resultObject; } //获取到拦截的对象,底层也是通过代理实现的,实际上是拿到一个目标代理对象 @Override public Object plugin(Object target) { return Plugin.wrap(target, this); } //获取设置的阈值等参数 @Override public void setProperties(Properties properties) { } }
更多推荐
冷热分离的demo(方案被收拢入口后的sharding分表代替)
发布评论