Python Script Writing Template
2024-04-21 02:30:15

爬虫

入口函数(爬虫调度段)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#coding:utf8
import time, datetime

from maya_Spider import url_manager, html_downloader, html_parser, html_outputer


class Spider_Main(object):
#初始化操作
def __init__(self):
#设置url管理器
self.urls = url_manager.UrlManager()
#设置HTML下载器
self.downloader = html_downloader.HtmlDownloader()
#设置HTML解析器
self.parser = html_parser.HtmlParser()
#设置HTML输出器
self.outputer = html_outputer.HtmlOutputer()

#爬虫调度程序
def craw(self, root_url):
count = 1
self.urls.add_new_url(root_url)
while self.urls.has_new_url():
try:
new_url = self.urls.get_new_url()
print('craw %d : %s' % (count, new_url))
html_content = self.downloader.download(new_url)
new_urls, new_data = self.parser.parse(new_url, html_content)
self.urls.add_new_urls(new_urls)
self.outputer.collect_data(new_data)

if count == 10:
break

count = count + 1
except:
print('craw failed')

self.outputer.output_html()

if __name__ == '__main__':
#设置爬虫入口
root_url = 'http://baike.baidu.com/view/21087.htm'
#开始时间
print('开始计时..............')
start_time = datetime.datetime.now()
obj_spider = Spider_Main()
obj_spider.craw(root_url)
#结束时间
end_time = datetime.datetime.now()
print('总用时:%ds'% (end_time - start_time).seconds)

URL管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class UrlManager(object):
def __init__(self):
self.new_urls = set()
self.old_urls = set()

def add_new_url(self, url):
if url is None:
return
if url not in self.new_urls and url not in self.old_urls:
self.new_urls.add(url)

def add_new_urls(self, urls):
if urls is None or len(urls) == 0:
return
for url in urls:
self.add_new_url(url)

def has_new_url(self):
return len(self.new_urls) != 0

def get_new_url(self):
new_url = self.new_urls.pop()
self.old_urls.add(new_url)
return new_url

网页下载器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import urllib
import urllib.request

class HtmlDownloader(object):

def download(self, url):
if url is None:
return None

#伪装成浏览器访问,直接访问的话csdn会拒绝
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = {'User-Agent':user_agent}
#构造请求
req = urllib.request.Request(url,headers=headers)
#访问页面
response = urllib.request.urlopen(req)
#python3中urllib.read返回的是bytes对象,不是string,得把它转换成string对象,用bytes.decode方法
return response.read().decode()

网页解析器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import re
import urllib
from urllib.parse import urlparse

from bs4 import BeautifulSoup

class HtmlParser(object):

def _get_new_urls(self, page_url, soup):
new_urls = set()
#/view/123.htm
links = soup.find_all('a', href=re.compile(r'/item/.*?'))
for link in links:
new_url = link['href']
new_full_url = urllib.parse.urljoin(page_url, new_url)
new_urls.add(new_full_url)
return new_urls

#获取标题、摘要
def _get_new_data(self, page_url, soup):
#新建字典
res_data = {}
#url
res_data['url'] = page_url
#<dd class="lemmaWgt-lemmaTitle-title"><h1>Python</h1>获得标题标签
title_node = soup.find('dd', class_="lemmaWgt-lemmaTitle-title").find('h1')
print(str(title_node.get_text()))
res_data['title'] = str(title_node.get_text())
#<div class="lemma-summary" label-module="lemmaSummary">
summary_node = soup.find('div', class_="lemma-summary")
res_data['summary'] = summary_node.get_text()

return res_data

def parse(self, page_url, html_content):
if page_url is None or html_content is None:
return None

soup = BeautifulSoup(html_content, 'html.parser', from_encoding='utf-8')
new_urls = self._get_new_urls(page_url, soup)
new_data = self._get_new_data(page_url, soup)
return new_urls, new_data

网页输出器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class HtmlOutputer(object):

def __init__(self):
self.datas = []

def collect_data(self, data):
if data is None:
return
self.datas.append(data )

def output_html(self):
fout = open('maya.html', 'w', encoding='utf-8')
fout.write("<head><meta http-equiv='content-type' content='text/html;charset=utf-8'></head>")
fout.write('<html>')
fout.write('<body>')
fout.write('<table border="1">')
# <th width="5%">Url</th>
fout.write('''<tr style="color:red" width="90%">
<th>Theme</th>
<th width="80%">Content</th>
</tr>''')
for data in self.datas:
fout.write('<tr>\n')
# fout.write('\t<td>%s</td>' % data['url'])
fout.write('\t<td align="center"><a href=\'%s\'>%s</td>' % (data['url'], data['title']))
fout.write('\t<td>%s</td>\n' % data['summary'])
fout.write('</tr>\n')
fout.write('</table>')
fout.write('</body>')
fout.write('</html>')
fout.close()

多线程

实现可设置持续运行时间、线程数及时间间隔的多线程异步post请求功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#coding=utf8
'''
random.randint(a, b):用于生成一个指定范围内的整数。
其中参数a是下限,参数b是上限,生成的随机数n: a <= n <= b
random.choice(sequence):从序列中获取一个随机元素
参数sequence表示一个有序类型(列表,元组,字符串)
'''
import httplib,json
import time
import threading
from random import randint,choice
#创建请求函数
def postRequest(threadNum):
postJson={
}
#定义需要进行发送的数据
postData=json.dumps(postJson)
#定义一些文件头
headerdata = {
"content-type":"application/json",
}
#接口
requrl ="/v1/query"
#请求服务,例如:www.baidu.com
hostServer=""
#连接服务器
conn = httplib.HTTPConnection(hostServer)
#发送请求
conn.request(method="POST",url=requrl,body=postData,headers=headerdata)
#获取请求响应
response=conn.getresponse()
#打印请求状态
if response.status in range(200,300):
print u"线程"+str(threadNum)+u"状态码:"+str(response.status)
conn.close()
def run(threadNum,internTime,duration):
#创建数组存放线程
threads=[]
try:
#创建线程
for i in range(1,threadNum):
#针对函数创建线程
t=threading.Thread(target=postRequest,args=(i,))
#把创建的线程加入线程组
threads.append(t)
except Exception,e:
print e
try:
#启动线程
for thread in threads:
thread.setDaemon(True)
thread.start()
time.sleep(internTime)
#等待所有线程结束
for thread in threads:
thread.join(duration)
except Exception,e:
print e
if __name__ == '__main__':
startime=time.strftime("%Y%m%d%H%M%S")
now=time.strftime("%Y%m%d%H%M%S")
duratiion=raw_input(u"输入持续运行时间:")
while (startime+str(duratiion))!=now:
run(10,1,int(duratiion))
now=time.strftime("%Y%m%d%H%M%S")

爬虫数据处理Json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#encoding:'utf-8'
import urllib.request
from bs4 import BeautifulSoup
import os
import time
import codecs
import json
#找到网址
def getDatas():
# 伪装
header={'User-Agent':"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11"}
# url="https://movie.douban.com/top250"
url="file:///E:/scrapy/2018-04-27/movie/movie.html"
ret=urllib.request.Request(url=url,headers=header)
# 打开网页
res=urllib.request.urlopen(ret)
# 转化格式
response=BeautifulSoup(res,'html.parser')
# 找到想要数据的父元素
datas=response.find_all('div',{'class':'item'})
# print(datas)
#创建存放数据的文件夹
folder_name="output"
if not os.path.exists(folder_name):
os.mkdir(folder_name)
# 定义文件
current_time=time.strftime('%Y-%m-%d',time.localtime())
file_name="move"+current_time+".json"
# 文件路径
file_path=folder_name+"/"+file_name
for item in datas:
# print(item)
dict1={}
dict1['rank']=item.find('div',{'class':'pic'}).find('em').get_text()
dict1['title']=item.find('div',{'class':'info'}).find('div',{'class':'hd'}).find('a').find('span',{'class':'title'}).get_text()
dict1['picUrl']=item.find('div',{'class':'pic'}).find('a').find('img').get('src')
# print(picUrl)
# 保存数据为json格式
try:
with codecs.open(file_path,'a',encoding="utf-8") as fp:
fp.write(json.dumps(dict1,ensure_ascii=False)+",\n")
except IOError as err:
print('error'+str(err))
finally:
fp.close()
pass
getDatas()
# 爬取数据

Selenium自动化测试工具使用方法汇总

1、设置无头浏览器模式

1
2
3
4
5
6
7
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
chrome_options = Options()
chrome_options.add_argument('- -headless')
chrome_options.add_argument('- -disable-gpu')
class XX(object):
self.driver = webdriver.Chrome(chrome_options=chrome_options)# 设置无头

2、设置屏幕尺寸

1
2
3
self.driver.maximize_window() # 最大
self.driver.get_window_size() # 获取窗口尺寸
self.driver.set_window_size(1296, 1000) # 指定像素 宽1296 高1000

3、通过xpath获取元素并单击

1
2
element = self.driver.find_elements_by_xpath('//*[@id="disabled"]/li[3]/a') # element是获取的元素列表
element [0].click()

4、关闭单个窗口

1
2
# 关闭单个窗口
self.driver.close()

5、退出driver程序,关闭浏览器

1
driver.quit()

6、获取input标签 输入文字 执行回车

1
2
3
4
5
from selenium.webdriver.common.keys import Keys
\# 输入主机名:spupa01bat04 回车查询
element = self.driver.find_elements_by_xpath(input_enter_xpath)[0]
element.send_keys('spupa01bat04')
element.send_keys(Keys.ENTER)

7、执行双击

1
2
3
4
5
6
7
8
9
10
from selenium import webdriver
from selenium.webdriver import ActionChains # 该类可执行鼠标动作:双击 拖拽等
self.driver = webdriver.Chrome()
self.acobj = ActionChains(self.driver)
checkName = '风险'
\# '" + checkName + "'是向标签xpath字符串路经引入的变量
Xpath = "//span[text()='" + checkName + "']/../../..//span[text()='FSCapacity']"
el_01 = self.driver.find_elements_by_xpath(Xpath)[0] # 获取元素
self.acobj.double_click(el_01).perform() # 执行双击
self.driver.implicitly_wait(20)

8、执行等待

1
2
3
4
5
6
7
8
9
10
# 隐式等待:设置一个等待时间,如果在这个等待时间内,网页加载完成,则执行下一步;否则一直等待时间截止,然后再执行下一步。这样也就会有个弊端,程序会一直等待整个页面加载完成,直到超时,但有时候我需要的那个元素早就加载完成了,只是页面上有个别其他元素加载特别慢,我仍要等待页面全部加载完成才能执行下一步。
self.driver.implicitly_wait(10)
# 显示等待:配合该类的until()和until_not()方法,就能够根据判断条件而进行灵活地等待了。它主要的意思就是:程序每隔xx检查一次,如果条件成立了,则执行下一步,否则继续等待,直到超过设置的最长时间,然后抛出TimeoutException
from selenium.webdriver.support.wait import WebDriverWait
# 超时时间为30秒,每0.2秒检查1次,直到class="tt"的元素出现
text = WebDriverWait(driver,30,0.2).until(lambda x:x.find_element_by_css_selector(".tt")).text
print(text)
# 强制等待:简单粗暴效率低
from time import sleep
sleep(5)

9、浏览器全屏截图保存到指定路径

1
2
png01_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'resource', 'png_codes.png')
self.driver.save_screenshot(png01_path)

10、select下拉框的选值处理

1
2
3
4
5
6
7
from selenium.webdriver.support.select import Select
# 定位下拉框对象
select_element = self.driver.find_elements_by_xpath(select_xpath)[0]
# 构建下拉框对象
sel_obj = Select(select_element)
# 通过value值进行选择:一周
sel_obj.select_by_value('168h')

11、浏览器位置

1
2
3
4
5
# 获取浏览器位置
position = driver.get_window_position()
print(position)
# 设置浏览器位置
driver.set_window_position(100, 100)

正则表达式

正则表达式的语法规则

2015430160820157.png (799×1719)

函数API列表

1
2
3
4
5
6
7
match(string[, pos[, endpos]]) | re.match(pattern, string[, flags])
search(string[, pos[, endpos]]) | re.search(pattern, string[, flags])
split(string[, maxsplit]) | re.split(pattern, string[, maxsplit])
findall(string[, pos[, endpos]]) | re.findall(pattern, string[, flags])
finditer(string[, pos[, endpos]]) | re.finditer(pattern, string[, flags])
sub(repl, string[, count]) | re.sub(pattern, repl, string[, count])
subn(repl, string[, count]) |re.sub(pattern, repl, string[, count])

正则表达式相关注解

(1)数量词的贪婪模式与非贪婪模式

正则表达式通常用于在文本中查找匹配的字符串。Python里数量词默认是贪婪的(在少数语言里也可能是默认非贪婪),总是尝试匹配尽可能多的字 符;非贪婪的则相反,总是尝试匹配尽可能少的字符。例如:正则表达式”ab*”如果用于查找”abbbc”,将找到”abbb”。而如果使用非贪婪的数量 词”ab*?”,将找到”a”

注:我们一般使用非贪婪模式来提取。
(2)反斜杠问题

与大多数编程语言相 同,正则表达式里使用”\”作为转义字符,这就可能造成反斜杠困扰。假如需要匹配文本中的字符”\”,那么使用编程语言表示的正则表达式里将需要4个反 斜杠”\\\\”:前两个和后两个分别用于在编程语言里转义成反斜杠,转换成两个反斜杠后再在正则表达式里转义成一个反斜杠。

Python里的原生字符串很好地解决了这个问题,这个例子中的正则表达式可以使用r”\\”表示。同样,匹配一个数字的”\\d”可以写成r”\d”
4.Python Re模块

Python 自带了re模块,它提供了对正则表达式的支持。主要用到的方法列举如下

1
2
3
4
5
6
7
8
9
10
11
#返回pattern对象
re.compile(string[,flag])
#以下为匹配所用函数
re.match(pattern, string[, flags])
re.search(pattern, string[, flags])
re.split(pattern, string[, maxsplit])
re.findall(pattern, string[, flags])
re.finditer(pattern, string[, flags])
re.sub(pattern, repl, string[, count])
re.subn(pattern, repl, string[, count])

pattern的概念
pattern可以理解为一个匹配模式,获得这个匹配模式,需要利用re.compile方法就可以。例如

1
pattern = re.compile(r'hello')

在参数中我们传入了原生字符串对象,通过compile方法编译生成一个pattern对象,然后我们利用这个对象来进行进一步的匹配。

另外大家可能注意到了另一个参数 flags,在这里解释一下这个参数的含义:

参数flag是匹配模式,取值可以使用按位或运算符’|’表示同时生效,比如re.I | re.M。

可选值有:

  • ? re.I(全拼:IGNORECASE): 忽略大小写(括号内是完整写法,下同)
  • ? re.M(全拼:MULTILINE): 多行模式,改变'^''$'的行为(参见上图)
  • ? re.S(全拼:DOTALL): 点任意匹配模式,改变’.’的行为
  • ? re.L(全拼:LOCALE): 使预定字符类 \w \W \b \B \s \S 取决于当前区域设定
  • ? re.U(全拼:UNICODE): 使预定字符类 \w \W \b \B \s \S \d \D 取决于unicode定义的字符属性
  • ? re.X(全拼:VERBOSE): 详细模式。这个模式下正则表达式可以是多行,忽略空白字符,并可以加入注释。

在刚才所说的另外几个方法例如 re.match 里我们就需要用到这个pattern了,下面我们一一介绍。

注:以下七个方法中的flags同样是代表匹配模式的意思,如果在pattern生成时已经指明了flags,那么在下面的方法中就不需要传入这个参数了。

(1)re.match(pattern, string[, flags])

这个方法将会从string(我们要匹配的字符串)的开头开始,尝试匹配pattern,一直向后匹配,如果遇到无法匹配的字符,立即返回 None,如果匹配未结束已经到达string的末尾,也会返回None。两个结果均表示匹配失败,否则匹配pattern成功,同时匹配终止,不再对 string向后匹配。下面我们通过一个例子理解一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
__author__ = 'L0ki'
# -*- coding: utf-8 -*-

#导入re模块
import re

# 将正则表达式编译成Pattern对象,注意hello前面的r的意思是“原生字符串”
pattern = re.compile(r'hello')

# 使用re.match匹配文本,获得匹配结果,无法匹配时将返回None
result1 = re.match(pattern,'hello')
result2 = re.match(pattern,'helloo L0ki!')
result3 = re.match(pattern,'helo L0ki!')
result4 = re.match(pattern,'hello L0ki!')

#如果1匹配成功
if result1:
# 使用Match获得分组信息
print result1.group()
else:
print '1匹配失败!'

#如果2匹配成功
if result2:
# 使用Match获得分组信息
print result2.group()
else:
print '2匹配失败!'

#如果3匹配成功
if result3:
# 使用Match获得分组信息
print result3.group()
else:
print '3匹配失败!'

#如果4匹配成功
if result4:
# 使用Match获得分组信息
print result4.group()
else:
print '4匹配失败!'

运行结果

1
2
3
4
5
hello``hello``3``匹配失败!``hellohello
hello
3匹配失败!
hello

匹配分析

1.第一个匹配,pattern正则表达式为’hello’,我们匹配的目标字符串string也为hello,从头至尾完全匹配,匹配成功。

2.第二个匹配,string为helloo L0ki,从string头开始匹配pattern完全可以匹配,pattern匹配结束,同时匹配终止,后面的o L0ki不再匹配,返回匹配成功的信息。

3.第三个匹配,string为helo L0ki,从string头开始匹配pattern,发现到 ‘o’ 时无法完成匹配,匹配终止,返回None

4.第四个匹配,同第二个匹配原理,即使遇到了空格符也不会受影响。

我们还看到最后打印出了result.group(),这个是什么意思呢?下面我们说一下关于match对象的的属性和方法
Match对象是一次匹配的结果,包含了很多关于此次匹配的信息,可以使用Match提供的可读属性或方法来获取这些信息。

属性:
1.string: 匹配时使用的文本。
2.re: 匹配时使用的Pattern对象。
3.pos: 文本中正则表达式开始搜索的索引。值与Pattern.match()Pattern.seach()方法的同名参数相同。
4.endpos: 文本中正则表达式结束搜索的索引。值与Pattern.match()Pattern.seach()方法的同名参数相同。
5.lastindex: 最后一个被捕获的分组在文本中的索引。如果没有被捕获的分组,将为None。
6.lastgroup: 最后一个被捕获的分组的别名。如果这个分组没有别名或者没有被捕获的分组,将为None。

方法:
1.group([group1, …]):
获得一个或多个分组截获的字符串;指定多个参数时将以元组形式返回。group1可以使用编号也可以使用别名;编号0代表整个匹配的子串;不填写参数时,返回group(0);没有截获字符串的组返回None;截获了多次的组返回最后一次截获的子串。
2.groups([default]):
以元组形式返回全部分组截获的字符串。相当于调用group(1,2,…last)。default表示没有截获字符串的组以这个值替代,默认为None。
3.groupdict([default]):
返回以有别名的组的别名为键、以该组截获的子串为值的字典,没有别名的组不包含在内。default含义同上。
4.start([group]):
返回指定的组截获的子串在string中的起始索引(子串第一个字符的索引)。group默认值为0。
5.end([group]):
返回指定的组截获的子串在string中的结束索引(子串最后一个字符的索引+1)。group默认值为0。
6.span([group]):
返回(start(group), end(group))。
7.expand(template):
将匹配到的分组代入template中然后返回。template中可以使用\id\g\g引用分组,但不能使用编号0。\id\g是等价的;但\10将被认为是第10个分组,如果你想表达\1之后是字符'0',只能使用\g0

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# -*- coding: utf-8 -*-
#一个简单的match实例

import re
# 匹配如下内容:单词+空格+单词+任意字符
m = re.match(r'(\w+) (\w+)(?P.*)', 'hello world!')

print "m.string:", m.string
print "m.re:", m.re
print "m.pos:", m.pos
print "m.endpos:", m.endpos
print "m.lastindex:", m.lastindex
print "m.lastgroup:", m.lastgroup
print "m.group():", m.group()
print "m.group(1,2):", m.group(1, 2)
print "m.groups():", m.groups()
print "m.groupdict():", m.groupdict()
print "m.start(2):", m.start(2)
print "m.end(2):", m.end(2)
print "m.span(2):", m.span(2)
print r"m.expand(r'\g \g\g'):", m.expand(r'\2 \1\3')

### output ###
# m.string: hello world!
# m.re:
# m.pos: 0
# m.endpos: 12
# m.lastindex: 3
# m.lastgroup: sign
# m.group(1,2): ('hello', 'world')
# m.groups(): ('hello', 'world', '!')
# m.groupdict(): {'sign': '!'}
# m.start(2): 6
# m.end(2): 11
# m.span(2): (6, 11)
# m.expand(r'\2 \1\3'): world hello!

(2)re.search(pattern, string[, flags])

search方法与match方法极其类似,区别在于match()函数只检测re是不是在string的开始位置匹配,search()会扫描整个string查找匹配,match()只有在0位置匹配成功的话才有返回,如果不是开始位置匹配成功的话,match()就返回None。同样,search方法的返回对象同样match()返回对象的方法和属性。我们用一个例子感受一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#导入re模块
import re

# 将正则表达式编译成Pattern对象
pattern = re.compile(r'world')
# 使用search()查找匹配的子串,不存在能匹配的子串时将返回None
# 这个例子中使用match()无法成功匹配
match = re.search(pattern,'hello world!')
if match:
# 使用Match获得分组信息
print match.group()
### 输出 ###
# world

(3)re.split(pattern, string[, maxsplit])

按照能够匹配的子串将string分割后返回列表。maxsplit用于指定最大分割次数,不指定将全部分割。我们通过下面的例子感受一下。

1
2
3
4
5
6
7
8
import re

pattern = re.compile(r'\d+')
print re.split(pattern,'one1two2three3four4')

### 输出 ###
# ['one', 'two', 'three', 'four', '']

**(4)re.findall(pattern, string[, flags])
**

1
2
3
4
5
6
7
8
9
10
11
搜索string,以列表形式返回全部能匹配的子串。我们通过这个例子来感受一下

import re

pattern = re.compile(r'\d+')
print re.findall(pattern,'one1two2three3four4')

### 输出 ###
# ['1', '2', '3', '4']


(5)re.finditer(pattern, string[, flags])

搜索string,返回一个顺序访问每一个匹配结果(Match对象)的迭代器。我们通过下面的例子来感受一下

1
2
3
4
5
6
7
8
9
import re

pattern = re.compile(r'\d+')
for m in re.finditer(pattern,'one1two2three3four4'):
print m.group(),

### 输出 ###
# 1 2 3 4

(6)re.sub(pattern, repl, string[, count])

使用repl替换string中每一个匹配的子串后返回替换后的字符串。
当repl是一个字符串时,可以使用\id或\g、\g引用分组,但不能使用编号0。
当repl是一个方法时,这个方法应当只接受一个参数(Match对象),并返回一个字符串用于替换(返回的字符串中不能再引用分组)。
count用于指定最多替换次数,不指定时全部替换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import re

pattern = re.compile(r'(\w+) (\w+)')
s = 'i say, hello world!'

print re.sub(pattern,r'\2 \1', s)

def func(m):
return m.group(1).title() + ' ' + m.group(2).title()

print re.sub(pattern,func, s)

### output ###
# say i, world hello!
# I Say, Hello World!

(7)re.subn(pattern, repl, string[, count])

返回 (sub(repl, string[, count]), 替换次数)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import re

pattern = re.compile(r'(\w+) (\w+)')
s = 'i say, hello world!'

print re.subn(pattern,r'\2 \1', s)

def func(m):
return m.group(1).title() + ' ' + m.group(2).title()

print re.subn(pattern,func, s)

### output ###
# ('say i, world hello!', 2)
# ('I Say, Hello World!', 2)

socket

简单示例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import socket
import threading
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind(('0.0.0.0', 8000))
server.listen()

def handle_sock(sock, addr):
while True:
data = sock.recv(1024)
print(data.decode("utf8"))
re_data = input()
sock.send(re_data.encode("utf8"))

while True:
sock, addr = server.accept()
'''用线程去处理新接收的连接(用户)'''
client_thread = threading.Thread(target=handle_sock, args=(sock, addr))#传的一定是函数名称
client_thread.start()

客户端

1
2
3
4
5
6
7
8
9
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8000))
while True:
re_data = input()
client.send(re_data.encode("utf8"))
data = client.recv(1024)
print(data.decode("utf8"))

实用示例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/usr/bin/env python3
# encoding:utf-8
#
# SERVER
#

import socket
from socket import *
import os

HOST = '192.168.1.88'
PORT = 55555

#只接收这么多个bytes
BUFFERSIZE = 1024

# socket实例,UDP连接
s = socket(AF_INET, SOCK_DGRAM)
# 重用ip和端口
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# s绑定主机地址和端口
s.bind((HOST, PORT))
# 监听队列
#s.listen()
# 服务器对话日志
logfile = 'dialog.log'

def server_print(*msg):
# TODO: 这里格式还是不好,应该支持带格式的字符串
print("SERVER: ", msg)

def clean_up():
if os.path.exists(logfile):
os.remove(logfile)

clean_up()

# 简单地监听客户端的消息,收到消息后回复确认
while True:
print("=== THE SERVER START WORKING ===")
# 此处的s.recvfrom方法和s.recv都会返回接收到的数据,而recvfrom还会返回客户端的地址
data, addr = s.recvfrom(BUFFERSIZE)
data = data.decode(encoding='utf-8')
server_print("recived message from client(%s): %s" % (addr, data))
with open(logfile, 'a') as f:
log = "client{0} \t message: {1}\n".format(addr, data)
f.write(log)

# 回复消息
s.sendto("SERVER ROGER THAT ".encode(encoding='utf-8'), addr)

if not s._closed():
s.close()

# TODO:
# 不知道怎么退出服务端好 ...
# 如果通过客户端输入`exit`来退出,则会导致客户端英文和服务端断开连接而异常退出
# 然后如果在服务端加入input和用户交互,则会导致客户端和服务端必须依次轮流发送信息 ...

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/env python3
# encoding:utf-8
# CLIENT

import sys
import socket
from socket import *

HOST = '192.168.1.88'
PORT = 55555
addr = (HOST, PORT)

#一次接收这么多bytes
BUFFERSIZE = 1024

# SOCK_DGRAM :UDP连接
s = socket(AF_INET, SOCK_DGRAM)
# socket类s去连接服务器
s.connect((HOST, PORT))

def client_print(*msg):
print("CLIENT: ", msg)

while True:
kel = input('Question >>')
if kel == 'exit':
break

client_print('input data: %s' % kel)
# python3 socket 只能收发二进制数据,需要转码
kel = kel.encode(encoding='utf-8')
#s.sendall(kel)
# 发送数据
try:
s.sendto(kel, addr)
# 接收数据
data, addr = s.recvfrom(BUFFERSIZE)
client_print("recived message from server({0}:{1}) : {2}".format(addr[0], addr[1], data.decode('utf-8')))
except ConnectionRefusedError:
client_print("exit client")
s.close()

if not s._closed:
s.close()

sys.exit()

批处理

Json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import requests
import json
headers={
'Host': 'url',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.28 Safari/537.36 OPR/61.0.3298.6 (Edition developer)',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate',
'Content-Type': 'application/json;charset=utf-8'
}
usr_list=[]
psw_list=[]
url='api/url'
def get_data(username,passwad):
data={
"username": username,
"password": passwad
}
return json.dumps(data)
with open('usr.txt',encoding='utf-8') as f:
for usr in f.readlines():
if usr!=None:
usr_list.append(usr.strip('\n'))
print(usr_list)
with open('psw.txt',encoding='utf-8') as f:
for psw in f.readlines():
if psw!=None:
psw_list.append(psw.strip('\n'))
print(psw_list)
for usname,pswa in zip(usr_list,psw_list):
print(usname)
print(pswa)
res=requests.post(url,headers=headers,data=get_data(username=usname,passwad=pswa))

待补充

正则匹配简单的爬取模板

1
2
3
4
5
6
7
8
9
10
11
12
import re
import requests

url = "https://xxxx"
header = {
'User-Agent': "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11"}
res = requests.get(url=url, headers=header)
pattern = re.compile(r'<a href="(.*?)/" target="_blank">') #匹配样例规则
domain = pattern.findall(str(res.text))
for i in domain:
print(i)

Prev
2024-04-21 02:30:15
Next