From ccf0c341a49785380bc3b3aafbc3b265c1296a00 Mon Sep 17 00:00:00 2001 From: 5656565566 <2393963330@qq.com> Date: Fri, 3 Apr 2026 18:02:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20Vocechat=20=E9=80=82?= =?UTF-8?q?=E9=85=8D=E5=99=A8=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../uniseg/adapters/vocechat/__init__.py | 17 +++ .../uniseg/adapters/vocechat/builder.py | 57 ++++++++ .../uniseg/adapters/vocechat/exporter.py | 132 ++++++++++++++++++ .../uniseg/constraint.py | 4 + 4 files changed, 210 insertions(+) create mode 100644 src/nonebot_plugin_alconna/uniseg/adapters/vocechat/__init__.py create mode 100644 src/nonebot_plugin_alconna/uniseg/adapters/vocechat/builder.py create mode 100644 src/nonebot_plugin_alconna/uniseg/adapters/vocechat/exporter.py diff --git a/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/__init__.py b/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/__init__.py new file mode 100644 index 0000000..8b069c8 --- /dev/null +++ b/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/__init__.py @@ -0,0 +1,17 @@ +from nonebot_plugin_alconna.uniseg.constraint import SupportAdapter +from nonebot_plugin_alconna.uniseg.loader import BaseLoader + + +class Loader(BaseLoader): + def get_adapter(self) -> SupportAdapter: + return SupportAdapter.vocechat + + def get_builder(self): + from .builder import VocechatMessageBuilder + + return VocechatMessageBuilder() + + def get_exporter(self): + from .exporter import VocechatMessageExporter + + return VocechatMessageExporter() diff --git a/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/builder.py b/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/builder.py new file mode 100644 index 0000000..80c22ae --- /dev/null +++ b/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/builder.py @@ -0,0 +1,57 @@ +from nonebot.adapters import Bot, Event +from nonebot.adapters.vocechat.event import MessageEvent +from nonebot.adapters.vocechat.message import MessageSegment + +from nonebot_plugin_alconna.uniseg.builder import MessageBuilder, build +from nonebot_plugin_alconna.uniseg.constraint import SupportAdapter +from nonebot_plugin_alconna.uniseg.segment import At, Audio, File, Image, Reply, Text, Video + + +class VocechatMessageBuilder(MessageBuilder[MessageSegment]): + @classmethod + def get_adapter(cls) -> SupportAdapter: + return SupportAdapter.vocechat + + @build("text") + def text(self, seg: MessageSegment): + return Text(seg.data["text"]) + + @build("markdown") + def markdown(self, seg: MessageSegment): + content = seg.data["text"] + return Text(content).mark(0, len(content), "markdown") + + @build("mention") + def mention(self, seg: MessageSegment): + return At("user", str(seg.data["user_id"])) + + @build("image") + def image(self, seg: MessageSegment): + file = seg.data["file"] + properties = seg.data.get("properties") or {} + return Image( + id=file.file_id, + name=file.filename or Image.__default_name__, + width=properties.get("width"), + height=properties.get("height"), + ) + + @build("audio") + def audio(self, seg: MessageSegment): + file = seg.data["file"] + return Audio(id=file.file_id, name=file.filename or Audio.__default_name__) + + @build("video") + def video(self, seg: MessageSegment): + file = seg.data["file"] + return Video(id=file.file_id, name=file.filename or Video.__default_name__) + + @build("file") + def file(self, seg: MessageSegment): + file = seg.data["file"] + return File(id=file.file_id, name=file.filename or File.__default_name__) + + async def extract_reply(self, event: Event, bot: Bot): + if isinstance(event, MessageEvent) and event.reply: + return Reply(str(event.reply.mid), event.reply.message, event.reply) + return None diff --git a/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/exporter.py b/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/exporter.py new file mode 100644 index 0000000..237d3c9 --- /dev/null +++ b/src/nonebot_plugin_alconna/uniseg/adapters/vocechat/exporter.py @@ -0,0 +1,132 @@ +from pathlib import Path +from typing import Any, Sequence + +from nonebot.adapters import Bot, Event +from nonebot.adapters.vocechat.bot import Bot as VcBot +from nonebot.adapters.vocechat.event import GroupMessageEvent, MessageEvent +from nonebot.adapters.vocechat.message import Message, MessageSegment +from tarina import lang + +from nonebot_plugin_alconna.uniseg.constraint import SupportScope +from nonebot_plugin_alconna.uniseg.exporter import MessageExporter, SerializeFailed, SupportAdapter, Target, export +from nonebot_plugin_alconna.uniseg.segment import At, Audio, File, Image, Reply, Segment, Text, Video, Voice + + +class VocechatMessageExporter(MessageExporter[Message]): + @classmethod + def get_adapter(cls) -> SupportAdapter: + return SupportAdapter.vocechat + + def get_message_type(self): + return Message + + def get_target(self, event: Event, bot: Bot | None = None) -> Target: + assert isinstance(event, MessageEvent) + if isinstance(event, GroupMessageEvent) or event.target.gid is not None: + return Target( + str(event.target.gid), + adapter=self.get_adapter(), + self_id=bot.self_id if bot else None, + scope=SupportScope.vocechat, + ) + return Target( + str(event.target.uid or event.from_uid), + private=True, + adapter=self.get_adapter(), + self_id=bot.self_id if bot else None, + scope=SupportScope.vocechat, + ) + + def get_message_id(self, event: Event) -> str: + assert isinstance(event, MessageEvent) + return str(event.message_id or event.mid) + + @export + async def text(self, seg: Text, bot: Bot | None) -> MessageSegment: + if seg.extract_most_style() == "markdown": + return MessageSegment.markdown(seg.text) + return MessageSegment.text(seg.text) + + @export + async def at(self, seg: At, bot: Bot | None) -> MessageSegment: + if seg.flag != "user": + raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type="at", seg=seg)) + return MessageSegment.mention(int(seg.target)) + + def _media_properties(self, seg: Image | Voice | Video | Audio | File) -> dict[str, Any] | None: + properties: dict[str, Any] = {} + if isinstance(seg, Image): + if seg.width is not None: + properties["width"] = seg.width + if seg.height is not None: + properties["height"] = seg.height + if isinstance(seg, (Audio, Voice, Video)) and seg.duration is not None: + properties["duration"] = seg.duration + return properties or None + + async def _media(self, seg: Image | Voice | Video | Audio | File, name: str) -> MessageSegment: + method = { + "image": MessageSegment.image, + "voice": MessageSegment.audio, + "audio": MessageSegment.audio, + "video": MessageSegment.video, + "file": MessageSegment.file, + }[name] + filename = None if seg.name == seg.__default_name__ else seg.name + properties = self._media_properties(seg) + if seg.id: + return method(file_id=seg.id, filename=filename, properties=properties) + if seg.path: + return method(file=Path(seg.path), filename=filename, properties=properties) + if seg.raw: + return method(file=seg.raw_bytes, filename=filename, properties=properties) + raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type=name, seg=seg)) + + @export + async def media(self, seg: Image | Voice | Video | Audio | File, bot: Bot | None) -> MessageSegment: + return await self._media(seg, seg.__class__.__name__.lower()) + + def _pop_reply(self, message: Message) -> tuple[int | None, Message]: + reply_id: int | None = None + new_message = self.get_message_type()([]) + for seg in message: + if seg.type == "$vocechat:reply": + if reply_id is None: + reply_id = int(seg.data["mid"]) + continue + new_message.append(seg) + return reply_id, new_message + + @export + async def reply(self, seg: Reply, bot: Bot | None) -> MessageSegment: + return MessageSegment("$vocechat:reply", {"mid": int(seg.id)}) + + async def send_to(self, target: Target | Event, bot: Bot, message: Message, **kwargs): + assert isinstance(bot, VcBot) + reply_id, message = self._pop_reply(message) + if isinstance(target, MessageEvent): + if reply_id is not None: + return await bot.send_message(message=message, reply=reply_id, **kwargs) + return await bot.send(target, message, **kwargs) + if isinstance(target, Event): + raise NotImplementedError + if reply_id is not None: + return await bot.send_message(message=message, reply=reply_id, **kwargs) + if target.private: + return await bot.send_message(message=message, user_id=int(target.id), **kwargs) + return await bot.send_message(message=message, group_id=int(target.id), **kwargs) + + async def recall(self, mid: Any, bot: Bot, context: Target | Event): + assert isinstance(bot, VcBot) + if isinstance(mid, str): + return await bot.delete(int(mid)) + return await bot.delete(int(mid)) + + async def edit(self, new: Sequence[Segment], mid: Any, bot: Bot, context: Target | Event): + assert isinstance(bot, VcBot) + new_msg = await self.export(new, bot, True) + _, new_msg = self._pop_reply(new_msg) + return await bot.edit(int(mid), new_msg) + + def get_reply(self, mid: Any): + return Reply(str(mid)) diff --git a/src/nonebot_plugin_alconna/uniseg/constraint.py b/src/nonebot_plugin_alconna/uniseg/constraint.py index 1e7ccda..403b213 100644 --- a/src/nonebot_plugin_alconna/uniseg/constraint.py +++ b/src/nonebot_plugin_alconna/uniseg/constraint.py @@ -37,6 +37,7 @@ class SupportAdapter(str, Enum): wxmp = "WXMP" efchat = "EFChat" yunhu = "YunHu" + vocechat = "VoceChat" nonebug = "fake" @@ -75,6 +76,8 @@ class SupportScope(str, Enum): """Bilibili直播平台""" yunhu = "YunHu" """云湖平台""" + vocechat = "VoceChat" + """VoceChat平台""" onebot12_other = "Onebot12" """ob12 的其他平台""" @@ -147,6 +150,7 @@ class SupportAdapterModule(str, Enum): tail_chat = "nonebot_adapter_tailchat" wxmp = "nonebot.adapters.wxmp" yunhu = "nonebot.adapters.yunhu" + vocechat = "nonebot.adapters.vocechat" UNISEG_MESSAGE: Literal["_alc_uniseg_message"] = "_alc_uniseg_message"