使用阿里云 MQTT 服务进行消息传输的基本实践

注意:实际上我的项目是 Spring Cloud Alibaba 微服务项目,并且配置写在了 Nacos 上面!

在现代应用开发中、消息传输是分布式系统中不可或缺的一部分. 阿里云的 MQTT 服务提供了一种高效且可靠的消息传输方式、适用于物联网 (IoT) 等场景. 本文将结合代码示例、介绍如何在 Spring Boot 项目中集成阿里云 MQTT 服务、并分享一些最佳实践.

1、项目背景

我们正在开发一个基于 Spring Boot 的应用、该应用需要使用阿里云的 MQTT 服务进行消息传输. 为了实现这一目标、我们需要配置 MQTT 服务的连接信息、并编写相关的消息发送逻辑.

2、引入依赖

<!-- 云消息队列 MQTT 版 -->
<dependency>
    <groupId>com.alibaba.mqtt</groupId>
    <artifactId>server-sdk</artifactId>
    <version>1.0.8.Final</version>
</dependency>

3、配置阿里云 MQTT 服务

首先、我们需要创建一个配置类来存储 MQTT 服务的相关配置. 这包括接入点、端口、实例 ID、AccessKey 和 SecretKey 等信息.

4、动刷新配置

为了在配置变更时自动刷新 MQTT 连接、我们可以使用 @EventListener 监听 EnvironmentChangeEvent 事件. 这样、当配置发生变化时、MQTT 连接会自动重新初始化.

下面是『3、配置阿里云 MQTT 服务』和『4、动刷新配置』的代码示例:

package com.kumy.requrchase.treasure.conf;

import com.alibaba.mqtt.server.ServerProducer;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ProducerConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 阿里云-云消息队列 MQTT 版配置类
 * - <a href="https://help.aliyun.com/zh/apsaramq-for-mqtt/mqtt-upgraded/developer-reference/api-operations-and-parameters">云消息队列 MQTT 版文档地址</a>
 * <a href="https://help.aliyun.com/zh/apsaramq-for-mqtt/mqtt-upgraded/product-overview/service-introduction-terms?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3">基本概念</a>
 * 此类用于表示云消息队列 MQTT 版的配置信息. 
 * 包含接入点、协议端口、实例 ID、AccessKey 和 SecretKey 等字段. 
 *
 * @author zibo
 * @date 2024/10/25
 * @slogan 慢慢学、不要停. 
 */
@Data
@Slf4j
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "aliyuncs-mqtt")
public class AliyuncsMqttConfig {

    /**
     * 云消息队列 MQTT 版实例的接入点. 
     * 客户端通过接入点连接云消息队列 MQTT 版服务端. 
     */
    private String domain;

    /**
     * 协议端口. 
     * 端口和使用的协议必须匹配、云端 SDK 中、该参数值固定为 5672. 
     */
    private Integer port = 5672;

    /**
     * 您在云消息队列 MQTT 版控制台创建的实例的 ID. 
     */
    private String instanceId;

    /**
     * 您在阿里云账号管理控制台中创建的 AccessKey ID、用于身份认证. 
     */
    private String accessKey;

    /**
     * 您在阿里云账号管理控制台中创建的 AccessKey Secret、用于身份认证. 
     * 仅在 Signature 鉴权模式下需要设置. 
     */
    private String secretKey;

    /**
     * 此处参数内容为示意. firstTopic 是 MQTT 消息的一级 topic、需要在控制台申请才能使用. 
     * 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败、MQTT 服务端会断开客户端连接. 
     */
    private String firstTopic;

    /**
     * MQTT 的二级 Topic、甚至三级 Topic 都是父级 Topic 下的子类. 
     * 使用时无需在控制台创建、直接在代码中设置即可. 
     * 命名格式为: 父级 Topic 和各子级 Topic 间均使用正斜线 (/) 隔开、<父级 Topic 名称>/<二级 Topic 名称>/<三级 Topic 名称>、例如、SendMessage/demo/producer. 
     * 需要注意的是云消息队列 MQTT 版限制父级 Topic 和子级 Topic 的总长度为 64 个字符、如果超出长度限制将会导致客户端异常. 
     * 您可以使用 MQTT.fx 客户端验证子级 Topic 发布和订阅消息. 
     * -
     * 不再配置、不同业务场景下、在代码中设置. 
     */
    // private String secondTopic;

    @Bean
    @RefreshScope
    public ServerProducer serverProducer() throws IOExceptionTimeoutException {
        return createServerProducer();
    }

    ServerProducer createServerProducer() throws IOExceptionTimeoutException {
        // 创建 ChannelConfig 实例
        ChannelConfig channelConfig = new ChannelConfig();
        channelConfig.setDomain(domain);
        channelConfig.setPort(port);
        channelConfig.setInstanceId(instanceId);
        channelConfig.setAccessKey(accessKey);
        channelConfig.setSecretKey(secretKey);

        // 创建 ServerProducer 实例
        ServerProducer serverProducer = new ServerProducer(channelConfig、new ProducerConfig());
        // 启动 ServerProducer
        serverProducer.start();

        // 实例创建日志
        log.info("MQTT 配置下 ServerProducer 实例创建日志、配置: {}"、serverProducer.getChannelConfig());

        // 返回 ServerProducer 实例
        return serverProducer;
    }
}

@Slf4j
@Component
class ServerProducerRefresher {

    private final AliyuncsMqttConfig aliyuncsMqttConfig;
    private ServerProducer serverProducer;

    @Autowired
    public ServerProducerRefresher(AliyuncsMqttConfig aliyuncsMqttConfig、ServerProducer serverProducer) {
        this.aliyuncsMqttConfig = aliyuncsMqttConfig;
        this.serverProducer = serverProducer;
    }

    @EventListener(EnvironmentChangeEvent.class)
    public synchronized void onEnvironmentChange() throws IOExceptionTimeoutException {
        // 停止旧的 ServerProducer
        if (serverProducer != null) {
            // 实例停止日志
            log.info("MQTT 配置下 ServerProducer 实例停止日志、旧配置: {}"、serverProducer.getChannelConfig());
            serverProducer.stop();
        }

        // 重新初始化 ServerProducer
        serverProducer = aliyuncsMqttConfig.createServerProducer();
    }
}

5、消息发送示例

在控制器中、我们可以通过 ServerProducer 发送消息. 以下是一个简单的示例:

package com.example.mqtt.controller;

import com.alibaba.mqtt.server.ServerProducer;
import com.example.mqtt.config.AliyuncsMqttConfig;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;

@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Resource
    private ServerProducer serverProducer;
    @Resource
    private AliyuncsMqttConfig aliyuncsMqttConfig;

    @RequestMapping("/send")
    public String sendMessage() {
        try {
            serverProducer.sendMessage(aliyuncsMqttConfig.getFirstTopic() + "/test""hello world".getBytes());
        } catch (IOException e) {
            throw new RuntimeException("消息发送失败"、e);
        }
        return "消息发送成功";
    }
}

6、总结

通过上述步骤、我们成功地在 Spring Boot 项目中集成了阿里云的 MQTT 服务、能够实现消息的发送和接收. 在实际项目中、您可以根据业务需求进一步扩展和优化代码. 同时、确保配置的安全性、避免敏感信息泄露.

希望这篇博客能帮助您更好地理解和使用阿里云的 MQTT 服务. 如果您有任何问题或建议、欢迎在下方留言讨论.

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部