物聯(lián)網(wǎng)數(shù)據(jù)庫 IoTDB —— 從協(xié)議到數(shù)據(jù)
首先,先允許我,祝各位讀者小可愛們,節(jié)日快樂。
在這個系列之前的文章里,我們介紹了Iotdb的LSM,以及Iot中的最佳實踐,這次我們看看如何將mqtt和Iotdb整合起來。下面我們開始:
iotdb in docker
首先,做一個測試環(huán)境,我現(xiàn)在越發(fā)喜歡docker 和 WSL 了,除了吃點硬盤,內(nèi)存和CPU資源以外,沒有什么缺點了......
run in docker
直接把該開的端口都打開,只是測試環(huán)境,我就沒再掛目錄。
docker run -d -p 6667:6667 -p 31999:31999 -p 8181:8181 -p 5555:5555 -p 1883:1883 apache/iotdb
等待一會,執(zhí)行 docker ps 查看是否成功了
- ➜ ~ docker ps
 - CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
 - ad9b18f8bff3 apache/iotdb "/iotdb/sbin/start-s…" 2 hours ago Up 2 hours 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 0.0.0.0:6667->6667/tcp, :::6667->6667/tcp, 0.0.0.0:8181->8181/tcp, :::
 
初步的iotdb in docker 環(huán)境,我們就搞好了。接下來,開啟mqtt服務(wù)。
開啟 Mqtt 服務(wù)
進(jìn)入iotdb的docker docker exec -it ad9b18f8bff3 /bin/bash
編輯配置文件vi iotdb/conf/iotdb-engine.properties
開啟服務(wù),根據(jù)自己的需要,配置ip和端口等。
- ####################
 - ### MQTT Broker Configuration
 - ####################
 - # whether to enable the mqtt service.
 - enable_mqtt_service=false # 修改成 true , 代表開啟 mqtt服務(wù)
 - # the mqtt service binding host.
 - mqtt_host=0.0.0.0 # ip
 - # the mqtt service binding port.
 - mqtt_port=1883 # 端口
 - # the handler pool size for handing the mqtt messages.
 - mqtt_handler_pool_size=1
 - # the mqtt message payload formatter.
 - mqtt_payload_formatter=json # 數(shù)據(jù)格式
 - # max length of mqtt message in byte
 - mqtt_max_message_size=1048576
 
重啟服務(wù),如果不會,就重啟docker鏡像。
iotdb 基礎(chǔ)操作
- 啟動服務(wù): sbin/start-client.sh
 
- root@ad9b18f8bff3:/iotdb/sbin# ./start-cli.sh
 - ---------------------
 - Starting IoTDB Cli
 - ---------------------
 - _____ _________ ______ ______
 - |_ _| | _ _ ||_ _ `.|_ _ \
 - | | .--.|_/ | | \_| | | `. \ | |_) |
 - | | / .'`\ \ | | | | | | | __'.
 - _| |_| \__. | _| |_ _| |_.' /_| |__) |
 - |_____|'.__.' |_____| |______.'|_______/ version 0.11.1
 - IoTDB> login successfully
 
- 退出CLI: quit 或 exit
 - 停止服務(wù):$sbin/stop-server.sh
 - 設(shè)置一個存儲組到IOTDB,名為root : IoTDB> SET STORAGE GROUP TO root
 - 查看當(dāng)前IOTDB的存儲組 : IoTDB> SHOW STORAGE GROUP
 
- IoTDB> SHOW STORAGE GROUP
 - +-------------+
 - |storage group|
 - +-------------+
 - | root.test|
 - +-------------+
 - Total line number = 1
 - It costs 0.127s
 
- 查看系統(tǒng)中存在的所有時間序列 :IoTDB> SHOW TIMESERIES
 
- IoTDB> show timeseries
 - +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
 - | timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
 - +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
 - |root.test.wf01.wt01.temperature| null| root.test| FLOAT| GORILLA| SNAPPY|null| null|
 - | root.test.wf01.wt01.status| null| root.test| BOOLEAN| RLE| SNAPPY|null| null|
 - | root.test.wf01.wt01.hardware| null| root.test| TEXT| PLAIN| SNAPPY|null| null|
 - +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
 - Total line number = 3
 - It costs 0.009s
 
- 查看系統(tǒng)中存在的特定時間序列: SHOW TIMESERIES root.test.wf01.wt01.status
 
- IoTDB> SHOW TIMESERIES root.test.wf01.wt01.status
 - +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
 - | timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
 - +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
 - |root.test.wf01.wt01.status| null| root.test| BOOLEAN| RLE| SNAPPY|null| null|
 - +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
 - Total line number = 1
 - It costs 0.003s
 
- 插入數(shù)據(jù) INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)
 
- IoTDB> INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)
 - Msg: The statement is executed successfully.
 
- 查看數(shù)據(jù): select * from root.test;
 
- IoTDB> select * from root.test;
 - +------------------------+-------------------------------+--------------------------+----------------------------+
 - | Time|root.test.wf01.wt01.temperature|root.test.wf01.wt01.status|root.test.wf01.wt01.hardware|
 - +------------------------+-------------------------------+--------------------------+----------------------------+
 - |2021-01-20T02:00:00.000Z| 21.2| true| hello|
 - +------------------------+-------------------------------+--------------------------+----------------------------+
 - Total line number = 1
 - It costs 0.077s
 
- 查看設(shè)備:show devices
 
- IoTDB> show devices
 - +-------------------+
 - | devices|
 - +-------------------+
 - |root.test.wf01.wt01|
 - +-------------------+
 - Total line number = 1
 - It costs 0.002s
 
mqtt to iotdb
代碼
構(gòu)建一個實體對象,用于存儲
- package wang.datahub.iotdb;
 - import com.google.gson.Gson;
 - import java.util.List;
 - public class IotdbVO {
 - private String device;
 - private long timestamp = System.currentTimeMillis();
 - private List<String> measurements;
 - private List<Object> values;
 - public String getDevice() {
 - return device;
 - }
 - public void setDevice(String device) {
 - this.device = device;
 - }
 - public long getTimestamp() {
 - return timestamp;
 - }
 - public void setTimestamp(long timestamp) {
 - this.timestamp = timestamp;
 - }
 - public List<String> getMeasurements() {
 - return measurements;
 - }
 - public void setMeasurements(List<String> measurements) {
 - this.measurements = measurements;
 - }
 - public List<Object> getValues() {
 - return values;
 - }
 - public void setValues(List<Object> values) {
 - this.values = values;
 - }
 - public String toJson(){
 - Gson g = new Gson();
 - String jsonData = g.toJson(this);
 - return jsonData;
 - }
 - @Override
 - public String toString() {
 - return "IotdbVO{" +
 - "device='" + device + '\'' +
 - ", timestamp=" + timestamp +
 - ", measurements=" + measurements +
 - ", values=" + values +
 - '}';
 - }
 - }
 
使用祖?zhèn)鞯拇a來模擬數(shù)據(jù)發(fā)射到iotdb,這里直接將mqtt的主機和端口,配置到前文所修改的iotdb的mqtt服務(wù)上,就大功告成了。
- package wang.datahub.iotdb;
 - import org.fusesource.mqtt.client.BlockingConnection;
 - import org.fusesource.mqtt.client.MQTT;
 - import org.fusesource.mqtt.client.QoS;
 - import java.util.ArrayList;
 - import java.util.List;
 - import java.util.Random;
 - public class EmmitToIotdb {
 - public static void main(String[] args) {
 - String[] hardwares = new String[]{
 - "a1",
 - "b1",
 - "b2",
 - "c3",
 - "d1",
 - "f5"
 - };
 - int count = 1000;
 - for(int i = 0; i < count ;i++){
 - IotdbVO iotdbVO = new IotdbVO();
 - iotdbVO.setDevice("root.test.wf01.wt01");
 - List<String> measurements = new ArrayList<>();
 - List<Object> values = new ArrayList<>();
 - measurements.add("temperature");
 - measurements.add("status");
 - measurements.add("hardware");
 - Random r = new Random();
 - values.add(r.nextInt(40));
 - values.add(r.nextBoolean());
 - values.add(hardwares[r.nextInt(hardwares.length)]);
 - iotdbVO.setMeasurements(measurements);
 - iotdbVO.setValues(values);
 - emmitToIotdb(iotdbVO);
 - }
 - }
 - public static void emmitToIotdb(IotdbVO content){
 - try {
 - MQTT mqtt = new MQTT();
 - mqtt.setHost("127.0.0.1", 1883);
 - mqtt.setUserName("root");
 - mqtt.setPassword("root");
 - BlockingConnection connection = mqtt.blockingConnection();
 - connection.connect();
 - String payload = content.toJson();
 - connection.publish(content.getDevice(),payload.getBytes(), QoS.AT_LEAST_ONCE,false);
 - connection.disconnect();
 - } catch (Exception e){
 - e.printStackTrace();
 - }
 - }
 - }
 
執(zhí)行結(jié)果
iotdb,功能還是相當(dāng)強大的,也非常有意思,希望本篇文章對你有所幫助,也非常歡迎您來與我交流。
本文轉(zhuǎn)載自微信公眾號「麒思妙想」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系麒思妙想公眾號。


















 
 
 







 
 
 
 