python bilibili直播弹幕

编程入门 行业动态 更新时间:2024-10-24 22:19:18

python bilibili直播<a href=https://www.elefans.com/category/jswz/34/1748663.html style=弹幕"/>

python bilibili直播弹幕

原文:AioWebSocket实现python异步接收B站直播弹幕

import ssl
import websocket
import asyncio
import zlib
import json
import _thread as thread
import time
import requests
class WebsocketClient(object):"""docstring for WebsocketClient"""def __init__(self, address, send_message):super(WebsocketClient, self).__init__()self.address = addressself.send_message = send_messagedef on_message(self, ws, message):printDM(message)def on_error(self, ws, error):print("### 报错:", error)def on_close(self, ws,x,y):print("### 已关闭###")print(x,y)def on_open(self, ws):print("开始连接")def run(*args):self.ws.send(self.send_message)print(self.send_message)# self.ws.close()hb='00 00 00 10 00 10 00 01  00 00 00 02 00 00 00 01'def sendHeartBeat():while True:time.sleep(30)self.ws.send(bytes.fromhex(hb))print('[Notice] 发送心跳包,并获取人气值')thread.start_new_thread(run, ())thread.start_new_thread(sendHeartBeat, ())def run(self):websocket.enableTrace(False)self.ws = websocket.WebSocketApp(self.address,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)self.ws.on_open = self.on_openself.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})def color_text(text,front=33,back=40):#自定义文字颜色return f'\033[1;{front};{back}m'+str(text)+'\033[0m' #参数文字颜色front:灰色(30)-红色(31)-绿色(32)-黄色(33)-蓝色(34)-粉色(35),#背景颜色back:红色(41)-绿色(42)-黄色(43)-蓝色(44)-粉色(45)-绿色(46)-灰白(47)# 将数据包传入,解析数据:
dm_count = 0#弹幕计数
lw_count = 0#礼物计数
def printDM(data):global dm_count# 获取数据包的长度,版本和操作类型packetLen = int(data[:4].hex(), 16)ver = int(data[6:8].hex(), 16)op = int(data[8:12].hex(), 16)# 有的时候可能会两个数据包连在一起发过来,所以利用前面的数据包长度判断,if (len(data) > packetLen):printDM(data[packetLen:])data = data[:packetLen]# 有时会发送过来 zlib 压缩的数据包,这个时候要去解压。if (ver == 2):data = zlib.decompress(data[16:])printDM(data)return# ver 为1的时候为进入房间后或心跳包服务器的回应。op 为3的时候为房间的人气值。if (ver == 1):if (op == 3):print('[人气]  {}'.format(int(data[16:].hex(), 16)))return# ver 不为2也不为1目前就只能是0了,也就是普通的 json 数据。# op 为5意味着这是通知消息,cmd 基本就那几个了。if (op == 5):try:jd = json.loads(data[16:].decode('utf-8', errors='ignore'))if (jd['cmd'] == 'DANMU_MSG'):dm_count+=1print(color_text(f'[弹幕{dm_count}] '), jd['info'][2][1], ': ', jd['info'][1])elif (jd['cmd'] == 'SEND_GIFT'):print(color_text(f'[礼物]',31), jd['data']['uname'], ' ', jd['data']['action'], ' ', jd['data']['num'], 'x',jd['data']['giftName'])elif (jd['cmd'] == 'LIVE'):print('[Notice] LIVE Start!')elif (jd['cmd'] == 'PREPARING'):print('[Notice] LIVE Ended!')# else:#     print('[OTHER] ', jd['cmd'])except Exception as e:passif __name__ == '__main__':short_id = '82568'#url链接里的idresponse = requests.get(f'={short_id}').json()roomid = str(response['data']['room_id'])#真实房间号route = ['wss://hw-gz-live-comet-02.chat.bilibili/sub','wss://tx-bj-live-comet-03.chat.bilibili/sub','wss://hw-gz-live-comet-05.chat.bilibili/sub']#线路data_raw = '000000{headerLen}0010000100000007000000017b22726f6f6d6964223a{roomid}7d'.format(headerLen=hex(27 + len(roomid))[2:],roomid=''.join(map(lambda x: hex(ord(x))[2:], list(roomid))))ws_client = WebsocketClient(route[0],bytes.fromhex(data_raw))ws_client.run()

更多推荐

python bilibili直播弹幕

本文发布于:2024-03-13 03:51:17,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1733126.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:弹幕   python   bilibili

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!