V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
shayang888
V2EX  ›  Java

消费者怎么确保能取到数据,又能正确退出线程

  •  2
     
  •   shayang888 · 2019-03-12 10:58:49 +08:00 · 3167 次点击
    这是一个创建于 1844 天前的主题,其中的信息可能已经有所发展或是发生改变。

    现在是这样的

    1.首先我最外面有个 quartz 定时器,每隔 N 秒执行一次

    2.定时器里执行的内容是这样,里面有个线程池,线程池大小是 2 个线程,coresize,maxsize 都是 2

    3.线程池里的 2 个线程,分别一个去执行生产者方法,一个去执行消费者方法

    4.生产者和消费者中间用消息队列来临时存数据

    现在有个问题,就是消费者这边,怎么能保证取到数据,又能正确的退出线程,进行到下一次定时器的执行

    在这之前我做的蠢办法是消费者那边加了个 while(true),结果定时器执行了 2 次后,线程池就满了,然后拒绝

    所以不知道有啥好办法

    代码:

    //线程池的
    public class TestTask extends QuartzJobBean {
    
        private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                2,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(4));
    
        @Override
        protected void executeInternal(JobExecutionContext jobExecutionContext) {
            threadPoolExecutor.execute(() -> {
                //生产者代码
                
                //数据放入消息队列
            });
            threadPoolExecutor.execute(() -> {
                while (true) {
                    //取队列里的数据
                    //消费者代码
                }
            });
        }
    }
    
    第 1 条附言  ·  2019-03-12 12:23:48 +08:00
    可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
    jinksw
        1
    jinksw  
       2019-03-12 11:09:43 +08:00
    看样子你定时器的意思是 每两秒生产一次 然后消费一次

    为啥要 2 个线程 你每次运行一个线程 先生产再消费不行吗
    sarlanori
        2
    sarlanori  
       2019-03-12 11:15:02 +08:00
    在 C#里,我一般是用信号量来等待和通知。
    shayang888
        3
    shayang888  
    OP
       2019-03-12 11:24:59 +08:00
    @jinksw 那样不就是同步了吗?我是想生产和消费分隔开来
    shayang888
        4
    shayang888  
    OP
       2019-03-12 11:26:27 +08:00
    @jinksw 另外定时器并不是 2 秒执行一次啊 定时器的执行时间随便设置的
    passerbytiny
        5
    passerbytiny  
       2019-03-12 12:05:56 +08:00
    消费者在线程之上,而不是之内,拿到数据后再从线程池里开线程去执行后续处理。消费者不能用多线程+死循环来取数据,而应该是单线程异步监听+同步取值,再具体我也不知道了,因为基本都是直接调用 API。
    Counter
        6
    Counter  
       2019-03-12 12:11:53 +08:00
    机制是不是不太对,改成这样如何呢?
    生产后的数据加锁,生产者方法和消费者方法排队存取
    shayang888
        7
    shayang888  
    OP
       2019-03-12 12:14:42 +08:00
    @passerbytiny 拿到数据后再从线程池里开线程去执行后续处理吗 好像有点思路
    shayang888
        8
    shayang888  
    OP
       2019-03-12 12:16:46 +08:00
    @passerbytiny 可是消费者怎么知道它啥时候能拿到数据
    shayang888
        9
    shayang888  
    OP
       2019-03-12 12:17:18 +08:00
    @Counter 没有懂你的意思
    limhiaoing
        10
    limhiaoing  
       2019-03-12 12:21:34 +08:00 via iPhone
    生产者、消费者线程一般用条件变量 Condition Variable 来通信。
    shayang888
        11
    shayang888  
    OP
       2019-03-12 12:23:59 +08:00
    @passerbytiny 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
    shayang888
        12
    shayang888  
    OP
       2019-03-12 12:24:05 +08:00
    @Counter 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
    passerbytiny
        13
    passerbytiny  
       2019-03-12 13:02:02 +08:00   ❤️ 1
    @shayang888 #12 消费者也不要放在定时器里,它应该是一个常驻的、独立的单线程。我不知道你的消息队列是什么队列,但一般的消息队列都是提供消费者 API 的,可以直接使用,自己做消费者太难了。

    如果用线程去看消费者 /监听,那么是类似 while(true) {if(收到数据) {……} else { Thread.sleep(0.0001);} },这要是用线程去做,要么系统受不了,要么延迟时间受不了。消费者 /监听的轮询,回采用操作系统层次的东西,高级程序员都没必要知道的太深入,自己设计是肯定设计不来的。

    你可以参考下 java.net.ServerSocket#accept()。
    autogen
        14
    autogen  
       2019-03-12 13:19:51 +08:00
    生产者发送的 msg 封装一下,加个 ctrl 字段,消费者接收到 msg.ctrl=exit 就退出
    NieKing
        15
    NieKing  
       2019-03-12 13:29:38 +08:00
    我想起了 Android 里的 RxJava
    linjiayu
        16
    linjiayu  
       2019-03-12 13:33:10 +08:00
    实现 callable
    linjiayu
        17
    linjiayu  
       2019-03-12 13:35:12 +08:00   ❤️ 1
    public void offer(Event event)
    {
    synchronized (eventQueue)
    {
    while (eventQueue.size() >= max)
    {
    try
    {
    console(" the queue is full.");
    eventQueue.wait();
    } catch (InterruptedException e)
    {
    e.printStackTrace();
    }
    }

    console(" the new event is submitted");
    eventQueue.addLast(event);
    eventQueue.notifyAll();
    }
    }

    public Event take()
    {
    synchronized (eventQueue)
    {
    while (eventQueue.isEmpty())
    {
    try
    {
    console(" the queue is empty.");
    eventQueue.wait();
    } catch (InterruptedException e)
    {
    e.printStackTrace();
    }
    }


    Event event = eventQueue.removeFirst();
    this.eventQueue.notifyAll();
    console(" the event " + event + " is handled.");
    return event;
    }
    }
    shayang888
        18
    shayang888  
    OP
       2019-03-12 13:46:28 +08:00
    @autogen 我现在是多个生产者同时生产数据然后往队列里 push,然后只有一个消费者在从队列里消费 加字段的话 我给哪个生产者加这个字段呢
    jingxyy
        19
    jingxyy  
       2019-03-12 13:48:43 +08:00
    先不管怎么实现合理的问题
    你是不是消费线程消费完了没退出啊?这样每一个 interval 之后你就有一个 while(true)的消费线程在跑,于是第 2 个周期后无法创建新的消费线程
    ratel
        20
    ratel  
       2019-03-12 13:50:50 +08:00
    使用消息中间件啊,消费者单独订阅消息消费,生产者用定时器就行了。
    micean
        21
    micean  
       2019-03-12 13:53:18 +08:00
    quartz 为什么要玩 while(true)
    shayang888
        22
    shayang888  
    OP
       2019-03-12 13:59:28 +08:00
    @jingxyy 对呀 我就是不知道怎么合适的退出
    shayang888
        23
    shayang888  
    OP
       2019-03-12 14:00:00 +08:00
    @ratel 我现在就是没有用到中间件 想问问如果自己来弄的话咋做的好
    woscaizi
        24
    woscaizi  
       2019-03-12 14:05:33 +08:00 via iPhone
    简单的数据结构入栈出栈吧。
    ratel
        25
    ratel  
       2019-03-12 14:09:38 +08:00
    @shayang888 不用中间件,那也是用一样的设计模式,消费者和生产者是分开的,只依赖消息。
    shayang888
        26
    shayang888  
    OP
       2019-03-12 14:19:23 +08:00
    @ratel 我不明白 为什么消费者要独立开来 我的消费只需要在这里进行消费呀 其它的地方都用不到它
    pusidun
        27
    pusidun  
       2019-03-12 14:21:59 +08:00
    可以生产者定时生产消息,放入消息队列;消费者可以用线程池常驻,每个消费者线程轮询消息队列是否空,不为空处理,为空阻塞一段时间,就不用退出了。不过消息队列要保证是线程安全的
    shayang888
        28
    shayang888  
    OP
       2019-03-12 14:27:24 +08:00
    @pusidun 你的意思和楼上他们说的是一个样吧 就是应该把消费者独立出来是吗?不应该放在定时器里
    codingKingKong
        29
    codingKingKong  
       2019-03-12 14:28:47 +08:00
    大概是这样么?
    ```java
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TT {

    public static void main(String[] args) throws Exception{
    threadPoolExecutor.execute(() -> {
    try {
    String a = "";
    while (a != null){
    a = blockingDeque.poll(10, TimeUnit.SECONDS);
    System.out.println(a);
    }
    System.out.println("consumer exit.");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    Thread.sleep(2000);
    executeInternal();
    Thread.sleep(2000);
    executeInternal();
    }

    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    2,
    2,
    60,
    TimeUnit.SECONDS,
    new LinkedBlockingDeque<>(4));


    private static BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>();

    private static void executeInternal() {
    threadPoolExecutor.execute(() -> {
    try {
    blockingDeque.put("123");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }

    }
    ```
    Jrue0011
        30
    Jrue0011  
       2019-03-12 17:37:59 +08:00
    额,想问下定时器里为什么还要再通过线程池调度线程来执行任务...?
    ratel
        31
    ratel  
       2019-03-13 09:51:49 +08:00
    @shayang888 你去看下生产者消费者设计模式
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2821 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 14:38 · PVG 22:38 · LAX 07:38 · JFK 10:38
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.