Added photo download flag (#38)

* Added photo download flag
* Added logging, docstrings, bash cmd in README
* Black formatting
* Use logger for stdout results
This commit is contained in:
Douglas
2024-06-24 09:48:47 -05:00
committed by GitHub
parent 80d5eb1723
commit 6ac1cac601
4 changed files with 80 additions and 14 deletions

4
.gitignore vendored
View File

@@ -9,3 +9,7 @@ dist/
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
**/*.py[cod] **/*.py[cod]
# Images downloaded
*.jpg
*.jpeg

View File

@@ -43,6 +43,9 @@ See the examples below:
# single phone number # single phone number
telegram-phone-number-checker --phone-numbers +1234567890 telegram-phone-number-checker --phone-numbers +1234567890
# single phone number, download profile photo
telegram-phone-number-checker --phone-numbers +1234567890 --download-profile-photos
# multiple phone numbers # multiple phone numbers
telegram-phone-number-checker --phone-numbers +1234567890,+9876543210,+111111111 telegram-phone-number-checker --phone-numbers +1234567890,+9876543210,+111111111

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "telegram-phone-number-checker" name = "telegram-phone-number-checker"
version = "1.2.0" version = "1.2.1"
description = "Check if phone numbers are connected to Telegram accounts." description = "Check if phone numbers are connected to Telegram accounts."
authors = ["Bellingcat"] authors = ["Bellingcat"]
license = "MIT" license = "MIT"

View File

@@ -1,14 +1,18 @@
import asyncio import asyncio
import json import json
import os import os
from pathlib import Path
import re import re
from getpass import getpass from getpass import getpass
import logging
import click import click
from dotenv import load_dotenv from dotenv import load_dotenv
from telethon.sync import TelegramClient, errors, functions from telethon.sync import TelegramClient, errors, functions
from telethon.tl import types from telethon.tl import types
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
load_dotenv() load_dotenv()
@@ -28,14 +32,20 @@ def get_human_readable_user_status(status: types.TypeUserStatus):
return "Unknown" return "Unknown"
async def get_names(client: TelegramClient, phone_number: str) -> dict: async def get_names(
client: TelegramClient, phone_number: str, download_profile_photos: bool = False
) -> dict:
"""Take in a phone number and returns the associated user information if the user exists. """Take in a phone number and returns the associated user information if the user exists.
It does so by first adding the user's phones to the contact list, retrieving the It does so by first adding the user's phones to the contact list, retrieving the
information, and then deleting the user from the contact list. information, and then deleting the user from the contact list.
---
client, TelegramClient : Telegram client used to generate API call(s)
phone_number, str : Phone number associated with a given Telegram account (including country code, example format '+11232223333')
download_profile_photos, bool : Flag for whether to download a profile's associated account photo; defaults to False.
""" """
result = {} result = {}
print(f"Checking: {phone_number=} ...", end="", flush=True) logging.info(f"Checking: {phone_number=} ...")
try: try:
# Create a contact # Create a contact
contact = types.InputPhoneContact( contact = types.InputPhoneContact(
@@ -80,6 +90,32 @@ async def get_names(client: TelegramClient, phone_number: str) -> dict:
"phone": user.phone, "phone": user.phone,
} }
) )
if download_profile_photos is True:
try:
photo_output_path = Path("{}_{}_photo.jpeg".format(user.id, phone_number))
logging.info(
"Attempting to download profile photo for %s (%s)",
str(user.id),
str(phone_number),
)
photo = await client.download_profile_photo(
user, file=photo_output_path, download_big=True
)
if photo is not None:
logging.info("Downloaded photo at '%s'", photo)
else:
logging.info(
"No photo found for %s (%s)", str(user.id), str(phone_number)
)
# We don't want the script to fail if download I/O fails locally, file format error, etc.
# TODO : Add handling for ind. exceptions
except Exception as e:
logging.exception(
"---\nUnable to download profile photo for %s. Exception provided below.\n---\n%s\n---\n",
str(phone_number),
str(e),
)
else: else:
result.update( result.update(
{ {
@@ -97,11 +133,13 @@ async def get_names(client: TelegramClient, phone_number: str) -> dict:
except Exception as e: except Exception as e:
result.update({"error": f"Unexpected error: {e}."}) result.update({"error": f"Unexpected error: {e}."})
raise raise
print("Done.") logging.info("Done.")
return result return result
async def validate_users(client: TelegramClient, phone_numbers: str) -> dict: async def validate_users(
client: TelegramClient, phone_numbers: str, download_profile_photos: bool
) -> dict:
""" """
Take in a string of comma separated phone numbers and try to get the user information associated with each phone number. Take in a string of comma separated phone numbers and try to get the user information associated with each phone number.
""" """
@@ -112,9 +150,9 @@ async def validate_users(client: TelegramClient, phone_numbers: str) -> dict:
try: try:
for phone in phones: for phone in phones:
if phone not in result: if phone not in result:
result[phone] = await get_names(client, phone) result[phone] = await get_names(client, phone, download_profile_photos)
except Exception as e: except Exception as e:
print(e) logging.error(e)
raise raise
return result return result
@@ -123,7 +161,7 @@ async def login(
api_id: str | None, api_hash: str | None, phone_number: str | None api_id: str | None, api_hash: str | None, phone_number: str | None
) -> TelegramClient: ) -> TelegramClient:
"""Create a telethon session or reuse existing one""" """Create a telethon session or reuse existing one"""
print("Logging in...", end="", flush=True) logging.info("Logging in...")
API_ID = api_id or os.getenv("API_ID") or input("Enter your API ID: ") API_ID = api_id or os.getenv("API_ID") or input("Enter your API ID: ")
API_HASH = api_hash or os.getenv("API_HASH") or input("Enter your API HASH: ") API_HASH = api_hash or os.getenv("API_HASH") or input("Enter your API HASH: ")
PHONE_NUMBER = ( PHONE_NUMBER = (
@@ -142,15 +180,15 @@ async def login(
"Two-Step Verification enabled. Please enter your account password: " "Two-Step Verification enabled. Please enter your account password: "
) )
await client.sign_in(password=pw) await client.sign_in(password=pw)
print("Done.") logging.info("Done.")
return client return client
def show_results(output: str, res: dict) -> None: def show_results(output: str, res: dict) -> None:
print(json.dumps(res, indent=4)) logging.info(json.dumps(res, indent=4))
with open(output, "w") as f: with open(output, "w") as f:
json.dump(res, f, indent=4) json.dump(res, f, indent=4)
print(f"Results saved to {output}") logging.info(f"Results saved to {output}")
@click.command( @click.command(
@@ -194,8 +232,20 @@ def show_results(output: str, res: dict) -> None:
show_default=True, show_default=True,
type=str, type=str,
) )
@click.option(
"--download-profile-photos",
help="Download the user profile photo associated with requested Telegram account",
is_flag=True,
default=False,
show_default=True,
)
def main_entrypoint( def main_entrypoint(
phone_numbers: str, api_id: str, api_hash: str, api_phone_number: str, output: str phone_numbers: str,
api_id: str,
api_hash: str,
api_phone_number: str,
output: str,
download_profile_photos: bool,
) -> None: ) -> None:
""" """
Check to see if one or more phone numbers belong to a valid Telegram account. Check to see if one or more phone numbers belong to a valid Telegram account.
@@ -235,15 +285,24 @@ def main_entrypoint(
api_hash, api_hash,
api_phone_number, api_phone_number,
output, output,
download_profile_photos,
) )
) )
async def run_program( async def run_program(
phone_numbers: str, api_id: str, api_hash: str, api_phone_number: str, output: str phone_numbers: str,
api_id: str,
api_hash: str,
api_phone_number: str,
output: str,
download_profile_photos: bool = False,
): ):
"""
Get all args passed from Click parser, pass them into the script.
"""
client = await login(api_id, api_hash, api_phone_number) client = await login(api_id, api_hash, api_phone_number)
res = await validate_users(client, phone_numbers) res = await validate_users(client, phone_numbers, download_profile_photos)
show_results(output, res) show_results(output, res)
client.disconnect() client.disconnect()