ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Semaphore 的使用与源码解析

2021-12-05 19:32:01  阅读:174  来源: 互联网

标签:剩余 许可 permits int 源码 Semaphore 借阅 解析 书籍


使用

信号量(Semaphore)允许多个线程同时访问临界区资源,而 ReentrantLock 这类锁同时只允许一个线程访问临界区资源。

Semaphore 就是共享锁,它的构造方法可以传一个许可数 permits,表示临界区资源数量,多少个资源同时也最多只能由多少个线程来访问。当 permits 等于 1 时,Semaphore 就等价于 ReentrantLock 这类互斥锁。

代码示例:

@Test
public void test() throws InterruptedException {
    // 定义一个线程池
    ExecutorService executor = Executors.newFixedThreadPool(3);
    int userNuber = 10;
    CountDownLatch countDownLatch = new CountDownLatch(userNuber);
    // 三个许可,表示资源总数,这里表示只有三本书籍,所以最多只能由三个人借阅,即最多只能有三个线程获取到锁
    int permits = 3;
    Semaphore semaphore = new Semaphore(permits);

    IntStream.range(0, userNuber).forEach(i -> {
        executor.submit(() -> {
            try {
                System.out.println("学生" + i + "等待借阅书籍......");

                // 获取一个许可(加锁)
                semaphore.acquire(1);

                System.out.println("学生" + i + "借阅到了一个本书籍,当前还剩余" + semaphore.availablePermits() + "本书籍");
                Thread.sleep(i * 500);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放一个许可(解锁)
                semaphore.release(1);
                System.out.println("学生" + i + "归还了一个本书籍,当前还剩余" + semaphore.availablePermits() + "本书籍");
                countDownLatch.countDown();
            }
        });
    });

    countDownLatch.await();
    executor.shutdown();

    /**
         * 执行结果:
         *
         * 学生0等待借阅书籍......
         * 学生0借阅到了一个本书籍,当前还剩余2本书籍
         * 学生1等待借阅书籍......
         * 学生1借阅到了一个本书籍,当前还剩余1本书籍
         * 学生2等待借阅书籍......
         * 学生2借阅到了一个本书籍,当前还剩余0本书籍
         * 学生0归还了一个本书籍,当前还剩余1本书籍
         * 学生3等待借阅书籍......
         * 学生3借阅到了一个本书籍,当前还剩余0本书籍
         * 学生1归还了一个本书籍,当前还剩余1本书籍
         * 学生4等待借阅书籍......
         * 学生4借阅到了一个本书籍,当前还剩余0本书籍
         * 学生2归还了一个本书籍,当前还剩余1本书籍
         * 学生5等待借阅书籍......
         * 学生5借阅到了一个本书籍,当前还剩余0本书籍
         * 学生3归还了一个本书籍,当前还剩余1本书籍
         * 学生6等待借阅书籍......
         * 学生6借阅到了一个本书籍,当前还剩余0本书籍
         * 学生4归还了一个本书籍,当前还剩余1本书籍
         * 学生7等待借阅书籍......
         * 学生7借阅到了一个本书籍,当前还剩余0本书籍
         * 学生5归还了一个本书籍,当前还剩余1本书籍
         * 学生8等待借阅书籍......
         * 学生8借阅到了一个本书籍,当前还剩余0本书籍
         * 学生6归还了一个本书籍,当前还剩余1本书籍
         * 学生9等待借阅书籍......
         * 学生9借阅到了一个本书籍,当前还剩余0本书籍
         * 学生7归还了一个本书籍,当前还剩余1本书籍
         * 学生8归还了一个本书籍,当前还剩余2本书籍
         * 学生9归还了一个本书籍,当前还剩余3本书籍
         */
}

源码解析

Sync 内部类

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // Semaphore 构造函数的传参 permits 赋值给 state
    Sync(int permits) {
        setState(permits);
    }

    // 获取许可数(即 state 的值)
    final int getPermits() {
        return getState();
    }

    // 非公平锁方式获取共享锁
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 当前可用的许可数
            int available = getState();
            // 当前申请锁后剩余的许可数
            int remaining = available - acquires;
            // 如果剩余许可数小于 0 直接返回剩余许可数
            // 如果剩余许可数大于 0,将其设置为 state,如果成功,则返回剩余许可数
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // 释放锁
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 当前可用的许可数
            int current = getState();
            // (释放锁后)增加许可数
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 将增加后的许可数赋值给 state,成功则返回,否则自旋重试
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 减少许可数
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    // 清空许可数
    final int drainPermits() {
        for (;;) {
            // 当前许可数
            int current = getState();
            // 将 state 设置为 0
            if (current == 0 || compareAndSetState(current, 0))
                // 返回当前许可数
                return current;
        }
    }
}

NonfairSync 内部类

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

FairSync 内部类

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果当前线程节点还有前序节点,则获取锁失败
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

构造函数

// 默认使用非公平锁
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// permits 表示同时允许访问临界区资源的线程数
// fair 表示使用公平锁实现还是非公平锁实现
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire

// 可中断地阻塞获取锁
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireUninterruptibly

// 不可中断地阻塞获取锁
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

tryAcquire

public boolean tryAcquire() {
    // nonfairTryAcquireShared 返回剩余可用许可数
    // 剩余可用许可数大于等于 0,表示可以获取锁
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

release

// 释放锁(一个许可)
public void release() {
    sync.releaseShared(1);
}

acquire

// 一次获取 permits 个许可(可中断,阻塞获取)
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

acquireUninterruptibly

// 一次获取 permits 个许可(不可中断、阻塞获取)
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

tryAcquire

// 一次获取 permits 个许可(可中断,非阻塞获取)
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

// 一次获取 permits 个许可(可中断,非阻塞获取、超时可退出)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

release

// 释放多个许可
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

标签:剩余,许可,permits,int,源码,Semaphore,借阅,解析,书籍
来源: https://www.cnblogs.com/wu726/p/15646785.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有