Añadir bot.py
This commit is contained in:
parent
3cd5e92be4
commit
fc2e990958
1 changed files with 254 additions and 0 deletions
254
bot.py
Normal file
254
bot.py
Normal file
|
|
@ -0,0 +1,254 @@
|
|||
import os
|
||||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, List
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from mastodon import Mastodon
|
||||
import requests
|
||||
|
||||
from telegram import Update, Message, PhotoSize
|
||||
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
|
||||
import logging
|
||||
import sys
|
||||
import re
|
||||
|
||||
# ---------------------------------
|
||||
# Config & logging
|
||||
# ---------------------------------
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s"
|
||||
)
|
||||
log = logging.getLogger("enresocial_bot")
|
||||
|
||||
load_dotenv()
|
||||
|
||||
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
|
||||
CHANNEL_ID = int(os.getenv("TELEGRAM_CHANNEL_ID", "0")) # canal opcional
|
||||
GROUP_ID = int(os.getenv("TELEGRAM_GROUP_ID", "0")) # grupo opcional (si quieres limitar)
|
||||
|
||||
# Mastodon
|
||||
MASTODON_BASE_URL = os.getenv("MASTODON_BASE_URL")
|
||||
MASTODON_ACCESS_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN")
|
||||
|
||||
# Instagram (cuando lo actives)
|
||||
IG_BUSINESS_ACCOUNT_ID = os.getenv("IG_BUSINESS_ACCOUNT_ID")
|
||||
IG_ACCESS_TOKEN = os.getenv("IG_ACCESS_TOKEN")
|
||||
PUBLIC_UPLOAD_BASE_URL = os.getenv("PUBLIC_UPLOAD_BASE_URL") # p.ej. https://tudominio.com/uploads
|
||||
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "./uploads")
|
||||
|
||||
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
||||
|
||||
if not TELEGRAM_TOKEN:
|
||||
log.error("Falta TELEGRAM_TOKEN en el .env")
|
||||
sys.exit(1)
|
||||
|
||||
TRIGGER = "/1212"
|
||||
TRIGGER_RE = re.compile(r"^\s*/1212\b", re.IGNORECASE)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PostPayload:
|
||||
text: str
|
||||
image_paths: List[str] # rutas locales de imágenes descargadas
|
||||
|
||||
|
||||
def extract_command_payload(msg: Message) -> Optional[str]:
|
||||
"""
|
||||
Si el texto o el caption comienza con /1212, devuelve el resto (payload).
|
||||
"""
|
||||
full_text = (msg.text or msg.caption or "").strip()
|
||||
if not full_text:
|
||||
return None
|
||||
if TRIGGER_RE.match(full_text):
|
||||
return TRIGGER_RE.sub("", full_text, count=1).lstrip()
|
||||
return None
|
||||
|
||||
|
||||
async def download_photos(context: ContextTypes.DEFAULT_TYPE, photos: List[PhotoSize]) -> List[str]:
|
||||
"""
|
||||
Descarga la foto de mayor resolución a un archivo temporal y devuelve la lista de rutas.
|
||||
Telegram envía varias resoluciones de la misma foto; tomamos la mayor.
|
||||
"""
|
||||
if not photos:
|
||||
return []
|
||||
largest = max(photos, key=lambda p: p.width * p.height)
|
||||
file = await context.bot.get_file(largest.file_id)
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg", dir=UPLOAD_DIR) as tmp:
|
||||
await file.download_to_drive(custom_path=tmp.name)
|
||||
log.info(f"[TG] Foto descargada: {tmp.name}")
|
||||
return [tmp.name]
|
||||
|
||||
|
||||
def mastodon_client() -> Optional[Mastodon]:
|
||||
if not (MASTODON_BASE_URL and MASTODON_ACCESS_TOKEN):
|
||||
return None
|
||||
return Mastodon(api_base_url=MASTODON_BASE_URL, access_token=MASTODON_ACCESS_TOKEN)
|
||||
|
||||
|
||||
def post_to_mastodon(payload: PostPayload):
|
||||
m = mastodon_client()
|
||||
if not m:
|
||||
log.warning("[Mastodon] No configurado, omito.")
|
||||
return
|
||||
media_ids = []
|
||||
for path in payload.image_paths:
|
||||
media = m.media_post(path, mime_type="image/jpeg")
|
||||
media_ids.append(media["id"])
|
||||
log.info(f"[Mastodon] Media subida: {media['id']}")
|
||||
m.status_post(status=payload.text or "", media_ids=media_ids or None, visibility="public")
|
||||
log.info("[Mastodon] Publicado.")
|
||||
|
||||
|
||||
def ensure_public_url(local_path: str) -> str:
|
||||
"""
|
||||
Convierte una ruta local en una URL pública donde Instagram pueda descargar la imagen.
|
||||
Opción simple: sirves /uploads con Nginx/Apache en PUBLIC_UPLOAD_BASE_URL.
|
||||
"""
|
||||
if not PUBLIC_UPLOAD_BASE_URL:
|
||||
raise RuntimeError("Falta PUBLIC_UPLOAD_BASE_URL para Instagram.")
|
||||
filename = os.path.basename(local_path)
|
||||
return f"{PUBLIC_UPLOAD_BASE_URL.rstrip('/')}/{filename}"
|
||||
|
||||
|
||||
def post_to_instagram(payload: PostPayload):
|
||||
"""
|
||||
Publicación a Instagram Business usando Instagram Graph API:
|
||||
1) Crear 'media container' con image_url público y caption.
|
||||
2) Publicar el container.
|
||||
|
||||
IMPORTANTE: necesitas una URL pública (HTTPS) alcanzable por Meta.
|
||||
"""
|
||||
if not (IG_BUSINESS_ACCOUNT_ID and IG_ACCESS_TOKEN):
|
||||
log.warning("[Instagram] No configurado, omito.")
|
||||
return
|
||||
|
||||
if not payload.image_paths:
|
||||
log.warning("[Instagram] No hay imagen para publicar (IG requiere imagen/video).")
|
||||
return
|
||||
|
||||
# Instagram solo acepta 1 imagen por post básico con /media (para carrusel es otro flujo)
|
||||
img_url = ensure_public_url(payload.image_paths[0])
|
||||
|
||||
# 1) Crear contenedor
|
||||
media_resp = requests.post(
|
||||
f"https://graph.facebook.com/v21.0/{IG_BUSINESS_ACCOUNT_ID}/media",
|
||||
data={"image_url": img_url, "caption": payload.text or "", "access_token": IG_ACCESS_TOKEN},
|
||||
timeout=60
|
||||
)
|
||||
media_resp.raise_for_status()
|
||||
creation_id = media_resp.json().get("id")
|
||||
|
||||
# 2) Publicar
|
||||
publish_resp = requests.post(
|
||||
f"https://graph.facebook.com/v21.0/{IG_BUSINESS_ACCOUNT_ID}/media_publish",
|
||||
data={"creation_id": creation_id, "access_token": IG_ACCESS_TOKEN},
|
||||
timeout=60
|
||||
)
|
||||
publish_resp.raise_for_status()
|
||||
log.info("[Instagram] Publicado.")
|
||||
|
||||
|
||||
# ----------------------------
|
||||
# Handlers
|
||||
# ----------------------------
|
||||
async def handle_channel_post(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
"""Para posts en CANAL."""
|
||||
if update.channel_post is None:
|
||||
return
|
||||
msg = update.channel_post
|
||||
|
||||
# Debug de ID y tipo
|
||||
log.info(f"[TG] channel_post en chat.id={msg.chat.id} (type=channel)")
|
||||
|
||||
# Limitar opcionalmente a un canal concreto
|
||||
if CHANNEL_ID and msg.chat.id != CHANNEL_ID:
|
||||
return
|
||||
|
||||
payload_text = extract_command_payload(msg)
|
||||
if payload_text is None:
|
||||
return # no empieza por /1212
|
||||
|
||||
image_paths: List[str] = []
|
||||
if msg.photo:
|
||||
image_paths = await download_photos(context, msg.photo)
|
||||
|
||||
payload = PostPayload(text=payload_text, image_paths=image_paths)
|
||||
|
||||
try:
|
||||
post_to_mastodon(payload)
|
||||
except Exception as e:
|
||||
log.exception(f"[Mastodon] Error: {e}")
|
||||
|
||||
try:
|
||||
post_to_instagram(payload)
|
||||
except Exception as e:
|
||||
log.exception(f"[Instagram] Error: {e}")
|
||||
|
||||
try:
|
||||
await msg.reply_text("Publicado en redes ✅", quote=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def handle_group_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
"""Para mensajes en GRUPOS/SUPERGRUPOS que empiecen por /1212 (texto o caption)."""
|
||||
if update.message is None:
|
||||
return
|
||||
msg = update.message
|
||||
|
||||
# Debug de ID y tipo
|
||||
log.info(f"[TG] message en chat.id={msg.chat.id} (type={msg.chat.type})")
|
||||
|
||||
# Limitar opcionalmente a un grupo concreto
|
||||
if GROUP_ID and msg.chat.id != GROUP_ID:
|
||||
return
|
||||
|
||||
# Aseguramos que sólo atendemos si empieza por /1212 en texto o caption
|
||||
if not (TRIGGER_RE.match(msg.text or "") or TRIGGER_RE.match(msg.caption or "")):
|
||||
return
|
||||
|
||||
payload_text = extract_command_payload(msg)
|
||||
if payload_text is None:
|
||||
return
|
||||
|
||||
image_paths: List[str] = []
|
||||
if msg.photo:
|
||||
image_paths = await download_photos(context, msg.photo)
|
||||
|
||||
payload = PostPayload(text=payload_text, image_paths=image_paths)
|
||||
|
||||
try:
|
||||
post_to_mastodon(payload)
|
||||
except Exception as e:
|
||||
log.exception(f"[Mastodon] Error: {e}")
|
||||
|
||||
try:
|
||||
post_to_instagram(payload)
|
||||
except Exception as e:
|
||||
log.exception(f"[Instagram] Error: {e}")
|
||||
|
||||
try:
|
||||
await msg.reply_text("Publicado en redes ✅", quote=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()
|
||||
|
||||
# CANALES
|
||||
app.add_handler(MessageHandler(filters.ChatType.CHANNEL, handle_channel_post))
|
||||
|
||||
# GRUPOS/SUPERGRUPOS: sólo cuando el texto o el caption empieza por /1212
|
||||
group_filter = filters.ChatType.GROUPS & (filters.Regex(TRIGGER_RE) | filters.CaptionRegex(TRIGGER_RE))
|
||||
app.add_handler(MessageHandler(group_filter, handle_group_message))
|
||||
|
||||
log.info("Bot escuchando canal/grupos…")
|
||||
# Escuchamos ambos tipos
|
||||
app.run_polling(allowed_updates=["message", "channel_post"], drop_pending_updates=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue