====== 第七章 迭代器与生成器 ======
===== 本章概要 =====
迭代器和生成器是Python中处理序列数据的强大工具。本章将深入探讨迭代协议、生成器函数、yield表达式和协程等高级特性。
===== 7.1 迭代协议 =====
==== 7.1.1 可迭代对象与迭代器 ====
# Iterable: an object that implements __iter__()
# Iterator: an object that implements both __iter__() and __next__()
# Obtain an iterator from the list
my_list = [1, 2, 3]
iterator = iter(my_list) # calls my_list.__iter__()
# Advance the iterator one item at a time
print(next(iterator)) # 1
print(next(iterator)) # 2
print(next(iterator)) # 3
# print(next(iterator)) # would raise StopIteration: the iterator is exhausted
==== 7.1.2 自定义迭代器 ====
class CountDown:
    """Iterator that counts down from ``start`` to 1.

    The object is its own iterator (``__iter__`` returns ``self``), so it
    can be consumed only once.
    """

    def __init__(self, start):
        self.start = start

    def __iter__(self):
        # An iterator hands itself back from __iter__().
        return self

    def __next__(self):
        exhausted = self.start <= 0
        if exhausted:
            raise StopIteration
        # Step down first, then return the value held before the step.
        self.start = self.start - 1
        return self.start + 1
# Usage: a for loop drives the iterator until StopIteration is raised
for num in CountDown(5):
    print(num, end=" ") # 5 4 3 2 1
==== 7.1.3 可迭代对象 vs 迭代器 ====
class Range:
    """Iterable over the half-open range ``[start, end)``.

    Each call to ``__iter__`` builds a new ``RangeIterator``, so the same
    ``Range`` object can be iterated any number of times.
    """

    def __init__(self, start, end):
        self.start, self.end = start, end

    def __iter__(self):
        # A fresh iterator per call keeps separate iterations independent.
        fresh = RangeIterator(self.start, self.end)
        return fresh
class RangeIterator:
    """Single-use iterator yielding ``start``, ``start + 1``, ... while below ``end``."""

    def __init__(self, start, end):
        self.current, self.end = start, end

    def __iter__(self):
        return self

    def __next__(self):
        if self.current < self.end:
            value = self.current
            self.current = value + 1
            return value
        # Nothing left below ``end``.
        raise StopIteration
# The same Range can be iterated more than once
r = Range(1, 4)
print(list(r)) # [1, 2, 3]
print(list(r)) # [1, 2, 3]
===== 7.2 生成器 =====
==== 7.2.1 生成器函数 ====
使用 ''yield'' 关键字的函数就是生成器函数:调用它不会立即执行函数体,而是返回一个生成器对象。
def countdown(n):
    """Generator function yielding ``n``, ``n - 1``, ..., 1.

    Prints a start message when first advanced and a finish message once
    the count has run out.
    """
    print(f"Starting countdown from {n}")
    remaining = n
    while remaining > 0:
        yield remaining
        remaining -= 1
    print("Countdown finished!")
# Create the generator object (the function body does not run yet)
gen = countdown(3)
# Pull values one at a time
print(next(gen)) # prints "Starting countdown from 3", then 3
print(next(gen)) # 2
print(next(gen)) # 1
# print(next(gen)) # would print "Countdown finished!" and raise StopIteration
==== 7.2.2 生成器的状态 ====
def generator_example():
    """Generator that prints a message before each step.

    Execution pauses at every ``yield`` and resumes from that point on the
    next ``next()`` call, which the interleaved prints make visible.
    """
    print("Start")
    yield 1
    print("Continue")
    yield 2
    print("End")
# Creating the generator runs none of its body, so nothing is printed yet
gen = generator_example()
print("Created generator")
print(next(gen)) # prints "Start", then 1
print("---")
print(next(gen)) # prints "Continue", then 2
print("---")
# print(next(gen)) # would print "End" and then raise StopIteration
==== 7.2.3 生成器表达式 ====
# List comprehension - evaluated eagerly, the whole list is built up front
squares_list = [x**2 for x in range(1000000)]
# Generator expression - evaluated lazily
squares_gen = (x**2 for x in range(1000000))
print(sum(squares_gen)) # values are computed on demand, saving memory
# A generator expression can be passed directly as a function argument
print(sum(x**2 for x in range(10))) # 285
print(max(len(word) for word in ["hello", "world", "python"])) # 6
==== 7.2.4 生成器方法 ====
def counter(maximum):
    """Yield a counter below ``maximum`` that the caller can reposition.

    The value of each ``yield`` expression (``None`` for ``next()``, or
    whatever was passed to ``send()``) is printed. A non-None value becomes
    the next counter value; otherwise the counter advances by one.
    """
    current = 0
    while current < maximum:
        received = yield current
        print(f"Got value: {received}")
        current = current + 1 if received is None else received
gen = counter(10)
print(next(gen)) # 0
print(gen.send(5)) # prints "Got value: 5", then 5
print(next(gen)) # prints "Got value: None", then 6
# Raise an exception inside the generator at the paused yield.
# counter() does not handle it, so it propagates back to the caller and the
# generator is finished afterwards. Pass an exception instance: the
# throw(type, value) form is deprecated since Python 3.12.
try:
    gen.throw(ValueError("Custom error"))
except ValueError as exc:
    print(f"Generator raised: {exc}")
# Close the generator (a no-op here because it has already finished)
gen.close()
===== 7.3 yield from =====
==== 7.3.1 委托子生成器 ====
def sub_generator():
    """Yield the integers 1, 2 and 3 in order."""
    for value in (1, 2, 3):
        yield value
def main_generator():
    """Yield "A", then every value from ``sub_generator()``, then "B"."""
    yield "A"
    yield from sub_generator() # delegate to the sub-generator
    yield "B"
# The delegated values appear in place, between "A" and "B"
print(list(main_generator())) # ['A', 1, 2, 3, 'B']
==== 7.3.2 双向通信 ====
def accumulator():
    """Sum the values sent in, yielding the running total after each one.

    Sending ``None`` ends the generator; the final total is its return
    value (carried by ``StopIteration.value``).
    """
    total = 0
    value = yield total
    while value is not None:
        total += value
        value = yield total
    return total
def delegator():
    """Delegate to ``accumulator()``, then print and return its result.

    Values sent to this generator are forwarded to the sub-generator by
    ``yield from``.
    """
    # ``yield from`` evaluates to the sub-generator's return value.
    result = yield from accumulator()
    print(f"Final total: {result}")
    return result
d = delegator()
print(next(d)) # 0
print(d.send(10)) # 10
print(d.send(20)) # 30
print(d.send(30)) # 60
try:
    # None ends accumulator(); delegator() then returns, raising StopIteration here
    d.send(None)
except StopIteration as e:
    print(f"Returned: {e.value}") # prints "Final total: 60", then "Returned: 60"
===== 7.4 生成器应用 =====
==== 7.4.1 无限序列 ====
def fibonacci():
    """Endless generator of the Fibonacci sequence 0, 1, 1, 2, 3, ..."""
    current, following = 0, 1
    while True:
        yield current
        current, following = following, current + following
# Take the first 10 Fibonacci numbers
fib = fibonacci()
print([next(fib) for _ in range(10)])
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
==== 7.4.2 文件逐行读取 ====
def read_large_file(file_path, encoding=None):
    """Lazily yield the lines of a text file, one at a time.

    Args:
        file_path: Path of the file to read.
        encoding: Text encoding passed to ``open()``. ``None`` (the default)
            keeps the platform-dependent default; pass e.g. ``"utf-8"`` for
            results that do not depend on the machine's locale.

    Yields:
        Each line with leading and trailing whitespace removed.
    """
    # The file is opened on the first next() call and closed when the
    # generator finishes or is closed.
    with open(file_path, 'r', encoding=encoding) as f:
        for line in f:
            yield line.strip()
# 处理大文件而不占用大量内存
# for line in read_large_file("large_file.txt"):
# process(line)
==== 7.4.3 流水线处理 ====
def read_lines(file_path, encoding=None):
    """Pipeline source stage: lazily yield the stripped lines of a text file.

    Args:
        file_path: Path of the file to read.
        encoding: Text encoding passed to ``open()``. ``None`` (the default)
            keeps the platform-dependent default; pass e.g. ``"utf-8"`` for
            results that do not depend on the machine's locale.

    Yields:
        Each line with leading and trailing whitespace removed.
    """
    with open(file_path, encoding=encoding) as f:
        for line in f:
            yield line.strip()
def filter_comments(lines):
    """Pipeline filter stage: yield only the lines that do not start with '#'."""
    for candidate in lines:
        if candidate.startswith('#'):
            continue
        yield candidate
def convert_to_int(lines):
    """Pipeline conversion stage: yield ``int(line)`` for each parseable line.

    Lines for which ``int()`` raises ``ValueError`` are skipped on purpose.

    Args:
        lines: Iterable of strings.

    Yields:
        The integer value of every line that parses.
    """
    for line in lines:
        # Only the conversion is guarded. The yield stays outside the try so
        # that a ValueError thrown into this generator by its consumer is not
        # silently swallowed.
        try:
            number = int(line)
        except ValueError:
            continue
        yield number
# 流水线处理
# numbers = convert_to_int(filter_comments(read_lines("data.txt")))
===== 7.5 协程(Coroutine) =====
==== 7.5.1 基本概念 ====
协程是可以在执行过程中暂停和恢复的函数,常用于异步编程。本节演示的是基于生成器(yield / send)的协程;现代 Python 的异步代码通常使用 async/await 语法编写。
def simple_coroutine():
    """Generator-based coroutine that receives two values through ``send()``.

    It prints a start message, prints each received value, and prints an
    end message before finishing.
    """
    print("协程启动")
    x = yield # pause here until a value is sent in
    print(f"收到值: {x}")
    y = yield
    print(f"收到值: {y}")
    print("协程结束")
# Start the coroutine
coro = simple_coroutine()
next(coro) # prime the coroutine: run it up to the first yield
coro.send(10) # send a value in; the coroutine runs to the second yield
# The second send lets the coroutine run to completion, which surfaces in the
# caller as StopIteration. Catch it so the example does not end in a traceback.
try:
    coro.send(20)
except StopIteration:
    pass # expected: the coroutine has finished
==== 7.5.2 装饰器预激 ====
from functools import wraps
def coroutine(func):
    """Decorator that primes a generator-based coroutine.

    The wrapped function creates the generator, advances it once with
    ``next()`` and returns it, so callers can use ``send()`` right away.
    """
    @wraps(func)
    def start_primed(*args, **kwargs):
        primed = func(*args, **kwargs)
        next(primed)
        return primed

    return start_primed
@coroutine
def averager():
    """Coroutine computing the running average of all values sent in.

    Each ``send(term)`` returns the average of every term received so far.
    The value yielded while priming is ``None``.
    """
    total, count = 0.0, 0
    average = None
    while True:
        term = yield average
        count += 1
        total += term
        average = total / count
# Usage
avg = averager() # already primed by the decorator
print(avg.send(10)) # 10.0
print(avg.send(20)) # 15.0
print(avg.send(30)) # 20.0
==== 7.5.3 使用协程处理数据 ====
@coroutine
def printer():
    """Sink coroutine: print every value it receives."""
    while True:
        data = yield
        print(f"Received: {data}")
@coroutine
def filter_target(target, predicate):
    """Forward to ``target`` only the received values for which ``predicate`` is truthy."""
    while True:
        data = yield
        if predicate(data):
            target.send(data)
@coroutine
def broadcast(targets):
    """Send every received value to each coroutine in ``targets``."""
    while True:
        data = yield
        for target in targets:
            target.send(data)
# Usage: only values greater than 5 reach the printer
p = printer()
f = filter_target(p, lambda x: x > 5)
f.send(3) # filtered out
f.send(10) # Received: 10
===== 7.6 itertools模块 =====
import itertools
# Infinite iterators (repeat is finite here because a count of 3 is given)
counter = itertools.count(start=10, step=2) # 10, 12, 14, ...
cycle = itertools.cycle([1, 2, 3]) # 1, 2, 3, 1, 2, 3, ...
repeat = itertools.repeat(10, 3) # 10, 10, 10
# Finite iterators
chain = itertools.chain([1, 2], [3, 4], [5, 6]) # 1, 2, 3, 4, 5, 6
compress = itertools.compress('ABCDEF', [1, 0, 1, 0, 1, 1]) # A, C, E, F
dropwhile = itertools.dropwhile(lambda x: x < 5, [1, 3, 5, 7, 2]) # 5, 7, 2
takewhile = itertools.takewhile(lambda x: x < 5, [1, 3, 5, 7, 2]) # 1, 3
# Combinatoric iterators
product = itertools.product([1, 2], ['a', 'b']) # (1,a), (1,b), (2,a), (2,b)
permutations = itertools.permutations([1, 2, 3], 2) # ordered pairs without repetition
combinations = itertools.combinations([1, 2, 3], 2) # unordered pairs without repetition
combinations_r = itertools.combinations_with_replacement([1, 2], 2)
print(list(combinations)) # [(1, 2), (1, 3), (2, 3)]
===== 7.7 代码示例 =====
==== 示例1:实现一个树形结构的迭代器 ====
class TreeNode:
    """Node of a tree holding a value and a list of child nodes.

    Iterating a node yields values depth-first; ``dfs()`` and ``bfs()``
    give explicit depth-first and breadth-first traversals.
    """

    def __init__(self, value, children=None):
        self.value = value
        # A falsy ``children`` (None or empty) is replaced by a new list.
        self.children = children if children else []

    def __iter__(self):
        """Yield this node's value, then each child's values in turn (depth-first)."""
        yield self.value
        for child in self.children:
            for descendant_value in child:
                yield descendant_value

    def dfs(self):
        """Depth-first traversal: this node first, then each subtree in order."""
        yield self.value
        subtrees = (child.dfs() for child in self.children)
        for subtree in subtrees:
            yield from subtree

    def bfs(self):
        """Breadth-first traversal: values level by level, left to right."""
        from collections import deque
        pending = deque((self,))
        while pending:
            current = pending.popleft()
            yield current.value
            # Children are queued behind everything already waiting.
            pending.extend(current.children)
# Build the tree: A has children B and C; B has D and E; C has F
root = TreeNode("A", [
    TreeNode("B", [
        TreeNode("D"),
        TreeNode("E")
    ]),
    TreeNode("C", [
        TreeNode("F")
    ])
])
print("DFS:", list(root.dfs())) # ['A', 'B', 'D', 'E', 'C', 'F']
print("BFS:", list(root.bfs())) # ['A', 'B', 'C', 'D', 'E', 'F']
==== 示例2:使用生成器实现上下文管理器 ====
from contextlib import contextmanager
@contextmanager
def managed_resource(name):
    """Context manager built from a generator.

    Prints an acquire message, hands ``"Resource(<name>)"`` to the ``with``
    block, and prints a release message when the block exits, including
    when it exits with an exception.
    """
    print(f"Acquiring {name}...")
    try:
        # Everything before the yield is setup; the finally clause is teardown.
        yield f"Resource({name})"
    finally:
        print(f"Releasing {name}...")
# Usage
with managed_resource("database") as db:
    print(f"Using {db}")
# Equivalent class-based context manager
class ManagedResource:
    """Context manager that announces acquiring and releasing a named resource."""

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        label = self.name
        print(f"Acquiring {label}...")
        return f"Resource({label})"

    def __exit__(self, exc_type, exc_val, exc_tb):
        # No return value (None), so an exception from the with block propagates.
        label = self.name
        print(f"Releasing {label}...")
===== 7.8 练习题 =====
==== 练习1:实现flatten生成器 ====
def flatten(nested):
    """Recursively flatten nested lists, yielding the non-list items in order.

    Only ``list`` instances are descended into; every other item is yielded
    as it is.
    """
    for element in nested:
        if not isinstance(element, list):
            yield element
            continue
        yield from flatten(element)
# Test
nested = [1, [2, [3, 4], 5], 6, [7, 8]]
print(list(flatten(nested))) # [1, 2, 3, 4, 5, 6, 7, 8]
==== 练习2:实现惰性读取大文件的类 ====
class LazyFileReader:
    """Lazy reader for a text file.

    Iterating the reader yields the file in chunks of at most ``chunk_size``
    characters; ``lines()`` yields it line by line. The file is reopened for
    every iteration, so a reader can be iterated repeatedly.

    Args:
        file_path: Path of the file to read.
        chunk_size: Maximum number of characters per chunk.
        encoding: Text encoding passed to ``open()``. ``None`` (the default)
            keeps the platform-dependent default; pass e.g. ``"utf-8"`` for
            results that do not depend on the machine's locale.
    """

    def __init__(self, file_path, chunk_size=1024, encoding=None):
        self.file_path = file_path
        self.chunk_size = chunk_size
        self.encoding = encoding

    def __iter__(self):
        """Yield successive chunks until the file is exhausted."""
        with open(self.file_path, 'r', encoding=self.encoding) as f:
            # Two-argument iter(): call read() repeatedly and stop at the
            # first empty string, which is what read() returns at end of file.
            yield from iter(lambda: f.read(self.chunk_size), '')

    def lines(self):
        """Yield the file line by line with the trailing newline removed."""
        with open(self.file_path, 'r', encoding=self.encoding) as f:
            for line in f:
                yield line.rstrip('\n')
# 使用
# reader = LazyFileReader("large_file.txt")
# for chunk in reader:
# process(chunk)
===== 本章小结 =====
本章学习了Python的迭代器和生成器:
* **迭代协议** - __iter__()和__next__()方法
* **生成器函数** - 使用yield创建惰性迭代器
* **生成器表达式** - 简洁的惰性计算语法
* **yield from** - 委托子生成器
* **协程** - 双向数据传递和异步编程基础
* **itertools** - 高效的迭代工具函数
迭代器和生成器让Python能够高效处理大量数据,是编写Pythonic代码的重要工具。
===== 进一步阅读 =====
* [[https://docs.python.org/zh-cn/3/library/stdtypes.html#iterator-types|迭代器类型]]
* [[https://docs.python.org/zh-cn/3/library/itertools.html|itertools模块]]