目录
一、背景
二、爬虫的实现思路
1、直接requests爬取
2、综合思路
二、代码实现
1、初始化Selenium WebDriver:
2、访问用户视频列表页面
3、翻页处理
4、获取视频详细信息
5、保存数据
6、定义一个运行函数串联所有方法功能
三、实际运行效果
1、selenium爬取视频BV号
2、BV号的json文件
3、execl文件信息
之前B站有刷到一个视频,关于某UP主的视频播放数统计,这视频的UP主是个狠人,一个个视频统计,最后算所有视频的播放数,可以是可以,就是有点废人,突然有了想用爬虫的思路了.
首先我们知道B站的每一个视频是BV号,视频的URL是由B站域名加唯一BV号如:
https://www.bilibili.com/video/BV1Qm411S7BP/5分钟了解 蓝色监狱【『剧场版 蓝色监狱 -EPIOSODE 凪-』2024年4月19日公開】_哔哩哔哩_bilibili
那这样爬取的大致思路就是获取所有BV号,再用requests,去爬取视频,获取视频信息
发现行不通,这里直接在up主页面访问得不到每一个视频的BV号
看样子这个JS脚本是来用于检测用户的信息,并根据这些条件可能会重定向不同的页面。此外,还有一些用于页面性能监控、日志上报和webp图片格式支持检测的脚本。
本菜鸟技术有限,打不过就绕过,直接上selenium来获取所有视频的BV号
selenium用来获取BV号,爬取到具体的信息保存下来,配合多线程、requests用来爬取具体视频的信息,提高爬取效率,最后用execl保存爬取到的所有信息
使用Selenium的Chrome WebDriver来模拟用户在浏览器中的操作,这样可以处理JavaScript渲染的页面
class GetInfo(): def __init__(self, user_id): self.a_list = [] # 存储每一个视频的url self.d = webdriver.Chrome() # 初始化Chrome浏览器驱动 self.user_id = user_id self.base_url = f'https://space.bilibili.com/{ user_id}/video' self.d.get(self.base_url) #这篇文章写于2022年,当时B站免登入可以搜索视频,查看视频,但是这段时间再次尝试爬取资源时,加了必须认证登入,尝试过很多次,没有获取token,只能老老实实,登入后再去爬取信息 time.sleep(10) print("速度扫码登入")
实际效果,是可以直接访问,
但是会弹出登入验证,甚至因为未登入无法获取视频列表,接口返回无权限,遇到这种情况,只能登入,尝试过selenium带cookie,但是没用,甚至登入后copy as curl再postman导入请求,都返回无权限,说明搜索接口再请求服务器时候,没有带上cookie
这里使用XPath定位到视频列表的ul元素,然后遍历其中的li元素,提取每个视频的URL。
def get_url(self): # 从当前页面获取所有视频的URL并保存到本地文件 ul=WebDriverWait(self.d, 10).until(lambda x: x.find_element(By.XPATH, '//*[@id="submit-video-list"]/ul[1]')) lis = ul.find_elements(By.XPATH, "li") for li in lis: self.a_list.append(li.get_attribute("data-aid")) with open("url.json", "w+", encoding="utf-8") as f: data = json.dumps(self.a_list, ensure_ascii=False) # 确保中文字符正常保存 f.write(data) # 使用write而不是writelines
首先获取总页数,然后通过点击“下一页”按钮来遍历所有页面,并在每个页面上调用get_urls方法来提取视频URL
def next_page(self): # 遍历所有页面,获取所有视频的URL) total = WebDriverWait(self.d, 10).until(lambda x: x.find_element(By.XPATH, '//*[@id="submit-video-list"]/ul[3]/span[1]')) number = re.findall(r"\d+", total.text) total = int(number[0]) for page in range(1, total): try: self.d.find_element(By.LINK_TEXT, '下一页').click() time.sleep(2) # 等待页面加载 self.get_url() # 修复方法名错误 except Exception as e: print(f"Failed to click next page: { e}") return self.a_list
使用requests库来获取每个视频页面的HTML内容,然后调用parse_video_page方法来解析并提取视频的详细信息
def get_video(self, urls, start, end): # 使用requests.Session()来复用TCP连接 with requests.Session() as session: session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36" }) base_url = "http://www.bilibili.com/video/" # 预编译正则表达式以提高性能 title_pattern = re.compile(r'([^&]+) ') play_count_pattern = re.compile(r'视频播放量 (\d+)') danmu_count_pattern = re.compile(r'弹幕量 (\d+)') like_count_pattern = re.compile(r'点赞数 (\d+)') coin_count_pattern = re.compile(r'投硬币枚数 (\d+)') favorite_count_pattern = re.compile(r'收藏人数 (\d+)') share_count_pattern = re.compile(r'转发人数 (\d+)') for url in urls[int(start):int(end)]: full_url = base_url + url try: response = session.get(full_url) if response.status_code == 200: string = response.text # 使用正则表达式提取视频信息 title_match = title_pattern.search(string) title = title_match.group(1) if title_match else "未找到匹配的内容" # 提取视频播放量、弹幕量等信息 play_count = play_count_pattern.search(string).group(1) if play_count_pattern.search( string) else '0' danmu_count = danmu_count_pattern.search(string).group(1) if danmu_count_pattern.search( string) else '0' like_count = like_count_pattern.search(string).group(1) if like_count_pattern.search( string) else '0' coin_count = coin_count_pattern.search(string).group(1) if coin_count_pattern.search( string) else '0' favorite_count = favorite_count_pattern.search(string).group( 1) if favorite_count_pattern.search(string) else '0' share_count = share_count_pattern.search(string).group(1) if share_count_pattern.search( string) else '0' # 将提取的信息添加到self.data_list中 video_info = { "url":full_url, "title": title, "play_count": play_count, "danmu_count": danmu_count, "like_count": like_count, "coin_count": coin_count, "favorite_count": favorite_count, "share_count": share_count } self.data_list.append(video_info) else: print(f"Failed { full_url}: HTTP { response.status_code}") except Exception as e: print(f"Failed to get video info for url { full_url}: { e}")
导入openpyxl库,用于将数据保存到Excel
def save_to_excel(self, filename): wb = Workbook() ws = wb.active ws.append(['url','标题', '播放量', '弹幕数', '点赞数', '投币数', '收藏数', '分享数']) # 添加表头 for video_info in self.data_list: ws.append([ video_info['url'], video_info['title'], video_info['play_count'], video_info['danmu_count'], video_info['like_count'], video_info['coin_count'], video_info['favorite_count'], video_info['share_count'] ]) wb.save(filename)
def run(self): # 运行整个流程 self.get_url() # 获取当前页面的视频URL self.next_page() # 遍历所有页面获取视频URL with open("url.json", "r", encoding="utf-8") as f: data = json.load(f) # 使用多线程提高数据获取效率 threads = [] part = int(len(data) / 3) for i in range(3): start = i * part end = (i + 1) * part if i != 2 else len(data) thread = threading.Thread(target=self.get_video, args=(data, start, end)) threads.append(thread) thread.start() for thread in threads: thread.join() # 等待所有线程完成 # 所有线程完成后,保存数据到Excel self.save_to_excel('final_video_info1.xlsx')
if __name__ == '__main__': obj = GetInfo() obj.run() # 运行整个流程
自动翻页至最后一页
json文件列表长度和视频号一致
有了这些文件,想统计具体的数据,就很轻松