without haste but without rest

파이썬으로 카프카 프로듀서 만들기 본문

Data Engineering & DataOps/Kafka

파이썬으로 카프카 프로듀서 만들기

JinungKim 2020. 7. 14. 21:40
참조 - https://analyticshut.com/kafka-producer-and-consumer-in-python/
 

Kafka Producers and Consumers in Python | Analyticshut

After writing consumers, producers and topics in Java, we will do the same in Python with just 10% lines of code.

analyticshut.com

 

0. 개요

데이터 파이프라인 구축 공부를 위해서 파이썬으로 카프카 프로듀서를 만들었다. 프로듀서를 만든 환경은 윈도우이며 파이참을 사용했다. 

 

카프카 브로커 클러스터는 aws ec2에 구축한 상태이다. 윈도우가 프롬프트 라인에서 ssh를 제공하지만 권한 문제가 있어서 컨슈머 환경에서 aws ec2 접속은 git bash를 사용했다.


 

1. 테스트 과정 요약

숫자, 문자열 두 가지 타입 데이터를 전송 해봤는데, 숫자의 경우 컨슈머가 데이터를 공백으로 받는 문제가 있다.  따라서 숫자 타입의 경우 우선은 스트링으로 변환해서 전송했다. 테스트 환경은 성공적으로 구축했다. 상당히 짧은 코드로 프로듀서의 역할을 할 수 있다는 게 놀랍다.

 

*주의할 점은 데이터를 쏴주고 잠깐 멈추지 않으면 컨슈머가 데이터를 전부 받지 못하고 유실되었다. 따라서 데이터를 쏘고 나서 1초 정도 재웠고, 테스트 환경에서는 데이터가 유실되지 않았다. 이 부분은 ack 같은 카프카 구조에 대해서 추가적인 학습이 필요할 것 같다. (아마도 ec2 환경이 프리티어인 것도 관련이 있을 것 같다.)

 


 

2. 프로듀서 코드

import time
import random
import datetime
from kafka import KafkaProducer

bootstrap_servers = ['localhost:9092'] # kafka broker ip
topicName = 'test'
producer = KafkaProducer(bootstrap_servers = bootstrap_servers)

for i, _ in enumerate(range(5)):

    # test1 - send numeric type
    print(i)
    producer.send(topicName, str(i).encode())

    # test2 = send string type
    text = 'This is ' + str(i) + ' msg'
    print(text)

    tim = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    producer.send(topicName, text.encode())
    producer.send(topicName, tim.encode())

    time.sleep(1)

 

카프카는 브로커로 메세지를 전송할 때 데이터를 직렬화 해야하는 것 같다. 파이썬에서는 문자열이 할당된 변수의 경우 .encode() 메소드를 이용해서 변환할 수 있다. (파라미터는 아무것도 주지 않아도 된다.) 만약 수동으로 지정하고 싶다면 b'전송할 메세지' 와 같이 지정하면 된다. 

Comments