MQTT & Kafka Test

2023. 9. 13. 07:20·Engineering/Configuring
목차
  1. MQTT Broker install
  2. mosquitto install
  3. simple send/receive test
  4. nanomq install
  5. 파이썬 환경에서 MQTT Publisher, Subscriber 구현
  6. paho-mqtt 라이브러리 설치
  7. kafka-python
  8. kafka-mqtt-connect
  9. Apache Camel Install
반응형

MQTT Test Server 구현

MQTT Broker install

MQTT 메세지 송/수신을 위해 MQTT Broker가 필요
여러가지 MQTT Broker 플랫폼이 있는데 오픈소스 중 가장 유명한 mosquitto, 그리고 서베이 중 경량, 멀티스레딩을 강조하는 nanomq test

mosquitto install

$ sudo apt install openjdk-8-jdk mosquitto mosquitto-client

간략한 상태 확인 명령어

$ ps -ef | grep (프로세스 이름) # 내가 실행시킨 프로세스가 현재 실행중인지, PID는 어떻게 되는지 확인 가능
$ netstat -tnlpa | grep (프로세스 이름 or Port 번호) # 실행시킨 프로세스가 사용중인 포트 확인 등에 유용
$ pkill -9 (프로세스 이름) # 해당 프로세스 죽이

simple send/receive test

simple mosquitto publish

$ mosquitto_pub -h 'host_ip_address' -t 'topic' -m 'message'

simpe mosquitto subscribe

$ mosquitto_sub -h 'host_ip_address' -t 'topic'

nanomq install

https://github.com/emqx/nanomq

$ sudo apt install ninja-build
$ git clone https://github.com/emqx/nanomq.git ; cd nanomq
$ git submodule update --init --recursive
$ mkdir build && cd build
$ cmake -G Ninja ..
$ ninja

파이썬 환경에서 MQTT Publisher, Subscriber 구현

paho-mqtt 라이브러리 설치

MQTT 5.0, 3.1.1, 3.1 프로토콜 구현
Eclipse Paho MQTT Python 클라이언트 라이브러리 사용

paho-mqtt

Kafka Test Server 구현

추후 Amazon MSK 구현 시 원활함을 위해 MSK에서 권장하는 Kafka 버전인 2.6.2버전으로 테스트

Apache Kafka

지원되는 Apache Kafka 버전

3단계: 주제 생성

Apache Kafka

Apache Kafka

kafka_2.1.2-2.6.2.tgz 설치, ZooKeeper 사용
(Kafka 최신 버전에서는 Zookeeper 의존성 제거를 위해 KRaft로 변경 중에 있으나 아직 안정적인 버전, 참고 문헌 부족 등의 이유로 Zookeeper로 테스트)

$ wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
$ tar -xzf kafka_2.12-2.6.2.tgz
$ cd kafka_2.1.2-2.6.2

Zookeeper 실행
$ bin/zookeeper-server-start.sh config/zookeeper.properties (-daemon)

Kafka broker 실행
$ bin/kafka-server-start.sh config/server.properties (-daemon)

Topic 만들기
$ bin/kafka-topics.sh --create --topic topic_name --bootstrap-server localhost:9092

생성된 Topic 확인
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Topic 상세 정보 확인
$ bin/kafka-topics.sh --describe --topic topic_name --bootstrap-server localhost:9092

Topic에 event 쓰기
$ bin/kafka-console-producer.sh --topic topic_name --bootstrap-server localhost:9092
> 이후 입력하는 줄마다 topic에 기록됨
> Ctrl-C로 중지

Topic의 event 읽기
$ bin/kafka-console-consumer.sh --topic topic_name --from-beginning --bootstrap-server localhost:9092

Kafka Connect 예시
$ nano config/connect-standalone.properties
plugin.path=libs/connect-file-2.6.2.jar
save

$ echo -e "foo\nbar" > test.txt
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
# 만약 nanomq 실행 중 위 명령이 에러가 발생할 경우 사용 포트 충돌 문제
# nanomq의 websocket 포트 = 8083 = kafka connector 포트
# 1. nanomq stop
# 2. /etc/nanomq.conf 파일의 websocket.enable=false로 변경
# 3. nanomq start -d

명령어 실행되면 test.sink.txt 파일 생성
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

echo more_text >> test.txt
텍스트 내용 추가되면 consumer 출력, sink 파일에 추가됨

테스트로 생성한 이벤트, 환경 데이터 삭제
$ rm -rf /tmp/kafka-logs /tmp/zookeeper

kafka-python

https://github.com/dpkp/kafka-python

[카프카] Python으로 Kafka에 전송(Producer)하고 가져오기(consumer)

kafka-mqtt-connect

[Kafka Connect] Connector Rest API 정리

https://github.com/johanvandevenne/kafka-connect-mqtt

Kafka mqtt connector 사용법 - mqtt broker

maven 설치 필요

$ sudo apt install maven
$ git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
$ cd kafka-connect-mqtt
$ mvn clean install

위 open source로는 연결 실패

Apache Camel에서 지원하는 mqtt kafka source connector를 사용하여 mqtt message to kafka 성공

Apache Camel Install

Apache Kafka

Camel Kafka Connector

camel-mqtt-source-kafka-connector source configuration

$ sudo apt install maven
$ git clone https://github.com/apache/camel-kafka-connector
$ cd camel-kafka-connector/connectors/camel-mqtt-source-kafka-connector
$ mvn clean package
$ cd target
$ cp KAFKA_PLUGIN_PATH ./*.tar.gz

# kafka directory 이동
$ nano config/camel-mqtt-source.properties

name=CamelMqttSourceConnector

connector.class=org.apache.camel.kafkaconnector.mqttsource.Came>
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
#value.converter=org.apache.kafka.connect.storage.StringConvert>
value.converter=org.apache.kafka.connect.converters.ByteArrayCo>
value.converter.schemas.enable=false

topics=test
camel.kamelet.mqtt-source.topic=test
camel.kamelet.mqtt-source.brokerUrl=tcp://127.0.0.1:1883
converter.encoding=UTF-8

# kafka connect 실행
$ bin/connect-standalone.sh config/connect-standalone.properties config/camel-mqtt-source.properties

# 이후 kafka consumer 실행, mqtt broker로 message publish하여 테스트
반응형
저작자표시 비영리 변경금지 (새창열림)

'Engineering > Configuring' 카테고리의 다른 글

Kafka 구성 요소  (0) 2023.09.13
Kafka 동작 원리  (0) 2023.09.13
Docker 사용 관련  (0) 2023.09.13
WSL2 CUDA, CUDNN 설정  (0) 2023.09.13
WSL2 외부 접속 설정  (0) 2023.09.13
  1. MQTT Broker install
  2. mosquitto install
  3. simple send/receive test
  4. nanomq install
  5. 파이썬 환경에서 MQTT Publisher, Subscriber 구현
  6. paho-mqtt 라이브러리 설치
  7. kafka-python
  8. kafka-mqtt-connect
  9. Apache Camel Install
'Engineering/Configuring' 카테고리의 다른 글
  • Kafka 구성 요소
  • Kafka 동작 원리
  • Docker 사용 관련
  • WSL2 CUDA, CUDNN 설정
AIVILLAIN
AIVILLAIN
호랑이는 죽어서 가죽을 남기고 개발자는 죽어서 기록을 남겨보자
    반응형
  • AIVILLAIN
    뎁사유기
    AIVILLAIN
  • 전체
    오늘
    어제
    • 분류 전체보기 (20) N
      • Machine Learning (5)
        • Theory (5)
      • Engineering (7)
        • Configuring (7)
        • Web (0)
      • Programming (1)
        • Python (0)
      • Etc (4)
        • Review (2)
        • Trace (2)
      • Retrospect (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

    • Linkedin
    • Github
    • Obsidian
    • Velog
  • 공지사항

  • 인기 글

  • 태그

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
AIVILLAIN
MQTT & Kafka Test

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.