编辑
2025-12-18
Python
00

目录

🔍 问题分析
💡 解决方案
🛠️ 环境准备
🏗️ 项目结构搭建
💻 代码实战
🔧 数据库配置(config.py)
🗄️ 数据库连接管理(database.py)
📊 数据模型定义(models.py)
🔄 数据库操作类(operations.py)
🎯 主程序演示(main.py)
🎯 总结核心要点

作为一名Python开发者,你是否曾经为编写繁琐的SQL语句而苦恼?是否在处理数据库连接、事务管理时感到头疼?SQLAlchemy作为Python生态中最强大的ORM框架,能够帮你彻底摆脱这些困扰。

本文将通过一个完整的用户管理系统实战案例,详细讲解如何在Windows环境下使用SQLAlchemy操作MySQL数据库。从环境搭建到高级应用,从基础CRUD到复杂查询,让你快速掌握SQLAlchemy的核心技能,提升Python开发效率。

无论你是初学者还是有经验的开发者,这篇文章都将为你的上位机开发和数据库操作提供实用的解决方案。

🔍 问题分析

在实际的Python开发中,我们经常遇到以下数据库操作难题:

传统方式的痛点:

  • 手写SQL语句容易出错,维护困难
  • 数据库连接管理复杂,容易出现连接泄露
  • 不同数据库之间的SQL语法差异导致移植困难
  • 数据类型转换和验证需要大量重复代码

SQLAlchemy的优势:

  • 对象关系映射(ORM):将数据库表映射为Python类
  • 数据库无关性:支持多种数据库,代码易于迁移
  • 连接池管理:自动管理数据库连接,提高性能
  • 强大的查询语法:提供直观的Python式查询方式

💡 解决方案

🛠️ 环境准备

首先,我们需要安装必要的依赖包:

Bash
pip install sqlalchemy pip install pymysql pip install cryptography

🏗️ 项目结构搭建

创建一个清晰的项目结构:

Python
sqlalchemy_demo/ ├── config.py # 数据库配置 ├── models.py # 数据模型定义 ├── database.py # 数据库连接管理 ├── operations.py # 数据库操作 └── main.py # 主程序入口

💻 代码实战

🔧 数据库配置(config.py

Python
# config.py import urllib.parse class DatabaseConfig: """数据库配置类""" # MySQL连接参数 MYSQL_HOST = 'localhost' MYSQL_PORT = 3306 MYSQL_USER = 'root' MYSQL_PASSWORD = 'your_password' # 替换为你的密码 MYSQL_DATABASE = 'test_db' # 构建数据库连接URL @classmethod def get_database_url(cls): password = urllib.parse.quote_plus(cls.MYSQL_PASSWORD) return f'mysql+pymysql://{cls.MYSQL_USER}:{password}@{cls.MYSQL_HOST}:{cls.MYSQL_PORT}/{cls.MYSQL_DATABASE}?charset=utf8mb3' # SQLAlchemy配置 SQLALCHEMY_ECHO = True # 打印SQL语句,便于调试 SQLALCHEMY_POOL_SIZE = 10 SQLALCHEMY_MAX_OVERFLOW = 20 SQLALCHEMY_POOL_TIMEOUT = 30

🗄️ 数据库连接管理(database.py

Python
# database.py from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from config import DatabaseConfig class DatabaseManager: """数据库管理器""" def __init__(self): self.engine = None self.SessionLocal = None self.Base = declarative_base() def init_database(self): """初始化数据库连接""" try: # 创建数据库引擎 self.engine = create_engine( DatabaseConfig.get_database_url(), echo=DatabaseConfig.SQLALCHEMY_ECHO, pool_size=DatabaseConfig.SQLALCHEMY_POOL_SIZE, max_overflow=DatabaseConfig.SQLALCHEMY_MAX_OVERFLOW, pool_timeout=DatabaseConfig.SQLALCHEMY_POOL_TIMEOUT, pool_pre_ping=True # 连接前ping数据库 ) # 创建会话工厂 self.SessionLocal = sessionmaker( autocommit=False, autoflush=False, bind=self.engine ) print("✅ 数据库连接初始化成功") return True except Exception as e: print(f"❌ 数据库连接失败: {e}") return False def get_session(self): """获取数据库会话""" return self.SessionLocal() def create_tables(self): """创建数据表""" try: self.Base.metadata.create_all(bind=self.engine) print("✅ 数据表创建成功") except Exception as e: print(f"❌ 数据表创建失败: {e}") # 创建全局数据库管理器实例 db_manager = DatabaseManager()

📊 数据模型定义(models.py

Python
# models.py from sqlalchemy import Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from database import db_manager Base = db_manager.Base class User(Base): """用户模型类""" __tablename__ = 'users' # 定义表字段 id = Column(Integer, primary_key=True, nullable=False, comment='用户ID') name = Column(String(255), nullable=True, comment='用户姓名') age = Column(Integer, nullable=True, comment='用户年龄') email = Column(String(255), nullable=True, comment='用户邮箱') def __init__(self, id, name=None, age=None, email=None): """构造函数""" self.id = id self.name = name self.age = age self.email = email def __repr__(self): """字符串表示""" return f"<User(id={self.id}, name='{self.name}', age={self.age}, email='{self.email}')>" def to_dict(self): """转换为字典格式""" return { 'id': self.id, 'name': self.name, 'age': self.age, 'email': self.email } @classmethod def from_dict(cls, data): """从字典创建实例""" return cls( id=data.get('id'), name=data.get('name'), age=data.get('age'), email=data.get('email') )

🔄 数据库操作类(operations.py

Python
# operations.py from sqlalchemy.orm import Session from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import and_, or_, desc, asc from models import User from database import db_manager from typing import List, Optional, Dict, Any class UserOperations: """用户数据库操作类""" def __init__(self): self.session: Session = None def __enter__(self): """上下文管理器入口""" self.session = db_manager.get_session() return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器退出""" if self.session: if exc_type is None: self.session.commit() else: self.session.rollback() self.session.close() # ========== 增加操作 ========== def create_user(self, user_data: Dict[str, Any]) -> Optional[User]: """创建新用户""" try: user = User.from_dict(user_data) self.session.add(user) self.session.flush() # 获取自动生成的ID print(f"✅ 创建用户成功: {user}") return user except SQLAlchemyError as e: print(f"❌ 创建用户失败: {e}") return None def create_users_batch(self, users_data: List[Dict[str, Any]]) -> bool: """批量创建用户""" try: users = [User.from_dict(data) for data in users_data] self.session.add_all(users) print(f"✅ 批量创建 {len(users)} 个用户成功") return True except SQLAlchemyError as e: print(f"❌ 批量创建用户失败: {e}") return False # ========== 查询操作 ========== def get_user_by_id(self, user_id: int) -> Optional[User]: """根据ID查询用户""" try: user = self.session.query(User).filter(User.id == user_id).first() if user: print(f"✅ 查询用户成功: {user}") else: print(f"⚠️ 未找到ID为 {user_id} 的用户") return user except SQLAlchemyError as e: print(f"❌ 查询用户失败: {e}") return None def get_users_by_name(self, name: str) -> List[User]: """根据姓名查询用户(模糊查询)""" try: users = self.session.query(User).filter( User.name.like(f'%{name}%') ).all() print(f"✅ 查询到 {len(users)} 个用户") return users except SQLAlchemyError as e: print(f"❌ 查询用户失败: {e}") return [] def get_users_by_age_range(self, min_age: int, max_age: int) -> List[User]: """根据年龄范围查询用户""" try: users = self.session.query(User).filter( and_(User.age >= min_age, User.age <= max_age) ).order_by(User.age).all() print(f"✅ 查询到年龄在 {min_age}-{max_age} 之间的 {len(users)} 个用户") return users except SQLAlchemyError as e: print(f"❌ 查询用户失败: {e}") return [] def get_all_users(self, page: int = 1, page_size: int = 10) -> List[User]: """获取所有用户(支持分页)""" try: offset = (page - 1) * page_size users = self.session.query(User)\ .offset(offset)\ .limit(page_size)\ .all() print(f"✅ 查询到第 {page} 页的 {len(users)} 个用户") return users except SQLAlchemyError as e: print(f"❌ 查询用户失败: {e}") return [] # ========== 更新操作 ========== def update_user(self, user_id: int, update_data: Dict[str, Any]) -> bool: """更新用户信息""" try: user = self.session.query(User).filter(User.id == user_id).first() if not user: print(f"⚠️ 未找到ID为 {user_id} 的用户") return False # 更新字段 for key, value in update_data.items(): if hasattr(user, key): setattr(user, key, value) print(f"✅ 更新用户成功: {user}") return True except SQLAlchemyError as e: print(f"❌ 更新用户失败: {e}") return False def update_users_by_condition(self, condition_data: Dict[str, Any], update_data: Dict[str, Any]) -> int: """根据条件批量更新用户""" try: query = self.session.query(User) # 构建查询条件 for key, value in condition_data.items(): if hasattr(User, key): query = query.filter(getattr(User, key) == value) # 执行更新 updated_count = query.update(update_data) print(f"✅ 批量更新 {updated_count} 个用户") return updated_count except SQLAlchemyError as e: print(f"❌ 批量更新用户失败: {e}") return 0 # ========== 删除操作 ========== def delete_user(self, user_id: int) -> bool: """删除用户""" try: user = self.session.query(User).filter(User.id == user_id).first() if not user: print(f"⚠️ 未找到ID为 {user_id} 的用户") return False self.session.delete(user) print(f"✅ 删除用户成功: {user}") return True except SQLAlchemyError as e: print(f"❌ 删除用户失败: {e}") return False def delete_users_by_condition(self, condition_data: Dict[str, Any]) -> int: """根据条件批量删除用户""" try: query = self.session.query(User) # 构建查询条件 for key, value in condition_data.items(): if hasattr(User, key): query = query.filter(getattr(User, key) == value) # 执行删除 deleted_count = query.delete() print(f"✅ 批量删除 {deleted_count} 个用户") return deleted_count except SQLAlchemyError as e: print(f"❌ 批量删除用户失败: {e}") return 0 # ========== 统计操作 ========== def count_users(self) -> int: """统计用户总数""" try: count = self.session.query(User).count() print(f"📊 用户总数: {count}") return count except SQLAlchemyError as e: print(f"❌ 统计用户失败: {e}") return 0 def get_age_statistics(self) -> Dict[str, Any]: """获取年龄统计信息""" try: from sqlalchemy import func result = self.session.query( func.min(User.age).label('min_age'), func.max(User.age).label('max_age'), func.avg(User.age).label('avg_age'), func.count(User.id).label('total_count') ).filter(User.age.isnot(None)).first() statistics = { 'min_age': result.min_age, 'max_age': result.max_age, 'avg_age': round(float(result.avg_age), 2) if result.avg_age else 0, 'total_count': result.total_count } print(f"📊 年龄统计: {statistics}") return statistics except SQLAlchemyError as e: print(f"❌ 获取年龄统计失败: {e}") return {}

🎯 主程序演示(main.py

Python
# main.py from database import db_manager from operations import UserOperations import time def demo_basic_operations(): """演示基本CRUD操作""" print("🚀 开始演示SQLAlchemy基本操作") print("=" * 50) # 使用上下文管理器确保事务正确处理 with UserOperations() as user_ops: # 1. 创建单个用户 print("\n📝 1. 创建用户操作") user_data = { 'id': 1, 'name': '张三', 'age': 25, 'email': 'zhangsan@example.com' } user_ops.create_user(user_data) # 2. 批量创建用户 print("\n📝 2. 批量创建用户操作") users_data = [ {'id': 2, 'name': '李四', 'age': 30, 'email': 'lisi@example.com'}, {'id': 3, 'name': '王五', 'age': 28, 'email': 'wangwu@example.com'}, {'id': 4, 'name': '赵六', 'age': 35, 'email': 'zhaoliu@example.com'}, {'id': 5, 'name': '孙七', 'age': 22, 'email': 'sunqi@example.com'} ] user_ops.create_users_batch(users_data) def demo_query_operations(): """演示查询操作""" print("\n🔍 3. 查询操作演示") print("-" * 30) with UserOperations() as user_ops: # 根据ID查询 print("\n🔍 根据ID查询用户:") user = user_ops.get_user_by_id(1) # 根据姓名模糊查询 print("\n🔍 根据姓名模糊查询:") users = user_ops.get_users_by_name('张') for user in users: print(f" {user}") # 根据年龄范围查询 print("\n🔍 查询年龄在25-30之间的用户:") users = user_ops.get_users_by_age_range(25, 30) for user in users: print(f" {user}") # 分页查询所有用户 print("\n🔍 分页查询用户(第1页,每页3条):") users = user_ops.get_all_users(page=1, page_size=3) for user in users: print(f" {user}") def demo_update_operations(): """演示更新操作""" print("\n✏️ 4. 更新操作演示") print("-" * 30) with UserOperations() as user_ops: # 更新单个用户 print("\n✏️ 更新用户信息:") update_success = user_ops.update_user(1, { 'name': '张三丰', 'age': 26, 'email': 'zhangsanfeng@example.com' }) if update_success: updated_user = user_ops.get_user_by_id(1) print(f" 更新后: {updated_user}") # 批量更新 print("\n✏️ 批量更新操作:") updated_count = user_ops.update_users_by_condition( condition_data={'age': 30}, update_data={'age': 31} ) def demo_statistics(): """演示统计操作""" print("\n📊 5. 统计操作演示") print("-" * 30) with UserOperations() as user_ops: # 统计用户总数 total_count = user_ops.count_users() # 年龄统计 age_stats = user_ops.get_age_statistics() def demo_delete_operations(): """演示删除操作""" print("\n🗑️ 6. 删除操作演示") print("-" * 30) with UserOperations() as user_ops: # 删除单个用户 print("\n🗑️ 删除单个用户:") delete_success = user_ops.delete_user(5) # 验证删除结果 if delete_success: user = user_ops.get_user_by_id(5) print(f" 验证删除结果: {'删除成功' if user is None else '删除失败'}") def main(): """主函数""" print("🎯 SQLAlchemy MySQL 应用演示") print("=" * 60) # 初始化数据库 if not db_manager.init_database(): print("❌ 数据库初始化失败,程序退出") return # 创建数据表 db_manager.create_tables() try: # 依次演示各种操作 demo_basic_operations() time.sleep(1) demo_query_operations() time.sleep(1) demo_update_operations() time.sleep(1) demo_statistics() time.sleep(1) demo_delete_operations() print("\n🎉 所有演示完成!") except KeyboardInterrupt: print("\n⚠️ 程序被用户中断") except Exception as e: print(f"\n❌ 程序执行出错: {e}") finally: print("📝 程序执行结束") if __name__ == "__main__": main()

image.png

🎯 总结核心要点

通过本文的详细讲解和实战演示,我们完整地掌握了Python MySQL下SQLAlchemy的应用技术。让我们回顾三个关键要点:

🚀 架构设计优势:采用分层架构设计,将数据库配置、连接管理、模型定义和操作逻辑分离,不仅提高了代码的可维护性,还为后续的上位机开发项目奠定了坚实基础。这种设计模式特别适合复杂的Python开发项目。

💡 实战操作精髓:通过完整的CRUD操作示例,展示了SQLAlchemy在实际项目中的强大功能。从基础的单条记录操作到高级的批量处理和复杂查询,每个代码片段都可以直接应用到你的编程项目中。

⚡ 性能优化策略:掌握了连接池管理、异常处理、批量操作等高级技巧,这些都是提升应用性能和稳定性的关键因素。在Windows环境下的Python开发中,这些优化策略将显著提升你的开发效率。

SQLAlchemy不仅是一个ORM框架,更是Python开发者的得力助手。掌握它,你就拥有了构建高质量数据库应用的核心技能。继续深入学习,你会发现更多实用的编程技巧等待探索!

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!