0%

SQLAlchemy2.0 简明指南

简介

  • Python 最知名的数据库 ORM 终于在 2023 年 1 月 26 日发布了可用于生产环境的 2.0 版本,提供了全新的 ORM 与 Core 层操作、异步支持、更友好的类型提示以及性能提升。
1
pip install SQLAlchemy  #2.0.10

Core

初始化连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import sqlalchemy as sa

# Create an Engine (connection factory + connection pool).
#
#   echo:          log every emitted SQL statement, for debugging (default False)
#   pool_size:     size of the connection pool (default 5)
#   pool_recycle:  seconds before a pooled connection is recycled; -1 = never.
#                  For MySQL-like servers 3600 (1 hour) is a common choice.
#   pool_pre_ping: test each connection before use and transparently reconnect
#                  if it is dead (default False; recommended True in production —
#                  a small per-checkout cost in exchange for live connections)
#   max_overflow:  extra connections allowed beyond pool_size; -1 or None means
#                  unlimited (default 10)
#   pool_timeout:  max seconds to wait for a free pooled connection before
#                  raising an error (default 30)
db = sa.create_engine("sqlite:///test.db", echo=False, pool_size=5, pool_recycle=-1, pool_pre_ping=False,
max_overflow=10, pool_timeout=30)

# Example db_url values:
# mysql+mysqlclient://user:password@host:port/db?charset=utf8mb4
# postgresql+psycopg://user:password@host:port/db

执行原始 SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Raw SQL must be wrapped in text() so that bound parameters work
# (bound parameters also prevent SQL injection).
# FIX: the snippet used text() and SQLAlchemyError without importing them.
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError

with db.connect() as conn:
    try:
        conn.execute(text("CREATE TABLE foo(id int, name varchar)"))
        conn.execute(text("INSERT INTO foo VALUES(:id,:name)"), {"id": 1, "name": 'zhang'})
        # Nothing is persisted until an explicit commit.
        conn.commit()
    except SQLAlchemyError as e:
        print(e)
        # Undo any partial work on failure.
        conn.rollback()


with db.begin() as conn:
    # begin() opens a transaction that commits automatically on success.
    cursor = conn.execute(text("SELECT id,name FROM foo"))
    # A raw SELECT yields tuple-like Row objects; .mappings() yields
    # dict-like RowMapping objects instead.
    for i in cursor.mappings():
        print(i)
        # FIX: RowMapping supports key access only — `i.name` raised
        # AttributeError; use i["name"].
        print(i["name"])
        print(i["id"])
    # A result can be consumed only once; reading again yields nothing.
    print(list(cursor.mappings()))

CRUD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# FIX: `delete` and `update` were used below but missing from the import line.
from sqlalchemy import select, Table, MetaData, insert, func, delete, update

# Reflect the existing table definition straight from the database.
t_foo = Table("foo", MetaData(), autoload_with=db)

with db.begin() as conn:
    sql = select(t_foo.c.id.label('gg'), t_foo.c.name).filter_by(id=1).order_by(t_foo.c.id.desc())
    print(sql)  # show the compiled SQL
    # Fetch every row as a plain dict.
    print(list(map(dict, conn.execute(sql).mappings().all())))

    # Single-row insert
    conn.execute(insert(t_foo).values(id=2))
    # Bulk insert ("executemany")
    conn.execute(insert(t_foo), [{"id": 3, "name": 'wang'}, {"id": 4, "name": 'li'}, {"id": 4, "name": 'sun'}])

    # Delete
    conn.execute(delete(t_foo).where(t_foo.c.id.in_((2, 3))))

    # Update rows matching name LIKE '%a%'
    conn.execute(update(t_foo).where(t_foo.c.name.contains('a')).values(name='aaaa'))

    # GROUP BY with aggregate functions
    sql = select(t_foo.c.id, func.count(t_foo.c.id).label("cnt"), func.max(t_foo.c.id)).group_by(t_foo.c.id)

ORM

Model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

from sqlalchemy import DateTime, String, text
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from datetime import datetime
from typing import Optional


class Base(DeclarativeBase):
    """Declarative base; columns declared here are shared by every model."""

    # Server-side default timestamp (SQLite CURRENT_TIMESTAMP).
    created: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=text("CURRENT_TIMESTAMP")
    )


class User(Base):
    """A row of the ``user`` table."""

    __tablename__ = "user"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    # Optional[...] maps to a nullable column.
    nickname: Mapped[Optional[str]]


# Create the tables; checkfirst=True skips any table that already exists.
Base.metadata.create_all(checkfirst=True, bind=db)

CRUD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

Session = sessionmaker(db)

with Session.begin() as session:
    # Insert only when no row with primary key 1 exists.
    if not session.get(User, 1):
        session.add(User(id=1, name='zhang'))
        # Bulk insert (fast path):
        # session.bulk_insert_mappings(User, [{"name": "zhang2"}, {"name": "zhang3"}])

    # Query a single row, or None when absent.
    user = session.scalars(select(User).filter_by(id=1)).one_or_none()
    if user:
        print(user.name)
        # Attribute assignment is tracked and flushed on commit.
        user.name = 'zhang5'

        # Delete the row.
        session.delete(user)

    # Scalar subquery correlated with the ORM model.
    sub = select(t_foo.c.name).join(User, User.id == t_foo.c.id).scalar_subquery().label("nn")
    sql = select(User.id, User.name, sub)
    print(list(map(dict, session.execute(sql).mappings())))

Async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio

from sqlalchemy import Column
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import create_async_engine

meta = MetaData()
t1 = Table("t1", meta, Column("name", String(50), primary_key=True))


async def async_main() -> None:
    """Create the schema, insert two rows, then read one back — all async."""
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    async with engine.begin() as conn:
        # run_sync bridges sync-only metadata operations onto the async engine.
        await conn.run_sync(meta.create_all)

        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )

    async with engine.connect() as conn:
        # SELECT returns a buffered Result.
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
        print(result.fetchall())

    # For an AsyncEngine created in function scope, close and
    # clean up the pooled connections explicitly.
    await engine.dispose()


asyncio.run(async_main())

技巧

  • 使用函数
1
2
3
4
5
6
7
# FIX: the module name was misspelled ("sqlachemy").
import sqlalchemy as sa

# PostgreSQL: convert a timestamptz column to a unix timestamp (integer).
sa.func.extract('epoch', table.data_time).cast(sa.Integer).label("time")

# PostgreSQL: round a double-precision column to 6 decimal places.
sa.func.round(sa.cast(data_table.lat, sa.NUMERIC), 6).label("lat"),
  • 获取表所有字段
1
2
3
4
5
# Core Table: expand every column into the SELECT.
sa.select(*table.c)

# ORM model: reach the underlying Table via __table__.
sa.select(*table.__table__.c)
  • 更快的json处理 (orjson)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import orjson
# FIX: the module name was misspelled ("sqlachemy").
import sqlalchemy as sa


def orjson_serializer(obj) -> str:
    """Serialize *obj* to a JSON string with orjson.

    OPT_SERIALIZE_NUMPY handles numpy scalars/arrays; OPT_NAIVE_UTC treats
    naive datetimes as UTC.  orjson returns bytes, so decode to str for
    SQLAlchemy's json_serializer hook.
    """
    return orjson.dumps(
        obj, option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_NAIVE_UTC
    ).decode()


# Replace the default json implementation — mainly speeds up JSON-typed columns.
sa.create_engine(
    ...,
    json_serializer=orjson_serializer,
    json_deserializer=orjson.loads,
)

参考

欢迎关注我的其它发布渠道