1. Douban book scraper
#coding:utf-8
# Scrape Douban book info and cover images for one tag, and write them to MySQL.

from urllib import request
from lxml import etree
import pymysql

url = "https://book.douban.com/tag/%E5%B0%8F%E8%AF%B4"   # the "小说" (fiction) tag
headers = {
    'Host': 'book.douban.com',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36'
}
req = request.Request(url=url, headers=headers, method="GET")
content = request.urlopen(req).read().decode("utf-8")
content_dict = etree.HTML(content)  # parse the page into an lxml element tree
content_dict_allli = content_dict.xpath(r'//*[@id="subject_list"]/ul/li')  # one <li> per book

for li in content_dict_allli:
    # Book title (these XPaths are relative to the current <li>, unlike bs4 selectors)
    title = li.xpath(r'div[2]/h2/a/@title')[0]
    title = title.replace(" ", '')
    print(title)

    # Author and publisher: the first text node looks like "author / publisher / ..."
    info_list = li.xpath(r'div[2]/div[1]/text()')
    author = info_list[0].split('/')[0].replace('\n', '').replace(" ", '')
    chubanshe = info_list[0].split('/')[1]   # publisher
    print(author)
    print(chubanshe)

    # Rating
    pingfen = li.xpath(r'div[2]/div[2]/span[2]/text()')[0]
    print(pingfen)

    # Cover image: download and save as douban/<title>.jpg
    img_net_addr = li.xpath(r'div[1]/a/img/@src')[0]
    print(img_net_addr)
    data = request.urlopen(img_net_addr).read()
    img_name = 'douban/' + title + '.jpg'
    with open(img_name, 'wb') as f:
        f.write(data)

    # Database: one connection per record (simple but wasteful); the query is
    # parameterised so quotes in titles don't break the SQL.
    db = pymysql.connect(host='localhost', port=3306, user="root", password='root',
                         db='douban', charset='utf8')
    cur = db.cursor()
    sql = "insert into douban(title,author,chubanshe,pingfen) values (%s,%s,%s,%s)"
    cur.execute(sql, (title, author, chubanshe, pingfen))
    db.commit()
    db.close()
Scrapes Douban book info and cover images with request headers set, saves the images to disk, and writes the records to MySQL (setup sketch below).
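Note that nothing above creates the douban table or the local douban/ image directory; both must exist before the script runs. A minimal setup sketch, assuming the douban database itself already exists (only the column names come from the INSERT statement; the types and the id column are assumptions):

import os
import pymysql

os.makedirs('douban', exist_ok=True)  # the script saves cover images into douban/

db = pymysql.connect(host='localhost', port=3306, user='root', password='root',
                     db='douban', charset='utf8')
with db.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS douban (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(255),
            author VARCHAR(255),
            chubanshe VARCHAR(255),
            pingfen VARCHAR(32)
        ) DEFAULT CHARSET=utf8
    """)
db.commit()
db.close()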
2. Lianjia
#coding:utf-8
# Scrape Lianjia (Xi'an) second-hand listings: titles, addresses, prices and images.
from urllib import request
from bs4 import BeautifulSoup
import pymysql

# Connect to the database once and reuse the cursor
db = pymysql.connect(host='localhost', user='root', password='root', db='lianjia', charset='utf8')
cur = db.cursor()

for i in range(1, 33):
    req = request.urlopen('https://xa.lianjia.com/ershoufang/pg' + str(i)).read().decode('utf-8')
    req_bs4 = BeautifulSoup(req, 'html.parser')   # build a bs4 tree
    body_ul = req_bs4.find('ul', class_="sellListContent")
    try:
        s = ''
        for li in body_ul:
            tit = li.find('div', class_="title").get_text()        # listing title
            addr = li.find('div', class_="houseInfo").get_text()   # address / house info
            pric = li.find('div', class_="totalPrice").get_text()  # total price
            s += tit
            s += addr
            s += pric
            s += '\n\n'
            print(i)  # show which page is being scraped

            # ---- images ----------------------------------------------------
            img = li.find("img", class_='lj-lazy')['data-original']  # real image URL (lazy-loaded)
            img_format = img.split('.')[-1]                          # file extension
            img_name = 'lianjia/images/' + li.find("img", class_='lj-lazy')['alt'] + '.' + img_format
            adr = request.urlopen(img).read()                        # image bytes
            try:   # skip images that fail to save
                with open(img_name, 'wb') as f:
                    f.write(adr)
            except:
                pass
            # ---- images done -----------------------------------------------

            # Save to the database (parameterised to survive quotes in the text)
            sql = "insert into lianjia_hotel(title,address) values (%s,%s)"
            cur.execute(sql, (tit, addr))
            db.commit()
    except:
        # any item without the expected tags raises here and ends the page
        print('done with this page')

# Close the database connection at the very end
db.close()

# Alternatively, write everything to a text file:
# with open('lianjia/lianjia.txt', 'w', encoding="utf-8") as f:
#     f.write(s)
Downloads Lianjia listing text and images, parsed with bs4 (a more defensive iteration sketch follows).
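Iterating body_ul directly yields every child node of the <ul>, including any bare text between the <li> tags, so the code above leans on the outer try/except to end each page. A hedged drop-in sketch for the inner loop that walks only the <li> elements and skips unexpected items instead:

# Sketch: iterate only the <li> tags instead of every child node of the <ul>.
for li in body_ul.find_all('li', recursive=False):
    title_div = li.find('div', class_="title")
    if title_div is None:   # skip items that are not normal listings
        continue
    tit = title_div.get_text()
    addr = li.find('div', class_="houseInfo").get_text()
    pric = li.find('div', class_="totalPrice").get_text()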
3. Toutiao (Jinri Toutiao)
from selenium import webdriver
from lxml import etree
from pyquery import PyQuery as pq
import time

# Open the Toutiao home page and switch to the 科技 (Tech) channel
driver = webdriver.Chrome()
driver.maximize_window()
driver.get('https://www.toutiao.com/')
driver.implicitly_wait(10)
driver.find_element_by_link_text('科技').click()
driver.implicitly_wait(10)

# Scroll down in steps so the infinite-scroll feed loads more items
for x in range(3):
    js = "var q=document.documentElement.scrollTop=" + str(x * 500)
    driver.execute_script(js)
    time.sleep(2)
time.sleep(5)

# Parse the rendered page source and pull out the feed items
page = driver.page_source
doc = pq(page)
doc = etree.HTML(str(doc))
contents = doc.xpath('//div[@class="wcommonFeed"]/ul/li')
print(contents)

# Append every headline to toutiao.txt
for x in contents:
    title = x.xpath('div/div[1]/div/div[1]/a/text()')
    if title:
        title = title[0]
        with open('toutiao.txt', 'a+', encoding='utf8') as f:
            f.write(title + '\n')
        print(title)
Toutiao headlines; Selenium drives the scrolling (a more general scroll helper is sketched below).
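The loop above scrolls a fixed 3 x 500 px, which only loads a few screens of the feed. A hedged alternative, reusing the same driver object, keeps scrolling until the page height stops growing (assumption: the feed is a standard infinite-scroll page):

import time

def scroll_to_bottom(driver, pause=2, max_rounds=10):
    # Scroll until the document height stops increasing or max_rounds is reached.
    last_height = driver.execute_script("return document.body.scrollHeight")
    for _ in range(max_rounds):
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(pause)
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            break
        last_height = new_height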
4. WeChat group info (including members) and contacts
# -*- coding:utf-8 -*-
'''
Scan the QR code to log in to web WeChat, then fetch this account's group chats
(including members) and contact list. (Note: the results may be incomplete.)
'''
import os
import re
import time
import sys
import subprocess
import requests
import xml.dom.minidom
import json


# Web WeChat login
class WebwxLogin(object):
    def __init__(self):
        self.session = requests.session()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 5.1; rv:33.0) Gecko/20100101 Firefox/33.0'}
        self.QRImgPath = os.path.split(os.path.realpath(__file__))[0] + os.sep + 'webWeixinQr.jpg'
        self.uuid = ''
        self.tip = 0
        self.base_uri = ''
        self.redirect_uri = ''
        self.skey = ''
        self.wxsid = ''
        self.wxuin = ''
        self.pass_ticket = ''
        self.deviceId = 'e000000000000000'
        self.BaseRequest = {}
        self.ContactList = []
        self.My = []
        self.SyncKey = ''
    def getUUID(self):
        url = 'https://login.weixin.qq.com/jslogin'
        params = {
            'appid': 'wx782c26e4c19acffb',
            'redirect_uri': 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxnewloginpage',
            'fun': 'new',
            'lang': 'zh_CN',
            '_': int(time.time() * 1000),  # timestamp
        }
        response = self.session.get(url, params=params)
        target = response.content.decode('utf-8')
        pattern = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
        ob = re.search(pattern, target)  # extract the uuid with a regex
        code = ob.group(1)
        self.uuid = ob.group(2)
        if code == '200':  # request succeeded
            return True
        return False

    def showQRImage(self):
        url = 'https://login.weixin.qq.com/qrcode/' + self.uuid
        response = self.session.get(url)
        self.tip = 1
        with open(self.QRImgPath, 'wb') as f:
            f.write(response.content)
        # Open the QR code with the platform's default viewer
        if sys.platform.find('darwin') >= 0:
            subprocess.call(['open', self.QRImgPath])      # macOS
        elif sys.platform.find('linux') >= 0:
            subprocess.call(['xdg-open', self.QRImgPath])  # Linux
        else:
            os.startfile(self.QRImgPath)                   # Windows
        print('Scan the QR code with WeChat to log in')
    def checkLogin(self):
        url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' % (
            self.tip, self.uuid, int(time.time() * 1000))
        response = self.session.get(url)
        target = response.content.decode('utf-8')
        pattern = r'window.code=(\d+);'
        ob = re.search(pattern, target)
        code = ob.group(1)
        if code == '201':  # QR code scanned
            print('Scanned; confirm the login on your phone')
            self.tip = 0
        elif code == '200':  # logged in
            print('Logging in...')
            regx = r'window.redirect_uri="(\S+?)";'
            ob = re.search(regx, target)
            self.redirect_uri = ob.group(1) + '&fun=new'
            self.base_uri = self.redirect_uri[:self.redirect_uri.rfind('/')]
        elif code == '408':  # timed out
            pass
        return code

    def login(self):
        response = self.session.get(self.redirect_uri, verify=False)
        data = response.content.decode('utf-8')
        doc = xml.dom.minidom.parseString(data)
        root = doc.documentElement
        # Pull the session parameters out of the XML response
        for node in root.childNodes:
            if node.nodeName == 'skey':
                self.skey = node.childNodes[0].data
            elif node.nodeName == 'wxsid':
                self.wxsid = node.childNodes[0].data
            elif node.nodeName == 'wxuin':
                self.wxuin = node.childNodes[0].data
            elif node.nodeName == 'pass_ticket':
                self.pass_ticket = node.childNodes[0].data
        if not all((self.skey, self.wxsid, self.wxuin, self.pass_ticket)):
            return False
        self.BaseRequest = {
            'Uin': int(self.wxuin),
            'Sid': self.wxsid,
            'Skey': self.skey,
            'DeviceID': self.deviceId,
        }
        return True
    def webwxinit(self):
        url = self.base_uri + \
            '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (
                self.pass_ticket, self.skey, int(time.time() * 1000))
        params = {
            'BaseRequest': self.BaseRequest
        }
        h = self.headers
        h['ContentType'] = 'application/json; charset=UTF-8'
        response = self.session.post(url, data=json.dumps(params), headers=h, verify=False)
        data = response.content.decode('utf-8')
        print(data)
        dic = json.loads(data)
        self.ContactList = dic['ContactList']
        self.My = dic['User']
        SyncKeyList = []
        for item in dic['SyncKey']['List']:
            SyncKeyList.append('%s_%s' % (item['Key'], item['Val']))
        self.SyncKey = '|'.join(SyncKeyList)
        ErrMsg = dic['BaseResponse']['ErrMsg']
        Ret = dic['BaseResponse']['Ret']
        if Ret != 0:
            return False
        return True

    def webwxgetcontact(self):
        url = self.base_uri + \
            '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (
                self.pass_ticket, self.skey, int(time.time()))
        h = self.headers
        h['ContentType'] = 'application/json; charset=UTF-8'
        response = self.session.get(url, headers=h, verify=False)
        data = response.content.decode('utf-8')
        dic = json.loads(data)
        MemberList = dic['MemberList']
        # Built-in system accounts to filter out
        SpecialUsers = ["newsapp", "fmessage", "filehelper", "weibo", "qqmail", "tmessage", "qmessage", "qqsync",
                        "floatbottle", "lbsapp", "shakeapp", "medianote", "qqfriend", "readerapp", "blogapp",
                        "facebookapp", "masssendapp",
                        "meishiapp", "feedsapp", "voip", "blogappweixin", "weixin", "brandsessionholder",
                        "weixinreminder", "wxid_novlwrv3lqwv11", "gh_22b87fa7cb3c", "officialaccounts",
                        "notification_messages", "wxitil", "userexperience_alarm"]
        # Iterate in reverse so removing items doesn't skip the next one
        for i in range(len(MemberList) - 1, -1, -1):
            Member = MemberList[i]
            if Member['VerifyFlag'] & 8 != 0:  # official/service accounts
                MemberList.remove(Member)
            elif Member['UserName'] in SpecialUsers:  # special built-in accounts
                MemberList.remove(Member)
            elif Member['UserName'].find('@@') != -1:  # group chats
                MemberList.remove(Member)
            elif Member['UserName'] == self.My['UserName']:  # the account itself
                MemberList.remove(Member)
        return MemberList
    def main(self):
        if not self.getUUID():
            print('Failed to get a uuid')
            return
        self.showQRImage()
        time.sleep(1)
        while self.checkLogin() != '200':
            pass
        os.remove(self.QRImgPath)
        if not self.login():
            print('Login failed')
            return
        # Logged in; initialise the session and then query the contacts
        if not self.webwxinit():
            print('Initialisation failed')
            return
        MemberList = self.webwxgetcontact()
        print('%s contacts in the address book' % len(MemberList))
        for x in MemberList:
            sex = 'unknown' if x['Sex'] == 0 else 'male' if x['Sex'] == 1 else 'female'
            print('Nickname:%s, Sex:%s, Remark:%s, Signature:%s' % (x['NickName'], sex, x['RemarkName'], x['Signature']))


if __name__ == '__main__':
    print('Starting')
    wx = WebwxLogin()
    wx.main()
Scrapes WeChat group info (including members) and contact details via the web WeChat login flow (a note on the verify=False warnings follows).
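Because several requests above pass verify=False, urllib3 (the HTTP library underneath requests) prints an InsecureRequestWarning on each call. An optional two-line sketch to silence it:

import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)  # suppress verify=False warnings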
5. Scrape product info for a fixed Taobao category and save it to a MySQL database (nicely structured)
import requests
import re
import pymysql


def getHTMLtext(url):
    # Fetch a page; return an empty string on any failure
    try:
        r = requests.get(url, timeout=100)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except:
        return ""


def getpage(itl, html):
    # The search results embed the data as JSON inside the page; pull out
    # price and title with regexes
    try:
        plt = re.findall(r'"view_price":"[\d.]*"', html)
        nlt = re.findall(r'"raw_title":".*?"', html)
        for i in range(len(plt)):
            price = eval(plt[i].split(':')[1])  # eval strips the surrounding quotes
            title = eval(nlt[i].split(':')[1])
            itl.append([price, title])
    except:
        print("")


def printgoods(itl):
    tplt = "{:2}\t{:8}\t{:16}"
    print(tplt.format("No.", "Price", "Product name"))
    count = 0
    conn = pymysql.connect(host='127.0.0.1', user='root', password='123456', db='company', charset="utf8")
    cur = conn.cursor()
    # Create the table on first run; the except branch covers "already exists"
    sqlc = '''
        create table coffee(
            id int(11) not null auto_increment primary key,
            name varchar(255) not null,
            price float not null) DEFAULT CHARSET=utf8;
    '''
    try:
        cur.execute(sqlc)
        conn.commit()
        print('table created')
    except:
        print("table already exists (or creation failed)")
    for g in itl:
        count = count + 1
        b = tplt.format(count, g[0], g[1])
        sqla = '''
            insert into coffee(name,price)
            values(%s,%s);
        '''
        try:
            cur.execute(sqla, (g[1], g[0]))
            conn.commit()
            print('ok')
        except:
            print("insert failed")
        # Alternatively, append each formatted row to a text file:
        # with open('D:/taobao.txt', 'a') as f:
        #     f.write(b + '\n')
    conn.commit()
    cur.close()
    conn.close()


def main():
    goods = "咖啡"   # search keyword (coffee)
    depth = 2        # number of result pages to fetch
    start_url = 'https://s.taobao.com/search?q=' + goods
    List = []
    for i in range(depth):
        try:
            url = start_url + "&s=" + str(i * 44)  # Taobao paginates 44 items per page
            html = getHTMLtext(url)
            getpage(List, html)
        except:
            continue
    printgoods(List)


main()
Taobao product scraping, saved to a MySQL database (see the header/cookie sketch below).
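Taobao search pages nowadays generally redirect anonymous requests to a login page, so the two regexes tend to find nothing without a cookie. A hedged variant of getHTMLtext that sends a User-Agent and a cookie string copied from a logged-in browser (the cookie value here is a placeholder, not a real one):

import requests

def getHTMLtext(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Cookie': 'paste-your-logged-in-taobao-cookie-here',   # placeholder value
    }
    try:
        r = requests.get(url, headers=headers, timeout=30)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except requests.RequestException:
        return ""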