爬虫
入口函数(爬虫调度段)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import time, datetime
from maya_Spider import url_manager, html_downloader, html_parser, html_outputer
class Spider_Main(object): def __init__(self): self.urls = url_manager.UrlManager() self.downloader = html_downloader.HtmlDownloader() self.parser = html_parser.HtmlParser() self.outputer = html_outputer.HtmlOutputer()
def craw(self, root_url): count = 1 self.urls.add_new_url(root_url) while self.urls.has_new_url(): try: new_url = self.urls.get_new_url() print('craw %d : %s' % (count, new_url)) html_content = self.downloader.download(new_url) new_urls, new_data = self.parser.parse(new_url, html_content) self.urls.add_new_urls(new_urls) self.outputer.collect_data(new_data)
if count == 10: break
count = count + 1 except: print('craw failed')
self.outputer.output_html()
if __name__ == '__main__': root_url = 'http://baike.baidu.com/view/21087.htm' print('开始计时..............') start_time = datetime.datetime.now() obj_spider = Spider_Main() obj_spider.craw(root_url) end_time = datetime.datetime.now() print('总用时:%ds'% (end_time - start_time).seconds)
|
URL管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class UrlManager(object): def __init__(self): self.new_urls = set() self.old_urls = set()
def add_new_url(self, url): if url is None: return if url not in self.new_urls and url not in self.old_urls: self.new_urls.add(url)
def add_new_urls(self, urls): if urls is None or len(urls) == 0: return for url in urls: self.add_new_url(url)
def has_new_url(self): return len(self.new_urls) != 0
def get_new_url(self): new_url = self.new_urls.pop() self.old_urls.add(new_url) return new_url
|
网页下载器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import urllib import urllib.request
class HtmlDownloader(object):
def download(self, url): if url is None: return None
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)' headers = {'User-Agent':user_agent} req = urllib.request.Request(url,headers=headers) response = urllib.request.urlopen(req) return response.read().decode()
|
网页解析器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import re import urllib from urllib.parse import urlparse
from bs4 import BeautifulSoup
class HtmlParser(object):
def _get_new_urls(self, page_url, soup): new_urls = set() links = soup.find_all('a', href=re.compile(r'/item/.*?')) for link in links: new_url = link['href'] new_full_url = urllib.parse.urljoin(page_url, new_url) new_urls.add(new_full_url) return new_urls
def _get_new_data(self, page_url, soup): res_data = {} res_data['url'] = page_url title_node = soup.find('dd', class_="lemmaWgt-lemmaTitle-title").find('h1') print(str(title_node.get_text())) res_data['title'] = str(title_node.get_text()) summary_node = soup.find('div', class_="lemma-summary") res_data['summary'] = summary_node.get_text()
return res_data
def parse(self, page_url, html_content): if page_url is None or html_content is None: return None
soup = BeautifulSoup(html_content, 'html.parser', from_encoding='utf-8') new_urls = self._get_new_urls(page_url, soup) new_data = self._get_new_data(page_url, soup) return new_urls, new_data
|
网页输出器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| class HtmlOutputer(object):
def __init__(self): self.datas = []
def collect_data(self, data): if data is None: return self.datas.append(data )
def output_html(self): fout = open('maya.html', 'w', encoding='utf-8') fout.write("<head><meta http-equiv='content-type' content='text/html;charset=utf-8'></head>") fout.write('<html>') fout.write('<body>') fout.write('<table border="1">') fout.write('''<tr style="color:red" width="90%"> <th>Theme</th> <th width="80%">Content</th> </tr>''') for data in self.datas: fout.write('<tr>\n') fout.write('\t<td align="center"><a href=\'%s\'>%s</td>' % (data['url'], data['title'])) fout.write('\t<td>%s</td>\n' % data['summary']) fout.write('</tr>\n') fout.write('</table>') fout.write('</body>') fout.write('</html>') fout.close()
|
多线程
实现可设置持续运行时间、线程数及时间间隔的多线程异步post请求功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| ''' random.randint(a, b):用于生成一个指定范围内的整数。 其中参数a是下限,参数b是上限,生成的随机数n: a <= n <= b random.choice(sequence):从序列中获取一个随机元素 参数sequence表示一个有序类型(列表,元组,字符串) ''' import httplib,json import time import threading from random import randint,choice
def postRequest(threadNum): postJson={ } postData=json.dumps(postJson) headerdata = { "content-type":"application/json", } requrl ="/v1/query" hostServer="" conn = httplib.HTTPConnection(hostServer) conn.request(method="POST",url=requrl,body=postData,headers=headerdata) response=conn.getresponse() if response.status in range(200,300): print u"线程"+str(threadNum)+u"状态码:"+str(response.status) conn.close() def run(threadNum,internTime,duration): threads=[] try: for i in range(1,threadNum): t=threading.Thread(target=postRequest,args=(i,)) threads.append(t) except Exception,e: print e try: for thread in threads: thread.setDaemon(True) thread.start() time.sleep(internTime) for thread in threads: thread.join(duration) except Exception,e: print e if __name__ == '__main__': startime=time.strftime("%Y%m%d%H%M%S") now=time.strftime("%Y%m%d%H%M%S") duratiion=raw_input(u"输入持续运行时间:") while (startime+str(duratiion))!=now: run(10,1,int(duratiion)) now=time.strftime("%Y%m%d%H%M%S")
|
爬虫数据处理Json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import urllib.request from bs4 import BeautifulSoup import os import time import codecs import json
def getDatas(): header={'User-Agent':"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11"} url="file:///E:/scrapy/2018-04-27/movie/movie.html" ret=urllib.request.Request(url=url,headers=header) res=urllib.request.urlopen(ret) response=BeautifulSoup(res,'html.parser') datas=response.find_all('div',{'class':'item'}) folder_name="output" if not os.path.exists(folder_name): os.mkdir(folder_name) current_time=time.strftime('%Y-%m-%d',time.localtime()) file_name="move"+current_time+".json" file_path=folder_name+"/"+file_name for item in datas: dict1={} dict1['rank']=item.find('div',{'class':'pic'}).find('em').get_text() dict1['title']=item.find('div',{'class':'info'}).find('div',{'class':'hd'}).find('a').find('span',{'class':'title'}).get_text() dict1['picUrl']=item.find('div',{'class':'pic'}).find('a').find('img').get('src') try: with codecs.open(file_path,'a',encoding="utf-8") as fp: fp.write(json.dumps(dict1,ensure_ascii=False)+",\n") except IOError as err: print('error'+str(err)) finally: fp.close() pass getDatas()
|
Selenium自动化测试工具使用方法汇总
1、设置无头浏览器模式
1 2 3 4 5 6 7
| from selenium import webdriver from selenium.webdriver.chrome.options import Options chrome_options = Options() chrome_options.add_argument('- -headless') chrome_options.add_argument('- -disable-gpu') class XX(object): self.driver = webdriver.Chrome(chrome_options=chrome_options)
|
2、设置屏幕尺寸
1 2 3
| self.driver.maximize_window() self.driver.get_window_size() self.driver.set_window_size(1296, 1000)
|
3、通过xpath获取元素并单击
1 2
| element = self.driver.find_elements_by_xpath('//*[@id="disabled"]/li[3]/a') element [0].click()
|
4、关闭单个窗口
5、退出driver程序,关闭浏览器
6、获取input标签 输入文字 执行回车
1 2 3 4 5
| from selenium.webdriver.common.keys import Keys \ element = self.driver.find_elements_by_xpath(input_enter_xpath)[0] element.send_keys('spupa01bat04') element.send_keys(Keys.ENTER)
|
7、执行双击
1 2 3 4 5 6 7 8 9 10
| from selenium import webdriver from selenium.webdriver import ActionChains self.driver = webdriver.Chrome() self.acobj = ActionChains(self.driver) checkName = '风险' \ Xpath = "//span[text()='" + checkName + "']/../../..//span[text()='FSCapacity']" el_01 = self.driver.find_elements_by_xpath(Xpath)[0] self.acobj.double_click(el_01).perform() self.driver.implicitly_wait(20)
|
8、执行等待
1 2 3 4 5 6 7 8 9 10
| self.driver.implicitly_wait(10)
from selenium.webdriver.support.wait import WebDriverWait
text = WebDriverWait(driver,30,0.2).until(lambda x:x.find_element_by_css_selector(".tt")).text print(text)
from time import sleep sleep(5)
|
9、浏览器全屏截图保存到指定路径
1 2
| png01_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'resource', 'png_codes.png') self.driver.save_screenshot(png01_path)
|
10、select下拉框的选值处理
1 2 3 4 5 6 7
| from selenium.webdriver.support.select import Select
select_element = self.driver.find_elements_by_xpath(select_xpath)[0]
sel_obj = Select(select_element)
sel_obj.select_by_value('168h')
|
11、浏览器位置
1 2 3 4 5
| position = driver.get_window_position() print(position)
driver.set_window_position(100, 100)
|
正则表达式
正则表达式的语法规则
函数API列表
1 2 3 4 5 6 7
| match(string[, pos[, endpos]]) | re.match(pattern, string[, flags]) search(string[, pos[, endpos]]) | re.search(pattern, string[, flags]) split(string[, maxsplit]) | re.split(pattern, string[, maxsplit]) findall(string[, pos[, endpos]]) | re.findall(pattern, string[, flags]) finditer(string[, pos[, endpos]]) | re.finditer(pattern, string[, flags]) sub(repl, string[, count]) | re.sub(pattern, repl, string[, count]) subn(repl, string[, count]) |re.sub(pattern, repl, string[, count])
|
正则表达式相关注解
(1)数量词的贪婪模式与非贪婪模式
正则表达式通常用于在文本中查找匹配的字符串。Python里数量词默认是贪婪的(在少数语言里也可能是默认非贪婪),总是尝试匹配尽可能多的字 符;非贪婪的则相反,总是尝试匹配尽可能少的字符。例如:正则表达式”ab*”
如果用于查找”abbbc”
,将找到”abbb”
。而如果使用非贪婪的数量 词”ab*?”
,将找到”a”
。
注:我们一般使用非贪婪模式来提取。
(2)反斜杠问题
与大多数编程语言相 同,正则表达式里使用”\”
作为转义字符,这就可能造成反斜杠困扰。假如需要匹配文本中的字符”\”
,那么使用编程语言表示的正则表达式里将需要4个反 斜杠”\\\\”
:前两个和后两个分别用于在编程语言里转义成反斜杠,转换成两个反斜杠后再在正则表达式里转义成一个反斜杠。
Python里的原生字符串很好地解决了这个问题,这个例子中的正则表达式可以使用r”\\”
表示。同样,匹配一个数字的”\\d”
可以写成r”\d”
。
4.Python Re模块
Python 自带了re模块,它提供了对正则表达式的支持。主要用到的方法列举如下
1 2 3 4 5 6 7 8 9 10 11
| re.compile(string[,flag])
re.match(pattern, string[, flags]) re.search(pattern, string[, flags]) re.split(pattern, string[, maxsplit]) re.findall(pattern, string[, flags]) re.finditer(pattern, string[, flags]) re.sub(pattern, repl, string[, count]) re.subn(pattern, repl, string[, count])
|
pattern的概念
pattern
可以理解为一个匹配模式,获得这个匹配模式,需要利用re.compile
方法就可以。例如
1
| pattern = re.compile(r'hello')
|
在参数中我们传入了原生字符串对象,通过compile
方法编译生成一个pattern
对象,然后我们利用这个对象来进行进一步的匹配。
另外大家可能注意到了另一个参数 flags
,在这里解释一下这个参数的含义:
参数flag是匹配模式,取值可以使用按位或运算符’|’表示同时生效,比如re.I | re.M。
可选值有:
? re.I
(全拼:IGNORECASE): 忽略大小写(括号内是完整写法,下同)
? re.M
(全拼:MULTILINE): 多行模式,改变'^'
和'$'
的行为(参见上图)
? re.S
(全拼:DOTALL): 点任意匹配模式,改变’.’的行为
? re.L
(全拼:LOCALE): 使预定字符类 \w \W \b \B \s \S
取决于当前区域设定
? re.U
(全拼:UNICODE): 使预定字符类 \w \W \b \B \s \S \d \D
取决于unicode
定义的字符属性
? re.X
(全拼:VERBOSE): 详细模式。这个模式下正则表达式可以是多行,忽略空白字符,并可以加入注释。
在刚才所说的另外几个方法例如 re.match
里我们就需要用到这个pattern
了,下面我们一一介绍。
注:以下七个方法中的flags
同样是代表匹配模式的意思,如果在pattern
生成时已经指明了flags,那么在下面的方法中就不需要传入这个参数了。
(1)re.match(pattern, string[, flags])
这个方法将会从string(我们要匹配的字符串)的开头开始,尝试匹配pattern,一直向后匹配,如果遇到无法匹配的字符,立即返回 None,如果匹配未结束已经到达string的末尾,也会返回None。两个结果均表示匹配失败,否则匹配pattern成功,同时匹配终止,不再对 string向后匹配。下面我们通过一个例子理解一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| __author__ = 'L0ki'
import re
pattern = re.compile(r'hello')
result1 = re.match(pattern,'hello') result2 = re.match(pattern,'helloo L0ki!') result3 = re.match(pattern,'helo L0ki!') result4 = re.match(pattern,'hello L0ki!')
if result1: print result1.group() else: print '1匹配失败!'
if result2: print result2.group() else: print '2匹配失败!'
if result3: print result3.group() else: print '3匹配失败!'
if result4: print result4.group() else: print '4匹配失败!'
|
运行结果
1 2 3 4 5
| hello``hello``3``匹配失败!``hellohello hello 3匹配失败! hello
|
匹配分析
1.第一个匹配,pattern正则表达式为’hello’,我们匹配的目标字符串string也为hello,从头至尾完全匹配,匹配成功。
2.第二个匹配,string为helloo L0ki,从string头开始匹配pattern完全可以匹配,pattern匹配结束,同时匹配终止,后面的o L0ki不再匹配,返回匹配成功的信息。
3.第三个匹配,string为helo L0ki,从string头开始匹配pattern,发现到 ‘o’ 时无法完成匹配,匹配终止,返回None
4.第四个匹配,同第二个匹配原理,即使遇到了空格符也不会受影响。
我们还看到最后打印出了result.group(),这个是什么意思呢?下面我们说一下关于match对象的的属性和方法
Match对象是一次匹配的结果,包含了很多关于此次匹配的信息,可以使用Match提供的可读属性或方法来获取这些信息。
属性:
1.string: 匹配时使用的文本。
2.re: 匹配时使用的Pattern对象。
3.pos: 文本中正则表达式开始搜索的索引。值与Pattern.match()
和Pattern.seach()
方法的同名参数相同。
4.endpos: 文本中正则表达式结束搜索的索引。值与Pattern.match()
和Pattern.seach()
方法的同名参数相同。
5.lastindex: 最后一个被捕获的分组在文本中的索引。如果没有被捕获的分组,将为None。
6.lastgroup: 最后一个被捕获的分组的别名。如果这个分组没有别名或者没有被捕获的分组,将为None。
方法:
1.group([group1, …]):
获得一个或多个分组截获的字符串;指定多个参数时将以元组形式返回。group1可以使用编号也可以使用别名;编号0代表整个匹配的子串;不填写参数时,返回group(0);没有截获字符串的组返回None;截获了多次的组返回最后一次截获的子串。
2.groups([default]):
以元组形式返回全部分组截获的字符串。相当于调用group(1,2,…last)。default表示没有截获字符串的组以这个值替代,默认为None。
3.groupdict([default]):
返回以有别名的组的别名为键、以该组截获的子串为值的字典,没有别名的组不包含在内。default含义同上。
4.start([group]):
返回指定的组截获的子串在string中的起始索引(子串第一个字符的索引)。group默认值为0。
5.end([group]):
返回指定的组截获的子串在string中的结束索引(子串最后一个字符的索引+1)。group默认值为0。
6.span([group]):
返回(start(group), end(group))。
7.expand(template):
将匹配到的分组代入template中然后返回。template中可以使用\id
或\g
、\g
引用分组,但不能使用编号0。\id
与\g
是等价的;但\10
将被认为是第10个分组,如果你想表达\1
之后是字符'0'
,只能使用\g0
。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
import re
m = re.match(r'(\w+) (\w+)(?P.*)', 'hello world!') print "m.string:", m.string print "m.re:", m.re print "m.pos:", m.pos print "m.endpos:", m.endpos print "m.lastindex:", m.lastindex print "m.lastgroup:", m.lastgroup print "m.group():", m.group() print "m.group(1,2):", m.group(1, 2) print "m.groups():", m.groups() print "m.groupdict():", m.groupdict() print "m.start(2):", m.start(2) print "m.end(2):", m.end(2) print "m.span(2):", m.span(2) print r"m.expand(r'\g \g\g'):", m.expand(r'\2 \1\3')
|
(2)re.search(pattern, string[, flags])
search方法与match方法极其类似,区别在于match()函数只检测re是不是在string的开始位置匹配,search()会扫描整个string查找匹配,match()只有在0位置匹配成功的话才有返回,如果不是开始位置匹配成功的话,match()就返回None。同样,search方法的返回对象同样match()返回对象的方法和属性。我们用一个例子感受一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import re
pattern = re.compile(r'world')
match = re.search(pattern,'hello world!') if match: print match.group()
|
(3)re.split(pattern, string[, maxsplit])
按照能够匹配的子串将string分割后返回列表。maxsplit用于指定最大分割次数,不指定将全部分割。我们通过下面的例子感受一下。
1 2 3 4 5 6 7 8
| import re pattern = re.compile(r'\d+') print re.split(pattern,'one1two2three3four4')
|
**(4)re.findall(pattern, string[, flags])
**
1 2 3 4 5 6 7 8 9 10 11
| 搜索string,以列表形式返回全部能匹配的子串。我们通过这个例子来感受一下 import re pattern = re.compile(r'\d+') print re.findall(pattern,'one1two2three3four4')
|
(5)re.finditer(pattern, string[, flags])
搜索string,返回一个顺序访问每一个匹配结果(Match对象)的迭代器。我们通过下面的例子来感受一下
1 2 3 4 5 6 7 8 9
| import re pattern = re.compile(r'\d+') for m in re.finditer(pattern,'one1two2three3four4'): print m.group(),
|
(6)re.sub(pattern, repl, string[, count])
使用repl替换string中每一个匹配的子串后返回替换后的字符串。
当repl是一个字符串时,可以使用\id或\g、\g引用分组,但不能使用编号0。
当repl是一个方法时,这个方法应当只接受一个参数(Match对象),并返回一个字符串用于替换(返回的字符串中不能再引用分组)。
count用于指定最多替换次数,不指定时全部替换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import re pattern = re.compile(r'(\w+) (\w+)') s = 'i say, hello world!' print re.sub(pattern,r'\2 \1', s) def func(m): return m.group(1).title() + ' ' + m.group(2).title() print re.sub(pattern,func, s)
|
(7)re.subn(pattern, repl, string[, count])
返回 (sub(repl, string[, count]), 替换次数)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import re pattern = re.compile(r'(\w+) (\w+)') s = 'i say, hello world!' print re.subn(pattern,r'\2 \1', s) def func(m): return m.group(1).title() + ' ' + m.group(2).title() print re.subn(pattern,func, s)
|
socket
简单示例
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import socket import threading server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind(('0.0.0.0', 8000)) server.listen()
def handle_sock(sock, addr): while True: data = sock.recv(1024) print(data.decode("utf8")) re_data = input() sock.send(re_data.encode("utf8"))
while True: sock, addr = server.accept() '''用线程去处理新接收的连接(用户)''' client_thread = threading.Thread(target=handle_sock, args=(sock, addr)) client_thread.start()
|
客户端
1 2 3 4 5 6 7 8 9
| import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('127.0.0.1', 8000)) while True: re_data = input() client.send(re_data.encode("utf8")) data = client.recv(1024) print(data.decode("utf8"))
|
实用示例
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
import socket from socket import * import os
HOST = '192.168.1.88' PORT = 55555
BUFFERSIZE = 1024
s = socket(AF_INET, SOCK_DGRAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind((HOST, PORT))
logfile = 'dialog.log'
def server_print(*msg): print("SERVER: ", msg)
def clean_up(): if os.path.exists(logfile): os.remove(logfile)
clean_up()
while True: print("=== THE SERVER START WORKING ===") data, addr = s.recvfrom(BUFFERSIZE) data = data.decode(encoding='utf-8') server_print("recived message from client(%s): %s" % (addr, data)) with open(logfile, 'a') as f: log = "client{0} \t message: {1}\n".format(addr, data) f.write(log)
s.sendto("SERVER ROGER THAT ".encode(encoding='utf-8'), addr)
if not s._closed(): s.close()
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
import sys import socket from socket import *
HOST = '192.168.1.88' PORT = 55555 addr = (HOST, PORT)
BUFFERSIZE = 1024
s = socket(AF_INET, SOCK_DGRAM)
s.connect((HOST, PORT))
def client_print(*msg): print("CLIENT: ", msg)
while True: kel = input('Question >>') if kel == 'exit': break
client_print('input data: %s' % kel) kel = kel.encode(encoding='utf-8') try: s.sendto(kel, addr) data, addr = s.recvfrom(BUFFERSIZE) client_print("recived message from server({0}:{1}) : {2}".format(addr[0], addr[1], data.decode('utf-8'))) except ConnectionRefusedError: client_print("exit client") s.close()
if not s._closed: s.close()
sys.exit()
|
批处理
Json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import requests import json headers={ 'Host': 'url', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.28 Safari/537.36 OPR/61.0.3298.6 (Edition developer)', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', 'Accept-Encoding': 'gzip, deflate', 'Content-Type': 'application/json;charset=utf-8' } usr_list=[] psw_list=[] url='api/url' def get_data(username,passwad): data={ "username": username, "password": passwad } return json.dumps(data) with open('usr.txt',encoding='utf-8') as f: for usr in f.readlines(): if usr!=None: usr_list.append(usr.strip('\n')) print(usr_list) with open('psw.txt',encoding='utf-8') as f: for psw in f.readlines(): if psw!=None: psw_list.append(psw.strip('\n')) print(psw_list) for usname,pswa in zip(usr_list,psw_list): print(usname) print(pswa) res=requests.post(url,headers=headers,data=get_data(username=usname,passwad=pswa))
|
待补充
正则匹配简单的爬取模板
1 2 3 4 5 6 7 8 9 10 11 12
| import re import requests
url = "https://xxxx" header = { 'User-Agent': "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11"} res = requests.get(url=url, headers=header) pattern = re.compile(r'<a href="(.*?)/" target="_blank">') domain = pattern.findall(str(res.text)) for i in domain: print(i)
|