V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
qi1070445109
V2EX  ›  Kafka

kafka 重启后, kafka- Python 消费者需要重启才能订阅 topic

  •  
  •   qi1070445109 · 2018-03-08 22:22:14 +08:00 via Android · 4748 次点击
    这是一个创建于 2212 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近项目用到 kafka-python,在试验把 kafka broker 停止后,生产者和消费者都没有出现异常。

    重启后生产者继续生产,但是消费者不能消费那个 topic 了。

    想知道有什么办法,让消费者继续消费,或者让它能抛出异常捕捉后重新订阅一下 topic ?

    2 条回复    2018-03-08 23:44:46 +08:00
    qi1070445109
        1
    qi1070445109  
    OP
       2018-03-08 23:43:31 +08:00
    补充一下我用的 kafka-python 1.4.1 代码如下:

    #consumer.py
    from kafka import KafkaConsumer

    # To consume latest messages and auto-commit offsets
    consumer = KafkaConsumer('my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092'])
    for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))
    # producer.py

    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(bootstrap_servers=['broker1:1234'])


    for _ in range(100000):
    producer.send('my-topic', b'msg'
    qi1070445109
        2
    qi1070445109  
    OP
       2018-03-08 23:44:46 +08:00
    抱歉,没整好格式。

    #consumer.py
    from kafka import KafkaConsumer

    # To consume latest messages and auto-commit offsets
    consumer = KafkaConsumer('my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092'])
    for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))
    # producer.py

    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(bootstrap_servers=['broker1:1234'])


    for _ in range(100000):
    producer.send('my-topic', b'msg'
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   3008 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 11:05 · PVG 19:05 · LAX 04:05 · JFK 07:05
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.