BlockingQueue队列生产者消费者示例

Stella981
• 阅读 642
package org.web.controller.queue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 生产者线程
 */
public class Producer implements Runnable {

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        String data = null;
        Random r = new Random();

        System.out.println("启动生产者线程!");
        try {
            while (isRunning) {
                System.out.println("正在生产数据...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

                data = "data:" + count.incrementAndGet();
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("放入数据失败:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生产者线程!");
        }
    }

    public void stop() {
        isRunning = false;
    }

    private volatile boolean isRunning = true;
    private BlockingQueue queue;
    private static AtomicInteger count = new AtomicInteger();
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

}


package org.web.controller.queue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 消费者线程
 * 
 */
public class Consumer implements Runnable {

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        System.out.println("启动消费者线程!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println("拿到数据:" + data);
                    System.out.println("正在消费数据:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消费者线程!");
        }
    }

    private BlockingQueue<String> queue;
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}



package org.web.controller.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        // 声明一个容量为10的缓存队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动线程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);

        // 执行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java 帮助类
package com.quincy.util;import java.io.File;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.IOException;import java.
Stella981 Stella981
3年前
POI导入大excel文件
package me.shanzhi.test;import java.io.InputStream;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import org.apac
Wesley13 Wesley13
3年前
Java公钥私钥签名工具包
package com.locator.encryption;import java.io.ByteArrayOutputStream;import java.security.Key;import java.security.KeyFactory;import java.security.KeyPa
Wesley13 Wesley13
3年前
Excel导出工具类.
 Excel导出工具类.POIimport java.io.OutputStream;import java.lang.reflect.Field;import java.lang.reflect.Method;import java.util.ArrayList;import java.uti
Stella981 Stella981
3年前
None of the configured nodes are available
1.今天写了一段ES的测试代码,如下:package elasticSearch;import java.net.InetSocketAddress;import java.util.ArrayList;import java.util.List;import java.util.M
Stella981 Stella981
3年前
BlockingQueue队列的使用
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingQueueTest { public static void main(String args) 
Stella981 Stella981
3年前
Base64工具类
package com.locator.encryption;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.File;import java.io.FileInputStr
Stella981 Stella981
3年前
Gallery实现流畅的新闻滚动 方法复写
package com.ename.views;import android.content.Context;import android.util.AttributeSet;import android.view.KeyEvent;import android.view.MotionEvent;
Stella981 Stella981
3年前
Android GridView 添加 HeadView
package com.example.test;import android.content.Context;import android.util.AttributeSet;import android.util.Log;import android.view.MotionEvent;i
Stella981 Stella981
3年前
Android从相机或相册获取图片裁剪
package com.only.android.app; import java.io.File; import android.app.Activity;import android.app.AlertDialog;import android.content.DialogInterfa