目录

第八章 文件与IO

本章概要

文件操作是编程的基础技能。本章将学习Python中文件的读写操作、上下文管理器、序列化和路径处理等内容。

8.1 文件基础

8.1.1 打开文件

使用 ``open()`` 函数打开文件:

# Basic syntax (filename and mode are placeholders for your own values)
file = open(filename, mode, encoding='utf-8')
# ... use the file
file.close()  # must be called explicitly; it is skipped if the code above raises

打开模式:

模式 说明
————
'r' 只读(默认);文件不存在则报错
'w' 只写;文件已存在则清空,不存在则创建
'x' 独占创建;文件已存在则失败
'a' 追加写入;文件不存在则创建
'b' 二进制模式(与其他模式组合使用,如 'rb')
't' 文本模式(默认)
'+' 读写模式(与其他模式组合使用,如 'r+')

8.1.2 读取文件

# Method 1: read the whole file into one string
with open('example.txt', 'r', encoding='utf-8') as f:
    content = f.read()
    print(content)

# Method 2: iterate line by line (the file object yields one line at a time)
with open('example.txt', 'r', encoding='utf-8') as f:
    for line in f:
        print(line.strip())  # strip() drops the trailing newline and surrounding whitespace

# Method 3: read all lines into a list (each element keeps its trailing newline)
with open('example.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()

# Method 4: read a limited amount; in text mode the size counts characters, not bytes
with open('example.txt', 'r', encoding='utf-8') as f:
    chunk = f.read(1024)  # read at most 1024 characters

8.1.3 写入文件

# Write strings; write() adds no newline, so each string carries its own '\n'
with open('output.txt', 'w', encoding='utf-8') as f:
    f.write('Hello, World!\n')
    f.write('第二行内容\n')

# Write several lines at once; writelines() does not add line separators either
lines = ['第一行\n', '第二行\n', '第三行\n']
with open('output.txt', 'w', encoding='utf-8') as f:  # 'w' truncates the previous content
    f.writelines(lines)

# Append to the end of the existing file
with open('output.txt', 'a', encoding='utf-8') as f:
    f.write('追加的内容\n')

# Redirect print() into a file via the file= argument
with open('output.txt', 'w', encoding='utf-8') as f:
    print('Hello', 'World', file=f, sep=', ')  # writes "Hello, World" plus a newline

8.2 上下文管理器

8.2.1 with 语句

# Traditional approach: close the file by hand in a finally clause
f = open('file.txt', 'r', encoding='utf-8')
try:
    content = f.read()
finally:
    f.close()

# Recommended: let the with statement manage the file
# (encoding is passed explicitly so decoding does not depend on the platform default)
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()
# The file is closed automatically here, even if read() raised

8.2.2 自定义上下文管理器

class ManagedFile:
    """Context manager that opens a file on entry and closes it on exit.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open()``; defaults to ``'r'``.
        encoding: Optional text encoding passed to ``open()``. Leave it as
            ``None`` for binary modes or to use the platform default.
    """

    def __init__(self, filename, mode='r', encoding=None):
        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        self.file = None  # set by __enter__

    def __enter__(self):
        """Open the file and return the file object bound by ``as``."""
        self.file = open(self.filename, self.mode, encoding=self.encoding)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file; exceptions from the with body are not suppressed."""
        if self.file is not None:
            self.file.close()
        # Returning False propagates the exception; True would suppress it
        return False


# Usage
with ManagedFile('test.txt', 'w') as f:
    f.write('Hello')

8.2.3 contextlib模块

from contextlib import contextmanager
 
@contextmanager
def managed_file(filename, mode='r', encoding=None):
    """Generator-based context manager that yields an open file.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open()``; defaults to ``'r'``.
        encoding: Optional text encoding passed to ``open()``. Leave it as
            ``None`` for binary modes or to use the platform default.

    Yields:
        The open file object; it is closed when the with block exits,
        whether normally or through an exception.
    """
    f = open(filename, mode, encoding=encoding)
    try:
        yield f
    finally:
        # Runs on both normal exit and exceptions raised in the with body
        f.close()


# Usage
with managed_file('test.txt', 'w') as f:
    f.write('Hello')
 
# Other tools
import os  # needed by os.remove below
from contextlib import suppress

# Suppress a specific exception: removing a missing file becomes a no-op
with suppress(FileNotFoundError):
    os.remove('nonexistent_file.txt')

8.3 文件路径处理

8.3.1 os.path 模块

import os
 
# Join path components using the separator of the current platform
path = os.path.join('folder', 'subfolder', 'file.txt')

# Absolute path, resolved against the current working directory
abs_path = os.path.abspath('file.txt')

# Split a path into its components (results shown for POSIX-style paths)
dirname = os.path.dirname('/home/user/file.txt')   # /home/user
basename = os.path.basename('/home/user/file.txt') # file.txt
split = os.path.split('/home/user/file.txt')       # ('/home/user', 'file.txt')
splitext = os.path.splitext('file.txt')            # ('file', '.txt')

# Query the filesystem / inspect the path; each call returns a bool
exists = os.path.exists('file.txt')
isfile = os.path.isfile('file.txt')
isdir = os.path.isdir('folder')
isabs = os.path.isabs('/home/user')

8.3.2 pathlib 模块(推荐)

from pathlib import Path
 
# Create a Path object
p = Path('/home/user/documents')

# Join paths with the / operator
file_path = p / 'file.txt'

# Inspect the components
print(file_path.name)      # file.txt
print(file_path.stem)      # file
print(file_path.suffix)    # .txt
print(file_path.parent)    # /home/user/documents
print(file_path.parts)     # ('/', 'home', 'user', 'documents', 'file.txt')

# Derive new paths (none of these touch the file itself)
new_path = file_path.with_suffix('.md')          # /home/user/documents/file.md
absolute = file_path.resolve()                   # absolute path with symlinks resolved
relative = file_path.relative_to('/home/user')   # documents/file.txt

# Query the filesystem
print(file_path.exists())
print(file_path.is_file())
print(file_path.is_dir())

# Create a directory; parents=True creates missing ancestors,
# exist_ok=True makes the call a no-op if it already exists
new_dir = Path('new_folder')
new_dir.mkdir(parents=True, exist_ok=True)

# List matching entries in one directory
for file in Path('.').glob('*.txt'):
    print(file)

# List matching entries recursively
for file in Path('.').rglob('*.py'):
    print(file)

8.4 文件系统操作

import os
import shutil
from pathlib import Path
 
# NOTE: the calls within each group are alternatives for the same task.
# Run one after another as written, several would fail (for example,
# deleting or renaming a file that the previous line already removed or moved).

# Create directories
os.mkdir('new_dir')                    # create a single directory level
os.makedirs('a/b/c', exist_ok=True)    # create nested directories
Path('new_dir').mkdir(parents=True, exist_ok=True)

# Delete
os.remove('file.txt')                  # delete a file
os.rmdir('empty_dir')                  # delete an empty directory
shutil.rmtree('dir')                   # delete a whole directory tree
Path('file.txt').unlink()              # delete a file (pathlib equivalent)

# Copy
shutil.copy('src.txt', 'dst.txt')      # copy a file
shutil.copy2('src.txt', 'dst.txt')     # copy a file and keep its metadata
shutil.copytree('src_dir', 'dst_dir')  # copy a directory tree

# Move / rename
os.rename('old.txt', 'new.txt')
shutil.move('src', 'dst')
Path('old.txt').rename('new.txt')

# Walk a directory tree; root is the directory currently being visited
for root, dirs, files in os.walk('.'):
    for file in files:
        print(os.path.join(root, file))

8.5 序列化

8.5.1 pickle 模块

import pickle
 
data = {'name': 'Alice', 'age': 25, 'scores': [90, 85, 88]}

# Serialize to a file; pickle is a binary format, hence mode 'wb'
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

# Deserialize from a file
# WARNING: never unpickle data from an untrusted source
with open('data.pkl', 'rb') as f:
    loaded_data = pickle.load(f)

print(loaded_data)

# Serialize to / deserialize from an in-memory bytes object
pickled = pickle.dumps(data)
original = pickle.loads(pickled)

8.5.2 JSON 序列化

import json
 
data = {
    'name': 'Alice',
    'age': 25,
    'is_student': False,
    'courses': ['Math', 'Physics'],
    'address': None
}

# Serialize to a JSON string (False becomes false, None becomes null)
json_str = json.dumps(data)
print(json_str)

# Pretty-print; ensure_ascii=False keeps non-ASCII characters unescaped
pretty = json.dumps(data, indent=2, ensure_ascii=False)
print(pretty)

# Save to a file
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

# Load from a JSON file
with open('data.json', 'r', encoding='utf-8') as f:
    loaded = json.load(f)
 
# Handling custom objects
class Person:
    """Minimal record type used to demonstrate custom JSON encoding."""

    def __init__(self, name, age):
        self.name, self.age = name, age


def person_to_dict(obj):
    """``default=`` hook for ``json.dumps`` that encodes Person instances.

    Returns:
        A dict with the person's name and age plus a ``'__type__'`` marker.

    Raises:
        TypeError: If ``obj`` is not a Person.
    """
    # Guard first: anything other than a Person is rejected
    if not isinstance(obj, Person):
        raise TypeError(f'Object of type {obj.__class__.__name__} is not JSON serializable')
    return {'name': obj.name, 'age': obj.age, '__type__': 'Person'}


person = Person('Bob', 30)
json_str = json.dumps(person, default=person_to_dict)

8.5.3 CSV 文件处理

import csv
 
# Write CSV; newline='' lets the csv module control line endings itself
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['Name', 'Age', 'City'])
    writer.writerow(['Alice', 25, 'Beijing'])
    writer.writerow(['Bob', 30, 'Shanghai'])

# Read CSV; newline='' is needed here too, otherwise newlines embedded
# in quoted fields are not interpreted correctly
with open('data.csv', 'r', newline='', encoding='utf-8') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)  # each row is a list of strings

# Dictionary-based interface
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['name', 'age', 'city']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'name': 'Alice', 'age': 25, 'city': 'Beijing'})

with open('data.csv', 'r', newline='', encoding='utf-8') as f:
    reader = csv.DictReader(f)  # the first row supplies the keys
    for row in reader:
        print(row['name'], row['age'])

8.6 二进制文件操作

# Read a binary file; mode 'rb' returns bytes and performs no decoding
with open('image.png', 'rb') as f:
    data = f.read()
    print(f"文件大小: {len(data)} 字节")

# Write a binary file (this copies the bytes read above)
with open('copy.png', 'wb') as f:
    f.write(data)

# Handle binary records with struct
import struct

# Pack one int and two 32-bit floats; note that this rebinds `data`
data = struct.pack('iff', 42, 3.14, 2.71)

# Unpack
def unpack_data(data):
    """Unpack a buffer packed with format 'iff' into an (int, float, float) tuple."""
    return struct.unpack('iff', data)

# 'f' is a 32-bit float, so the values do not round-trip exactly
print(unpack_data(data))  # (42, 3.140000104904175, 2.7100000381469727)

8.7 临时文件

import tempfile
 
import os

# Anonymous temporary file opened for reading and writing text
with tempfile.TemporaryFile(mode='w+t', encoding='utf-8') as f:
    f.write('Hello, World!')
    f.seek(0)  # rewind before reading back what was written
    print(f.read())
# The file is deleted automatically

# Named temporary file; encoding is explicit because the content is non-ASCII
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt',
                                 encoding='utf-8') as f:
    f.write('内容')
    print(f.name)
    temp_path = f.name
# delete=False keeps the file after the with block, so it can be reopened by
# name; removing it is then our responsibility
os.remove(temp_path)

# Temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
    print(tmpdir)
    # create files inside this directory here
# The directory and its contents are deleted automatically

8.8 代码示例

示例1:文件搜索工具

from pathlib import Path
 
def find_files(pattern, root='.', recursive=True):
    """Return a list of paths under ``root`` that match a glob pattern.

    Args:
        pattern: Glob pattern such as ``'*.py'``.
        root: Directory where the search starts.
        recursive: Search all subdirectories when true, only ``root`` otherwise.
    """
    base = Path(root)
    # Pick the globbing method once, then materialize its results
    search = base.rglob if recursive else base.glob
    return list(search(pattern))
 
def find_by_size(min_size=None, max_size=None, root='.'):
    """Find regular files under ``root`` whose size lies within the bounds.

    Args:
        min_size: Inclusive lower bound in bytes, or ``None`` for no lower bound.
        max_size: Inclusive upper bound in bytes, or ``None`` for no upper bound.
        root: Directory searched recursively.

    Returns:
        A list of ``(path, size)`` tuples.
    """
    matches = []
    for entry in Path(root).rglob('*'):
        if not entry.is_file():
            continue
        n_bytes = entry.stat().st_size
        if min_size is not None and n_bytes < min_size:
            continue
        if max_size is not None and n_bytes > max_size:
            continue
        matches.append((entry, n_bytes))
    return matches
 
# 使用
# py_files = find_files('*.py')
# large_files = find_by_size(min_size=1024*1024)  # 大于1MB的文件

示例2:配置管理器

import json
from pathlib import Path
 
class ConfigManager:
    """JSON-backed configuration store addressed by dotted keys.

    A key such as ``'database.host'`` addresses ``config['database']['host']``.
    The configuration is loaded when the object is created and written back
    to disk on every ``set()``.
    """

    def __init__(self, config_file='config.json'):
        self.config_file = Path(config_file)
        self._config = {}
        self.load()

    def load(self):
        """Load the configuration from disk; a missing file leaves it unchanged."""
        if self.config_file.exists():
            with open(self.config_file, 'r', encoding='utf-8') as f:
                self._config = json.load(f)

    def save(self):
        """Write the configuration to disk as indented UTF-8 JSON."""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self._config, f, indent=2, ensure_ascii=False)

    def get(self, key, default=None):
        """Return the value stored under a dotted key.

        ``default`` is returned when any component of the path is missing,
        when an intermediate value is not a dict, or when the stored value
        is ``None``.
        """
        value = self._config
        for k in key.split('.'):
            if not isinstance(value, dict):
                return default
            value = value.get(k)
            if value is None:
                return default
        return value

    def set(self, key, value):
        """Store ``value`` under a dotted key and save the configuration.

        Missing intermediate dicts are created. An intermediate entry that
        exists but is not a dict is replaced by a new dict, so that setting
        ``'a.b'`` after ``'a'`` held a scalar no longer raises TypeError.
        """
        *parents, leaf = key.split('.')
        config = self._config
        for k in parents:
            if not isinstance(config.get(k), dict):
                config[k] = {}
            config = config[k]
        config[leaf] = value
        self.save()


# Usage
config = ConfigManager()
config.set('database.host', 'localhost')
config.set('database.port', 3306)
print(config.get('database.host'))  # localhost

8.9 练习题

练习1:文件合并工具

from pathlib import Path
 
def merge_files(output_file, *input_files, separator='\n'):
    """Concatenate several UTF-8 text files into ``output_file``.

    Args:
        output_file: Path of the file to create or overwrite.
        *input_files: Paths of the files to merge, in order.
        separator: Text written between the contents of consecutive files.
    """
    with open(output_file, 'w', encoding='utf-8') as merged:
        needs_separator = False
        for source_name in input_files:
            # The separator goes between files, never before the first one
            if needs_separator:
                merged.write(separator)
            needs_separator = True
            with open(source_name, 'r', encoding='utf-8') as source:
                text = source.read()
                merged.write(text)
 
# 使用
# merge_files('output.txt', 'file1.txt', 'file2.txt', 'file3.txt')

练习2:目录同步工具

import shutil
from pathlib import Path
 
def sync_directories(src, dst, delete=False):
    """Synchronize the files of a source directory into a destination.

    A file is copied when it is missing from the destination or when the
    source copy has a newer modification time.

    Args:
        src: Source directory.
        dst: Destination directory; created if it does not exist.
        delete: When true, also delete destination files that have no
            counterpart in the source. Directories are never deleted.

    Raises:
        NotADirectoryError: If ``src`` is not an existing directory.
    """
    src_path = Path(src)
    dst_path = Path(dst)

    # Without this check a missing or mistyped source looks like an empty
    # one, and delete=True would then remove every file in the destination
    if not src_path.is_dir():
        raise NotADirectoryError(f"Source is not a directory: {src}")

    # Make sure the destination directory exists
    dst_path.mkdir(parents=True, exist_ok=True)

    # Copy new or updated files
    for src_file in src_path.rglob('*'):
        if not src_file.is_file():
            continue
        rel_path = src_file.relative_to(src_path)
        dst_file = dst_path / rel_path
        dst_file.parent.mkdir(parents=True, exist_ok=True)

        if not dst_file.exists() or \
           src_file.stat().st_mtime > dst_file.stat().st_mtime:
            shutil.copy2(src_file, dst_file)  # copy2 keeps the modification time
            print(f"Copied: {rel_path}")

    # Optional: delete destination files that no longer exist in the source
    if delete:
        # Take a snapshot of the listing so files are not removed while the
        # tree is still being traversed
        for dst_file in list(dst_path.rglob('*')):
            if dst_file.is_file():
                rel_path = dst_file.relative_to(dst_path)
                src_file = src_path / rel_path
                if not src_file.exists():
                    dst_file.unlink()
                    print(f"Deleted: {rel_path}")

本章小结

本章学习了Python的文件和IO操作:文件的打开与读写、上下文管理器、路径处理(os.path 与 pathlib)、文件系统操作、序列化(pickle、JSON、CSV)、二进制文件操作以及临时文件。

掌握文件操作是进行数据处理和应用开发的基础。

进一步阅读