package mine; import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.TimeUnit; import org.apache.tools.zip.ZipEntry; import org.apache.tools.zip.ZipFile; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class MutilThread { static Map<String,Map<String,Integer>> progressMap = new HashMap<String,Map<String,Integer>>();//进度 class UploadCall implements Callable<Map<String, String>> { String zeName; byte[] byteArray; protected UploadCall() {} public UploadCall(String zeName, byte[] byteArray) { this.zeName = zeName; this.byteArray = byteArray; } @Override public Map<String, String> call() { Map<String, String> result = new HashMap<String, String>(); result.put("msg", "true"); //DODO 具体上传文件业务,捕获所有异常 return result; } } public void test() throws IOException { List<Future<Map<String, String>>> futureList = new ArrayList<Future<Map<String, String>>>(); File unZipFile = new File("/home/pi/xxx.zip"); ZipFile zfile = new ZipFile(unZipFile, "GBK"); Enumeration<?> zList = zfile.getEntries(); ZipEntry ze = null; String timestamp = String.valueOf(System.currentTimeMillis()); int finishCount = 0; List<String> errorfileList = new ArrayList<String>(); List<String> successfileList = new ArrayList<String>(); long _start = System.currentTimeMillis(); int count = 0;//用来记录已经完成数量 int cpuSize = Runtime.getRuntime().availableProcessors();//处理器内核数量,用于线程池线程 int maxSize = cpuSize + 1; ThreadPoolExecutor executor = new ThreadPoolExecutor(cpuSize, maxSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(maxSize), new ThreadFactoryBuilder().setNameFormat("upload_executor_%d").build(),new AbortPolicy()); while (zList.hasMoreElements()) { ze = (ZipEntry) zList.nextElement(); ByteArrayOutputStream os = new ByteArrayOutputStream(); BufferedInputStream is = null; byte[] byteArray = new byte[] {}; try { is = new BufferedInputStream(zfile.getInputStream(ze)); int readLen = 0; byte[] buf = new byte[1024]; while ((readLen = is.read(buf, 0, 1024)) != -1) { os.write(buf, 0, readLen); } byteArray = os.toByteArray(); os.flush(); } finally{ if(is != null) { is.close(); } if(os != null) { os.close(); } } UploadCall uploadCall = new UploadCall(ze.getName(),byteArray); Future<Map<String, String>> future = executor.submit(uploadCall);//提交任务到线程池 futureList.add(future);//线程处理情况追踪对象放到集合,以便后续追踪 count++; while(executor.getActiveCount() == executor.getMaximumPoolSize()) {//为了避免触发线程池拒绝策略,等待队列满了时让主线程睡眠 System.out.println( " watting, submit " + count); Iterator<Future<Map<String, String>>> iterator = futureList.iterator(); while(iterator.hasNext()) { Future<Map<String, String>> _future = iterator.next(); if(_future.isDone()) { //等待队列满了时顺便计算下已完成任务 try { Map<String, String> result = _future.get(); Object msg = result.get("msg"); if("true".equals(msg)) { successfileList.add("成功"); }else if("false".equals(msg)) { errorfileList.add("失败"); }else{ errorfileList.add("失败"); } finishCount++; progressMap.get(timestamp).put("finishCount", finishCount); iterator.remove(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } while(futureList.size() > 0) { //任务提交线程池完成,但线程池未做完任务时 Iterator<Future<Map<String, String>>> iterator = futureList.iterator(); while(iterator.hasNext()) { Future<Map<String, String>> future = iterator.next(); if(future.isDone()) { try { Map<String, String> result = future.get(); Object msg = result.get("msg"); String fileName = (String) result.get("fileName"); if("true".equals(msg)) { successfileList.add("成功"); }else if("false".equals(msg)) { errorfileList.add("失败"); }else{ errorfileList.add("失败"); } finishCount++; progressMap.get(timestamp).put("finishCount", finishCount); iterator.remove(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } long _end = System.currentTimeMillis(); System.out.println( " MutilThread.test cost " + (_end - _start)); } }