Type Here to Get Search Results !

아파치 카프카 (Apache Kafka) 퀵 스타트(Quickstart) 따라하기 - 테스트 방법

이 글에서는 아파치 카프카 (Apache Kafka) 퀵 스타트(Quickstart)의 3장 부터 마지막장 까지 설명합니다. 

  • STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS (토픽 생성하기)
  • STEP 4: WRITE SOME EVENTS INTO THE TOPIC (토픽에 이벤트 작성)
  • STEP 5: READ THE EVENTS (이벤트 읽기)
  • STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT (KAFKA CONNECT를 사용하여 데이터를 이벤트 스트림으로 가져오기/내보내기)
  • STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS (KAFKA STREAMS로 이벤트 처리)
  • STEP 8: TERMINATE THE KAFKA ENVIRONMENT
  • Quickstart를 마치며 (축하합니다.)

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS (토픽 생성하기)


이 장에서는 이벤트를 처리하기 위해서 토픽(Topic)을 생성하는 법을 알아봅니다.

Kafka는 이벤트(레코드 또는 메시지라고도 함)를 읽고, 쓰고, 저장하고, 처리할 수 있는 분산 이벤트 스트리밍 플랫폼입니다.

예제 이벤트로는 결제 거래, 휴대폰의 위치 업데이트, 배송 주문, 센서 측정이 있습니다. IoT 장치 또는 의료 장비 등에서. 이러한 이벤트는 토픽에 구성되고 저장됩니다. 매우 단순화된 토픽은 파일 시스템의 폴더와 유사하며 이벤트는 해당 폴더의 파일입니다.

따라서 첫 번째 이벤트를 작성하기 전에 토픽을 만들어야 합니다. 다른 터미널 세션을 열고 다음을 실행합니다.

아래와 같이 수행하면 Created topic quickstart-events라고 결과가 나오면서 토픽이 생성되었다는 메시지를 확인할 수 있습니다.

lswhh@DESKTOP-HQPQNKV:~/kafka$ ls
confluent-7.3.1  confluent-community-7.3.1.tar.gz  confluentinc-kafka-connect-jdbc-10.6.4  confluentinc-kafka-connect-jdbc-10.6.4.zip  kafka-connect-jdbc  kafka_2.13-3.4.0  kafka_2.13-3.4.0.tgz
lswhh@DESKTOP-HQPQNKV:~/kafka$ cd kafka_2.13-3.4.0/
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ ls
LICENSE  NOTICE  bin  config  libs  licenses  logs  site-docs
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
Kafka의 모든 command line 도구(tool)에는 추가 옵션이 있습니다. 사용 정보를 표시하는 인수입니다. 예를 들어 새 항목의 파티션 수와 같은 세부 정보도 표시할 수 있습니다.

lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: H5i9qhBZTK6FGv-j98Q2PQ PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: quickstart-events        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$



STEP 4: WRITE SOME EVENTS INTO THE TOPIC (토픽에 이벤트 작성)


Kafka 클라이언트는 이벤트를 쓰거나 읽기 위해 네트워크를 통해 Kafka 브로커와 통신합니다. 한번 이벤트를 수신하면 브로커는 사용자가 필요하다고 판단하는 한 지속성 있고 내결함성이 있는(durable, fault-tolerant) 방식으로 이벤트를 저장합니다. 

kafka console producer 클라이언트를 실행하여 토픽에 몇 가지 이벤트를 작성합니다. 기본적으로 입력하는 각 줄은 토픽에 별도의 이벤트가 기록됩니다.

lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>first test event
>second test event
>^Clswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$

언제든지 Ctrl-C(^C)를 입력하여 프로듀서 클라이언트를 중지할 수 있습니다. 

STEP 5: READ THE EVENTS (이벤트 읽기)


다른 터미널 세션을 열고 kafka console consumer 클라이언트를 실행하여 방금 만든 이벤트를 읽습니다.

lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
first test event
second test event
^CProcessed a total of 2 messages
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$
언제든지 Ctrl-C(^C)를 입력하여 consumer 클라이언트를 중지할 수 있습니다. 

자유롭게 테스트해 보세요: 예를 들어, 프로듀서 터미널로 다시 전환하여(이전 단계) 추가 이벤트를 생산하고 이벤트가 consumer 터미널에 즉시 표시되는지 확인할 수 있습니다. 

이벤트는 Kafka에 안정적으로 저장되기 때문에 원하는 만큼 많은 소비자(consumer)가 읽을 수 있습니다. 

또 다른 터미널 세션을 열고 이전 명령을 다시 실행하여 이를 쉽게 확인할 수 있습니다.

다음의 터미널 테스트에서 소비자는 항상 처음부터 다시 읽는 것을 보실 수 있고, 프로듀서가 생산하는 즉시 소비자에서 표시됩니다. 


STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT (KAFKA CONNECT를 사용하여 데이터를 이벤트 스트림으로 가져오기/내보내기)


많은 응용 프로그램들을 포함하여 관계형 데이터베이스 또는 기존 메시징 시스템과 같은 기존 시스템에 많은 데이터가 있을 수 있습니다. Kafka Connect를 사용 하면 외부 시스템에서 Kafka로 또는 그 반대로 데이터를 지속적으로 수집할 수 있습니다. 외부 시스템과 상호 작용하기 위한 사용자 로직(custom logic)을 구현하는 커넥터를 실행하는 확장 가능한 도구입니다. 따라서 기존 시스템을 Kafka와 통합하는 것이 매우 쉽습니다. 이 프로세스를 더욱 쉽게 하기 위해 이미 만들어진 수백 개의 커넥터가 있습니다.

이번 quickstart에서는 파일에서 Kafka 토픽으로 데이터를 가져오고 Kafka 토픽에서 파일로 데이터를 내보내는 간단한 커넥터로 Kafka Connect를 실행하는 방법을 살펴봅니다.

먼저  Connect worker's configuration property의 plugin.path에 connect-file-3.4.0.jar를 추가 해야 합니다 . 

plugin.path

이 문서의 목적을 위해 상대 경로를 사용하고 명령은 설치 디렉터리에서 실행될 때 작동하는 uber jar¹로 커넥터의 패키지를 고려합니다. 그러나 프로덕션 배포의 경우 절대 경로를 사용하는 것이 항상 바람직하다는 점은 주목할 가치가 있습니다. 이 구성을 설정하는 방법에 대한 자세한 설명은 plugin.path를 참조하십시오 .

1. uber jar: 의존성을 포함한 실행 가능한 single jar(이하 uber-jar)

다음과 일치하는 configuration property plugin.path를 config/connect-standalone.properties 파일에 추가하거나 변경 하고 파일을 저장합니다.

echo "plugin.path=libs/connect-file-3.4.0.jar"

config/connect-standalone.properties

# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
plugin.path=libs/connect-file-3.4.0.jar



데이터로 사용할 파일을 다음과 같이 생성합니다. 

A 방식
 echo -e "foo\nbar" > test.txt
B 방식
> echo foo> test.txt
> echo bar>> test.txt

다음으로 standalone mode, 독립 실행 형 모드 에서 실행되는 두 개의 커넥터를 시작합니다 . 즉, 단일 로컬 전용 프로세스에서 실행됩니다. 

세 가지 구성 파일을 매개변수로 제공합니다. 첫 번째는 Kafka Connect 프로세스에 대한 구성으로, 연결할 Kafka 브로커와 데이터의 직렬화 형식과 같은 공통 구성을 포함합니다. 나머지 구성 파일은 각각 만들 커넥터를 지정합니다. 이러한 파일에는 고유한 커넥터 이름, 인스턴스화할 커넥터 클래스 및 커넥터에 필요한 기타 구성이 포함됩니다.

lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ vi config/connect-standalone.properties
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ echo -e "foo\nbar" > test.txt
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ vi config/connect-standalone.properties
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ vi config/connect-standalone.properties
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ cat test.txt
foo
bar
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$
Kafka에 포함된 이러한 샘플 구성 파일은 이전에 시작한 기본 로컬 클러스터 구성을 사용합니다 두 개의 커넥터를 만듭니다 : 첫 번째는 입력 파일에서 줄을 읽고 각각을 Kafka 토픽에 생성하는 소스 커넥터입니다 두 번째는 Kafka 토픽에서 메시지를 읽고 각각을 출력 파일의 줄로 생성하는 싱크 커넥터입니다.
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2023-04-20 20:05:12,038] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:68)
[2023-04-20 20:05:12,051] INFO WorkerInfo values:
        jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/home/lswhh/kafka/kafka_2.13-3.4.0/bin/../logs, -Dlog4j.configuration=file:bin/../config/connect-log4j.properties
        jvm.spec = Oracle Corporation, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_241, 25.241-b07
        jvm.classpath = /home/lswhh/kafka/kafka_2.13-3.4.0/bin/../libs/activation-1.1.1.jar:/home/lswhh/kafka/
....
l-file-sink] Adding newly assigned partitions: connect-test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:317)
[2023-04-20 20:05:17,154] INFO [local-file-sink|task-0] [Consumer clientId=connector-consumer-local-file-sink-0, groupId=connect-local-file-sink] Found no committed offset for partition connect-test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1543)
[2023-04-20 20:05:17,169] INFO [local-file-sink|task-0] [Consumer clientId=connector-consumer-local-file-sink-0, groupId=connect-local-file-sink] Resetting offset for partition connect-test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:399)

시작하는 동안 커넥터가 인스턴스화되고 있음을 나타내는 일부를 포함하여 많은 로그 메시지가 표시됩니다. Kafka Connect 프로세스가 시작되면 소스 커넥터는 test.txt에서 라인을 읽고 connect-test 토픽에 이벤트를 생산하며 싱크 커넥터는 connect-test 토픽에서 메시지 읽기를 시작하고 test.sink.txt 파일에 기록해야 합니다. test.sink.txt 출력 파일의 내용을 검사하여 전체 파이프라인을 통해 데이터가 전달되었는지 확인할 수 있습니다.

lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ more test.sink.txt
foo
bar

데이터는 Kafka 토픽에 저장되므로 connect-test console consumer를 실행하여 토픽의 데이터를 볼 수도 있습니다(또는 사용자 consumer 코드를 사용하여 처리할 수도 있습니다.).

lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
커넥터는 계속해서 데이터를 처리하므로 파일에 데이터를 추가하고 데이터가 파이프라인을 통해 이동하는 것을 볼 수 있습니다.
다른 터미널 (터미널 feed)
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ echo xman >> test.txt 
source/sink connector 있는 터미널 메세지 (터미널 connector)
[2023-04-20 20:09:21,542] INFO [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Resetting the last seen epoch of partition connect-test-0 to 0 since the associated topicId changed from null to X8H_btjHTEa6n6Y-3Irtsw (org.apache.kafka.clients.Metadata:402)
터미널 console consumer
lswhh@DESKTOP-HQPQNKV:~/kafka/kafka_2.13-3.4.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"xman"}
터미널 feed에서 데이터를 xman으로 넣어주면 터미널 connector와 console consumer에서 각각 데이터를 받은 메시지가 출력되는 것을 확인할 수 있습니다. 

STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS (KAFKA STREAMS로 이벤트 처리)


데이터가 Kafka에 이벤트로 저장되면 Java/Scala용 Kafka Streams 클라이언트 라이브러리를 사용하여 데이터를 처리할 수 있습니다 . 입력 및/또는 출력 데이터가 Kafka 토픽에 저장되는 미션 크리티컬 실시간 애플리케이션 및 마이크로 서비스를 구현할 수 있습니다.
Kafka Streams는 클라이언트 측에서 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성을 높입니다. 라이브러리는 정확히 1회 처리, 상태 저장 작업 및 집계, 기간 설정, 조인, 이벤트 시간 기반 처리 등을 지원합니다.

첫 번째 샘플을 제공하기 위해 인기 있는 WordCount 알고리즘을 구현하는 방법은 다음과 같습니다.
KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Kafka Streams demoapp development tutorial은 이러한 스트리밍 애플리케이션을 처음부터 끝까지 코딩하고 실행하는 방법을 보여줍니다.

STEP 8: TERMINATE THE KAFKA ENVIRONMENT


이제 Quickstart를 마쳤으므로 자유롭게 Kafka 환경을 체험하거나 계속 사용해 보십시오.

  1. Ctrl-C를 아직 수행하지 않은 경우 Ctrl-C를 사용하여 생산자 및 소비자 클라이언트를 중지합니다 .
  2. Ctrl-C를 사용하여 Kafka 브로커를 중지합니다 .
  3. 마지막으로 Kafka with ZooKeeper 섹션을 수행했다면 Ctrl-C를 눌러서 중지합니다.

도중에 생성한 이벤트를 포함하여 로컬 Kafka 환경의 데이터도 삭제하려면 다음 명령을 실행하십시오.

$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs


Quickstart를 마치며 (축하합니다.)



Apache Kafka Quickstart 가이드를 성공적으로 완료했습니다. 

자세히 알아보려면 다음 단계를 따르세요. 

간략한 소개를 읽고 Kafka가 고수준에서 작동하는 방식, 주요 개념 및 다른 기술과 비교하는 방법을 알아보세요. 

Kafka를 더 자세히 이해하려면 문서로 이동하십시오 . 

사용 사례를 살펴보고 전 세계 커뮤니티의 다른 사용자가 Kafka에서 가치를 얻는 방법을 알아보세요. 

지역 Kafka 모임 그룹에 가입 하고 Kafka 커뮤니티의 주요 회의인 Kafka Summit의 강연을 시청하세요 .

Tags