0%

'爬虫练习脚本'

西拉代理爬虫,判断可用性,入库

加了多线程,不过这站好像有点问题,正在写其它站的爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# -*-coding:utf-8-*-
import requests
from bs4 import BeautifulSoup
import requests
import threading
import pymysql
from socket import timeout
from urllib.error import URLError
from urllib.request import ProxyHandler, build_opener
from fake_useragent import UserAgent
import time
#Author:sunian

port_list = []
OK_list = []
def get_data():
for x in range(1,200):
ua = UserAgent()
headers = {'User-Agent': ua.random}
url = "http://www.xiladaili.com/gaoni/{0}/".format(x)
try:
r = requests.get(url, headers=headers).text
soup = BeautifulSoup(r, "html.parser")
ips = soup.findAll('tr')
num = 0
for x in range(1, len(ips)):
ip = ips[x]
tds = ip.findAll("td")
ip_temp = tds[0].contents[0] # IP地址
port_list.append(ip_temp)
except (URLError,URLError) as error:
print(error)
except:
pass

def validateIp(http_proxy):
proxy = http_proxy
proxy_handler = ProxyHandler({
'http': 'http://' + proxy,
'https': 'https://' + proxy
})
opener = build_opener(proxy_handler)
try:
response = opener.open('https://tbip.alicdn.com/api/getipinfo?callback=taobao_callback', timeout=3)
if 'taobao_callback' in (response.read().decode('utf-8')):
response.close()
OK_list.append(http_proxy)
#若多线程抢占数据库资源,百度死锁可解决
WriteMysql(proxy)
else:
return False
except (URLError,URLError) as error:
print(error.reason)
except timeout:
print('超时',proxy)
return False

def WriteMysql(ip_temp):
db = pymysql.connect(db='xiciip', host='localhost', port=3306, user='root', passwd='root',charset='utf8')
cursor = db.cursor()
sql = "insert into `xiciip` VALUE('%s')"%(ip_temp)
cursor.execute(sql)
db.commit()

def run(thread_function, *args):
threads = []
for par in args[0]:
thread = threading.Thread(target=thread_function,args=(par,))
threads.append(thread)
for i in threads:
i.start()
for t in threads:
t.join()


def main():
get_data()
thread_num = 50 #这里设置线程
for nu in range(0, len(port_list), thread_num+1):
start = nu
end = nu + thread_num
run(validateIp, port_list[start:end])

if __name__ == '__main__':
print('扫描开始')
start = time.time()
main()
end = time.time()
print('扫描完毕,用时:',end-start)

for i in OK_list:
print(i)

妹子图爬虫

5分钟爬取2500页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import requests
from fake_useragent import UserAgent
import re
from multiprocessing.pool import ThreadPool

ua = UserAgent()
pool = ThreadPool(100)

def get_tu(num):
for x in range(1, num):
headers = {'User-Agent': ua.random,
"Referer": "https://www.mzitu.com/",}
url = "https://www.mzitu.com/tag/beautyleg/page/{0}/".format(x)
try:
r = requests.get(url, headers=headers)
srcs = re.findall('li>.*?<a href="(.*?)"',r.text,re.S)[1:]
for src_url in srcs:
for x in range(1, 100):
print(src_url)
pool.apply_async(get_pian, args=(src_url, x))
except Exception as e:
print(e)

def get_pian(src_url,num):
headers = {'User-Agent': ua.random,
"Referer": src_url, }
try:
url = src_url+"/{0}/".format(num)
r = requests.get(url, headers=headers)
if "404 - 妹子图" in r.text:
pass
imgs = re.findall('<img src="(.*?)" ', r.text, re.S)
for img in imgs:
jieguo = requests.get(img,headers=headers).content
filepath = 'D:/meizi/%s' % (img.rsplit('/',maxsplit=1)[1])
print(filepath)
with open(filepath,'wb') as f:
f.write(jieguo)
except:
pass

if __name__ == '__main__':
get_tu(2)
pool.close()
pool.join()