您现在的位置是:网站首页 > 博客日记 >

SQLAlchemy入门

作者:YXN-python 阅读量:171 发布日期:2025-02-26

SQLAlchemy是一个基于Python实现的ORM对象关系映射框架。

该框架建立在DB API之上,使用关系对象映射进行数据库操作,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

官网:

https://www.sqlalchemy.org/

1、安装

// 默认从官网下载安装PyMySQL库
pip3 install sqlalchemy

// 从豆瓣源下载安装PyMySQL库
pip3 install sqlalchemy -i https://pypi.douban.com/simple

// 从清华源下载安装PyMySQL库
pip3 install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple

2、连接数据库

SQLAlchemy支持多种数据库后端,包括SQLite、MySQL、PostgreSQL等。以下是一个使用SQLite数据库的示例:

from sqlalchemy import create_engine
engine = create_engine('sqlite:///example.db')

 

3、定义模型

模型通常是通过定义Python类来实现的,这些类继承自 sqlalchemy.ext.declarative.api.DeclarativeMeta。每个类对应于数据库中的一个表,类的属性对应于表中的列。

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

4、创建表

Base.metadata.create_all(engine)

5、创建会话

会话(Session)是用于与数据库进行交互的主要接口。会话允许您执行数据库操作,如插入、查询、更新和删除数据。

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
session = Session()

6、插入数据

要插入数据,首先需要创建模型类的实例,并将其添加到会话中。然后,可以使用会话的 commit() 方法将更改提交到数据库。

user = User(name='Alice', age=30)
session.add(user)
session.commit()

7、查询数据

查询数据,可以使用会话的 query() 方法。query() 方法接受模型类作为参数,并返回一个查询对象。查询对象提供了多种方法来执行不同的查询操作。

# 查询所有用户
users = session.query(User).all()

# 查询特定条件的用户
user = session.query(User).filter_by(name='Alice').first()

# 查询并排序
users = session.query(User).order_by(User.age.desc()).all()

# 查询并限制返回数量
users = session.query(User).limit(5).all()

# 查询并偏移
users = session.query(User).offset(5).limit(5).all()

# 查询并使用原生SQL
users = session.query(User).filter("age > 25").all()

8、更新数据

更新数据,可以使用会话的 query() 方法获取要更新的记录,然后修改记录的属性,并提交会话。

user = session.query(User).filter_by(name='Alice').first()
user.age = 31
session.commit()

9、删除数据

要删除数据,可以使用会话的 query() 方法获取要删除的记录,然后使用 delete() 方法删除记录,并提交会话。

user = session.query(User).filter_by(name='Alice').first()
session.delete(user)
session.commit()

10、使用事务

使用会话来管理事务。会话提供了一个 begin() 方法来开始一个事务,以及一个 commit() 方法来提交事务。如果发生错误,可以使用 rollback() 方法来回滚事务。

session.begin()
try:
    user = User(name='Bob', age=25)
    session.add(user)
    session.commit()
except:
    session.rollback()
    raise

11、关闭会话

在完成数据库操作后,应该关闭会话以释放资源。

session.close()

知识点二

组件构成

SQLAlchemy ORM组成部分如下:

  • Object Relation Mapping(ORM):对象关系映射

 

SQLAlchemy Core组成部分如下:

  • Engine:框架的引擎
  • Connection Pooling:数据库连接池
  • Dialect:选择连接数据库的DB API种类
  • Schema/Types:架构和类型
  • SQL Exprression Language:SQL表达式语言

 

SQLAlchemy本身无法操作数据库,其必须以来PYMYSQL等第三方插件驱动,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作

MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

配置数据库连接串

uri: dialect[+driver]://user:password@host/dbname[?key=value..]
- dialect:数据库,如:sqlite、mysql、oracle、postgresql等
- driver:数据库驱动,用于连接数据库,比如pymysql、mysqldb等
- username:数据库用户
- password:数据库密码
- host:数据库服务IP地址
- port:数据库服务端口
- database:数据库名
# 实例:MySQL + PyMySQL
# MySQL服务端配置信息
DB_INFO = dict(
    host="127.0.0.1",
    port=6379,
    user="admin",
    password="123456",
    database="test",
    charset="utf8"
)
# 数据库连接URL格式化
DB_URI = 'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset={charset}'.format(**DB_INFO)

创建引擎并连接数据库

engine = create_engine("mysql+pymysql://root:123456@localhost:3306/db4?charset=utf8",
                        max_overflow=0,  # 超过连接池大小外最多创建的连接
                        pool_size=5,  # 连接池大小
                        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                        echo = True    # echo参数为True时,会显示每条执行的SQL语句,可以关闭 ",max_overflow = 5)
from sqlalchemy import create_engine
# 连接地址
DB_URI = 'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset={charset}'
# 创建引擎
engine = create_engine(DB_URI)
# 打开连接
conn = engine.connect() 
# 执行查询
result = conn.execute('select * from user limit %s offset %s', 10, 2)
# 获取单条数据
data_line = result.fetchone()
# 获取多条数据
data_list = result.fetchmany(2)
# 获取全部数据
data_list = result.fetchall()
# 插入数据操作,获取最后行ID
last_row_id = result.lastrowid
# 关闭连接
conn.close()  

数据库对象映射模型

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
# 数据库连接地址
DB_URI = 'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset={charset}'
# 创建数据库引擎
engine = create_engine(DB_URI)
# 模型基类
Base = declarative_base(engine)
session = sessionmaker(engine)()
class Student(Base):
    """功能:学生映射模型类"""
    __tablename__ = 'Student'
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键ID")
    name = Column(String(50), index=True, nullable=True, comment="学生名称")
    age = Column(Integer, comment="学生年龄")
    sex = Column(String(10), comment="学生性别")
# 创建全部表,默认自动跳过已存在表
Base.metadata.create_all()
# 创建指定表,默认自动跳过已存在表
Base.metadata.create_all(tables=[Student.__table__])
# 删除全部表,默认自动跳过不存在的表
Base.metadata.drop_all()
# 删除指定表,默认自动跳过不存在的表
Base.metadata.drop_all(tables=[Student.__table__])

 

YXN-python

2025-02-26