说到“世界频道”想必大家都不陌生,常见的如王者荣耀的世界广播摇人组队以及最近兴起的Discord社区交友等等。究其目的就是在应用内让海量用户可以实时互动。有些开发者为了实现这种场景会选择聊天室方案来实现,但是这种方式存在一定的局限性,比如聊天室人数上限、海量消息处理等各种情况。
当然如果有钱有颜,可以直接选择云厂商产品(比如环信的聊天室方案和超级社区),如果有才有time,也可以选择平替版MQTT实现方案。今天小猿将介绍用环信MQTT消息云实现应用内的世界频道,满满干货,不要错过~~
使用MQTT实现世界频道-Demo效果演示
协议优势:
在介绍具体方案之前,我们先唠一唠为啥选择MQTT协议。
轻量级:MQTT本身是物联网的连接协议,专为受限设备和低带宽场景使用。所以其代码占用空间较小,同样适用于注重SDK大小的移动应用领域(比如:游戏领域)。
易集成:MQTT作为标准开放的消息协议,经过多年演进,已支持30多种开发语言,10余种SDK,无论何种开发环境,都可以快速找到开源SDK。
高并发:MQTT是轻量级的消息传输协议,2字节心跳报文,最小化传输和连接成本,云厂商broker产品都可支持千万级并发接入,适用于高并发连接场景。
低成本:MQTT是基于客户端-服务器的订阅/发布模型,通过服务器中间件实现消息分发,减少消息复制成本,快速实现一对多在线推送。
灵活性:MQTT协议支持多种消息特性,包括:topic主题层级、消息分级(QoS0,1,2)、遗嘱消息、保留消息等,可以灵活实现多种业务场景。
衍生功能:随着MQTT云服务的发展,部分服务器厂商已支持消息存储、获取在线设备列表、查看历史消息等衍生功能,降低开发工作量与消息存储成本。
实现方案:
言归正传,上干货。本次技术实现方案包含:移动客户端(Android)、后端服务(Java)以及MQTT服务器。这里提一下,MQTT服务器使用环信MQTT消息云,使用三方云服务比较省心,既节省开发时间,产品性能也不需要担心,现在注册可以直接使用环信MQTT消息云超高额度的免费版:每月100并发连接、300万消息,完全满足功能开发使用。
客户端实现:
客户端实现主要包含以下两部分:
底层MQTT业务集成:包含引入SDK、MQTT方法封装、业务交互(消息收发)。
APP上层交互:在APP首页提供世界频道入口,实现心情弹幕飘窗(接收)和发送。
接下来上底层MQTT业务集成代码。
引入SDK:
这一步环信官方文档比较明确,就是根据自己的平台引入相应的mqtt客户端sdk,这里简单贴一下AndroidStudio的引入配置
1// 在根目录 build.gradle repositories 下加入配置 2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" } 3... 4// 然后加入 MQTT 依赖 5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk 6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
方法封装
这里贴一下对mqtt相关方法的简单封装,代码在vmmqtt模块儿的MQTTHelper类下:
1 /** 2 * Create by lzan13 on 2022/3/22 3 * 描述:MQTT 帮助类 4 */ 5 object MQTTHelper { 6 7 private var mqttClient: MqttAndroidClient? = null 8 9 // 缓存主题集合 10 private val topicList = mutableListOf<String>() 11 12 /** 13 * 链接MQTT 14 * @param id 用户 Id 15 * @param token 用户链接 MQTT 的 Token 16 * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅 17 */ 18 fun connect(id: String, token: String, topic: String = "") { 19 // 处理订阅主题 20 if (topic.isNotEmpty()) topicList.add(topic) 21 22 // 拼接链接地址 23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}" 24 // 拼接 clientId 25 val clientId = "${id}@${MQTTConstants.mqttAppId()}" 26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId) 27 28 //连接参数 29 val options = MqttConnectOptions() 30 options.isAutomaticReconnect = true //设置自动重连 31 options.isCleanSession = true // 缓存 32 options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒 33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒 34 options.userName = id // 用户名 35 options.password = token.toCharArray() // 密码 36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1; 37 // 设置MQTT监听 38 mqttClient?.setCallback(object : MqttCallback { 39 override fun connectionLost(t: Throwable) { 40 // 通知链接断开 41 VMLog.d("MQTT 链接断开 $t") 42 } 43 44 @Throws(Exception::class) 45 override fun messageArrived(topic: String, message: MqttMessage) { 46 // 通知收到消息 47 VMLog.d("MQTT 收到消息:$message") 48 // 如果未订阅则直接丢弃 49 if (!topicList.contains(topic)) return 50 notifyEvent(topic, String(message.payload)) 51 } 52 53 override fun deliveryComplete(token: IMqttDeliveryToken) {} 54 }) 55 //进行连接 56 mqttClient?.connect(options, null, object : IMqttActionListener { 57 override fun onSuccess(token: IMqttToken) { 58 VMLog.d("MQTT 链接成功") 59 // 链接成功,循环订阅缓存的主题 60 topicList.forEach { subscribe(it) } 61 } 62 63 override fun onFailure(token: IMqttToken, t: Throwable) { 64 VMLog.d("MQTT 链接失败 $t") 65 } 66 }) 67 } 68 69 /** 70 * 订阅主题 71 * @param topic 主题 72 */ 73 fun subscribe(topic: String) { 74 if (!topicList.contains(topic)) { 75 topicList.add(topic) 76 } 77 try { 78 //连接成功后订阅主题 79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener { 80 override fun onSuccess(token: IMqttToken) { 81 VMLog.d("MQTT 订阅成功 $topic") 82 } 83 84 override fun onFailure(token: IMqttToken, t: Throwable) { 85 VMLog.d("MQTT 订阅失败 $topic $t") 86 } 87 }) 88 } catch (e: MqttException) { 89 e.printStackTrace() 90 } 91 } 92 93 /** 94 * 取消订阅 95 * @param topic 主题 96 */ 97 fun unsubscribe(topic: String) { 98 if (topicList.contains(topic)) { 99 topicList.remove(topic) 100 } 101 try { 102 mqttClient?.unsubscribe(topic) 103 } catch (e: MqttException) { 104 e.printStackTrace() 105 } 106 } 107 108 /** 109 * 发送 MQTT 消息 110 * @param topic 主题 111 * @param content 内容 112 */ 113 fun sendMsg(topic: String, content: String) { 114 val msg = MqttMessage() 115 msg.payload = content.encodeToByteArray() // 设置消息内容 116 msg.qos = 0 //设置消息发送质量,可为0,1,2. 117 // 设置消息的topic,并发送。 118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener { 119 override fun onSuccess(asyncActionToken: IMqttToken) { 120 VMLog.d("MQTT 消息发送成功") 121 } 122 123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) { 124 VMLog.d("MQTT 消息发送失败 ${exception.message}") 125 } 126 }) 127 } 128 129 /** 130 * 通知 MQTT 事件 131 */ 132 private fun notifyEvent(topic: String, data: String) { 133 LDEventBus.post(topic, data) 134 } 135 }
业务交互
和业务相关的就是在启动APP后,使用后端服务器返回的鉴权token信息及连接封装接口登录环信通MQTT服务器,登录成功后订阅主题并监听消息。
1// 请求 token 成功后,调用MQTTHelper.connect()链接 MQTT 服务器,这里会同时传递监听的主题 2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo) 3 4/** 5 * 发送匹配信息 6 */ 7private fun sendMatchInfo() { 8 if (selfMatch.user.nickname.isEmpty()) return 9 // 提交自己的匹配信息到服务器 10 mViewModel.submitMatch(selfMatch) 11 val json = JSONObject() 12 json.put("content", selfMatch.content) 13 json.put("emotion", selfMatch.emotion) 14 json.put("gender", selfMatch.gender) 15 json.put("type", selfMatch.type) 16 val jsonUser = JSONObject() 17 jsonUser.put("avatar", mUser.avatar) 18 jsonUser.put("id", mUser.id) 19 jsonUser.put("nickname", mUser.nickname) 20 jsonUser.put("username", mUser.username) 21 json.put("user", jsonUser) 22 MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString()) 23} 24 25// 监听消息这里使用了一个事件总线进行通知,在上边封装 MQTTHelper 发送消息也使用了这个, 26// 订阅 MQTT 事件 27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) { 28 val match = JsonUtils.fromJson<Match>(it, Match::class.java) 29 // 这里收到匹配信息之后就增加一条弹幕 30 addBarrage(match) 31}
后端服务实现
接下来介绍后端服务实现,主要包含以下两部分:
配置连接信息:配置环信MQTT消息云连接信息。
获取鉴权信息:获取客户端连接需要的鉴权信息。
配置连接信息
配置部分只需要按照环信后台配置信息进行替换就好,配置在config目录下的config.xxx.json文件内
1/** 2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService 3 */ 4config.mqtt = { 5 host: 'mqtt host', // MQTT 链接地址 6 appId: 'appId', // MQTT AppId 7 port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss) 8 restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服务 API 地址 9 clientId: 'client id', // 替换环信后台 clientId 10 clientSecret: 'client secret', // 替换环信后台 clientSecret 11};
获取鉴权信息
这里主要是获取客户端连接所需要的鉴权信息token,为了安全token肯定是要放在服务器端生成的,废话不多说,上代码:
1/** 2 * Create by lzan13 on 2022/3/22 3 * 描述:MQTT 帮助类 4 */ 5object MQTTHelper { 6 7 private var mqttClient: MqttAndroidClient? = null 8 9 // 缓存主题集合 10 private val topicList = mutableListOf<String>() 11 12 /** 13 * 链接MQTT 14 * @param id 用户 Id 15 * @param token 用户链接 MQTT 的 Token 16 * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅 17 */ 18 fun connect(id: String, token: String, topic: String = "") { 19 // 处理订阅主题 20 if (topic.isNotEmpty()) topicList.add(topic) 21 22 // 拼接链接地址 23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}" 24 // 拼接 clientId 25 val clientId = "${id}@${MQTTConstants.mqttAppId()}" 26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId) 27 28 //连接参数 29 val options = MqttConnectOptions() 30 options.isAutomaticReconnect = true //设置自动重连 31 options.isCleanSession = true // 缓存 32 options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒 33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒 34 options.userName = id // 用户名 35 options.password = token.toCharArray() // 密码 36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1; 37 // 设置MQTT监听 38 mqttClient?.setCallback(object : MqttCallback { 39 override fun connectionLost(t: Throwable) { 40 // 通知链接断开 41 VMLog.d("MQTT 链接断开 $t") 42 } 43 44 @Throws(Exception::class) 45 override fun messageArrived(topic: String, message: MqttMessage) { 46 // 通知收到消息 47 VMLog.d("MQTT 收到消息:$message") 48 // 如果未订阅则直接丢弃 49 if (!topicList.contains(topic)) return 50 notifyEvent(topic, String(message.payload)) 51 } 52 53 override fun deliveryComplete(token: IMqttDeliveryToken) {} 54 }) 55 //进行连接 56 mqttClient?.connect(options, null, object : IMqttActionListener { 57 override fun onSuccess(token: IMqttToken) { 58 VMLog.d("MQTT 链接成功") 59 // 链接成功,循环订阅缓存的主题 60 topicList.forEach { subscribe(it) } 61 } 62 63 override fun onFailure(token: IMqttToken, t: Throwable) { 64 VMLog.d("MQTT 链接失败 $t") 65 } 66 }) 67 } 68 69 /** 70 * 订阅主题 71 * @param topic 主题 72 */ 73 fun subscribe(topic: String) { 74 if (!topicList.contains(topic)) { 75 topicList.add(topic) 76 } 77 try { 78 //连接成功后订阅主题 79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener { 80 override fun onSuccess(token: IMqttToken) { 81 VMLog.d("MQTT 订阅成功 $topic") 82 } 83 84 override fun onFailure(token: IMqttToken, t: Throwable) { 85 VMLog.d("MQTT 订阅失败 $topic $t") 86 } 87 }) 88 } catch (e: MqttException) { 89 e.printStackTrace() 90 } 91 } 92 93 /** 94 * 取消订阅 95 * @param topic 主题 96 */ 97 fun unsubscribe(topic: String) { 98 if (topicList.contains(topic)) { 99 topicList.remove(topic) 100 } 101 try { 102 mqttClient?.unsubscribe(topic) 103 } catch (e: MqttException) { 104 e.printStackTrace() 105 } 106 } 107 108 /** 109 * 发送 MQTT 消息 110 * @param topic 主题 111 * @param content 内容 112 */ 113 fun sendMsg(topic: String, content: String) { 114 val msg = MqttMessage() 115 msg.payload = content.encodeToByteArray() // 设置消息内容 116 msg.qos = 0 //设置消息发送质量,可为0,1,2. 117 // 设置消息的topic,并发送。 118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener { 119 override fun onSuccess(asyncActionToken: IMqttToken) { 120 VMLog.d("MQTT 消息发送成功") 121 } 122 123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) { 124 VMLog.d("MQTT 消息发送失败 ${exception.message}") 125 } 126 }) 127 } 128 129 /** 130 * 通知 MQTT 事件 131 */ 132 private fun notifyEvent(topic: String, data: String) { 133 LDEventBus.post(topic, data) 134 }
135}
源码地址
核心代码就这么多,不超过500行,这里没有直接调用环信历史消息接口获取消息存储记录,后续可以在进行改良,简化实现流程。源码链接附上,配合使用效果更佳。
服务端github源码:
https://github.com/lzan13/vmtemplateserver
客户端github源码:
https://gitee.com/lzan13/VMTemplateAndroid
写在最后
MQTT协议资源占用小,并发连接高,集成简单,特别适用于高频数据交互场景,比如:游戏的世界广场、视频平台弹幕等等等等,欢迎各位小伙伴集思广益,基于MQTT服务实现更多的业务场景,享受技术带来的便利与快乐。