编辑
2025-12-03
Python
00

目录

🤔 问题分析:为什么需要事务?
典型场景举例
没有事务的风险
💡 解决方案:pyodbc事务机制
事务的ACID特性
pyodbc事务控制机制
🔧 代码实战:三种事务处理模式
🎯 模式一:基础事务处理
🎯 模式二:上下文管理器模式(推荐)
🎯 模式三:高级事务类封装
🛡️ 最佳实践与注意事项
性能优化技巧
错误处理策略
连接池管理
🎯 总结与延伸

在日常的Python开发工作中,特别是开发上位机应用或数据管理系统时,我们经常需要与SqlServer数据库进行交互。但你是否遇到过这样的场景:执行多个相关的数据库操作时,如果中途出现异常,部分数据已经写入数据库,而部分数据却没有成功保存,导致数据不一致的问题?

这就是为什么我们需要数据库事务的原因。事务能够确保一组数据库操作要么全部成功,要么全部失败回滚,保证数据的完整性和一致性。今天我们就来深入探讨如何使用Python的pyodbc库来优雅地处理SqlServer事务,让你的数据操作更加安全可靠。

🤔 问题分析:为什么需要事务?

典型场景举例

想象一下银行转账的场景:张三要给李四转账1000元,这个操作需要两个步骤:

  1. 从张三账户扣除1000元
  2. 向李四账户增加1000元

如果第一步成功了,但第二步因为网络异常或其他原因失败了,那么张三的钱就凭空消失了!这显然是不可接受的。

在Python开发中,类似的场景包括:

  • 订单系统:创建订单记录的同时需要更新库存
  • 用户注册:插入用户信息的同时需要创建用户权限记录
  • 数据迁移:批量更新多个相关联的表

没有事务的风险

Python
# ❌ 危险的操作方式 import pyodbc conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=testdb;UID=user;PWD=password') cursor = conn.cursor() try: # 第一步:扣除张三账户余额 cursor.execute("UPDATE accounts SET balance = balance - 1000 WHERE name = '张三'") # 假设这里发生了异常... raise Exception("网络异常") # 第二步:增加李四账户余额(永远不会执行到) cursor.execute("UPDATE accounts SET balance = balance + 1000 WHERE name = '李四'") conn.commit() except Exception as e: print(f"操作失败: {e}") finally: conn.close()

上面的代码中,如果在两次UPDATE之间发生异常,第一次UPDATE已经执行并提交,但第二次UPDATE没有执行,导致数据不一致。

💡 解决方案:pyodbc事务机制

事务的ACID特性

在深入代码之前,我们需要理解事务的四个核心特性(ACID):

  • 原子性(Atomicity):事务中的所有操作要么全部成功,要么全部失败
  • 一致性(Consistency):事务执行前后数据库都处于一致状态
  • 隔离性(Isolation):并发事务之间不会相互干扰
  • 持久性(Durability):事务提交后的修改永久保存

pyodbc事务控制机制

pyodbc提供了简单而强大的事务控制方法:

  • conn.commit():提交事务,使所有修改生效
  • conn.rollback():回滚事务,撤销所有修改
  • 自动事务模式:默认情况下,每个SQL语句都是一个独立事务
  • 手动事务模式:通过控制commit和rollback来管理事务边界

🔧 代码实战:三种事务处理模式

🎯 模式一:基础事务处理

这是最基本的事务处理方式,适合简单的场景:

Python
import pyodbc import logging def basic_transaction_demo(): """基础事务处理示例""" # 数据库连接配置 conn_str = ( 'DRIVER={ODBC Driver 18 for SQL Server};' 'SERVER=localhost;' 'DATABASE=dbtest;' 'UID=sa;' 'PWD=123;' 'Trusted_Connection=yes;' 'TrustServerCertificate=yes;' ) conn = None try: # 建立连接 conn = pyodbc.connect(conn_str) cursor = conn.cursor() # 开始事务操作 print("🚀 开始执行转账操作...") # 第一步:检查张三账户余额 cursor.execute("SELECT balance FROM accounts WHERE name = ?", ('张三',)) balance = cursor.fetchone()[0] if balance < 1000: raise ValueError("账户余额不足") # 第二步:扣除张三账户余额 cursor.execute( "UPDATE accounts SET balance = balance - ? WHERE name = ?", (1000, '张三') ) print("✅ 已从张三账户扣除1000元") # 第三步:增加李四账户余额 cursor.execute( "UPDATE accounts SET balance = balance + ? WHERE name = ?", (1000, '李四') ) print("✅ 已向李四账户增加1000元") # 提交事务 conn.commit() print("🎉 转账成功!事务已提交") except Exception as e: # 发生异常时回滚事务 if conn: conn.rollback() print(f"❌ 转账失败,事务已回滚: {e}") finally: # 清理资源 if conn: conn.close() # 调用示例 basic_transaction_demo()

image.png

image.png

🎯 模式二:上下文管理器模式(推荐)

使用上下文管理器可以让代码更加简洁和安全:

Python
import pyodbc from contextlib import contextmanager @contextmanager def get_db_transaction(conn_str): """数据库事务上下文管理器""" conn = None try: conn = pyodbc.connect(conn_str) yield conn conn.commit() print("✅ 事务提交成功") except Exception as e: if conn: conn.rollback() print(f"❌ 事务回滚: {e}") raise finally: if conn: conn.close() def context_transaction_demo(): """使用上下文管理器处理事务""" conn_str = ( 'DRIVER={ODBC Driver 18 for SQL Server};' 'SERVER=localhost;' 'DATABASE=dbtest;' 'UID=sa;' 'PWD=123;' 'Trusted_Connection=yes;' 'TrustServerCertificate=yes;' ) try: with get_db_transaction(conn_str) as conn: cursor = conn.cursor() # 批量插入订单数据 orders = [ ('ORD001', '张三', 'iPhone 14', 5999.00), ('ORD002', '李四', 'MacBook Pro', 12999.00), ('ORD003', '王五', 'iPad Air', 4399.00) ] print("🚀 开始批量插入订单...") for order in orders: # 插入订单记录 cursor.execute( "INSERT INTO orders (order_id, customer_name, product_name, amount) VALUES (?, ?, ?, ?)", order ) # 同时更新库存 cursor.execute( "UPDATE inventory SET stock = stock - 1 WHERE product_name = ?", (order[2],) ) print(f"✅ 订单 {order[0]} 处理完成") print("🎉 所有订单处理完成!") except Exception as e: print(f"❌ 批量操作失败: {e}") # 调用示例 context_transaction_demo()

image.png

🎯 模式三:高级事务类封装

对于复杂的业务场景,我们可以创建一个专门的事务管理类:

Python
import pyodbc import logging from typing import Optional, Any, List, Tuple class SqlServerTransaction: """SqlServer事务管理类""" def __init__(self, conn_str: str): self.conn_str = conn_str self.conn: Optional[pyodbc.Connection] = None self.cursor: Optional[pyodbc.Cursor] = None self._in_transaction = False # 配置日志 logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) def connect(self): """建立数据库连接""" try: self.conn = pyodbc.connect(self.conn_str) self.cursor = self.conn.cursor() self.logger.info("🔗 数据库连接成功") except Exception as e: self.logger.error(f"❌ 数据库连接失败: {e}") raise def begin_transaction(self): """开始事务""" if not self.conn: self.connect() self._in_transaction = True self.logger.info("🚀 事务开始") def execute(self, sql: str, params: Optional[Tuple] = None) -> Any: """执行SQL语句""" if not self._in_transaction: raise RuntimeError("请先调用 begin_transaction() 开始事务") try: if params: result = self.cursor.execute(sql, params) else: result = self.cursor.execute(sql) self.logger.info(f"✅ SQL执行成功: {sql[:50]}...") return result except Exception as e: self.logger.error(f"❌ SQL执行失败: {e}") raise def execute_many(self, sql: str, params_list: List[Tuple]) -> None: """批量执行SQL语句""" if not self._in_transaction: raise RuntimeError("请先调用 begin_transaction() 开始事务") try: self.cursor.executemany(sql, params_list) self.logger.info(f"✅ 批量SQL执行成功,共 {len(params_list)} 条记录") except Exception as e: self.logger.error(f"❌ 批量SQL执行失败: {e}") raise def fetchone(self) -> Optional[Tuple]: """获取单行结果""" return self.cursor.fetchone() def fetchall(self) -> List[Tuple]: """获取所有结果""" return self.cursor.fetchall() def commit(self): """提交事务""" if self.conn and self._in_transaction: try: self.conn.commit() self._in_transaction = False self.logger.info("🎉 事务提交成功") except Exception as e: self.logger.error(f"❌ 事务提交失败: {e}") raise def rollback(self): """回滚事务""" if self.conn and self._in_transaction: try: self.conn.rollback() self._in_transaction = False self.logger.info("🔄 事务回滚成功") except Exception as e: self.logger.error(f"❌ 事务回滚失败: {e}") raise def close(self): """关闭连接""" if self.conn: self.conn.close() self.logger.info("🔐 数据库连接已关闭") def __enter__(self): """上下文管理器入口""" self.begin_transaction() return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器出口""" try: if exc_type is None: # 没有异常,提交事务 self.commit() else: # 发生异常,回滚事务 self.rollback() finally: self.close() def advanced_transaction_demo(): """高级事务管理示例""" conn_str = ( 'DRIVER={ODBC Driver 18 for SQL Server};' 'SERVER=localhost;' 'DATABASE=dbtest;' 'UID=sa;' 'PWD=123;' 'Trusted_Connection=yes;' 'TrustServerCertificate=yes;' ) # 使用高级事务管理类 try: with SqlServerTransaction(conn_str) as tx: # 场景:用户注册流程 user_info = ('user123', '张三', 'zhangsan@email.com', '18888888888') # 第一步:插入用户基本信息 tx.execute( "INSERT INTO users (user_id, name, email, phone) VALUES (?, ?, ?, ?)", user_info ) # 第二步:创建用户权限记录 permissions = [ ('user123', 'read'), ('user123', 'write'), ('user123', 'delete') ] tx.execute_many( "INSERT INTO user_permissions (user_id, permission) VALUES (?, ?)", permissions ) # 第三步:初始化用户配置 tx.execute( "INSERT INTO user_config (user_id, theme, language) VALUES (?, ?, ?)", ('user123', 'dark', 'zh-CN') ) # 第四步:记录操作日志 tx.execute( "INSERT INTO operation_log (user_id, operation, timestamp) VALUES (?, ?, GETDATE())", ('user123', 'user_register') ) print("🎉 用户注册流程完成!") except Exception as e: print(f"❌ 用户注册失败: {e}") # 调用示例 advanced_transaction_demo()

image.png

🛡️ 最佳实践与注意事项

性能优化技巧

合理控制事务大小

Python
# ✅ 推荐:适中的事务大小 def process_batch_data(data_list, batch_size=1000): """分批处理大量数据""" for i in range(0, len(data_list), batch_size): batch = data_list[i:i + batch_size] with SqlServerTransaction(conn_str) as tx: for item in batch: tx.execute("INSERT INTO table1 VALUES (?)", (item,))

避免长时间事务

Python
# ❌ 避免:长时间占用资源的事务 def bad_long_transaction(): with SqlServerTransaction(conn_str) as tx: tx.execute("SELECT * FROM huge_table") # 大量数据查询 time.sleep(60) # 长时间处理 tx.execute("UPDATE another_table SET status = 'processed'") # ✅ 推荐:缩短事务时间 def good_short_transaction(): # 先查询数据(不在事务中) data = query_data() processed_data = process_data(data) # 处理数据 # 快速提交事务 with SqlServerTransaction(conn_str) as tx: tx.execute("UPDATE another_table SET status = 'processed'")

错误处理策略

Python
def robust_transaction_handling(): """健壮的事务错误处理""" retry_count = 3 for attempt in range(retry_count): try: with SqlServerTransaction(conn_str) as tx: # 执行业务逻辑 tx.execute("INSERT INTO orders VALUES (?, ?, ?)", order_data) break # 成功后跳出循环 except pyodbc.IntegrityError as e: # 数据完整性错误,不应重试 print(f"❌ 数据完整性错误: {e}") break except pyodbc.OperationalError as e: # 网络或服务器错误,可以重试 print(f"⚠️ 网络错误,第{attempt + 1}次重试: {e}") if attempt == retry_count - 1: print("❌ 重试次数已用完,操作失败") raise time.sleep(2 ** attempt) # 指数退避

连接池管理

对于高并发场景,建议使用连接池:

Python
import pyodbc import threading import logging import time from queue import Queue, Empty from contextlib import contextmanager from typing import Optional, Any, List, Tuple from concurrent.futures import ThreadPoolExecutor, as_completed class ConnectionPool: """数据库连接池类""" def __init__(self, conn_str: str, pool_size: int = 10, max_retries: int = 3): self.conn_str = conn_str self.pool_size = pool_size self.max_retries = max_retries self.pool = Queue(maxsize=pool_size) self.lock = threading.Lock() self._created_connections = 0 self._active_connections = 0 # 配置日志 logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) # 初始化连接池 self._initialize_pool() def _initialize_pool(self): """初始化连接池""" try: for i in range(self.pool_size): conn = self._create_connection() self.pool.put(conn) self._created_connections += 1 self.logger.info(f"🏊‍♂️ 连接池初始化成功,创建了 {self.pool_size} 个连接") except Exception as e: self.logger.error(f"❌ 连接池初始化失败: {e}") raise def _create_connection(self) -> pyodbc.Connection: """创建新的数据库连接""" for attempt in range(self.max_retries): try: conn = pyodbc.connect(self.conn_str) conn.autocommit = False # 关闭自动提交,使用手动事务 return conn except Exception as e: if attempt == self.max_retries - 1: self.logger.error(f"❌ 创建连接失败,已重试 {self.max_retries} 次: {e}") raise else: self.logger.warning(f"⚠️ 创建连接失败,第 {attempt + 1} 次重试: {e}") time.sleep(1) # 等待1秒后重试 def get_connection(self, timeout: int = 30) -> pyodbc.Connection: """获取连接""" try: conn = self.pool.get(timeout=timeout) with self.lock: self._active_connections += 1 # 检查连接是否有效 if not self._is_connection_valid(conn): self.logger.warning("⚠️ 连接已失效,重新创建") conn = self._create_connection() self.logger.debug(f"🔗 获取连接,当前活跃连接数: {self._active_connections}") return conn except Empty: raise TimeoutError(f"获取连接超时 ({timeout}秒),连接池可能已满") def return_connection(self, conn: pyodbc.Connection): """归还连接""" if conn: try: # 重置连接状态 if conn and not conn.closed: conn.rollback() # 回滚未提交的事务 self.pool.put(conn) with self.lock: self._active_connections -= 1 self.logger.debug(f"🔄 归还连接,当前活跃连接数: {self._active_connections}") except Exception as e: self.logger.error(f"❌ 归还连接失败: {e}") def _is_connection_valid(self, conn: pyodbc.Connection) -> bool: """检查连接是否有效""" try: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() return True except: return False @contextmanager def get_connection_context(self): """连接上下文管理器""" conn = None try: conn = self.get_connection() yield conn finally: if conn: self.return_connection(conn) def get_pool_status(self) -> dict: """获取连接池状态""" return { "pool_size": self.pool_size, "available_connections": self.pool.qsize(), "active_connections": self._active_connections, "created_connections": self._created_connections } def close_all_connections(self): """关闭所有连接""" closed_count = 0 while not self.pool.empty(): try: conn = self.pool.get_nowait() conn.close() closed_count += 1 except Empty: break except Exception as e: self.logger.error(f"❌ 关闭连接失败: {e}") self.logger.info(f"🔐 连接池已关闭,共关闭 {closed_count} 个连接") class PooledSqlServerTransaction: """基于连接池的SQL Server事务管理类""" def __init__(self, connection_pool: ConnectionPool): self.pool = connection_pool self.conn: Optional[pyodbc.Connection] = None self.cursor: Optional[pyodbc.Cursor] = None self._in_transaction = False # 配置日志 self.logger = logging.getLogger(__name__) def begin_transaction(self): """开始事务""" if self._in_transaction: raise RuntimeError("事务已经开始,不能重复开始") self.conn = self.pool.get_connection() self.cursor = self.conn.cursor() self._in_transaction = True self.logger.info("🚀 事务开始") def execute(self, sql: str, params: Optional[Tuple] = None) -> Any: """执行SQL语句""" if not self._in_transaction: raise RuntimeError("请先调用 begin_transaction() 开始事务") try: if params: result = self.cursor.execute(sql, params) else: result = self.cursor.execute(sql) self.logger.info(f"✅ SQL执行成功: {sql[:50]}...") return result except Exception as e: self.logger.error(f"❌ SQL执行失败: {e}") raise def execute_many(self, sql: str, params_list: List[Tuple]) -> None: """批量执行SQL语句""" if not self._in_transaction: raise RuntimeError("请先调用 begin_transaction() 开始事务") try: self.cursor.executemany(sql, params_list) self.logger.info(f"✅ 批量SQL执行成功,共 {len(params_list)} 条记录") except Exception as e: self.logger.error(f"❌ 批量SQL执行失败: {e}") raise def fetchone(self) -> Optional[Tuple]: """获取单行结果""" return self.cursor.fetchone() def fetchall(self) -> List[Tuple]: """获取所有结果""" return self.cursor.fetchall() def commit(self): """提交事务""" if self.conn and self._in_transaction: try: self.conn.commit() self.logger.info("🎉 事务提交成功") except Exception as e: self.logger.error(f"❌ 事务提交失败: {e}") raise finally: self._cleanup() def rollback(self): """回滚事务""" if self.conn and self._in_transaction: try: self.conn.rollback() self.logger.info("🔄 事务回滚成功") except Exception as e: self.logger.error(f"❌ 事务回滚失败: {e}") raise finally: self._cleanup() def _cleanup(self): """清理资源""" if self.cursor: self.cursor.close() self.cursor = None if self.conn: self.pool.return_connection(self.conn) self.conn = None self._in_transaction = False def __enter__(self): """上下文管理器入口""" self.begin_transaction() return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器出口""" if exc_type is None: # 没有异常,提交事务 self.commit() else: # 发生异常,回滚事务 self.rollback() def pooled_transaction_demo(): """使用连接池的事务管理示例""" # 数据库连接字符串 conn_str = ( 'DRIVER={ODBC Driver 18 for SQL Server};' 'SERVER=localhost;' 'DATABASE=dbtest;' 'UID=sa;' 'PWD=123;' 'Trusted_Connection=yes;' 'TrustServerCertificate=yes;' ) # 创建连接池 pool = ConnectionPool(conn_str, pool_size=5) print("📊 连接池状态:", pool.get_pool_status()) try: # 示例1:基本事务操作 print("\n=== 示例1:基本事务操作 ===") with PooledSqlServerTransaction(pool) as tx: # 插入用户数据 user_data = ('user456', '李四', 'lisi@email.com', '13999999999') tx.execute( "INSERT INTO users (user_id, name, email, phone) VALUES (?, ?, ?, ?)", user_data ) # 插入权限数据 permissions = [ ('user456', 'read'), ('user456', 'write') ] tx.execute_many( "INSERT INTO user_permissions (user_id, permission) VALUES (?, ?)", permissions ) print("✅ 基本事务操作完成") # 示例2:查询操作 print("\n=== 示例2:查询操作 ===") with pool.get_connection_context() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM users") count = cursor.fetchone()[0] print(f"📈 当前用户总数: {count}") cursor.close() except Exception as e: print(f"❌ 操作失败: {e}") finally: print("\n📊 连接池最终状态:", pool.get_pool_status()) def concurrent_transaction_demo(): """并发事务测试""" conn_str = ( 'DRIVER={ODBC Driver 18 for SQL Server};' 'SERVER=localhost;' 'DATABASE=dbtest;' 'UID=sa;' 'PWD=123;' 'Trusted_Connection=yes;' 'TrustServerCertificate=yes;' ) pool = ConnectionPool(conn_str, pool_size=3) def worker_task(worker_id: int, task_count: int): """工作线程任务""" results = [] for i in range(task_count): try: with PooledSqlServerTransaction(pool) as tx: user_id = f"worker_{worker_id}_{i}" user_data = (user_id, f"工作者{worker_id}-{i}", f"{user_id}@test.com", "13000000000") tx.execute( "INSERT INTO users (user_id, name, email, phone) VALUES (?, ?, ?, ?)", user_data ) tx.execute( "INSERT INTO operation_log (user_id, operation) VALUES (?, ?)", (user_id, 'concurrent_test') ) results.append(f"✅ Worker-{worker_id} Task-{i} 成功") time.sleep(0.1) # 模拟处理时间 except Exception as e: results.append(f"❌ Worker-{worker_id} Task-{i} 失败: {e}") return results print("\n=== 并发事务测试 ===") print(f"📊 测试前连接池状态: {pool.get_pool_status()}") # 使用线程池执行并发任务 with ThreadPoolExecutor(max_workers=3) as executor: # 提交任务 futures = [] for worker_id in range(3): future = executor.submit(worker_task, worker_id, 5) futures.append(future) # 收集结果 all_results = [] for future in as_completed(futures): results = future.result() all_results.extend(results) # 显示结果 for result in all_results: print(result) print(f"\n📊 测试后连接池状态: {pool.get_pool_status()}") # 清理测试数据 try: with pool.get_connection_context() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM operation_log WHERE operation = 'concurrent_test'") cursor.execute("DELETE FROM users WHERE user_id LIKE 'worker_%'") conn.commit() cursor.close() print("🧹 测试数据清理完成") except Exception as e: print(f"❌ 清理失败: {e}") # 关闭连接池 pool.close_all_connections() if __name__ == "__main__": print("🔧 开始数据库连接池测试") # 基本功能测试 pooled_transaction_demo() print("\n" + "=" * 50) # 并发测试 concurrent_transaction_demo() print("\n🎉 所有测试完成!")

image.png

🎯 总结与延伸

通过本文的详细讲解,我们掌握了使用Python pyodbc处理SqlServer事务的三种核心模式:基础事务处理让我们理解了事务的基本概念和操作方式;上下文管理器模式提供了更加优雅和安全的代码结构;高级事务类封装则为复杂业务场景提供了强大的功能支持。

三个关键要点回顾

  1. 事务的本质:确保数据操作的原子性,要么全部成功,要么全部失败
  2. 异常处理的重要性:合理的try-catch-finally结构和rollback机制是数据安全的保障
  3. 性能与可靠性的平衡:通过连接池、批处理、重试机制等技术手段提升系统的健壮性

在实际的Python开发项目中,特别是涉及上位机开发、数据采集系统或企业级应用时,掌握这些事务处理技巧将大大提升你的编程技巧水平。建议你在自己的项目中尝试应用这些模式,并根据具体业务需求进行适当的调整和优化。

记住,优秀的Python开发者不仅要写出能运行的代码,更要写出安全、可靠、可维护的代码。事务处理正是体现这种专业素养的重要技能之一!

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!