Kafka 구성 튜닝

Red Hat AMQ Streams 2.5

Kafka 구성 속성을 사용하여 데이터 스트리밍 최적화

초록

Kafka 구성 속성을 사용하여 Kafka 브로커, 생산자 및 소비자의 작업을 미세 조정합니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 용어를 교체하기 위해 최선을 다하고 있습니다. 먼저 마스터(master), 슬레이브(slave), 블랙리스트(blacklist), 화이트리스트(whitelist) 등 네 가지 용어를 교체하고 있습니다. 이러한 변경 작업은 작업 범위가 크므로 향후 여러 릴리스에 걸쳐 점차 구현할 예정입니다. 자세한 내용은 CTO Chris Wright의 메시지를 참조하십시오.

1장. Kafka 튜닝 개요

구성 속성을 사용하여 Kafka 브로커, 생산자 및 소비자의 성능을 최적화합니다. OCP 및 RHEL에서 AMQ Streams의 구성 속성을 지정할 수 있습니다.

최소 구성 속성 세트가 필요하지만 속성을 추가하거나 조정하여 생산자 및 소비자가 Kafka 브로커와 상호 작용하는 방식을 변경할 수 있습니다. 예를 들어 클라이언트가 데이터에 실시간으로 응답할 수 있도록 메시지의 대기 시간 및 처리량을 조정할 수 있습니다.

메트릭을 분석하여 초기 구성을 수행할 위치를 측정한 다음, 필요한 구성이 있을 때까지 증분 변경 및 메트릭을 추가로 비교할 수 있습니다.

Apache Kafka 구성 속성에 대한 자세한 내용은 Apache Kafka 설명서 를 참조하십시오.

1.1. 속성 및 값 매핑

구성 속성을 지정하는 방법은 배포 유형에 따라 다릅니다. OCP에 AMQ Streams를 배포한 경우 Kafka 리소스를 사용하여 config 속성을 통해 Kafka 브로커 구성을 추가할 수 있습니다. RHEL에서 AMQ Streams를 사용하면 속성 파일에 환경 변수로 구성을 추가합니다.

사용자 정의 리소스에 구성 속성을 추가할 때 콜론(':')을 사용하여 속성 및 값을 매핑합니다.

사용자 정의 리소스의 구성 예

num.partitions:1

속성을 환경 변수로 추가하면 등호('=')를 사용하여 속성과 값을 매핑합니다.

환경 변수인 구성 예

num.partitions=1

1.2. 튜닝에 도움이 되는 툴

Kafka 튜닝에 도움이 되는 툴은 다음과 같습니다.

  • Cruise Control은 클러스터 재조정을 평가하고 구현하는 데 사용할 수 있는 최적화 제안을 생성합니다.
  • Kafka 정적 할당량 플러그인은 브로커에 대한 제한을 설정
  • Rack 구성에서는 브로커 파티션을 랙에 분배하고 소비자가 가장 가까운 복제본에서 데이터를 가져올 수 있습니다.

이러한 도구에 대한 자세한 내용은 다음 가이드를 참조하십시오.

2장. 관리 브로커 구성

OpenShift에 AMQ Streams를 배포할 때 Kafka 사용자 정의 리소스의 config 속성을 통해 브로커 구성을 지정합니다. 그러나 특정 브로커 구성 옵션은 AMQ Streams에 의해 직접 관리됩니다.

따라서 OpenShift에서 AMQ Streams를 사용하는 경우 다음 옵션을 구성할 수 없습니다.

  • Kafka 브로커의 ID를 지정하는 broker.id
  • 로그 데이터의 log.dirs 디렉터리
  • zookeeper.connect 구성: ZooKeeper와 Kafka를 연결
  • Kafka 클러스터를 클라이언트에 노출하는 리스너
  • 사용자가 실행하는 작업을 허용하거나 거부하기 위한 권한 부여 메커니즘
  • Kafka에 액세스해야 하는 사용자의 ID를 증명하는 인증 메커니즘

브로커 ID는 0(0)부터 시작하여 브로커 복제본 수에 해당합니다. 로그 디렉터리는 Kafka 사용자 정의 리소스의 spec.kafka.storage 구성에 따라 /var/lib/kafka/data/kafka-logIDX 에 마운트됩니다. IDX 는 Kafka 브로커 Pod 인덱스입니다.

제외 목록은 KafkaClusterSpec 스키마 참조를 참조하십시오.

이러한 제외 사항은 RHEL에서 AMQ Streams를 사용할 때 적용되지 않습니다. 이 경우 브로커를 식별하고 안전한 액세스를 제공하기 위해 기본 브로커 구성에 이러한 속성을 추가해야 합니다.

RHEL에서 AMQ Streams에 대한 브로커 구성의 예

# ...
broker.id = 1
log.dirs = /var/lib/kafka
zookeeper.connect = zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
listeners = internal-1://:9092
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
ssl.truststore.location = /path/to/truststore.jks
ssl.truststore.password = 123456
ssl.client.auth = required
# ...

3장. Kafka 브로커 구성 튜닝

구성 속성을 사용하여 Kafka 브로커의 성능을 최적화합니다. AMQ Streams에서 직접 관리하는 속성을 제외하고 표준 Kafka 브로커 구성 옵션을 사용할 수 있습니다.

3.1. 기본 브로커 구성

일반적인 브로커 구성에는 주제, 스레드 및 로그와 관련된 속성 설정이 포함됩니다.

기본 브로커 구성 속성

# ...
num.partitions=1
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
num.network.threads=3
num.io.threads=8
num.recovery.threads.per.data.dir=1
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
group.initial.rebalance.delay.ms=0
zookeeper.connection.timeout.ms=6000
# ...

3.2. 고가용성을 위한 항목 복제

기본 주제 속성에서는 주제가 자동으로 생성되는 시기를 포함하여 이러한 속성이 명시적으로 설정되지 않은 항목에 적용되는 기본 파티션 수 및 복제 요인을 설정합니다.

# ...
num.partitions=1
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
replica.fetch.max.bytes=1048576
# ...

고가용성 환경의 경우 복제 요소를 최소 3개로 늘리고 복제 요소보다 필요한 최소 동기화 복제본 수를 1개로 설정하는 것이 좋습니다.

생산자와 소비자가 필요한 경우 아직 존재하지 않는 항목이 자동으로 생성되도록 auto.create.topics.enable 속성이 기본적으로 활성화됩니다. 자동 주제 생성을 사용하는 경우 num.partitions 를 사용하여 주제의 기본 파티션 수를 설정할 수 있습니다. 그러나 일반적으로 이 속성은 명시적 주제 생성을 통해 주제를 통해 더 많은 제어가 제공되도록 비활성화되어 있습니다.

데이터 결함 의 경우 주제 구성에 min.insync.replicas 를 설정하고 생산자 구성에서 acks=all 을 사용하여 메시지 전달 ackedgment를 설정해야 합니다.

replica.fetch.max.bytes 를 사용하여 리더 파티션을 복제하는 각 추적자가 가져온 메시지의 최대 크기(바이트)를 설정합니다. 평균 메시지 크기 및 처리량에 따라 이 값을 변경합니다. 읽기/쓰기 버퍼링에 필요한 총 메모리 할당을 고려할 때 사용 가능한 메모리도 모든 불만족을 곱할 때 복제된 최대 메시지 크기를 수용할 수 있어야 합니다.

delete.topic.enable 속성은 주제가 삭제될 수 있도록 기본적으로 활성화되어 있습니다. 프로덕션 환경에서는 실수로 주제 삭제를 방지하기 위해 이 속성을 비활성화하여 데이터가 손실되지 않도록 해야 합니다. 그러나 임시로 활성화한 후 주제를 삭제한 다음 다시 비활성화할 수 있습니다.

참고

OpenShift에서 AMQ Streams를 실행하는 경우 Topic Operator는 Operator 스타일 주제 관리를 제공할 수 있습니다. KafkaTopic 리소스를 사용하여 주제를 생성할 수 있습니다. KafkaTopic 리소스를 사용하여 생성된 주제의 경우 복제 요소는 spec.replicas 를 사용하여 설정됩니다. delete.topic.enable 이 활성화된 경우 KafkaTopic 리소스를 사용하여 항목을 삭제할 수도 있습니다.

# ...
auto.create.topics.enable=false
delete.topic.enable=true
# ...

3.3. 트랜잭션 및 커밋에 대한 내부 주제 설정

트랜잭션을 사용하여 생산자의 파티션에 원자성 쓰기를 활성화하는 경우 트랜잭션 상태는 내부 __ECDHE_state 항목에 저장됩니다. 기본적으로 브로커는 이 항목에 대해 최소 3개의 복제 인수와 최소 2개의 동기화 복제본으로 구성됩니다. 즉, Kafka 클러스터에 최소 3개의 브로커가 필요합니다.

# ...
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ...

마찬가지로 소비자 상태를 저장하는 내부 __consumer_offsets 항목에는 파티션 수와 복제 요인에 대한 기본 설정이 있습니다.

# ...
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
# ...

프로덕션에서는 이러한 설정을 축소하지 마십시오. 프로덕션 환경의 설정을 늘릴 수 있습니다. 예외적으로 단일 브로커 테스트 환경의 설정을 줄일 수 있습니다.

3.4. I/O 스레드를 늘려 요청 처리 처리량 개선

네트워크 스레드는 클라이언트 애플리케이션에서 요청을 생성 및 가져오는 등 Kafka 클러스터에 대한 요청을 처리합니다. 요청 생성은 요청 큐에 배치됩니다. 응답은 응답 큐에 배치됩니다.

리스너당 네트워크 스레드 수는 복제 요소 및 클라이언트 생산자 및 소비자의 활동 수준이 Kafka 클러스터와 상호 작용해야 합니다. 많은 요청이 있을 경우 스레드 수를 늘릴 수 있으며, 시간 스레드 양을 사용하여 더 많은 스레드를 추가할 시기를 결정할 수 있습니다.

혼잡을 줄이고 요청 트래픽을 제어하려면 요청 큐에서 허용되는 요청 수를 제한할 수 있습니다. 요청 큐가 가득 차면 들어오는 모든 트래픽이 차단됩니다.

I/O 스레드는 요청 대기열에서 요청을 선택하여 처리합니다. 스레드를 더 많이 추가하면 처리량이 향상될 수 있지만 CPU 코어 및 디스크 대역폭의 수는 실제 상한이 적용됩니다. 최소한 I/O 스레드 수는 스토리지 볼륨 수와 같아야 합니다.

# ...
num.network.threads=3 1
queued.max.requests=500 2
num.io.threads=8 3
num.recovery.threads.per.data.dir=4 4
# ...
1
Kafka 클러스터의 네트워크 스레드 수입니다.
2
요청 큐에서 허용되는 요청 수입니다.
3
Kafka 브로커의 I/O 스레드 수입니다.
4
시작 시 로그 로드 및 종료 시 플러시에 사용되는 스레드 수입니다. 적어도 코어 수의 값으로 설정합니다.

모든 브로커의 스레드 풀에 대한 구성 업데이트가 클러스터 수준에서 동적으로 발생할 수 있습니다. 이 업데이트는 현재 크기의 절반과 현재 크기의 두 배로 제한됩니다.

작은 정보

다음 Kafka 브로커 메트릭은 필요한 스레드 수를 작업하는 데 도움이 될 수 있습니다.

  • Kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent 는 평균 시간 네트워크 스레드가 백분율로 유휴 상태인 메트릭을 제공합니다.
  • Kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent 는 평균 I/O 스레드가 백분율로 유휴 상태인 메트릭을 제공합니다.

0%의 유휴 시간이 있는 경우 모든 리소스가 사용 중이므로 스레드를 더 추가하는 것이 유용할 수 있습니다. 유휴 시간이 30% 미만으로 떨어지면 성능이 저하될 수 있습니다.

디스크 수로 인해 스레드가 느리거나 제한되는 경우 네트워크 요청에 대한 버퍼 크기를 늘려 처리량을 개선할 수 있습니다.

# ...
replica.socket.receive.buffer.bytes=65536
# ...

또한 Kafka가 수신할 수 있는 최대 바이트 수를 늘립니다.

# ...
socket.request.max.bytes=104857600
# ...

3.5. 대기 시간이 긴 연결을 위해 대역폭 증가

Kafka는 데이터 센터 간 연결과 같은 Kafka에서 클라이언트로의 대기 시간이 높은 연결보다 합리적인 처리량을 제공하기 위해 데이터를 정렬합니다. 그러나 대기 시간이 긴 경우 메시지 전송 및 수신을 위해 버퍼 크기를 늘릴 수 있습니다.

# ...
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# ...

대역폭-delay 제품 계산을 사용하여 버퍼의 최적 크기를 추정할 수 있습니다. 이는 최대 처리량을 유지하는 데 필요한 버퍼의 최대 대역폭(바이트/초)과 링크의 최대 대역폭(초)을 곱합니다.

3.6. 데이터 보존 정책을 사용하여 로그 관리

Kafka는 로그를 사용하여 메시지 데이터를 저장합니다. 로그는 다양한 인덱스와 관련된 일련의 세그먼트입니다. 새 메시지는 활성 세그먼트에 기록되며 이후에 수정되지 않습니다. 세그먼트는 소비자의 가져오기 요청을 제공할 때 읽습니다. 정기적으로 활성 세그먼트가 읽기 전용이 되도록 롤아웃 되고 이를 대체하기 위해 새 활성 세그먼트가 생성됩니다. 한 번에 하나의 세그먼트만 활성화됩니다. 이전 세그먼트는 삭제할 수 있을 때까지 유지됩니다.

브로커 수준의 구성은 로그 세그먼트의 최대 크기(바이트)와 활성 세그먼트가 롤링되기 전의 시간(밀리초)을 설정합니다.

# ...
log.segment.bytes=1073741824
log.roll.ms=604800000
# ...

segment.bytessegment.ms 를 사용하여 주제 수준에서 이러한 설정을 재정의할 수 있습니다. 이러한 값을 낮추거나 높여야 하는지 여부는 세그먼트 삭제 정책에 따라 다릅니다. 크기가 클수록 활성 세그먼트에 더 많은 메시지가 포함되고 덜 자주 롤아웃됩니다. 또한 세그먼트는 덜 자주 삭제할 수 있게 됩니다.

로그를 관리할 수 있도록 시간 기반 또는 크기 기반 로그 보존 및 정리 정책을 설정할 수 있습니다. 요구 사항에 따라 로그 보존 구성을 사용하여 이전 세그먼트를 삭제할 수 있습니다. 로그 보존 정책을 사용하면 보존 제한에 도달하면 비활성 로그 세그먼트가 제거됩니다. 이전 세그먼트 삭제는 로그에 필요한 스토리지 공간을 제한하므로 디스크 용량을 초과하지 않습니다.

시간 기반 로그 보존의 경우 시간, 분 및 밀리초에 따라 보존 기간을 설정합니다.For time-based log retention, you set a retention period based on hours, minutes, and milliseconds. 보존 기간은 메시지가 세그먼트에 추가된 시간을 기반으로 합니다.

밀리초 구성의 우선 순위는 분보다 우선하며 시간보다 우선합니다. 기본적으로 분 및 밀리초 구성은 null이지만 세 가지 옵션은 유지하려는 데이터에 대한 상당한 수준의 제어를 제공합니다. 동적으로 업데이트할 수 있는 세 가지 속성 중 유일한 속성 중 하나이기 때문에 밀리초 구성에 우선 순위를 지정해야 합니다.

# ...
log.retention.ms=1680000
# ...

log.retention.ms 가 -1로 설정되면 로그 보존에 시간 제한이 적용되지 않으므로 모든 로그가 유지됩니다. 디스크 사용량을 항상 모니터링해야 하지만 -1 설정은 전체 디스크의 문제를 일으킬 수 있으므로 권장되지 않으며 이는 수정하기 어려울 수 있습니다.

크기 기반 로그 보존의 경우 최대 로그 크기(로그의 모든 세그먼트)를 바이트 단위로 설정합니다.

# ...
log.retention.bytes=1073741824
# ...

즉, 로그에는 일반적으로 정상 상태가 되면 로그.retention.bytes/log.segment.bytes 세그먼트가 포함됩니다. 최대 로그 크기에 도달하면 이전 세그먼트가 제거됩니다.

최대 로그 크기를 사용할 때의 잠재적인 문제는 메시지가 세그먼트에 추가되는 시간을 고려하지 않는다는 것입니다. 필요한 균형을 얻기 위해 정리 정책에 시간 기반 및 크기 기반 로그 보존을 사용할 수 있습니다. 먼저 정리를 트리거하는 임계값에 해당하는 것은 무엇입니까.

세그먼트 파일이 시스템에서 삭제되기 전에 시간 지연을 추가하려면 주제 구성의 특정 항목에 대해 브로커 수준 또는 file.delete.delay.ms 의 모든 항목에 대해 log.segment.delete.delay.ms 를 사용하여 지연을 추가할 수 있습니다.

# ...
log.segment.delete.delay.ms=60000
# ...

3.7. 정리 정책을 사용하여 로그 데이터 제거

이전 로그 데이터를 제거하는 방법은 로그 정리 구성에 따라 결정됩니다.

기본적으로 브로커에 대해 로그 정리가 활성화됩니다.

# ...
log.cleaner.enable=true
# ...

로그 압축 정리 정책을 사용하는 경우 로그 정리를 활성화해야 합니다. 주제 또는 브로커 수준에서 정리 정책을 설정할 수 있습니다. 브로커 수준 구성은 정책이 설정되지 않은 항목의 기본값입니다.

로그를 삭제하거나 컴팩트한 로그를 삭제하거나 둘 다 수행하기 위해 정책을 설정할 수 있습니다.

# ...
log.cleanup.policy=compact,delete
# ...

삭제 정책은 데이터 보존 정책을 사용하여 로그 관리에 해당합니다. 데이터를 영구적으로 보관할 필요가 없을 때 적합합니다. 컴팩트 정책은 각 메시지 키에 대한 최신 메시지를 유지하도록 보장합니다. 메시지 값을 변경할 수 있고 최신 업데이트를 유지하려는 경우 로그 압축이 적합합니다.

정리 정책이 로그를 삭제하도록 설정된 경우 로그 보존 제한을 기반으로 이전 세그먼트가 삭제됩니다. 그렇지 않으면 로그 정리가 활성화되지 않고 로그 보존 제한이 없는 경우 로그가 계속 증가합니다.

로그 압축에 대한 정리 정책이 설정된 경우 로그 헤드 는 순서대로 새 메시지에 대한 쓰기를 사용하여 표준 Kafka 로그로 작동합니다. 로그 정리가 작동하는 컴팩트한 로그의 경우 동일한 키가 있는 다른 레코드가 나중에 로그에서 발생하는 경우 레코드가 삭제됩니다. 값이 null인 메시지도 삭제됩니다. 키를 사용하지 않는 경우 관련 메시지를 식별하는 데 키가 필요하므로 압축을 사용할 수 없습니다. Kafka는 각 키의 최신 메시지가 유지되도록 보장하지만 압축되지 않은 전체 로그에는 중복이 포함되지 않습니다.

그림 3.1. 압축하기 전에 오프셋 위치를 사용하여 키 값 쓰기를 표시하는 로그

키 값 쓰기를 표시하는 압축 이미지

Kafka 압축은 메시지를 식별하기 위해 키를 사용하여 특정 메시지 키에 대한 최신 메시지(가장 높은 오프셋)를 유지하여 결국 동일한 키가 있는 이전 메시지를 삭제합니다. 즉, 최신 상태의 메시지를 항상 사용할 수 있으며 특정 메시지에 대한 오래된 레코드는 로그 정리가 실행될 때 결국 제거됩니다. 메시지를 이전 상태로 복원할 수 있습니다.

레코드는 레코드가 삭제되는 경우에도 원래 오프셋을 유지합니다. 결과적으로, 테두리는 일치하지 않는 오프셋을 가질 수 있습니다. 테일에서 더 이상 사용할 수 없는 오프셋을 사용할 때 다음 오프셋이 있는 레코드가 검색됩니다.

그림 3.2. 압축 후 로그

로그 정리 후 압축 이미지

컴팩트한 정책만 선택하는 경우 로그는 여전히 임의로 커질 수 있습니다. 이 경우 정책을 컴팩트 하고 삭제하도록 설정할 수 있습니다. 컴팩트하고 삭제하도록 선택하는 경우 먼저 로그 데이터를 압축하여 로그 헤드에 키가 있는 레코드를 제거합니다. 그런 다음 로그 보존 임계값보다 먼저 속하는 데이터가 삭제됩니다.

그림 3.3. 로그 보존 지점 및 압축 지점

보존 지점을 통한 압축 이미지

로그가 밀리초 단위의 정리에 대해 확인되는 빈도를 설정합니다.

# ...
log.retention.check.interval.ms=300000
# ...

로그 보존 설정과 관련된 로그 보존 검사 간격을 조정합니다. 보존 크기가 작을수록 더 자주 검사해야 할 수 있습니다.

정리 빈도는 디스크 공간을 관리하기에 충분하지만 주제의 성능에 영향을 미치는 경우는 아닙니다.

정리할 로그가 없는 경우 더 정리를 위해 시간을 밀리초 단위로 설정할 수도 있습니다.

# ...
log.cleaner.backoff.ms=15000
# ...

이전 로그 데이터를 삭제하도록 선택하는 경우 삭제된 데이터를 제거하기 전에 보존하도록 기간을 밀리초 단위로 설정할 수 있습니다.

# ...
log.cleaner.delete.retention.ms=86400000
# ...

삭제된 데이터 보존 기간을 사용하면 데이터가 삭제되지 않게 삭제되기 전에 데이터를 확인할 수 있는 시간이 제공됩니다.

특정 키와 관련된 모든 메시지를 삭제하기 위해 생산자는 tombstone 메시지를 보낼 수 있습니다. tombstone 는 null 값을 가지며 사용자에게 값이 삭제되었음을 알리는 마커 역할을 합니다. 압축 후, only the tombstone이 유지되며 소비자는 메시지가 삭제된다는 것을 알 수 있도록 충분한 기간 동안 보관해야합니다. 이전 메시지가 삭제되면 값이 없는 경우 파티션에서 tombstone 키도 삭제됩니다.

3.8. 디스크 사용 관리

로그 정리와 관련된 다른 많은 구성 설정이 있지만 특히 중요한 것은 메모리 할당입니다.

중복 제거 속성은 모든 로그 정리 스레드에서 정리를 위한 총 메모리를 지정합니다. 버퍼 로드 요소를 통해 사용된 메모리의 백분율에 대한 상한을 설정할 수 있습니다.

# ...
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.io.buffer.load.factor=0.9
# ...

각 로그 항목은 정확히 24바이트를 사용하므로 버퍼가 단일 실행에서 처리할 수 있는 로그 항목 수를 파악하고 그에 따라 설정을 조정할 수 있습니다.

가능한 경우 로그 정리 시간을 줄이기 위해 로그 정리 스레드 수를 늘리는 것이 좋습니다.

# ...
log.cleaner.threads=8
# ...

디스크 대역폭 사용량에 대한 문제가 발생하는 경우 로그 정리 I/O를 조정할 수 있으므로 작업을 수행하는 디스크의 기능에 따라 읽기/쓰기 작업의 합계가 지정된 이중 값보다 적습니다.

# ...
log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
# ...

3.9. 대용량 메시지 크기 처리

메시지의 기본 배치 크기는 1MB이며 대부분의 사용 사례에서 최대 처리량에 최적입니다. Kafka는 적절한 디스크 용량을 가정하여 감소된 처리량으로 더 큰 배치를 수용할 수 있습니다.

큰 메시지 크기는 다음 네 가지 방법으로 처리됩니다.

  1. 생산자 측 메시지 압축 은 압축된 메시지를 로그에 씁니다.
  2. 참조 기반 메시징은 메시지의 값에서 다른 시스템에 저장된 데이터로 참조만 보냅니다.
  3. 인라인 메시징은 동일한 키를 사용하는 청크로 메시지를 분할한 다음 Kafka Streams와 같은 스트림 프로세서를 사용하여 출력에서 결합됩니다.
  4. 브로커 및 생산자/소유자 클라이언트 애플리케이션 구성이 대규모 메시지 크기를 처리하도록 구축되었습니다.

참조 기반 메시징 및 메시지 압축 옵션이 권장되며 대부분의 상황을 다룹니다. 이러한 옵션을 사용하면 성능 문제가 발생하지 않도록 주의해야 합니다.

생산자 측 압축

생산자 구성의 경우 Gzip과 같은 compression.type 을 지정합니다. 그러면 프로듀서에서 생성한 데이터 일괄 처리에 적용됩니다. 브로커 구성 compression.type=producer 를 사용하면 브로커는 생산자가 사용한 모든 압축을 유지합니다. 생산자와 주제 압축이 일치하지 않을 때마다 브로커는 로그에 추가하기 전에 일괄 처리를 다시 압축해야 하므로 브로커 성능에 영향을 미칩니다.

압축은 또한 소비자의 생산자 및 압축 해제 오버헤드에 추가 처리 오버헤드를 추가하지만 일괄 처리에 더 많은 데이터를 포함하므로 메시지 데이터를 잘 압축할 때 처리량에 유용합니다.

프로듀서 측 압축과 배치 크기의 미세 조정을 결합하여 처리량 개선이 용이해집니다. 지표를 사용하면 필요한 평균 배치 크기를 측정하는 데 도움이 됩니다.

참조 기반 메시지

참조 기반 메시징은 메시지가 얼마나 큰지 모를 때 데이터 복제에 유용합니다. 외부 데이터 저장소는 이 구성이 작동하려면 빠르고, 오래되고, 가용성이 높아야 합니다. 데이터는 데이터 저장소에 기록되고 데이터에 대한 참조가 반환됩니다.Data is written to the data store and a reference to the data is returned. 생산자는 Kafka에 대한 참조가 포함된 메시지를 보냅니다. 소비자는 메시지에서 참조를 가져와서 데이터 저장소에서 데이터를 가져오는 데 사용합니다.

그림 3.4. 참조 기반 메시징 흐름

참조 기반 메시징 흐름의 이미지

메시지를 전달하려면 더 많은 트립이 필요하므로 엔드 투 엔드 대기 시간이 증가합니다. 이 접근 방식의 또 다른 중요한 단점은 Kafka 메시지가 정리될 때 외부 시스템의 데이터를 자동으로 정리하지 않는다는 것입니다. 하이브리드 접근 방식은 대용량 메시지를 데이터 저장소에 전송하고 표준 크기 메시지를 직접 처리하는 것입니다.

인라인 메시징

인라인 메시징은 복잡하지만 참조 기반 메시징과 같은 외부 시스템에 따라 오버헤드는 없습니다.

생성 클라이언트 애플리케이션은 직렬화한 다음 메시지가 너무 크면 데이터를 청크해야 합니다. 그런 다음 생산자는 Kafka>-< ArraySerializer 를 사용하거나 전송하기 전에 각 청크를 다시 직렬화하는 것과 유사합니다. 소비자는 완전한 메시지가 있을 때까지 메시지 및 버퍼 청크를 추적합니다. 소비 클라이언트 애플리케이션은 deserialization 전에 어셈블된 청크를 수신합니다. 전체 메시지는 청크 메시지 세트마다 첫 번째 또는 마지막 청크 오프셋에 따라 소비되는 애플리케이션의 나머지 부분으로 전달됩니다. 전체 메시지를 성공적으로 전달하면 오프셋 메타데이터와 비교하여 재조정하는 동안 중복이 발생하지 않도록 합니다.

그림 3.5. 인라인 메시징 흐름

인라인 메시징 흐름의 이미지

인라인 메시징은 특히 일련의 대규모 메시지를 병렬로 처리할 때 버퍼링이 필요하기 때문에 소비자 측에서 성능 오버헤드가 발생합니다. 대규모 메시지 청크는 인터리빙될 수 있으므로 버퍼의 다른 대규모 메시지 청크가 불완전하면 메시지의 모든 청크가 사용되었을 때 커밋할 수 없습니다. 이러한 이유로 버퍼링은 일반적으로 메시지 청크를 유지하거나 커밋 논리를 구현하여 지원됩니다.

더 큰 메시지를 처리하기 위한 구성

더 큰 메시지를 피할 수 없고 메시지 흐름의 어느 시점에서든 블록을 피하기 위해 메시지 제한을 늘릴 수 있습니다. 이렇게 하려면 항목 수준에서 message.max.bytes 를 구성하여 개별 항목에 대한 최대 레코드 배치 크기를 설정합니다. 브로커 수준에서 message.max.bytes 를 설정하면 모든 항목에 대해 더 큰 메시지가 허용됩니다.

브로커는 message.max.bytes 로 설정된 제한보다 큰 모든 메시지를 거부합니다. 생산자(max.request.size) 및 소비자(message.max.bytes)의 버퍼 크기는 더 큰 메시지를 수용할 수 있어야 합니다.

3.10. 메시지 데이터의 로그 플러시 제어

일반적으로 명시적 플러시 임계값을 설정하지 않고 운영 체제가 기본 설정을 사용하여 백그라운드 플러시를 수행하도록 하는 것이 좋습니다. 파티션 복제는 오류가 발생한 브로커가 동기화 내 복제본에서 복구할 수 있으므로 단일 디스크에 쓰는 것보다 오래 걸리는 데이터를 제공합니다.

로그 플러시 속성은 캐시된 메시지 데이터의 주기적인 쓰기를 디스크에 제어합니다. 스케줄러는 로그 캐시의 검사 빈도를 밀리초 단위로 지정합니다.

# ...
log.flush.scheduler.interval.ms=2000
# ...

메시지가 메모리에 유지되는 최대 시간 및 디스크에 쓰기 전에 로그의 최대 메시지 수에 따라 플러시 빈도를 제어할 수 있습니다.

# ...
log.flush.interval.ms=50000
log.flush.interval.messages=100000
# ...

플러시 간 대기에는 플러시를 수행하기 전에 검사 및 지정된 간격을 생성하는 시간이 포함됩니다. 플러시 빈도를 늘리면 처리량에 영향을 줄 수 있습니다.

애플리케이션 플러시 관리를 사용하는 경우 더 빠른 디스크를 사용하는 경우 더 낮은 플러시 임계값을 설정하는 것이 적절할 수 있습니다.

3.11. 가용성에 대한 파티션 재조정

내결함성을 위해 브로커 전체에 파티션을 복제할 수 있습니다. 지정된 파티션에 대해 하나의 브로커가 리더로 선택되고 모든 생성 요청(로그에 쓰기)을 처리합니다. 다른 브로커의 파티션 추적은 리더가 실패하는 경우 데이터 안정성을 위해 파티션 리더의 파티션 데이터를 복제합니다.

구성은 일반적으로 클라이언트에 서비스를 제공하지 않지만 Kafka 클러스터가 여러 데이터 센터에 걸쳐 있을 때 가장 가까운 복제본에서 메시지를 사용할 수 있습니다. 후자는 파티션 리더의 메시지를 복제하고 리더가 실패하는 경우 복구를 허용하기 위해서만 작동합니다. 복구하려면 동기화 후속 조치가 필요합니다. 자존심은 가져오기 요청을 리더에게 보내어 동기화 상태를 유지하며, 이는 후속자에게 메시지를 순서대로 반환합니다. 후속 기능은 리더에서 가장 최근에 커밋된 메시지와 함께 탐지된 경우 동기화된 것으로 간주됩니다. 리더는 후자가 요청한 마지막 오프셋을 확인하여 이를 확인합니다. 불명확한 리더 선택을 허용하지 않는 한 일반적으로 동기화 외부 후속자는 리더에서 현재 리더로 실패할 수 없습니다.

후속자가 동기화되지 않은 것으로 간주되기 전에 지연 시간을 조정할 수 있습니다.

# ...
replica.lag.time.max.ms=30000
# ...

지연 시간은 모든 동기화 복제본에 메시지를 복제하는 시간과 프로듀서가 승인을 기다려야하는 시간을 초과합니다. 후속자가 가져오기 요청을 작성하고 지정된 지연 시간 내에 최신 메시지를 포착하지 못하면 동기화 내 복제본에서 제거됩니다. 실패한 복제본을 더 빨리 감지하는 지연 시간을 줄일 수 있지만 이렇게 하면 필요에 따라 동기화되지 않는 구직 횟수를 늘릴 수 있습니다. 적절한 지연 시간 값은 네트워크 대기 시간과 브로커 디스크 대역폭 모두에 따라 달라집니다.

리더 파티션을 더 이상 사용할 수 없는 경우 동기화 중 복제본 중 하나가 새 리더로 선택됩니다. 파티션의 복제본 목록에 있는 첫 번째 브로커를 기본 리더라고 합니다. 기본적으로 Kafka는 리더 배포의 주기적인 검사를 기반으로 자동 파티션 리더 재조정이 활성화됩니다. 즉 Kafka는 기본 리더가 현재 리더인지 확인합니다. 리밸런스를 통해 리더는 브로커와 브로커에 균등하게 분배되지 않습니다.

AMQ Streams에 대한 Cruise Control for AMQ Streams를 사용하여 클러스터 전체에서 부하를 균등하게 균형을 조정하는 브로커에 복제본 할당을 확인할 수 있습니다. 그 계산은 리더와 팔로워가 경험하는 다양한 부하를 고려합니다. 나머지 브로커는 선행 추가 파티션의 추가 작업을 받기 때문에 실패한 리더는 Kafka 클러스터의 균형에 영향을 미칩니다.

Cruise Control가 실제로 찾은 할당의 균형을 조정하려면 파티션이 기본 리더에 의해 주도되어야 합니다. Kafka는 기본 리더가 사용 중인지 자동으로 확인하고 필요한 경우 현재 리더를 변경할 수 있습니다. 이렇게 하면 클러스터가 Cruise Control에서 찾은 균형 있는 상태로 유지됩니다.

리밸런스 검사의 빈도(초)와 리밸런스가 트리거되기 전에 브로커에 허용되는 최대 불균형 비율을 제어할 수 있습니다.

#...
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
#...

브로커의 백분율 리더 불균형은 브로커가 현재 리더인 파티션 수와 기본 리더인 파티션 수 간의 비율입니다. 백분율을 0으로 설정하여 선호하는 리더가 항상 선택되도록 할 수 있습니다.

검사에서 리밸런스에 더 많은 제어가 필요한 경우 자동화된 리밸런스를 비활성화할 수 있습니다. 그런 다음 kafka-leader-election.sh 명령줄 도구를 사용하여 리밸런스를 트리거할 시기를 선택할 수 있습니다.

참고

AMQ Streams와 함께 제공되는 Grafana 대시보드에는 활성 리더가 없는 복제 대상 파티션 및 파티션에 대한 메트릭이 표시됩니다.

3.12. 불명확한 리더 선택

In-sync 복제본에 대한 리더 선택은 데이터 손실을 보장하지 않기 때문에 정리된 것으로 간주됩니다. 이는 기본적으로 발생합니다. 그러나 리더십을 실현할 수 있는 in-sync 복제본이 없으면 어떻게 해야 합니까? ISR (in-sync replica)에는 리더의 디스크가 손실되었을 때만 리더가 포함되었을 수 있습니다. 최소 동기화 복제본 수가 설정되어 있지 않고 하드 드라이브가 예기치 않게 실패할 때 파티션 리더와 동기화할 때 데이터가 이미 손실되는 경우 데이터가 손실됩니다. 뿐만 아니라 새로운 리더가 선택될 수 없습니다.

Kafka에서 리더 오류를 처리하는 방법을 구성할 수 있습니다.

# ...
unclean.leader.election.enable=false
# ...

불명확한 리더 선택은 기본적으로 비활성화되어 있습니다. 즉, 비동기화 복제본이 리더가 될 수 없습니다. 명확한 리더 선택을 통해 이전 리더가 손실되었을 때 다른 브로커가 ISR에 없는 경우 Kafka는 메시지를 쓰거나 읽기 전에 리더가 다시 온라인 상태가 될 때까지 기다립니다. 불명확한 리더 선택을 통해 동기화되지 않은 복제본이 리더가 될 수 있지만 메시지가 손실될 위험이 있습니다. 선택 사항은 요구 사항이 가용성에 적합한지 또는 지속성에 따라 달라집니다.

주제 수준에서 특정 항목에 대한 기본 구성을 재정의할 수 있습니다. 데이터 손실 위험을 줄일 수 없는 경우 기본 구성을 그대로 둡니다.

3.13. 불필요한 소비자 그룹 리밸런스 방지

새 소비자 그룹에 가입하는 소비자의 경우 브로커에 불필요한 재조정을 피할 수 있도록 지연을 추가할 수 있습니다.

# ...
group.initial.rebalance.delay.ms=3000
# ...

지연은 조정자가 멤버가 참여할 때까지 대기하는 시간입니다. 지연 시간이 길수록 모든 멤버가 시간에 조인되고 재조정을 피할 가능성이 높아집니다. 그러나 지연은 기간이 종료될 때까지 그룹이 소비되지 않도록합니다.

4장. Kafka 소비자 구성 튜닝

특정 사용 사례에 맞는 선택적 속성과 함께 기본 소비자 구성을 사용합니다.

소비자를 튜닝할 때 주요 문제는 수집된 데이터 양에 효율적으로 대처할 수 있습니다. 생산자 튜닝과 마찬가지로 소비자가 예상대로 작동할 때까지 증분 변경을 수행할 수 있습니다.

4.1. 기본 소비자 구성

연결 및 역직렬러 속성은 모든 소비자에게 필수입니다. 일반적으로 추적을 위해 클라이언트 ID를 추가하는 것이 좋습니다.

소비자 구성에서 후속 구성에 관계없이 다음을 수행합니다.

  • 소비자는 지정된 오프셋에서 가져와서 메시지를 건너뛰거나 다시 읽기 위해 오프셋을 변경하지 않는 한 메시지를 순서대로 사용합니다.
  • 오프셋이 클러스터의 다른 브로커로 전송될 수 있으므로 브로커는 Kafka에 오프셋을 커밋하는 경우에도 소비자가 응답을 처리했는지 여부를 알 수 없습니다.

기본 소비자 구성 속성

# ...
bootstrap.servers=localhost:9092 1
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  2
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer  3
client.id=my-client 4
group.id=my-group-id 5
# ...

1
(필수) Kafka 브로커의 host:port 부트스트랩 서버 주소를 사용하여 Kafka 클러스터에 연결할 소비자를 지정합니다. 소비자는 주소를 사용하여 클러스터의 모든 브로커를 검색하고 연결합니다. 쉼표로 구분된 목록을 사용하여 서버가 중단된 경우 두 개 또는 세 개의 주소를 지정하지만 클러스터의 모든 브로커 목록을 제공할 필요는 없습니다. Kafka 클러스터를 노출하기 위해 로드 밸런서 서비스를 사용하는 경우 로드 밸런서에서 가용성을 처리하기 때문에 서비스의 주소만 있으면 됩니다.
2
(필수) Kafka 브로커에서 가져온 바이트를 메시지 키로 변환하는 Deserializer입니다.
3
(필수) Kafka 브로커에서 가져온 바이트를 메시지 값으로 변환하는 Deserializer입니다.
4
(선택 사항) 로그 및 요청의 소스를 식별하는 데 사용되는 클라이언트의 논리 이름입니다. ID를 사용하여 처리 시간 할당량을 기반으로 소비자를 제한할 수도 있습니다.
5
(조건) 소비자가 소비자 그룹에 참여하려면 그룹 ID가 필요합니다.

4.2. 소비자 그룹을 사용하여 데이터 소비 스케일링

소비자 그룹은 지정된 주제에서 하나 또는 여러 생산자가 생성한 일반적으로 큰 데이터 스트림을 공유합니다. Consumer는 group.id 속성을 사용하여 그룹화되므로 메시지가 멤버 전체에 분산됩니다. 그룹의 소비자 중 하나가 리더로 선택되고 그룹의 사용자에게 파티션이 할당되는 방법을 결정합니다. 각 파티션은 단일 사용자에게만 할당할 수 있습니다.

파티션 만큼의 소비자가 아직 없는 경우 동일한 group.id 에 더 많은 소비자 인스턴스를 추가하여 데이터 소비를 확장할 수 있습니다. 파티션이 있는 것보다 그룹에 더 많은 소비자를 추가하면 처리량에 도움이 되지 않지만, 사용자가 한 번 작동하지 않아야 한다는 것을 의미합니다. 소비자 수를 줄이고 처리량 목표를 충족할 수 있다면 리소스를 절약할 수 있습니다.

동일한 소비자 그룹 내의 소비자는 오프셋 커밋과 하트비트를 동일한 브로커에 보냅니다. 따라서 그룹의 소비자 수가 많을수록 브로커의 요청 부하가 높아집니다.

# ...
group.id=my-group-id 1
# ...
1
그룹 ID를 사용하여 소비자 그룹에 소비자를 추가합니다.

4.3. 메시지 순서 보장

Kafka 브로커는 브로커에게 주제, 파티션 및 오프셋 위치 목록에서 메시지를 보내도록 요청하는 요청을 받습니다.

소비자는 브로커에 커밋된 것과 동일한 순서로 단일 파티션에서 메시지를 관찰하므로 Kafka는 단일 파티션의 메시지 순서 제공합니다. 반대로 소비자가 여러 파티션의 메시지를 사용하는 경우 소비자가 확인한 다른 파티션의 메시지 순서가 전송된 순서를 반영하지 않습니다.

한 주제에서 메시지를 엄격하게 정렬하려면 소비자당 하나의 파티션을 사용합니다.

4.4. 처리량 및 대기 시간을 위해 소비자 최적화

클라이언트 애플리케이션이 KafkaConsumer.poll() 을 호출할 때 반환되는 메시지 수를 제어합니다.

fetch.max.wait.msfetch.min.bytes 속성을 사용하여 Kafka 브로커에서 소비자가 가져온 최소 데이터 양을 늘립니다. 시간 기반 일괄 처리는 fetch.max.wait.ms 를 사용하여 구성되며, size 기반 일괄 처리는 fetch.min.bytes 를 사용하여 구성됩니다.

소비자 또는 브로커의 CPU 사용률이 높은 경우 소비자의 요청이 너무 많기 때문일 수 있습니다. 더 적은 요청과 메시지가 더 큰 일괄 처리로 전달되도록 fetch.max.wait.msfetch.min.bytes 속성을 더 크게 조정할 수 있습니다. 처리량을 크게 조정하면 일부 대기 시간이 드는 비용이 향상됩니다. 생성되는 데이터 양이 낮은 경우 더 많이 조정할 수 있습니다.

예를 들어 fetch.max.wait.ms 를 500ms로 설정하고 fetch.min.bytes 를 16384바이트로 설정하면 Kafka가 첫 번째 임계값에 도달하면 응답하는 소비자에서 가져오기 요청을 받습니다.

반대로 fetch.max.wait.msfetch.min.bytes 속성을 낮게 조정하여 엔드 투 엔드 대기 시간을 개선할 수 있습니다.

# ...
fetch.max.wait.ms=500 1
fetch.min.bytes=16384 2
# ...
1
브로커는 가져오기 요청을 완료하기 전에 대기하는 최대 시간(밀리초)입니다. 기본값은 500 밀리초입니다.
2
최소 배치 크기(바이트)를 사용하면 최소값에 도달하거나 메시지가 fetch.max.wait.ms (가 더 빨리 제공됨)보다 오래 대기되었을 때 요청이 전송됩니다. 지연을 추가하면 일괄 처리가 배치 크기까지 메시지를 누적할 수 있습니다.

가져오기 요청 크기를 늘려 대기 시간 단축

fetch.max.bytesmax.partition.fetch.bytes 속성을 사용하여 Kafka 브로커에서 소비자가 가져온 최대 데이터 양을 늘립니다.

fetch.max.bytes 속성은 브로커에서 가져온 데이터 양에 대한 최대 제한을 한 번에 최대 바이트 단위로 설정합니다.

max.partition.fetch.bytes 는 각 파티션에 대해 반환되는 데이터의 최대 제한을 바이트 단위로 설정하며, 항상 max.message.bytes 에 대한 브로커 또는 주제 구성의 바이트 수보다 커야 합니다.

클라이언트가 사용할 수 있는 최대 메모리 양은 대략 다음과 같습니다.

NUMBER-OF-BROKERS * fetch.max.bytes and NUMBER-OF-PARTITIONS * max.partition.fetch.bytes

메모리 사용량을 수용할 수 있는 경우 이 두 속성의 값을 늘릴 수 있습니다. 각 요청에 더 많은 데이터를 허용하면 가져오기 요청이 줄어들므로 대기 시간이 향상됩니다.

# ...
fetch.max.bytes=52428800 1
max.partition.fetch.bytes=1048576 2
# ...
1
가져오기 요청에 대해 반환된 최대 데이터 양(바이트)입니다.
2
각 파티션에 대해 반환된 최대 데이터 양(바이트)입니다.

4.5. 오프셋을 커밋할 때 데이터 손실 또는 복제 방지

Kafka 자동 커밋 메커니즘을 사용하면 소비자가 메시지 오프셋을 자동으로 커밋할 수 있습니다. 활성화하면 소비자는 브로커 폴링에서 5000ms 간격으로 수신된 오프셋을 커밋합니다.

자동 커밋 메커니즘은 편리하지만 데이터 손실 및 복제 위험이 있습니다. 소비자가 여러 메시지를 가져와서 변환했지만 시스템이 자동 커밋을 수행할 때 소비자 버퍼에서 처리된 메시지와 충돌하면 해당 데이터가 손실됩니다. 메시지를 처리한 후 시스템이 충돌하지만 자동 커밋을 수행하기 전에 시스템이 재조정 후 다른 소비자 인스턴스에서 데이터가 복제됩니다.

자동 커밋은 브로커에 대한 다음 폴링 전에 모든 메시지가 처리되거나 소비자가 종료될 때만 데이터 손실을 방지할 수 있습니다.

데이터 손실 또는 복제 가능성을 최소화하기 위해 enable.auto.commitfalse 로 설정하고 클라이언트 애플리케이션을 개발하여 오프셋 커밋을 더 잘 제어할 수 있습니다. 또는 auto.commit.interval.ms 를 사용하여 커밋 간 간격을 줄일 수 있습니다.

# ...
enable.auto.commit=false 1
# ...
1
자동 커밋은 커밋 오프셋에 대한 더 많은 제어를 제공하기 위해 false로 설정됩니다.

enable.auto.commitfalse 로 설정하여 모든 처리가 수행되고 메시지가 사용된 후 오프셋을 커밋할 수 있습니다. 예를 들어 Kafka 커밋Sync 및 commit Async 커밋 API를 호출하도록 애플리케이션을 설정할 수 있습니다.

commitSync API는 폴링에서 반환된 메시지 일괄 처리에 오프셋을 커밋합니다. 일괄 처리의 모든 메시지 처리가 완료되면 API를 호출합니다. commitSync API를 사용하는 경우 일괄 처리의 마지막 오프셋이 커밋될 때까지 애플리케이션은 새 메시지를 폴링하지 않습니다. 이로 인해 처리량에 부정적인 영향을 미치는 경우 자주 커밋하거나 commitAsync API를 사용할 수 있습니다. commitAsync API는 브로커가 커밋 요청에 응답할 때까지 기다리지 않지만 재조정할 때 더 많은 중복이 발생할 위험이 있습니다. 일반적인 접근 방식은 소비자를 종료하거나 재조정하기 전에 사용되는 commitSync API와 함께 애플리케이션에서 커밋 API를 결합하는 것입니다.

4.5.1. 트랜잭션 메시지 제어

생산자 측에서 트랜잭션 ID와 idempotence(enable.idempotence=true)를 사용하여 정확히 한 번 제공을 보장하는 것이 좋습니다. 그런 다음 소비자 측에서 isolation.level 속성을 사용하여 소비자가 트랜잭션 메시지를 읽는 방법을 제어할 수 있습니다.

isolation.level 속성에는 두 가지 유효한 값이 있습니다.

  • read_committed
  • read_uncommitted (기본값)

커밋된 트랜잭션 메시지만 소비자가 읽을 수 있도록 read_committed 를 사용합니다. 그러나 이는 소비자가 브로커가 트랜잭션 결과를 기록하는 트랜잭션 마커를 작성할 때까지 메시지를 반환할 수 없기 때문에 엔드 투 엔드 대기 시간이 증가합니다.

# ...
enable.auto.commit=false
isolation.level=read_committed 1
# ...
1
사용자가 커밋한 메시지만 읽도록 read_committed 로 설정합니다.

4.6. 데이터 손실을 방지하기 위해 실패에서 복구

session.timeout.msheartbeat.interval.ms 속성을 사용하여 소비자 그룹 내에서 소비자 실패를 확인하고 복구하는 데 걸리는 시간을 구성합니다.

session.timeout.ms 속성은 소비자 그룹 내의 소비자가 비활성 상태로 간주되기 전에 브로커와의 연결이 끊어질 수 있는 최대 시간(밀리초)을 지정하고 그룹의 활성 소비자 간에 재조정 이 트리거됩니다. 그룹이 재조정되면 파티션이 그룹 멤버에게 다시 할당됩니다.

heartbeat.interval.ms 속성은 소비자가 활성 상태이고 연결되어 있음을 나타내기 위해 하트 비트를 확인하는 간격(밀리초)을 지정합니다. 일반적으로 하트비트 간격은 세션 시간 제한 간격보다 일반적으로 세 번째 간격보다 작아야 합니다.

session.timeout.ms 속성을 낮게 설정하면 실패한 소비자가 이전에 감지되고 재조정이 더 빨리 수행될 수 있습니다. 그러나 브로커가 시간에 하트비트를 수신하지 못하고 불필요한 리밸런스를 트리거하지 못하도록 시간 초과를 낮게 설정하지 않도록 주의하십시오.

하트비트 간격을 줄이면 실수로 재조정할 가능성이 줄어들지 만 브로커 리소스의 오버헤드가 더 자주 하트비트가 증가합니다.

4.7. 오프셋 정책 관리

auto.offset.reset 속성을 사용하여 오프셋이 커밋되지 않았거나 커밋된 오프셋이 더 이상 유효하지 않거나 삭제된 경우 소비자 동작 방식을 제어할 수 있습니다.

소비자 애플리케이션을 처음 배포하고 기존 주제에서 메시지를 읽습니다. group.id 가 처음 사용되기 때문에 __consumer_offsets 항목에 이 애플리케이션에 대한 오프셋 정보가 포함되지 않습니다. 새 애플리케이션은 로그 시작부터 기존의 모든 메시지 또는 새 메시지만 처리할 수 있습니다. 기본 재설정 값은 latest 이며, 이는 파티션 끝에 시작되어 결과적으로 일부 메시지가 누락되었음을 의미합니다. 데이터 손실을 피하지만 처리 양을 늘리려면 auto.offset.reset가장 빨리 설정 하여 파티션을 시작할 수 있습니다.

또한 브로커에 대해 구성된 오프셋 보존 기간(offsets.retention.minutes)이 종료될 때 메시지가 손실되는 것을 방지하기 위해 가장 빠른 옵션을 사용하는 것이 좋습니다. 소비자 그룹 또는 독립 실행형 소비자가 보존 기간 동안 오프셋을 사용하지 않는 경우 이전에 커밋한 오프셋은 __consumer_offsets 에서 삭제됩니다.

# ...
heartbeat.interval.ms=3000 1
session.timeout.ms=45000 2
auto.offset.reset=earliest 3
# ...
1
예상 리밸런스에 따라 하트비트 간격을 낮게 조정합니다.
2
시간 초과 기간이 만료되기 전에 Kafka 브로커가 하트비트를 수신하지 않으면 소비자가 소비자 그룹에서 제거되고 재조정이 시작됩니다. 브로커 구성에 group.min.session.timeout.msgroup.max.session.timeout.ms 가 있는 경우 세션 타임아웃 값은 해당 범위 내에 있어야 합니다.
3
파티션 시작으로 돌아가 려면 시작으로 설정하고 오프셋이 커밋되지 않은 경우 데이터 손실이 발생하지 않도록 설정합니다.

단일 가져오기 요청에 반환된 데이터 양이 크면 소비자가 처리되기 전에 시간 초과가 발생할 수 있습니다. 이 경우 max.partition.fetch.bytes 를 낮추거나 session.timeout.ms 를 늘릴 수 있습니다.

4.8. 리밸런스 영향 최소화

그룹의 활성 소비자 간에 파티션 재조정은 다음과 같은 시간이 걸립니다.

  • 소비자는 오프셋을 커밋합니다.
  • 형성될 새로운 소비자 그룹
  • 그룹 멤버에 파티션을 할당할 그룹 리더
  • 그룹의 소비자는 할당을 받고 가져오기 시작합니다.

명확하게, 이 프로세스는 특히 소비자 그룹 클러스터를 롤링 재시작하는 동안 반복적으로 발생하는 경우 서비스 다운 타임을 늘립니다.

이 경우 정적 멤버십 개념을 사용하여 리밸런스 수를 줄일 수 있습니다. 리밸런싱은 소비자 그룹 멤버 간에 주제 파티션을 균등하게 할당합니다. 정적 멤버십은 지속성을 사용하므로 세션 시간 초과 후 소비자 인스턴스가 재시작 중에 인식됩니다.

소비자 그룹 조정자는 group.instance.id 속성을 사용하여 지정된 고유 ID를 사용하여 새 소비자 인스턴스를 식별할 수 있습니다. 다시 시작하는 동안 소비자에는 새 멤버 ID가 할당되지만 정적 멤버로서 동일한 인스턴스 ID로 계속되고 주제 파티션의 동일한 할당이 수행됩니다.

소비자 애플리케이션이 적어도 모든 max.poll.interval.ms 밀리초를 폴링하도록 호출하지 않으면 소비자는 실패한 것으로 간주되어 재조정이 발생합니다. 애플리케이션이 폴링에서 반환된 모든 레코드를 처리할 수 없는 경우 max.poll.interval.ms 속성을 사용하여 소비자의 새 메시지에 대한 폴링 간격을 밀리초 간 간격을 지정하여 재조정을 방지할 수 있습니다. 또는 max.poll.records 속성을 사용하여 소비자 버퍼에서 반환된 레코드 수에 대한 최대 제한을 설정할 수 있으므로 애플리케이션이 max.poll.interval.ms 제한 내의 더 적은 수의 레코드를 처리할 수 있습니다.

# ...
group.instance.id=UNIQUE-ID 1
max.poll.interval.ms=300000 2
max.poll.records=500 3
# ...
1
고유한 인스턴스 ID를 사용하면 새 소비자 인스턴스가 동일한 주제 파티션 할당을 받습니다.
2
간격을 설정하여 소비자가 계속 메시지를 처리하고 있는지 확인합니다.
3
소비자에서 반환된 처리된 레코드 수를 설정합니다.

5장. Kafka 생산자 구성 튜닝

특정 사용 사례에 맞는 선택적 속성과 함께 기본 생산자 구성을 사용합니다.

처리량을 최대화하도록 구성을 조정하면 대기 시간이 증가하거나 그 반대의 경우도 발생할 수 있습니다. 필요한 균형을 조정하려면 생산자 구성을 실험하고 튜닝해야 합니다.

5.1. 기본 생산자 구성

모든 프로듀서에 연결 및 직렬화기 속성이 필요합니다. 일반적으로 추적을 위해 클라이언트 ID를 추가하고 생산자에 압축을 사용하여 요청의 배치 크기를 줄이는 것이 좋습니다.

기본 생산자 구성에서는 다음을 수행합니다.

  • 파티션의 메시지 순서는 보장되지 않습니다.
  • 브로커에 도달하는 메시지의 승인이 승인한다고 보장되는 것은 아닙니다.

기본 생산자 구성 속성

# ...
bootstrap.servers=localhost:9092 1
key.serializer=org.apache.kafka.common.serialization.StringSerializer 2
value.serializer=org.apache.kafka.common.serialization.StringSerializer 3
client.id=my-client 4
compression.type=gzip 5
# ...

1
(필수) Kafka 브로커의 host:port 부트스트랩 서버 주소를 사용하여 Kafka 클러스터에 연결할 생산자를 지정합니다. 생산자는 주소를 사용하여 클러스터의 모든 브로커를 검색하고 연결합니다. 쉼표로 구분된 목록을 사용하여 서버가 중단된 경우 두 개 또는 세 개의 주소를 지정하지만 클러스터의 모든 브로커 목록을 제공할 필요는 없습니다.
2
(필수) 각 메시지의 키를 브로커로 보내기 전에 바이트로 변환하는 직렬라이저입니다.
3
(필수) 각 메시지의 값을 브로커로 보내기 전에 바이트로 변환하는 직렬라이저입니다.
4
(선택 사항) 로그 및 요청의 소스를 식별하는 데 사용되는 클라이언트의 논리 이름입니다.
5
(선택 사항) 전송되고 압축된 형식으로 저장된 메시지를 압축하는 코드c이며 소비자에게 도달할 때 압축을 풉니다. 압축은 처리량을 개선하고 스토리지의 부하를 줄이는 데 유용하지만 압축 또는 압축 해제 비용이 금지될 수 있는 짧은 대기 시간 애플리케이션에 적합하지 않을 수 있습니다.

5.2. 데이터 조정

메시지 전달 승인은 메시지가 손실될 가능성을 최소화합니다. 감사는 기본적으로 acks=all 에 설정된 acks 속성을 사용하여 활성화됩니다.

메시지 전달 확인

# ...
acks=all 1
# ...

1
ACKS=all 은 메시지 요청이 성공적으로 수신되었음을 확인하기 전에 특정 수의 성공자에게 메시지를 복제하도록 강제 적용합니다.

acks=all 설정은 강력한 전달 보장을 제공하지만 생산자가 메시지를 보내고 승인을 수신하는 것 사이의 대기 시간을 늘릴 수 있습니다. 이러한 강력한 보장이 필요하지 않은 경우 acks=0 또는 acks=1 로 설정하면 리더 복제본에서 해당 로그에 레코드를 기록했음을 확인하는 보장이 없거나 확인만 제공합니다.

acks=all 을 사용하면 리더는 모든 동기화 복제본이 메시지 전달을 승인할 때까지 기다립니다. 항목의 min.insync.replicas 구성은 필요한 최소 개수의 in-sync 복제본 승인 수를 설정합니다. 승인의 횟수는 리더와 팔로워의 수를 포함합니다.

일반적으로 시작 지점은 다음 구성을 사용하는 것입니다.

  • 생산자 구성:

    • ACKS=all (기본값)
  • 주제 복제를 위한 브로커 구성:

    • default.replication.factor=3 (기본값 = 1)
    • min.insync.replicas=2 (default = 1)

주제를 만들 때 기본 복제 요소를 재정의할 수 있습니다. 주제 구성의 주제 수준에서 min.insync.replicas 를 재정의할 수도 있습니다.

AMQ Streams는 Kafka의 다중 노드 배포에 대해 예제 구성 파일에서 이 구성을 사용합니다.

다음 표에서는 리더 복제본을 복제하는 사용자의 가용성에 따라 이 구성이 작동하는 방법을 설명합니다.

표 5.1. 팔로워 사용 가능

사용 가능한 설명 및 동기화감사 인사생산자가 메시지를 보낼 수 있습니까?

2

리더는 2개의 후속 조치를 기다리고 있습니다.

제공됨

1

리더가 1 후속 조치를 기다리는 경우

제공됨

0

리더는 예외를 발생시킵니다.

없음

3의 주제 복제 요인은 하나의 리더 복제본과 두 명의 팔로워를 만듭니다. 이 구성에서는 단일 후속 조치를 사용할 수 없는 경우 생산자가 계속될 수 있습니다. in-sync 복제본에서 실패한 브로커를 제거하거나 새 리더를 생성하는 동안 일부 지연이 발생할 수 있습니다. 두 번째 후속 조치도 사용할 수 없는 경우 메시지 전달에 성공하지 못합니다. 성공적인 메시지 전달을 인정하는 대신 리더는 오류(여전복제본이 충분하지 않음)를 프로듀자에게 보냅니다. 프로듀서에서 동일한 예외를 발생시킵니다. 재시도 구성을 사용하면 프로듀서에서 실패한 메시지 요청을 다시 보낼 수 있습니다.

참고

시스템이 실패하면 버퍼에 원하지 않는 데이터가 손실될 위험이 있습니다.

5.3. 주문한 전달

idempotent 생산자는 메시지가 정확히 한 번만 전달되므로 중복을 방지합니다. 실패 시에도 전송 순서를 보장하기 위해 ID 및 시퀀스 번호가 메시지에 할당됩니다. 데이터 일관성을 위해 acks=all 을 사용하는 경우, 멱등성을 사용하는 것은 순서가 지정된 전달에 적합합니다. 생산자에는 기본적으로 idempotency가 활성화됩니다. idempotency를 사용하면 메시지 순서를 보존하기 위해 동시 진행 중 요청 수를 최대 5개로 설정할 수 있습니다.

idempotency로 주문한 전달

# ...
enable.idempotence=true 1
max.in.flight.requests.per.connection=5 2
acks=all 3
retries=2147483647 4
# ...

1
idempotent 생산자를 활성화하려면 true 로 설정합니다.
2
idempotent를 사용하면 진행 중인 요청 수가 1보다 클 수 있지만 메시지 순서 지정 보장은 계속 제공됩니다. 기본값은 5 진행 중인 요청입니다.
3
acks모두 로 설정합니다.
4
실패한 메시지 요청을 다시 보내는 시도 수를 설정합니다.

acks=all 을 사용하지 않고 성능 비용 때문에 idempotency를 사용하지 않도록 선택하는 경우 진행 중(unacknowledged) 요청 수를 1로 설정하여 순서를 유지합니다. 그렇지 않으면 Message-A 가 브로커에 이미 기록된 후에만 Message- A가 성공하는 경우가 발생할 수 있습니다.

idempotency 없이 주문한 전달

# ...
enable.idempotence=false 1
max.in.flight.requests.per.connection=1 2
retries=2147483647
# ...

1
idempotent 생산자를 비활성화하려면 false 로 설정합니다.
2
진행 중인 요청 수를 정확히 1 로 설정합니다.

5.4. 신뢰성 보장

idempotence는 정확히 한 번 단일 파티션에 쓰는 데 유용합니다. 멱등과 함께 사용할 경우 트랜잭션은 정확히 한 번 여러 파티션에 쓸 수 있습니다.

트랜잭션은 동일한 트랜잭션 ID를 사용하는 메시지가 한 번 생성되도록 보장하며, 모두 각 로그에 성공적으로 기록되거나 그 중 어느 것도 아닙니다.

# ...
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
transactional.id=UNIQUE-ID 1
transaction.timeout.ms=900000 2
# ...
1
고유한 트랜잭션 ID를 지정합니다.
2
시간 초과 오류가 반환되기 전에 트랜잭션에 허용되는 최대 시간을 밀리초 단위로 설정합니다. 기본값은 900000 또는 15분입니다.

transactional.id 의 선택 사항은 트랜잭션 보장이 유지되기 위해서는 중요합니다. 각 트랜잭션 ID는 고유한 주제 파티션 세트에 사용해야 합니다. 예를 들어 파티션 이름에서 트랜잭션 ID에 대한 주제 파티션 이름의 외부 매핑을 사용하거나 충돌을 방지하는 함수를 사용하는 주제 파티션 이름에서 트랜잭션 ID를 계산하여 수행할 수 있습니다.

5.5. 처리량 및 대기 시간을 위해 생산자 최적화

일반적으로 시스템의 요구 사항은 지정된 대기 시간 내에 메시지 비율에 대한 특정 처리량 대상을 충족하기 위한 것입니다. 예를 들어 초당 500,000개의 메시지를 대상으로 하며 메시지의 95%가 2초 이내에 승인됩니다.

프로듀서의 메시징 순서(message order and commitment)가 애플리케이션 요구 사항에 따라 정의될 수 있습니다. 예를 들어, 애플리케이션에서 제공하는 일부 중요한 속성이나 보안을 손상시키지 않고 acks=0 또는 acks=1 을 사용하는 옵션이 없을 수 있습니다.

브로커 재시작은 높은 백분위 통계에 큰 영향을 미칩니다. 예를 들어 장기간에 걸쳐 99번째 백분위 대기 시간은 브로커 재시작에 대한 동작으로 인해 우선합니다. 벤치마킹을 설계하거나 벤치마킹을 수행하는 성능 번호를 프로덕션에서 볼 수 있는 성능 수치와 비교할 때 고려해야 할 사항은 다음과 같습니다.

Kafka는 목표에 따라 처리량과 대기 시간을 위해 생산자 성능을 조정하는 다양한 구성 매개변수와 기술을 제공합니다.

메시지 일괄 처리 (linger.msbatch.size)
동일한 브로커로 향하는 더 많은 메시지가 전송되어 단일 생성 요청에 배치될 수 있도록 메시지 일괄 처리 지연이 발생합니다. 일괄 처리는 처리량이 증가하기 위해 대기 시간이 길수록 저하됩니다. 시간 기반 일괄 처리는 linger.ms 를 사용하여 구성되며, 크기 기반 일괄 처리는 batch.size 를 사용하여 구성됩니다.
압축(compression.type)
메시지 압축은 생산자(메시지 압축에 소비된 CPU 시간)에 대기 시간을 추가하지만 요청(및 잠재적으로 디스크 쓰기 가능)을 작게 만들어 처리량을 늘릴 수 있습니다. 압축이 있는지 여부와 가장 좋은 압축은 전송 중인 메시지에 따라 달라집니다. 압축은 KafkaProducer.send() 호출 스레드에서 수행되므로 이 방법의 대기 시간이 애플리케이션에 필요한 경우 더 많은 스레드를 사용해야 합니다.
파이프라이닝 (max.in.flight.requests.per.connection)
파이프라이닝은 이전 요청에 대한 응답이 수신되기 전에 더 많은 요청을 전송함을 의미합니다. 일반적으로 더 많은 파이프라이닝은 처리량 향상, 즉 더 심각한 일괄 처리와 같은 다른 효과의 임계값까지 증가하여 처리량에 미치는 영향을 대응하기 시작합니다.

대기 시간 단축

애플리케이션이 KafkaProducer.send() 를 호출하면 메시지는 다음과 같습니다.

  • 모든 인터셉터에서 처리
  • serialized
  • 파티션에 할당
  • compressed
  • 파티션별 큐에서 메시지 일괄 처리 추가

이 경우 send() 메서드가 반환됩니다. 따라서 send() 가 차단되는 시간은 다음에 따라 결정됩니다.

  • 인터셉터, serializers 및 partitioner에서 보낸 시간
  • 사용된 압축 알고리즘
  • 버퍼 압축을 기다리는 데 걸리는 시간

일괄 처리는 다음 중 하나가 발생할 때까지 큐에 남아 있습니다.

  • 일괄 처리( batch.size에 따라)
  • linger.ms 에서 도입한 지연이 통과되었습니다.
  • 발신자는 다른 파티션에 대한 메시지 일괄 처리를 동일한 브로커에 보내려고 하며 이 배치를 추가할 수도 있습니다.
  • 프로듀서가 플러시되거나 닫히고 있습니다.

일괄 처리 및 버퍼링에 대한 구성을 확인하여 대기 시간에 대한 send() 차단의 영향을 완화합니다.

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger 속성은 더 큰 메시지 배치가 누적되어 요청에 전송되도록 지연을 밀리초 단위로 추가합니다. 기본값은 0'입니다.
2
최대 batch.size (바이트)가 사용되는 경우 최대값에 도달할 때 요청이 전송되거나 메시지가 linger.ms 보다 오래 대기되었습니다(둘 중 더 빨리 제공됨). 지연을 추가하면 일괄 처리가 배치 크기까지 메시지를 누적할 수 있습니다.
3
버퍼 크기는 배치 크기 이상이어야 하며 버퍼링, 압축 및 진행 중인 요청을 수용할 수 있어야 합니다.

처리량 증가

메시지를 전달하고 전송 요청을 완료하기 전에 대기할 최대 시간을 조정하여 메시지 요청의 처리량을 향상시킵니다.

기본값을 대체할 사용자 지정 파티션을 작성하여 지정된 파티션으로 메시지를 보낼 수도 있습니다.

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
전체 전송 요청을 기다리는 최대 시간(밀리초)입니다. 이 값을 MAX_ LOECDHE로 설정하여 무제한 재시도 횟수를 Kafka에 위임할 수 있습니다. 기본값은 120000 또는 2분입니다.
2
사용자 정의 partitioner의 클래스 이름을 지정합니다.

부록 A. 서브스크립션 사용

AMQ Streams는 소프트웨어 서브스크립션을 통해 제공됩니다. 서브스크립션을 관리하려면 Red Hat 고객 포털에서 계정에 액세스하십시오.

귀하의 계정에 액세스

  1. access.redhat.com 으로 이동합니다.
  2. 아직 계정이 없는 경우 계정을 생성합니다.
  3. 계정에 로그인합니다.

서브스크립션 활성화

  1. access.redhat.com 으로 이동합니다.
  2. 내 서브스크립션으로 이동합니다.
  3. 서브스크립션을 활성화하여 16자리 활성화 번호를 입력합니다.

Zip 및 Tar 파일 다운로드

zip 또는 tar 파일에 액세스하려면 고객 포털을 사용하여 다운로드할 관련 파일을 찾습니다. RPM 패키지를 사용하는 경우에는 이 단계가 필요하지 않습니다.

  1. 브라우저를 열고 access.redhat.com/downloads 에서 Red Hat 고객 포털 제품 다운로드 페이지에 로그인합니다.
  2. INTEGRAT ION 및 AUTOMATION 카테고리에서 Apache Kafka의 AMQ Streams 를 찾습니다.
  3. 원하는 AMQ Streams 제품을 선택합니다. Software Download 페이지가 열립니다.
  4. 구성 요소에 대한 다운로드 링크를 클릭합니다.

DNF로 패키지 설치

패키지 및 모든 패키지 종속 항목을 설치하려면 다음을 사용합니다.

dnf install <package_name>

로컬 디렉터리에서 이전에 다운로드한 패키지를 설치하려면 다음을 사용합니다.

dnf install <path_to_download_package>

2023-11-22에 최종 업데이트된 문서

법적 공지

Copyright © 2023 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.