without haste but without rest
Kafka 개념과 특징 본문
0. Kafka
카프카는 이벤트 기반 분산 스트리밍 플랫폼이다. 비슷한 서비스로 GCP의 Pub/Sub, AWS의 Kinesis 등이 있다.
1. Unified
카프카를 사용하면 파편화된 데이터 파이프라인을 중앙집중화된 심플한 아키텍처로 개선할 수 있다.
(1). 복잡한 데이터 파이프라인 예시
위 구조는 각각의 프로세스가 파편화되어있는 상태로 데이터 처리 프로세스가 추가되는 경우 점점 더 구조가 복잡해질 수 밖에 없다.
(2). 중앙집중화된 데이터 파이프라인 예시
카프카를 사용하는 아키텍처는 카프카 브로커가 모든 프로세스의 매개자 역할을 하는데, 이 과정에서 아키텍처가 매우 단순해진다.
RabiitMQ, Apahce Pulsa 같은 메시징 큐는 데이터를 읽으면 해당 데이터를 사용한 것으로 간주하고 삭제한다. 반면 카프카는 데이터를 요청해서 사용하더라도 보존 정책에 의해 일정 시간 동안 보관한다. 즉 중앙집중화된 아키텍처이고, 데이터의 재요청이 가능하다. 따라서 수집한 데이터는 일단 카프카 클러스터에 전송한다. 그리고 데이터가 필요하면 카프카 클러스터에 요청만 하면된다. 이러한 카프카의 중앙집중화된 아키텍처는 기존의 1:1 매칭의 개발 스타일에서 발생하던 커플링 이슈를 해결하였다.
2. Topic
토픽은 카프카에서 데이터를 구분하기 위해 사용하는 단위로 카테고리라고 할 수 있다. 토픽은 1개 이상의 파티션을 가진다. 파티션에서는 프로듀서가 보낸 데이터들이 큐 형태로 들어가 저장되는데 이를 레코드라고 부른다.
3. Partition
카프카 브로커는 프로듀서로부터 전달 받은 토픽이 지정된 데이터를 큐에 저장한다. 이때 다중 큐를 지원하여 데이터를 저장하는 큐의 개수를 늘릴 수 있다. 이를 파티션이라고 부르며 이것이 카프카의 병렬 처리 메커니즘이다. (단 파티션의 개수가 2개 이상일 때 카프카는 offset의 순서를 보장하지 않는다.)
카프카에는 commit_offset이라는 토픽이 존재하는데, 해당 토픽은 데이터를 요청한 컨슈머와 그룹별로 특정 토픽의 몇 번째 offset까지 데이터를 읽었는지 저장한다. 이를 통해 특정 토픽을 여러 컨슈머와 그룹에서 요청하더라도 중복없이 데이터를 전달할 수 있다.
4. Producer & Consumer
카프카는 발행과 구독이라는 두 가지 행위가 존재한다. Producer는 데이터를 브로커에 전송한다. 그리고 Consumer는 카프카 브로커에 데이터를 요청하여 전달 받는다. 이때 원하는 데이터를 구분하기 위해 Topic이라는 개념이 존재한다.
유튜브 채널을 구독한다고 생각해보자. 크리에이터(Producer)는 유튜브(Broker)에 개설한 자신의 채널(Topic)컨텐츠를 업로드(Produce)한다. 그리고 구독자(Consumer)는 자신이 원하는 채널(Topic)을 구독한다(Consume). 이러한 과정에서 프로듀서(유튜버)는 컨슈머(구독자)에게 컨텐츠(Data)를 전달하는 문제를 고려할 필요가 없다. 유튜브(브로커)에 업로드만 하면 채널의 컨텐츠를 소비하고자 하는 구독자가 플랫폼(카프카 브로커)에 요청하여 사용한다.
4-1 Producer 옵션
- 필수
- bootstrap.servers: 프로듀서가 데이터를 전송할 카프카 브로커의 주소로, 클러스터에 속한 2대 이상의 브로커 주소를 입력할 수도 있다.
- key.serializer: 레코드의 메시지 키를 직렬화하는 클래스
- value.serializer: 레코드의 메시지 밸류를 직렬화하는 클래스
- 선택
- acks: 프로듀서가 전송한 데이터가 브로커에 정상적으로 전송되었는지에 대한 옵션
- 0: 확인 안 함
- 1: 리더 파티션에 저장된 것만 확인
- 2: 팔로워 파티션까지 모두 저장되는 것을 확인
- buffer.memory: 브로커에 전송할 데이터를 배치로 모으기 위한 버퍼 메모리 사이즈로 기본 값은 33554432(32MB)
- retries: 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수 기본값은 2147483647
- batch.size: 배치로 전송할 레코드의 최대 용량 기본 값은 16384
- linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간. 기본값은 0
- partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다.
- enable.idempotance: 멱등성 프로듀서로 동작할지 여부를 설정한다. 기본값은 false
- transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정한다.
- acks: 프로듀서가 전송한 데이터가 브로커에 정상적으로 전송되었는지에 대한 옵션
4-2 Consumer 옵션
- 필수
- bootstrap.servers: 컨슈머가 데이터를 요청할 카프카 브로커의 주소 리스트
- key.deserializer: 레코드의 메시지 키를 역직렬화하는 클래스
- value.deserializer: 레코드의 메시지 밸류를 역직렬화하는 클래스
- 옵션
- group.id: 컨슈머 그룹의 id를 지정한다.
- auto.offset.reset: 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지에 대한 옵션
- latest: 가장 마지막부터
- earliest: 가장 오래된 오프셋부터
- none: 컨슈머 그룹이 커밋한 기록이 있는지 찾아보고 없으면 오류를 반환한다. (존재하면 거기서부터 읽는다.)
- enable.auto.commit: 자동으로 커밋할지 선택한다. 기본값은 true
- auto.commit.interval.ms: 오토 커밋일 경우 오프셋 커밋 간격을 지정하고 기본 값은 5000(5초)
- max.poll.records: poll() 메서드를 통해 반환되는 레코드 개수를 지정한다. 기본값은 500
- session.timeout.ms: 컨슈머가 브로커와 연결이 끊기는 최대 시간으로 이 시간 내에 하트 비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 판단한다. 보통 하트비트 간격의 3배로 설정하고 기본값은 10000(10초)
- hearbeat.interval.ms: 하트비트를 전송하는 간격. 기본값은 3000(3초)
- max.poll.interval: poll() 메서드를 호출하는 간격의 최대 시간을 지정한다.
- isolation.level: 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용하는 옵션
- read_committed: 커밋이 완료된 메시지만 읽는다.
- read_uncommitted: 커밋 여부와 상관없이 파티션에 있는 모든 레코드 컨슘
5. Controller
클러스터에 참여하고 있는 브로커 중 한 대는 컨트롤러 역할을 담당한다. 다른 브로커들의 상태를 체크하고 문제가 생기는 경우 해당 브로커에 존재하는 리더 파티션을 정상 작동 중인 브로커에 재분해한다. 컨트롤러 역할을 하는 브로커에 장애가 생기면 다른 브로커에 컨트롤러 역할이 넘어간다.
6. 데이터 삭제
카프카에서 데이터의 삭제는 로그 세그먼트 단위로 이루어진다. 디비가 아니므로 특정 데이터를 선별해서 삭제할 수는 없다. 세그먼트는 데이터가 쌓이는 동안 열려 있다. 세그먼트가 닫히는 기본 옵션 값은 1GB다.
- log.segment.bytes, log.segment.ms: 세그먼트 파일 사이즈 조정 옵션
- log.retention.check.interval.ms: 닫힌 세그먼트 파일 체크 옵션
7. 컨슈머 오프셋 저장
컨슈머 그룹은 토픽의 특정 파티션으로부터 데이터를 가져가서 처리하고 어느 레코드까지 가져갔는지 확인하기 위해 오프셋을 커밋한다. 해당 내용은 __consumer_offsets 토픽에 저장한다.
8. 코디네이터
클러스터에 참여하고 있는 브로커 중 한 대가 코디네이터 역할을 담당한다. 코디네이터는 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭 시키는 역할을 담당한다. 작동 중인 컨슈머가 종료되면 매칭되지 않은 파티션을 정상 동작하는 컨슈머로 할당하여 끊임없이 데이터가 처리될 수 있도록 하는데, 이렇게 파티션을 컨슈머로 재할당하는 과정을 리밸런스라고 부른다. (단 리밸런스 시 파티션의 소유권을 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수가 없어서 서비스가 정체될 수 있다.)
9. 토픽 정리 정책(Cleanup.policy)
토픽의 데이터는 시간, 용량에 따라 삭제 규칙을 적용할 수 있다.
토픽 삭제 정책
- delete.policy
- retention.ms: 토픽의 데이터를 유지하는 기간을 밀리초 단위로 지정한다. 카프카는 일정 주기마다 세그먼트 파일의 마지막 수정 시간과 retention.ms를 비교하여 파일의 마지막 수정 시간이 옵션을 넝머가면 세그먼트를 삭제한다.
- retention.bytes: 설정 사이즈를 넘어간 세그먼트 파일을 삭제한다. )복구 불가)
토픽 압축 정책
- compact.policy: 압축과는 다른 개념으로 메시지 키별로 해당 메시지 키의 레코드 중 오래된 데이터를 삭제하는 정책
10. ISR(In-Sync-Replicas)
ISR은 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 뜻한다.
- replica.lag.time.max: 지정한 시간 안에 팔로워 파티션이 리더 파티션의 오프셋을 복제하지 않으면 isr에서 제외시킨다.
- unclean.leader.election.enable: True로 설정하는 경우 리더 파티션과 동기화되지 않은 팔로워 파티션도 리밸런스 과정에서도 팔로워가 될 수 있다.
11. 멱등성 프로듀서
멱등성 프로듀서는 동일한 데이터를 여러번 전송하더라도 카프카 클러스터에 단 한 번만 저장됨을 의미한다. 기본 프로듀서의 동작 방식은 at-least-once를 지원한다. 따라서 중복이 발생할 수 있다. 카프카 0.11.0 버전 이후로 프로듀서에 enable.idempotance 옵션을 사용하여 exactly-once를 지원한다. 멱등성 프로듀서는 기본 프로듀서와 달리 데이터를 브로커로 전달할 때 PID(Producer unique ID)와 시퀀스 넘버를 함께 전달한다. 브로커는 프로듀서의 PID, 시퀀스 넘버를 확인하여 동일한 메시지의 적재 요청이 오더라도 단 한번만 데이터를 적재하여 중복 전송을 방지한다.
*단 멱등성 프로듀서는 동일한 세션에서만 정확히 한 번을 보장한다. 동일한 세션이란 PID의 생명주기로, 만약 멱등성 프로듀서로 동작하는 프로듀서 애플리케이션에 이슈가 발생하여 종료되고 애플리케이션을 재시작하면 PID가 달라진다. 따라서 멱등성 프로듀서는 장애가 발생하지 않는 경우에만 exactly-once를 보장한다.
12. 트랜잭션 프로듀서
다수의 토픽과 파티션에 데이터를 저장할 경우 모든 데이터에 대해 동일한 원자성을 만족시키기 위해 사용한다. 트랜잭션 프로듀서는 추가적으로 트랜잭션의 시작과 끝을 표현하기 위해 트랜잭션 레코드를 하나 더 보낸다. 트랜잭션 컨슈머는 파티션에 저장된 트랜잭션 레코드를 보고 트랜잭션이 완료됨을 확인하면 데이터를 가져간다.
- enable.idempotance
- transactional.id
- isolation.level
옵션을 설정하면 프로듀서와 컨슈머는 트랜잭션으로 처리 완료된 데이터만 쓰고 읽는다.
13. 카프카 컨슈머 멀티 워커 스레드 전략
프로세스 하나에 컨슈머 스레드를 다중으로 할당하는 방법
14. 컨슈머 랙
컨슈머 랙(LAG)은 토픽의 최신 오프셋과 컨슈머 오프셋 간의 차이다. 즉 병목현상을 확인하는 지표로 컨슈머 랙을 모니터링함으로써 컨슈머가 정상 작동하는지 확인할 수 있다. 컨슈머 애플리케이션을 운영한다면 필수적으로 모니터링해야 하는 지표다.
모니터링을 위한 3가지 방법
- 카프카 쉘 명령어
- 컨슈머 애플리케이션에서 metrics() 메서드 사용
- 외부 모니터링 툴
- 버로우(Burrow): 오픈소스 카프카 랙 체크 툴로 슬라이딩 윈도우 기반이다. 단 별도의 저장소가 없다.
'Data Engineering & DataOps > Kafka' 카테고리의 다른 글
카프카 메시지 Key의 역할 (0) | 2022.01.13 |
---|---|
카프카 클러스터 도커 컴포즈 템플릿 (0) | 2022.01.10 |
M1 confluent-kafka 설치 에러 - fatal error: 'librdkafka/rdkafka.h' file not found (0) | 2021.09.17 |
카프카 성능 테스트 (0) | 2021.06.07 |
로컬 환경에서 카프카 클러스터 구축 (0) | 2021.06.07 |