目录

第十一章 常用标准库

Python拥有丰富的标准库,提供了大量实用的功能。掌握这些标准库能够显著提高开发效率,避免重复造轮子。本章将介绍最常用和最重要的标准库模块。

11.1 数据结构增强模块

11.1.1 collections模块

collections模块提供了除内置容器外的专用容器数据类型。

from collections import Counter, defaultdict, OrderedDict, deque, namedtuple
 
# Counter: tally how many times each word occurs in a sentence.
text = "hello world hello python"
counter = Counter()
counter.update(text.split())
print(counter)                      # Counter({'hello': 2, 'world': 1, 'python': 1})
print(counter.most_common(2))       # [('hello', 2), ('world', 1)]

# Counters support arithmetic; subtraction drops counts that fall to zero or below.
c1 = Counter({'a': 3, 'b': 1})
c2 = Counter({'a': 1, 'b': 2})
print(c1 + c2)                      # Counter({'a': 4, 'b': 3})
print(c1 - c2)                      # Counter({'a': 2})
 
# defaultdict: a missing key is created on first access by calling the factory.
default_dict = defaultdict(list)
for category, member in (('fruits', 'apple'), ('fruits', 'banana'), ('colors', 'red')):
    default_dict[category].append(member)
print(dict(default_dict))           # {'fruits': ['apple', 'banana'], 'colors': ['red']}

# A lambda factory returning 0 makes every unseen key start at zero.
int_dict = defaultdict(lambda: 0)
int_dict['count'] = int_dict['count'] + 1
print(int_dict['count'])            # 1
print(int_dict['missing'])          # 0 (the key is created by the lookup)
 
# OrderedDict keeps insertion order (a plain dict does as well since Python 3.7).
ordered = OrderedDict([('first', 1), ('second', 2), ('third', 3)])

# Send an existing key to the back of the ordering.
ordered.move_to_end('first')
print(list(ordered))                # ['second', 'third', 'first']
 
# deque: double-ended queue; with maxlen set, the oldest items fall off
# the opposite end once the queue is full.
dq = deque(maxlen=5)
dq.extend(range(10))
print(dq)                           # deque([5, 6, 7, 8, 9], maxlen=5)

# Pushing on the left of a full deque evicts the rightmost element.
dq.appendleft(100)
print(dq)                           # deque([100, 5, 6, 7, 8], maxlen=5)
 
# namedtuple: a tuple subclass whose fields are reachable by name.
Point = namedtuple('Point', 'x y')
p = Point(x=10, y=20)
print(p.x, p.y)                     # 10 20
print(p[0], p[1])                   # 10 20 (index access still works)

# View the fields as a mapping.
print(p._asdict())                  # {'x': 10, 'y': 20}

# _replace returns a new tuple; the original is left untouched.
p2 = p._replace(x=100)
print(p2)                           # Point(x=100, y=20)

11.1.2 heapq模块 - 堆队列

import heapq
 
# A heap is just a list whose order is maintained by the heapq functions.
heap = []
for value in (3, 1, 2):
    heapq.heappush(heap, value)
print(heap)                         # [1, 3, 2] - heap invariant holds

# Pop always removes the smallest element.
smallest = heapq.heappop(heap)
print(smallest)                     # 1
print(heap)                         # [2, 3]

# Rearrange an existing list into heap order, in place.
nums = [5, 3, 8, 1, 9, 2]
heapq.heapify(nums)
print(nums)                         # [1, 3, 2, 5, 9, 8]

# The n largest / n smallest elements of any iterable.
numbers = [12, 3, 5, 7, 19, 1, 4]
top_three = heapq.nlargest(3, numbers)
bottom_three = heapq.nsmallest(3, numbers)
print(top_three)                    # [19, 12, 7]
print(bottom_three)                 # [1, 3, 4]

# Lazily merge inputs that are each already sorted.
list1 = [1, 3, 5]
list2 = [2, 4, 6]
merged = heapq.merge(list1, list2)
print(list(merged))                 # [1, 2, 3, 4, 5, 6]

11.1.3 bisect模块 - 二分查找

import bisect
 
# bisect works on a list that is already sorted and keeps it that way.
sorted_list = [1, 3, 5, 7, 9]

# Where would a value go?  The "left" variant lands before equal items,
# bisect() (the same function as bisect_right) lands after them.
pos = bisect.bisect_left(sorted_list, 4)   # 2
pos = bisect.bisect(sorted_list, 5)        # 3

# Insert while preserving order.
bisect.insort_left(sorted_list, 4)
print(sorted_list)                  # [1, 3, 4, 5, 7, 9]

# insort() is the same function as insort_right.
bisect.insort(sorted_list, 5)
print(sorted_list)                  # [1, 3, 4, 5, 5, 7, 9]

11.2 文件和目录操作

11.2.1 pathlib模块(Python 3.4+)

pathlib提供了面向对象的路径操作方式,推荐使用。

from pathlib import Path, PurePath
 
# Three common starting points: a relative path, the home directory, the working directory.
current = Path('.')
home = Path.home()
cwd = Path.cwd()

# Segments can be passed to the constructor instead of being chained with "/".
config_path = Path('/etc', 'nginx', 'nginx.conf')
print(config_path)                  # /etc/nginx/nginx.conf

# Pure-path attributes, printed in order:
# name -> python, suffix -> '', suffixes -> [], stem -> python,
# parent -> /usr/local/bin, parents -> sequence of ancestors,
# parts -> ('/', 'usr', 'local', 'bin', 'python'), anchor -> /
path = Path('/usr/local/bin/python')
for attribute in ('name', 'suffix', 'suffixes', 'stem',
                  'parent', 'parents', 'parts', 'anchor'):
    print(getattr(path, attribute))

# Queries, printed in order: exists, is a file, is a directory, is absolute.
for query in (path.exists, path.is_file, path.is_dir, path.is_absolute):
    print(query())
 
# Write a small file, read it back, then remove it.
temp_file = Path('temp.txt')
temp_file.write_text('Hello World', encoding='utf-8')
content = temp_file.read_text(encoding='utf-8')
temp_file.unlink()

# Create a directory (both calls tolerate it already existing; the second
# would also create missing parents), then remove it while it is empty.
new_dir = Path('new_folder')
new_dir.mkdir(exist_ok=True)
new_dir.mkdir(parents=True, exist_ok=True)
new_dir.rmdir()

here = Path('.')

# Direct children of the current directory.
for item in here.iterdir():
    print(item)

# Recursive search for Python sources.
py_files = [*here.rglob('*.py')]
print(f'找到 {len(py_files)} 个Python文件')

# Non-recursive wildcard match.
for txt_file in here.glob('*.txt'):
    print(txt_file)
 
# Count lines of code
def count_lines_of_code(path):
    """Count code lines in every ``*.py`` file below *path*.

    A line counts when it is not blank and its first non-whitespace
    character is not ``#``.

    Args:
        path: Directory (str or Path) that is searched recursively.

    Returns:
        Total number of counted lines as an int. Files that cannot be
        read or are not valid UTF-8 contribute nothing.
    """
    total = 0
    for py_file in Path(path).rglob('*.py'):
        try:
            source = py_file.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError):
            # Best effort: skip unreadable or non-UTF-8 files, but let
            # everything else (e.g. KeyboardInterrupt) propagate.
            continue
        for line in source.splitlines():
            stripped = line.strip()
            if stripped and not stripped.startswith('#'):
                total += 1
    return total

11.2.2 shutil模块 - 高级文件操作

import shutil
from pathlib import Path
 
# Copy files
# NOTE(review): every path below is relative to the current working directory
# and is assumed to exist already; confirm before running this snippet.
shutil.copy('source.txt', 'dest.txt')           # copy a file
shutil.copy2('source.txt', 'dest.txt')          # copy a file, keeping metadata
shutil.copytree('src_dir', 'dst_dir')           # copy a directory

# Move and rename
shutil.move('old_name.txt', 'new_name.txt')

# Delete a directory tree
shutil.rmtree('directory_to_delete')

# Archive operations: build an archive from 'source_dir', then unpack 'backup.zip'
shutil.make_archive('backup', 'zip', 'source_dir')
shutil.unpack_archive('backup.zip', 'extract_dir')

# Disk usage of '/'; each value is floor-divided by 2**30 before printing as GB
total, used, free = shutil.disk_usage('/')
print(f'总空间: {total // 2**30} GB')
print(f'已使用: {used // 2**30} GB')
print(f'可用: {free // 2**30} GB')

# List the supported archive (pack) and unpack formats
print(shutil.get_archive_formats())
print(shutil.get_unpack_formats())

11.3 数据序列化

11.3.1 pickle模块 - Python对象序列化

import pickle
 
# Sample object graph: nested dict / list / str / int.
data = dict(
    name='张三',
    age=30,
    scores=[85, 90, 78],
    metadata=dict(created='2024-01-01'),
)

# Dump to a binary file ...
with open('data.pkl', mode='wb') as f:
    pickle.dump(data, f)

# ... and load it back from the same file.
with open('data.pkl', mode='rb') as f:
    loaded_data = pickle.load(f)

print(loaded_data)

# In-memory variant: object -> bytes.
byte_data = pickle.dumps(data)
size_in_bytes = len(byte_data)
print(f'序列化后大小: {size_in_bytes} 字节')

# bytes -> object.
original = pickle.loads(byte_data)
 
# Several objects can be written back to back into one pickle file and
# read again one load() call at a time.
# Sample objects so the snippet runs on its own.
obj1 = {'id': 1, 'tags': ['a', 'b']}
obj2 = [1, 2, 3]
obj3 = 'third object'

with open('multi.pkl', 'wb') as f:
    for item in (obj1, obj2, obj3):
        pickle.dump(item, f)

# SECURITY: pickle.load can run arbitrary code; only load files you trust.
with open('multi.pkl', 'rb') as f:
    while True:
        try:
            obj = pickle.load(f)
        except EOFError:
            # load() raises EOFError once the stream is exhausted.
            break
        print(obj)

警告:pickle 并不安全——反序列化过程可能执行任意代码,因此绝不要对不可信来源的数据调用 pickle.load / pickle.loads。

11.3.2 csv模块 - CSV文件处理

import csv
 
# Write a CSV file
with open('output.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    # Header row
    writer.writerow(['姓名', '年龄', '城市'])
    # One row at a time
    writer.writerow(['张三', 25, '北京'])
    writer.writerow(['李四', 30, '上海'])
    # Several rows at once
    writer.writerows([
        ['王五', 28, '广州'],
        ['赵六', 35, '深圳']
    ])

# Read the CSV file back.
# Files handed to the csv module should be opened with newline='' for
# reading as well as writing, so newlines inside quoted fields are
# interpreted correctly.
with open('output.csv', 'r', newline='', encoding='utf-8') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# DictReader: each row becomes a mapping keyed by the header row
with open('output.csv', 'r', newline='', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"{row['姓名']} 住在 {row['城市']}")

# DictWriter: rows are written from mappings, columns ordered by fieldnames
fieldnames = ['name', 'age', 'email']
with open('users.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'name': 'Alice', 'age': 25, 'email': 'alice@example.com'})
 
# Custom delimiter and quoting: write a tab-separated file.
# The delimiter must be exactly one character, i.e. the tab escape '\t'.
with open('data.tsv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f, delimiter='\t', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    # QUOTE_MINIMAL quotes only fields that contain a special character,
    # so the comma field stays bare and the quoted field gets escaped.
    writer.writerow(['Field 1', 'Field with, comma', 'Field with "quotes"'])

11.3.3 configparser模块 - 配置文件

import configparser
 
# Example configuration content. It is used as a fallback when no
# config.ini is present, so the lookups below always have data.
SAMPLE_CONFIG = """
[DEFAULT]
ServerAliveInterval = 45
Compression = yes
CompressionLevel = 9

[bitbucket.org]
User = hg

[topsecret.server.com]
Port = 50022
ForwardX11 = no
"""

# Read the configuration file.
config = configparser.ConfigParser()
# read() skips missing files silently and returns the list of files it
# parsed; an empty list means nothing was loaded.
if not config.read('config.ini', encoding='utf-8'):
    config.read_string(SAMPLE_CONFIG)

# Access values: raw string, get(), and typed getters
print(config['DEFAULT']['ServerAliveInterval'])
print(config.get('bitbucket.org', 'User'))
print(config.getint('topsecret.server.com', 'Port'))
print(config.getboolean('DEFAULT', 'Compression'))

# Check whether a section exists
if 'section_name' in config:
    print("Section exists")

# List all sections (DEFAULT is not included)
print(config.sections())

# Create / modify sections; values are stored as strings
config['Settings'] = {
    'debug': 'true',
    'log_level': 'INFO',
    'max_connections': '100'
}

config['Database'] = {}
config['Database']['host'] = 'localhost'
config['Database']['port'] = '5432'

# Write the resulting configuration to a new file
with open('new_config.ini', 'w', encoding='utf-8') as f:
    config.write(f)

11.4 正则表达式

11.4.1 re模块基础

import re
 
text = "The quick brown fox jumps over the lazy dog. Contact: john@example.com"

# Search: first occurrence of the pattern anywhere in the string
match = re.search(r'fox', text)
if match:
    print(f"找到: {match.group()} 在位置 {match.start()}-{match.end()}")

# Find all matches. Inside a raw string a regex escape takes a single
# backslash; a doubled one would match a literal backslash instead.
emails = re.findall(r'\b[\w.-]+@[\w.-]+\.\w+\b', text)
print(emails)                       # ['john@example.com']

# Substitute
new_text = re.sub(r'dog', 'cat', text)

# Split on runs of whitespace
parts = re.split(r'\s+', text)

# Compile once when a pattern is reused
pattern = re.compile(r'\b\w{5}\b')  # words of exactly five word characters
matches = pattern.findall(text)
print(matches)                      # ['quick', 'brown', 'jumps']

11.4.2 正则表达式模式

import re
 
# Common validation patterns. Each is a raw string, so regex escapes
# take a single backslash.
patterns = {
    'email': r'^[\w.-]+@[\w.-]+\.\w+$',
    'phone': r'^1[3-9]\d{9}$',          # mainland China mobile number
    'id_card': r'^\d{17}[\dXx]$',       # 18-character ID card number
    'url': r'^https?://[^\s/$.?#].[^\s]*$',
    'ipv4': r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$',
    'date': r'^\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\d|3[01])$'
}


# Validation helper
def validate(pattern_name, value):
    """Return True when *value* matches the named pattern in full.

    Args:
        pattern_name: Key into ``patterns`` (e.g. ``'email'``).
        value: String to check.

    Returns:
        True on a full match; False when the value does not match or
        the pattern name is unknown.
    """
    pattern = patterns.get(pattern_name)
    if not pattern:
        return False
    # fullmatch instead of match: '$' alone also accepts a trailing
    # newline, which would let 'value\n' pass validation.
    return re.fullmatch(pattern, value) is not None


# Quick checks
print(validate('email', 'test@example.com'))        # True
print(validate('phone', '13800138000'))             # True
print(validate('phone', '12345678901'))             # False
 
# Extract values with capture groups
text = "Name: John, Age: 30, City: New York"
# The city group allows spaces so that "New York" is captured whole.
pattern = r'Name: (\w+), Age: (\d+), City: ([\w ]+)'
match = re.search(pattern, text)
if match:
    print(f"姓名: {match.group(1)}")
    print(f"年龄: {match.group(2)}")
    print(f"城市: {match.group(3)}")
    print(match.groups())       # ('John', '30', 'New York')

# Named groups
pattern = r'Name: (?P<name>\w+), Age: (?P<age>\d+)'
match = re.search(pattern, text)
if match:
    print(match.group('name'))
    print(match.groupdict())    # {'name': 'John', 'age': '30'}

11.5 时间和日期

11.5.1 time模块

import time
 
# Seconds since the epoch, as a float.
timestamp = time.time()
print(f"当前时间戳: {timestamp}")

# Convert the timestamp to a local struct_time, then render it as text.
local_time = time.localtime(timestamp)
print(f"本地时间: {local_time}")
local_text = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
print(f"格式化: {local_text}")

# Same rendering for the current UTC time.
utc_time = time.gmtime()
utc_text = time.strftime('%Y-%m-%d %H:%M:%S', utc_time)
print(f"UTC时间: {utc_text}")

# Parse text into a struct_time.
parsed = time.strptime('2024-01-15 14:30:00', '%Y-%m-%d %H:%M:%S')

# Block the current thread.
print("开始")
time.sleep(2)                       # pause for two seconds
print("2秒后")

# Measure elapsed time with the performance counter.
start = time.perf_counter()
# ... code under measurement goes here
end = time.perf_counter()
elapsed = end - start
print(f"耗时: {elapsed:.6f} 秒")

11.5.2 datetime模块详解

from datetime import datetime, date, time, timedelta
from datetime import timezone
 
# "Now" as a naive datetime and today's date.
now = datetime.now()
today = date.today()

# Explicit values, spelled with keyword arguments.
dt = datetime(year=2024, month=1, day=15, hour=14, minute=30, second=0)
t = time(hour=14, minute=30, second=0)
d = date(year=2024, month=1, day=15)

# Render as text: custom format and ISO 8601.
formatted_now = now.strftime('%Y年%m月%d日 %H:%M:%S')
print(formatted_now)
print(now.isoformat())

# Parse text into a datetime.
parsed = datetime.strptime('2024-01-15 14:30:00', '%Y-%m-%d %H:%M:%S')

# timedelta arithmetic: shift a datetime, subtract two datetimes.
delta = timedelta(minutes=30, hours=3, days=5)
future = now + delta
past = now - delta
diff = future - now
print(f"相差天数: {diff.days}")
print(f"总秒数: {diff.total_seconds()}")
 
# Time zones via the IANA database (Python 3.9+)
from zoneinfo import ZoneInfo

utc = ZoneInfo('UTC')
beijing = ZoneInfo('Asia/Shanghai')
new_york = ZoneInfo('America/New_York')

# One aware instant, viewed in three zones.
dt_utc = datetime.now(utc)
dt_bj = dt_utc.astimezone(beijing)
dt_ny = dt_utc.astimezone(new_york)

for label, moment in (("UTC", dt_utc), ("北京", dt_bj), ("纽约", dt_ny)):
    print(f"{label}: {moment}")

# First ten zone names in alphabetical order.
import zoneinfo
zone_names = sorted(zoneinfo.available_timezones())
print(zone_names[:10])

11.6 网络和Web

11.6.1 urllib模块

from urllib import request, parse, error
 
# Without a timeout, urlopen can block indefinitely on an unresponsive
# host, so every request below passes one explicitly.
TIMEOUT = 10  # seconds

# Simple GET request
url = 'https://api.github.com'
with request.urlopen(url, timeout=TIMEOUT) as response:
    data = response.read()
    print(f"状态码: {response.status}")
    print(f"内容类型: {response.headers['Content-Type']}")
    print(data.decode('utf-8')[:500])

# GET with query parameters: urlencode builds the query string
# (the URL is only constructed here, not requested)
params = parse.urlencode({'q': 'python', 'page': 1})
url = f'https://httpbin.org/get?{params}'

# POST request: the body is the url-encoded form, as bytes
data = parse.urlencode({'name': 'test', 'value': '123'}).encode()
req = request.Request('https://httpbin.org/post', data=data, method='POST')
req.add_header('User-Agent', 'Python urllib')
req.add_header('Content-Type', 'application/x-www-form-urlencoded')

with request.urlopen(req, timeout=TIMEOUT) as response:
    print(response.read().decode())

# Error handling: HTTPError is caught before the broader URLError
try:
    with request.urlopen('https://httpbin.org/status/404', timeout=TIMEOUT) as response:
        pass
except error.HTTPError as e:
    print(f"HTTP错误: {e.code} {e.reason}")
except error.URLError as e:
    print(f"URL错误: {e.reason}")

11.6.2 http.server模块 - 简单HTTP服务器

# 命令行运行: python -m http.server 8000
 
# 或编程方式
from http.server import HTTPServer, BaseHTTPRequestHandler
 
class SimpleHandler(BaseHTTPRequestHandler):
    """Minimal request handler: an HTML page for GET, a JSON echo for POST."""

    def do_GET(self):
        """Reply 200 with a small HTML page that shows the requested path."""
        import html  # local import keeps the snippet self-contained

        # self.path comes straight from the client; escape it before it
        # is embedded in markup.
        safe_path = html.escape(self.path)
        response = f"""
        <html>
        <body>
            <h1>Hello from Python!</h1>
            <p>路径: {safe_path}</p>
        </body>
        </html>
        """
        body = response.encode()

        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        """Reply 200 with JSON ``{"received": <request body as text>}``."""
        import json

        # A missing or malformed Content-Length is treated as an empty
        # body instead of crashing the handler.
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except ValueError:
            content_length = 0
        # Only read when a positive length was announced; a negative
        # length would make read() wait for end of stream.
        post_data = self.rfile.read(content_length) if content_length > 0 else b''

        # errors='replace' keeps a non-UTF-8 body from raising.
        payload = {'received': post_data.decode('utf-8', errors='replace')}
        body = json.dumps(payload).encode()

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
 
if __name__ == '__main__':
    # The context manager closes the listening socket when the block exits.
    with HTTPServer(('localhost', 8000), SimpleHandler) as server:
        print('服务器运行在 http://localhost:8000')
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop; exit without a traceback.
            pass

11.7 函数式编程工具

11.7.1 itertools模块

import itertools
 
# Infinite iterators: take a bounded prefix with islice.
counter = itertools.count(start=10, step=2)
print(list(itertools.islice(counter, 5)))     # [10, 12, 14, 16, 18]

cycle = itertools.cycle(['A', 'B', 'C'])
print(list(itertools.islice(cycle, 7)))       # ['A', 'B', 'C', 'A', 'B', 'C', 'A']

repeat = itertools.repeat('X', 5)
print([*repeat])                    # ['X', 'X', 'X', 'X', 'X']

# Combinatoric iterators
items = ['A', 'B', 'C']

# Cartesian product of the list with itself
pairs_with_repeats = itertools.product(items, items)
print(list(pairs_with_repeats))
# [('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'A'), ...]

# Ordered selections of two distinct elements
print(list(itertools.permutations(items, r=2)))
# [('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ...]

# Unordered selections of two distinct elements
print(list(itertools.combinations(items, r=2)))
# [('A', 'B'), ('A', 'C'), ('B', 'C')]

# Unordered selections where an element may repeat
print(list(itertools.combinations_with_replacement(items, r=2)))
# [('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'B'), ...]

# Group consecutive records that share the same first field
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('C', 5)]
for key, group in itertools.groupby(data, key=lambda record: record[0]):
    members = list(group)
    print(f"{key}: {members}")

# Iterate over several sequences as if they were one
list1 = [1, 2, 3]
list2 = ['a', 'b', 'c']
list3 = [True, False]
print(list(itertools.chain.from_iterable([list1, list2, list3])))
# [1, 2, 3, 'a', 'b', 'c', True, False]

# Zip to the longest input, padding the shorter one
names = ['Alice', 'Bob', 'Charlie']
ages = [25, 30]
print(list(itertools.zip_longest(names, ages, fillvalue='Unknown')))
# [('Alice', 25), ('Bob', 30), ('Charlie', 'Unknown')]

11.7.2 functools模块

from functools import lru_cache, partial, reduce, wraps
 
# lru_cache - memoize function results
@lru_cache(maxsize=128)
def fibonacci(n):
    """Return the n-th Fibonacci number (fibonacci(0) == 0, fibonacci(1) == 1).

    Recursive calls go through the cache as well, so each distinct n is
    computed only once.
    """
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# Compare a cold call with a call served from the cache
import time

# perf_counter is used for the measurement, as in the timing example of
# the time-module section.
start = time.perf_counter()
print(fibonacci(35))
print(f"首次调用: {time.perf_counter() - start:.4f}秒")

start = time.perf_counter()
print(fibonacci(35))
print(f"缓存调用: {time.perf_counter() - start:.6f}秒")
 
# partial: freeze the leading argument(s) of a callable
from operator import add, mul

double = partial(mul, 2)
print(double(5))                    # 10


def greet(greeting, name):
    """Build a greeting of the form '<greeting>, <name>!'."""
    message = f"{greeting}, {name}!"
    return message


say_hello = partial(greet, "Hello")
print(say_hello("Alice"))           # Hello, Alice!

# reduce: fold a sequence into a single value
numbers = [1, 2, 3, 4, 5]
total = reduce(add, numbers)        # same as sum(numbers)
product = reduce(mul, numbers)      # 120

# Custom fold: keep the larger of the running value and the next item
max_value = reduce(
    lambda best, candidate: candidate if best <= candidate else best,
    numbers,
)
 
# wraps - copy the decorated function's metadata onto the wrapper
def my_decorator(func):
    """Return a wrapper that prints "Before" and "After" around each call to *func*."""

    @wraps(func)
    def _around(*args, **kwargs):
        """Wrapper docstring"""
        print("Before")
        outcome = func(*args, **kwargs)
        print("After")
        return outcome

    return _around


@my_decorator
def example():
    """Example function"""
    return None


print(example.__name__)             # example (thanks to @wraps)
print(example.__doc__)              # Example function

11.8 随机和数学

11.8.1 random模块

import random
 
# Fix the seed so every run produces the same sequence.
random.seed(42)

# Integers
any_int = random.randint(1, 100)            # between 1 and 100
print(any_int)
even_int = random.randrange(0, 100, 2)      # an even number between 0 and 98
print(even_int)

# Floats
unit_float = random.random()                # 0.0 - 1.0
print(unit_float)
ranged_float = random.uniform(1.5, 10.5)    # 1.5 - 10.5
print(ranged_float)

# Working with sequences
items = ['apple', 'banana', 'cherry', 'date']
one_item = random.choice(items)             # a single element
print(one_item)
with_repeats = random.choices(items, k=3)   # three picks, repeats allowed
print(with_repeats)
without_repeats = random.sample(items, k=2) # two picks, no repeats
print(without_repeats)

random.shuffle(items)                       # shuffles in place
print(items)

# Weighted picks: the first item of the list is ten times as likely
weights = [10, 1, 1, 1]
weighted = random.choices(items, weights=weights, k=5)
print(weighted)

# Normal distribution with mean 0 and standard deviation 1
print(random.gauss(0, 1))

11.8.2 statistics模块

import statistics
 
data = [2, 4, 4, 4, 5, 5, 7, 9]

# Expected values, in order: 5, 4.5, 4, 4.571..., 2.138...
# (variance and stdev are the sample versions).
for label, measure in (
    ("均值", statistics.mean),
    ("中位数", statistics.median),
    ("众数", statistics.mode),
    ("方差", statistics.variance),
    ("标准差", statistics.stdev),
):
    print(f"{label}: {measure(data)}")

# Other averages
geometric = statistics.geometric_mean([2, 8])       # approximately 4.0
print(f"几何平均: {geometric}")
harmonic = statistics.harmonic_mean([2, 4, 8])      # 3.428...
print(f"调和平均: {harmonic}")

11.9 上下文管理器

11.9.1 contextlib模块

from contextlib import contextmanager, suppress, redirect_stdout
import io
 
# A context manager written as a generator
@contextmanager
def managed_resource(name):
    """Yield a dict resource for *name*; announce and clear it on exit.

    The cleanup runs even when the with-body raises.
    """
    print(f"获取资源: {name}")
    resource = dict(name=name, data=[])
    try:
        yield resource
    finally:
        print(f"释放资源: {name}")
        resource.clear()


# Usage
with managed_resource("database") as db:
    db["data"].append("记录1")
    print(f"使用资源: {db}")

# suppress - swallow only the listed exception types
with suppress(FileNotFoundError):
    with open('nonexistent.txt') as f:
        content = f.read()
print("继续执行")

# redirect_stdout - send print output into a buffer
output = io.StringIO()
with redirect_stdout(output):
    for line in ("这行输出被捕获", "另一行"):
        print(line)

captured = output.getvalue()
print(f"捕获的内容: {captured}")
 
# ExitStack - 管理多个上下文
from contextlib import ExitStack
 
with ExitStack() as stack:
    files = []
    for fname in ['a.txt', 'b.txt']:
        # Each file is registered with the stack as soon as it is opened.
        files.append(stack.enter_context(open(fname)))
    # Every registered file is closed when the with-block exits.

11.10 本章习题

习题1:日志分析器

编写一个日志分析工具,功能包括:

习题2:配置文件管理器

实现一个配置管理类,支持:

习题3:正则表达式工具

创建正则表达式工具函数:

习题4:文件批处理工具

使用pathlib和shutil实现:

习题5:时间工具类

实现TimeUtil类,提供以下方法:

11.11 总结

本章介绍了Python标准库中最实用的模块:

熟练掌握这些标准库模块,能够解决大部分日常编程任务,避免引入不必要的第三方依赖。