Solo  当前访客:0 开始使用

Nick1407 的个人博客

公众号:JavaCase

多线程使用(线程池和队列)


一、线程池
 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
1、ThreadPoolExecutor

public class UseThreadPool {
    static class Worker implements Runnable
    {
        private String taskName;
        private Random r = new Random();

        public Worker(String taskName){
            this.taskName = taskName;
        }

        public String getName() {
            return taskName;
        }

        @Override
        public void run(){
            System.out.println(Thread.currentThread().getName()
                    +" process the task : " + taskName);
            //SleepTools.ms(r.nextInt(100)*5);
        }
    }

    public static void main(String[] args)
    {
        ExecutorService threadPool = new ThreadPoolExecutor(2, 4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 10; i++)
        {
            Worker worker = new Worker("worker " + i);
            System.out.println("A new task has been added : " + worker.getName());
            threadPool.execute(worker);
        }
        threadPool.shutdown();
    }
}

2、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务,可以通过构造函数来指定后台线程的个数。
示例:
工作类

public class ScheduleWorker implements Runnable{

    public final static int Normal = 0;//普通任务类型

    public final static int HasException = -1;//会抛出异常的任务类型
    public final static int ProcessException = 1;//抛出异常但会捕捉的任务类型

    public static SimpleDateFormat formater = new SimpleDateFormat(
            "yyyy-MM-dd HH:mm:ss");
    
    private int taskType;
    public ScheduleWorker(int taskType) {
        this.taskType = taskType;
    }

    @Override
    public void run() {
    	if(taskType==HasException) {
            System.out.println(formater.format(new Date())
                    +" Exception be made,will next task run?");
            throw new RuntimeException("ExceptionHappen");
    	}else if(taskType==ProcessException) {
            try {
                System.out.println("ProcessException ..."
                		+formater.format(new Date()));
                throw new RuntimeException("ProcessException");
            } catch (RuntimeException e) {
                System.out.println("ProcessException catched,,will next task run?");
            }  		
    	}else {
    		System.out.println("Normal..."+formater.format(new Date()));
    	}
    }
}

测试:

public class ScheduledCase {
    public static void main(String[] args) {
    	
    	ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);

        //固定时间间隔执行的任务,第一次任务在1000ms后执行,第二次任务在1000+3000 ms后执行,第三次在
        //1000+3000*2 ms后执行
        schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.Normal),
                1000, 3000, TimeUnit.MILLISECONDS);

        // 固定时间间隔执行的任务,开始执行后就触发异常,next周期将不会运行
        schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.HasException),
                1000, 3000, TimeUnit.MILLISECONDS);

        // 固定时间间隔执行的任务,虽然抛出了异常,但被捕捉了,next周期继续运行
        schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.ProcessException),
                1000, 3000, TimeUnit.MILLISECONDS);

    }
}

二、队列
1、延时队列

实体类

public class Order {

	private final String orderNo;
	private final double orderMoney;
	
	public Order(String orderNo, double orderMoney) {
		super();
		this.orderNo = orderNo;
		this.orderMoney = orderMoney;
	}

	public String getOrderNo() {
		return orderNo;
	}

	public double getOrderMoney() {
		return orderMoney;
	}
}

包装类

public class ItemVo<T> implements Delayed{
    private long activeTime;//到期时间,单位毫秒
    private T data;//业务数据,泛型
    
	public ItemVo(long activeTime, T data) {
		super();
		this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,
                TimeUnit.MILLISECONDS) + System.nanoTime();;
		this.data = data;
	}

	public long getActiveTime() {
		return activeTime;
	}

	public T getData() {
		return data;
	}
	
	/*
	 * 这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
	 */
	@Override
	public long getDelay(TimeUnit unit) {
        long d = unit.convert(this.activeTime - System.nanoTime(), 
        		TimeUnit.NANOSECONDS);
        return d;
	}

	/*
     *Delayed接口继承了Comparable接口,按剩余时间排序
	 */
	@Override
	public int compareTo(Delayed o) {
        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
	}
}

取出队列数据

public class FetchOrder implements Runnable {
    private DelayQueue<ItemVo<Order>> queue;

    public FetchOrder(DelayQueue<ItemVo<Order>> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while(true){
            try {
            	ItemVo<Order> item = queue.take();
            	Order order = (Order)item.getData();
                System.out.println("GetFromQueue:"
                        +" data = "+order.getOrderNo()+"-"+order.getOrderMoney());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

数据放入队列

public class PutOrder implements Runnable {
    private DelayQueue<ItemVo<Order>> queue;

    public PutOrder(DelayQueue<ItemVo<Order>> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        //5秒后超时
    	Order orderTb = new Order("TB123456789",300);
    	ItemVo<Order> itemTb = new ItemVo<Order>(5000,orderTb);
        queue.offer(itemTb);
        System.out.println("PutOrder5秒后超时:"+orderTb.getOrderNo()+":"
        		+orderTb.getOrderMoney());
        //3秒后超时
        Order orderJd = new Order("JD987654321",289);
        ItemVo<Order> itemJd = new ItemVo<Order>(3000,orderJd);
        queue.offer(itemJd);
        System.out.println("PutOrder3秒后超时:"+orderJd.getOrderNo()+":"+orderJd.getOrderMoney());

    }
}

测试:

public class PutOrder implements Runnable {
    private DelayQueue<ItemVo<Order>> queue;

    public PutOrder(DelayQueue<ItemVo<Order>> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        //5秒后超时
    	Order orderTb = new Order("TB123456789",300);
    	ItemVo<Order> itemTb = new ItemVo<Order>(5000,orderTb);
        queue.offer(itemTb);
        System.out.println("PutOrder5秒后超时:"+orderTb.getOrderNo()+":"
        		+orderTb.getOrderMoney());
        //3秒后超时
        Order orderJd = new Order("JD987654321",289);
        ItemVo<Order> itemJd = new ItemVo<Order>(3000,orderJd);
        queue.offer(itemJd);
        System.out.println("PutOrder3秒后超时:"+orderJd.getOrderNo()+":"+orderJd.getOrderMoney());

    }
}

三、多线程框架

多线程框架.zip


, 0 0