Improve user last online & make program async

This commit is contained in:
Jordan Gillard
2024-04-06 17:10:23 -04:00
parent 22ab68a59b
commit 874b5ef902
4 changed files with 196 additions and 41 deletions

99
poetry.lock generated
View File

@@ -22,6 +22,52 @@ doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphin
test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
trio = ["trio (>=0.23)"] trio = ["trio (>=0.23)"]
[[package]]
name = "black"
version = "24.3.0"
description = "The uncompromising code formatter."
optional = false
python-versions = ">=3.8"
files = [
{file = "black-24.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7d5e026f8da0322b5662fa7a8e752b3fa2dac1c1cbc213c3d7ff9bdd0ab12395"},
{file = "black-24.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9f50ea1132e2189d8dff0115ab75b65590a3e97de1e143795adb4ce317934995"},
{file = "black-24.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e2af80566f43c85f5797365077fb64a393861a3730bd110971ab7a0c94e873e7"},
{file = "black-24.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:4be5bb28e090456adfc1255e03967fb67ca846a03be7aadf6249096100ee32d0"},
{file = "black-24.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4f1373a7808a8f135b774039f61d59e4be7eb56b2513d3d2f02a8b9365b8a8a9"},
{file = "black-24.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:aadf7a02d947936ee418777e0247ea114f78aff0d0959461057cae8a04f20597"},
{file = "black-24.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65c02e4ea2ae09d16314d30912a58ada9a5c4fdfedf9512d23326128ac08ac3d"},
{file = "black-24.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:bf21b7b230718a5f08bd32d5e4f1db7fc8788345c8aea1d155fc17852b3410f5"},
{file = "black-24.3.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:2818cf72dfd5d289e48f37ccfa08b460bf469e67fb7c4abb07edc2e9f16fb63f"},
{file = "black-24.3.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:4acf672def7eb1725f41f38bf6bf425c8237248bb0804faa3965c036f7672d11"},
{file = "black-24.3.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c7ed6668cbbfcd231fa0dc1b137d3e40c04c7f786e626b405c62bcd5db5857e4"},
{file = "black-24.3.0-cp312-cp312-win_amd64.whl", hash = "sha256:56f52cfbd3dabe2798d76dbdd299faa046a901041faf2cf33288bc4e6dae57b5"},
{file = "black-24.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:79dcf34b33e38ed1b17434693763301d7ccbd1c5860674a8f871bd15139e7837"},
{file = "black-24.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e19cb1c6365fd6dc38a6eae2dcb691d7d83935c10215aef8e6c38edee3f77abd"},
{file = "black-24.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65b76c275e4c1c5ce6e9870911384bff5ca31ab63d19c76811cb1fb162678213"},
{file = "black-24.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:b5991d523eee14756f3c8d5df5231550ae8993e2286b8014e2fdea7156ed0959"},
{file = "black-24.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c45f8dff244b3c431b36e3224b6be4a127c6aca780853574c00faf99258041eb"},
{file = "black-24.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6905238a754ceb7788a73f02b45637d820b2f5478b20fec82ea865e4f5d4d9f7"},
{file = "black-24.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7de8d330763c66663661a1ffd432274a2f92f07feeddd89ffd085b5744f85e7"},
{file = "black-24.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:7bb041dca0d784697af4646d3b62ba4a6b028276ae878e53f6b4f74ddd6db99f"},
{file = "black-24.3.0-py3-none-any.whl", hash = "sha256:41622020d7120e01d377f74249e677039d20e6344ff5851de8a10f11f513bf93"},
{file = "black-24.3.0.tar.gz", hash = "sha256:a0c9c4a0771afc6919578cec71ce82a3e31e054904e7197deacbc9382671c41f"},
]
[package.dependencies]
click = ">=8.0.0"
mypy-extensions = ">=0.4.3"
packaging = ">=22.0"
pathspec = ">=0.9.0"
platformdirs = ">=2"
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
typing-extensions = {version = ">=4.0.1", markers = "python_version < \"3.11\""}
[package.extras]
colorama = ["colorama (>=0.4.3)"]
d = ["aiohttp (>=3.7.4)", "aiohttp (>=3.7.4,!=3.9.0)"]
jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"]
uvloop = ["uvloop (>=0.15.2)"]
[[package]] [[package]]
name = "click" name = "click"
version = "8.1.7" version = "8.1.7"
@@ -83,6 +129,31 @@ files = [
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
] ]
[[package]]
name = "isort"
version = "5.13.2"
description = "A Python utility / library to sort Python imports."
optional = false
python-versions = ">=3.8.0"
files = [
{file = "isort-5.13.2-py3-none-any.whl", hash = "sha256:8ca5e72a8d85860d5a3fa69b8745237f2939afe12dbf656afbcb47fe72d947a6"},
{file = "isort-5.13.2.tar.gz", hash = "sha256:48fdfcb9face5d58a4f6dde2e72a1fb8dcaf8ab26f95ab49fab84c2ddefb0109"},
]
[package.extras]
colors = ["colorama (>=0.4.6)"]
[[package]]
name = "mypy-extensions"
version = "1.0.0"
description = "Type system extensions for programs checked with the mypy type checker."
optional = false
python-versions = ">=3.5"
files = [
{file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
{file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
]
[[package]] [[package]]
name = "packaging" name = "packaging"
version = "24.0" version = "24.0"
@@ -94,6 +165,32 @@ files = [
{file = "packaging-24.0.tar.gz", hash = "sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"}, {file = "packaging-24.0.tar.gz", hash = "sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"},
] ]
[[package]]
name = "pathspec"
version = "0.12.1"
description = "Utility library for gitignore style pattern matching of file paths."
optional = false
python-versions = ">=3.8"
files = [
{file = "pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08"},
{file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"},
]
[[package]]
name = "platformdirs"
version = "4.2.0"
description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
optional = false
python-versions = ">=3.8"
files = [
{file = "platformdirs-4.2.0-py3-none-any.whl", hash = "sha256:0614df2a2f37e1a662acbd8e2b25b92ccf8632929bc6d43467e17fe89c75e068"},
{file = "platformdirs-4.2.0.tar.gz", hash = "sha256:ef0cc731df711022c174543cb70a9b5bd22e5a9337c8624ef2c2ceb8ddad8768"},
]
[package.extras]
docs = ["furo (>=2023.9.10)", "proselint (>=0.13)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"]
test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)"]
[[package]] [[package]]
name = "pluggy" name = "pluggy"
version = "1.4.0" version = "1.4.0"
@@ -233,4 +330,4 @@ files = [
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.9" python-versions = "^3.9"
content-hash = "17abe59706d0c666b46a35de6e4ea1dac59015cb0976edf4479d29d839546220" content-hash = "e2ac4216301383f37ab71c523d371ed38b7c3a282b676277c8fb155219d2de42"

View File

@@ -30,7 +30,14 @@ typing-extensions = "4.9.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pytest = "^8.1.1" pytest = "^8.1.1"
black = "24.3.0"
isort = "^5.13.2"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
pythonpath = [
"."
]

View File

@@ -1,17 +1,34 @@
import os import asyncio
import json import json
import os
import re import re
from telethon.sync import TelegramClient, errors, functions
from telethon.tl.types import InputPhoneContact
from dotenv import load_dotenv
from getpass import getpass from getpass import getpass
import click
import click
from dotenv import load_dotenv
from telethon.sync import TelegramClient, errors, functions
from telethon.tl import types
load_dotenv() load_dotenv()
def get_names(client: TelegramClient, phone_number: str) -> dict: def get_human_readable_user_status(status: types.TypeUserStatus):
match status:
case types.UserStatusOnline():
return "Currently online"
case types.UserStatusOffline():
return status.was_online.strftime("%Y-%m-%d %H:%M:%S %Z")
case types.UserStatusRecently():
return "Last seen recently"
case types.UserStatusLastWeek():
return "Last seen last week"
case types.UserStatusLastMonth():
return "Last seen last month"
case _:
return "Unknown"
async def get_names(client: TelegramClient, phone_number: str) -> dict:
"""Take in a phone number and returns the associated user information if the user exists. """Take in a phone number and returns the associated user information if the user exists.
It does so by first adding the user's phones to the contact list, retrieving the It does so by first adding the user's phones to the contact list, retrieving the
@@ -21,11 +38,11 @@ def get_names(client: TelegramClient, phone_number: str) -> dict:
print(f"Checking: {phone_number=} ...", end="", flush=True) print(f"Checking: {phone_number=} ...", end="", flush=True)
try: try:
# Create a contact # Create a contact
contact = InputPhoneContact( contact = types.InputPhoneContact(
client_id=0, phone=phone_number, first_name="", last_name="" client_id=0, phone=phone_number, first_name="", last_name=""
) )
# Attempt to add the contact from the address book # Attempt to add the contact from the address book
contacts = client(functions.contacts.ImportContactsRequest([contact])) contacts = await client(functions.contacts.ImportContactsRequest([contact]))
users = contacts.to_dict().get("users", []) users = contacts.to_dict().get("users", [])
number_of_matches = len(users) number_of_matches = len(users)
@@ -39,31 +56,28 @@ def get_names(client: TelegramClient, phone_number: str) -> dict:
elif number_of_matches == 1: elif number_of_matches == 1:
# Attempt to remove the contact from the address book. # Attempt to remove the contact from the address book.
# The response from DeleteContactsRequest contains more information than from ImportContactsRequest # The response from DeleteContactsRequest contains more information than from ImportContactsRequest
del_user = client( updates_response: types.Updates = await client(
functions.contacts.DeleteContactsRequest(id=[users[0].get("id")]) functions.contacts.DeleteContactsRequest(id=[users[0].get("id")])
) )
user = del_user.to_dict().get("users")[0] user = updates_response.users[0]
user_was_online = user.get("status", {}).get("was_online")
# getting more information about the user # getting more information about the user
result.update( result.update(
{ {
"id": user.get("id"), "id": user.id,
"username": user.get("username"), "username": user.username,
"first_name": user.get("first_name"), "usernames": user.usernames,
"last_name": user.get("last_name"), "first_name": user.first_name,
"fake": user.get("fake"), "last_name": user.last_name,
"verified": user.get("verified"), "fake": user.fake,
"premium": user.get("premium"), "verified": user.verified,
"mutual_contact": user.get("mutual_contact"), "premium": user.premium,
"bot": user.get("bot"), "mutual_contact": user.mutual_contact,
"bot_chat_history": user.get("bot_chat_history"), "bot": user.bot,
"restricted": user.get("restricted"), "bot_chat_history": user.bot_chat_history,
"restriction_reason": user.get("restriction_reason"), "restricted": user.restricted,
"user_was_online": ( "restriction_reason": user.restriction_reason,
user_was_online.strftime("%Y-%m-%d %H:%M:%S %Z") "user_was_online": get_human_readable_user_status(user.status),
if user_was_online "phone": user.phone,
else None
),
} }
) )
else: else:
@@ -87,7 +101,7 @@ def get_names(client: TelegramClient, phone_number: str) -> dict:
return result return result
def validate_users(client: TelegramClient, phone_numbers: str) -> dict: async def validate_users(client: TelegramClient, phone_numbers: str) -> dict:
""" """
Take in a string of comma separated phone numbers and try to get the user information associated with each phone number. Take in a string of comma separated phone numbers and try to get the user information associated with each phone number.
""" """
@@ -98,14 +112,14 @@ def validate_users(client: TelegramClient, phone_numbers: str) -> dict:
try: try:
for phone in phones: for phone in phones:
if phone not in result: if phone not in result:
result[phone] = get_names(client, phone) result[phone] = await get_names(client, phone)
except Exception as e: except Exception as e:
print(e) print(e)
raise raise
return result return result
def login( async def login(
api_id: str | None, api_hash: str | None, phone_number: str | None api_id: str | None, api_hash: str | None, phone_number: str | None
) -> TelegramClient: ) -> TelegramClient:
"""Create a telethon session or reuse existing one""" """Create a telethon session or reuse existing one"""
@@ -116,16 +130,18 @@ def login(
phone_number or os.getenv("PHONE_NUMBER") or input("Enter your phone number: ") phone_number or os.getenv("PHONE_NUMBER") or input("Enter your phone number: ")
) )
client = TelegramClient(PHONE_NUMBER, API_ID, API_HASH) client = TelegramClient(PHONE_NUMBER, API_ID, API_HASH)
client.connect() await client.connect()
if not client.is_user_authorized(): if not await client.is_user_authorized():
client.send_code_request(PHONE_NUMBER) await client.send_code_request(PHONE_NUMBER)
try: try:
client.sign_in(PHONE_NUMBER, input("Enter the code (sent on telegram): ")) await client.sign_in(
PHONE_NUMBER, input("Enter the code (sent on telegram): ")
)
except errors.SessionPasswordNeededError: except errors.SessionPasswordNeededError:
pw = getpass( pw = getpass(
"Two-Step Verification enabled. Please enter your account password: " "Two-Step Verification enabled. Please enter your account password: "
) )
client.sign_in(password=pw) await client.sign_in(password=pw)
print("Done.") print("Done.")
return client return client
@@ -212,8 +228,22 @@ def main_entrypoint(
i.e. +491234567891 i.e. +491234567891
""" """
client = login(api_id, api_hash, api_phone_number) asyncio.run(
res = validate_users(client, phone_numbers) run_program(
phone_numbers,
api_id,
api_hash,
api_phone_number,
output,
)
)
async def run_program(
phone_numbers: str, api_id: str, api_hash: str, api_phone_number: str, output: str
):
client = await login(api_id, api_hash, api_phone_number)
res = await validate_users(client, phone_numbers)
show_results(output, res) show_results(output, res)

View File

@@ -1,5 +1,26 @@
from datetime import datetime, timezone
import pytest import pytest
from telethon.tl import types
from telegram_phone_number_checker import main
def test_should_run_this_in_ci(): @pytest.mark.parametrize(
assert True "user_status, readable_string",
[
(types.UserStatusEmpty(), "Unknown"),
(types.UserStatusOnline(expires=None), "Currently online"),
(
types.UserStatusOffline(
was_online=datetime(2024, 4, 6, 12, 30, 1, tzinfo=timezone.utc)
),
"2024-04-06 12:30:01 UTC",
),
(types.UserStatusRecently(), "Last seen recently"),
(types.UserStatusLastWeek(), "Last seen last week"),
(types.UserStatusLastMonth(), "Last seen last month"),
],
)
def test_should_return_correct_status_string(user_status, readable_string):
assert main.get_human_readable_user_status(user_status) == readable_string