阻塞队列
一. 概念
阻塞队列是⼀种特殊的队列.也遵守"先进先出"的原则.
阻塞队列能是⼀种线程安全的数据结构,并且具有以下特性:
- 当队列满的时候,继续⼊队列就会阻塞,直到有其他线程从队列中取⾛元素.
- 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插⼊元素
阻塞队列的⼀个典型应⽤场景就是"生产者消费者模型".这是⼀种⾮常典型的开发模型
生产者
消费者模式就是通过⼀个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以生产者生产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,⽽是直接从阻塞队列⾥取.
生产者消费者模型的实现一共有两大步骤:
- 实现生产者
- 实现消费者
二. 标准库中的阻塞队列
在Java标准库中内置了阻塞队列.如果我们需要在⼀些程序中使⽤阻塞队列,直接使⽤标准库中的即
可.
- BlockingQueue是⼀个接⼝.真正实现的类是LinkedBlockingQueue
- put⽅法⽤于阻塞式的⼊队列,take⽤于阻塞式的出队列
- BlockingQueue也有offer,poll,peek等⽅法,但是这些⽅法不带有阻塞特性
阻塞队列的伪代码如下:
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// ⼊队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
三. 生产者消费者模型
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消费元素: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生产元素: " + num);
blockingQueue.put(num);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
四. 阻塞队列实现
- 通过"循环队列"的⽅式来实现.
- 使⽤synchronized进⾏加锁控制.
- put插⼊元素的时候,判定如果队列满了,就进⾏wait.(注意,要在循环中进⾏wait.被唤醒时不⼀定,队列就不满了,因为同时可能是唤醒了多个线程).
- take取出元素的时候,判定如果队列为空,就进⾏wait.(也是循环wait)
public class BlockingQueue {
private int[] items = new int[1000];
private volatile int size = 0;
private volatile int head = 0;
private volatile int tail = 0;
public void put(int value) throws InterruptedException {
synchronized (this) {
// 此处最好使⽤ while.
// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
// 就只能继续等待
while (size == items.length) {
wait();
}
items[tail] = value;
tail = (tail + 1) % items.length;
size++;
notifyAll();
}
}
public int take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
wait();
}
ret = items[head];
head = (head + 1) % items.length;
size--;
notifyAll();
}
return ret;
}
public synchronized int size() {
return size;
}
// 测试代码
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new BlockingQueue();
Thread customer = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者");
customer.start();
Thread producer = new Thread(() -> {
Random random = new Random();
while (true) { try {
blockingQueue.put(random.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者");
producer.start();
customer.join();
producer.join();
}
}
总结
- 阻塞队列就相当于⼀个缓冲区,平衡了生产者和消费者的处理能⼒.(削峰填⾕)
- 阻塞队列也能使生产者和消费者之间解耦.
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » 【操作系统】阻塞队列以及生产者消费者模型
发表评论 取消回复