土豪大哥们给我来电币吧::lol
菜鸟冲啊,话不多说,附上源代码
的有点快,这速度
[b]相关资源来自于互联网,如有侵权请告知我删除,同时源码仅供交流学习使用,严禁用作商业用途![/b]
from bs4 import BeautifulSoup
import asyncio
import aiohttp
import time
import os
import re
import hashlib
# Absolute directory the downloaded images are written to.
# NOTE(review): this is a placeholder ("your absolute path") — it must be
# replaced with a real, existing directory before the script will run.
path_img = '你的绝对路径'
# Browser-like headers sent with every HTTP request so the target site
# treats the crawler as an ordinary Chrome client.
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36",
'Pragma': 'no-cache',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9,ja;q=0.8,en;q=0.7',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
"Content-Type": "text/html;charset=UTF-8"}
async def fetch_content(url):
    """Fetch *url* over HTTP and return the response body as text.

    A fresh session is opened per call; TLS certificate verification is
    disabled (``ssl=False``) to match the rest of the script.
    """
    connector = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        async with session.get(url) as response:
            body = await response.text()
    return body
async def download_one(url):
    """Download one image from *url* and persist it via ``write_pic``.

    Transient network failures are retried up to five times; after that
    the last exception propagates to the caller.
    """
    # Errors considered transient and therefore worth retrying.
    transient = (
        aiohttp.ClientOSError,
        aiohttp.ServerDisconnectedError,
        asyncio.TimeoutError,
        aiohttp.ClientPayloadError,
    )
    max_retries = 5
    attempt = 0
    while True:
        try:
            async with aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(ssl=False), headers=headers
            ) as session:
                async with session.get(url) as resp:
                    content = await resp.read()
                    await write_pic(url, content)
                    print('Get res from', url, 'Result:', resp.status, 'ok!')
            return
        except transient:
            # Out of retries: let the exception surface.
            if attempt >= max_retries:
                raise
            print("times:{}".format(attempt))
            attempt += 1
async def write_pic(url, pic):
    """Write the raw image bytes *pic* to disk.

    The file name is the SHA-1 hex digest of *url* plus the URL's original
    extension, placed under the module-level ``path_img`` directory.
    NOTE: the file write itself is blocking despite the async signature.
    """
    digest = hashlib.sha1(url.encode('utf-8')).hexdigest()
    extension = os.path.splitext(url)[-1]
    target = "{}/{}".format(path_img, "{}{}".format(digest, extension))
    with open(target, 'wb') as out_file:
        out_file.write(pic)
async def main():
    """Crawl the gallery index, expand each gallery into per-image URLs,
    then download every image concurrently.

    Fixes over the original:
    - the regex is now a raw string (``"\\(...\\)"`` in a plain string is an
      invalid escape sequence and a SyntaxWarning on modern Python);
    - the pattern is compiled once, not once per page;
    - the append loops are comprehensions/generators.
    """
    url = 'https://www.tuiimg.com/meinv/'
    text = await fetch_content(url)
    bs_1 = BeautifulSoup(text, 'lxml')
    # Each gallery on the index page is an <a class="pic"> link.
    detail_urls = [a['href'] for a in bs_1.find_all('a', {'class': 'pic'})]
    # Fetch all gallery detail pages concurrently.
    pages = await asyncio.gather(*(fetch_content(u) for u in detail_urls))
    # The "show all" button text contains "(shown/total)"; we want the total.
    pattern = re.compile(r"\((.*?)\)")
    img_urls = []
    for page in pages:
        bs_2 = BeautifulSoup(page, 'lxml')
        div = bs_2.find("div", {'class': 'content'})
        img_init_link = div.find("img")['src']
        text = bs_2.find("i", id='allbtn').get_text()
        total = pattern.search(text).group(1).split("/")[1]
        # Images are numbered 1.jpg..total.jpg; the cover link ends in
        # "1.jpg" (5 chars), so slicing it off yields the common prefix.
        for i in range(1, int(total) + 1):
            img_urls.append(img_init_link[0:-5] + str(i) + '.jpg')
    # Download every image concurrently.
    await asyncio.gather(*(download_one(u) for u in img_urls))
if __name__ == "__main__":
    # Guarded entry point: importing this module must not trigger the crawl
    # as a side effect (the original ran it at import time).
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(end - start, 's')