本文共 2313 字,大约阅读时间需要 7 分钟。
昨天晚上遗留的两个问题
1.两个消费者消费消息都到100了,但是下图中的日志未打印出来
这个问题看代码
public classConsumerObjectOne implementsRunnable {
@Overridepublic voidrun() {
while(true) {
if(PudConThread.arrayBlockingQueue.size() > 0) {
if(PudConThread.hasConsumerTotal.get() >= PudConThread.total) {
System.out.println("消费者1--消费已达上限停止消费");return;}
/***获取最新的一条消息消费*/try{
//这个地方是关键
MessageVO messageVO = PudConThread.arrayBlockingQueue.take();System.out.println("消费者1消费消息"+ messageVO.toString());PudConThread.hasConsumerTotal.getAndAdd(1);} catch(InterruptedException e) {
e.printStackTrace();}
}
}
}
}
ArrayBlockingQueue 里面的take(),当队列里面的长度为空时,会进入await 状态,所以两个消费线程在消费掉最后一条时,队列是空队列,take()阻塞不能进行下次循环,消费结束消息不能打印
将消费线程中的,消费消息代码和判断消费消息数量代码位置对调一下就可以了
public classConsumerObjectOne implementsRunnable {
@Overridepublic voidrun() {
while(true) {
if(PudConThread.arrayBlockingQueue.size() > 0) {
/***获取最新的一条消息消费*/try{
MessageVO messageVO = PudConThread.arrayBlockingQueue.take();System.out.println("消费者1消费消息"+ messageVO.toString());PudConThread.hasConsumerTotal.getAndAdd(1);} catch(InterruptedException e) {
e.printStackTrace();}
if(PudConThread.hasConsumerTotal.get() >= PudConThread.total) {
System.out.println("消费者1--消费已达上限停止消费");return;}
}
}
}
}
在运行结果如下
不对调位置也可以如下改造,将take()换成poll()
public classConsumerObjectOne implementsRunnable {
@Overridepublic voidrun() {
while(true) {
if(PudConThread.hasConsumerTotal.get() >= PudConThread.total) {
System.out.println("消费者1--消费已达上限停止消费");return;}
if(PudConThread.arrayBlockingQueue.size() > 0) {
/***获取最新的一条消息消费*/MessageVO messageVO = PudConThread.arrayBlockingQueue.poll();if(messageVO != null) {
System.out.println("消费者1消费消息"+ messageVO.toString());PudConThread.hasConsumerTotal.getAndAdd(1);}
}
}
}
}
第2个问题 序号我们用的是AtomicInteger但是每次都会出现两个为0的序号
分析:每次出现两个为0的序号是,两个生产者在设置序号的时候用的是 AtomicInteger的 get() 方法这个只是返回当前最新值,所以两个生产者并发去get 获取到了初始值0
代码改造如下
public classProductObjectOne implementsRunnable {
@Overridepublic voidrun() {
while(true) {
if(PudConThread.hasProductTotal.get() >= PudConThread.total) {
System.out.println("产品已达上限,停止生产");return;}
MessageVO messageVO = newMessageVO(PudConThread.hasProductTotal.getAndAdd(1),UUID.randomUUID().toString(),"ProductObjectOne---this is pubsub test");try{
PudConThread.arrayBlockingQueue.put(messageVO);} catch(InterruptedException e) {
e.printStackTrace();}
}
}
}
运行结果如下,未出现重复的序号了
转载地址:http://vhats.baihongyu.com/