Java并发51:并发集合系列-基于独占锁+数组实现的单向阻塞有界队列ArrayBlockingQueue

news/2024/7/8 2:45:43

[超级链接:Java并发学习系列-绪论]
[系列序章:Java并发43:并发集合系列-序章]


原文地址:http://www.importnew.com/25566.html

一、 前言

上节介绍了无界链表方式的阻塞队列LinkedBlockingQueue,本节来研究下有界使用数组方式实现的阻塞队列ArrayBlockingQueue

二、 ArrayBlockingQueue类图结构

这里写图片描述

如图ArrayBlockingQueue内部

  • 有个数组items用来存放队列元素,
  • putindex下标标示入队元素下标,takeIndex是出队下标,count统计队列元素个数,从定义可知道并没有使用volatile修饰,这是因为访问这些变量使用都是在锁块内,并不存在可见性问题。
  • 另外有个独占锁lock用来对出入队操作加锁,这导致同时只有一个线程可以访问入队出队,
  • 另外notEmpty,notFull条件变量用来进行出入队的同步。

另外构造函数必须传入队列大小参数,所以为有界队列,默认是Lock为非公平锁。

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

三、offer操作

在队尾插入元素,如果队列满则返回false,否者入队返回true。

public boolean offer(E e) {

    //e为null,则抛出NullPointerException异常
    checkNotNull(e);

    //获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列满则返回false
        if (count == items.length)
            return false;
        else {
            //否者插入元素
            insert(e);
            return true;
        }
    } finally {
        //释放锁
        lock.unlock();
    }
}


private void insert(E x) {

    //元素入队
    items[putIndex] = x;

    //计算下一个元素应该存放的下标
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

//循环队列,计算下标
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

这里由于在操作共享变量前加了锁,所以不存在内存不可见问题。

加过锁后获取的共享变量都是从主内存获取的,而不是在CPU缓存或者寄存器里面的值。

释放锁后修改的共享变量值会刷新会主内存中。

另外这个队列是使用循环数组实现,所以计算下一个元素存放下标时候有些特殊。

另外insert后调用 notEmpty.signal();是为了激活调用notEmpty.await()阻塞后放入notEmpty条件队列中的线程。

四、put操作

在队列尾部添加元素,如果队列满则等待队列有空位置插入后返回

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;

    //获取可被中断锁
    lock.lockInterruptibly();
    try {

        //如果队列满,则把当前线程放入notFull管理的条件队列
        while (count == items.length)
            notFull.await();

        //插入元素
        insert(e);
    } finally {
        lock.unlock();
    }
}

需要注意的是如果队列满了那么当前线程会阻塞,直到出队操作调用了notFull.signal方法激活该线程。

代码逻辑很简单,但是这里需要思考一个问题为啥调用lockInterruptibly方法而不是Lock方法。

我的理解是因为调用了条件变量的await()方法,而await()方法会在中断标志设置后抛出InterruptedException异常后退出,所以还不如在加锁时候先看中断标志是不是被设置了,如果设置了直接抛出InterruptedException异常,就不用再去获取锁了。

然后看了其他并发类里面凡是调用了await的方法获取锁时候都是使用的lockInterruptibly方法而不是Lock也验证了这个想法。

五、poll操作

从队头获取并移除元素,队列为空,则返回null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //当前队列为空则返回null,否者
        return (count == 0) ? null : extract();
    } finally {
        lock.unlock();
    }
}

private E extract() {
    final Object[] items = this.items;

    //获取元素值
    E x = this.<E>cast(items[takeIndex]);

    //数组中值值为null;
    items[takeIndex] = null;

    //队头指针计算,队列元素个数减一
    takeIndex = inc(takeIndex);
    --count;

    //发送信号激活notFull条件队列里面的线程
    notFull.signal();
    return x;
}

六、take操作
从队头获取元素,如果队列为空则阻塞直到队列有元素。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {

        //队列为空,则等待,直到队列有元素
        while (count == 0)
            notEmpty.await();
        return extract();
    } finally {
        lock.unlock();
    }
}

需要注意的是如果队列为空,当前线程会被挂起放到notEmpty的条件队列里面,直到入队操作执行调用notEmpty.signal后当前线程才会被激活,await才会返回。

七、peek操作

返回队列头元素但不移除该元素,队列为空,返回null

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //队列为空返回null,否者返回头元素
        return (count == 0) ? null : itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return this.<E>cast(items[i]);
}

八、 size操作

获取队列元素个数,非常精确因为计算size时候加了独占锁,其他线程不能入队或者出队或者删除元素

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

九、总结

ArrayBlockingQueue通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作。

这个锁的粒度比较大,有点类似在方法上添加synchronized的意味。

其中offer,poll操作通过简单的加锁进行入队出队操作。

而put,take则使用了条件变量实现如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。

另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。


http://www.niftyadmin.cn/n/2815551.html

相关文章

lingo错误代码对照大全

需要lingo11破解版&#xff0c;请留下邮箱。 0 LINGO模型生成器的内存已经用尽(可用“LINGO|Options"命令对General Solver 选项卡中的“Generator Memory Limit"选项进行内存大小的修改) 1 模型中的行数太多(对于有实际意义的模型&#xff0c;这个错…

Java并发52:并发集合系列-基于独占锁+二叉树最小堆实现的单向阻塞无界优先级队列PriorityBlockingQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://www.importnew.com/25541.html 一、 前言 PriorityBlockingQueue是带优先级的无界阻塞队列&#xff0c;每次出队都返回优先级最高的元素&…

Mysql之Centos6.5+Mysql5.6搭建配置

下载mysql-xxx.tar.gz文件 将下载的文件通过ftp软件传到linux下/usr/local 解压并重命名 cd /usr/local tar -zxvf mysql-xxx.tar.gz mv mysql-xxx mysql 检测是否安装过mysql rpm -qa |grep mysql 删除旧版本mysql rpm -e mysql-server rpm -e mysql -nodeps 强制性 添加组和用…

oraclenbsp;表复制

1. 复制表结构及其数据&#xff1a;create table table_name_new as select * from table_name_old ;2. 只复制表结构&#xff1a;create table table_name_new as select * from table_name_old where 12;ORcreate table table_name_new like table_name_old ;3. 只复制表数据…

SparkSQL:使用反射、编程方式将RDD转换为DataFrame

为什么要将RDD转换为DataFrame&#xff1f;因为这样的话&#xff0c;我们就可以直接针对HDFS等任何可以构建为RDD的数据&#xff0c;使用Spark SQL进行SQL查询了。这个功能是无比强大的。想象一下&#xff0c;针对HDFS中的数据&#xff0c;直接就可以使用SQL进行查询。 一、Spa…

Java并发53:并发集合系列-基于独占锁+PriorityBlockingQueue实现的单向阻塞无界延时队列DelayQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;http://cmsblogs.com/?p2413 DelayQueue是一个支持延时获取元素的无界阻塞队列。 里面的元素全部都是“可延期”的元素&#xff0c;列头的元素是最先…

批处理删除指定盘符下的某种文件

echo offecho 正在清除以lng后缀的文件&#xff0c;请稍后。。。 del /a /q e:\*.lng

Java并发54:并发集合系列-基于CAS算法的非阻塞无数据缓冲队列SynchronousQueue

[超级链接&#xff1a;Java并发学习系列-绪论] [系列序章&#xff1a;Java并发43:并发集合系列-序章] 原文地址&#xff1a;https://blog.csdn.net/Dax1n/article/details/69813682 介绍 Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue(队列只能…