开发者指南
开发者指南内容较多,故分为了一个示例以及数个专题。
阅读(并且最好跟随实践)示例后,你将会对使用 nonebot-plugin-orm
开发插件有一个基本的认识。
如果想要更深入地学习关于 SQLAlchemy 和 Alembic 的知识,或者在使用过程中遇到了问题,可以查阅专题以及其官方文档。
示例
模型定义
首先,我们需要设计存储的数据的结构。
例如天气插件,需要存储什么地方 (location
) 的天气是什么 (weather
)。
其中,一个地方只会有一种天气,而不同地方可能有相同的天气。
所以,我们可以设计出如下的模型:
from nonebot_plugin_orm import Model
from sqlalchemy.orm import Mapped, mapped_column
class Weather(Model):
location: Mapped[str] = mapped_column(primary_key=True)
weather: Mapped[str]
其中,primary_key=True
意味着此列 (location
) 是主键,即内容是唯一的且非空的。
每一个模型必须有至少一个主键。
我们可以用以下代码检查模型生成的数据库模式是否正确:
from sqlalchemy.schema import CreateTable
print(CreateTable(Weather.__table__))
CREATE TABLE weather_weather (
location VARCHAR NOT NULL,
weather VARCHAR NOT NULL,
CONSTRAINT pk_weather_weather PRIMARY KEY (location)
)
可以注意到表名是 weather_weather
而不是 Weather
或者 weather
。
这是因为 nonebot-plugin-orm
会自动为模型生成一个表名,规则是:<插件模块名>_<类名小写>
。
你也可以通过指定 __tablename__
属性来自定义表名:
class Weather(Model):
__tablename__ = "weather"
...
CREATE TABLE weather (
...
)
但是,并不推荐你这么做,因为这可能会导致不同插件间的表名重复,引发冲突。 特别是当你会发布插件时,你并不知道其他插件会不会使用相同的表名。
首次迁移
我们成功定义了模型,现在启动机器人试试吧:
$ nb run
01-02 15:04:05 [SUCCESS] nonebot | NoneBot is initializing...
01-02 15:04:05 [ERROR] nonebot_plugin_orm | 启动检查失败
01-02 15:04:05 [ERROR] nonebot | Application startup failed. Exiting.
Traceback (most recent call last):
...
click.exceptions.UsageError: 检测到新的升级操作:
[('add_table',
Table('weather', MetaData(), Column('location', String(), table=<weather>, primary_key=True, nullable=False), Column('weather', String(), table=<weather>, nullable=False), schema=None))]
咦,发生了什么?
nonebot-plugin-orm
试图阻止我们启动机器人。
原来是我们定义了模型,但是数据库中并没有对应的表,这会导致插件不能正常运行。
所以,我们需要迁移数据库。
首先,我们需要创建一个迁移脚本:
nb orm revision -m "first revision" --branch-label weather
其中,-m
参数是迁移脚本的描述,--branch-label
参数是迁移脚本的分支,一般为插件模块名。
执行命令过后,出现了一个 weather/migrations
目录,其中有一个 xxxxxxxxxxxx_first_revision.py
文件:
weather
├── __init__.py
├── config.py
└── migrations
└── xxxxxxxxxxxx_first_revision.py
这就是我们创建的迁移脚本,它记录了数据库模式的变化。 我们可以查看一下它的内容:
"""first revision
迁移 ID: xxxxxxxxxxxx
父迁移:
创建时间: 2006-01-02 15:04:05.999999
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
revision: str = "xxxxxxxxxxxx"
down_revision: str | Sequence[str] | None = None
branch_labels: str | Sequence[str] | None = ("weather",)
depends_on: str | Sequence[str] | None = None
def upgrade(name: str = "") -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"weather_weather",
sa.Column("location", sa.String(), nullable=False),
sa.Column("weather", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")),
info={"bind_key": "weather"},
)
# ### end Alembic commands ###
def downgrade(name: str = "") -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("weather_weather")
# ### end Alembic commands ###
可以注意到脚本的主体部分(其余是模版代码,请勿修改)是:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table( # CREATE TABLE
"weather_weather", # weather_weather
sa.Column("location", sa.String(), nullable=False), # location VARCHAR NOT NULL,
sa.Column("weather", sa.String(), nullable=False), # weather VARCHAR NOT NULL,
sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")), # CONSTRAINT pk_weather_weather PRIMARY KEY (location)
info={"bind_key": "weather"},
)
# ### end Alembic commands ###
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("weather_weather") # DROP TABLE weather_weather;
# ### end Alembic commands ###
虽然我们不是很懂这些代码的意思,但是可以注意到它们几乎与 SQL 语句 (DDL) 一一对应。 显然,它们是用来创建和删除表的。
我们还可以注意到,upgrade()
和 downgrade()
函数中的代码是互逆的。
也就是说,执行一次 upgrade()
函数,再执行一次 downgrade()
函数后,数据库的模式就会回到原来的状态。
这就是迁移脚本的作用:记录数据库模式的变化,以便我们在不同的环境中(例如开发环境和生产环境)可复现地、可逆地同步数据库模式,正如 git 对我们的代码做的事情那样。
对了,不要忘记还有一段注释:commands auto generated by Alembic - please adjust!
。
它在提醒我们,这些代码是由 Alembic 自动生成的,我们应该检查它们,并且根据需要进行调整。
迁移脚本冗长且繁琐,我们一般不会手写它们,而是由 Alembic 自动生成。 一般情况下,Alembic 足够智能,可以正确地生成迁移脚本。 但是,在复杂或有歧义的情况下,我们可能需要手动调整迁移脚本。 所以,永远要检查迁移脚本,并且在开发环境中测试!
迁移脚本中任何一处错误都足以使数据付之东流!
确定迁移脚本正确后,我们就可以执行迁移脚本,将数据库模式同步到数据库中:
nb orm upgrade
现在,我们可以正常启动机器人了。
开发过程中,我们可能会频繁地修改模型,这意味着我们需要频繁地创建并执行迁移脚本,非常繁琐。
实际上,此时我们不在乎数据安全,只需要数据库模式与模型定义一致即可。
所以,我们可以关闭 nonebot-plugin-orm
的启动检查:
ALEMBIC_STARTUP_CHECK=false
现在,每次启动机器人时,数据库模式会自动与模型定义同步,无需手动迁移。
会话管理
我们已经成功定义了模型,并且迁移了数据库,现在可以开始使用数据库了……吗?
并不能,因为模型只是数据结构的定义,并不能通过它操作数据(如果你曾经使用过 Tortoise ORM,可能会知道 await Weather.get(location="上海")
这样的面向对象编程。
但是 SQLAlchemy 不同,选择了命令式编程)。
我们需要使用会话操作数据:
from nonebot import on_command
from nonebot.adapters import Message
from nonebot.params import CommandArg
from nonebot_plugin_orm import async_scoped_session
weather = on_command("天气")
@weather.handle()
async def _(session: async_scoped_session, args: Message = CommandArg()):
location = args.extract_plain_text()
if wea := await session.get(Weather, location):
await weather.finish(f"今天{location}的天气是{wea.weather}")
await weather.finish(f"未查询到{location}的天气")
我们通过 session: async_scoped_session
依赖注入获得了一个会话,然后使用 await session.get(Weather, location)
查询数据库。
async_scoped_session
是一个有作用域限制的会话,作用域为当前事件、当前事件响应器。
会话产生的模型实例(例如此处的 wea := await session.get(Weather, location)
)作用域与会话相同。
此处提到的“会话”指的是 ORM 会话,而非 NoneBot 会话,两者的生命周期也是不同的(NoneBot 会话的生命周期中可能包含多个事件,不同的事件也会有不同的事件响应器)。 具体而言,就是不要将 ORM 会话和模型实例存储在 NoneBot 会话状态中:
from nonebot.params import ArgPlainText
from nonebot.typing import T_State
@weather.got("location", prompt="请输入地名")
async def _(state: T_State, session: async_scoped_session, location: str = ArgPlainText()):
wea = await session.get(Weather, location)
if not wea:
await weather.finish(f"未查询到{location}的天气")
state["weather"] = wea # 不要这么做,除非你知道自己在做什么
当然非要这么做也不是不可以:
@weather.handle()
async def _(state: T_State, session: async_scoped_session):
# 通过 await session.merge(state["weather"]) 获得了此 ORM 会话中的相应模型实例,
# 而非直接使用会话状态中的模型实例,
# 因为先前的 ORM 会话已经关闭了。
wea = await session.merge(state["weather"])
await weather.finish(f"今天{state['location']}的天气是{wea.weather}")
当有数据更改时,我们需要提交事务,也要注意会话作用域问题:
from nonebot.params import Depends
async def get_weather(
session: async_scoped_session, args: Message = CommandArg()
) -> Weather:
location = args.extract_plain_text()
if not (wea := await session.get(Weather, location)):
wea = Weather(location=location, weather="未知")
session.add(wea)
# await session.commit() # 不应该在其他地方提交事务
return wea
@weather.handle()
async def _(session: async_scoped_session, wea: Weather = Depends(get_weather)):
await weather.send(f"今天的天气是{wea.weather}")
await session.commit() # 而应该在事件响应器结束前提交事务
当然我们也可以获得一个新的会话,不过此时就要手动管理会话了:
from nonebot_plugin_orm import get_session
async def get_weather(location: str) -> str:
session = get_session()
async with session.begin():
wea = await session.get(Weather, location)
if not wea:
wea = Weather(location=location, weather="未知")
session.add(wea)
return wea.weather
@weather.handle()
async def _(args: Message = CommandArg()):
wea = await get_weather(args.extract_plain_text())
await weather.send(f"今天的天气是{wea}")
依赖注入
在上面的示例中,我们都是通过会话获得数据的。 不过,我们也可以通过依赖注入获得数据:
from sqlalchemy import select
from nonebot.params import Depends
from nonebot_plugin_orm import SQLDepends
def extract_arg_plain_text(args: Message = CommandArg()) -> str:
return args.extract_plain_text()
@weather.handle()
async def _(
wea: Weather = SQLDepends(
select(Weather).where(Weather.location == Depends(extract_arg_plain_text))
),
):
await weather.send(f"今天的天气是{wea.weather}")
其中,SQLDepends
是一个特殊的依赖注入,它会根据类型标注和 SQL 语句提供数据,SQL 语句中也可以有子依赖。
不同的类型标注也会获得不同形式的数据:
from collections.abc import Sequence
@weather.handle()
async def _(
weas: Sequence[Weather] = SQLDepends(
select(Weather).where(Weather.weather == Depends(extract_arg_plain_text))
),
):
await weather.send(f"今天的天气是{weas[0].weather}的城市有{','.join(wea.location for wea in weas)}")
支持的类型标注请参见 依赖注入。
我们也可以像 类作为依赖 那样,在类属性中声明子依赖:
from collections.abc import Sequence
class Weather(Model):
location: Mapped[str] = mapped_column(primary_key=True)
weather: Mapped[str] = Depends(extract_arg_plain_text)
# weather: Annotated[Mapped[str], Depends(extract_arg_plain_text)] # Annotated 支持
@weather.handle()
async def _(weas: Sequence[Weather]):
await weather.send(
f"今天的天气是{weas[0].weather}的城市有{','.join(wea.location for wea in weas)}"
)