import asyncio
import websockets
import json

async def connect_to_wss(uri):
    # 连接到WSS服务器
    async with websockets.connect(uri) as websocket:
        # 执行一些操作,例如发送和接收消息
        data = {"脱敏处理"}
        await websocket.send(json.dumps(data))
        while 1:
            response = await websocket.recv()
            try:
                rjson = json.loads(response)
            except:
                with open('ok.mp3','wb')as f:
                    f.write(response)
                print(response)
            else:
                print(rjson)
                if rjson['event'] == 'TaskFinished':
                    print('tts is ok')
                    break
            print('='*50)
            
            

# WSS服务器地址,包含协议
wss_url = "wss://脱敏处理/internal/api/v2/ws?device_id=脱敏处理&iid=脱敏处理"

# 运行异步事件循环
asyncio.get_event_loop().run_until_complete(connect_to_wss(wss_url))
asyncio.get_event_loop().close()

上面采用的是异步方式 等待可以直接

下方的如果不采用异步方式,使用websocket-client需要不停发送消息的就需要使用线程

例如

import json
import time
import _thread
import websocket


web_socket_url = "wss://appcomm-user.脱敏处理.com/app-commserv-user/websocket?qrToken=%s"
qr_token = "ca6e6cfb70de4f2f915b968aefcad404"
once_password = ""
uuid = ""


def wss_on_message(ws, message):
    print("=============== [message] ===============")
    message = json.loads(message)
    print(message)
    if "扫码成功" in message["msg"]:
        global once_password, uuid
        once_password = message["oncePassword"]
        uuid = message["uuid"]
        ws.close()


def wss_on_error(ws, error):
    print("=============== [error] ===============")
    print(error)
    ws.close()


def wss_on_close(ws, close_status_code, close_msg):
    print("=============== [closed] ===============")
    print(close_status_code)
    print(close_msg)


def wss_on_open(ws):
    def run(*args):
        while True:
            ws.send(qr_token)
            time.sleep(8)
    _thread.start_new_thread(run, (qr_token,))


def wss():
    # websocket.enableTrace(True)  # 是否显示连接详细信息
    ws = websocket.WebSocketApp(
        web_socket_url % qr_token, on_open=wss_on_open,
        on_message=wss_on_message, on_error=wss_on_error,
        on_close=wss_on_close
    )
    ws.run_forever()

在这里需要每间隔8s进行发送请求获取二维码状态 那么这个时候 如果直接主线程会导致阻塞 无法获取服务器响应 所以必须采用线程 那么如果使用异步就不用啦

await asyncio.sleep(8)

如果想要更加简洁明了 还是使用websocket-client吧 方便的话websocket异步

当然 你也可以创建一个属于自己的服务端

import asyncio
import websockets


async def echo(websocket, path):
    async for message in websocket:
        message = "I got your message: {}".format(message)
        await websocket.send(message)


asyncio.get_event_loop().run_until_complete(websockets.serve(echo, 'localhost', 8765))
asyncio.get_event_loop().run_forever()

抓包调试推荐apipost

注意这个服务端 你需要在发送消息之后使用

await websocket.close()这样才能主动关闭客户端的连接 当然他也只有一次通信

 

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部