public class finxed {
public static void main(String[] args) throws InterruptedException {
finxed finxed = new finxed();
// 阻塞队列
// 满了之后,放不进去
// 空的时候取数据,也会堵塞
ArrayBlockingQueue storage = new ArrayBlockingQueue(10);
Producer producer = finxed.new Producer(storage);
Thread producerThread = new Thread(producer);
producerThread.start();
Thread.sleep(1000);
Consumer consumer = finxed.new Consumer(storage);
while (consumer.needMore()) {
System.out.println(consumer.storage.take() + "被消费");
Thread.sleep(100);
}
System.out.println("消费者不需要更多数据");
// 消费者不需要数据,让生产者停下来
producerThread.interrupt();
}
class Producer implements Runnable {
public volatile boolean canceled = false;
BlockingQueue storage;
public Producer(BlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
int num = 0;
try {
while (num <= 10000 && !Thread.currentThread().isInterrupted()) {
if (num % 100 == 0) {
storage.put(num);
System.out.println("num" + "生产");
}
num++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("生产者停止运行");
}
}
}
class Consumer {
BlockingQueue storage;
public Consumer(BlockingQueue storage) {
this.storage = storage;
}
public boolean needMore() {
if (Math.random() > 0.9) {
return false;
}
return true;
}
}
}
/*
* 2100被消费
num生产
2200被消费
num生产
2300被消费
num生产
消费者不需要更多数据
生产者停止运行
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at com.jx.JavaTest.stopThread.volatiledmo.finxed$Producer.run(finxed.java:51)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 0
*
* */
|