without haste but without rest

카프카 중복 메시지 핸들링 with python 본문

Data Engineering & DataOps

카프카 중복 메시지 핸들링 with python

JinungKim 2021. 4. 29. 13:19

※ 쉬는 텀 없이 실시간으로 계속 들어오는 경우 사용하기 어려운 코드

 

dp 알고리즘의 메모이제이션을 응용했다.

 

현재 수집하는 데이터는 큐 구조로 1시간마다 갱신이 되는데, 새로운 데이터만 주는 것이 아니라 기존 데이터에 갱신된 데이터를 추가해서 보내준다.  따라서 while 문이 돌기 전 빈 딕셔너리를 선언하고 해당 자료구조를 이용해서 중복체크를 한다.

 

if __name__ == "__main__":
  memoization_dict = {}

  while True:

      # 중복 검사 및 추출
      res_list = []
      for raw in raw_list: # raw_list는 갱신 받은 데이터다.
          key = raw["serial"]

          try:
              if memoization[key]:
                  continue
          except:
              res_list.append(raw)

      # 데이터 갱신 주기 고려하여 메모이제이션 초기화
      if len(memoization) >= 100000:
      	memoization = {}
        
      # 메모이제이션 업데이트
      for raw in res_list:
          key = raw["serial"]
          memoization[key] = True
    

1. 시작 시 빈 메모할 빈 딕셔너리 선언

2. 갱신 받은 데이터의 유니크 컬럼을 메모이제이션 딕셔너리에 존재하는지 확인

3. 존재하면 중복이므로 컨티뉴 / 키 에러가 나면 결과 리스트에 추가

4. 메모이제이션 딕셔너리 초기화 후 결과 리스트를 이용해서 메모이제이션 딕셔너리 업데이트

 

 

Comments