多线程经典实例(二)

上一篇文章多线程经典实例(一) 讲到了几个多线程相关的例子,接下来我们来看一个经典的案例:生产者消费者问题。

介绍

生产者消费者问题是多线程中的一个经典问题,问题大概就是有一块缓冲区作为仓库,当仓库未满时生产者可以将产品放入仓库,当仓库未空时消费者可以从仓库取走产品。解决这个问题的核心就是处理好多线程之间的同步与协作。

生产者消费者的实现方式

通常有以下几种方式来实现生产者与消费者模型:

  1. BlockingQueue 阻塞队列
  2. wait()/notify()等待通知机制
  3. Lock/Condition
  4. 管道流PipedOutputStream和PipedInputStream
BlockingQueue实现

BlockingQueue是阻塞队列接口。它常用的几个实现如LinkedBlockingQueue、ArrayBlockingQueue等内部已经实现了同步队列,所以是线程安全的,所采用的原理是await()/singnal()方法。

BlockingQueue中常见方法:

操作\情况 抛异常 特定值 阻塞 超时
插入元素 add(e) offer(e) put() offer(e,timeout,unit)
移除元素 remove(e) poll() take() poll(timeout,unit)
检查元素 element() peek()

主要用到 put() 和 take() 方法:

当队列已满时调用 put() 方法会自动阻塞;
当队列为空时调用 take() 方法会自动阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class ProducerAndConsumer2 {
private BlockingQueue repertory = new LinkedBlockingQueue();
private final int SIZE = 5;
class Prodecer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000); //等待一段时间
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (repertory) {
if (repertory.size() == SIZE) {
System.out.println("仓库已满!");
return;
}
try {
repertory.put(new Object());
System.out.println("线程:" + Thread.currentThread().getName() + "成功生产一个产品!产品总数: " + repertory.size());
} catch (InterruptedException e) {
System.out.println("生产者生产失败! " + e.getMessage());
}
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000); //等待一段时间
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (repertory) {
if (repertory.isEmpty()) {
System.out.println("仓库为空!");
continue;
}
try {
Object object = repertory.take();
System.out.println("线程:" + Thread.currentThread().getName() + "成功消费一个产品!当前仓库剩余产品:" + repertory.size());
} catch (InterruptedException e) {
System.out.println("消费者消费失败! " + e.getMessage());
}
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer2 test = new ProducerAndConsumer2();
new Thread(test.new Prodecer()).start();
new Thread(test.new Prodecer()).start();
new Thread(test.new Prodecer()).start();
new Thread(test.new Consumer()).start();
new Thread(test.new Consumer()).start();
new Thread(test.new Consumer()).start();
}
}

wait()/notify()实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class ProducerAndConsumer {
private LinkedList repertory = new LinkedList();
private final int SIZE = 8;
class Prodecer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
synchronized (repertory) {
try {
Thread.sleep(1000); //等待一段时间
} catch (InterruptedException e) {
e.printStackTrace();
}
while (repertory.size() == SIZE) {
System.out.println("仓库已满!");
try {
repertory.wait(); //生产者释放锁并阻塞,等待消费者消费
} catch (InterruptedException e) {
e.printStackTrace();
}
}
repertory.add(new Object());
System.out.println("线程:" + Thread.currentThread().getName() + "成功生产一个产品!产品总数: " + repertory.size());
repertory.notifyAll();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000); //等待一段时间
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (repertory) {
while (repertory.size() == 0) {
System.out.println("仓库为空!");
try {
repertory.wait();//消费者释放锁并阻塞,等待生产者生产
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object object = repertory.removeLast();
System.out.println("线程:" + Thread.currentThread().getName() + "成功消费一个产品!当前仓库剩余产品:" + repertory.size());
repertory.notify();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer producerAndConsumer = new ProducerAndConsumer();
new Thread(producerAndConsumer.new Prodecer()).start();
new Thread(producerAndConsumer.new Consumer()).start();
new Thread(producerAndConsumer.new Prodecer()).start();
new Thread(producerAndConsumer.new Consumer()).start();
new Thread(producerAndConsumer.new Prodecer()).start();
new Thread(producerAndConsumer.new Consumer()).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
线程:Thread-0成功生产一个产品!产品总数: 1
线程:Thread-0成功生产一个产品!产品总数: 2
线程:Thread-0成功生产一个产品!产品总数: 3
线程:Thread-0成功生产一个产品!产品总数: 4
线程:Thread-0成功生产一个产品!产品总数: 5
线程:Thread-5成功消费一个产品!当前仓库剩余产品:4
线程:Thread-1成功消费一个产品!当前仓库剩余产品:3
线程:Thread-3成功消费一个产品!当前仓库剩余产品:2
线程:Thread-4成功生产一个产品!产品总数: 3
线程:Thread-4成功生产一个产品!产品总数: 4
线程:Thread-4成功生产一个产品!产品总数: 5
线程:Thread-4成功生产一个产品!产品总数: 6
线程:Thread-4成功生产一个产品!产品总数: 7
线程:Thread-2成功生产一个产品!产品总数: 8
仓库已满!
线程:Thread-3成功消费一个产品!当前仓库剩余产品:7
线程:Thread-1成功消费一个产品!当前仓库剩余产品:6
线程:Thread-5成功消费一个产品!当前仓库剩余产品:5
线程:Thread-2成功生产一个产品!产品总数: 6
线程:Thread-2成功生产一个产品!产品总数: 7
线程:Thread-2成功生产一个产品!产品总数: 8
仓库已满!
线程:Thread-5成功消费一个产品!当前仓库剩余产品:7
线程:Thread-1成功消费一个产品!当前仓库剩余产品:6
线程:Thread-3成功消费一个产品!当前仓库剩余产品:5
线程:Thread-2成功生产一个产品!产品总数: 6
线程:Thread-5成功消费一个产品!当前仓库剩余产品:5
线程:Thread-1成功消费一个产品!当前仓库剩余产品:4
线程:Thread-3成功消费一个产品!当前仓库剩余产品:3
线程:Thread-3成功消费一个产品!当前仓库剩余产品:2
线程:Thread-5成功消费一个产品!当前仓库剩余产品:1
线程:Thread-1成功消费一个产品!当前仓库剩余产品:0
Lock/Condition实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class ProducerAndConsumer3 {
private LinkedList repertory = new LinkedList(); //仓库
private final int SIZE = 5;
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition(); //与生产者绑定
private Condition notEmpty = lock.newCondition();//与消费者绑定
class Prodecer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000); //等待一段时间
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try{
while (repertory.size() == SIZE) {
System.out.println("仓库已满!");
try {
notFull.await(); //阻塞生产者
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//仓库未满
repertory.add(new Object());//添加产品
System.out.println("线程:" + Thread.currentThread().getName() + "成功生产一个产品!产品总数: " + repertory.size());
notEmpty.signal();//唤醒消费者
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000); //等待一段时间
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
while (repertory.isEmpty()) {
System.out.println("仓库为空!");
try {
notEmpty.await();//阻塞消费者
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//仓库不为空
Object object = repertory.removeLast(); //取出产品
System.out.println("线程:" + Thread.currentThread().getName() + "成功消费一个产品!当前仓库剩余产品:" + repertory.size());
notFull.signal();//唤醒生产者
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer3 test = new ProducerAndConsumer3();
new Thread(test.new Prodecer()).start();
new Thread(test.new Prodecer()).start();
new Thread(test.new Prodecer()).start();
new Thread(test.new Consumer()).start();
new Thread(test.new Consumer()).start();
new Thread(test.new Consumer()).start();
}
}
管道流PipedOutputStream和PipedInputStream

在Java IO 包中,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流,可以利用它们来实现多线程之间的通信。
主要通信流程

  1. 建立输入输出流
  2. 绑定输入输出流
  3. 向缓冲区写数据
  4. 读取缓冲区数据

原理:
线程A向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲区中;此时,线程B可以读取PipedInputStream中的数据,这样就实现了线程A和线程B的通信。

  • 当这个缓冲数组已满的时候,输出流PipedOutputStream所在的线程将阻塞;
  • 当这个缓冲数组首次为空的时候,输入流PipedInputStream所在的线程将阻塞。
  • “管道输入流”的缓冲数组大小默认只有1024个字节。

注意:这里无法实现多生产者和多消费者的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class ProducerAndConsumer4 {
private PipedOutputStream pos = new PipedOutputStream();//管道输出流—生产者写数据
private PipedInputStream pis = new PipedInputStream();//管道输入流-消费者读数据
private int count = 0;
class Prodecer implements Runnable {
@Override
public void run() {
try {
while (count < 5) { //循环5次停止
Thread.sleep(1000); //等待一段时间
String product = "zy的产品!";
System.out.println("线程:" + Thread.currentThread().getName() + "成功生产一个产品!");
count++;
pos.write(product.getBytes()); //写入管道
pos.flush();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
try {
while (count < 5) {
Thread.sleep(1000); //等待一段时间
byte[] buffer = new byte[1024];
int len = pis.read(buffer); //读取管道中的数据,存入缓冲区
String product = new String(buffer, 0, len);
System.out.println("线程:" + Thread.currentThread().getName() + "成功消费一个产品:" + product);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProducerAndConsumer4 test = new ProducerAndConsumer4();
try {
test.pis.connect(test.pos); //管道连接
} catch (IOException e) {
e.printStackTrace();
}
new Thread(test.new Prodecer()).start();
new Thread(test.new Consumer()).start();
}
}

除此之外,当然还有其他方式可以实现,如Semaphore。
以上就是实现生产者消费者的几个实例。