Java Semaphore信号量

半兽人 发表于: 2017-09-25   最后更新时间: 2017-09-25  
  •   13 订阅,862 游览

Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。

Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

Semaphore维护了当前访问的个数,提供同步机制,控制同时访问的个数。在数据结构中链表可以保存“无限”的节点,用Semaphore可以实现有限大小的链表。另外重入锁 ReentrantLock 也可以实现该功能,但实现上要复杂些。

下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()和release()获取和释放访问许可。

package concurrent;

/**
 * TestSemaphore
 *
 * @author weiwei(Duan.Yu)
 * @version 1.0.0 createTime: 2017/9/25 上午10:17
 */

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class TestSemaphore {

    public static void main(String[] args) {
        // 线程池
        ExecutorService exec = Executors.newCachedThreadPool();

        // 只能5个线程同时访问
        final Semaphore semp = new Semaphore(5);

        // 模拟20个客户端访问
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // 获取许可
                        semp.acquire();
                        System.out.println("Accessing: " + NO);
                        Thread.sleep((long) (Math.random() * 10000));
                        // 访问完后,释放
                        semp.release();
                        System.out.println("-----------------" + semp.availablePermits());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            exec.execute(run);
        }

        // 退出线程池
        exec.shutdown();
    }
}

追加一个例子:1秒钟内得不到许可,则丢弃访问。

public class FlowControl {  

    /** 
     * 最大访问量 
     */  
    private static final int MAX_ACCESS_COUNT = 20;  

    /** 
     * 只能有MAX_ACCESS_COUNT个线程数同时访问 
     */  
    private static final Semaphore semaphore = new Semaphore(MAX_ACCESS_COUNT);  

    public static void main(String[] args) {  
        // 线程池  
        ExecutorService exec = Executors.newCachedThreadPool();  

        // 模拟30个客户端  
        for (int i = 0; i < 30; i++) {  
            Runnable run = new Runnable() {  
                @Override  
                public void run() {  
                    try {  
                        // 1秒钟内得不到许可,则丢弃访问。  
                        if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {  
                            System.out.println("正在执行...");  
                            //做一些事情...  
                            Thread.sleep(2 * 1000);  
                            System.out.println("执行完毕!");  
                        } else {  
                            System.out.println("访问被拒绝!!!");  
                        }  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    } finally {  
                        // 执行完成,释放许可。  
                        semaphore.release();  
                    }  
                }  
            };  
            exec.execute(run);  
        }  

        // 关闭线程池  
        exec.shutdown();  
    }  
}

分线程池例子

package concurrent;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    final static int MAX_QPS = 10;

    final static Semaphore semaphore = new Semaphore(MAX_QPS);

    public static void main(String... args) throws Exception {
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                semaphore.release(MAX_QPS / 2);
            }
        }, 1000, 500, TimeUnit.MILLISECONDS);

        //lots of concurrent calls:100 * 1000
        ExecutorService pool = Executors.newFixedThreadPool(100);

        for (int i = 100; i > 0; i--) {
            final int x = i;
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    for (int j = 1000; j > 0; j--) {
                        semaphore.acquireUninterruptibly(1);
                        remoteCall(x, j);
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        System.out.println("DONE");
    }

    private static void remoteCall(int i, int j) {
        System.out.println(String.format("%s - %s: %d %d", new Date(),
                Thread.currentThread(), i, j));
    }
}






发表于: 10月前   最后更新时间: 10月前   游览量:862
上一条: java的JIT即时编译
下一条: java实现轮询和加权轮询

评论…


  • 评论…
    • in this conversation
      提问