IT 꿈나무의 일상

kafka standalone으로 설치하기 (2) 본문

IT 관련

kafka standalone으로 설치하기 (2)

viera 2021. 12. 27. 23:36
반응형

## 카프카 설정 시작 ##

9. 카프카 로그 폴더 생성

[kafka@localhost app]$ mkdir -p /home/kafka/app/logs/kafka1
[kafka@localhost app]$ mkdir -p /home/kafka/app/logs/kafka2
[kafka@localhost app]$ mkdir -p /home/kafka/app/logs/kafka3


10. $KAFKA_HOME/config 에서 3개의 server.properties 생성

>> 카프카 환경변수 이미 설정되어 있어야함 (export KAFKA_HOME=/home/kafka/app/kafka)

[kafka@localhost ~]$ cd app/kafka/config/
[kafka@localhost config]$ cp server.properties server1.properties
[kafka@localhost config]$ cp server.properties server2.properties
[kafka@localhost config]$ cp server.properties server3.properties


11. server.peoperties 설정

- 하나의 서버에 설치하기 때문에 동일한 {IP주소입력} 를 입력해 준다.

vi server1.properties
-----------------------------
broker.id=1
listeners=PLAINTEXT://{IP주소입력}:9091
advertised.listeners=PLAINTEXT://{IP주소입력}:9091
log.dirs=/home/kafka/app/logs/broker1
log.retention.hours=168
log.flush.interval.ms=5000
log.flush.interval.messages=20000
log.segment.bytes=1073741824
group.initial.rebalance.delay.ms=10000
zookeeper.connect={IP주소입력}:2181,{IP주소입력}:2182,{IP주소입력}:2183
-----------------------------

vi server2.properties
-----------------------------
broker.id=2
listeners=PLAINTEXT://{IP주소입력}:9092
advertised.listeners=PLAINTEXT://{IP주소입력}:9092
log.dirs=/home/kafka/app/logs/broker2
log.retention.hours=168
log.flush.interval.ms=5000
log.flush.interval.messages=20000
log.segment.bytes=1073741824
group.initial.rebalance.delay.ms=10000
zookeeper.connect={IP주소입력}:2181,{IP주소입력}:2182,{IP주소입력}:2183
-----------------------------

vi server3.properties
-----------------------------
broker.id=3
listeners=PLAINTEXT://{IP주소입력}:9093
advertised.listeners=PLAINTEXT://{IP주소입력}:9093
log.dirs=/home/kafka/app/logs/broker3
log.retention.hours=168
log.flush.interval.ms=5000
log.flush.interval.messages=20000
log.segment.bytes=1073741824
group.initial.rebalance.delay.ms=10000
zookeeper.connect={IP주소입력}:2181,{IP주소입력}:2182,{IP주소입력}:2183
-----------------------------


12. broker 기동 ,중지 스크립트 생성

## 기동 스트립트
vi start_kafka1.sh
-----------------------------
nohup /home/kafka/app/kafka/bin/kafka-server-start.sh -daemon /home/kafka/app/kafka/config/server1.properties > /dev/null 2>&1 &
-----------------------------

vi start_kafka2.sh
-----------------------------
nohup /home/kafka/app/kafka/bin/kafka-server-start.sh -daemon /home/kafka/app/kafka/config/server2.properties > /dev/null 2>&1 &
-----------------------------

vi start_kafka3.sh
-----------------------------
nohup /home/kafka/app/kafka/bin/kafka-server-start.sh -daemon /home/kafka/app/kafka/config/server3.properties > /dev/null 2>&1 &
-----------------------------

 

## 중지 스크립트
vi stop_kafka1.sh
-----------------------------
PID=`ps -ef | grep java | grep "server1" | grep -v grep | awk '{print $2}'`
echo "$NAME-PID = $PID"
kill ${PID}
-----------------------------

vi stop_kafka2.sh
-----------------------------
PID=`ps -ef | grep java | grep "server2" | grep -v grep | awk '{print $2}'`
echo "$NAME-PID = $PID"
kill ${PID}
-----------------------------

vi stop_kafka3.sh
-----------------------------
PID=`ps -ef | grep java | grep "server3" | grep -v grep | awk '{print $2}'`
echo "$NAME-PID = $PID"
kill ${PID}
-----------------------------


13. 주키퍼와 카프카 실행

## 주키퍼 실행
/home/kafka/app/kafka/bin/zookeeper-server-start.sh -daemon /home/kafka/app/kafka/config/zookeeper1.properties
/home/kafka/app/kafka/bin/zookeeper-server-start.sh -daemon /home/kafka/app/kafka/config/zookeeper2.properties
/home/kafka/app/kafka/bin/zookeeper-server-start.sh -daemon /home/kafka/app/kafka/config/zookeeper3.properties

## 카프카 실행
/home/kafka/app/kafka/bin/kafka-server-start.sh -daemon /home/kafka/app/kafka/config/server1.properties
/home/kafka/app/kafka/bin/kafka-server-start.sh -daemon /home/kafka/app/kafka/config/server2.properties
/home/kafka/app/kafka/bin/kafka-server-start.sh -daemon /home/kafka/app/kafka/config/server3.properties

 

## 정상적으로 동작하는지 확인하기

>> 아래와 같이 jps를 입력하였을때 Kafka , QuorumPeerMain 프로세스가 잡히면 정상 동작
13459 Kafka
14342 Kafka
14374 Jps
12249 QuorumPeerMain
11855 QuorumPeerMain

## 해당 포트를 사용하는 프로세스가 존재하는지 확인
[kafka@localhost bin]$ netstat -ntlp | grep 2181
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2181                 :::*                    LISTEN      11855/java          
[kafka@localhost bin]$
[kafka@localhost bin]$ netstat -ntlp | grep 2182
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2182                 :::*                    LISTEN      12249/java          
[kafka@localhost bin]$ netstat -ntlp | grep 2183
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2183                 :::*                    LISTEN      13043/java         
[kafka@localhost kafka]$ netstat -ntlp | grep 9092
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6       0      0 {IP주소}:9092     :::*                    LISTEN      2247/java  


14. 카프카 사용해보기

# test1
1) 생성
[kafka@localhost bin]$ sh kafka-topics.sh --create --topic viera5 --bootstrap-server {IP주소입력}:9092 --partitions 1 --replication-factor 1

Created topic viera5.

2) 리스트 확인 
[kafka@localhost bin]$ sh kafka-topics.sh --list --bootstrap-server {IP주소입력}:9092
viera5

3) produce
[kafka@localhost bin]$ sh kafka-console-producer.sh --topic viera5 --bootstrap-server {IP주소입력}:9092
>HI
>opravdu
>end
>

4) consume
[kafka@localhost bin]$ sh kafka-console-consumer.sh --topic viera5 --bootstrap-server {IP주소입력}:9092

HI
opravdu
end

 

# test2
1) Topic 생성
[kafka@localhost bin]$ sh kafka-topics.sh --create --topic mytopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic mytopic.

2) Topic 리스트 확인
[kafka@localhost bin]$ sh kafka-topics.sh --list --bootstrap-server localhost:9092
mytopic

3) Topic 삭제
[kafka@localhost bin]$ sh kafka-topics.sh --delete --topic mytopic --bootstrap-server localhost:9092
[kafka@localhost bin]$ sh kafka-topics.sh --list --bootstrap-server localhost:9092
-- 삭제해서 아무것도 안보임 -- 

4) viera Topic 생성
[kafka@localhost bin]$ sh kafka-topics.sh --create --topic viera --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic viera.

[kafka@localhost bin]$ sh kafka-topics.sh --list --bootstrap-server localhost:9092                            
viera

5) produce : 이름이 viera 인 topic 에 producing (데이터 삽입)
sh kafka-console-producer.sh --topic viera --bootstrap-server localhost:9092

6) consume : 이름이 viera 인 topic 에서 consuming
sh kafka-console-consumer.sh --topic viera --from-beginning --bootstrap-server localhost:9092

7) kafka와 zookeeper 실행 멈춤
[kafka@localhost bin]$ sh kafka-server-stop.sh
[kafka@localhost bin]$ sh zookeeper-server-stop.sh
[kafka@localhost bin]$ jps
16837 Jps

8) 실행 예시
[kafka@localhost bin]$ sh kafka-console-producer.sh --topic viera --bootstrap-server localhost:9092
>HI  
>Im' viera
>

[kafka@localhost bin]$
[kafka@localhost bin]$ sh kafka-console-consumer.sh --topic viera --from-beginning --bootstrap-server localhost:9092
HI
Im' viera


## trouble shooting

>> 포트를 9091을 사용하였더니 Broker may not be available. 에러 발생

1) TOPIC 생성시
/home/kafka/app/kafka/bin/kafka-topics.sh --create --bootstrap-server {IP주소입력}:9091 --replication-factor 1 --partitions 1 --topic test_topic

--> Broker may not be available.
--> 기본포트가 9092 이기 때문에 그거써줘야함

반응형
Comments