이론 정리
Kafka 아는척하기 강의 정리
철매존
2023. 11. 8. 23:22
728x90
-> https://www.youtube.com/watch?v=xqrIDHbGjOY&t=4s
카프카를 공부해보기 위해 찾아보다가 정말 좋은 강의가 있어 이론에 대한 기초를 알기 위해 보고 정리해 보았다.
데이터 스트리밍 및 이벤트 기반 데이터 파이프라인을 위한 메시지 큐 시스템.
기본 모양
- 카프카 클러스터
- 메세지를 저장하는 저장소
- 하나의 카프카 클러스터는 여러개의 브로커로 구성된다.
- 각 브로커는 메세지를 나눠서 저장하고(이중화 처리 / 장애시 대체 등등도 여기서 한다) 한다.
- 주키퍼는 클러스터를 관리하는데에 쓰인다.
- 프로듀서
- 카프카 클러스터에 메세지를 보낸다.
- 컨슈머
- 메세지를 카프카에서 읽어오는 역할을 한다.
토픽과 파티션
- 토픽
- 메시지를 구분하는 단위
- 파일시스템의 폴더와 유사
- 프로듀서가 메시지를 카프카에 저장할 때 어떤 토픽에 저장할지
- 컨슈머가 메시지를 읽어올 때 어떤 토픽에서 가져올지
- 파티션
- 메시지를 저장하는 물리적인 파일
- append-only -> 추가만 가능하다
- 메시지 저장 위치를 offset이라고 한다.
- 메시지는 맨 뒤에 추가되고, FIFO로 저장/읽기
여러 파티션과 컨슈머
- 컨슈머는 컨슈머그룹에 속한다.
- 한 개의 파티션은 컨슈머그룹의 한개의 컨슈머만 연결 가능
- 다른 컨슈머그룹의 컨슈머와는 연결 가능
- 이를 통해 하나의 컨슈머그룹 입장에서 파티션의 메세지는 순서대로 처리된다.
성능
카프카는 성능이 좋은데, 그 이유는
- 파티션파일은 OS페이지캐시 사용
- 파일 IO가 메모리영역에서 처리된다.
- zero copy
- 디스크에서 네트워크 버퍼로 직접 데이터 복사
- 데이터를 메모리의 여러 영역에 복사하여 사용하지 않고 데이터를 이동함으로서 전송의 오버헤드를 줄인다.
- 컨슈머 추적을 위해 브로커가 하는 일이 비교적 단순하다.
- 메시지 필터, 재전송같은 일을 브로커가 아니라 프로듀서, 컨슈머가 직접 해야 한다.
- 묶어서 보내고, 묶어서 받기(batch)
- 프로듀서 : 일정 크기만큼 메시지를 모아서 전송 가능
- 컨슈마 : 최소 크기만큼 메시지를 모아서 조회 가능
- 처리량 증대가 쉽다.
- 1개가 장비의 용량 한계 -> 브로커, 파티션 추가
- 컨슈머가 느리면 -> 컨슈머 추가 (+파티션 추가)
리플리카 - 복제
장애가 났을 때의 대비
- 리플리카 : 파티션의 복제본
- 복제수만큼 파티션의 복제본이 각 브로커에 생긴다.
- 리더와 팔로워로 구성
- 프로듀서와 컨슈머는 리더를 통해서만 메시지 처리
- 팔로워는 리더로부터 복제
- 장애 대응
- 리더가 속한 브로커 장애시 다른 팔로워가 리더가 된다.
프로듀셔
기본 흐름은
- send메세지를 통해 전송하면
- 해당 메세지를
Serializer
를 통해 byte배열로 변환 Partitioner
를 이용해서 그 메시지를 어느 토픽의 파티션으로 보낼지 결정한다.- 변환된 메시지(바이트)를 버퍼에 저장한다. 이 때, 배치로 묶어서 저장한다(한마디로 메시지를 모아서 저장하는거임)
Sender
가 그 배치를 카프카 브로커로 전달Sender
는 별도 쓰레드로 동작하고, 배치가 찼는지 여부에 관계없이 차례대로 브로커에 보냄- 그리고 그동안 받아진 메서드는 저장되고 있다.(한마디로
Sender
가 메시지를 보내는 동안에도 메시지 받기는 가능하다.)
전송 결과 확인 암함
producer.send(new ProducerRecoed<>("simple", value));
딱히 뭐 전송 성공/실패 여부를 몰라도 되는 경우
전송 결과 확인함 : Future 사용
Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
RecordMetadata meta = f.get(); // 블로킹
} catch(ExecutionException ex) {
}
이름만 들어도 알수 있듯이 Future의 보내진거를 보내고 -> 받고 -> 확인 할 때 이를 매번 보게된다(그 때에 블로킹이 이루어진다.)
- 배치 효과가 떨어짐 -> 처리량 저하
- 처리량이 낮아도 되는 경우에만 사용
- 건바이건으로 데이터 처리 확인 가능
전송 결과 확인함 : Callback 사용
producer.send(new ProducerRecord<>("simple", "value").
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
}
}
)
처리 결과를 받아서, 이 때에 Exception 객체를 받게 되면 전송이 실패한거다.
이거는 블로킹이 이루어지지 않는다. -> 배치를 쓸 수 있다. -> 성능 저하 없다.
전송 보장과 ack
- ack = 0
- 서버 응답을 기다리지 않는다.
- 전송 보장이 되지 않는다.
- ack = 1
- 파티션의 리더에 저장되면 성공 응답 받는다.
- 근데 리더에 장애가 발생하면 메시지 유실 가능
- 예를 들어 리더에서 팔로워 저장 안된 상태에서 리더가 죽는 경우
- ack = all (혹은 -1)
- 모든 리플리카에 저장되면 응답 받음
- min.insync.replicas
- 이거는 ack = all일 때에 저장에 성공했다고 응답할 수 있는 동기화된 리플리카 최소 개수
- 참고로 이거는 리더도 포함이다(2이고 리더 저장되고 하나의 팔로워에 저장되면 됐다고 본다)
- 근데 그래서 다른데에 저장이 됐는데 팔로워가 하나 죽었을 때에 잘못하면 실패응답을 할수도 있다.
- 전송 엄격하게 보장 가능
에러 유형
- 전송 과정에서 실패
- 타임아웃
- 리더 다운 -> 새 리더 선출 진행 중
- 브로커 설정 메시지 크기 한도 초과
- 등
- 전송 전에 실패
- 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과
- 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과
- 등
실패 대응 : 재시도
- 재시도
- 재시도 가능한 에러는 재시도 처리
- 타임아웃, 일시적 리더 없음 등
- 재시도 가능한 에러는 재시도 처리
- 재시도 위치
- 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
- retries 시도
- send 메서드에서 익셉션 발생시 익셉션 타입에 따라 send() 재호출
- 콜백 메서드에서 익셉션을 받으면 타입에 따라 send() 재호출
- 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
- 아주아주 특별한 이유가 없다면 무한 재시도 X
- 당연한것... 전체 메시지가 쌓이게 될거니까
재시도와 메시지 중복 전송 가능성
- 브로커 응답이 늦게 와서 재시도하는 경우 중복 발생 가능
- 예를 들어 전송해서 ㅇㅋ저장됨 하고 브로커가 보내기 전에 타임아웃 떠서 재전송 하는 경우
재시도와 순서
재시도는 전송 순서를 바꾸기도 한다.
- 1번 실패
- 그 사이에 2번 3번 성공
- 1번 재시고
- 2, 3, 1 이렇게 저장
- 이게 싫다면
max.in.flight.requests.per.connection
을 1로 지정해주자
실패 대응 : 기록
- 추후 처리를 위해 기록
- 별도 파일, DB등을 이용해서 실패한 메시지 기록
- 추후 수동/자동 보정 작업 진행
- 기록 위치
- send() 메서드에서 익셉션 발생
- send() 메서드에 전달한 callback / Future의 get 에서 익셉션 발생시
컨슈머
토픽 파티션에서 레코드 조회
토픽 파티션은 그룹 단위로 할당된다.
-> 그래서 컨슈머그룹은 파티션의 갯수보다 적거나 같은게 나을것이다.(컨슈머그룹이 더 많으면 그냥 놀거니까)
그러니까 컨슈머를 늘린다면 파티션도 같이 늘려줘야 한다는 것이다.
커밋과 오프셋
- 컨슈머 poll은 이전 커밋 오프셋 이후의 레코드를 읽어온다.
- 그리고 이후에 읽어온 레드의 오프셋을 커밋한다.
- 이거 반복...
커밋된 오프셋이 없는 경우
- 처음 접근이거나 커밋한 오프셋이 없는 경우
- auto.offset.reset 설정 사용
- earliest : 맨 처음 오프셋 사용
- latest : 가장 마지막 오프셋 사용(Default)
- none : 컨슈머 그룹에 대한 이전 커밋이 없으면 익셉션 발생
- none은 잘 안씀
컨슈머 설정
조회에 영향을 주는 주요 설정
- fetch.min.bytes
- 조회시 브로커가 전송할 최소 데이터 크기
- 그만큼 값이 쌓일때까지 전송을 안하고있는다.
- 그래서 대기시간은 늘지만 처리량이 증가한다.
- fetch.max.wait.ms
- 데이터가 최소 크기가 될 때까지 기다릴 시간
- 아무리 최소 크기가 아니여도 계속 기다릴수는 없으니까 이걸 설정한다.
- 기본값 500
- 브로커가 리턴할 때까지 대기하는 시간으로
poll()
메서드의 대기 시간과 다름
- max.partition.fetch.bytes
- 파티션 당 서버(브로커)가 리턴할 수 있는 최대 크기
- 이 최대 크기가 넘어가면 바로 return하도록 한다.
- 기본값 1048576(1MB)
- 파티션 당 서버(브로커)가 리턴할 수 있는 최대 크기
자동 커밋
- enable.auto.commit 설정
- true : 일정 주기로 컨슈머가 읽은 오프셋을 커밋(Default)
- false : 수동으로 커밋 실행
- auto.commit.interval.ms
- 자동 커밋 주기
- 기본값 5초
- poll(), close() 메서드 호출시 자동 커밋 실행
수동 커밋 - 동기/비동기 커밋
- 동기
commitSync()
메서드 이용- 커밋 실패시 exception 발생
- 비동기
commitAsync()
메서드 이용- 비동기이기 때문에 얘를 통해서 알수는 없고, 위에서 본 Callback을 받아서 처리해야 한다.
재처리와 순서
- 동일 메시지 조회 가능성
- 일시적으로 커밋 실패, 새로운컨슈머추가/기존컨슈머사라짐으로 인한 리밸런스 등에 의해 발생한다.
- 컨슈머는 멱등성(idempotence)을 고려해야 한다.
- 재처리를 단순하게 하면 데이터가 변할 수 있으니까
- 데이터 특성에 따라 타임스탬프, 일련번호 등을 활용
세션 타임아웃, 하트비트, 최대 Poll 간격
- 컨슈머는 하트비트를 전송해서 연결 유지
- 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행
- 하트비트를 컨슈머로 계속 보내서 연결을 유지한다.
- 그래서 일정시간동안 하트비트가 없으면 그룹에서 빼버리고 리밸런스한다.
- 하트비트를 컨슈머로 계속 보내서 연결을 유지한다.
- 관련 설정
- session.timeout.ms : 세션 타임 아웃 시간(Default 10초)
- 이 시간동안 하트비트 없으면 리밸런싱 진행
- heartbeat.interval.ms : 하트비트 전송 주기(Default 3초)
- session.timeout.ms의 1/3 이하를 추천한다고 한다.
- session.timeout.ms : 세션 타임 아웃 시간(Default 10초)
- max.poll.interval.ms
- poll()메서드의 최대 호출 간격
- 이 시간이 지내도록 poll()하지 않으면 그룹에서 빼고 리밸런스 진행
- 브로커는 일정 시간 컨슈머로부터 하트비트가 없으면 컨슈머를 그룹에서 빼고 리밸런스 진행
종료 처리
다 쓰면 close()
해서 종료한다.
보통은 무한루프로 하다가 Exception이 발생하면 종료하는데
그 때에 이걸 어떻게
- 다른 쓰레드에서 wakeup() 메서드 호출
- poll()메서드가 WakeupException 발생
- 그거를 while루프 밖에서 catch하고, 그 다음에 consumer의 close메서드를 호출
주의 : kafka consumer는 thread-safe하지 않다.
- wakeup메서드 말고는 다른 쓰레드에서 사용하지 말자...
카프카 이론 공부를 시작해봐야겠다.