without haste but without rest
카프카 중복 메시지 핸들링 with python 본문
※ 쉬는 텀 없이 실시간으로 계속 들어오는 경우 사용하기 어려운 코드
dp 알고리즘의 메모이제이션을 응용했다.
현재 수집하는 데이터는 큐 구조로 1시간마다 갱신이 되는데, 새로운 데이터만 주는 것이 아니라 기존 데이터에 갱신된 데이터를 추가해서 보내준다. 따라서 while 문이 돌기 전 빈 딕셔너리를 선언하고 해당 자료구조를 이용해서 중복체크를 한다.
if __name__ == "__main__":
memoization_dict = {}
while True:
# 중복 검사 및 추출
res_list = []
for raw in raw_list: # raw_list는 갱신 받은 데이터다.
key = raw["serial"]
try:
if memoization[key]:
continue
except:
res_list.append(raw)
# 데이터 갱신 주기 고려하여 메모이제이션 초기화
if len(memoization) >= 100000:
memoization = {}
# 메모이제이션 업데이트
for raw in res_list:
key = raw["serial"]
memoization[key] = True
1. 시작 시 빈 메모할 빈 딕셔너리 선언
2. 갱신 받은 데이터의 유니크 컬럼을 메모이제이션 딕셔너리에 존재하는지 확인
3. 존재하면 중복이므로 컨티뉴 / 키 에러가 나면 결과 리스트에 추가
4. 메모이제이션 딕셔너리 초기화 후 결과 리스트를 이용해서 메모이제이션 딕셔너리 업데이트
'Data Engineering & DataOps' 카테고리의 다른 글
데이터 파이프라인에서 중복 데이터 핸들링 방법 (0) | 2022.02.03 |
---|---|
워크플로우 관리와 멱등성 (0) | 2022.02.03 |
Flume 개념과 agent.conf 파일 작성 예시 (0) | 2021.12.10 |
데이터 레이크 아키텍처 (0) | 2021.07.25 |
Comments