====== 第八章 文件与IO ======
===== 本章概要 =====
文件操作是编程的基础技能。本章将学习Python中文件的读写操作、上下文管理器、序列化和路径处理等内容。
===== 8.1 文件基础 =====
==== 8.1.1 打开文件 ====
使用 ``open()`` 函数打开文件:
# 基本语法
file = open(filename, mode, encoding='utf-8')
# ... 使用文件
file.close()
打开模式:
| 模式 | 说明 |
|------|------|
| 'r' | 只读(默认) |
| 'w' | 只写(覆盖) |
| 'x' | 独占创建,文件已存在则失败 |
| 'a' | 追加写入 |
| 'b' | 二进制模式 |
| 't' | 文本模式(默认) |
| '+' | 读写模式 |
==== 8.1.2 读取文件 ====
# 方法1:读取全部内容
with open('example.txt', 'r', encoding='utf-8') as f:
content = f.read()
print(content)
# 方法2:逐行读取
with open('example.txt', 'r', encoding='utf-8') as f:
for line in f:
print(line.strip())
# 方法3:读取为列表
with open('example.txt', 'r', encoding='utf-8') as f:
lines = f.readlines()
# 方法4:指定读取字符数(文本模式下按字符计,二进制模式下才按字节计)
with open('example.txt', 'r', encoding='utf-8') as f:
chunk = f.read(1024) # 读取1024个字符
==== 8.1.3 写入文件 ====
# 写入字符串
with open('output.txt', 'w', encoding='utf-8') as f:
f.write('Hello, World!\n')
f.write('第二行内容\n')
# 写入多行
lines = ['第一行\n', '第二行\n', '第三行\n']
with open('output.txt', 'w', encoding='utf-8') as f:
f.writelines(lines)
# 追加写入
with open('output.txt', 'a', encoding='utf-8') as f:
f.write('追加的内容\n')
# print()写入文件
with open('output.txt', 'w', encoding='utf-8') as f:
print('Hello', 'World', file=f, sep=', ')
===== 8.2 上下文管理器 =====
==== 8.2.1 with 语句 ====
# 传统方式
f = open('file.txt', 'r')
try:
content = f.read()
finally:
f.close()
# 使用with语句(推荐)
with open('file.txt', 'r') as f:
content = f.read()
# 文件自动关闭
==== 8.2.2 自定义上下文管理器 ====
class ManagedFile:
    """Context manager that opens a file on entry and closes it on exit.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open()``; defaults to ``'r'``.
        encoding: Optional text encoding passed to ``open()``. ``None``
            (the default) leaves the choice to ``open()`` and must be kept
            for binary modes.
    """

    def __init__(self, filename, mode='r', encoding=None):
        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        # Set by __enter__; stays None until the file is actually opened.
        self.file = None

    def __enter__(self):
        """Open the file and return the file object bound by ``as``."""
        self.file = open(self.filename, self.mode, encoding=self.encoding)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file if it was opened; never suppress exceptions."""
        if self.file:
            self.file.close()
        # Returning False propagates an exception raised in the with-body;
        # returning True would suppress it.
        return False
# 使用
with ManagedFile('test.txt', 'w') as f:
f.write('Hello')
==== 8.2.3 contextlib模块 ====
from contextlib import contextmanager
@contextmanager
def managed_file(filename, mode='r', encoding=None):
    """Open *filename* and yield the file object, closing it on exit.

    Generator-based context manager built with ``contextlib.contextmanager``.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open()``; defaults to ``'r'``.
        encoding: Optional text encoding passed to ``open()``. ``None``
            (the default) leaves the choice to ``open()``.

    Yields:
        The open file object.
    """
    f = open(filename, mode, encoding=encoding)
    try:
        yield f
    finally:
        # Runs whether the with-body finished normally or raised.
        f.close()
# 使用
with managed_file('test.txt', 'w') as f:
f.write('Hello')
# 其他工具
import os
from contextlib import suppress

# Suppress one specific exception type: removing a file that does not exist
# raises FileNotFoundError, which is silently ignored here.
with suppress(FileNotFoundError):
    os.remove('nonexistent_file.txt')
===== 8.3 文件路径处理 =====
==== 8.3.1 os.path 模块 ====
import os
# 路径拼接
path = os.path.join('folder', 'subfolder', 'file.txt')
# 获取绝对路径
abs_path = os.path.abspath('file.txt')
# 获取路径组件
dirname = os.path.dirname('/home/user/file.txt') # /home/user
basename = os.path.basename('/home/user/file.txt') # file.txt
split = os.path.split('/home/user/file.txt') # ('/home/user', 'file.txt')
splitext = os.path.splitext('file.txt') # ('file', '.txt')
# 检查路径
exists = os.path.exists('file.txt')
isfile = os.path.isfile('file.txt')
isdir = os.path.isdir('folder')
isabs = os.path.isabs('/home/user')
==== 8.3.2 pathlib 模块(推荐) ====
from pathlib import Path
# 创建Path对象
p = Path('/home/user/documents')
# 路径拼接
file_path = p / 'file.txt'
# 获取信息
print(file_path.name) # file.txt
print(file_path.stem) # file
print(file_path.suffix) # .txt
print(file_path.parent) # /home/user/documents
print(file_path.parts) # ('/', 'home', 'user', 'documents', 'file.txt')
# 路径操作
new_path = file_path.with_suffix('.md')
absolute = file_path.resolve()
relative = file_path.relative_to('/home/user')
# 检查路径
print(file_path.exists())
print(file_path.is_file())
print(file_path.is_dir())
# 创建目录
new_dir = Path('new_folder')
new_dir.mkdir(parents=True, exist_ok=True)
# 遍历目录
for file in Path('.').glob('*.txt'):
print(file)
# 递归遍历
for file in Path('.').rglob('*.py'):
print(file)
===== 8.4 文件系统操作 =====
import os
import shutil
from pathlib import Path
# 创建目录
os.mkdir('new_dir') # 创建单级目录
os.makedirs('a/b/c', exist_ok=True) # 创建多级目录
Path('new_dir').mkdir(parents=True, exist_ok=True)
# 删除
os.remove('file.txt') # 删除文件
os.rmdir('empty_dir') # 删除空目录
shutil.rmtree('dir') # 删除目录树
Path('file.txt').unlink() # 删除文件
# 复制
shutil.copy('src.txt', 'dst.txt') # 复制文件
shutil.copy2('src.txt', 'dst.txt') # 保留元数据
shutil.copytree('src_dir', 'dst_dir') # 复制目录
# 移动/重命名
os.rename('old.txt', 'new.txt')
shutil.move('src', 'dst')
Path('old.txt').rename('new.txt')
# 遍历目录
for root, dirs, files in os.walk('.'):
for file in files:
print(os.path.join(root, file))
===== 8.5 序列化 =====
==== 8.5.1 pickle 模块 ====
import pickle
data = {'name': 'Alice', 'age': 25, 'scores': [90, 85, 88]}
# 序列化到文件
with open('data.pkl', 'wb') as f:
pickle.dump(data, f)
# 从文件反序列化(注意:pickle 不安全,切勿对不可信来源的数据调用 pickle.load/loads)
with open('data.pkl', 'rb') as f:
loaded_data = pickle.load(f)
print(loaded_data)
# 序列化为字节串
pickled = pickle.dumps(data)
original = pickle.loads(pickled)
==== 8.5.2 JSON 序列化 ====
import json
data = {
'name': 'Alice',
'age': 25,
'is_student': False,
'courses': ['Math', 'Physics'],
'address': None
}
# 序列化为JSON字符串
json_str = json.dumps(data)
print(json_str)
# 美化输出
pretty = json.dumps(data, indent=2, ensure_ascii=False)
print(pretty)
# 保存到文件
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# 从JSON加载
with open('data.json', 'r', encoding='utf-8') as f:
loaded = json.load(f)
# 处理自定义对象
class Person:
    """Minimal example type holding a name and an age."""

    def __init__(self, name, age):
        self.name = name
        self.age = age


def person_to_dict(obj):
    """Convert a ``Person`` into a JSON-serializable dict.

    Intended to be passed as the ``default=`` hook of ``json.dumps`` /
    ``json.dump``, which call it for objects they cannot serialize.

    Args:
        obj: The object the JSON encoder could not handle.

    Returns:
        A dict with the person's ``name`` and ``age`` plus a ``'__type__'``
        marker set to ``'Person'``.

    Raises:
        TypeError: If *obj* is not a ``Person``.
    """
    if isinstance(obj, Person):
        return {'name': obj.name, 'age': obj.age, '__type__': 'Person'}
    raise TypeError(f'Object of type {obj.__class__.__name__} is not JSON serializable')
person = Person('Bob', 30)
json_str = json.dumps(person, default=person_to_dict)
==== 8.5.3 CSV 文件处理 ====
import csv
# 写入CSV
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Name', 'Age', 'City'])
writer.writerow(['Alice', 25, 'Beijing'])
writer.writerow(['Bob', 30, 'Shanghai'])
# 读取CSV
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
print(row)
# 使用字典方式
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
fieldnames = ['name', 'age', 'city']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerow({'name': 'Alice', 'age': 25, 'city': 'Beijing'})
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
print(row['name'], row['age'])
===== 8.6 二进制文件操作 =====
# 读取二进制文件
with open('image.png', 'rb') as f:
data = f.read()
print(f"文件大小: {len(data)} 字节")
# 写入二进制文件
with open('copy.png', 'wb') as f:
f.write(data)
# 使用struct处理二进制数据
import struct
# Pack one int and two single-precision floats into a bytes object.
data = struct.pack('iff', 42, 3.14, 2.71)


def unpack_data(data):
    """Unpack a buffer produced with the ``'iff'`` struct format.

    Args:
        data: Bytes holding one int followed by two 32-bit floats.

    Returns:
        A 3-tuple ``(int, float, float)``.
    """
    return struct.unpack('iff', data)


# 'f' is a 32-bit float, so the round-tripped values are only approximately
# 3.14 and 2.71: (42, 3.140000104904175, 2.7100000381469727)
print(unpack_data(data))
===== 8.7 临时文件 =====
import tempfile
# 临时文件
with tempfile.TemporaryFile(mode='w+t') as f:
f.write('Hello, World!')
f.seek(0)
print(f.read())
# 文件自动删除
# 命名临时文件(delete=False 时文件不会自动删除,用完后需手动删除)
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write('内容')
print(f.name)
# 临时目录
with tempfile.TemporaryDirectory() as tmpdir:
print(tmpdir)
# 在此目录中创建文件
# 目录自动删除
===== 8.8 代码示例 =====
==== 示例1:文件搜索工具 ====
from pathlib import Path
def find_files(pattern, root='.', recursive=True):
    """Return the paths under *root* that match a glob *pattern*.

    Args:
        pattern: Glob pattern such as ``'*.py'``.
        root: Directory to search from; defaults to the current directory.
        recursive: When true, search all subdirectories (``Path.rglob``);
            otherwise only *root* itself (``Path.glob``).

    Returns:
        A list of ``Path`` objects. Entries are whatever the glob yields,
        so a directory whose name matches is included too.
    """
    root_path = Path(root)
    matcher = root_path.rglob if recursive else root_path.glob
    return list(matcher(pattern))
def find_by_size(min_size=None, max_size=None, root='.'):
    """Recursively collect regular files whose size lies within given bounds.

    Args:
        min_size: Inclusive lower bound in bytes, or ``None`` for no bound.
        max_size: Inclusive upper bound in bytes, or ``None`` for no bound.
        root: Directory to search from; defaults to the current directory.

    Returns:
        A list of ``(path, size)`` tuples, where *path* is a ``Path`` and
        *size* is the file size in bytes.
    """
    results = []
    for path in Path(root).rglob('*'):
        if not path.is_file():
            continue
        size = path.stat().st_size
        if min_size is not None and size < min_size:
            continue
        if max_size is not None and size > max_size:
            continue
        results.append((path, size))
    return results
# 使用
# py_files = find_files('*.py')
# large_files = find_by_size(min_size=1024*1024) # 大于1MB的文件
==== 示例2:配置管理器 ====
import json
from pathlib import Path
class ConfigManager:
    """JSON-backed configuration store addressed by dotted keys.

    A key such as ``'database.host'`` addresses ``config['database']['host']``.
    The configuration is loaded from disk on construction and written back
    on every ``set()``.

    Args:
        config_file: Path of the JSON file; defaults to ``'config.json'``.
    """

    def __init__(self, config_file='config.json'):
        self.config_file = Path(config_file)
        self._config = {}
        self.load()

    def load(self):
        """Load the configuration from disk if the file exists."""
        if self.config_file.exists():
            with open(self.config_file, 'r', encoding='utf-8') as f:
                self._config = json.load(f)

    def save(self):
        """Write the configuration to disk as indented UTF-8 JSON."""
        # Create missing parent directories so a nested config path can be
        # saved without the caller preparing the directory first.
        self.config_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self._config, f, indent=2, ensure_ascii=False)

    def get(self, key, default=None):
        """Return the value stored under a dotted *key*.

        Args:
            key: Dotted path, e.g. ``'database.host'``.
            default: Returned when any path component is missing, when an
                intermediate value is not a dict, or when the stored value
                is ``None``.
        """
        value = self._config
        for k in key.split('.'):
            if not isinstance(value, dict):
                return default
            value = value.get(k)
            if value is None:
                return default
        return value

    def set(self, key, value):
        """Store *value* under a dotted *key* and save to disk.

        Missing intermediate dicts are created. An intermediate entry that
        exists but is not a dict is replaced by an empty dict, so a nested
        key can always be written instead of failing with ``TypeError``.
        """
        *parents, leaf = key.split('.')
        config = self._config
        for k in parents:
            if not isinstance(config.get(k), dict):
                config[k] = {}
            config = config[k]
        config[leaf] = value
        self.save()
# 使用
config = ConfigManager()
config.set('database.host', 'localhost')
config.set('database.port', 3306)
print(config.get('database.host')) # localhost
===== 8.9 练习题 =====
==== 练习1:文件合并工具 ====
from pathlib import Path
def merge_files(output_file, *input_files, separator='\n'):
    """Concatenate several UTF-8 text files into *output_file*.

    Args:
        output_file: Path of the file to create or overwrite.
        *input_files: Paths of the files to merge, in order.
        separator: Text written between consecutive inputs (not after the
            last one); defaults to a newline.

    Raises:
        ValueError: If *output_file* is also one of *input_files*. Opening
            it for writing would truncate it before it could be read.
    """
    out_resolved = Path(output_file).resolve()
    for input_file in input_files:
        if Path(input_file).resolve() == out_resolved:
            raise ValueError(
                f'Output file is also an input file: {input_file}'
            )

    with open(output_file, 'w', encoding='utf-8') as out:
        for i, input_file in enumerate(input_files):
            if i > 0:
                out.write(separator)
            with open(input_file, 'r', encoding='utf-8') as f:
                # Stream line by line instead of loading the whole input.
                out.writelines(f)
# 使用
# merge_files('output.txt', 'file1.txt', 'file2.txt', 'file3.txt')
==== 练习2:目录同步工具 ====
import shutil
from pathlib import Path
def sync_directories(src, dst, delete=False):
    """Mirror the files of a source directory into a destination directory.

    A file is copied when it is missing from *dst* or when the source copy
    has a newer modification time. Copies are made with ``shutil.copy2``.

    Args:
        src: Source directory.
        dst: Destination directory; created if it does not exist.
        delete: When true, also remove files under *dst* that have no
            counterpart under *src*.

    Raises:
        NotADirectoryError: If *src* is not an existing directory. Without
            this check a mistyped source combined with ``delete=True``
            would remove every file under *dst*.
    """
    src_path = Path(src)
    dst_path = Path(dst)
    if not src_path.is_dir():
        raise NotADirectoryError(
            f"Source is not an existing directory: {src_path}"
        )

    # Make sure the destination root exists.
    dst_path.mkdir(parents=True, exist_ok=True)

    # Copy new files and files whose source is newer.
    for src_file in src_path.rglob('*'):
        if not src_file.is_file():
            continue
        rel_path = src_file.relative_to(src_path)
        dst_file = dst_path / rel_path
        dst_file.parent.mkdir(parents=True, exist_ok=True)
        if not dst_file.exists() or \
                src_file.stat().st_mtime > dst_file.stat().st_mtime:
            shutil.copy2(src_file, dst_file)
            print(f"Copied: {rel_path}")

    # Optionally remove destination files that no longer exist in the source.
    if delete:
        # Snapshot the listing first so files are not unlinked while the
        # directory tree is still being iterated.
        for dst_file in list(dst_path.rglob('*')):
            if not dst_file.is_file():
                continue
            rel_path = dst_file.relative_to(dst_path)
            if not (src_path / rel_path).exists():
                dst_file.unlink()
                print(f"Deleted: {rel_path}")
===== 本章小结 =====
本章学习了Python的文件和IO操作:
* **文件操作** - 打开、读取、写入文件
* **上下文管理器** - with语句和自定义上下文管理器
* **路径处理** - os.path和pathlib模块
* **文件系统操作** - 创建、删除、复制、移动文件
* **序列化** - pickle、JSON、CSV格式
* **二进制文件** - 二进制读写和struct模块
掌握文件操作是进行数据处理和应用开发的基础。
===== 进一步阅读 =====
* [[https://docs.python.org/zh-cn/3/tutorial/inputoutput.html|输入输出教程]]
* [[https://docs.python.org/zh-cn/3/library/pathlib.html|pathlib模块]]