ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java多线程之Phaser使用说明

2021-12-23 14:02:28  阅读:149  来源: 互联网

标签:Phaser Java arriveAndAwaitAdvance Assert phaser 线程 executor println 多线程


package com.abc.test

import org.junit.Assert
import org.junit.Test
import java.util.concurrent.*

/**

 */
class PhaserTest {

    /**
     * Phaser 除具有CyclicBarrier的所有功能外,还多加了以下功能:
     * 1 允许参与者到达一个目标点之后退出.
     * 2 允许线程只在某一轮等待.
     * 3 动态的增加参与者数量.
     *
     * 以下指定Phaser的参与者数量是3.
     * 运作之后可以得出结论:
     * 只有打印了“3: before arriveAndAwaitAdvance”  之后 ,
     * 线程1 和线程2 的  phaser.arriveAndAwaitAdvance() 之后的代码才执行.
     * 可以尝试修改线程3的sleep时间来看效果.
     */
    @Test
    fun tesArriveAndAwaitAdvance() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            phaser.arriveAndAwaitAdvance();
            println("1 is  over")
        }
        executor.execute {
            println("2")
            phaser.arriveAndAwaitAdvance();
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance()
            println("3 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }

    /**
     * 以下测试arriveAndDeregister()方法:
     * 允许一个参与者到达一个目标点之后退出,同时这个Phaser的参与者数量减一.
     * 以下指定Phaser的参与者数量是3。
     * 开始时参与者线程是1,2,3,
     * 线程3到达第一个目标之后退出,Phaser的参与者数量减为2,
     * 然后线程4和5开始,可以通过日志看出线程5执行arriveAndAwaitAdvance之后立即执行线程5和线程4的arriveAndAwaitAdvance之后的代码,
     * 说明Phaser的参与者数量在线程3调用arriveAndDeregister()方法之后已经减为2。
     */
    @Test
    fun tesArriveAndDeregister() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)

        executor.execute {
            println("1")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.registeredParties)
            println("1 is  over")
        }
        executor.execute {
            println("2")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.registeredParties)
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            Assert.assertEquals(3, phaser.registeredParties)
            phaser.arriveAndDeregister()
            Assert.assertEquals(2, phaser.registeredParties)
            println("3 is  over")
        }

        Thread.sleep(4000)

        executor.execute {
            println("4")
            phaser.arriveAndAwaitAdvance();
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            println("5 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }

    /**
     * 以下测试Phase:
     * Phase指这个Phaser已经跑了第几圈了。
     * 我们可以设想在一个环形跑道,很多线程在这个跑道上跑,所有的参与者到达栅栏一次
     * 那么这个Phaser的Phase值就加1,以此类推.
     * 线程1和线程2以及线程3调用arriveAndAwaitAdvance()之后就是第一圈
     * 线程4和线程5调用arriveAndAwaitAdvance()之后就是第二圈了,
     * 所以phase是2。
     */
    @Test
    fun tesPhase() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("1 is  over")
        }
        executor.execute {
            println("2")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndDeregister()
            Assert.assertEquals(1, phaser.phase)
            println("3 is  over")
        }

        Thread.sleep(4000)

        executor.execute {
            println("4")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("5 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }


    /**
     * 以下测试awaitAdvance():
     * 依然基于上面的代码,增加了线程6,
     * 线程6调用phaser.awaitAdvance(1),意思是等待phaser的phase值由1变为2,
     * 如果当前phase的值不等于1立即返回.
     * 也就是线程6在参与者线程全部跑完第一圈之后等待参与者线程全部跑完第二圈.
     */
    @Test
    fun testAwaitAdvance() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("1 is  over")
        }
        executor.execute {
            println("2")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndDeregister()
            Assert.assertEquals(1, phaser.phase)
            println("3 is  over")
        }

        Thread.sleep(4000)

        executor.execute {
            println("4")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("5 is  over")
        }
        executor.execute {
            println("6")
            Assert.assertEquals(1, phaser.phase)
            phaser.awaitAdvance(1)
            Assert.assertEquals(2, phaser.phase)
            println("6 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }

    /**
     * 以下测试register():
     * 基于tesArriveAndDeregister()的代码,增加了线程6,
     * Phaser.register()方法是增加一个参与者数量,现在数量是3.
     * 那么线程4,线程5和线程6调用arriveAndAwaitAdvance之后就满足通行条件,
     * 线程4,线程5和线程6继续执行arriveAndAwaitAdvance()之后的代码.
     */
    @Test
    fun testRegister() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            phaser.arriveAndAwaitAdvance();
            println("1 is  over")
        }
        executor.execute {
            println("2")
            phaser.arriveAndAwaitAdvance();
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(500)
            println("3: before arriveAndAwaitAdvance")
            phaser.arriveAndDeregister()
            println("3 is  over")
        }

        Thread.sleep(1000)

        Assert.assertEquals(phaser.registeredParties, 2)

        executor.execute {
            Thread.sleep(1000)
            println("4")
            phaser.arriveAndAwaitAdvance();
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            println("5 is  over")
        }
        executor.execute {
            println("6")
            Assert.assertEquals(phaser.unarrivedParties, 2)
            phaser.register()
            Assert.assertEquals(phaser.unarrivedParties, 3)
            phaser.arriveAndAwaitAdvance();
            println("6 is  over")
        }

        Thread.sleep(2000)
        Assert.assertEquals(phaser.unarrivedParties, 3)

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }


}

标签:Phaser,Java,arriveAndAwaitAdvance,Assert,phaser,线程,executor,println,多线程
来源: https://blog.csdn.net/oHeHui1/article/details/122105659

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有