利用Python和ffmpeg实现录制BigoLive直播
原理
利用python获取bigo直播源后使用ffmpeg来下载并合并m3u8.
具体实现
import requests
import time
import os
import threading
import random
import datetime
class Bigo:
def __init__(self, rid):
self.rid = rid
self.flag = False
self.run = False
def get_real_url(self):
with requests.Session() as s:
url = f'https://ta.bigo.tv/official_website/studio/getInternalStudioInfo'
res = s.post(url, data={'siteId': self.rid}).json()
hls_src = res['data']['hls_src']
play_url = hls_src if hls_src else None
return play_url
def checkLive(self):
try:
u = self.get_real_url()
if u:
print('正在直播!',u)
self.flag = True
else:
print('未直播')
self.flag = False
time.sleep(random.randint(10,30))
except Exception as e:
print('Exception:', e)
self.flag = False
def ffmpegDown(self):
url = self.get_real_url()
print('直播源:',url)
filename = datetime.datetime.today()
filename = filename.strftime('%Y%m%d%H%M%S')
filename = filename + '.mp4'
print(filename)
file_path = "./"
print('ffmpeg -i {} -c copy {}'.format(url, os.path.join(file_path,filename)))
os.system('ffmpeg -i {} -c copy {}'.format(url, os.path.join(file_path,filename)))
def recording(self):
while True:
print("-------recoding-------")
time.sleep(5)
if self.flag == True and self.run == False:
print("---开始录制")
self.ffmpegDown()
self.run = True
elif self.flag == True and self.run == True:
print("---正在录制")
else:
print('---未直播')
self.run = False
if __name__ == '__main__':
r = input('输入bigo直播房间号:\n')
site = Bigo(r)
change = threading.Thread(target=site.checkLive)
ffm = threading.Thread(target=site.recording)
# 开始运行程序
change.start()
ffm.start()
版权声明:
作者:Ne-21
链接:https://blog.gocos.cn/archives/248.html
来源:云淡风轻
文章版权归作者所有,未经允许请勿转载。
THE END
0
二维码
海报
利用Python和ffmpeg实现录制BigoLive直播
原理
利用python获取bigo直播源后使用ffmpeg来下载并合并m3u8.
具体实现
import requests
import time
import os
import threading
import random
import dat……

共有 0 条评论