当前位置:首页 > 数据库 > 正文内容

Mysql大表数据归档

2024-11-29数据库2

前言

在生产实践中,你的mysql数据库可能面临下面这些情况:

  • 不可抗力的因素,数据库所在服务器被回收,或者服务器磁盘损坏,数据库必须得迁移?

  • 单点数据库读写压力越来越大,需要扩展一个或多个节点分摊读写压力?

  • 单表数据量太大了,需要进行水平或垂直拆分怎么搞?

  • 数据库需要从mysql迁移到其他数据库,比如PG,OB…

以上的这些场景,对于不少同学来讲,或多或少的在所处的业务中可能会涉及到,没有碰到还好,一旦发生了这样的问题,该如何处理呢?在这里我通过提供一个思路来解决单表数据量太大了,进行水平拆分,将历史数据归档保证热点数据查询

归档流程示意图

实现步骤

controller 层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@RestController
@RequestMapping("/backDoor")
public class CleanHistoryDataController {
 
    @Autowired
    private ICleanHistoryDataService cleanHistoryDataService;
 
    /**
     * 把指定过期时间的订单表数据迁移到历史表中
     */
    @PostMapping("/cleanByTableNameAndEndTime")
    public Resp<String> cleanByTableNameAndEndTime(@RequestBody CleanTableReq cleanTableReq) {
        try {
            CleanTableBo cleanTableBo = ObjectUtils.mapValue(cleanTableReq, CleanTableBo.class);
            cleanHistoryDataService.cleanByTableNameAndEndTime(cleanTableBo);
        } catch (Exception e) {
            log.error(cleanTableReq.getTableName() + " 数据迁移异常", e);
        }
        return Resp.success("success");
    }
}

Service 层

1
2
3
4
public interface ICleanHistoryDataService {
 
    void cleanHistoryTableData();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.alibaba.fastjson.JSONObject;
import com.photon.union.risk.clean.service.ICleanHistoryDataService;
import com.photon.union.risk.repo.mapper.clean.MasterDbMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
 
import java.util.List;
import java.util.stream.Collectors;
 
 
/**
 * @author robin
 */
@Service
@Slf4j
public class CleanHistoryDataService implements ICleanHistoryDataService {
    @Autowired
    private MasterDbMapper masterDbMapper;
 
    @Override
    public void cleanHistoryTableData() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        int logInt = 0;
        Long startId = 0L;
        while (true) {
            logInt ++;
            List<JSONObject> hashMapList = masterDbMapper.selectHistoryTableDataIds(startId);
            if (CollectionUtils.isEmpty(hashMapList)){
                break;
            }
            List<Long> allIds = hashMapList.stream().map(o -> o.getLong("id")).collect(Collectors.toList());
            startId = allIds.get(allIds.size()-1);
            if (logInt % 100 == 0 ){
                log.info("id 已经处理到-->" + allIds.get(allIds.size()-1));
            }
            try {
                // 往归档历史数据表写入数据
                masterDbMapper.insertOldHistoryTableDataBatchByIds(allIds);
                // 把归档的数据从目前业务表中删除
                masterDbMapper.deleteHistoryTableDataBatchByIds(allIds);
            }catch (Exception e){
                log.error("数据迁移异常,ids:{}", allIds, e);
            }
        }
        stopWatch.stop();
        log.info("数据迁移到历史表处理完成时间:{}s", (long)stopWatch.getTotalTimeSeconds());
    }
}

Mapper 层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import com.alibaba.fastjson.JSONObject;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
 
import java.util.List;
 
/**
 * @author robin
 */
@Repository
public interface MasterDbMapper {
 
    List<JSONObject> selectHistoryTableDataIds( @Param("id") Long startId);
 
    void insertOldHistoryTableDataBatchByIds(@Param("ids") List<Long> allIds);
 
    void deleteHistoryTableDataBatchByIds(@Param("ids") List<Long> allIds);
}

Sql Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.MasterDbMapper">
 
    <insert id="insertOldHistoryTableDataBatchByIds">
        INSERT IGNORE INTO t_order_old
        SELECT NULL,order_no,created_at
        FROM t_order WHERE id in
        <foreach collection="ids" item="item" open="(" close=")" separator=",">
            #{item}
        </foreach>
    </insert>
 
    <delete id="deleteHistoryTableDataBatchByIds">
        DELETE FROM t_order WHERE id IN
        <foreach collection="ids" item="item" open="(" close=")" separator=",">
            #{item}
        </foreach>
    </delete>
 
    <select id="selectHistoryTableDataIds" resultType="com.alibaba.fastjson.JSONObject">
        SELECT id FROM t_order
        WHERE id > #{id} and created_at lt;= DATE_SUB(NOW(), INTERVAL 6 MONTH)
        ORDER BY id limit 1000
    </select>
 
</mapper>

核心说明

  • 根据 t_order订单表结构创建 t_order_old历史订单表用于历史数据备份存放。

  • 整个流程基于主键 id 处理,避免慢 sql 产生,做到不影响当前线上业务处理。

  • 1000 条记录一个批次,避免长期抢占锁资源,同时每个批次执行不影响下个批次处理,出现异常后,打印 error 日志再人工跟进处理。