Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Support multiple forwarding agents, and plugin configurations #539

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ tgcf.config.json
*.txt
todo
*.ipynb

secrets/


# Byte-compiled / optimized / DLL files
Expand Down
3,289 changes: 1,886 additions & 1,403 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ verlat = "^0.1.0"
streamlit = "^1.15.2"
PyYAML = "^6.0"
pymongo = "^4.3.3"
gspread-asyncio = "^1.9.0"
aiofiles = "^23.2.1"

[tool.poetry.dev-dependencies]
black = {version = "^22.10.0", allow-prereleases = true}
Expand Down
2 changes: 1 addition & 1 deletion tgcf/bot/live_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def style_command_handler(event):
_valid = [item.value for item in Style]
if args not in _valid:
raise ValueError(f"Invalid style. Choose from {_valid}")
CONFIG.plugins.fmt.style = args
CONFIG.plugin_cfgs.fmt.style = args
await event.respond("Success")
write_config(CONFIG)
except ValueError as err:
Expand Down
10 changes: 7 additions & 3 deletions tgcf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ def main(
mode: Mode = typer.Argument(
..., help="Choose the mode in which you want to run tgcf.", envvar="TGCF_MODE"
),
agent_id: Optional[int] = typer.Argument(
default=0, help="Choose the agent to use for message forwarding."
),
verbose: Optional[bool] = typer.Option( # pylint: disable=unused-argument
None,
"--loud",
Expand Down Expand Up @@ -110,14 +113,15 @@ def main(
logging.critical(f"You are running fake with {mode} mode")
sys.exit(1)

logging.info(f"Running agent id: {agent_id}")
if mode == Mode.PAST:
from tgcf.past import forward_job # pylint: disable=import-outside-toplevel

asyncio.run(forward_job())
asyncio.run(forward_job(agent_id))
else:
from tgcf.live import start_sync # pylint: disable=import-outside-toplevel

asyncio.run(start_sync())
asyncio.run(start_sync(agent_id))


# AAHNIK 2021
# AAHNIK 2023
104 changes: 81 additions & 23 deletions tgcf/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
import sys
from typing import Dict, List, Optional, Union, Any
from typing import Any, Dict, List, Optional, Union

from dotenv import load_dotenv
from pydantic import BaseModel, validator # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -32,6 +32,9 @@ class Forward(BaseModel):
offset: int = 0
end: Optional[int] = 0

agent: int = 0 # the agent id to use for this connection
plugin_cfg: int = 0 # the plugin configuration id to use for this connection


class LiveSettings(BaseModel):
"""Settings to configure how tgcf operates in live mode."""
Expand Down Expand Up @@ -60,15 +63,45 @@ def validate_delay(cls, val): # pylint: disable=no-self-use,no-self-argument
return val


class LoginConfig(BaseModel):
# class LoginConfig(BaseModel):

# API_ID: int = 0
# API_HASH: str = ""


# user_type: int = 0 # 0:bot, 1:user
# phone_no: int = 91
# USERNAME: str = ""
# SESSION_STRING: str = ""
# BOT_TOKEN: str = ""


class TgAPIConfig(BaseModel):
API_ID: int = 0
API_HASH: str = ""


class AgentLoginConfig(BaseModel):
alias: str = ""
user_type: int = 0 # 0:bot, 1:user
phone_no: int = 91
USERNAME: str = ""
SESSION_STRING: str = ""
BOT_TOKEN: str = ""
is_active: bool = False


class AgentForwardingConfig(BaseModel):
show_forwarded_from: bool = False
mode: int = 0 # 0: live, 1:past
live: LiveSettings = LiveSettings()
past: PastSettings = PastSettings()
pid:int = 0


class LoginConfig(BaseModel):
tg: TgAPIConfig = TgAPIConfig()
agents: List[AgentLoginConfig] = [AgentLoginConfig()]


class BotMessages(BaseModel):
Expand All @@ -80,17 +113,13 @@ class Config(BaseModel):
"""The blueprint for tgcf's whole config."""

# pylint: disable=too-few-public-
pid: int = 0
pids: List[int] = []
theme: str = "light"
login: LoginConfig = LoginConfig()
login_cfg: LoginConfig = LoginConfig()
admins: List[Union[int, str]] = []
forwards: List[Forward] = []
show_forwarded_from: bool = False
mode: int = 0 # 0: live, 1:past
live: LiveSettings = LiveSettings()
past: PastSettings = PastSettings()

plugins: PluginConfig = PluginConfig()
agent_fwd_cfg: List[AgentForwardingConfig] = [AgentForwardingConfig()]
plugin_cfgs: List[PluginConfig] = [PluginConfig()]
bot_messages = BotMessages()


Expand Down Expand Up @@ -165,38 +194,62 @@ async def get_id(client: TelegramClient, peer):
return await client.get_peer_id(peer)


async def load_active_forwards(agent_id: int, forwards: List[Forward]) -> List[Forward]:
active_forwards: List[Forward] = []
for forward in forwards:
if forward.agent != agent_id:
continue
if not forward.use_this:
continue
active_forwards.append(forward)
return active_forwards


async def load_from_to(
client: TelegramClient, forwards: List[Forward]
) -> Dict[int, List[int]]:
agent_id: int,
client: TelegramClient,
forwards: List[Forward],
) -> Dict[int, Dict[str, List[int] | int]]:
"""Convert a list of Forward objects to a mapping.
The active connections of current agent are included.

Args:
client: Instance of Telegram client (logged in)
forwards: List of Forward objects

Returns:
Dict: key = chat id of source
value = List of chat ids of destinations
value = dict
dest: List of chat ids of destinations
pcgf: id of plugin config to use

Notes:
-> The Forward objects may contain username/phn no/links
-> But this mapping strictly contains signed integer chat ids
-> Chat ids are essential for how storage is implemented
-> Storage is essential for edit, delete and reply syncs
"""
from_to_dict = {}
from_to_dict: dict = {}

async def _(peer):
return await get_id(client, peer)

for forward in forwards:
if forward.agent != agent_id:
continue
if not forward.use_this:
continue
source = forward.source
if not isinstance(source, int) and source.strip() == "":
continue
src = await _(forward.source)
from_to_dict[src] = [await _(dest) for dest in forward.dest]
# from_to_dict[src] = {
# "dest": [], # the list of destination entities
# "pcfg": 0 # id of the plugin config to use
# }
from_to_dict[src] = {}
from_to_dict[src]["dest"] = [await _(dest) for dest in forward.dest]
from_to_dict[src]["pcfg"] = forward.plugin_cfg
logging.info(f"From to dict is {from_to_dict}")
return from_to_dict

Expand Down Expand Up @@ -229,7 +282,7 @@ def read_db():


PASSWORD = os.getenv("PASSWORD", "tgcf")
ADMINS = []
ADMINS: List = []

MONGO_CON_STR = os.getenv("MONGO_CON_STR")
MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "tgcf-config")
Expand All @@ -242,19 +295,24 @@ def read_db():
logging.warn(
"You have not set a password to protect the web access to tgcf.\nThe default password `tgcf` is used."
)
from_to = {}
from_to: Dict[int, Dict[str, int | List[int]]] = {}
is_bot: Optional[bool] = None
logging.info("config.py got executed")


def get_SESSION(section: Any = CONFIG.login, default: str = 'tgcf_bot'):
if section.SESSION_STRING and section.user_type == 1:
def get_SESSION(
agent_id: int, login_cfg: LoginConfig = CONFIG.login_cfg, default: str = "tgcf_bot"
):
# TODO: validate agent_id
agent = login_cfg.agents[agent_id]
if agent.SESSION_STRING and agent.user_type == 1:
logging.info("using session string")
SESSION = StringSession(section.SESSION_STRING)
elif section.BOT_TOKEN and section.user_type == 0:
SESSION = StringSession(agent.SESSION_STRING)
elif agent.BOT_TOKEN and agent.user_type == 0:
logging.info("using bot account")
SESSION = default
SESSION = default + str(agent_id)
else:
logging.warning("Login information not set!")
sys.exit()
return SESSION
logging.info(SESSION)
return SESSION
47 changes: 30 additions & 17 deletions tgcf/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from tgcf.plugins import apply_plugins, load_async_plugins
from tgcf.utils import clean_session_files, send_message

current_agent: int = 0


async def new_message_handler(event: Union[Message, events.NewMessage]) -> None:
"""Process new incoming messages."""
Expand All @@ -36,9 +38,10 @@ async def new_message_handler(event: Union[Message, events.NewMessage]) -> None:
del st.stored[key]
break

dest = config.from_to.get(chat_id)
dest = config.from_to.get(chat_id).get("dest")
pcfg_id = config.from_to.get(chat_id).get("pcfg")

tm = await apply_plugins(message)
tm = await apply_plugins(pcfg_id, message)
if not tm:
return

Expand All @@ -50,7 +53,7 @@ async def new_message_handler(event: Union[Message, events.NewMessage]) -> None:
for d in dest:
if event.is_reply and r_event_uid in st.stored:
tm.reply_to = st.stored.get(r_event_uid).get(d)
fwded_msg = await send_message(d, tm)
fwded_msg = await send_message(current_agent, d, tm)
st.stored[event_uid].update({d: fwded_msg})
tm.clear()

Expand All @@ -67,8 +70,9 @@ async def edited_message_handler(event) -> None:
logging.info(f"Message edited in {chat_id}")

event_uid = st.EventUid(event)
pcfg_id = config.from_to.get(chat_id).get("pcfg")

tm = await apply_plugins(message)
tm = await apply_plugins(pcfg_id, message)

if not tm:
return
Expand All @@ -77,17 +81,20 @@ async def edited_message_handler(event) -> None:

if fwded_msgs:
for _, msg in fwded_msgs.items():
if config.CONFIG.live.delete_on_edit == message.text:
if (
config.CONFIG.agent_fwd_cfg[current_agent].live.delete_on_edit
== message.text
):
await msg.delete()
await message.delete()
else:
await msg.edit(tm.text)
return

dest = config.from_to.get(chat_id)
dest = config.from_to.get(chat_id).get("dest")

for d in dest:
await send_message(d, tm)
await send_message(current_agent, d, tm)
tm.clear()


Expand All @@ -114,26 +121,29 @@ async def deleted_message_handler(event):
}


async def start_sync() -> None:
async def start_sync(agent_id: int) -> None:
"""Start tgcf live sync."""
# clear past session files
clean_session_files()
global current_agent
current_agent = agent_id

# load async plugins defined in plugin_models
await load_async_plugins()

SESSION = get_SESSION()
SESSION = get_SESSION(agent_id)
client = TelegramClient(
SESSION,
CONFIG.login.API_ID,
CONFIG.login.API_HASH,
sequential_updates=CONFIG.live.sequential_updates,
CONFIG.login_cfg.tg.API_ID,
CONFIG.login_cfg.tg.API_HASH,
sequential_updates=CONFIG.agent_fwd_cfg[agent_id].live.sequential_updates,
)
if CONFIG.login.user_type == 0:
if CONFIG.login.BOT_TOKEN == "":
agent = CONFIG.login_cfg.agents[agent_id]
if agent.user_type == 0:
if agent.BOT_TOKEN == "":
logging.warning("Bot token not found, but login type is set to bot.")
sys.exit()
await client.start(bot_token=CONFIG.login.BOT_TOKEN)
await client.start(bot_token=agent.BOT_TOKEN)
else:
await client.start()
config.is_bot = await client.is_bot()
Expand All @@ -145,7 +155,10 @@ async def start_sync() -> None:
ALL_EVENTS.update(command_events)

for key, val in ALL_EVENTS.items():
if config.CONFIG.live.delete_sync is False and key == "deleted":
if (
config.CONFIG.agent_fwd_cfg[agent_id].live.delete_sync is False
and key == "deleted"
):
continue
client.add_event_handler(*val)
logging.info(f"Added event handler for {key}")
Expand All @@ -161,5 +174,5 @@ async def start_sync() -> None:
],
)
)
config.from_to = await config.load_from_to(client, config.CONFIG.forwards)
config.from_to = await config.load_from_to(agent_id, client, config.CONFIG.forwards)
await client.run_until_disconnected()
Loading
Loading