7.5. Kafka Connect 구성
AMQ Streams의 KafkaConnect 리소스를 사용하여 새 Kafka Connect 클러스터를 빠르고 쉽게 생성할 수 있습니다.
KafkaConnect 리소스를 사용하여 Kafka Connect를 배포할 때 Kafka 클러스터에 연결하기 위해 부트스트랩 서버 주소( spec.bootstrapServers)를 지정합니다. 서버가 중단된 경우 둘 이상의 주소를 지정할 수 있습니다. 또한 보안 연결을 위해 인증 자격 증명 및 TLS 암호화 인증서를 지정합니다.
Kafka 클러스터는 AMQ Streams에서 관리하거나 OpenShift 클러스터에 배포할 필요가 없습니다.
KafkaConnect 리소스를 사용하여 다음을 지정할 수도 있습니다.
- 연결을 위해 플러그인이 포함된 컨테이너 이미지를 빌드하는 플러그인 구성
- Kafka Connect 클러스터에 속한 작업자 Pod 구성
-
KafkaConnector리소스를 사용하여 플러그인을 관리할 수 있는 주석
Cluster Operator는 KafkaConnector 리소스를 사용하여 생성된 KafkaConnect 리소스 및 커넥터를 사용하여 배포된 Kafka Connect 클러스터를 관리합니다.
플러그인 구성
플러그인에서는 커넥터 인스턴스 생성을 위한 구현을 제공합니다. 플러그인이 인스턴스화되면 특정 유형의 외부 데이터 시스템에 연결하기 위해 구성이 제공됩니다. 플러그인은 지정된 종류의 데이터 소스에 연결하기 위해 커넥터 및 작업 구현을 정의하는 하나 이상의 JAR 파일 세트를 제공합니다. 여러 외부 시스템용 플러그인은 Kafka Connect에서 사용할 수 있습니다. 자체 플러그인을 만들 수도 있습니다.
이 구성에서는 Kafka Connect에 공급할 소스 입력 데이터 및 대상 출력 데이터를 설명합니다. 소스 커넥터의 경우 외부 소스 데이터는 메시지를 저장할 특정 항목을 참조해야 합니다. 플러그인에는 데이터를 변환하는 데 필요한 라이브러리와 파일도 포함될 수 있습니다.
Kafka Connect 배포에는 하나 이상의 플러그인이 있을 수 있지만 각 플러그인마다 하나의 버전만 사용할 수 있습니다.
선택한 플러그인을 포함하는 사용자 지정 Kafka Connect 이미지를 생성할 수 있습니다. 다음 두 가지 방법으로 이미지를 생성할 수 있습니다.
컨테이너 이미지를 자동으로 생성하려면 KafkaConnect 리소스의 build 속성을 사용하여 Kafka Connect 클러스터에 추가할 플러그인을 지정합니다. AMQ Streams는 플러그인 아티팩트를 자동으로 다운로드하여 새 컨테이너 이미지에 추가합니다.
플러그인 구성 예
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
# ...
build: 1
output: 2
type: docker
image: my-registry.io/my-org/my-connect-cluster:latest
pushSecret: my-registry-credentials
plugins: 3
- name: debezium-postgres-connector
artifacts:
- type: tgz
url: https://ARTIFACT-ADDRESS.tgz
sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
# ...
# ...
- 1
- 플러그인이 자동으로 컨테이너 이미지를 빌드하기 위한 구성 속성을 빌드 합니다.
- 2
- 새 이미지가 푸시되는 컨테이너 레지스트리의 구성
출력속성은 이미지의 유형 및 이름과 선택적으로 컨테이너 레지스트리에 액세스하는 데 필요한 인증 정보가 포함된 시크릿 이름을 설명합니다. - 3
- 새 컨테이너 이미지에 추가할 플러그인 및 아티팩트 목록입니다.
plugins속성은 아티팩트 유형과 아티팩트가 다운로드되는 URL을 설명합니다. 각 플러그인은 하나 이상의 아티팩트로 구성해야 합니다. 또한 SHA-512 체크섬을 지정하여 아티팩트의 압축을 풀기 전에 아티팩트를 확인할 수 있습니다.
Dockerfile을 사용하여 이미지를 빌드하는 경우 AMQ Streams의 최신 컨테이너 이미지를 기본 이미지로 사용하여 플러그인 구성 파일을 추가할 수 있습니다.
플러그인 구성 수동 추가 표시 예
FROM registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
작업자를 위한 Kafka Connect 클러스터 구성
KafkaConnect 리소스의 config 속성에서 작업자에 대한 구성을 지정합니다.
분산 Kafka Connect 클러스터에는 그룹 ID와 내부 구성 주제 세트가 있습니다.
-
group.id -
offset.storage.topic -
config.storage.topic -
status.storage.topic
Kafka Connect 클러스터는 기본적으로 이러한 속성에 대해 동일한 값으로 구성됩니다. Kafka Connect 클러스터는 오류를 생성할 때 그룹 ID 또는 주제 이름을 공유할 수 없습니다. 여러 개의 Kafka Connect 클러스터를 사용하는 경우 이러한 설정은 생성된 각 Kafka Connect 클러스터의 작업자에 고유해야 합니다.
각 Kafka Connect 클러스터에서 사용하는 커넥터의 이름도 고유해야 합니다.
다음 예제 작업자 구성에서는 JSON 변환기가 지정됩니다. 복제 요소는 Kafka Connect에서 사용하는 내부 Kafka 항목에 대해 설정됩니다. 프로덕션 환경의 경우 최소 3개여야 합니다. 주제를 만든 후 복제 요소를 변경하면 적용되지 않습니다.
작업자 구성 예
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
config:
# ...
group.id: my-connect-cluster 1
offset.storage.topic: my-connect-cluster-offsets 2
config.storage.topic: my-connect-cluster-configs 3
status.storage.topic: my-connect-cluster-status 4
key.converter: org.apache.kafka.connect.json.JsonConverter 5
value.converter: org.apache.kafka.connect.json.JsonConverter 6
key.converter.schemas.enable: true 7
value.converter.schemas.enable: true 8
config.storage.replication.factor: 3 9
offset.storage.replication.factor: 3 10
status.storage.replication.factor: 3 11
# ...
- 1
- Kafka 내의 Kafka Connect 클러스터 ID입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 2
- 커넥터 오프셋을 저장하는 Kafka 주제입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 3
- 커넥터 및 작업 상태 구성을 저장하는 Kafka 주제입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 4
- 커넥터 및 작업 상태 업데이트를 저장하는 Kafka 주제입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 5
- Kafka에서 저장하기 위해 메시지 키를 JSON 형식으로 변환하는 데 사용됩니다.
- 6
- Kafka에서 저장하기 위해 메시지 값을 JSON 형식으로 변환하는 데 사용됩니다.
- 7
- 메시지 키를 구조화된 JSON 형식으로 변환하는 데 활성화된 스키마입니다.
- 8
- 메시지 값을 구조화된 JSON 형식으로 변환하는 데 활성화된 스키마입니다.
- 9
- 커넥터 오프셋을 저장하는 Kafka 항목에 대한 복제 요인입니다.
- 10
- 커넥터 및 작업 상태 구성을 저장하는 Kafka 항목에 대한 복제 요인입니다.
- 11
- 커넥터 및 작업 상태 업데이트를 저장하는 Kafka 항목에 대한 복제 요인입니다.
커넥터의 KafkaConnector 관리
배포의 작업자 Pod에 사용되는 컨테이너 이미지에 플러그인을 추가한 후 AMQ Streams의 KafkaConnector 사용자 정의 리소스 또는 Kafka Connect API를 사용하여 커넥터 인스턴스를 관리할 수 있습니다. 이러한 옵션을 사용하여 새 커넥터 인스턴스를 만들 수도 있습니다.
KafkaConnector 리소스는 Cluster Operator의 커넥터 관리에 OpenShift 네이티브 접근 방식을 제공합니다. KafkaConnector 리소스를 사용하여 커넥터를 관리하려면 KafkaConnect 사용자 정의 리소스에 주석을 지정해야 합니다.
KafkaConnectors를 활성화하는 주석
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
# ...
use-connector-resources 를 true 로 설정하면 KafkaConnectors가 커넥터를 생성, 삭제 및 재구성할 수 있습니다.
KafkaConnect 구성에서 use-connector-resources 가 활성화된 경우 KafkaConnector 리소스를 사용하여 커넥터를 정의하고 관리해야 합니다. KafkaConnector 리소스는 외부 시스템에 연결하도록 구성됩니다. Kafka Connect 클러스터 및 외부 데이터 시스템과 상호 작용하는 Kafka 클러스터와 동일한 OpenShift 클러스터에 배포됩니다.
Kafka 구성 요소는 동일한 OpenShift 클러스터에 포함되어 있습니다.
이 구성은 인증을 포함하여 커넥터 인스턴스가 외부 데이터 시스템에 연결하는 방법을 지정합니다. 또한 조사할 데이터를 기록해야 합니다. 소스 커넥터의 경우 구성에 데이터베이스 이름을 제공할 수 있습니다. 대상 주제 이름을 지정하여 데이터가 Kafka에 있는 위치를 지정할 수도 있습니다.
tasksMax 를 사용하여 최대 작업 수를 지정합니다. 예를 들어 tasksMax: 2 가 있는 소스 커넥터는 소스 데이터 가져오기를 두 개의 작업으로 분할할 수 있습니다.
KafkaConnector 소스 커넥터 구성의 예
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" 6 topic: my-topic 7 # ...
- 1
- 커넥터의 이름으로 사용되는
KafkaConnector리소스의 이름입니다. OpenShift 리소스에 유효한 이름을 사용합니다. - 2
- Kafka Connect 클러스터의 이름이 에서 커넥터 인스턴스를 생성합니다. 커넥터를 연결하는 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다.
- 3
- 커넥터 클래스의 전체 이름입니다. Kafka Connect 클러스터에서 사용하는 이미지에 있어야 합니다.
- 4
- 커넥터가 생성할 수 있는 최대 Kafka Connect 작업 수입니다.
- 5
- 커넥터 구성은 키-값 쌍으로 설정됩니다.
- 6
- 외부 데이터 파일의 위치입니다. 이 예제에서는
/opt/kafka/LICENSE파일에서 읽을 수 있도록 CloudEventSourceConnector를 구성하고 있습니다. - 7
- 소스 데이터를 게시하는 Kafka 주제입니다.
OpenShift Secrets 또는 ConfigMaps에서 커넥터의 기밀 구성 값을 로드 할 수 있습니다.
Kafka Connect API
KafkaConnector 리소스를 사용하여 커넥터를 관리하는 대신 Kafka Connect REST API를 사용합니다. Kafka Connect REST API는 < connect_cluster_name> -connect-api:8083 에서 실행되는 서비스로 사용할 수 있습니다. 여기서 < connect_cluster_name >은 Kafka Connect 클러스터의 이름입니다.
커넥터 구성을 JSON 오브젝트로 추가합니다.
커넥터 구성 추가에 대한 curl 요청의 예
curl -X POST \
http://my-connect-cluster-connect-api:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "my-source-connector",
"config":
{
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/opt/kafka/LICENSE",
"topic":"my-topic",
"tasksMax": "4",
"type": "source"
}
}'
KafkaConnectors가 활성화된 경우 Kafka Connect REST API를 직접 사용하여 변경한 수동 변경 사항을 Cluster Operator에 의해 되돌립니다.
REST API에서 지원하는 작업은 Apache Kafka 설명서에 설명되어 있습니다.
OpenShift 외부에서 Kafka Connect API 서비스를 노출할 수 있습니다. 수신 또는 경로와 같은 액세스를 제공하는 연결 메커니즘을 사용하는 서비스를 생성하여 이 작업을 수행합니다. 연결이 안전하지 않으므로 advisedly를 사용합니다.