node.js实现mqtt 发布/发送 消息到主题


  • mqtt是啥?我的博客有写这个东西

安装

首先你要安装node.js和npm 教程传送门

随后找个文件夹执行 命令行 安装mqtt模块;

npm install mqtt

如果需要服务端 执行

npm install mosca

使用

服务端

var mosca = require('mosca');
//构建自带服务器
var MqttServer = new mosca.Server({
    port: 1883
});
//对服务器端口进行配置, 在此端口进行监听
MqttServer.on('clientConnected', function(client) {
    //监听连接
    console.log('client connected', client.id);
});
/**
 * 监听MQTT主题消息
 **/
MqttServer.on('published', function(packet, client) {
    //当客户端有连接发布主题消息
    var topic = packet.topic;
    console.log(packet);
    switch (topic) {
        case 'test':
            console.log('message-publish', packet.payload.toString());
            //MQTT转发主题消息
            MqttServer.publish({ topic: 'other', payload: 'sssss' });
            break;
        case 'other':
            console.log('message-123', packet.payload.toString());
            break;
    }
});

MqttServer.on('ready', function() {
    //当服务开启时
    console.log('mqtt is running...');
});

PS: 不推荐node.js 作服务端

发布消息到主题

var mqtt = require('mqtt');
var client = mqtt.connect('mqtt://127.0.0.1'); //连接到服务端
//client.subscribe('presence');
var num = 0;
var qtt = {}; //定义消息(可以为字符串、对象等)
qtt = 'setr=xxxxxxx1xx';
setInterval(function() { //一秒钟发送一次 消息到主题 SN69143809293670state 消息为 setr=xxxxxxx1xx
    client.publish('SN69143809293670state', qtt, { qos: 0, retain: true });  
}, 1000);

订阅主题

var mqtt = require('mqtt');  
var client2 = mqtt.connect("mqtt://127.0.0.1:1883");   //指定服务端地址和端口
  
client2.subscribe('test',{qos:1});//订阅主题为test的消息  
  
client2.on('message',function(top,message) {  
    console.log(message.toString());  
});  

制作接口(简化)

我所接触的这个物理设备是发送给他一个 控制 或者 查询 请求到他订阅的主题中(ctr), 并且它接收到控制信息,去执行.执行成功时 则会将 状态 发送到另一个 我订阅的主题中 . 使用php实现比较麻烦,并且返回信息时有时无,使用node.js更方便,返回信息也更快;

物理环境 :centos7.2 64位,装有mqtt服务代理端 和 node.js;

var mqtt = require('mqtt');
var express = require("express");
var app = express();
var hostName = '127.0.0.1'; //http服务的提供服务ip
var port = 8080;
var num = 1;
person = new Object();
person.firstname = "Bill";
app.all('*', function(req, res, next) {
    res.header("Access-Control-Allow-Origin", "*");
    res.header("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
    res.header("Access-Control-Allow-Methods", "PUT,POST,GET,DELETE,OPTIONS");
    res.header("X-Powered-By", ' 3.2.1')
    res.header("Content-Type", "application/json;charset=utf-8");
    next();
});//json header头

app.get("/zhinengjiaju/get", function(req, res) {
    //如果有get请求/zhinengjiaju/get则执行回调中的代码(方便!)
    
    // console.log("请求url:", req.path)
    // console.log("请求参数:", req.query)
    req.setTimeout(200); 
    //设置请求建立200ms 就中断接受请求,但还是在接收到返回信息后返回给它
    var client = mqtt.connect('mqtt://127.0.0.1:1883', {
        username: 'username',
        password: 'password',
        clientId: 'ap' + num
    });
    //建立连接
    client.on('connect', function() {
        var sn = req.query.sn;
        var k = parseInt(req.query.k) - 1;
        // 127.0.0.1:8080/zhinengjiaju/get?sn=SN69143809293670&k=1&v=3&cmd=setr
        
        client.subscribe(sn + 'state', { qos: 1 });
        //开始订阅
        
        if (req.query.cmd != 'setr') {
            m = req.query.cmd;
            if (req.query.cmd == 'qk') {
                m = 'setr=1111111111';
            }
            if (req.query.cmd == 'qg') {
                m = 'setr=0000000000';
            }

        } else {
            var m = ['x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x', 'x'];
            m[k] = req.query.v;;

            m = req.query.cmd + '=' + m.join('');
        }
        //一系列简单接口处理

        client.publish(sn + 'ctr', m, { qos: 1, retain: true }); // 'Hello mqtt ' + (num++)
        //发送
        client.end();
        //发送完后立即结束此次和服务端建立的请求
    });
    client.on('message', function(topic, message) { 
        //订阅信息一直在运行,如果有设备返回信息到主题,就执行此回调
        aaak(message.toString());
        //将值通过aaak函数传递给res.end返回给页面数据;
        client.end();
        

    });

    function aaak(aaaa) {
        var objaaaa = JSON.parse(aaaa);
        // console.log(objaaaa);
        num++;
        client.end();
        res.end(aaaa);
    }

})

app.listen(port, hostName, function() {
    console.log(`服务器运行在http://${hostName}:${port}`);
});