发布时间:2025-06-24 20:07:22 作者:北方职教升学中心 阅读量:300
第一种方式 通过GatewayClient(websocket常用 物联网设备连接时不能这样)
1.网站页面建立与GatewayWorker的websocket连接
2. GatewayWorker发现有页面发起连接时,将对应连接的client_id发给网站页面
3. 网站页面收到client_id后触发一个ajax请求(假设是bind.php)将client_id发到tp后端
4. mvc后端bind.php收到client_id后利用GatewayClient调用Gateway::bindUid($client_id, u i d ) 将 c l i e n t i d 与当前 u i d ( 用户 i d 或者客户端唯一标识 ) 绑定。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。群发功能,也可以利用Gateway::joinGroup( uid)将clientid与当前uid(用户id或者客户端唯一标识)绑定。第二种方式(Gatewayworker) 可能更多用于物联网设备绑定
1、topthink/think-worker/src/command/GatewayWorker.php 复制一份改名为 GatewayWorker2.php
里面的类名改为GatewayWorker2 调用配置文件 改为
$option = Config::get('gateway_worker2');
2、js把网站携带的uid发送到websocket服务器 socket.send(uid);(设备主动上报信息携带其序列号 Mqtt每个设备发布主题带设备编号 必如 Device/{id}/+)
3、businessWorker Events触发onMessage事件 调用Gateway::bindUid 实现uid和clientid绑定 以便下一步发送指令(TCP根据设备上报的内容获得设备编号 Mqtt根据主题解析出设备编号)
同样也常用两种方式下发指令
一、如果有群组、thinkphp 连接tcp 给gatewayworker发送信息 以Mqtt发布主题为例
$client = stream_socket_client('tcp://127.0.0.1:2348', $errno, $errmsg, 30);$data = array('topic'=>'869219071430769/set', 'content'=>'{"FH":"1.200","FL":"0.001","TH":"50.0","TL":"0.0","AT":"2","CT":"3","FF":"1.600","TF":"5.0","FB":"0.1","TB":"2.0","BR":"0"}','options'=>array('qos'=>0,'retain'=>0));// 发送数据, fwrite($client, json_encode($data)."\n");// 读取推送结果 echo fread($client, 8192);
2.gatewayworker会转发给businessWorker 处理, Events触发onMessage事件,调用Gateway::sendToUid 给客户端发送指令 (Mqtt是发送主题设备订阅$mqtt->publish(‘topic’,‘content’)
不管用那种方式绑定了客户端和uid 都可以使用GatewayClient给uid发送指令,我们的目的就是为了用GatewayClient 所以建议尽量用第一种方式,Mqtt没办法只能用第二种,当然也可以和官方那样重新开个worker单独连接mqtt服务器去发布主题 或者配合别的phpMqttclient扩展,不用worker也可以实现连接Mqtt服务器下发指令
注意:一个Gatewayworker实例 不能同时开TCP和websocket ,如果单台服务器需要同时支持TCP和websocket 需要开两个 Gatewayworker实例,下面是步骤 (当然也可以用一个Gatewayworker同时开TCP+mqq,再用一个workerman单开Websocket 这样应该更简单些,而且防止端口冲突,但是我是为了都用GatewayClient的api所以这样操作)
1、群发功能,也可以利用Gateway::joinGroup(client_id, $group_id)将client_id加入到对应分组
二、Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。如果有群组、通过直接连接Gatewayworker作为一个客户端去和别的客户端通信 其实就是实现GatewayClient (Websocket如果实现PHP后端下发指令不大好操作,js很方便 )
1、第一种通过GatewayClient (Mqtt不能用这个方式,Mqtt是通过发布主题实现命令下发)
前面绑定后 tp框架处理业务过程中需要向某个uid或者某个群组发送数据时,直接调用GatewayClient的接口Gateway::sendToUid Gateway::sendToGroup 等发送即可
二、如果有群组、
要注意的几个点:
1.businessWorker Events里onWorkerStart启动时同时连接mqtt服务器 :
php think worker:gateway
namespace app\worker;use GatewayWorker\Lib\Gateway;use Workerman\Worker;use think\facade\Db;use Workerman\Mqtt\Client as MqttClient;use Workerman\Lib\Timer;use think\facade\Log;use app\model\Company as CompanyModel;/** * Worker 命令行服务类 */class TcpEvents{ private static $mqtt = null; /** * @var true */ private static bool $mqtt_connected; /** * onWorkerStart 事件回调 * 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次 * * @access public * @param \Workerman\Worker $businessWorker * @return void */ public static function onWorkerStart(Worker $businessWorker) { $options = [ 'username' => 'MQTT', 'password' => 'MQTTPW', ]; $mqtt = new MqttClient('mqtt://0.0.0.0:1883',$options); $mqtt->onConnect = function($mqtt) { $mqtt->subscribe('YK/#'); self::$mqtt_connected = true;//mqtt连接成功 下面判断用 }; $mqtt->onMessage = function($topic, $content, $mqtt){ var_dump($topic.'主题信息:', $content); if($content == 'getParam'){ $arr = array('topic'=>'TPR/869219071430769/setParam', 'content'=>'{"FH":"1.200","FL":"0.001","TH":"50.0","TL":"0.0","AT":"200","CT":"3","FF":"1.600","TF":"5.0","FB":"0.1","TB":"2.0","BR":"0"}'); $mqtt->publish($arr['topic'], $arr['content']); } Log::write('MQTT设备发送的信息:'.$topic,'notice'); }; $mqtt->connect(); static::$mqtt = $mqtt;//mqtt对象下面用 // Gateway::sendToAll('hello'); //$app = new Application; //$app->initialize(); Log::write('TCP设备发送的信息:','notice'); }
说下TCP和websocket 以及Mqtt实现客户端UID的绑定和下发指令
常用两种方式绑定uid
一、config里的gateway_worker 复制一份改名为gateway_worker2 修改里面的配置(一个为tcp一个为websocket) port registerAddress端口 startPort eventHandler name等都根据自己需求改下,特别注意,增加配置:
'pidFile' => app()->getRuntimePath().'gateway_worker2.pid',
因为tp集成的think-worker版本比较低默认的pidFile是固定的 两个Gatewayworker实例必须两个pidFile
3.修改命令:topthink/think-worker/src/Service.php 增加一条
'worker:gateway2' => '\\think\\worker\\command\\GatewayWorker2',
还有一个注意事项,因为用了thinkphp集成的workerman(think-worker)版本是3.5不是最新的 ,使用workerman/mqtt有点问题,定时器类引用位置要改下,暂时没发现别的问题
namespace Workerman\Mqtt;use \Workerman\Connection\AsyncTcpConnection;use Workerman\Mqtt\Consts\MQTTConst;use Workerman\Mqtt\Consts\ReasonCodeConst;use \Workerman\Protocols\mqtt;use \Workerman\Lib\Timer;//这里要改