跳转至

Interrupt - 中断🔗

有时你可能需要进一步获取信息以处理.

比如这样:

  • /kick 12345678
  • 请发送 "/confirm" 确认
  • /confirm
  • 已将 BadUser(12345678) 踢出

对于此种交互方式, 我们提供了 Interrupt 以支持.

开始使用🔗

先进行导入:

from graia.broadcast.interrupt import InterruptControl
from graia.broadcast.interrupt.waiter import Waiter

获取 InterruptControl 对象:

import creart

inc = creart.it(InterruptControl)

之后创建 Waiter:

以下介绍的都是相对底层的用法,建议优先考虑 方便的封装 一节提供的封装。

@Waiter.create_using_function([ListeningEvent])
def waiter(...):
    # 判断和处理
    if condition:
        return True # 只要不是 None 就会继续执行

提示: 这个函数 推荐 为闭包(局部)函数

class DirectWaiter(Waiter.create([GroupMessage])):

    def __init__(self, group: Union[Group, int], member: Union[Member, int]):
        self.group = group if isinstance(group, int) else group.id
        self.member = member if isinstance(member, int) else member.id

    # detected_event 方法是必须的
    async def detected_event(self, group: Group, member: Member, message: MessageChain):
        if self.group == group.id and self.member == member.id:
            return message
class DirectWaiter(Waiter):
    listening_events = [GroupMessage]
    using_dispatchers = None
    using_decorators = None
    priority = 15
    block_propagation = False

    def __init__(self, group: Union[Group, int], member: Union[Member, int]):
        self.group = group if isinstance(group, int) else group.id
        self.member = member if isinstance(member, int) else member.id

    # detected_event 方法是必须的
    async def detected_event(self, group: Group, member: Member, message: MessageChain):
        if self.group == group.id and self.member == member.id:
            return message

注意

waiter() 函数内填充的参数 不能通过手工传入, 而应该通过与 broadcast.receiver() 一样的参数分派机制进行自动填充.

注意 ListeningEvent 不能使用 "GroupMessage" 这种字符串形式, 而要导入具体事件之后填入.

提示

经过包装后, waiter 函数就变成了一个 SingleWaiter 对象.

可以将 Waiter.create_using_function 看作 Broadcast.receiver.

dispatcher, decoratorBroadcast 特性是受支持的.

之后在主监听函数中:

@broadcast.receiver(ListeningEvent)
async def handler(...):
    ...
    result = await inc.wait(waiter) # 在此处等待
    ...

InterruptControl.wait 上使用 timeout 参数可以设置超时时间, 超时会引发 TimeoutError.

方便的封装🔗

Ariadne 0.6.16 开始, 我们引入了两个方便的 Waiter 封装:

二者支持以下用法:

res = await FunctionWaiter(...).wait()

try:
    res = await EventWaiter(...).wait(timeout=30)
except asyncio.TimeoutError:
    ...

等价于:

inc = InterruptControl(broadcast)
res = await inc.wait(FunctionWaiter(...))
try:
    res = await inc.wait(EventWaiter(...), timeout=30)
except asyncio.TimeoutError:
    ...

FunctionWaiterWaiter.create_using_function 的简单封装.

它的参数名更自然, 且自动支持了类型标注.

EventWaiter 返回对应的事件, 和 Application 中提供的方便封装类似.

同时提供了 extra_validator 以供你自定义额外的验证.

配合 util.validator 模块使用更佳

它是怎么运作的?🔗

Interrupt 创建的 Waiter 对象很像 asyncio.Future, 是基于回调的设计.

inc.wait() 时, 会创建一个临时的事件监听器, 等待 Waiter 成功执行事件处理, 并将其处理结果传回 inc.wait() 的调用者.

也就是说, inc.wait 判断, 处理的是 "下一个" 消息 / 事件.

额外消息

你可以利用 asyncio.wait_for() 来进行带有超时的监听.

当其 Waiter 包装的函数返回非 None 值时, InterruptControl.wait 方法将该返回值作为执行结果返回.

你可以进一步处理返回值.

使用循环可以接受多个消息的参数输入.

提示

通过往 Waiter.create_using_function 添加事件类型可以同时监听多个事件,

或者直接传入 list(graia.ariadne.util.gen_subclass(EventType)) 一口气接受所有子事件.

就这么多, 接下来我们将介绍在 FastAPI 中广泛运用的特性: 依赖注入 (Depend).

社区文档相关章节: 链接