问答详情

python如何连接kafka?

277次观看
标签: python kafka
老师回答

1、kafka-python安装:

# PyPI安装
pip install kafka-python
 
# conda安装
conda install -c conda-forge kafka-python
 
# anaconda自带pip安装
/root/anaconda3/bin/pip install kafka-python

2、kafka-python生产者

producer.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import json
import time
import uuid
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')
topic = 'test_20181105'
def test():
    print('begin')
    try:
        n = 0
        while True:
            dic = {}
            dic['id'] = n
            n = n + 1
            dic['myuuid'] = str(uuid.uuid4().hex)
            dic['time'] = datetime.datetime.now().strftime("%Y%m%d %H:%M:%S")
            producer.send(topic, json.dumps(dic).encode())
            print("send:" + json.dumps(dic))
            time.sleep(0.5)
    except KafkaError as e:
        print(e)
    finally:
        producer.close()
        print('done')
if __name__ == '__main__':
    test()

服务器集群中配置好Kafka, 修改上面程序中的ip地址和端口号, 执行python脚本就可以成功将消息发送到 topic: test_20181105

send:{"id": 1411, "myuuid": "a25a3d0361f94d3b8fffd5967ab5df01", "time": "20181105 16:11:14"}
send:{"id": 1412, "myuuid": "784efd5389564194941240dca66233b6", "time": "20181105 16:11:14"}
send:{"id": 1413, "myuuid": "6a211195319e447aa559614662f70590", "time": "20181105 16:11:15"}
send:{"id": 1414, "myuuid": "2cc45bd82baf4a1cb41ea4786e50a0df", "time": "20181105 16:11:15"}
send:{"id": 1415, "myuuid": "b7dfed4919c74164b83cf3ec28e257b6", "time": "20181105 16:11:16"}
send:{"id": 1416, "myuuid": "9218eceb17834c228f5ab01ca7595272", "time": "20181105 16:11:16"}
send:{"id": 1417, "myuuid": "c2751c54c390453f9eedd417fb1e5a31", "time": "20181105 16:11:17"}
send:{"id": 1418, "myuuid": "9bbc4ef2cfbb42148332eb979b1142cb", "time": "20181105 16:11:17"}
send:{"id": 1419, "myuuid": "f4998a862494445c976137793b55ed73", "time": "20181105 16:11:18"}

3、kafka-python消费者

consumer.py

#!/bin/env python
from kafka import KafkaConsumer
# connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('test_20181105',group_id = 'test_group2', bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')
try:
    for msg in consumer:
        print(msg)
        # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
except KeyboardInterrupt as e:
    print(e)

同样修改上面的Ip地址和端口号,就可以接收 topic: test_20181105上的消息:

ConsumerRecord(topic='test_20181105', partition=1, offset=951, timestamp=1541405600340, timestamp_type=0, key=None, value=b'{"id": 1663, "myuuid": "0f744021b2d9468886908ee6685a0fdb", "time": "20181105 16:13:20"}', checksum=1357895145, serialized_key_size=-1, serialized_value_size=87)
ConsumerRecord(topic='test_20181105', partition=0, offset=935, timestamp=1541405600841, timestamp_type=0, key=None, value=b'{"id": 1664, "myuuid": "9379f68f656644bdb2d30911f06240e4", "time": "20181105 16:13:20"}', checksum=-715594646, serialized_key_size=-1, serialized_value_size=87)
ConsumerRecord(topic='test_20181105', partition=1, offset=952, timestamp=1541405601341, timestamp_type=0, key=None, value=b'{"id": 1665, "myuuid": "f4a5fa5b32cd4b7991612b626bea4b0e", "time": "20181105 16:13:21"}', checksum=-2068072013, serialized_key_size=-1, serialized_value_size=87)

可以通过设置不同的group_id 来实现消息队列或消息订阅:

如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.

如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.

免费直播

    精选课程
    相关推荐
    python中sort()和sorted()使用有什么区别?
    付老师 Python编程

    python中有两种列表排序的方法,即sort() 和sorted() 。这两个方法看起来很像,但是有很大的差别。sort() 修改原列表,永久性排序,无返回值,内存消耗小,而sorted() 保持原列表不变,临时性排序,有返回值,内存消耗大。本文向大家详解这二者使用的区别。

    一、sort() 

    1、定义:python列表的一个内置的排序方法,只是列表的一个方法,只适用于列表;

    2、作用:作用于列表,直接修改原有列表,无返回值;

    3、排序时间:对列表进行永久性排序;

    4、内存消耗:无需保存原对象,节省内存空间。

    5、使用实例:

    list_name = [1, 3, 4, -0.2200222, -4.66]
    list_name.sort()
    print(list_name)

    输出

    [-4.66, -0.2200222, 1, 3, 4]
    原列表的值发生变化,原列表被修改

    二、sorted() 

    1、定义:python内置的一个排序函数,接受一切迭代器,返回一个有序的副本,并且类型总是列表;

    2、作用:作用于任意可迭代的对象,原有列表保持不变,会返回一个排序后的列表。

    3、排序时间:对列表进行临时排序。

    4、内存消耗:返回新对象,所以耗费较多资源。

    5、使用实例:

    list_name = [1, 3, 4, -0.2200222, -4.66]
    list_name_new = sorted(list_name)
    print(list_name)
    print(list_name_new)

    输出

    [1, 3, 4, -0.2200222, -4.66] 原列表
    [-4.66, -0.2200222, 1, 3, 4] 排序后的列表

    相比于sort(),sorted() 使用的范围更为广泛,但是sort()消耗内存比较小,效率也比较高。所以如果不需要保留原列表,sort更有效一点哦~

    注册电脑版

    版权所有 2003-2020 广州环球青藤科技发展有限公司