在Python开发中,你是否遇到过这样的困扰:程序出错时只能依靠print语句调试?生产环境中无法追踪问题发生的具体位置?多个模块的调试信息混乱不堪?
这些问题的根本原因是缺乏一个完善的日志系统。本文将深入解析Python logging模块的应用,从基础概念到高级实战,帮你构建一个专业、高效的日志管理体系。无论你是Python新手还是有经验的开发者,都能从中获得实用的编程技巧和最佳实践。
Python# 不规范的调试方式
def process_data(data):
print("开始处理数据...") # 调试信息
if not data:
print("错误:数据为空!") # 错误信息
return None
result = complex_calculation(data)
print(f"计算结果:{result}") # 结果信息
return result
刚开始学python基本都是这样输出日志调试的。
这种方式存在以下问题:
Python的logging模块提供了完整的解决方案:
logging模块的核心组件:
Pythonimport logging
# 1. Logger(记录器)- 应用程序的接口
logger = logging.getLogger('my_app')
# 2. Handler(处理器)- 决定日志输出到哪里
console_handler = logging.StreamHandler() # 控制台输出
file_handler = logging.FileHandler('app.log') # 文件输出
# 3. Formatter(格式器)- 决定日志的输出格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 4. 组装配置
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)
Pythonimport logging
# 配置基础日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def demonstrate_log_levels():
"""演示不同日志级别的使用场景"""
# DEBUG: 详细的调试信息
logging.debug("变量值检查:user_id = 12345")
# INFO: 一般信息,程序正常运行的确认
logging.info("用户登录成功,开始处理业务逻辑")
# WARNING: 警告信息,程序仍能运行但有潜在问题
logging.warning("磁盘空间不足,仅剩10%")
# ERROR: 错误信息,程序某些功能无法正常执行
logging.error("数据库连接失败,无法保存用户数据")
# CRITICAL: 严重错误,程序可能无法继续运行
logging.critical("系统内存耗尽,程序即将崩溃")
demonstrate_log_levels()

Pythonimport logging
import logging.handlers
import os
from datetime import datetime
class ProductionLogger:
"""生产环境日志配置类"""
def __init__(self, name="UpperComputer", log_dir="logs"):
self.name = name
self.log_dir = log_dir
self._ensure_log_dir()
self.logger = self._setup_logger()
def _ensure_log_dir(self):
"""确保日志目录存在"""
if not os.path.exists(self.log_dir):
os.makedirs(self.log_dir)
def _setup_logger(self):
"""配置专业级日志系统"""
logger = logging.getLogger(self.name)
logger.setLevel(logging.DEBUG)
# 清除现有handlers,避免重复
logger.handlers.clear()
# 1. 控制台输出(开发环境)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_format = logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%H:%M:%S'
)
console_handler.setFormatter(console_format)
# 2. 文件输出(详细日志)
log_file = os.path.join(self.log_dir, f"{self.name}.log")
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.DEBUG)
file_format = logging.Formatter(
'%(asctime)s | %(name)s | %(levelname)-8s | %(filename)s:%(lineno)d | %(message)s'
)
file_handler.setFormatter(file_format)
# 3. 错误日志单独记录
error_file = os.path.join(self.log_dir, f"{self.name}_error.log")
error_handler = logging.FileHandler(error_file, encoding='utf-8')
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(file_format)
# 添加所有处理器
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.addHandler(error_handler)
return logger
def get_logger(self):
"""获取配置好的logger实例"""
return self.logger
# 使用示例
def main():
# 初始化日志系统
log_manager = ProductionLogger("PLCControl")
logger = log_manager.get_logger()
# 模拟上位机操作流程
logger.info("=== PLC控制系统启动 ===")
try:
# 模拟设备连接
logger.debug("正在扫描PLC设备...")
logger.info("已连接到PLC设备: 192.168.1.100")
# 模拟数据处理
sensor_data = [23.5, 24.1, 23.8, 25.2]
logger.debug(f"传感器原始数据: {sensor_data}")
avg_temp = sum(sensor_data) / len(sensor_data)
logger.info(f"平均温度: {avg_temp:.2f}°C")
# 模拟异常情况
if avg_temp > 25.0:
logger.warning(f"温度偏高警告: {avg_temp:.2f}°C > 25.0°C")
# 模拟严重错误
if avg_temp > 30.0:
logger.error("温度超出安全范围,触发保护机制")
raise Exception("温度过高,系统自动停机")
except Exception as e:
logger.critical(f"系统发生严重错误: {str(e)}")
logger.exception("详细错误信息:") # 自动记录堆栈信息
finally:
logger.info("=== 系统运行结束 ===")
if __name__ == "__main__":
main()

Pythonimport logging.config
import yaml
# YAML配置文件方式
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'detailed': {
'format': '%(asctime)s | %(name)-12s | %(levelname)-8s | %(filename)s:%(lineno)d | %(message)s'
},
'simple': {
'format': '%(levelname)s | %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
'file': {
'class': 'logging.handlers.RotatingFileHandler',
'level': 'DEBUG',
'formatter': 'detailed',
'filename': 'logs/application.log',
'maxBytes': 10485760, # 10MB
'backupCount': 5,
'encoding': 'utf8'
}
},
'loggers': {
'database': {
'level': 'DEBUG',
'handlers': ['console', 'file'],
'propagate': False
},
'network': {
'level': 'INFO',
'handlers': ['console', 'file'],
'propagate': False
},
'ui': {
'level': 'WARNING',
'handlers': ['console', 'file'],
'propagate': False
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file']
}
}
def setup_logging():
"""初始化日志配置"""
import os
if not os.path.exists('logs'):
os.makedirs('logs')
logging.config.dictConfig(LOGGING_CONFIG)
Pythonlogger = logging.getLogger('database')
class DatabaseManager:
def __init__(self):
logger.info("数据库管理器初始化")
def connect(self):
logger.debug("尝试连接数据库...")
try:
# 模拟数据库连接
logger.info("数据库连接成功")
return True
except Exception as e:
logger.error(f"数据库连接失败: {e}")
return False
def execute_query(self, sql):
logger.debug(f"执行SQL查询: {sql}")
# 模拟查询执行
logger.info("查询执行完成")
# 示例用法
if __name__ == "__main__":
setup_logging()
db_manager = DatabaseManager()
if db_manager.connect():
db_manager.execute_query("SELECT * FROM users")
else:
logger.error("无法连接到数据库")

Pythonimport logging
logger = logging.getLogger('network')
class NetworkManager:
def __init__(self):
logger.info("网络管理器初始化")
def send_data(self, data):
logger.info(f"发送数据包,大小: {len(str(data))} bytes")
# 模拟网络传输
if len(str(data)) > 1024:
logger.warning("数据包过大,可能影响传输效率")
# 示例用法
if __name__ == "__main__":
setup_logging()
# 创建网络管理器实例
network_manager = NetworkManager()
# 发送数据包
network_manager.send_data("这是一个测试数据包,用于模拟网络传输。")
# 发送一个较大的数据包
large_data = "x" * 2048 # 模拟大数据包
network_manager.send_data(large_data)
logger.info("应用程序运行结束")

Pythonimport functools
import logging
import time
def log_execution(logger_name=None, level=logging.INFO):
"""函数执行日志装饰器"""
def decorator(func):
logger = logging.getLogger(logger_name or func.__module__)
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
# 记录函数开始执行
logger.log(level, f"开始执行 {func.__name__}")
logger.debug(f"参数: args={args}, kwargs={kwargs}")
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# 记录执行成功
logger.log(level, f"{func.__name__} 执行成功,耗时: {execution_time:.3f}s")
logger.debug(f"返回值: {result}")
return result
except Exception as e:
execution_time = time.time() - start_time
# 记录执行失败
logger.error(f"{func.__name__} 执行失败,耗时: {execution_time:.3f}s")
logger.exception(f"异常信息: {str(e)}")
raise # 重新抛出异常
return wrapper
return decorator
# 使用示例
@log_execution('business_logic', logging.INFO)
def calculate_statistics(data_list):
"""计算统计数据"""
if not data_list:
raise ValueError("数据列表不能为空")
return {
'count': len(data_list),
'sum': sum(data_list),
'avg': sum(data_list) / len(data_list),
'max': max(data_list),
'min': min(data_list)
}
# 测试装饰器
def test_decorator():
logging.basicConfig(level=logging.DEBUG)
# 正常执行
result = calculate_statistics([1, 2, 3, 4, 5])
print(f"结果: {result}")
# 异常执行
try:
calculate_statistics([])
except ValueError as e:
print(f"捕获异常: {e}")
test_decorator()

Pythonimport logging
import threading
import time
from flask import Flask, request, jsonify
class DynamicLogController:
"""动态日志级别控制器"""
def __init__(self):
self.loggers = {}
self._lock = threading.Lock()
def register_logger(self, name, logger):
"""注册需要动态控制的logger"""
with self._lock:
self.loggers[name] = logger
def set_log_level(self, logger_name, level):
"""动态设置日志级别"""
with self._lock:
if logger_name in self.loggers:
self.loggers[logger_name].setLevel(level)
return True
return False
def get_log_levels(self):
"""获取所有logger的当前级别"""
with self._lock:
return {
name: logging.getLevelName(logger.level)
for name, logger in self.loggers.items()
}
# 全局控制器实例
log_controller = DynamicLogController()
# Flask应用示例(用于远程控制)
app = Flask(__name__)
@app.route('/logs/levels', methods=['GET'])
def get_log_levels():
"""获取当前日志级别"""
return jsonify(log_controller.get_log_levels())
@app.route('/logs/levels', methods=['POST'])
def set_log_level():
"""设置日志级别"""
data = request.json
logger_name = data.get('logger_name')
level_name = data.get('level', 'INFO')
# 转换级别名称为数值
level_map = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}
level = level_map.get(level_name.upper())
if not level:
return jsonify({'error': '无效的日志级别'}), 400
success = log_controller.set_log_level(logger_name, level)
if success:
return jsonify({'message': f'日志级别已设置为 {level_name}'})
else:
return jsonify({'error': '找不到指定的logger'}), 404
# 使用示例
def setup_dynamic_logging():
"""设置动态日志控制"""
# 创建多个logger
business_logger = logging.getLogger('business')
database_logger = logging.getLogger('database')
network_logger = logging.getLogger('network')
# 注册到控制器
log_controller.register_logger('business', business_logger)
log_controller.register_logger('database', database_logger)
log_controller.register_logger('network', network_logger)
# 模拟业务运行
def simulate_business():
while True:
business_logger.debug("处理业务逻辑中...")
database_logger.info("执行数据库查询")
network_logger.warning("网络延迟较高")
time.sleep(2)
# 启动模拟线程
threading.Thread(target=simulate_business, daemon=True).start()
# 启动Flask控制接口
app.run(host='0.0.0.0', port=5000, debug=False)
if __name__ == "__main__":
setup_dynamic_logging()

通过本文的深入解析,我们完整地构建了一个专业级的Python日志系统。让我们回顾三个核心要点:
1. 架构设计是基础 - 合理的Logger、Handler、Formatter组合是日志系统的骨架。在Python开发项目中,特别是上位机开发这类对稳定性要求极高的应用,良好的日志架构能够大大提升排错效率。
2. 分级管理是关键 - DEBUG用于开发调试,INFO记录关键流程,WARNING标识潜在问题,ERROR记录功能异常,CRITICAL标记严重故障。这种分级策略让我们能够在不同环境下灵活控制信息输出,这是专业编程技巧的重要体现。
3. 性能优化不可忽视 - 生产环境中,日志系统的性能直接影响应用表现。使用懒加载格式化、条件性记录、合理的轮转策略等优化手段,能够在保证功能完整性的同时最小化性能开销。
掌握了这套完整的日志体系,你的Python项目将具备更强的可维护性和专业性。下次遇到程序问题时,完善的日志记录将成为你最可靠的助手!
如果这篇文章对你有帮助,欢迎关注我们获取更多Python开发**实战技巧!
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!