目录
- 前言
- 编写代码
- 总结
前言
上次我们获取了gitee网站中项目的评论,本次我们来对gitee网站上的用户的贡献进行统计。来看看效果吧。
编写代码
1.分析网页
我们所需要的内容在用户主页的动态里。 我们需要从动态里提取出类型为Pull Request和issue的url,然后进入到url里,获取这条url的状态,将状态为已关闭的url进行剔除。 然后将剩余的url分成Pull Request和issue俩类,然后判断这些url的类型是openeuler还是src-openeuler,这个的话可以从它们的url中就可以看出来。 这次的数据比较多,用selenium速度会比较慢。我们直接抓包找到用户动态的数据获取网址。 但这数量有点少啊,看看请求信息。这个表单里的url前面就是用户id,拼接一下就好了。这个limit根据经验一般就是数据的条数,20也刚好对应0-19,测试了一下,没问题,这就是数据的大小。经过测试,这个limit最大就只能为1000,再多就没有数据返回了。至于下面的_这个参数是当前的时间乘以1000,不过不携带这个参数也没问题。 点开看下返回的数据,这个type是url的类型,我们需要的就pull_request和issue俩种,然后target里的path是url,我们对它进行一个拼接就好了。 获取到url后我们还要获取它的状态。访问一个url进行抓包,发现我们需要的状态直接就在原先的url里,并不是异步加载的。那我们直接获取网页解析就好了。
2.编写代码
这次因为每一个用户的url都很多,我们采用多线程来提高爬取的速度。 首先是获取用户动态的函数。
def get(username):
'''
获取一个用户主页所有的动态
'''
cnt = 0
rs = None
while cnt < 3:
payload = {'url': f'/{username}/contribution_timeline', 'scope': 'my', 'day': '',
'start_date': '', 'end_date': '', 'year': '', 'limit': 1000,}
try:
rs = requests.get(f'https://gitee.com/{username}/contribution_timeline', params=payload)
except Exception as e:
print(cnt, str(e))
cnt += 1
time.sleep(cnt)
else:
break
return rs
然后是提取我们所需要的url的函数。
def jiexi_data(js):
'''
得到url列表
'''
url_names = []
for cm in js:
try:
if cm['type'] == 'pull_request' or cm['type'] == 'issue':
url = ''.join(['https://gitee.com/', cm['target']['path']])
name = ''.join([cm['target']['pre_iid'], cm['target']['title']])
url_names.append((url, name))
except BaseException:
print('解析错误的动态', cm)
url_names = set(url_names)
return url_names
接着是获取url状态的函数。这里要注意一下,获取到的状态有时候会是繁体字或者是英文,我们需要对其进行处理。
def get_status(url_names):
'''
获取url的状态
'''
url_names_status = []
us = UserAgent()
headers = {
'User-Agent': us.random
}
for url_name in url_names:
url = url_name[0]
times = 3
res = 0
while times >=0:
try:
res = requests.get(url, headers=headers)
break
except BaseException:
print('链接失败,正在重试')
times -= 1
if times >= 0:
xp = etree.HTML(res.content.decode('utf-8'))
status = xp.xpath(
'//*[@class="ui label issue-state"]/text() | //*[@class="ui label pull-request-state"]/text()')
try:
if status[0].replace('\n', '') == '':
status = status[1].replace('\n', '')
else:
status = status[0].replace('\n', '')
except BaseException:
status = '404'
if status != '已关闭' and status != 'Closed' and status != '404' and status != '已關閉':
if status == 'Merged' or status == '已合併':
status = '已合并'
elif status == 'Backlog':
status = '待办的'
elif status == 'Done':
status = '已完成'
elif status == 'Open' or status == '開啟的':
status = '开启的'
url_names_status.append((url, url_name[1], status))
time.sleep(1)
else:
print(url)
print('已尝试3次,均失败,可能被反爬')
url_names_status.append((url, url_name[1], None))
return url_names_status
然后是将函数写入表格的函数。这里根据需求可以选择是否分openeuler和src-openeuler俩类。根据需求将对应的代码注释掉就好了。
def write_data(all_data):
'''
写入数据
'''
wb = Workbook()
ws = wb.create_sheet('贡献统计', 0)
ws_len = 1
total = 0
total_pr = 0
total_issues = 0
for data in all_data:
ws[f'A{ws_len}'] = data[0]
ws[f'A{ws_len + 1}'] = 'pr'
ws[f'B{ws_len + 1}'] = 'url'
ws[f'C{ws_len + 1}'] = 'status'
ws[f'D{ws_len + 1}'] = 'pr-name'
ws[f'F{ws_len + 1}'] = 'issues'
ws[f'G{ws_len + 1}'] = 'url'
ws[f'H{ws_len + 1}'] = 'status'
ws[f'I{ws_len + 1}'] = 'issues-name'
ws_len += 2
pull_len = 0
issues_len = 0
src_openeuler_num = 0
openeuler_num = 0
for url_name_status in data[1]:
if len(re.findall('pulls',url_name_status[0])) != 0:
# 分类
if len(re.findall('https://gitee.com//src-openeuler', url_name_status[0])) != 0 or len(re.findall('https://gitee.com//openeuler',url_name_status[0])) != 0:
if len(re.findall('https://gitee.com//src-openeuler', url_name_status[0])) != 0:
ws[f'A{ws_len + pull_len}'] = 'src-openeuler'
ws[f'B{ws_len + pull_len}'] = url_name_status[0]
ws[f'C{ws_len + pull_len}'] = url_name_status[1]
ws[f'D{ws_len + pull_len}'] = url_name_status[2]
src_openeuler_num += 1
pull_len += 1
else:
ws[f'A{ws_len + pull_len}'] = 'openeuler'
ws[f'B{ws_len + pull_len}'] = url_name_status[0]
ws[f'C{ws_len + pull_len}'] = url_name_status[1]
ws[f'D{ws_len + pull_len}'] = url_name_status[2]
openeuler_num += 1
pull_len += 1
# 不分类
# ws[f'B{ws_len + pull_len}'] = url_name_status[0]
# ws[f'C{ws_len + pull_len}'] = url_name_status[1]
# ws[f'D{ws_len + pull_len}'] = url_name_status[2]
# pull_len += 1
else:
# 分类
if len(re.findall('https://gitee.com//src-openeuler', url_name_status[0])) != 0 or len(re.findall('https://gitee.com//openeuler', url_name_status[0])) != 0:
if len(re.findall('https://gitee.com//src-openeuler',url_name_status[0])) != 0:
ws[f'F{ws_len + issues_len}'] = 'src-openeuler'
ws[f'G{ws_len + issues_len}'] = url_name_status[0]
ws[f'H{ws_len + issues_len}'] = url_name_status[1]
ws[f'I{ws_len + issues_len}'] = url_name_status[2]
src_openeuler_num += 1
issues_len += 1
else:
ws[f'F{ws_len + issues_len}'] = 'openeuler'
ws[f'G{ws_len + issues_len}'] = url_name_status[0]
ws[f'H{ws_len + issues_len}'] = url_name_status[1]
ws[f'I{ws_len + issues_len}'] = url_name_status[2]
openeuler_num += 1
issues_len += 1
# 不分类
# ws[f'G{ws_len + issues_len}'] = url_name_status[0]
# ws[f'H{ws_len + issues_len}'] = url_name_status[1]
# ws[f'I{ws_len + issues_len}'] = url_name_status[2]
# issues_len += 1
if issues_len < pull_len:
max_len = pull_len
else:
max_len = issues_len
total = total + issues_len + pull_len
total_pr += pull_len
total_issues += issues_len
ws[f'A{ws_len + max_len}'] = 'pr-num'
ws[f'B{ws_len + max_len}'] = pull_len
ws[f'F{ws_len + max_len}'] = 'issues-num'
ws[f'G{ws_len + max_len}'] = issues_len
# 不分类
# ws[f'A{ws_len + max_len + 1}'] = '合计'
# ws[f'B{ws_len + max_len + 1}'] = pull_len + issues_len
# ws_len += (max_len + 3)
# 分类
ws[f'A{ws_len + max_len + 1}'] = 'openeuler-num'
ws[f'B{ws_len + max_len + 1}'] = openeuler_num
ws[f'F{ws_len + max_len + 1}'] = 'src-openeuler-num'
ws[f'G{ws_len + max_len + 1}'] = src_openeuler_num
ws[f'A{ws_len + max_len + 2}'] = '合计'
ws[f'B{ws_len + max_len + 2}'] = pull_len + issues_len
ws_len += (max_len + 4)
ws[f'A{ws_len}'] = 'pr总合计'
ws[f'B{ws_len}'] = total_pr
ws[f'F{ws_len}'] = 'issues总合计'
ws[f'G{ws_len}'] = total_issues
ws[f'A{ws_len + 1}'] = '所有人的贡献总数'
ws[f'B{ws_len + 1}'] = total
wb.save('贡献统计.xlsx')
3.总的代码
import requests
import time
from fake_useragent import UserAgent
from lxml import etree
import re
from openpyxl import Workbook
import threading
class MyThread(threading.Thread):
'''
统计用户贡献多线程类
'''
def __init__(self, user_list):
threading.Thread.__init__(self)
self.user_list = user_list
self.all_data = []
self.error_users = []
def run(self):
for user in self.user_list:
print(user[0])
rs = get(user[1])
try:
print('共有', len(rs.json()), '条动态')
print('正在解析动态')
url_names = jiexi_data(rs.json())
print('解析完毕,正在获取状态')
self.all_data.append((user[0], get_status(url_names)))
except BaseException as b:
print(b)
print(''.join([user[0], '的用户id错误,无法访问']))
self.error_users.append(user)
def get(username):
'''
获取一个用户主页所有的动态
'''
cnt = 0
rs = None
while cnt < 3:
payload = {'url': f'/{username}/contribution_timeline', 'scope': 'my', 'day': '',
'start_date': '', 'end_date': '', 'year': '', 'limit': 1000,}
try:
rs = requests.get(f'https://gitee.com/{username}/contribution_timeline', params=payload)
except Exception as e:
print(cnt, str(e))
cnt += 1
time.sleep(cnt)
else:
break
return rs
def jiexi_data(js):
'''
得到url列表
'''
url_names = []
for cm in js:
try:
if cm['type'] == 'pull_request' or cm['type'] == 'issue':
url = ''.join(['https://gitee.com/', cm['target']['path']])
name = ''.join([cm['target']['pre_iid'], cm['target']['title']])
url_names.append((url, name))
except BaseException:
print('解析错误的动态', cm)
url_names = set(url_names)
return url_names
def get_status(url_names):
'''
获取url的状态
'''
url_names_status = []
us = UserAgent()
headers = {
'User-Agent': us.random
}
for url_name in url_names:
url = url_name[0]
times = 3
res = 0
while times >=0:
try:
res = requests.get(url, headers=headers)
break
except BaseException:
print('链接失败,正在重试')
times -= 1
if times >= 0:
xp = etree.HTML(res.content.decode('utf-8'))
status = xp.xpath(
'//*[@class="ui label issue-state"]/text() | //*[@class="ui label pull-request-state"]/text()')
try:
if status[0].replace('\n', '') == '':
status = status[1].replace('\n', '')
else:
status = status[0].replace('\n', '')
except BaseException:
status = '404'
if status != '已关闭' and status != 'Closed' and status != '404' and status != '已關閉':
if status == 'Merged' or status == '已合併':
status = '已合并'
elif status == 'Backlog':
status = '待办的'
elif status == 'Done':
status = '已完成'
elif status == 'Open' or status == '開啟的':
status = '开启的'
url_names_status.append((url, url_name[1], status))
time.sleep(1)
else:
print(url)
print('已尝试3次,均失败,可能被反爬')
url_names_status.append((url, url_name[1], None))
return url_names_status
def write_data(all_data):
'''
写入数据
'''
wb = Workbook()
ws = wb.create_sheet('贡献统计', 0)
ws_len = 1
total = 0
total_pr = 0
total_issues = 0
for data in all_data:
ws[f'A{ws_len}'] = data[0]
ws[f'A{ws_len + 1}'] = 'pr'
ws[f'B{ws_len + 1}'] = 'url'
ws[f'C{ws_len + 1}'] = 'status'
ws[f'D{ws_len + 1}'] = 'pr-name'
ws[f'F{ws_len + 1}'] = 'issues'
ws[f'G{ws_len + 1}'] = 'url'
ws[f'H{ws_len + 1}'] = 'status'
ws[f'I{ws_len + 1}'] = 'issues-name'
ws_len += 2
pull_len = 0
issues_len = 0
src_openeuler_num = 0
openeuler_num = 0
for url_name_status in data[1]:
if len(re.findall('pulls',url_name_status[0])) != 0:
# 分类
if len(re.findall('https://gitee.com//src-openeuler', url_name_status[0])) != 0 or len(re.findall('https://gitee.com//openeuler',url_name_status[0])) != 0:
if len(re.findall('https://gitee.com//src-openeuler', url_name_status[0])) != 0:
ws[f'A{ws_len + pull_len}'] = 'src-openeuler'
ws[f'B{ws_len + pull_len}'] = url_name_status[0]
ws[f'C{ws_len + pull_len}'] = url_name_status[1]
ws[f'D{ws_len + pull_len}'] = url_name_status[2]
src_openeuler_num += 1
pull_len += 1
else:
ws[f'A{ws_len + pull_len}'] = 'openeuler'
ws[f'B{ws_len + pull_len}'] = url_name_status[0]
ws[f'C{ws_len + pull_len}'] = url_name_status[1]
ws[f'D{ws_len + pull_len}'] = url_name_status[2]
openeuler_num += 1
pull_len += 1
# 不分类
# ws[f'B{ws_len + pull_len}'] = url_name_status[0]
# ws[f'C{ws_len + pull_len}'] = url_name_status[1]
# ws[f'D{ws_len + pull_len}'] = url_name_status[2]
# pull_len += 1
else:
# 分类
if len(re.findall('https://gitee.com//src-openeuler', url_name_status[0])) != 0 or len(re.findall('https://gitee.com//openeuler', url_name_status[0])) != 0:
if len(re.findall('https://gitee.com//src-openeuler',url_name_status[0])) != 0:
ws[f'F{ws_len + issues_len}'] = 'src-openeuler'
ws[f'G{ws_len + issues_len}'] = url_name_status[0]
ws[f'H{ws_len + issues_len}'] = url_name_status[1]
ws[f'I{ws_len + issues_len}'] = url_name_status[2]
src_openeuler_num += 1
issues_len += 1
else:
ws[f'F{ws_len + issues_len}'] = 'openeuler'
ws[f'G{ws_len + issues_len}'] = url_name_status[0]
ws[f'H{ws_len + issues_len}'] = url_name_status[1]
ws[f'I{ws_len + issues_len}'] = url_name_status[2]
openeuler_num += 1
issues_len += 1
# 不分类
# ws[f'G{ws_len + issues_len}'] = url_name_status[0]
# ws[f'H{ws_len + issues_len}'] = url_name_status[1]
# ws[f'I{ws_len + issues_len}'] = url_name_status[2]
# issues_len += 1
if issues_len < pull_len:
max_len = pull_len
else:
max_len = issues_len
total = total + issues_len + pull_len
total_pr += pull_len
total_issues += issues_len
ws[f'A{ws_len + max_len}'] = 'pr-num'
ws[f'B{ws_len + max_len}'] = pull_len
ws[f'F{ws_len + max_len}'] = 'issues-num'
ws[f'G{ws_len + max_len}'] = issues_len
# 不分类
# ws[f'A{ws_len + max_len + 1}'] = '合计'
# ws[f'B{ws_len + max_len + 1}'] = pull_len + issues_len
# ws_len += (max_len + 3)
# 分类
ws[f'A{ws_len + max_len + 1}'] = 'openeuler-num'
ws[f'B{ws_len + max_len + 1}'] = openeuler_num
ws[f'F{ws_len + max_len + 1}'] = 'src-openeuler-num'
ws[f'G{ws_len + max_len + 1}'] = src_openeuler_num
ws[f'A{ws_len + max_len + 2}'] = '合计'
ws[f'B{ws_len + max_len + 2}'] = pull_len + issues_len
ws_len += (max_len + 4)
ws[f'A{ws_len}'] = 'pr总合计'
ws[f'B{ws_len}'] = total_pr
ws[f'F{ws_len}'] = 'issues总合计'
ws[f'G{ws_len}'] = total_issues
ws[f'A{ws_len + 1}'] = '所有人的贡献总数'
ws[f'B{ws_len + 1}'] = total
wb.save('贡献统计.xlsx')
def main():
all_data = []
error_user = []
users = [
('江新宇', 'jxy_git'),
('王冕', 'wm-wm-wm'),
('丁丽丽', 'blueskycs2c'),
('车梦月', 'chemengyue'),
('杜奕威', 'duyiwei7w'),
('吕晗', 'myshow2258'),
('王悦良', 'wangyueliang'),
]
num = len(users)//5
users1 = users[:num]
users2 = users[num:num*2]
users3 = users[num*2:num*3]
users4 = users[num*3:num*4]
users5 = users[num*4:]
thread1 = MyThread(users1)
thread2 = MyThread(users2)
thread3 = MyThread(users3)
thread4 = MyThread(users4)
thread5 = MyThread(users5)
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread5.start()
thread1.join()
thread2.join()
thread3.join()
thread4.join()
thread5.join()
all_data += thread1.all_data
all_data += thread2.all_data
all_data += thread3.all_data
all_data += thread4.all_data
all_data += thread5.all_data
error_user += thread1.error_users
error_user += thread2.error_users
error_user += thread3.error_users
error_user += thread4.error_users
error_user += thread5.error_users
print('正在写入数据')
write_data(all_data) # 写入数据
if len(error_user) != 0:
print(error_user)
else:
print('无错误用户')
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
print(f'运行了{end_time - start_time}秒')
总结
这里的多线程写法我还是请教了一位大佬才写出来的,本来我觉得这样写很麻烦,想看看有没有简单方便的方法。本来想使用for循环来创建多线程的,但是线程在for循环里会因为join函数的原因被阻塞,其实还是单线程。大佬和我说别管方不方便,实用最好,别那么花里胡哨的,我想了下也确实是这么个道理,所以这份代码就出炉喽!