without haste but without rest

카프카 클러스터 도커 컴포즈 템플릿 본문

Data Engineering & DataOps/Kafka

카프카 클러스터 도커 컴포즈 템플릿

JinungKim 2022. 1. 10. 15:48

요약

브로커 3대와 주키퍼 1대로 구성된 카프카 클러스터 도커 컴포즈 템플릿

컨테이너의 비정상적인 종료에 따른 데이터 유실을 막기 위해 오프셋 데이터와 로그 디렉토리를 호스트 볼륨에 마운팅 했다.

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - ./zookeeper/bitnami/zookeeper:/bitnami/zookeeper
  kafka-0:
    image: docker.io/bitnami/kafka:2.8.1
    hostname: kafka-0
    restart: on-failure
    ports:
      - "9093:9093"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
    volumes:
      - ./kafka_0/bitnami/kafka:/bitnami/kafka
      - ./kafka_0/logs:/opt/bitnami/kafka/logs
    depends_on:
      - zookeeper
  kafka-1:
    image: docker.io/bitnami/kafka:2.8.1
    hostname: kafka-1
    restart: on-failure
    ports:
      - "9094:9094"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
    volumes:
      - ./kafka_1/bitnami/kafka:/bitnami/kafka
      - ./kafka_1/logs:/opt/bitnami/kafka/logs
    depends_on:
      - zookeeper
  kafka-2:
    image: docker.io/bitnami/kafka:2.8.1
    hostname: kafka-2
    restart: on-failure
    ports:
      - "9095:9095"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9095
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://localhost:9095
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
    volumes:
      - ./kafka_2/bitnami/kafka:/bitnami/kafka
      - ./kafka_2/logs:/opt/bitnami/kafka/logs
    depends_on:
      - zookeeper

옵션 설명

enviroments:

KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 # 주키퍼 어드레스

KAFKA_CFG_BROKER_ID=0 # 브로커 ID

ALLOW_PLAINTEXT_LISTENER=yes # 암호화 되지 않은 리스너의 접근 권한 여부

KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT # 리스너의 보안 프로토콜 본 컴포즈 파일에서는 내부, 외부 전부 평문 사용

KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9094 # 브로커가 서버 소켓 생성에 사용할 포트

KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://localhost:9094  # 클라이언트가 브로커에 연결하는데 사용할 포트

KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT # 브로커의 네임?

KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3 # 토픽의 오프셋 복제 개수

 

volumes

./kafka_n/bitnami/kafka:/bitnami/kafka # 카프카 오프셋 디렉토리 마운트 

./kafka_n/logs:/opt/bitnami/kafka/logs # 카프카 로그 디렉토리 마운트


 

 

GitHub - bitnami/bitnami-docker-kafka: Bitnami Docker Image for Kafka

Bitnami Docker Image for Kafka . Contribute to bitnami/bitnami-docker-kafka development by creating an account on GitHub.

github.com

 

이미지는 bitnami에서 제공하는 카프카 이미지를 사용했으며 디테일한 옵션은 bitnami Kafka 리포지토리에서 참조할 수 있다.

 

 


테스트

 

Producer

from kafka import KafkaProducer

# Kafka Broker IP
broker = ["localhost:9093", "localhost:9094", "localhost:9095"]

# Topic name
topic = "jinyes"

# Generate producer object
producer = KafkaProducer(bootstrap_servers=broker)


# Send test message
for i in range(100):
    msg = "Hello Kafka {}".format(i)
    producer.send(topic, msg.encode("utf-8"))
    producer.flush()
    print(msg)

 

 

Consumer

from kafka import KafkaConsumer

# Kafka Broker IP
broker = ["localhost:9093", "localhost:9094", "localhost:9095"]

# Topic name
topic = "jinyes"

# Generate producer object
consumer = KafkaConsumer(topic,
                         group_id="group_1",
                         bootstrap_servers=broker,
                         auto_offset_reset="latest")

# Read message from consumer object
try:
    for msg in consumer:
        print("Topic: {}\nPartition: {}\nOffset: {}\nKey: {}\nValue: {}\n".format(
            msg.topic, msg.partition, msg.offset, msg.key, msg.value.decode("utf-8")))

except KeyboardInterrupt:
    exit(0)
Comments