python:第八章文件与io
目录
第八章 文件与IO
本章概要
文件操作是编程的基础技能。本章将学习Python中文件的读写操作、上下文管理器、序列化和路径处理等内容。
8.1 文件基础
8.1.1 打开文件
使用 ``open()`` 函数打开文件:
# 基本语法 file = open(filename, mode, encoding='utf-8') # ... 使用文件 file.close()
打开模式:
| 模式 | 说明 |
| ---- | ---- |
| 'r' | 只读(默认) |
| 'w' | 只写(覆盖) |
| 'x' | 独占创建,文件已存在则失败 |
| 'a' | 追加写入 |
| 'b' | 二进制模式(与其他模式组合使用,如 'rb'、'wb') |
| 't' | 文本模式(默认,可省略,如 'rt' 等同于 'r') |
| '+' | 读写模式(与其他模式组合使用,如 'r+'、'w+') |
8.1.2 读取文件
# 方法1:读取全部内容 with open('example.txt', 'r', encoding='utf-8') as f: content = f.read() print(content) # 方法2:逐行读取 with open('example.txt', 'r', encoding='utf-8') as f: for line in f: print(line.strip()) # 方法3:读取为列表 with open('example.txt', 'r', encoding='utf-8') as f: lines = f.readlines() # 方法4:指定读取字节数 with open('example.txt', 'r', encoding='utf-8') as f: chunk = f.read(1024) # 读取1024个字符
8.1.3 写入文件
# 写入字符串 with open('output.txt', 'w', encoding='utf-8') as f: f.write('Hello, World!\n') f.write('第二行内容\n') # 写入多行 lines = ['第一行\n', '第二行\n', '第三行\n'] with open('output.txt', 'w', encoding='utf-8') as f: f.writelines(lines) # 追加写入 with open('output.txt', 'a', encoding='utf-8') as f: f.write('追加的内容\n') # print()写入文件 with open('output.txt', 'w', encoding='utf-8') as f: print('Hello', 'World', file=f, sep=', ')
8.2 上下文管理器
8.2.1 with 语句
# Traditional approach: close the file explicitly in a finally clause.
# The encoding is passed explicitly so the result does not depend on the
# platform's default text encoding.
f = open('file.txt', 'r', encoding='utf-8')
try:
    content = f.read()
finally:
    f.close()

# Preferred: the with statement
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()
# The file is closed automatically when the with block ends.
8.2.2 自定义上下文管理器
class ManagedFile:
    """Context manager that opens a file on entry and closes it on exit.

    Args:
        filename: Path of the file to open.
        mode: Mode string passed to ``open()``.
        encoding: Text encoding passed to ``open()``. The default ``None``
            keeps the platform default and must be used for binary modes.
    """

    def __init__(self, filename, mode='r', encoding=None):
        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        self.file = None

    def __enter__(self):
        """Open the file and return the file object bound by ``as``."""
        self.file = open(self.filename, self.mode, encoding=self.encoding)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file; any exception raised in the block propagates."""
        if self.file is not None:
            self.file.close()
        # Returning False propagates the exception; True would suppress it.
        return False


# Usage
with ManagedFile('test.txt', 'w') as f:
    f.write('Hello')
8.2.3 contextlib模块
import os
from contextlib import contextmanager


@contextmanager
def managed_file(filename, mode='r'):
    """Generator-based context manager that yields an open file.

    The file is opened with ``open(filename, mode)`` and closed when the
    with block ends, whether or not the block raised.
    """
    f = open(filename, mode)
    try:
        yield f
    finally:
        f.close()


# Usage
with managed_file('test.txt', 'w') as f:
    f.write('Hello')

# Other tools
from contextlib import suppress

# Suppress a specific exception. os must be imported for this to work:
# a NameError from a missing import is not covered by suppress().
with suppress(FileNotFoundError):
    os.remove('nonexistent_file.txt')
8.3 文件路径处理
8.3.1 os.path 模块
import os # 路径拼接 path = os.path.join('folder', 'subfolder', 'file.txt') # 获取绝对路径 abs_path = os.path.abspath('file.txt') # 获取路径组件 dirname = os.path.dirname('/home/user/file.txt') # /home/user basename = os.path.basename('/home/user/file.txt') # file.txt split = os.path.split('/home/user/file.txt') # ('/home/user', 'file.txt') splitext = os.path.splitext('file.txt') # ('file', '.txt') # 检查路径 exists = os.path.exists('file.txt') isfile = os.path.isfile('file.txt') isdir = os.path.isdir('folder') isabs = os.path.isabs('/home/user')
8.3.2 pathlib 模块(推荐)
from pathlib import Path # 创建Path对象 p = Path('/home/user/documents') # 路径拼接 file_path = p / 'file.txt' # 获取信息 print(file_path.name) # file.txt print(file_path.stem) # file print(file_path.suffix) # .txt print(file_path.parent) # /home/user/documents print(file_path.parts) # ('/', 'home', 'user', 'documents', 'file.txt') # 路径操作 new_path = file_path.with_suffix('.md') absolute = file_path.resolve() relative = file_path.relative_to('/home/user') # 检查路径 print(file_path.exists()) print(file_path.is_file()) print(file_path.is_dir()) # 创建目录 new_dir = Path('new_folder') new_dir.mkdir(parents=True, exist_ok=True) # 遍历目录 for file in Path('.').glob('*.txt'): print(file) # 递归遍历 for file in Path('.').rglob('*.py'): print(file)
8.4 文件系统操作
import os
import shutil
from pathlib import Path

# NOTE(review): within each group below the calls are alternative ways to do
# the same job, not a sequence to run as-is; e.g. os.remove('file.txt')
# followed by Path('file.txt').unlink() would fail because the file is
# already gone.

# Create directories
os.mkdir('new_dir')                      # create a single directory level
os.makedirs('a/b/c', exist_ok=True)      # create nested directories
Path('new_dir').mkdir(parents=True, exist_ok=True)

# Delete
os.remove('file.txt')        # delete a file
os.rmdir('empty_dir')        # delete an empty directory
shutil.rmtree('dir')         # delete a directory tree
Path('file.txt').unlink()    # delete a file

# Copy
shutil.copy('src.txt', 'dst.txt')      # copy a file
shutil.copy2('src.txt', 'dst.txt')     # copy a file and keep its metadata
shutil.copytree('src_dir', 'dst_dir')  # copy a directory

# Move / rename
os.rename('old.txt', 'new.txt')
shutil.move('src', 'dst')
Path('old.txt').rename('new.txt')

# Walk a directory tree
for root, dirs, files in os.walk('.'):
    for file in files:
        print(os.path.join(root, file))
8.5 序列化
8.5.1 pickle 模块
import pickle

data = {'name': 'Alice', 'age': 25, 'scores': [90, 85, 88]}

# Serialize to a file (pickle is a binary format, hence 'wb')
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

# Deserialize from a file
# WARNING: unpickling can execute arbitrary code; only load pickle data that
# comes from a source you trust.
with open('data.pkl', 'rb') as f:
    loaded_data = pickle.load(f)
    print(loaded_data)

# Serialize to / from a bytes object
pickled = pickle.dumps(data)
original = pickle.loads(pickled)
8.5.2 JSON 序列化
import json

data = {
    'name': 'Alice',
    'age': 25,
    'is_student': False,
    'courses': ['Math', 'Physics'],
    'address': None
}

# Serialize to a JSON string
json_str = json.dumps(data)
print(json_str)

# Pretty-print; ensure_ascii=False keeps non-ASCII characters unescaped
pretty = json.dumps(data, indent=2, ensure_ascii=False)
print(pretty)

# Save to a file
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

# Load from a JSON file
with open('data.json', 'r', encoding='utf-8') as f:
    loaded = json.load(f)

# Handling custom objects
class Person:
    """Simple record with a name and an age."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

def person_to_dict(obj):
    """``default=`` hook for json.dumps: convert a Person into a dict.

    Raises TypeError for any other type, which is how json reports an
    object it cannot serialize.
    """
    if isinstance(obj, Person):
        return {'name': obj.name, 'age': obj.age, '__type__': 'Person'}
    raise TypeError(f'Object of type {obj.__class__.__name__} is not JSON serializable')

person = Person('Bob', 30)
json_str = json.dumps(person, default=person_to_dict)
8.5.3 CSV 文件处理
import csv

# Write CSV. newline='' stops the text layer from translating the line
# endings that the csv module writes itself.
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['Name', 'Age', 'City'])
    writer.writerow(['Alice', 25, 'Beijing'])
    writer.writerow(['Bob', 30, 'Shanghai'])

# Read CSV. newline='' is needed for reading as well, so that newlines
# embedded inside quoted fields are interpreted correctly.
with open('data.csv', 'r', newline='', encoding='utf-8') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# Dictionary-based writing and reading
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['name', 'age', 'city']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({'name': 'Alice', 'age': 25, 'city': 'Beijing'})

with open('data.csv', 'r', newline='', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        # Values read back from CSV are always strings.
        print(row['name'], row['age'])
8.6 二进制文件操作
# 读取二进制文件 with open('image.png', 'rb') as f: data = f.read() print(f"文件大小: {len(data)} 字节") # 写入二进制文件 with open('copy.png', 'wb') as f: f.write(data) # 使用struct处理二进制数据 import struct # 打包数据(一个整数和两个浮点数) data = struct.pack('iff', 42, 3.14, 2.71) # 解包 def unpack_data(data): return struct.unpack('iff', data) print(unpack_data(data)) # (42, 3.14, 2.71)
8.7 临时文件
import os
import tempfile

# Temporary file
with tempfile.TemporaryFile(mode='w+t') as f:
    f.write('Hello, World!')
    f.seek(0)  # rewind before reading back what was just written
    print(f.read())
# The file is deleted automatically.

# Named temporary file. The encoding is explicit because the text written
# is not ASCII and the platform default may not be able to encode it.
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt',
                                 encoding='utf-8') as f:
    f.write('内容')
    print(f.name)
# delete=False keeps the file on disk after it is closed, so it has to be
# removed explicitly once it is no longer needed.
os.remove(f.name)

# Temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
    print(tmpdir)
    # Create files inside this directory here.
# The directory is deleted automatically.
8.8 代码示例
示例1:文件搜索工具
from pathlib import Path


def find_files(pattern, root='.', recursive=True):
    """Return the paths under *root* that match the glob *pattern*.

    Args:
        pattern: Glob pattern such as ``'*.py'``.
        root: Directory to search in.
        recursive: Search subdirectories too when true, otherwise only
            the direct children of *root*.

    Returns:
        A list of ``Path`` objects.
    """
    root_path = Path(root)
    matches = root_path.rglob(pattern) if recursive else root_path.glob(pattern)
    return list(matches)


def find_by_size(min_size=None, max_size=None, root='.'):
    """Return the files under *root* whose size lies in the given range.

    Args:
        min_size: Smallest size in bytes to include (inclusive), or
            ``None`` for no lower bound.
        max_size: Largest size in bytes to include (inclusive), or
            ``None`` for no upper bound.
        root: Directory to search recursively.

    Returns:
        A list of ``(path, size)`` tuples.
    """
    results = []
    for path in Path(root).rglob('*'):
        try:
            if not path.is_file():
                continue
            size = path.stat().st_size
        except OSError:
            # The entry vanished or became unreadable between listing and
            # stat(); skip it instead of aborting the whole search.
            continue
        if min_size is not None and size < min_size:
            continue
        if max_size is not None and size > max_size:
            continue
        results.append((path, size))
    return results


# Usage
# py_files = find_files('*.py')
# large_files = find_by_size(min_size=1024*1024)  # files of at least 1 MB
示例2:配置管理器
import json
from pathlib import Path


class ConfigManager:
    """JSON-backed configuration store addressed by dotted keys.

    A key such as ``'database.host'`` refers to ``config['database']['host']``.
    The configuration is loaded from *config_file* on construction (when the
    file exists) and written back on every ``set()``.
    """

    def __init__(self, config_file='config.json'):
        self.config_file = Path(config_file)
        self._config = {}
        self.load()

    def load(self):
        """Load the configuration from the file if it exists."""
        if self.config_file.exists():
            with open(self.config_file, 'r', encoding='utf-8') as f:
                self._config = json.load(f)

    def save(self):
        """Write the configuration to the file as indented JSON."""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self._config, f, indent=2, ensure_ascii=False)

    def get(self, key, default=None):
        """Return the value stored under the dotted *key*.

        *default* is returned when any part of the path is missing, is not
        nested inside a dict, or holds ``None``.
        """
        value = self._config
        for part in key.split('.'):
            if not isinstance(value, dict):
                return default
            value = value.get(part)
            if value is None:
                return default
        return value

    def set(self, key, value):
        """Store *value* under the dotted *key* and save the file.

        Missing intermediate levels are created. An intermediate level that
        currently holds a non-dict value is replaced by a dict, so that
        ``set('a', 1)`` followed by ``set('a.b', 2)`` works instead of
        raising TypeError.
        """
        *parents, leaf = key.split('.')
        node = self._config
        for part in parents:
            child = node.get(part)
            if not isinstance(child, dict):
                child = {}
                node[part] = child
            node = child
        node[leaf] = value
        self.save()


# Usage
config = ConfigManager()
config.set('database.host', 'localhost')
config.set('database.port', 3306)
print(config.get('database.host'))  # localhost
8.9 练习题
练习1:文件合并工具
import shutil
from pathlib import Path


def merge_files(output_file, *input_files, separator='\n'):
    """Concatenate several text files into *output_file*.

    Args:
        output_file: Path of the file to create or overwrite.
        *input_files: Paths of the files to merge, in order.
        separator: Text written between the contents of consecutive files.

    Raises:
        ValueError: If *output_file* is also one of the input files. Opening
            it for writing would truncate it before it could be read.
    """
    out_path = Path(output_file).resolve()
    for input_file in input_files:
        if Path(input_file).resolve() == out_path:
            raise ValueError(
                f'output file {output_file} is also an input file')

    with open(output_file, 'w', encoding='utf-8') as out:
        for i, input_file in enumerate(input_files):
            if i > 0:
                out.write(separator)
            with open(input_file, 'r', encoding='utf-8') as f:
                # Stream in chunks so large inputs are never held in memory
                # as a whole.
                shutil.copyfileobj(f, out)


# Usage
# merge_files('output.txt', 'file1.txt', 'file2.txt', 'file3.txt')
练习2:目录同步工具
import shutil
from pathlib import Path


def sync_directories(src, dst, delete=False):
    """Synchronize the source directory into the destination directory.

    Files that are missing from *dst*, or older there than in *src* (by
    modification time), are copied together with their metadata.

    Args:
        src: Source directory.
        dst: Destination directory; created if it does not exist.
        delete: When true, files in *dst* that have no counterpart in *src*
            are deleted.

    Raises:
        NotADirectoryError: If *src* is not an existing directory. Without
            this check a mistyped source combined with ``delete=True`` would
            delete every file in *dst*.
    """
    src_path = Path(src)
    dst_path = Path(dst)

    if not src_path.is_dir():
        raise NotADirectoryError(f"Source directory not found: {src_path}")

    # Make sure the destination directory exists
    dst_path.mkdir(parents=True, exist_ok=True)

    # Copy new or updated files. The listing is taken up front so that files
    # created by the copy itself are never picked up by the walk.
    for src_file in list(src_path.rglob('*')):
        if not src_file.is_file():
            continue
        rel_path = src_file.relative_to(src_path)
        dst_file = dst_path / rel_path
        dst_file.parent.mkdir(parents=True, exist_ok=True)

        if not dst_file.exists() or \
                src_file.stat().st_mtime > dst_file.stat().st_mtime:
            shutil.copy2(src_file, dst_file)
            print(f"Copied: {rel_path}")

    # Optional: delete files that exist only in the destination
    if delete:
        # Snapshot the listing first; entries are removed while looping.
        for dst_file in list(dst_path.rglob('*')):
            if not dst_file.is_file():
                continue
            rel_path = dst_file.relative_to(dst_path)
            if not (src_path / rel_path).exists():
                dst_file.unlink()
                print(f"Deleted: {rel_path}")
本章小结
本章学习了Python的文件和IO操作:
- 文件操作 - 打开、读取、写入文件
- 上下文管理器 - with语句和自定义上下文管理器
- 路径处理 - os.path和pathlib模块
- 文件系统操作 - 创建、删除、复制、移动文件
- 序列化 - pickle、JSON、CSV格式
- 二进制文件 - 二进制读写和struct模块
掌握文件操作是进行数据处理和应用开发的基础。
进一步阅读
python/第八章文件与io.txt · 最后更改: 由 127.0.0.1
