Kafka滥用案例

2020-05-26 00:00:00 专区 订阅 是在 离线 就不
近再做一个master-worker式的任务调度程序,这个版是同事做的,我负责重构。代码结构在这篇文章里

之前使用kafka来做队列,但是当我重构时发现,worker处理一个message的时间有时候需要几分钟,处理完茶都凉了,consumer和partition的连接都断了,没法commit了。

试了很多方法都不行,后改成了下边这个丑样子:

    def run(self):
        consumer = KafkaConsumer(
            bootstrap_servers=conf.KAFKA_CONF['host'],
            group_id=self.group_id,
            enable_auto_commit=False,
            auto_offset_reset='earliest',
            api_version=(0, 9)
        )   
    
        consumer.subscribe([self.topic])
    
        while not self.exit.is_set():
            poll_msgs = consumer.poll(timeout_ms=5000, max_records=1)
            for partition in poll_msgs:
                offset = consumer.committed(partition)
                try:
                    consumer.commit({partition: OffsetAndMetadata(offset+1, None)})
                except Exception, e:
                    Log.error(e)
                    continue

                msgs = poll_msgs[partition]
                for msg in msgs:
                    self.process(msg)
    
        consumer.close()
        Log.info('%s shutdown' % self.name)

相关文章