V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
luxinfl
V2EX  ›  程序员

发现了个很奇怪的现象,关于 parallelStream 的

  •  
  •   luxinfl · 2021-10-28 21:38:38 +08:00 · 2004 次点击
    这是一个创建于 1124 天前的主题,其中的信息可能已经有所发展或是发生改变。

    这个接口的处理类似这样

    List<Data> dataList = xxx.paralleStream().map(e->dealData(e)).collect(Collector.toList());
    

    其中有的 dealData 会用 http 调用外部接口。我在每个 dealData 都打印了时间,基本都在 50ms 左右徘徊。按理说这个方法的处理时间应该是耗时最长的那个,但是最终结果却差的很远。这个接口除了并发流基本就没什么操作了。服务器间的时延可以忽略不计。好神奇

    第 1 条附言  ·  2021-10-28 23:51:35 +08:00
    public static void test1() throws ExecutionException, InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool(4);
            for( int i=0;i<1000;i++) {
                int index = i;
                long time = System.currentTimeMillis();
                List<String> list = forkJoinPool.submit(() -> Stream.of(ary).parallel().map(s -> getString(index)).collect(Collectors.toList())).get();
                System.out.println("cost time :" + (System.currentTimeMillis() - time));
    
            }   
        }
        
        public static void test2(){
            for( int i=0;i<1000;i++) {
                int index = i;
                long time = System.currentTimeMillis();
                List<String> list = Stream.of(ary).parallel().map(s -> getString(index)).collect(Collectors.toList());
                System.out.println("cost time :" + (System.currentTimeMillis() - time));
    
            }
        }
    
    
        public static String getString(int index) {
            int i = new Random().nextInt(50);
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ":" +index + "cost time " + i);
            return "";
        }
    

    试着跑了一下,test1自定义线程池之后,会一直用到池里面的线程。 test2用的共享池,竟然还会用主线程main在跑。所以时间差距是不是在这块。

    第 2 条附言  ·  2021-10-29 00:38:46 +08:00
    public static void test1() throws ExecutionException, InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool(30);
            long time = System.currentTimeMillis();
            List<String> list = forkJoinPool.submit(() -> Stream.of(ary).parallel().map(s -> getString()).collect(Collectors.toList())).get();
            System.out.println("cost time :" + (System.currentTimeMillis() - time));
    
        }
    
    public static String getString() {
            int i = new Random().nextInt(50);
            try {
                Thread.sleep(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "";
        }
    

    我用jmeter20并发压测发现,这个性能也并不是很好。要比50ms多两倍多

    13 条回复    2021-11-01 22:27:55 +08:00
    Jooooooooo
        1
    Jooooooooo  
       2021-10-28 22:21:39 +08:00
    相差很远啥意思, 怎么统计的耗时?
    aureole999
        2
    aureole999  
       2021-10-28 22:23:18 +08:00
    这个只是告诉 java 你这个流可以并行执行,不代表一定要并行执行,更不代表所有的子项全都同时并行,或者说根本不会所有子项都同时执行。具体好像是通过 ForkJoinPool 线程池取得线程执行,默认线程数不超过你的 cpu 的个数。你在每个 dealData 打出当前线程看就知道了。
    luxinfl
        3
    luxinfl  
    OP
       2021-10-28 23:20:30 +08:00
    @Jooooooooo 意思是就好像没有并行一样
    @aureole999 还有这种说法么
    Jooooooooo
        4
    Jooooooooo  
       2021-10-28 23:29:25 +08:00
    @luxinfl 很有可能是 2 楼说的问题
    546L5LiK6ZOt
        5
    546L5LiK6ZOt  
       2021-10-28 23:41:49 +08:00 via iPhone
    我也遇到过 2 楼说的问题,一般都是自定义一个 forkjoinpool
    dqzcwxb
        6
    dqzcwxb  
       2021-10-29 00:09:50 +08:00
    Fork/Join 的策略就是会用主线程跑看源码就知道了
    luxinfl
        7
    luxinfl  
    OP
       2021-10-29 00:36:51 +08:00
    @dqzcwxb 这个以前用得少,没注意过。。而且我发现用 jmeter 压测,ForkJoinPool 的性能并不是太好的样子。。和线程数有很大关系
    dqzcwxb
        8
    dqzcwxb  
       2021-10-29 00:46:34 +08:00   ❤️ 1
    @luxinfl #7 parallelStream 默认是 cpu 核心数的线程数,而且只推荐 cpu 密集型运算时使用
    io 密集型请使用 Completablefuture+Fork/Join 线程池
    ForkJoinPool 因为有工作窃取机制性能比其他线程池要高得多,多测试即可知道
    dqzcwxb
        9
    dqzcwxb  
       2021-10-29 00:46:54 +08:00
    luxinfl
        10
    luxinfl  
    OP
       2021-10-29 08:47:39 +08:00 via Android
    @dqzcwxb 主要是有多个 io 调用,而且 parallel 并发调用写起来方便。。没太搞明白 CompletableFuture 怎么写才能并发调用。要写多个 supplyAsync 调用麽
    ZeroDu
        11
    ZeroDu  
       2021-10-29 10:11:58 +08:00
    确实如楼上所说的,forkjoinpool 线程数默认是 cpu 核心数,而且这个只适合 CPU 密集型任务。贴主这种情况 io 密集型得用 CompletableFuture 但是这个写起来当然么有 parallelStream 丝滑
    dqzcwxb
        12
    dqzcwxb  
       2021-10-29 10:13:44 +08:00
    @luxinfl #10 不要再使用 parallelStream 写 io 操作因为它使用的是 forkjoinpool 是全局共用的,如果出现 io 阻塞会直接影响到其他使用 parallelStream 的代码
    CompletableFuture 值得你花时间学习
    luxinfl
        13
    luxinfl  
    OP
       2021-11-01 22:27:55 +08:00
    @ZeroDu 我现在新接口用的 CompletableFutrue ,自定义线程城市,用了 join 方法,但是发现还是没啥效果
    @dqzcwxb 我有自定义线程池
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2768 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 39ms · UTC 15:14 · PVG 23:14 · LAX 07:14 · JFK 10:14
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.