ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

基于生产者消费者的BatchProcessor

2022-02-07 10:03:32  阅读:165  来源: 互联网

标签:BatchProcessor bulkNum 生产者 list 基于 DEFAULT int doInsert private


一 背景

主要用于批量处理:

二 基本架构 

三 代码

package com.xuyu.batch;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 数据库批量处理器
 */
@Slf4j
public class BatchProcessor<T> {

    /**
     * 一次批量提交多少个,默认1000
     */
    private int bulkNum;

    /**
     * 最多隔多久进行一次提交,单位秒
     */
    private int flushInterval;

    /**
     * 保存数据的队列
     */
    private BlockingQueue<T> itemQueue;

    private volatile boolean closed = false;

    /**
     * 执行插入动作的执行器
     */
    private DoInsert<T> doInsert;

    private static final int DEFAULT_BULK_NUM = 1000;
    private static final int DEFAULT_CAPACITY = 1024;
    private static final int DEFAULT_FLUSH_INTERVAL = 3;

    public BatchProcessor(DoInsert<T> doInsert) {
        this(doInsert, DEFAULT_BULK_NUM, DEFAULT_FLUSH_INTERVAL, DEFAULT_CAPACITY);
    }

    public BatchProcessor(DoInsert<T> doInsert, int bulkNum) {
        this(doInsert, bulkNum, DEFAULT_FLUSH_INTERVAL, DEFAULT_CAPACITY);
    }

    public BatchProcessor(DoInsert<T> doInsert, int bulkNum, int flushInterval) {
        this(doInsert, bulkNum, flushInterval, DEFAULT_CAPACITY);
    }

    public BatchProcessor(DoInsert<T> doInsert, int bulkNum, int flushInterval, int capacity) {
        if (bulkNum < 1) {
            bulkNum = DEFAULT_BULK_NUM;
        }
        if (capacity < 1 || capacity < bulkNum) {
            capacity = Math.max(DEFAULT_CAPACITY, bulkNum * 2);
        }
        if (flushInterval < 1) {
            flushInterval = DEFAULT_FLUSH_INTERVAL;
        }

        this.bulkNum = bulkNum;
        this.doInsert = doInsert;
        this.flushInterval = flushInterval;
        itemQueue = new ArrayBlockingQueue<>(capacity);
        //开始flash任务
        this.startFlushTask();

    }

    /**
     * 阻塞添加到队列中
     *
     * @param item
     * @return 最后添加成功就返回true,添加失败就返回false
     */
    public boolean addItem(T item) {
        if (closed) {
            return false;
        }
        try {
            itemQueue.put(item);
            return true;
        } catch (InterruptedException e) {
            log.error("添加到队列时中断!item={}", e);

        }
        return false;
    }

    /**
     * 将队列中的数据全部提交到数据库中
     */
    public void flushAllItem() {
        while (!itemQueue.isEmpty()) {
            List<T> list = new ArrayList<>(bulkNum);
            itemQueue.drainTo(list, bulkNum);
            if (!list.isEmpty()) {
                flushToDB(list);
            }

        }
        log.info("flushAllItem success!");
    }

    /**
     * 关闭批量插入处理器,并提交队列中所有的数据
     */
    public void close() {
        this.closed = true;
        flushAllItem();
        log.info("DbBatchInsertProcessor 成功关闭");
    }

    /**
     * 开始flush任务
     */
    private void startFlushTask() {
        Thread t = new Thread(() -> {
            int waitSecond = 0;
            while (true) {
                if (closed) {
                    break;
                }
                if (itemQueue.size() >= bulkNum || waitSecond >= flushInterval) {
                    //队列数量大于批量提交数或等待超过指定的时间时,进行提交
                    if (!itemQueue.isEmpty()) {
                        List<T> list = new ArrayList<>(bulkNum);
                        itemQueue.drainTo(list, bulkNum);
                        if (!list.isEmpty()) {
                            flushToDB(list);
                        }
                    }
                    waitSecond = 0;
                } else {
                    //还没到批量提交点,进行等待
                    try {
                        Thread.sleep(1000);
                        waitSecond++;
                    } catch (InterruptedException e) {
                        log.error("startFlushTask 异常中断!");
                    }
                }
            }
        });
        t.setName("BatchProcessor thread" + this.hashCode());
        t.start();
    }

    private void flushToDB(List<T> list) {
        try {
            int insertRow = doInsert.batchInsert(list);
            log.info("{}表插入{}条记录", doInsert.tableName(), insertRow);
        } catch (Throwable e) {
            log.error("{}表批量插入时发生异常,list={}", doInsert.tableName(), e);
        }

    }

    /**
     * 执行真正批量插入的接口
     *
     * @param <T>
     */
    interface DoInsert<T> {
        int batchInsert(List<T> list);
        String tableName();
    }

}

标签:BatchProcessor,bulkNum,生产者,list,基于,DEFAULT,int,doInsert,private
来源: https://blog.csdn.net/weixin_40637783/article/details/122804620

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有