이론 정리

Kafka 아는척하기 강의 정리

철매존 2023. 11. 8. 23:22
728x90

-> https://www.youtube.com/watch?v=xqrIDHbGjOY&t=4s
카프카를 공부해보기 위해 찾아보다가 정말 좋은 강의가 있어 이론에 대한 기초를 알기 위해 보고 정리해 보았다.

데이터 스트리밍 및 이벤트 기반 데이터 파이프라인을 위한 메시지 큐 시스템.

기본 모양

image

  • 카프카 클러스터
    • 메세지를 저장하는 저장소
    • 하나의 카프카 클러스터는 여러개의 브로커로 구성된다.
  • 각 브로커는 메세지를 나눠서 저장하고(이중화 처리 / 장애시 대체 등등도 여기서 한다) 한다.
  • 주키퍼는 클러스터를 관리하는데에 쓰인다.
  • 프로듀서
    • 카프카 클러스터에 메세지를 보낸다.
  • 컨슈머
    • 메세지를 카프카에서 읽어오는 역할을 한다.

토픽과 파티션

  • 토픽
    • 메시지를 구분하는 단위
    • 파일시스템의 폴더와 유사
    • 프로듀서가 메시지를 카프카에 저장할 때 어떤 토픽에 저장할지
    • 컨슈머가 메시지를 읽어올 때 어떤 토픽에서 가져올지
  • 파티션
    • 메시지를 저장하는 물리적인 파일
    • append-only -> 추가만 가능하다
      • 메시지 저장 위치를 offset이라고 한다.
      • 메시지는 맨 뒤에 추가되고, FIFO로 저장/읽기

여러 파티션과 컨슈머

  • 컨슈머는 컨슈머그룹에 속한다.
  • 한 개의 파티션은 컨슈머그룹의 한개의 컨슈머만 연결 가능
    • 다른 컨슈머그룹의 컨슈머와는 연결 가능
    • 이를 통해 하나의 컨슈머그룹 입장에서 파티션의 메세지는 순서대로 처리된다.

성능

카프카는 성능이 좋은데, 그 이유는

  • 파티션파일은 OS페이지캐시 사용
    • 파일 IO가 메모리영역에서 처리된다.
  • zero copy
    • 디스크에서 네트워크 버퍼로 직접 데이터 복사
    • 데이터를 메모리의 여러 영역에 복사하여 사용하지 않고 데이터를 이동함으로서 전송의 오버헤드를 줄인다.
  • 컨슈머 추적을 위해 브로커가 하는 일이 비교적 단순하다.
    • 메시지 필터, 재전송같은 일을 브로커가 아니라 프로듀서, 컨슈머가 직접 해야 한다.
  • 묶어서 보내고, 묶어서 받기(batch)
    • 프로듀서 : 일정 크기만큼 메시지를 모아서 전송 가능
    • 컨슈마 : 최소 크기만큼 메시지를 모아서 조회 가능
  • 처리량 증대가 쉽다.
    • 1개가 장비의 용량 한계 -> 브로커, 파티션 추가
    • 컨슈머가 느리면 -> 컨슈머 추가 (+파티션 추가)

리플리카 - 복제

장애가 났을 때의 대비

  • 리플리카 : 파티션의 복제본
    • 복제수만큼 파티션의 복제본이 각 브로커에 생긴다.
  • 리더와 팔로워로 구성
    • 프로듀서와 컨슈머는 리더를 통해서만 메시지 처리
    • 팔로워는 리더로부터 복제
  • 장애 대응
    • 리더가 속한 브로커 장애시 다른 팔로워가 리더가 된다.

프로듀셔

기본 흐름은

  1. send메세지를 통해 전송하면
  2. 해당 메세지를 Serializer를 통해 byte배열로 변환
  3. Partitioner를 이용해서 그 메시지를 어느 토픽의 파티션으로 보낼지 결정한다.
  4. 변환된 메시지(바이트)를 버퍼에 저장한다. 이 때, 배치로 묶어서 저장한다(한마디로 메시지를 모아서 저장하는거임)
  5. Sender가 그 배치를 카프카 브로커로 전달
  6. Sender는 별도 쓰레드로 동작하고, 배치가 찼는지 여부에 관계없이 차례대로 브로커에 보냄
  7. 그리고 그동안 받아진 메서드는 저장되고 있다.(한마디로 Sender가 메시지를 보내는 동안에도 메시지 받기는 가능하다.)

전송 결과 확인 암함

producer.send(new ProducerRecoed<>("simple", value));

딱히 뭐 전송 성공/실패 여부를 몰라도 되는 경우

전송 결과 확인함 : Future 사용

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));

try {
    RecordMetadata meta = f.get();    // 블로킹
} catch(ExecutionException ex) {

}

이름만 들어도 알수 있듯이 Future의 보내진거를 보내고 -> 받고 -> 확인 할 때 이를 매번 보게된다(그 때에 블로킹이 이루어진다.)

  • 배치 효과가 떨어짐 -> 처리량 저하
  • 처리량이 낮아도 되는 경우에만 사용
  • 건바이건으로 데이터 처리 확인 가능

전송 결과 확인함 : Callback 사용

producer.send(new ProducerRecord<>("simple", "value").
    new Callback() {
         @Override
         public void onCompletion(RecordMetadata metadata, Exception ex) {

         }
    }
)

처리 결과를 받아서, 이 때에 Exception 객체를 받게 되면 전송이 실패한거다.
이거는 블로킹이 이루어지지 않는다. -> 배치를 쓸 수 있다. -> 성능 저하 없다.

전송 보장과 ack

  • ack = 0
    • 서버 응답을 기다리지 않는다.
    • 전송 보장이 되지 않는다.
  • ack = 1
    • 파티션의 리더에 저장되면 성공 응답 받는다.
    • 근데 리더에 장애가 발생하면 메시지 유실 가능
      • 예를 들어 리더에서 팔로워 저장 안된 상태에서 리더가 죽는 경우
  • ack = all (혹은 -1)
    • 모든 리플리카에 저장되면 응답 받음
    • min.insync.replicas
      • 이거는 ack = all일 때에 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
      • 참고로 이거는 리더도 포함이다(2이고 리더 저장되고 하나의 팔로워에 저장되면 됐다고 본다)
        • 근데 그래서 다른데에 저장이 됐는데 팔로워가 하나 죽었을 때에 잘못하면 실패응답을 할수도 있다.
    • 전송 엄격하게 보장 가능

에러 유형

  • 전송 과정에서 실패
    • 타임아웃
    • 리더 다운 -> 새 리더 선출 진행 중
    • 브로커 설정 메시지 크기 한도 초과
  • 전송 전에 실패
    • 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과
    • 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과

실패 대응 : 재시도

  • 재시도
    • 재시도 가능한 에러는 재시도 처리
      • 타임아웃, 일시적 리더 없음 등
  • 재시도 위치
    • 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
      • retries 시도
    • send 메서드에서 익셉션 발생시 익셉션 타입에 따라 send() 재호출
    • 콜백 메서드에서 익셉션을 받으면 타입에 따라 send() 재호출
  • 아주아주 특별한 이유가 없다면 무한 재시도 X
    • 당연한것... 전체 메시지가 쌓이게 될거니까

재시도와 메시지 중복 전송 가능성

  • 브로커 응답이 늦게 와서 재시도하는 경우 중복 발생 가능
    • 예를 들어 전송해서 ㅇㅋ저장됨 하고 브로커가 보내기 전에 타임아웃 떠서 재전송 하는 경우

재시도와 순서

재시도는 전송 순서를 바꾸기도 한다.

  • 1번 실패
  • 그 사이에 2번 3번 성공
  • 1번 재시고
  • 2, 3, 1 이렇게 저장
  • 이게 싫다면 max.in.flight.requests.per.connection 을 1로 지정해주자

실패 대응 : 기록

  • 추후 처리를 위해 기록
    • 별도 파일, DB등을 이용해서 실패한 메시지 기록
    • 추후 수동/자동 보정 작업 진행
  • 기록 위치
    • send() 메서드에서 익셉션 발생
    • send() 메서드에 전달한 callback / Future의 get 에서 익셉션 발생시

컨슈머

토픽 파티션에서 레코드 조회

토픽 파티션은 그룹 단위로 할당된다.
-> 그래서 컨슈머그룹은 파티션의 갯수보다 적거나 같은게 나을것이다.(컨슈머그룹이 더 많으면 그냥 놀거니까)
그러니까 컨슈머를 늘린다면 파티션도 같이 늘려줘야 한다는 것이다.

커밋과 오프셋

  • 컨슈머 poll은 이전 커밋 오프셋 이후의 레코드를 읽어온다.
  • 그리고 이후에 읽어온 레드의 오프셋을 커밋한다.
  • 이거 반복...

커밋된 오프셋이 없는 경우

  • 처음 접근이거나 커밋한 오프셋이 없는 경우
  • auto.offset.reset 설정 사용
    • earliest : 맨 처음 오프셋 사용
    • latest : 가장 마지막 오프셋 사용(Default)
    • none : 컨슈머 그룹에 대한 이전 커밋이 없으면 익셉션 발생
      • none은 잘 안씀

컨슈머 설정

조회에 영향을 주는 주요 설정

  • fetch.min.bytes
    • 조회시 브로커가 전송할 최소 데이터 크기
    • 그만큼 값이 쌓일때까지 전송을 안하고있는다.
    • 그래서 대기시간은 늘지만 처리량이 증가한다.
  • fetch.max.wait.ms
    • 데이터가 최소 크기가 될 때까지 기다릴 시간
    • 아무리 최소 크기가 아니여도 계속 기다릴수는 없으니까 이걸 설정한다.
    • 기본값 500
    • 브로커가 리턴할 때까지 대기하는 시간으로 poll() 메서드의 대기 시간과 다름
  • max.partition.fetch.bytes
    • 파티션 당 서버(브로커)가 리턴할 수 있는 최대 크기
      • 이 최대 크기가 넘어가면 바로 return하도록 한다.
    • 기본값 1048576(1MB)

자동 커밋

  • enable.auto.commit 설정
    • true : 일정 주기로 컨슈머가 읽은 오프셋을 커밋(Default)
    • false : 수동으로 커밋 실행
  • auto.commit.interval.ms
    • 자동 커밋 주기
    • 기본값 5초
  • poll(), close() 메서드 호출시 자동 커밋 실행

수동 커밋 - 동기/비동기 커밋

  • 동기
    • commitSync() 메서드 이용
    • 커밋 실패시 exception 발생
  • 비동기
    • commitAsync() 메서드 이용
    • 비동기이기 때문에 얘를 통해서 알수는 없고, 위에서 본 Callback을 받아서 처리해야 한다.

재처리와 순서

  • 동일 메시지 조회 가능성
    • 일시적으로 커밋 실패, 새로운컨슈머추가/기존컨슈머사라짐으로 인한 리밸런스 등에 의해 발생한다.
  • 컨슈머는 멱등성(idempotence)을 고려해야 한다.
    • 재처리를 단순하게 하면 데이터가 변할 수 있으니까
  • 데이터 특성에 따라 타임스탬프, 일련번호 등을 활용

세션 타임아웃, 하트비트, 최대 Poll 간격

  • 컨슈머는 하트비트를 전송해서 연결 유지
    • 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행
      • 하트비트를 컨슈머로 계속 보내서 연결을 유지한다.
        • 그래서 일정시간동안 하트비트가 없으면 그룹에서 빼버리고 리밸런스한다.
    • 관련 설정
      • session.timeout.ms : 세션 타임 아웃 시간(Default 10초)
        • 이 시간동안 하트비트 없으면 리밸런싱 진행
        • heartbeat.interval.ms : 하트비트 전송 주기(Default 3초)
          • session.timeout.ms의 1/3 이하를 추천한다고 한다.
    • max.poll.interval.ms
      • poll()메서드의 최대 호출 간격
      • 이 시간이 지내도록 poll()하지 않으면 그룹에서 빼고 리밸런스 진행

종료 처리

다 쓰면 close() 해서 종료한다.

보통은 무한루프로 하다가 Exception이 발생하면 종료하는데
그 때에 이걸 어떻게

  1. 다른 쓰레드에서 wakeup() 메서드 호출
  2. poll()메서드가 WakeupException 발생
  3. 그거를 while루프 밖에서 catch하고, 그 다음에 consumer의 close메서드를 호출

주의 : kafka consumer는 thread-safe하지 않다.

  • wakeup메서드 말고는 다른 쓰레드에서 사용하지 말자...

https://www.youtube.com/watch?v=xqrIDHbGjOY&t=4s

카프카 이론 공부를 시작해봐야겠다.