From 959710ab613bd201c5cf56bb01b9e1e7d6ee84e5 Mon Sep 17 00:00:00 2001
From: Stijn Peeters
Date: Mon, 23 Sep 2024 18:36:07 +0200
Subject: [PATCH] Find Telegram crawl refs in message body

---
 datasources/telegram/search_telegram.py | 78 ++++++++++++++++++-------
 1 file changed, 58 insertions(+), 20 deletions(-)

diff --git a/datasources/telegram/search_telegram.py b/datasources/telegram/search_telegram.py
index 525430910..c963e47f6 100644
--- a/datasources/telegram/search_telegram.py
+++ b/datasources/telegram/search_telegram.py
@@ -6,6 +6,7 @@
 import hashlib
 import asyncio
 import json
+import ural
 import time
 import re
 
@@ -24,7 +25,7 @@
     FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError
 from telethon.tl.functions.channels import GetFullChannelRequest
 from telethon.tl.functions.users import GetFullUserRequest
-from telethon.tl.types import User
+from telethon.tl.types import User, MessageEntityMention
 
@@ -214,6 +215,14 @@ def get_options(cls, parent_dataset=None, user=None):
                 "tooltip": "Entities need to be referenced at least this many times to be added to the query. Only "
                            "references discovered below the max crawl depth are taken into account."
             }
+            options["crawl-via-links"] = {
+                "type": UserInput.OPTION_TOGGLE,
+                "default": False,
+                "help": "Extract new groups from links",
+                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
+                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
+                           "groups that would otherwise remain undetected."
+            }
 
         return options
 
@@ -358,6 +367,7 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date):
         crawl_max_depth = self.parameters.get("crawl-depth", 0)
         crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
+        crawl_via_links = self.parameters.get("crawl-via-links", False)
 
         self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
         self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
@@ -434,6 +444,7 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date):
                         break
 
                     # if crawling is enabled, see if we found something to add to the query
+                    linked_entities = set()
                     if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
                         message_fwd = serialized_message.get("fwd_from")
                         fwd_from = None
@@ -451,7 +462,7 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date):
                             # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
                             fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
                             fwd_source_type = "channel"
-                        elif message_fwd and message_fwd.get("from_id", {}).get('full_user',{}):
+                        elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
                             # forwards can also come from users
                             # these can never be followed, so don't add these to the crawl, but do document them
                             fwd_source_type = "user"
@@ -460,23 +471,50 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date):
                             self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
                             fwd_source_type = "unknown"
 
+                        if fwd_from:
+                            linked_entities.add(fwd_from)
+
+
+                        if crawl_via_links:
+                            # t.me links
+                            all_links = ural.urls_from_text(serialized_message["message"])
+                            all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me"]
+                            for link in all_links:
+                                if link.startswith("+"):
+                                    # invite links cannot be resolved to an entity name
+                                    continue
+
+                                entity_name = link.split("?")[0].split("#")[0]
link.split("?")[0].split("#")[0] + linked_entities.add(entity_name) + + # @references + references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention] + for reference in references: + if reference.startswith("@"): + reference = reference[1:] + + linked_entities.add(reference) + # Check if fwd_from or the resolved entity ID is already queued or has been queried - if fwd_from and fwd_from not in full_query and fwd_from not in queries and fwd_source_type not in ("user",): - # new entity discovered! - # might be discovered (before collection) multiple times, so retain lowest depth - print(f"Potentially crawling {fwd_from}") - depth_map[fwd_from] = min(depth_map.get(fwd_from, crawl_max_depth), depth_map[query] + 1) - if fwd_from not in crawl_references: - crawl_references[fwd_from] = 0 - crawl_references[fwd_from] += 1 - - # Add to queries if it has been referenced enough times - if crawl_references[fwd_from] >= crawl_msg_threshold: - queries.append(fwd_from) - full_query.add(fwd_from) - num_queries += 1 - discovered += 1 - self.dataset.update_status(f"Discovered new entity {entity_id_map.get(fwd_from, fwd_from)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") + for link in linked_entities: + if link not in full_query and link not in queries and fwd_source_type not in ("user",): + # new entity discovered! + # might be discovered (before collection) multiple times, so retain lowest depth + # print(f"Potentially crawling {link}") + depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1) + if link not in crawl_references: + crawl_references[link] = 0 + crawl_references[link] += 1 + + # Add to queries if it has been referenced enough times + if crawl_references[link] >= crawl_msg_threshold: + queries.append(link) + full_query.add(link) + num_queries += 1 + discovered += 1 + self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") + + serialized_message["4CAT_metadata"] = { "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls @@ -1012,7 +1050,6 @@ def validate_query(query, request, user): return { "items": num_items, "query": ",".join(sanitized_items), - "board": "", # needed for web interface "api_id": query.get("api_id"), "api_hash": query.get("api_hash"), "api_phone": query.get("api_phone"), @@ -1021,7 +1058,8 @@ def validate_query(query, request, user): "min_date": min_date, "max_date": max_date, "crawl-depth": query.get("crawl-depth"), - "crawl-threshold": query.get("crawl-threshold") + "crawl-threshold": query.get("crawl-threshold"), + "crawl-via-links": query.get("crawl-via-links") } @staticmethod