搜索

带处理结果的线程池Callable Future


发布时间: 2022-11-24 21:06:00    浏览次数:84 次
package mine;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;

import org.apache.tools.zip.ZipEntry;
import org.apache.tools.zip.ZipFile;

import com.google.common.util.concurrent.ThreadFactoryBuilder;


/**
 * Example of submitting tasks to a bounded thread pool and collecting their results through
 * {@link Future}: every entry of a zip archive is read into memory, handed to an
 * {@link UploadCall}, and the outcome of each finished task is tallied.
 */
public class MutilThread {

    /** Pause between polls while the pool has no room for another task. */
    private static final long SUBMIT_POLL_MILLIS = 500L;

    /** Pause between polls while waiting for the already-submitted tasks to finish. */
    private static final long DRAIN_POLL_MILLIS = 1000L;

    /**
     * Progress per run: run id (start timestamp as a string) -> counter name -> value.
     * NOTE(review): this is a plain, unsynchronized HashMap and entries are never removed in
     * this class; if other threads read it, confirm whether a concurrent map is needed.
     */
    static Map<String,Map<String,Integer>> progressMap = new HashMap<String,Map<String,Integer>>();

    /** Task that handles one zip entry and reports the outcome under the {@code "msg"} key. */
    class UploadCall implements Callable<Map<String, String>> {
        String zeName;
        byte[] byteArray;
        protected UploadCall() {}
        public UploadCall(String zeName, byte[] byteArray) {
            this.zeName = zeName;
            this.byteArray = byteArray;
        }

        /**
         * @return a map whose {@code "msg"} value is {@code "true"} on success
         */
        @Override
        public Map<String, String> call() {
            Map<String, String> result = new HashMap<String, String>();
            result.put("msg", "true");
            // TODO: the actual file-upload logic goes here; it should catch every exception
            return result;
        }
    }

    /**
     * Reads every entry of the archive, submits one {@link UploadCall} per entry to a bounded
     * thread pool, and waits until all submitted tasks have finished, publishing the number of
     * finished tasks under {@code "finishCount"} in {@link #progressMap}.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored,
     * the pool is shut down immediately and the method returns early.
     *
     * @throws IOException if the archive cannot be opened, read or closed
     */
    public void test() throws IOException {
        List<Future<Map<String, String>>> futureList = new ArrayList<Future<Map<String, String>>>();
        List<String> errorfileList = new ArrayList<String>();
        List<String> successfileList = new ArrayList<String>();
        String timestamp = String.valueOf(System.currentTimeMillis());

        // Register this run before any task can finish; looking the entry up without ever
        // creating it would fail with a NullPointerException on the first progress update.
        Map<String, Integer> progress = progressMap.get(timestamp);
        if (progress == null) {
            progress = new HashMap<String, Integer>();
            progress.put("finishCount", 0);
            progressMap.put(timestamp, progress);
        }

        int finishCount = 0;
        int submittedCount = 0; // number of tasks handed to the pool so far
        long startMillis = System.currentTimeMillis();
        int cpuSize = Runtime.getRuntime().availableProcessors(); // core pool size
        int maxSize = cpuSize + 1;

        ZipFile zfile = new ZipFile(new File("/home/pi/xxx.zip"), "GBK");
        try {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(cpuSize, maxSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(maxSize),
                    new ThreadFactoryBuilder().setNameFormat("upload_executor_%d").build(), new AbortPolicy());
            try {
                Enumeration<?> zList = zfile.getEntries();
                while (zList.hasMoreElements()) {
                    ZipEntry ze = (ZipEntry) zList.nextElement();
                    byte[] byteArray = readEntry(zfile, ze);

                    // Wait for room BEFORE submitting so the AbortPolicy is never triggered;
                    // use the idle time to tally the tasks that have already finished.
                    while (isSaturated(executor)) {
                        System.out.println( " watting, submit " + submittedCount);
                        finishCount += collectFinished(futureList, successfileList, errorfileList);
                        progress.put("finishCount", finishCount);
                        if (!pause(SUBMIT_POLL_MILLIS)) {
                            executor.shutdownNow();
                            return;
                        }
                    }

                    // Keep the future so the outcome of the task can be collected later.
                    futureList.add(executor.submit(new UploadCall(ze.getName(), byteArray)));
                    submittedCount++;
                }

                // Everything is submitted; keep polling until every task has been tallied.
                while (!futureList.isEmpty()) {
                    finishCount += collectFinished(futureList, successfileList, errorfileList);
                    progress.put("finishCount", finishCount);
                    if (!futureList.isEmpty() && !pause(DRAIN_POLL_MILLIS)) {
                        executor.shutdownNow();
                        return;
                    }
                }
            } finally {
                // Let the worker threads terminate; without this the core threads live forever.
                executor.shutdown();
            }
        } finally {
            zfile.close();
        }

        long endMillis = System.currentTimeMillis();
        System.out.println( " MutilThread.test cost " + (endMillis - startMillis));
    }

    /**
     * Tells whether a task submitted right now could be rejected: the work queue is full and
     * no further worker thread may be created. With a single submitting thread a
     * {@code false} answer guarantees that the next submit is accepted, because only that
     * thread adds to the queue or grows the pool.
     */
    private static boolean isSaturated(ThreadPoolExecutor executor) {
        return executor.getQueue().remainingCapacity() == 0
                && executor.getPoolSize() >= executor.getMaximumPoolSize();
    }

    /** Reads the complete content of one zip entry into a byte array. */
    private static byte[] readEntry(ZipFile zfile, ZipEntry ze) throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        BufferedInputStream is = new BufferedInputStream(zfile.getInputStream(ze));
        try {
            byte[] buf = new byte[1024];
            int readLen;
            while ((readLen = is.read(buf, 0, buf.length)) != -1) {
                os.write(buf, 0, readLen);
            }
        } finally {
            is.close();
        }
        return os.toByteArray();
    }

    /**
     * Removes every completed future from {@code futureList} and records its outcome.
     * A task counts as successful only when its result maps {@code "msg"} to {@code "true"};
     * a missing result, any other value, or a task that threw counts as a failure.
     *
     * @return the number of futures that were removed
     */
    private static int collectFinished(List<Future<Map<String, String>>> futureList,
            List<String> successfileList, List<String> errorfileList) {
        int finished = 0;
        Iterator<Future<Map<String, String>>> iterator = futureList.iterator();
        while (iterator.hasNext()) {
            Future<Map<String, String>> future = iterator.next();
            if (!future.isDone()) {
                continue;
            }
            // Remove first: a failed task must not stay in the list, otherwise the caller
            // would poll it forever.
            iterator.remove();
            finished++;
            try {
                Map<String, String> result = future.get();
                if (result != null && "true".equals(result.get("msg"))) {
                    successfileList.add("成功");
                } else {
                    errorfileList.add("失败");
                }
            } catch (InterruptedException e) {
                // Restore the flag so the caller's next pause() notices the interruption.
                Thread.currentThread().interrupt();
                errorfileList.add("失败");
            } catch (ExecutionException e) {
                e.printStackTrace();
                errorfileList.add("失败");
            }
        }
        return finished;
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

}

 

免责声明:《带处理结果的线程池Callable Future》,资源类别:文本,浏览次数:84 次,文件大小:--,由本站蜘蛛搜索收录于 2022-11-24 09:06:00。此页面由程序自动采集,仅供交流和学习使用,本站不储存任何资源文件,如有侵权内容请联系我们举报删除,感谢您对本站的支持。原文链接:https://www.cnblogs.com/oioele/p/16916122.html