Awesome Open Source
Awesome Open Source

MQTTX Project

license language

| English

1

Mqttx MQTT v3.1.1 ****** ****** mqtt broker

v1.2 JDK17, JDK8

1.1

docker docker

    • mvnw -P test -DskipTests=true clean package

      1. redis
      2. mvnw -P dev -DskipTests=true clean package
1. `java -jar mqttx-1.0.5.BETA.jar`

-

    1. redis
    1. redis, localhost:6376

******** 6.1 mqttx

mqttx redis mysql, mongodb, kafka springboot spring-boot-starter-***

1.2

  • [x] Redis
  • [x] Kafka
  1. lombok ide

Intellij IDEA 😊

idea Lombok, settings > Build,Execution,Deployment > Compiler > Annotation Processor Enable annotation processing

~~1.3 ~~

/(o)/~~

mqttx

  1. ssl
  2. websocket, http://tools.emqx.io/ 119.45.158.51()
  3. v1.0.6.RELEASE

websocket

2

mqttxtopic /

()

ak6mB6.png

2.1

java
  com
      jun
          mqttx
              broker         # mqtt 
                codec       # 
                handler     # pub, sub, connn, etc
              config         #  bean 
              constants      # 
              consumer       # 
              entity         # 
              exception      # 
              service        # , 
                impl        # 
              utils          # 
resources                     # application.yml 
    META-INF                  # spring-configuration 
    tls                       # ca 

3 docker

docker-hub , fantasywujun/mqttx - Docker Hub

docker docker-compose -f ./docker-compose.yml up ,

y3R3tI.md.png

4

4.1 qos

qos0 qos1 qos2

qos1qos2 redis mysql

4.2 topicFilter

  1. # +
  2. /topic a/b/ a/b
  3. mqtt v3.1.1 4.7 Topic Names and Topic Filters

mqttx topicFilter publish topic 4.5 topic topic

topicFilter match topics
/a/b/+ /a/b/abc,/a/b/test
a/b/# a/b, a/b/abc, a/b/c/def
a/+/b/# a/nani/b/abc
/+/a/b/+/c /aaa/a/b/test/c

com.jun.mqttx.utils.TopicUtils

4.3

mqttx

  • [x] Kafka
  • [x] Redis

ak6nHK.png

  1. mqttx.cluster.enable false

  2. mqttx.cluster.type: redis

  3. v1.0.5.RELEASE bug

  4. kafka application-*.yml, application-dev.yml ***3. kafka ***


4.4 ssl

ssl ca() application.yml

  1. mqttx.ssl.enable false websocket socket
  2. mqttx.ssl.key-store-locationkeystore classpath
  3. mqttx.ssl.key-store-passwordkeystore
  4. mqttx.ssl.key-store-typekeystore PKCS12
  5. mqttx.ssl.client-auth NONE

resources/tls mqttx.keystore , : 123456

com/jun/mqttx/utils/SslUtils.java

4.5 topic

client topic topic &:

  1. mqttx.enable-topic-sub-pub-secure: false

  2. broker conn {clientId, username, password} mqttx.auth.url , authorizedSub,authorizedPub client topic

    4.12

  3. broker

  • [x]
  • [x]
  • [x]

4.6

mqtt5 mq( kafka)

  1. mqttx.share-topic.enable: true

  2. : $share/{ShareName}/{filter}, $share , ShareName , filter

  3. hash, random, round

hash client ****** clientId**

:

share-topic

msg-a mqttx.share-topic.share-sub-strategy

cleanSession = 1

CleanSession mqtt3.1.1 cleanSession = 1 retained mqtt5

mqttx v1.0.5.BETA ()cleanSession = 1 .

If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session [MQTT-3.1.2-6].

The Session state in the Client consists of:

  • QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
  • QoS 2 messages which have been received from the Server, but have not been completely acknowledged.

The Session state in the Server consists of:

  • The existence of a Session, even if the rest of the Session state is empty.
  • The Clients subscriptions.
  • QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
  • QoS 1 and QoS 2 messages pending transmission to the Client.
  • QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
  • Optionally, QoS 0 messages pending transmission to the Client.

4.7 websocket

4.8

mqttx broker

  • QoS: QoS 1,2 QoS 0

*******topic *** ,

  1. broker
4.8.1

broker

$SYS/broker/{brokerId}/status mqttx.sys-topic.interval broker .
****
$SYS/broker/activeConnectCount
$SYS/broker/time
$SYS/broker/version broker
$SYS/broker/receivedMsg broker MqttMessage, ping
$SYS/broker/sendMsg broker MqttMessage, pingAck
$SYS/broker/uptime broker ******
$SYS/broker/maxActiveConnectCount broker tcp

$SYS/broker/{brokerId}/status brokerId 6.1 $SYS/broker/+/status

json

{
    "activeConnectCount": 1,
    "maxActiveConnectCount": 2,
    "receivedMsg": 6,
    "sendMsg": 77,
    "timestamp": "2021-03-23T23:05:37.035",
    "uptime": 149,
    "version": "1.0.7.RELEASE"
}
field
activeConnectCount
maxActiveConnectCount
receiveMsg ping
sendMsg pingAck
timestamp (yyyy-MM-dd HH:mm:ss)
uptime broker
version mqttx
4.8.2

issue: MQTT Issue #8 Amazingwujun/mqttx (github.com)

$SYS/broker/{borkerId}/clients/{clientId}/connected
broker
$SYS/broker/{borkerId}/clients/{clientId}/disconnected
broker
  1. $SYS/broker/+/clients/#:
  2. $SYS/broker/+/clients/+/connected:
  3. $SYS/broker/+/clients/+/disconnected:

4.9

  • [x] kafka
  1. mqttx.message-bridge.enable
  2. mqttx.bridge-topics kafka topic

mqttx ****** MQ

device(client) => mqttx => MQ

4.10

com.jun.mqttx.utils.RateLimiter

https://stripe.com/blog/rate-limiters

capacity replenish-rate``token-consumed-per-acquire

** qos 0 **

mqttx:
  rate-limiter:
    enable: true
    topic-rate-limits:
      # 
      - topic: "/test/a"
        capacity: 9
        replenish-rate: 4
        token-consumed-per-acquire: 3
      # 
      - topic: "/test/b"
        capacity: 5
        replenish-rate: 5
        token-consumed-per-acquire: 2
  • capacity:
  • replenish-rate:
  • token-consumed-per-acquire:

QPS

  1. QPS = capacity token-consumed-per-acquire
    1. 9 3 = 3
    2. 5 2 = 2.5
  2. QPS = replenish-rate token-consumed-per-acquire
    1. 4 3 1.3
    2. 5 2 = 2.5

4.11

mqttx redis , mqttx cleanSession = false & qos > 0 , Serializer redis

mqttx

  1. JsonSerializer
  2. KryoSerializer

JsonSerializer, v1.0.6.release KryoSerializer

mqttx.serialize-strategy

4.12

mqttx

  1. mqttx.auth.url:
  2. mqttx.auth.readTimeout: OkHttpClient readTimeout
  3. mqttx.auth.connectTimeout: OkHttpClient connectTimeout

mqtt.auth.url com.jun.mqttx.service.impl.DefaultAuthenticationServiceImpl OkHttpClient POST mqttx.auth.url

mqtt conn username, password.

POST / HTTP/1.1
Host: mqttx.auth.url
Content-Type: application/json
Content-Length: 91

{
    "clientId": "device_id_test",
    "username": "mqttx",
    "password": "123456"
}

json :

{
    "authorizedSub": [
        "subTopic1",
        "subTopic2"
    ],
    "authorizedPub": [
        "pubTopic1",
        "pubTopic2"
    ]
}

4.5 topic

  • http status = 200 ****, ****

5

  1. v1.0 mqttv3.1.1

  2. mqttx issue

    • [ ] v1.0.7.RELEASE Benchmark
    • [ ] v1.0.8.RELEASE
    • [ ] v1.1.0.RELEASE
    • [x] v1.2.0.RELEASE
    • [ ] v2.0.0.RELEASE
    • [x] bug
  3. v1.2 JDK8 JDK17

  4. v2.0 mqttv5

  5. bug

  6. caffee

caffee

6

6.1

src/main/resources

  1. application.yml
  2. application-dev.yml
  3. application-prod.yml
mqttx.version pom.xml
mqttx.broker-id pom.xml ,
mqttx.heartbeat 60s conn keepalive
mqttx.host 0.0.0.0
mqttx.so-backlog 512 tcp
mqttx.enable-topic-sub-pub-secure false //
mqttx.enable-inner-cache true redis -,
mqttx.enable-test-mode false ;
****
mqttx.ignore-client-self-pub true client client
mqttx.serialize-strategy json broker ******
mqttx.redis.cluster-session-hash-key mqttx.session.key redis map key
mqttx.redis.topic-prefix mqttx:topic: topic <==> client
mqttx.redis.retain-message-prefix mqttx:retain: , retain
mqttx.redis.pub-msg-set-prefix mqttx:client:pubmsg: client pub redis set pubmsg puback pubrec
mqttx.redis.pub-rel-msg-set-prefix mqttx:client:pubrelmsg: client pubRel redis set pubrel flag pubcom
mqttx.redis.topic-set-key mqttx:alltopic topic redis set key
mqttx.redis.message-id-prefix mqttx:messageId: cleanSession client messageId, redis INCR
mqttx.redis.client-topic-set-prefix mqttx:client:topicset: client redis set ; client
mqttx.cluster.enable false
mqttx.cluster.inner-cache-consistancy-key mqttx:cache_consistence redis key
mqttx.cluster.type redis
mqttx.ssl.enable false ssl
mqttx.ssl.client-auth NONE
mqttx.ssl.key-store-location classpath: tls/mqttx.keystore keyStore
mqttx.ssl.key-store-password 123456 keyStore
mqttx.ssl.key-store-type pkcs12 keyStore
mqttx.socket.enable true socket
mqttx.socket.port 1883 socket
mqttx.websocket.enable false websocket
mqttx.websocket.port 8083 websocket
mqttx.websocket.path /mqtt websocket path
mqttx.share-topic.enable true
mqttx.share-topic.share-sub-strategy round ,
mqttx.sys-topic.enable false
mqttx.sys-topic.interval 60s
mqttx.message-bridge.enable false
mqttx.message-bridge.topics null
mqttx.rate-limiter.enable false
mqttx.rate-limiter.token-rate-limit
mqttx.auth.url null mqtt conn username/password
mqttx.auth.readTimeout 3s readTimeout
mqttx.auth.connectTimeout 3s connectTimeout

6.2

prometheus MQTTX Prometheus .

6.2.1 v1.0

  • v1.0.8.RELEASE
    • [ ] redis hmap PubMsg hmap payloadId, redis PubMsg
  • v1.0.7.RELEASE
    • [x] Kryo
    • [x]
    • [x] retain bug
    • [x] isWillRetain:true bug
    • [x]
    • [x] bug
  • v1.0.6.RELEASE
    • [x] netty 4.1.52.Final MqttEncoder.java UnsubAck NPE
    • [x] bug
  • v1.0.5.RELEASE
  • v1.0.4.RELEASE
    • [x] websocket
    • [x]
    • [x] bug
  • v1.0.3.RELEASE
    • [x] bug
  • v1.0.2.RELEASE
    • [x]
    • [x] bug
  • v1.0.1.RELEASE
    • [x] redis
    • [x]
    • [x]
    • [x] bug
  • v1.0.0.RELEASE
    • [x] mqttv3.1.1

6.2.2 v1.1

  • v1.1.0.RELEASE
    • [ ] redis

6.2.3 v2.0

  • v2.0.0.RELEASE

6.2.4 v1.2

  • v1.2.0.RELEASE ()
    • [x] JDK JDK8 JDK17

6.3 Benchmark

MQTTX v1.0.5.BETA

mqtt-bench

cpu
win10 i5-4460 16G

6.3.1 CleanSessionTrue

  1. redis
  2. cleanSession : true

** pub redis cleanSession **

java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar

  • qos0
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 15:33:54.462089 +0800 CST Start benchmark
2020-09-30 15:34:33.6010217 +0800 CST End benchmark

Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=39134ms, throughput=25553.23messages/sec
  • qos1
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 15:29:17.9027515 +0800 CST Start benchmark
2020-09-30 15:30:25.0316915 +0800 CST End benchmark

Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=67124ms, throughput=14897.80messages/sec
  • qos2
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 15:37:00.0678207 +0800 CST Start benchmark
2020-09-30 15:38:55.4419847 +0800 CST End benchmark

Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=115369ms, throughput=8667.84messages/sec
qos qps
1000 1024byte 1000 0 39.1s 25553
1000 1024byte 1000 1 67.1s 14897
1000 1024byte 1000 2 115.3s 8667

cpu: 25%, mem 440 MB

6.3.2 CleanSessionFalse

  1. redis
  2. cleanSession: false

java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar

  • qos0
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 17:03:55.7560928 +0800 CST Start benchmark
2020-09-30 17:04:36.2080909 +0800 CST End benchmark

Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=40447ms, throughput=24723.71messages/sec
  • qos1
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 17:06:18.9136484 +0800 CST Start benchmark
2020-09-30 17:08:20.9072865 +0800 CST End benchmark

Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=121992ms, throughput=8197.26messages/sec
  • qos2
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 17:09:35.1314262 +0800 CST Start benchmark
2020-09-30 17:13:10.7914125 +0800 CST End benchmark

Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=215656ms, throughput=4637.01messages/sec
qos qps
1000 1024byte 1000 0 40.4s 24723
1000 1024byte 1000 1 121.9s 8197
1000 1024byte 1000 2 215.6s 4637

cpu: 45%, mem 440 MB

6.4

mqttx: (gitee.com) sonarQube

sonar

  • keyStore TLS

Get A Weekly Email With Trending Projects For These Topics
No Spam. Unsubscribe easily at any time.
Java (708,733
Docker (33,781
Spring Boot (12,252
Redis (6,730
Websocket (5,309
Kafka (3,339
Mqtt (2,918
Cluster (1,238
Netty (823
Bridge (497
Iot Application (324
Topic (276
Broker (265
Mqtt Broker (187
Related Projects