Java并发52:并发集合系列-基于独占锁+二叉树最小堆实现的单向阻塞无界优先级队列PriorityBlockingQueue

news/2024/7/8 3:16:29

[超级链接:Java并发学习系列-绪论]
[系列序章:Java并发43:并发集合系列-序章]


原文地址:http://www.importnew.com/25541.html

一、 前言

PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现,研究过数组方式存放最小堆节点的都知道,直接遍历队列元素是无序的。

二、 PriorityBlockingQueue类图结构

这里写图片描述

如图PriorityBlockingQueue内部:

  • 有个数组queue用来存放队列元素,
  • size用来存放队列元素个数,
  • allocationSpinLockOffset是用来在扩容队列时候做cas的,目的是保证只有一个线程可以进行扩容。
  • 由于这是一个优先级队列所以有个比较器comparator用来比较元素大小。
  • lock独占锁对象用来控制同时只能有一个线程可以进行入队出队操作。
  • notEmpty条件变量用来实现take方法阻塞模式。
  • 这里没有notFull 条件变量是因为这里的put操作是非阻塞的,为啥要设计为非阻塞的是因为这是无界队列。
  • 最后PriorityQueue q用来搞序列化的。

如下构造函数,默认队列容量为11,默认比较器为null;

private static final int DEFAULT_INITIAL_CAPACITY = 11;


public PriorityBlockingQueue() {
   this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

三、 offer操作

在队列插入一个元素,由于是无界队列,所以一直为成功返回true;

public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;

    //如果当前元素个数>=队列容量,则扩容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);


    try {
        Comparator<? super E> cmp = comparator;

        //默认比较器为null
        if (cmp == null)(2)
            siftUpComparable(n, e, array);
        else
            //自定义比较器(3)
            siftUpUsingComparator(n, e, array, cmp);

        //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程
        size = n + 1;(9)
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

主流程比较简单,下面看看两个主要函数

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //must release and then re-acquire main lock
    Object[] newArray = null;

    //cas成功则扩容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }

    //第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用cas控制只有一个线程可以扩容成功。

我的理解是为了性能,因为扩容时候是需要花时间的,如果这些操作时候还占用锁那么其他线程在这个时候是不能进行出队操作的,也不能进行入队操作,这大大降低了并发性。

所以在扩容前释放锁,这允许其他出队线程可以进行出队操作,但是由于释放了锁,所以也允许在扩容时候进行入队操作,这就会导致多个线程进行扩容会出现问题。

所以这里使用了一个spinlock用cas控制只有一个线程可以进行扩容,失败的线程调用Thread.yield()让出cpu,目的意在让扩容线程扩容后优先调用lock.lock重新获取锁,但是这得不到一定的保证,有可能调用Thread.yield()的线程先获取了锁。

那copy元素数据到新数组为啥放到获取锁后面那?

原因应该是因为可见性问题,因为queue并没有被volatile修饰。

另外有可能在扩容时候进行了出队操作,如果直接拷贝可能看到的数组元素不是最新的。

而通过调用Lock后,获取的数组则是最新的,并且在释放锁前 数组内容不会变化。

具体建堆算法:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;

    //队列元素个数>0则判断插入位置,否者直接入队(7)
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}

四、 poll操作

在队列头部获取并移除一个元素,如果队列为空,则返回null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

主要看dequeue

private E dequeue() {

    //队列为空,则返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {


        //获取队头元素(1)
        Object[] array = queue;
        E result = (E) array[0];

        //获取对尾元素,并值null(2)
        E x = (E) array[n];
        array[n] = null;

        Comparator<? super E> cmp = comparator;
        if (cmp == null)//cmp=null则调用这个,把对尾元素位置插入到0位置,并且调整堆为最小堆(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;(4return result;
    }
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                            int n) {
     if (n > 0) {
         Comparable<? super T> key = (Comparable<? super T>)x;
         int half = n >>> 1;           // loop while a non-leaf
         while (k < half) {
             int child = (k << 1) + 1; // assume left child is least
             Object c = array[child];(5int right = child + 1;(6)
             if (right < n &&
                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                 c = array[child = right];
             if (key.compareTo((T) c) <= 0)(8)
                 break;
             array[k] = c;
             k = child;
         }
         array[k] = key;(9)
     }
 }

五、 put操作

内部调用的offer,由于是无界队列,所以不需要阻塞

public void put(E e) {
    offer(e); // never need to block
}

六、 take操作

获取队列头元素,如果队列为空则阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {

        //如果队列为空,则阻塞,把当前线程放入notEmpty的条件队列
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

这里是阻塞实现,阻塞后直到入队操作调用notEmpty.signal 才会返回。

七、 size操作

获取队列元个数,由于加了独占锁所以返回结果是精确的

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

八、总结

PriorityBlockingQueue类似于ArrayBlockingQueue内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队。

另外前者只使用了一个notEmpty条件变量而没有notFull这是因为前者是无界队列。

当put时候永远不会处于await所以也不需要被唤醒。

PriorityBlockingQueue始终保证出队的元素是优先级最高的元素,并且可以定制优先级的规则。

内部通过使用一个二叉树最小堆算法来维护内部数组,这个数组是可扩容的,当前元素个数>=最大容量时候会通过算法扩容。

扩容:默认队列容量为11,oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE。

值得注意的是为了避免在扩容操作时候其他线程不能进行出队操作,实现上使用了先释放锁,然后通过cas保证同时只有一个线程可以扩容成功。


http://www.niftyadmin.cn/n/2815549.html

相关文章

Mysql之Centos6.5+Mysql5.6搭建配置

下载mysql-xxx.tar.gz文件 将下载的文件通过ftp软件传到linux下/usr/local 解压并重命名 cd /usr/local tar -zxvf mysql-xxx.tar.gz mv mysql-xxx mysql 检测是否安装过mysql rpm -qa |grep mysql 删除旧版本mysql rpm -e mysql-server rpm -e mysql -nodeps 强制性 添加组和用…

oraclenbsp;表复制

1. 复制表结构及其数据&#xff1a;create table table_name_new as select * from table_name_old ;2. 只复制表结构&#xff1a;create table table_name_new as select * from table_name_old where 12;ORcreate table table_name_new like table_name_old ;3. 只复制表数据…

SparkSQL:使用反射、编程方式将RDD转换为DataFrame

为什么要将RDD转换为DataFrame&#xff1f;因为这样的话&#xff0c;我们就可以直接针对HDFS等任何可以构建为RDD的数据&#xff0c;使用Spark SQL进行SQL查询了。这个功能是无比强大的。想象一下&#xff0c;针对HDFS中的数据&#xff0c;直接就可以使用SQL进行查询。 一、Spa…

Java并发53:并发集合系列-基于独占锁+PriorityBlockingQueue实现的单向阻塞无界延时队列DelayQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://cmsblogs.com/?p2413 DelayQueue是一个支持延时获取元素的无界阻塞队列。 里面的元素全部都是“可延期”的元素&#xff0c;列头的元素是最先…

批处理删除指定盘符下的某种文件

echo offecho 正在清除以lng后缀的文件&#xff0c;请稍后。。。 del /a /q e:\*.lng

Java并发54:并发集合系列-基于CAS算法的非阻塞无数据缓冲队列SynchronousQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;https://blog.csdn.net/Dax1n/article/details/69813682 介绍 Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue(队列只能…

一个「学渣」从零Web前端自学之路

从 13 年专科毕业开始&#xff0c;一路跌跌撞撞走了很多弯路&#xff0c;做过餐厅服务员&#xff0c;进过工厂干过流水线&#xff0c;做过客服&#xff0c;干过电话销售可以说经历相当的“丰富”。 最后的机缘巧合下&#xff0c;走上了前端开发之路&#xff0c;作为一个非计算机…

spring 读取properties文件--通过注解方式

问题: 需要通过properties读取页面的所需楼盘的名称.为了以后便于修改. 解决: 可以通过spring的 PropertiesFactoryBean 读取properties属性,就不需要自己通过jdk的Properties类编写程序读取信息. <!-- 第二种方式是使用注解的方式注入&#xff0c;主要用在java代码中使用注…