跳转至

http

HttpClientConnection 🔗

Bases: ConnectionMixin[HttpClientInfo]

HTTP 客户端连接

Source code in graia/ariadne/connection/http.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class HttpClientConnection(ConnectionMixin[HttpClientInfo]):
    """HTTP 客户端连接"""

    dependencies = {AiohttpClientInterface}
    http_interface: AiohttpClientInterface

    def __init__(self, config: HttpClientInfo) -> None:
        super().__init__(config)
        self.is_hook: bool = False

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[dict] = None,
        data: Optional[Any] = None,
        json: Optional[dict] = None,
    ) -> Any:
        if data and isinstance(data, dict):
            form = FormData(quote_fields=False)
            for k, v in data.items():
                form.add_field(k, **v) if isinstance(v, dict) else form.add_field(k, v)
            data = form
        if json:
            data = json_mod.dumps(json, cls=DatetimeJsonEncoder)
        rider = await self.http_interface.request(method, url, params=params, data=data)
        byte_data = await rider.io().read()
        result = Json.deserialize(byte_data.decode("utf-8"))
        return validate_response(result)

    async def http_auth(self) -> None:
        from ..app import Ariadne

        app = Ariadne.current()
        await app.launch_manager.get_interface(Memcache).delete(f"account.{app.account}.version")

        data = await self.request(
            "POST",
            self.info.get_url("verify"),
            json={"verifyKey": self.info.verify_key},
        )
        session_key = data["session"]
        await self.request(
            "POST",
            self.info.get_url("bind"),
            json={"qq": self.info.account, "sessionKey": session_key},
        )
        self.status.session_key = session_key

    async def call(
        self, command: str, method: CallMethod, params: Optional[dict] = None, *, in_session: bool = True
    ) -> Any:
        params = params or {}
        command = command.replace("_", "/")
        while not self.status.connected:
            await self.status.wait_for_update()
        if in_session:
            if not self.status.session_key:
                await self.http_auth()
            params["sessionKey"] = self.status.session_key
        try:
            if method in (CallMethod.GET, CallMethod.RESTGET):
                return await self.request("GET", self.info.get_url(command), params=params)
            elif method in (CallMethod.POST, CallMethod.RESTPOST):
                return await self.request("POST", self.info.get_url(command), json=params)
            elif method == CallMethod.MULTIPART:
                return await self.request("POST", self.info.get_url(command), data=params)
        except InvalidSession:
            self.status.session_key = None
            raise

    @property
    def stages(self):
        return {} if self.is_hook else {"blocking"}

    async def launch(self, mgr: Launart) -> None:
        self.http_interface = mgr.get_interface(AiohttpClientInterface)
        if self.is_hook:
            return
        async with self.stage("blocking"):
            exit_signal = asyncio.create_task(mgr.status.wait_for_sigexit())
            while not exit_signal.done():
                try:
                    if not self.status.session_key:
                        logger.info("HttpClient: authenticate", style="dark_orange")
                        await self.http_auth()
                    data = await self.request(
                        "GET",
                        self.info.get_url("fetchMessage"),
                        {"sessionKey": self.status.session_key, "count": 10},
                    )
                    self.status.alive = True
                except Exception as e:
                    self.status.session_key = None
                    self.status.alive = False
                    logger.exception(e)
                    continue
                assert isinstance(data, list)
                for event_data in data:
                    event = build_event(event_data)
                    await asyncio.gather(*(callback(event) for callback in self.event_callbacks))
                await wait_fut(
                    [asyncio.sleep(0.5), exit_signal],
                    return_when=asyncio.FIRST_COMPLETED,
                )

HttpServerConnection 🔗

Bases: ConnectionMixin[HttpServerInfo], Transport

HTTP 服务器连接

Source code in graia/ariadne/connection/http.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class HttpServerConnection(ConnectionMixin[HttpServerInfo], Transport):
    """HTTP 服务器连接"""

    dependencies = {AbstractRouter}

    def __init__(self, config: HttpServerInfo) -> None:
        super().__init__(config)
        self.handlers[HttpEndpoint(self.info.path, ["POST"])] = self.__class__.handle_request

    async def handle_request(self, io: AbstractServerRequestIO):
        req: HttpRequest = await io.extra(HttpRequest)
        if req.headers.get("qq") != str(self.info.account):
            return
        for k, v in self.info.headers.items():
            if req.headers.get(k) != v:
                return "Authorization failed", {"status": 401}
        data = Json.deserialize((await io.read()).decode("utf-8"))
        assert isinstance(data, dict)
        self.status.connected = True
        self.status.alive = True
        event = build_event(data)
        await asyncio.gather(*(callback(event) for callback in self.event_callbacks))
        return {"command": "", "data": {}}

    async def launch(self, mgr: Launart) -> None:
        router = mgr.get_interface(AbstractRouter)
        router.use(self)