Skip to content
Merged
164 changes: 153 additions & 11 deletions deeptutor/tutorbot/channels/zulip.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ZulipConfig(Base):
api_key: str = Field(default="", repr=False)
allow_from: list[str] = Field(default_factory=list)
group_policy: Literal["mention", "open"] = "mention"
subscribe_streams: list[str] = Field(default_factory=list)
timeout: float = Field(default=60.0)


Expand All @@ -68,6 +69,7 @@ def __init__(self, config: Any, bus: MessageBus):
self._client: Any = None
self._bot_email: str = ""
self._bot_user_id: int | None = None
self._bot_full_name: str = ""
self._queue_id: str | None = None
self._last_event_id: int = -1
self._max_message_id: int = 0
Expand Down Expand Up @@ -118,12 +120,15 @@ async def start(self) -> None:

self._bot_email = profile.get("email", self.config.email)
self._bot_user_id = profile.get("user_id")
self._bot_full_name = profile.get("full_name", "")
logger.info(
"Zulip bot connected: {} (user_id={})",
self._bot_email,
self._bot_user_id,
)

self._subscribe_to_streams()

self._listener_thread = threading.Thread(
target=self._run_listener, daemon=True, name="zulip-listener"
)
Expand Down Expand Up @@ -157,6 +162,14 @@ async def send(self, msg: OutboundMessage) -> None:
logger.warning("Zulip client not running")
return

if msg.metadata.get("_tool_hint"):
return

if not msg.metadata.get("msg_type"):
stored = self._recipient_map.get(msg.chat_id)
if stored:
msg.metadata = {**stored, **msg.metadata}

if not msg.metadata.get("_progress", False):
self._stop_typing(msg.chat_id)

Expand Down Expand Up @@ -240,6 +253,62 @@ def _run_listener(self) -> None:

logger.info("Zulip listener stopped")

def _subscribe_to_streams(self) -> None:
streams = self.config.subscribe_streams
if not streams:
logger.info("No subscribe_streams configured, skipping auto-subscribe")
return

stream_names: list[str]
if "*" in streams:
try:
result = self._call_with_retry(self._client.get_streams, include_all=True)
except Exception as e:
logger.error("Zulip get_streams failed during auto-subscribe: {}", e)
return
if result.get("result") != "success":
logger.error("Failed to fetch streams for auto-subscribe: {}", result.get("msg"))
return
fetched = {s["name"] for s in result.get("streams", [])}
extra = {s for s in streams if s != "*"}
stream_names = list(fetched | extra)
else:
stream_names = list(streams)

if not stream_names:
logger.info("No streams to subscribe to")
return

subscriptions = [{"name": name} for name in stream_names]
try:
result = self._call_with_retry(
self._client.add_subscriptions, streams=subscriptions
)
except Exception as e:
logger.error("Zulip auto-subscribe failed: {}", e)
return
already_subscribed_names: set[str] = set()
for names in result.get("already_subscribed", {}).values():
already_subscribed_names.update(names)
new_count = len(stream_names) - len(already_subscribed_names)
if new_count > 0:
logger.info(
"Zulip bot subscribed to {} streams ({} new, {} already subscribed)",
len(stream_names),
new_count,
len(stream_names) - new_count,
)
else:
logger.info(
"Zulip bot already subscribed to {} streams (no new subscriptions)",
len(stream_names),
)
if result.get("result") != "success" and new_count == 0:
logger.debug(
"Zulip add_subscriptions returned non-success but all streams already subscribed: {}",
result.get("msg", "unknown"),
)

def _register_queue(self) -> None:
try:
result = self._call_with_retry(
Expand Down Expand Up @@ -290,6 +359,18 @@ def _on_message(self, message: dict) -> None:

msg_type = message.get("type", "")
content = message.get("content", "")
flags = message.get("flags", [])
sender_email = message.get("sender_email", "?")
subject = message.get("subject", "")
display_recipient = message.get("display_recipient", "")
logger.info(
"Zulip message received: type={}, flags={}, sender={}, stream={}, topic={}",
msg_type,
flags,
sender_email,
display_recipient if msg_type == "stream" else "N/A",
subject,
)
content_type = message.get("content_type", "text/x-markdown")
if content_type == "text/x-markdown":
content = self._convert_zulip_latex_to_standard(content)
Expand All @@ -309,7 +390,14 @@ def _on_message(self, message: dict) -> None:
chat_id = self._stream_chat_id(stream_name, topic)
if self.config.group_policy == "mention":
if not self._is_mentioned(message):
logger.info(
"Zulip stream message ignored (not mentioned): stream={}, topic={}, flags={}",
stream_name,
topic,
message.get("flags", []),
)
return
logger.info("Zulip stream message will be processed: stream={}, topic={}", stream_name, topic)
content = f"**[{stream_name} > {topic}]** {content}"
elif msg_type == "private":
chat_id = f"pm:{sender_id}"
Expand Down Expand Up @@ -369,10 +457,20 @@ def _session_key_for(self, chat_id: str) -> str | None:

def _is_mentioned(self, message: dict) -> bool:
if self._bot_user_id is None:
logger.warning("Zulip _is_mentioned: bot_user_id is None, cannot check mention")
return False
for flag in message.get("flags", []):
if isinstance(flag, str) and flag == "mentioned":
mention_flags = {
"mentioned",
"wildcard_mentioned",
"stream_wildcard_mentioned",
"topic_wildcard_mentioned",
}
flags = message.get("flags", [])
for flag in flags:
if isinstance(flag, str) and flag in mention_flags:
logger.debug("Zulip _is_mentioned: found flag={}", flag)
return True
logger.debug("Zulip _is_mentioned: no mention flags found, flags={}", flags)
return False

def _download_attachments(self, message: dict) -> list[str]:
Expand Down Expand Up @@ -532,20 +630,64 @@ async def _send_text(self, chat_id: str, text: str, metadata: dict) -> None:
if result.get("result") != "success":
logger.error("Zulip send failed: {}", result.get("msg", "unknown"))

def _resolve_media_path(self, media_path: str) -> str | None:
if Path(media_path).exists():
return media_path

path_id: str | None = None
if media_path.startswith("/user_uploads/"):
path_id = media_path
elif self.config.site and media_path.startswith(self.config.site):
stripped = media_path[len(self.config.site.rstrip("/")):]
if stripped.startswith("/user_uploads/"):
path_id = stripped

if not path_id:
return None

media_dir = get_media_dir("zulip")
name = Path(unquote(path_id)).name
dest = self._attachment_destination(media_dir, name, path_id, 0)

if dest.exists():
return str(dest)

url = f"{self.config.site.rstrip('/')}{path_id}"
try:
resp = requests.get(
url,
auth=(self.config.email, self.config.api_key),
timeout=self.config.timeout,
)
resp.raise_for_status()
dest.write_bytes(resp.content)
logger.debug("Downloaded Zulip attachment for re-upload: {}", name)
return str(dest)
except Exception as e:
logger.warning("Failed to download Zulip attachment {}: {}", name, e)
return None

async def _upload_and_send(self, chat_id: str, media_path: str, metadata: dict) -> None:
client = self._client
if not client:
return

local_path = self._resolve_media_path(media_path)
if not local_path:
logger.error("Cannot resolve media path: {}", media_path)
return

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: self._call_with_retry(
client.call_endpoint,
url="user_uploads",
files=[media_path],
timeout=self.config.timeout,
),
)
with open(local_path, "rb") as f:
result = await loop.run_in_executor(
None,
lambda: self._call_with_retry(
client.call_endpoint,
url="user_uploads",
files=[f],
timeout=self.config.timeout,
),
)
if result.get("result") != "success":
logger.error("Zulip upload failed: {}", result.get("msg", "unknown"))
return
Expand Down
Loading
Loading