Kafka滥用案例
近再做一个master-worker式的任务调度程序,这个版是同事做的,我负责重构。代码结构在这篇文章里
之前使用kafka来做队列,但是当我重构时发现,worker处理一个message的时间有时候需要几分钟,处理完茶都凉了,consumer和partition的连接都断了,没法commit了。
试了很多方法都不行,后改成了下边这个丑样子:
def run(self):
consumer = KafkaConsumer(
bootstrap_servers=conf.KAFKA_CONF['host'],
group_id=self.group_id,
enable_auto_commit=False,
auto_offset_reset='earliest',
api_version=(0, 9)
)
consumer.subscribe([self.topic])
while not self.exit.is_set():
poll_msgs = consumer.poll(timeout_ms=5000, max_records=1)
for partition in poll_msgs:
offset = consumer.committed(partition)
try:
consumer.commit({partition: OffsetAndMetadata(offset+1, None)})
except Exception, e:
Log.error(e)
continue
msgs = poll_msgs[partition]
for msg in msgs:
self.process(msg)
consumer.close()
Log.info('%s shutdown' % self.name)
相关文章