多线程使用(线程池和队列)
一、线程池
线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
1、ThreadPoolExecutor
public class UseThreadPool {
static class Worker implements Runnable
{
private String taskName;
private Random r = new Random();
public Worker(String taskName){
this.taskName = taskName;
}
public String getName() {
return taskName;
}
@Override
public void run(){
System.out.println(Thread.currentThread().getName()
+" process the task : " + taskName);
//SleepTools.ms(r.nextInt(100)*5);
}
}
public static void main(String[] args)
{
ExecutorService threadPool = new ThreadPoolExecutor(2, 4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 10; i++)
{
Worker worker = new Worker("worker " + i);
System.out.println("A new task has been added : " + worker.getName());
threadPool.execute(worker);
}
threadPool.shutdown();
}
}
2、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务,可以通过构造函数来指定后台线程的个数。
示例:
工作类
public class ScheduleWorker implements Runnable{
public final static int Normal = 0;//普通任务类型
public final static int HasException = -1;//会抛出异常的任务类型
public final static int ProcessException = 1;//抛出异常但会捕捉的任务类型
public static SimpleDateFormat formater = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
private int taskType;
public ScheduleWorker(int taskType) {
this.taskType = taskType;
}
@Override
public void run() {
if(taskType==HasException) {
System.out.println(formater.format(new Date())
+" Exception be made,will next task run?");
throw new RuntimeException("ExceptionHappen");
}else if(taskType==ProcessException) {
try {
System.out.println("ProcessException ..."
+formater.format(new Date()));
throw new RuntimeException("ProcessException");
} catch (RuntimeException e) {
System.out.println("ProcessException catched,,will next task run?");
}
}else {
System.out.println("Normal..."+formater.format(new Date()));
}
}
}
测试:
public class ScheduledCase {
public static void main(String[] args) {
ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
//固定时间间隔执行的任务,第一次任务在1000ms后执行,第二次任务在1000+3000 ms后执行,第三次在
//1000+3000*2 ms后执行
schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.Normal),
1000, 3000, TimeUnit.MILLISECONDS);
// 固定时间间隔执行的任务,开始执行后就触发异常,next周期将不会运行
schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.HasException),
1000, 3000, TimeUnit.MILLISECONDS);
// 固定时间间隔执行的任务,虽然抛出了异常,但被捕捉了,next周期继续运行
schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.ProcessException),
1000, 3000, TimeUnit.MILLISECONDS);
}
}
二、队列
1、延时队列
实体类
public class Order {
private final String orderNo;
private final double orderMoney;
public Order(String orderNo, double orderMoney) {
super();
this.orderNo = orderNo;
this.orderMoney = orderMoney;
}
public String getOrderNo() {
return orderNo;
}
public double getOrderMoney() {
return orderMoney;
}
}
包装类
public class ItemVo<T> implements Delayed{
private long activeTime;//到期时间,单位毫秒
private T data;//业务数据,泛型
public ItemVo(long activeTime, T data) {
super();
this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,
TimeUnit.MILLISECONDS) + System.nanoTime();;
this.data = data;
}
public long getActiveTime() {
return activeTime;
}
public T getData() {
return data;
}
/*
* 这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
*/
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(this.activeTime - System.nanoTime(),
TimeUnit.NANOSECONDS);
return d;
}
/*
*Delayed接口继承了Comparable接口,按剩余时间排序
*/
@Override
public int compareTo(Delayed o) {
long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
}
取出队列数据
public class FetchOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public FetchOrder(DelayQueue<ItemVo<Order>> queue){
this.queue = queue;
}
@Override
public void run() {
while(true){
try {
ItemVo<Order> item = queue.take();
Order order = (Order)item.getData();
System.out.println("GetFromQueue:"
+" data = "+order.getOrderNo()+"-"+order.getOrderMoney());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
数据放入队列
public class PutOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public PutOrder(DelayQueue<ItemVo<Order>> queue){
this.queue = queue;
}
@Override
public void run() {
//5秒后超时
Order orderTb = new Order("TB123456789",300);
ItemVo<Order> itemTb = new ItemVo<Order>(5000,orderTb);
queue.offer(itemTb);
System.out.println("PutOrder5秒后超时:"+orderTb.getOrderNo()+":"
+orderTb.getOrderMoney());
//3秒后超时
Order orderJd = new Order("JD987654321",289);
ItemVo<Order> itemJd = new ItemVo<Order>(3000,orderJd);
queue.offer(itemJd);
System.out.println("PutOrder3秒后超时:"+orderJd.getOrderNo()+":"+orderJd.getOrderMoney());
}
}
测试:
public class PutOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public PutOrder(DelayQueue<ItemVo<Order>> queue){
this.queue = queue;
}
@Override
public void run() {
//5秒后超时
Order orderTb = new Order("TB123456789",300);
ItemVo<Order> itemTb = new ItemVo<Order>(5000,orderTb);
queue.offer(itemTb);
System.out.println("PutOrder5秒后超时:"+orderTb.getOrderNo()+":"
+orderTb.getOrderMoney());
//3秒后超时
Order orderJd = new Order("JD987654321",289);
ItemVo<Order> itemJd = new ItemVo<Order>(3000,orderJd);
queue.offer(itemJd);
System.out.println("PutOrder3秒后超时:"+orderJd.getOrderNo()+":"+orderJd.getOrderMoney());
}
}
三、多线程框架