队列批量"/>
java队列批量
往数据库里插入大量的数据,当然是批量插入最高效,我们设定一个题目,每次把数据放入队列,当数据大于1000条或者时间大于5分钟后把数据批量入库
队列处理代码:
package bathQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
/**
*
Title: BatchQueue.java
*
Description:
*
Copyright: Copyright (c) 2014
* @author 雪含心
* @date 2014年3月1日
*/
public class BatchQueue {
// 默认间隔处理队列时间
private static int DEFAULT_TIME = 5000;
// 默认队列处理长度
private static int DEFAULT_COUNT = 2000;
// 设置队列处理时间
private long handleTime;
// 设置队列处理长度
private int handleLength;
// 阻塞队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(20000);
// 回调接口
private QueueProcess process;
// 用来存放从队列拿出的数据
private List dataList;
// 往队列添加数据
public void add(T t){
queue.add(t);
}
// 清理生成的list
public void clearList(){
dataList = null;
dataList = new ArrayList();
}
/**
* 最原始的构造方法,使用这个构造方法设置默认的队列处理时间和数量
* @param process
*/
public BatchQueue(QueueProcess process){
this(DEFAULT_TIME, DEFAULT_COUNT, process);
}
/**
* 可以设置队列的处理时间和处理长度
* @param handleTime
* @param handleQueueLength
* @param process
*/
public BatchQueue(int handleTime, int handleQueueLength, QueueProcess process){
this.process = process;
this.handleTime = handleTime;
this.handleLength = handleQueueLength;
start();
}
private void start(){
dataList = new ArrayList(handleLength);
DataListener listener = new DataListener();
new Thread(listener).start();
}
// 队列监听,当队列达到一定数量和时间后处理队列
class DataListener implements Runnable{
@Override
public void run() {
long startTime = System.currentTimeMillis();
T t = null;
while(true){
try {
// 从队列拿出队列头部的元素,如果没有就阻塞
t = queue.take();
if(null != t){
dataList.add(t);
}
if(dataList.size() >= DEFAULT_COUNT){
startTime = callBack(dataList);
continue;
}
long currentTime = System.currentTimeMillis();
System.out.println("currentTime - startTime" + (currentTime - startTime) + "handleTime==>" + handleTime);
if(currentTime - startTime > handleTime){
startTime = callBack(dataList);
continue;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private long callBack(List dataList) {
// 处理队列
try{
System.out.println(dataList);
process.processData(dataList);
}catch(Exception e){
e.printStackTrace();
}finally{
// 清理掉dataList中的元素
clearList();
}
return System.currentTimeMillis();
}
}
}
/**
* add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
offer 添加一个元素并返回true 如果队列已满,则返回false
poll 移除并返问队列头部的元素 如果队列为空,则返回null
peek 返回队列头部的元素 如果队列为空,则返回null
put 添加一个元素 如果队列满,则阻塞
take 移除并返回队列头部的元素 如果队列为空,则阻塞
*/
数据处理抽象类
package com.zh.utils;
import java.util.List;
/**
* 批量数据回调接口
* @author zhanghua
*
* @param
*/
public interface BatchQueueCallback {
/**
* 用于接收批量数据
* @param list 批量数据
*/
public abstract void batch(List list);
}
数据库处理示例
package bathQueue;
import java.util.List;
/**
*
Title: DataInsert.java
*
Description:
*
Copyright: Copyright (c) 2014
* @author 雪含心
* @param
* @date 2014年3月1日
*/
public class DataInsert extends QueueProcess {
@Override
public void processData(List list) {
}
public static void main(String[] args) throws Exception{
DataInsert back = new DataInsert<>();
BatchQueue queue = new BatchQueue(back);
for(int i = 0; i < 20000; i ++){
queue.add("a" +i);
Thread.sleep(2000);
}
}
}
更多推荐
java队列批量
发布评论