Solo  当前访客:1 开始使用

多线程使用(工具类)


一、Fork-Join
将大规模问题分解为 n 个小规模的子问题(条件:1、子问题相互独立;2、子问题与原问题形式相同),递归地解决这些子问题,再将子问题的解合并,得到原问题的解。RecursiveTask 表示计算任务有返回结果,RecursiveAction 表示计算任务没有返回结果。

/**
 * Fork/Join demo: a root task splits off a child task, runs it on the same
 * pool and adds the child's result to its own partial result.
 */
public class TestForkJoin {

    /**
     * Task that returns an {@code Integer}.
     *
     * <p>With {@code k > 1} the task counts to 2 locally and forks one child
     * {@code GetDate(1)}; with {@code k <= 1} it simply returns 2.
     */
    private static class GetDate extends RecursiveTask<Integer> {

        private final int k;

        public GetDate(int k) {
            this.k = k;
        }

        /**
         * Computes the local part and adds the results of any forked children.
         *
         * @return the local count plus the sum of all child results
         */
        @Override
        protected Integer compute() {
            int i = 0;
            List<GetDate> subTasks = new ArrayList<>();
            if (k > 1) {
                // The first two iterations are handled locally; the third one
                // is delegated to a child task.
                for (int j = 0; j < 3; j++) {
                    if (i < 2) {
                        i += 1;
                    } else {
                        subTasks.add(new GetDate(1));
                    }
                }
            } else {
                i += 2;
            }
            if (!subTasks.isEmpty()) {
                // Schedule all sub-tasks on the current ForkJoinPool; invokeAll
                // returns once every one of them has completed.
                for (GetDate subTask : invokeAll(subTasks)) {
                    i = i + subTask.join();
                }
                System.out.println("i:" + i);
            }
            return i;
        }
    }

    /**
     * Runs {@code GetDate(2)} on a fresh pool and prints its result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            GetDate task = new GetDate(2);
            pool.invoke(task);
            System.out.println("测试开始");
            // invoke() already waited for completion, so join() returns immediately.
            System.out.println("result:" + task.join());
            System.out.println("测试结束");
        } finally {
            // Release the worker threads once the demo is done.
            pool.shutdown();
        }
    }
}

二、CountDownLatch
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。CountDownLatch是通过一个计数器来实现的,计数器的初始值为初始任务的数量。每当完成了一个任务后,调用CountDownLatch.countDown()方法,计数器的值就会减1。当计数器值到达0时,表示所有任务都已经完成,此时在闭锁上调用CountDownLatch.await()方法等待的线程就可以恢复执行。

/**
 * CountDownLatch demo.
 *
 * <p>The latch starts at 6: four {@link InitThread}s count down once each and
 * one anonymous thread counts down twice (two init steps). A
 * {@link BusiThread} and the main thread both wait on the latch.
 */
public class UseCountDownLatch {

    static CountDownLatch latch = new CountDownLatch(6);

    /** Initialisation worker: counts down once, then carries on with its own work. */
    private static class InitThread implements Runnable {

        @Override
        public void run() {
            System.out.println("Thread_" + Thread.currentThread().getId()
                    + " ready init work......");
            latch.countDown();
            // Counting down does not block, so the thread keeps running.
            for (int i = 0; i < 2; i++) {
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + " ........continue do its work");
            }
        }
    }

    /** Business worker: waits for the latch to reach zero before doing its work. */
    private static class BusiThread implements Runnable {

        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                // The latch never opened: keep the interrupt status and skip
                // the business step instead of running it prematurely.
                Thread.currentThread().interrupt();
                return;
            }
            for (int i = 0; i < 3; i++) {
                System.out.println("BusiThread_" + Thread.currentThread().getId()
                        + " do business-----");
            }
        }
    }

    /**
     * Starts the workers and blocks until all six count-downs have happened.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // One thread performing two init steps, i.e. two count-downs.
        new Thread(new Runnable() {
            @Override
            public void run() {
                SleepTools.ms(1);
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + " ready init work step 1st......");
                latch.countDown();
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + " ready init work step 2nd......");
                latch.countDown();
            }
        }).start();

        new Thread(new BusiThread()).start();

        // Four init workers, one count-down each.
        for (int i = 0; i <= 3; i++) {
            Thread thread = new Thread(new InitThread());
            thread.start();
        }

        latch.await();
        System.out.println("Main do ites work........");
    }
}

三、 CyclicBarrier
它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

/**
 * CyclicBarrier demo: five worker threads meet at a barrier; when the last one
 * arrives, the barrier action ({@link CollectThread}) prints the collected
 * results and the workers then continue.
 */
public class UseCyclicBarrier {

    private static CyclicBarrier barrier = new CyclicBarrier(5, new CollectThread());

    /** Results published by the worker threads, keyed by thread id. */
    private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();

    /**
     * Starts the five worker threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i <= 4; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }
    }

    /** Barrier action: prints every value currently stored in the result map. */
    private static class CollectThread implements Runnable {

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                result.append("[").append(workResult.getValue()).append("]");
            }
            System.out.println(" the result = " + result);
            System.out.println("do other business........");
        }
    }

    /** Worker: publishes its thread id, waits at the barrier, then continues. */
    private static class SubThread implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId() + "", id);
            Random r = new Random();
            try {
                // Randomly delay some workers so they reach the barrier at different times.
                if (r.nextBoolean()) {
                    Thread.sleep(1000 + id);
                    System.out.println("Thread_" + id + " ....do something ");
                }
                barrier.await();
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " ....do its business ");
            } catch (InterruptedException e) {
                // Keep the interrupt status visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (java.util.concurrent.BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

四、 Exchanger
是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。

/**
 * Exchanger demo: two threads each fill a set and swap it with the other
 * thread at the exchange point, then print what they received.
 */
public class UseExchange {

    private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();

    /**
     * Starts the two exchanging threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Container for this thread's data.
                Set<String> setA = new HashSet<String>();
                try {
                    // Fill the container before the exchange.
                    setA.add("a");
                    // Blocks until the partner thread arrives, then swaps the sets.
                    setA = exchange.exchange(setA);
                    System.out.println("setA:" + setA);
                    // Process the received data here.
                } catch (InterruptedException e) {
                    // Preserve the interrupt status instead of dropping it.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // Container for this thread's data.
                Set<String> setB = new HashSet<String>();
                try {
                    // Fill the container before the exchange.
                    setB.add("b");
                    // Blocks until the partner thread arrives, then swaps the sets.
                    setB = exchange.exchange(setB);
                    System.out.println("setB:" + setB);
                    // Process the received data here.
                } catch (InterruptedException e) {
                    // Preserve the interrupt status instead of dropping it.
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

五、Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
SqlConnectImpl:

/**
 * Stand-in {@link Connection} used by the connection-pool demo.
 *
 * <p>Every interface method is a stub: {@code void} methods do nothing, and the
 * others return {@code null}, {@code false} or {@code 0}. No database is
 * contacted.
 */
public class SqlConnectImpl implements Connection {

    /**
     * Creates a new stub connection.
     *
     * @return a fresh {@code SqlConnectImpl} instance
     */
    public static final Connection fetchConnection() {
        return new SqlConnectImpl();
    }

    // ---- Wrapper ----

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }

    // ---- Statements, transactions and connection state ----

    @Override
    public Statement createStatement() throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return null;
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return null;
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        // stub: no-op
    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return false;
    }

    @Override
    public void commit() throws SQLException {
        // stub: no-op
    }

    @Override
    public void rollback() throws SQLException {
        // stub: no-op
    }

    @Override
    public void close() throws SQLException {
        // stub: no-op
    }

    @Override
    public boolean isClosed() throws SQLException {
        return false;
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return null;
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
        // stub: no-op
    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return false;
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {
        // stub: no-op
    }

    @Override
    public String getCatalog() throws SQLException {
        return null;
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {
        // stub: no-op
    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return 0;
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return null;
    }

    @Override
    public void clearWarnings() throws SQLException {
        // stub: no-op
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency)
            throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType,
            int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType,
            int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return null;
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        // stub: no-op
    }

    @Override
    public void setHoldability(int holdability) throws SQLException {
        // stub: no-op
    }

    @Override
    public int getHoldability() throws SQLException {
        return 0;
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return null;
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return null;
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
        // stub: no-op
    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        // stub: no-op
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency,
            int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType,
            int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType,
            int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
            throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes)
            throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames)
            throws SQLException {
        return null;
    }

    // ---- Object factories, client info and miscellaneous ----

    @Override
    public Clob createClob() throws SQLException {
        return null;
    }

    @Override
    public Blob createBlob() throws SQLException {
        return null;
    }

    @Override
    public NClob createNClob() throws SQLException {
        return null;
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return null;
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return false;
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        // stub: no-op
    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        // stub: no-op
    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return null;
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return null;
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return null;
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return null;
    }

    @Override
    public void setSchema(String schema) throws SQLException {
        // stub: no-op
    }

    @Override
    public String getSchema() throws SQLException {
        return null;
    }

    @Override
    public void abort(Executor executor) throws SQLException {
        // stub: no-op
    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
        // stub: no-op
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return 0;
    }
}

DBPoolSemaphore:

public class DBPoolSemaphore { private final static int POOL_SIZE = 10; private final Semaphore useful,useless;//两个指示器,分别表示池子还有可用连接和已用连接 //存放数据库连接的容器 private static LinkedList<Connection> pool = new LinkedList<Connection>(); //初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } public DBPoolSemaphore() { this.useful = new Semaphore(10); this.useless = new Semaphore(0); } /*归还连接*/ public void returnConnect(Connection connection) throws InterruptedException { if(connection!=null) { System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!" +"可用连接数:"+useful.availablePermits()); useless.acquire(); synchronized (pool) { pool.addLast(connection); } useful.release(); } } /*从池子拿连接*/ public Connection takeConnect() throws InterruptedException { useful.acquire(); Connection connection; synchronized (pool) { connection = pool.removeFirst(); } useless.release(); return connection; } }

AppTest:

public class AppTest { private static DBPoolSemaphore dbPool = new DBPoolSemaphore(); private static class BusiThread extends Thread{ @Override public void run() { Random r = new Random();//让每个线程持有连接的时间不一样 long start = System.currentTimeMillis(); try { Connection connect = dbPool.takeConnect(); System.out.println("Thread_"+Thread.currentThread().getId() +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms."); SleepTools.ms(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据 System.out.println("查询数据完成,归还连接!"); dbPool.returnConnect(connect); } catch (InterruptedException e) { } } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { Thread thread = new BusiThread(); thread.start(); } } }