知识

面试官:阻塞队列的底层实现有了解过吗?

时间:2010-12-5 17:23:32  作者:系统运维   来源:IT科技  查看:  评论:0
内容摘要:前言本节以ArrayBlockingQueue为例, 带大家看下阻塞队列是如何实现,一起来看下吧!ArrayBlockingQueue 源码分析构造函数同样的,我们先从它的构造函数看起。public

前言

本节以ArrayBlockingQueue为例,面试 带大家看下阻塞队列是如何实现,一起来看下吧!官阻过

ArrayBlockingQueue 源码分析

构造函数

同样的塞队实现,我们先从它的底层构造函数看起。

public ArrayBlockingQueue(int capacity) {

this(capacity,有解 false);

}capacity 固定容量大小。false,面试这个字段名称其实是官阻过fair默认下它是false,非公平锁。

​​上节​​我们使用的塞队实现就是它的默认用法,公平锁和非公平锁我们之前讲过,底层可以查看以往文章(ReentrantLock源码分析)。有解下面我们接着看:

public ArrayBlockingQueue(int capacity,面试 boolean fair) {

if (capacity <= 0)

throw new IllegalArgumentException();

this.items = new Object[capacity];

lock = new ReentrantLock(fair);

notEmpty = lock.newCondition();

notFull = lock.newCondition();

}

从上面的代码来看,可知capacity > 0,官阻过第一个构造函数的this()其实就是调的这个构造函数,我们可以通过它来指定容量和访问策略(fair 和 nofair)的塞队实现ArrayBlockingQueue。

再接着看最后一个构造函数。底层

public ArrayBlockingQueue(int capacity,有解 boolean fair,

Collection c) {

this(capacity, fair);

final ReentrantLock lock = this.lock;

// 锁用于可见性

lock.lock();

try {

int i = 0;

try {

// 迭代集合

for (E e : c) {

checkNotNull(e);

items[i++] = e;

}

// 捕获异常 越界

} catch (ArrayIndexOutOfBoundsException ex) {

throw new IllegalArgumentException();

}

count = i;

putIndex = (i == capacity) ? 0 : i;

} finally {

lock.unlock();

}

}

从代码来看,云服务器对比上一个多了个Collection,这是干嘛的呢?它允许我们在创建的时候初始化一个集合进去,按迭代顺序添加到容器,从它的内部我们也可以看出来。

内部变量

// 队列的元素

final Object[] items;

// 获取下一个元素时的索引

int takeIndex;

// 下一个添加元素时的索引

int putIndex;

// 队列的元素数量

int count;

// 全局锁

final ReentrantLock lock;

// 等待条件

private final Condition notEmpty;

private final Condition notFull;

// 迭代器的共享状态

transient Itrs itrs = null;

内部方法

add() & offer()

我们看下add方法,这个方法用于向队列中添加元素。

public boolean add(E e) {

return super.add(e);

}

内部调用了父类的方法,它继承了AbstractQueue。

public class ArrayBlockingQueueextends AbstractQueue

implements BlockingQueue<E>, java.io.Serializable { ....}

接着看AbstractQueue的add()。

public boolean add(E e) {

if (offer(e))

return true;

else

throw new IllegalStateException("Queue full");

}

可以看到内部调用了offer(), 如果添加成功就返回true,失败就抛出异常, 这符合我们上节使用时它的特点。

但是,我们发现在它的内部并没有offer方法,所以实现不在AbstractQueue,实现还是在ArrayBlockingQueue。

来看下ArrayBlockingQueue的offer()方法。云服务器提供商

public boolean offer(E e) {

// 判断元素 e 是否为空,空抛出 NullPointerException 异常

checkNotNull(e);

final ReentrantLock lock = this.lock;

// 需要持锁

lock.lock();

try {

// 如果元素已满 返回false, 对标 add 就会抛出异常

if (count == items.length)

return false;

else {

// 添加到队列中

enqueue(e);

return true;

}

} finally {

lock.unlock();

}

}

看下enqueue:

private void enqueue(E x) {

final Object[] items = this.items;

// 将元素添加到预期的索引位置

items[putIndex] = x;

// 如果下个索引值等于容器数量值 将putIndex归0

if (++putIndex == items.length)

putIndex = 0;

// 容器元素数量+1

count++;

// 唤醒等待的线程

notEmpty.signal();

}

remove & poll

先看第一个 remove(), 同样的这个方法存在于 AbstractQueue内部,如果被移除的元素为null则抛出异常。

public E remove() {

E x = poll();

if (x != null)

return x;

else

throw new NoSuchElementException();

}

poll()的实现在ArrayBlockingQueue,内部实现方式跟add很像。

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return (count == 0) ? null : dequeue();

} finally {

lock.unlock();

}

}private E dequeue() {

final Object[] items = this.items;

// 这个注解用于忽略一些警告 这不是重点

@SuppressWarnings("unchecked")

// 取出元素 takeIndex 按照 FIFO

E x = (E) items[takeIndex];

// 元素取出时 置为空

items[takeIndex] = null;

// 判断下一个元素的位置

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

// 唤醒等待的线程

notFull.signal();

// 返回元素

return x;

}

第二个remove(e), 这个实现在ArrayBlockingQueue的内部,可以移除指定元素。

public boolean remove(Object o) {

if (o == null) return false;

final Object[] items = this.items;

final ReentrantLock lock = this.lock;

lock.lock();

try {

if (count > 0) {

final int putIndex = this.putIndex;

int i = takeIndex;

do {

// 遍历移除指定元素

if (o.equals(items[i])) {

// 移除指定元素 并更新对应的索引位置

removeAt(i);

return true;

}

// 防止越界

if (++i == items.length)

i = 0;

} while (i != putIndex);

}

return false;

} finally {

lock.unlock();

}

}

take

take会造成线程阻塞下面我看下它的内部实现。

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

// 获取锁,当线程中断会直接返回

lock.lockInterruptibly();

try {

// 如果元素内部为空 会进入阻塞,意思是没有元素可拿了,进入等待

while (count == 0)

// 使当前线程等待

notEmpty.await();

// 否则出列

return dequeue();

} finally {

lock.unlock();

}

}

put

该方法实现跟 take类似, 也会阻塞线程。

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 如果元素满了 就阻塞

while (count == items.length)

notFull.await();

// 否则入列

enqueue(e);

} finally {

lock.unlock();

}

}

结束语

本节主要给大家讲了ArrayBlockingQueue的源码实现,它的源码相对简单一些, 大家可以根据本节看下BlockingQueue其它的实现类。

服务器租用
copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap