在日常的Python开发工作中,特别是开发上位机应用或数据管理系统时,我们经常需要与SqlServer数据库进行交互。但你是否遇到过这样的场景:执行多个相关的数据库操作时,如果中途出现异常,部分数据已经写入数据库,而部分数据却没有成功保存,导致数据不一致的问题?
这就是为什么我们需要数据库事务的原因。事务能够确保一组数据库操作要么全部成功,要么全部失败回滚,保证数据的完整性和一致性。今天我们就来深入探讨如何使用Python的pyodbc库来优雅地处理SqlServer事务,让你的数据操作更加安全可靠。
想象一下银行转账的场景:张三要给李四转账1000元,这个操作需要两个步骤:
如果第一步成功了,但第二步因为网络异常或其他原因失败了,那么张三的钱就凭空消失了!这显然是不可接受的。
在Python开发中,类似的场景包括:
Python# ❌ 危险的操作方式
import pyodbc
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=testdb;UID=user;PWD=password')
cursor = conn.cursor()
try:
# 第一步:扣除张三账户余额
cursor.execute("UPDATE accounts SET balance = balance - 1000 WHERE name = '张三'")
# 假设这里发生了异常...
raise Exception("网络异常")
# 第二步:增加李四账户余额(永远不会执行到)
cursor.execute("UPDATE accounts SET balance = balance + 1000 WHERE name = '李四'")
conn.commit()
except Exception as e:
print(f"操作失败: {e}")
finally:
conn.close()
上面的代码中,如果在两次UPDATE之间发生异常,第一次UPDATE已经执行并提交,但第二次UPDATE没有执行,导致数据不一致。
在深入代码之前,我们需要理解事务的四个核心特性(ACID):
pyodbc提供了简单而强大的事务控制方法:
conn.commit():提交事务,使所有修改生效conn.rollback():回滚事务,撤销所有修改这是最基本的事务处理方式,适合简单的场景:
Pythonimport pyodbc
import logging
def basic_transaction_demo():
"""基础事务处理示例"""
# 数据库连接配置
conn_str = (
'DRIVER={ODBC Driver 18 for SQL Server};'
'SERVER=localhost;'
'DATABASE=dbtest;'
'UID=sa;'
'PWD=123;'
'Trusted_Connection=yes;'
'TrustServerCertificate=yes;'
)
conn = None
try:
# 建立连接
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
# 开始事务操作
print("🚀 开始执行转账操作...")
# 第一步:检查张三账户余额
cursor.execute("SELECT balance FROM accounts WHERE name = ?", ('张三',))
balance = cursor.fetchone()[0]
if balance < 1000:
raise ValueError("账户余额不足")
# 第二步:扣除张三账户余额
cursor.execute(
"UPDATE accounts SET balance = balance - ? WHERE name = ?",
(1000, '张三')
)
print("✅ 已从张三账户扣除1000元")
# 第三步:增加李四账户余额
cursor.execute(
"UPDATE accounts SET balance = balance + ? WHERE name = ?",
(1000, '李四')
)
print("✅ 已向李四账户增加1000元")
# 提交事务
conn.commit()
print("🎉 转账成功!事务已提交")
except Exception as e:
# 发生异常时回滚事务
if conn:
conn.rollback()
print(f"❌ 转账失败,事务已回滚: {e}")
finally:
# 清理资源
if conn:
conn.close()
# 调用示例
basic_transaction_demo()


使用上下文管理器可以让代码更加简洁和安全:
Pythonimport pyodbc
from contextlib import contextmanager
@contextmanager
def get_db_transaction(conn_str):
"""数据库事务上下文管理器"""
conn = None
try:
conn = pyodbc.connect(conn_str)
yield conn
conn.commit()
print("✅ 事务提交成功")
except Exception as e:
if conn:
conn.rollback()
print(f"❌ 事务回滚: {e}")
raise
finally:
if conn:
conn.close()
def context_transaction_demo():
"""使用上下文管理器处理事务"""
conn_str = (
'DRIVER={ODBC Driver 18 for SQL Server};'
'SERVER=localhost;'
'DATABASE=dbtest;'
'UID=sa;'
'PWD=123;'
'Trusted_Connection=yes;'
'TrustServerCertificate=yes;'
)
try:
with get_db_transaction(conn_str) as conn:
cursor = conn.cursor()
# 批量插入订单数据
orders = [
('ORD001', '张三', 'iPhone 14', 5999.00),
('ORD002', '李四', 'MacBook Pro', 12999.00),
('ORD003', '王五', 'iPad Air', 4399.00)
]
print("🚀 开始批量插入订单...")
for order in orders:
# 插入订单记录
cursor.execute(
"INSERT INTO orders (order_id, customer_name, product_name, amount) VALUES (?, ?, ?, ?)",
order
)
# 同时更新库存
cursor.execute(
"UPDATE inventory SET stock = stock - 1 WHERE product_name = ?",
(order[2],)
)
print(f"✅ 订单 {order[0]} 处理完成")
print("🎉 所有订单处理完成!")
except Exception as e:
print(f"❌ 批量操作失败: {e}")
# 调用示例
context_transaction_demo()

对于复杂的业务场景,我们可以创建一个专门的事务管理类:
Pythonimport pyodbc
import logging
from typing import Optional, Any, List, Tuple
class SqlServerTransaction:
"""SqlServer事务管理类"""
def __init__(self, conn_str: str):
self.conn_str = conn_str
self.conn: Optional[pyodbc.Connection] = None
self.cursor: Optional[pyodbc.Cursor] = None
self._in_transaction = False
# 配置日志
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def connect(self):
"""建立数据库连接"""
try:
self.conn = pyodbc.connect(self.conn_str)
self.cursor = self.conn.cursor()
self.logger.info("🔗 数据库连接成功")
except Exception as e:
self.logger.error(f"❌ 数据库连接失败: {e}")
raise
def begin_transaction(self):
"""开始事务"""
if not self.conn:
self.connect()
self._in_transaction = True
self.logger.info("🚀 事务开始")
def execute(self, sql: str, params: Optional[Tuple] = None) -> Any:
"""执行SQL语句"""
if not self._in_transaction:
raise RuntimeError("请先调用 begin_transaction() 开始事务")
try:
if params:
result = self.cursor.execute(sql, params)
else:
result = self.cursor.execute(sql)
self.logger.info(f"✅ SQL执行成功: {sql[:50]}...")
return result
except Exception as e:
self.logger.error(f"❌ SQL执行失败: {e}")
raise
def execute_many(self, sql: str, params_list: List[Tuple]) -> None:
"""批量执行SQL语句"""
if not self._in_transaction:
raise RuntimeError("请先调用 begin_transaction() 开始事务")
try:
self.cursor.executemany(sql, params_list)
self.logger.info(f"✅ 批量SQL执行成功,共 {len(params_list)} 条记录")
except Exception as e:
self.logger.error(f"❌ 批量SQL执行失败: {e}")
raise
def fetchone(self) -> Optional[Tuple]:
"""获取单行结果"""
return self.cursor.fetchone()
def fetchall(self) -> List[Tuple]:
"""获取所有结果"""
return self.cursor.fetchall()
def commit(self):
"""提交事务"""
if self.conn and self._in_transaction:
try:
self.conn.commit()
self._in_transaction = False
self.logger.info("🎉 事务提交成功")
except Exception as e:
self.logger.error(f"❌ 事务提交失败: {e}")
raise
def rollback(self):
"""回滚事务"""
if self.conn and self._in_transaction:
try:
self.conn.rollback()
self._in_transaction = False
self.logger.info("🔄 事务回滚成功")
except Exception as e:
self.logger.error(f"❌ 事务回滚失败: {e}")
raise
def close(self):
"""关闭连接"""
if self.conn:
self.conn.close()
self.logger.info("🔐 数据库连接已关闭")
def __enter__(self):
"""上下文管理器入口"""
self.begin_transaction()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
try:
if exc_type is None:
# 没有异常,提交事务
self.commit()
else:
# 发生异常,回滚事务
self.rollback()
finally:
self.close()
def advanced_transaction_demo():
"""高级事务管理示例"""
conn_str = (
'DRIVER={ODBC Driver 18 for SQL Server};'
'SERVER=localhost;'
'DATABASE=dbtest;'
'UID=sa;'
'PWD=123;'
'Trusted_Connection=yes;'
'TrustServerCertificate=yes;'
)
# 使用高级事务管理类
try:
with SqlServerTransaction(conn_str) as tx:
# 场景:用户注册流程
user_info = ('user123', '张三', 'zhangsan@email.com', '18888888888')
# 第一步:插入用户基本信息
tx.execute(
"INSERT INTO users (user_id, name, email, phone) VALUES (?, ?, ?, ?)",
user_info
)
# 第二步:创建用户权限记录
permissions = [
('user123', 'read'),
('user123', 'write'),
('user123', 'delete')
]
tx.execute_many(
"INSERT INTO user_permissions (user_id, permission) VALUES (?, ?)",
permissions
)
# 第三步:初始化用户配置
tx.execute(
"INSERT INTO user_config (user_id, theme, language) VALUES (?, ?, ?)",
('user123', 'dark', 'zh-CN')
)
# 第四步:记录操作日志
tx.execute(
"INSERT INTO operation_log (user_id, operation, timestamp) VALUES (?, ?, GETDATE())",
('user123', 'user_register')
)
print("🎉 用户注册流程完成!")
except Exception as e:
print(f"❌ 用户注册失败: {e}")
# 调用示例
advanced_transaction_demo()

合理控制事务大小
Python# ✅ 推荐:适中的事务大小
def process_batch_data(data_list, batch_size=1000):
"""分批处理大量数据"""
for i in range(0, len(data_list), batch_size):
batch = data_list[i:i + batch_size]
with SqlServerTransaction(conn_str) as tx:
for item in batch:
tx.execute("INSERT INTO table1 VALUES (?)", (item,))
避免长时间事务
Python# ❌ 避免:长时间占用资源的事务
def bad_long_transaction():
with SqlServerTransaction(conn_str) as tx:
tx.execute("SELECT * FROM huge_table") # 大量数据查询
time.sleep(60) # 长时间处理
tx.execute("UPDATE another_table SET status = 'processed'")
# ✅ 推荐:缩短事务时间
def good_short_transaction():
# 先查询数据(不在事务中)
data = query_data()
processed_data = process_data(data) # 处理数据
# 快速提交事务
with SqlServerTransaction(conn_str) as tx:
tx.execute("UPDATE another_table SET status = 'processed'")
Pythondef robust_transaction_handling():
"""健壮的事务错误处理"""
retry_count = 3
for attempt in range(retry_count):
try:
with SqlServerTransaction(conn_str) as tx:
# 执行业务逻辑
tx.execute("INSERT INTO orders VALUES (?, ?, ?)", order_data)
break # 成功后跳出循环
except pyodbc.IntegrityError as e:
# 数据完整性错误,不应重试
print(f"❌ 数据完整性错误: {e}")
break
except pyodbc.OperationalError as e:
# 网络或服务器错误,可以重试
print(f"⚠️ 网络错误,第{attempt + 1}次重试: {e}")
if attempt == retry_count - 1:
print("❌ 重试次数已用完,操作失败")
raise
time.sleep(2 ** attempt) # 指数退避
对于高并发场景,建议使用连接池:
Pythonimport pyodbc
import threading
import logging
import time
from queue import Queue, Empty
from contextlib import contextmanager
from typing import Optional, Any, List, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
class ConnectionPool:
"""数据库连接池类"""
def __init__(self, conn_str: str, pool_size: int = 10, max_retries: int = 3):
self.conn_str = conn_str
self.pool_size = pool_size
self.max_retries = max_retries
self.pool = Queue(maxsize=pool_size)
self.lock = threading.Lock()
self._created_connections = 0
self._active_connections = 0
# 配置日志
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# 初始化连接池
self._initialize_pool()
def _initialize_pool(self):
"""初始化连接池"""
try:
for i in range(self.pool_size):
conn = self._create_connection()
self.pool.put(conn)
self._created_connections += 1
self.logger.info(f"🏊♂️ 连接池初始化成功,创建了 {self.pool_size} 个连接")
except Exception as e:
self.logger.error(f"❌ 连接池初始化失败: {e}")
raise
def _create_connection(self) -> pyodbc.Connection:
"""创建新的数据库连接"""
for attempt in range(self.max_retries):
try:
conn = pyodbc.connect(self.conn_str)
conn.autocommit = False # 关闭自动提交,使用手动事务
return conn
except Exception as e:
if attempt == self.max_retries - 1:
self.logger.error(f"❌ 创建连接失败,已重试 {self.max_retries} 次: {e}")
raise
else:
self.logger.warning(f"⚠️ 创建连接失败,第 {attempt + 1} 次重试: {e}")
time.sleep(1) # 等待1秒后重试
def get_connection(self, timeout: int = 30) -> pyodbc.Connection:
"""获取连接"""
try:
conn = self.pool.get(timeout=timeout)
with self.lock:
self._active_connections += 1
# 检查连接是否有效
if not self._is_connection_valid(conn):
self.logger.warning("⚠️ 连接已失效,重新创建")
conn = self._create_connection()
self.logger.debug(f"🔗 获取连接,当前活跃连接数: {self._active_connections}")
return conn
except Empty:
raise TimeoutError(f"获取连接超时 ({timeout}秒),连接池可能已满")
def return_connection(self, conn: pyodbc.Connection):
"""归还连接"""
if conn:
try:
# 重置连接状态
if conn and not conn.closed:
conn.rollback() # 回滚未提交的事务
self.pool.put(conn)
with self.lock:
self._active_connections -= 1
self.logger.debug(f"🔄 归还连接,当前活跃连接数: {self._active_connections}")
except Exception as e:
self.logger.error(f"❌ 归还连接失败: {e}")
def _is_connection_valid(self, conn: pyodbc.Connection) -> bool:
"""检查连接是否有效"""
try:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
return True
except:
return False
@contextmanager
def get_connection_context(self):
"""连接上下文管理器"""
conn = None
try:
conn = self.get_connection()
yield conn
finally:
if conn:
self.return_connection(conn)
def get_pool_status(self) -> dict:
"""获取连接池状态"""
return {
"pool_size": self.pool_size,
"available_connections": self.pool.qsize(),
"active_connections": self._active_connections,
"created_connections": self._created_connections
}
def close_all_connections(self):
"""关闭所有连接"""
closed_count = 0
while not self.pool.empty():
try:
conn = self.pool.get_nowait()
conn.close()
closed_count += 1
except Empty:
break
except Exception as e:
self.logger.error(f"❌ 关闭连接失败: {e}")
self.logger.info(f"🔐 连接池已关闭,共关闭 {closed_count} 个连接")
class PooledSqlServerTransaction:
"""基于连接池的SQL Server事务管理类"""
def __init__(self, connection_pool: ConnectionPool):
self.pool = connection_pool
self.conn: Optional[pyodbc.Connection] = None
self.cursor: Optional[pyodbc.Cursor] = None
self._in_transaction = False
# 配置日志
self.logger = logging.getLogger(__name__)
def begin_transaction(self):
"""开始事务"""
if self._in_transaction:
raise RuntimeError("事务已经开始,不能重复开始")
self.conn = self.pool.get_connection()
self.cursor = self.conn.cursor()
self._in_transaction = True
self.logger.info("🚀 事务开始")
def execute(self, sql: str, params: Optional[Tuple] = None) -> Any:
"""执行SQL语句"""
if not self._in_transaction:
raise RuntimeError("请先调用 begin_transaction() 开始事务")
try:
if params:
result = self.cursor.execute(sql, params)
else:
result = self.cursor.execute(sql)
self.logger.info(f"✅ SQL执行成功: {sql[:50]}...")
return result
except Exception as e:
self.logger.error(f"❌ SQL执行失败: {e}")
raise
def execute_many(self, sql: str, params_list: List[Tuple]) -> None:
"""批量执行SQL语句"""
if not self._in_transaction:
raise RuntimeError("请先调用 begin_transaction() 开始事务")
try:
self.cursor.executemany(sql, params_list)
self.logger.info(f"✅ 批量SQL执行成功,共 {len(params_list)} 条记录")
except Exception as e:
self.logger.error(f"❌ 批量SQL执行失败: {e}")
raise
def fetchone(self) -> Optional[Tuple]:
"""获取单行结果"""
return self.cursor.fetchone()
def fetchall(self) -> List[Tuple]:
"""获取所有结果"""
return self.cursor.fetchall()
def commit(self):
"""提交事务"""
if self.conn and self._in_transaction:
try:
self.conn.commit()
self.logger.info("🎉 事务提交成功")
except Exception as e:
self.logger.error(f"❌ 事务提交失败: {e}")
raise
finally:
self._cleanup()
def rollback(self):
"""回滚事务"""
if self.conn and self._in_transaction:
try:
self.conn.rollback()
self.logger.info("🔄 事务回滚成功")
except Exception as e:
self.logger.error(f"❌ 事务回滚失败: {e}")
raise
finally:
self._cleanup()
def _cleanup(self):
"""清理资源"""
if self.cursor:
self.cursor.close()
self.cursor = None
if self.conn:
self.pool.return_connection(self.conn)
self.conn = None
self._in_transaction = False
def __enter__(self):
"""上下文管理器入口"""
self.begin_transaction()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
if exc_type is None:
# 没有异常,提交事务
self.commit()
else:
# 发生异常,回滚事务
self.rollback()
def pooled_transaction_demo():
"""使用连接池的事务管理示例"""
# 数据库连接字符串
conn_str = (
'DRIVER={ODBC Driver 18 for SQL Server};'
'SERVER=localhost;'
'DATABASE=dbtest;'
'UID=sa;'
'PWD=123;'
'Trusted_Connection=yes;'
'TrustServerCertificate=yes;'
)
# 创建连接池
pool = ConnectionPool(conn_str, pool_size=5)
print("📊 连接池状态:", pool.get_pool_status())
try:
# 示例1:基本事务操作
print("\n=== 示例1:基本事务操作 ===")
with PooledSqlServerTransaction(pool) as tx:
# 插入用户数据
user_data = ('user456', '李四', 'lisi@email.com', '13999999999')
tx.execute(
"INSERT INTO users (user_id, name, email, phone) VALUES (?, ?, ?, ?)",
user_data
)
# 插入权限数据
permissions = [
('user456', 'read'),
('user456', 'write')
]
tx.execute_many(
"INSERT INTO user_permissions (user_id, permission) VALUES (?, ?)",
permissions
)
print("✅ 基本事务操作完成")
# 示例2:查询操作
print("\n=== 示例2:查询操作 ===")
with pool.get_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
print(f"📈 当前用户总数: {count}")
cursor.close()
except Exception as e:
print(f"❌ 操作失败: {e}")
finally:
print("\n📊 连接池最终状态:", pool.get_pool_status())
def concurrent_transaction_demo():
"""并发事务测试"""
conn_str = (
'DRIVER={ODBC Driver 18 for SQL Server};'
'SERVER=localhost;'
'DATABASE=dbtest;'
'UID=sa;'
'PWD=123;'
'Trusted_Connection=yes;'
'TrustServerCertificate=yes;'
)
pool = ConnectionPool(conn_str, pool_size=3)
def worker_task(worker_id: int, task_count: int):
"""工作线程任务"""
results = []
for i in range(task_count):
try:
with PooledSqlServerTransaction(pool) as tx:
user_id = f"worker_{worker_id}_{i}"
user_data = (user_id, f"工作者{worker_id}-{i}", f"{user_id}@test.com", "13000000000")
tx.execute(
"INSERT INTO users (user_id, name, email, phone) VALUES (?, ?, ?, ?)",
user_data
)
tx.execute(
"INSERT INTO operation_log (user_id, operation) VALUES (?, ?)",
(user_id, 'concurrent_test')
)
results.append(f"✅ Worker-{worker_id} Task-{i} 成功")
time.sleep(0.1) # 模拟处理时间
except Exception as e:
results.append(f"❌ Worker-{worker_id} Task-{i} 失败: {e}")
return results
print("\n=== 并发事务测试 ===")
print(f"📊 测试前连接池状态: {pool.get_pool_status()}")
# 使用线程池执行并发任务
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务
futures = []
for worker_id in range(3):
future = executor.submit(worker_task, worker_id, 5)
futures.append(future)
# 收集结果
all_results = []
for future in as_completed(futures):
results = future.result()
all_results.extend(results)
# 显示结果
for result in all_results:
print(result)
print(f"\n📊 测试后连接池状态: {pool.get_pool_status()}")
# 清理测试数据
try:
with pool.get_connection_context() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM operation_log WHERE operation = 'concurrent_test'")
cursor.execute("DELETE FROM users WHERE user_id LIKE 'worker_%'")
conn.commit()
cursor.close()
print("🧹 测试数据清理完成")
except Exception as e:
print(f"❌ 清理失败: {e}")
# 关闭连接池
pool.close_all_connections()
if __name__ == "__main__":
print("🔧 开始数据库连接池测试")
# 基本功能测试
pooled_transaction_demo()
print("\n" + "=" * 50)
# 并发测试
concurrent_transaction_demo()
print("\n🎉 所有测试完成!")

通过本文的详细讲解,我们掌握了使用Python pyodbc处理SqlServer事务的三种核心模式:基础事务处理让我们理解了事务的基本概念和操作方式;上下文管理器模式提供了更加优雅和安全的代码结构;高级事务类封装则为复杂业务场景提供了强大的功能支持。
三个关键要点回顾:
在实际的Python开发项目中,特别是涉及上位机开发、数据采集系统或企业级应用时,掌握这些事务处理技巧将大大提升你的编程技巧水平。建议你在自己的项目中尝试应用这些模式,并根据具体业务需求进行适当的调整和优化。
记住,优秀的Python开发者不仅要写出能运行的代码,更要写出安全、可靠、可维护的代码。事务处理正是体现这种专业素养的重要技能之一!
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!