enresocial_bot/bot.py
2025-11-13 22:24:46 +01:00

636 lines
19 KiB
Python

import os
import tempfile
from dataclasses import dataclass
from typing import Optional, List
from dotenv import load_dotenv
from mastodon import Mastodon
import requests
from telegram import (
Update,
Message,
PhotoSize,
InlineKeyboardButton,
InlineKeyboardMarkup,
)
from telegram.ext import (
ApplicationBuilder,
ContextTypes,
MessageHandler,
CommandHandler,
CallbackQueryHandler,
filters,
)
import logging
import sys
import re
# ---------------------------------
# Config & logging
# ---------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("enresocial_bot")

# Read a local .env file (if any) into the environment before the lookups below.
load_dotenv()

TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")

# Optional chat restrictions; 0 disables the restriction.
# `or "0"` (instead of a getenv default) also covers variables that are present
# but empty, e.g. `TELEGRAM_CHANNEL_ID=`: getenv then returns "" and int("")
# would raise ValueError at import time.
CHANNEL_ID = int(os.getenv("TELEGRAM_CHANNEL_ID") or "0")  # optional channel
GROUP_ID = int(os.getenv("TELEGRAM_GROUP_ID") or "0")  # optional group (to restrict the bot)

# Mastodon
MASTODON_BASE_URL = os.getenv("MASTODON_BASE_URL")
MASTODON_ACCESS_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN")

# Instagram (once enabled)
IG_BUSINESS_ACCOUNT_ID = os.getenv("IG_BUSINESS_ACCOUNT_ID")
IG_ACCESS_TOKEN = os.getenv("IG_ACCESS_TOKEN")
PUBLIC_UPLOAD_BASE_URL = os.getenv("PUBLIC_UPLOAD_BASE_URL")  # e.g. https://tudominio.com/uploads

# Directory where downloaded photos are stored; an empty value falls back to
# the default because os.makedirs("") fails.
UPLOAD_DIR = os.getenv("UPLOAD_DIR") or "./uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# The bot cannot start without a Telegram token.
if not TELEGRAM_TOKEN:
    log.error("Falta TELEGRAM_TOKEN en el .env")
    sys.exit(1)
# Matches the cross-post command at the very start of a text or caption
# (leading whitespace allowed, case-insensitive, must end on a word boundary).
TRIGGER_RE = re.compile(r"^\s*/1212\b", flags=re.IGNORECASE)
# Literal form of the command.
TRIGGER = "/1212"
@dataclass
class PostPayload:
    """Content of one cross-post: its text plus the local image files to attach."""

    # Text of the post (callers may pass an empty string).
    text: str
    # Local filesystem paths of the downloaded images.
    image_paths: List[str]
# ----------------------------
# Ayuda /1212 (comportamiento existente)
# ----------------------------
def extract_command_payload(msg: Message) -> Optional[str]:
    """
    Return whatever follows a leading /1212 in the message text or caption.

    Returns None when the message has neither text nor caption, or when it
    does not start with the trigger.
    """
    content = (msg.text or msg.caption or "").strip()
    hit = TRIGGER_RE.match(content) if content else None
    if hit is None:
        return None
    # The pattern is anchored at the start, so dropping the matched prefix is
    # the same as substituting the first match with an empty string.
    return content[hit.end():].lstrip()
async def download_photos(context: ContextTypes.DEFAULT_TYPE, photos: List[PhotoSize]) -> List[str]:
    """
    Download the highest-resolution variant of a photo into UPLOAD_DIR.

    Telegram delivers the same photo in several sizes; only the largest one
    (by pixel area) is fetched.

    Args:
        context: handler context whose bot is used to fetch the file.
        photos: the size variants of one photo.

    Returns:
        A one-element list with the local path of the downloaded file, or an
        empty list when `photos` is empty.

    Raises:
        Whatever the Telegram download raises; the placeholder file is
        removed before the error propagates.
    """
    if not photos:
        return []

    largest = max(photos, key=lambda p: p.width * p.height)
    tg_file = await context.bot.get_file(largest.file_id)

    # Reserve a unique .jpg name and close our handle right away: the download
    # is given the path (not this handle), so keeping the handle open across
    # the await is unnecessary and fails on platforms that lock open files.
    fd, local_path = tempfile.mkstemp(suffix=".jpg", dir=UPLOAD_DIR)
    os.close(fd)

    try:
        await tg_file.download_to_drive(custom_path=local_path)
    except Exception:
        # Do not leave an empty placeholder behind in the uploads directory.
        try:
            os.remove(local_path)
        except OSError:
            pass
        raise

    log.info(f"[TG] Foto descargada: {local_path}")
    return [local_path]
def mastodon_client() -> Optional[Mastodon]:
    """Build a Mastodon client, or return None unless both base URL and access token are set."""
    if MASTODON_BASE_URL and MASTODON_ACCESS_TOKEN:
        return Mastodon(
            api_base_url=MASTODON_BASE_URL,
            access_token=MASTODON_ACCESS_TOKEN,
        )
    return None
def post_to_mastodon(payload: PostPayload):
    """
    Publish `payload` as a public Mastodon status.

    Every image in the payload is uploaded first (as image/jpeg) and attached
    to the status. When Mastodon is not configured a warning is logged and
    nothing is posted.
    """
    client = mastodon_client()
    if client:
        uploaded_ids = []
        for image_path in payload.image_paths:
            uploaded = client.media_post(image_path, mime_type="image/jpeg")
            uploaded_ids.append(uploaded["id"])
            log.info(f"[Mastodon] Media subida: {uploaded['id']}")
        client.status_post(
            status=payload.text or "",
            # An empty list is sent as None, i.e. a status without attachments.
            media_ids=uploaded_ids or None,
            visibility="public",
        )
        log.info("[Mastodon] Publicado.")
    else:
        log.warning("[Mastodon] No configurado, omito.")
def ensure_public_url(local_path: str) -> str:
    """
    Map a local file path to the public URL Instagram can download it from.

    The URL is PUBLIC_UPLOAD_BASE_URL plus the file's base name; this assumes
    the uploads directory is served at that base URL (e.g. by Nginx/Apache).

    Raises:
        RuntimeError: if PUBLIC_UPLOAD_BASE_URL is not configured.
    """
    base = PUBLIC_UPLOAD_BASE_URL
    if base:
        return "/".join((base.rstrip("/"), os.path.basename(local_path)))
    raise RuntimeError("Falta PUBLIC_UPLOAD_BASE_URL para Instagram.")
def post_to_instagram(payload: PostPayload):
    """
    Publish the first image of `payload` to an Instagram Business account
    through the Instagram Graph API.

    Two requests are made:
      1) create a media container from a public image URL and the caption;
      2) publish that container.

    The image must be reachable by Meta at a public HTTPS URL (see
    ensure_public_url). Only the first image is used; carousels need a
    different flow. When Instagram is not configured, or the payload has no
    image, a warning is logged and nothing is posted.

    Raises:
        RuntimeError: if the public upload URL is not configured, or the
            container response carries no id.
        requests.HTTPError: if either Graph API call returns an error status.
    """
    if not (IG_BUSINESS_ACCOUNT_ID and IG_ACCESS_TOKEN):
        log.warning("[Instagram] No configurado, omito.")
        return
    if not payload.image_paths:
        log.warning("[Instagram] No hay imagen para publicar (IG requiere imagen/video).")
        return

    # A basic /media post takes a single image.
    img_url = ensure_public_url(payload.image_paths[0])
    account_url = f"https://graph.facebook.com/v21.0/{IG_BUSINESS_ACCOUNT_ID}"

    # 1) Create the container
    media_resp = requests.post(
        f"{account_url}/media",
        data={"image_url": img_url, "caption": payload.text or "", "access_token": IG_ACCESS_TOKEN},
        timeout=60,
    )
    media_resp.raise_for_status()
    creation_id = media_resp.json().get("id")
    if not creation_id:
        # Without an id the publish call cannot succeed; fail here with a
        # message that points at the real cause instead of sending an
        # incomplete request.
        raise RuntimeError("[Instagram] La respuesta de /media no incluye 'id'; no se puede publicar.")

    # 2) Publish it
    publish_resp = requests.post(
        f"{account_url}/media_publish",
        data={"creation_id": creation_id, "access_token": IG_ACCESS_TOKEN},
        timeout=60,
    )
    publish_resp.raise_for_status()
    log.info("[Instagram] Publicado.")
# ----------------------------
# Borradores para /publicar
# ----------------------------
# user_data keys for the active draft and for the draft's input state.
# States: 'idle', 'waiting_title', 'waiting_description', 'waiting_hashtags',
# 'waiting_time', 'waiting_date'.
DRAFT_KEY, DRAFT_STATE_KEY = "current_draft", "draft_state"
def init_draft(
    context: ContextTypes.DEFAULT_TYPE,
    chat_id: int,
    title: str = "",
    description: str = "",
    image_paths=None
):
    """
    Store a fresh draft in user_data and reset the input state to 'idle'.

    Any previous draft of the same user is overwritten. `image_paths`
    defaults to a new empty list.
    """
    fresh_draft = {
        "chat_id": chat_id,
        "title": title,
        "description": description,
        "hashtags": "",
        "time_start": "",
        "time_end": "",
        "date": "",
        "image_paths": [] if image_paths is None else image_paths,
        "message_id": None,  # id of the preview message, filled in once it is sent
    }
    context.user_data.update({DRAFT_KEY: fresh_draft, DRAFT_STATE_KEY: "idle"})
def get_draft(context: ContextTypes.DEFAULT_TYPE) -> Optional[dict]:
    """Return the draft stored in user_data, or None when there is none."""
    store = context.user_data
    return store.get(DRAFT_KEY, None)
def set_state(context: ContextTypes.DEFAULT_TYPE, state: str):
    """Record in user_data which draft field (if any) the next text message fills."""
    context.user_data.update({DRAFT_STATE_KEY: state})
def get_state(context: ContextTypes.DEFAULT_TYPE) -> str:
    """Return the stored draft input state, or 'idle' when none has been set."""
    store = context.user_data
    if DRAFT_STATE_KEY in store:
        return store[DRAFT_STATE_KEY]
    return "idle"
def build_draft_preview_text(draft: dict) -> str:
    """
    Render a draft as the Markdown preview text shown with the inline keyboard.

    The preview is sent with parse_mode="Markdown" (see refresh_draft_message),
    so every user-supplied value is escaped first. Otherwise a stray '_', '*',
    '`' or '[' (for instance a hashtag such as `#my_tag`) opens an entity that
    is never closed and Telegram rejects the message.

    Args:
        draft: draft dict; missing or empty fields are rendered as empty.

    Returns:
        The preview text with the fixed labels in Markdown and the values escaped.
    """

    def esc(value) -> str:
        # Legacy Telegram Markdown: these four characters start an entity and
        # are neutralised with a preceding backslash.
        return re.sub(r"([_*`\[])", r"\\\1", value or "")

    title = esc(draft.get("title"))
    description = esc(draft.get("description"))
    hashtags = esc(draft.get("hashtags"))
    date = esc(draft.get("date"))
    ts = esc(draft.get("time_start"))
    te = esc(draft.get("time_end"))

    # Show "start / end" when both are known, the start alone otherwise.
    if ts and te:
        time_str = f"{ts} / {te}"
    elif ts:
        time_str = ts
    else:
        time_str = ""

    return (
        "✏️ *Borrador de publicación*\n\n"
        f"📌 *Título*: {title}\n\n"
        f"📝 *Descripción:*\n{description}\n\n"
        f"🏷 Hashtags: {hashtags}\n"
        f"⏰ Horario: {time_str}\n"
        f"📅 Fecha: {date}"
    )
def build_draft_keyboard() -> InlineKeyboardMarkup:
    """Build the inline keyboard of the draft preview: field editors, then publish/cancel."""
    # One inner tuple per keyboard row; each entry is (label, callback data).
    layout = (
        (("Título", "draft:title"), ("Descripción", "draft:description")),
        (("Hashtag", "draft:hashtags"), ("Horario", "draft:time"), ("Fecha", "draft:date")),
        (("✅ Publicar", "draft:publish"), ("❌ Cancelar", "draft:cancel")),
    )
    rows = [
        [InlineKeyboardButton(label, callback_data=data) for label, data in row]
        for row in layout
    ]
    return InlineKeyboardMarkup(rows)
async def refresh_draft_message(context: ContextTypes.DEFAULT_TYPE):
    """
    Re-send the draft preview so that it is always the last message in the chat.

    The previous preview message (if any) is deleted on a best-effort basis,
    then a new preview with the keyboard is sent and its id is stored in the
    draft. Does nothing when there is no draft.
    """
    draft = get_draft(context)
    if not draft:
        return

    target_chat = draft["chat_id"]
    stale_id = draft.get("message_id")

    if stale_id:
        # Failing to delete the old preview must not stop the new one.
        try:
            await context.bot.delete_message(chat_id=target_chat, message_id=stale_id)
        except Exception as e:
            log.warning(f"No se pudo borrar el mensaje anterior del borrador: {e}")

    preview = await context.bot.send_message(
        chat_id=target_chat,
        text=build_draft_preview_text(draft),
        reply_markup=build_draft_keyboard(),
        parse_mode="Markdown",
    )
    # Remember the new preview so the next refresh can remove it.
    draft["message_id"] = preview.message_id
# ----------------------------
# Comando /publicar
# ----------------------------
async def start_publish(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Start a new draft for /publicar and show its preview.

    Anything written after the command becomes the initial description, and
    a photo attached to the message is downloaded and stored in the draft.
    """
    msg = update.message
    if msg is None:
        return

    # The command can arrive as plain text or as the caption of a photo.
    full_text = msg.text or msg.caption or ""

    # Split on any whitespace run, not just a single space: with
    # "/publicar\nfoo bar" a split on " " would cut after "foo" and keep
    # only "bar" as the description.
    parts = full_text.split(maxsplit=1)
    base_description = parts[1].strip() if len(parts) == 2 else ""

    # Download the photo, if there is one
    image_paths: List[str] = []
    if msg.photo:
        image_paths = await download_photos(context, msg.photo)

    # New draft with an empty title and the optional description
    init_draft(context, msg.chat.id, title="", description=base_description, image_paths=image_paths)

    # Show the draft and its keyboard at the end of the chat
    await refresh_draft_message(context)
# ----------------------------
# Callback de botones (Título / Descripción / Hashtag / Horario / Fecha / Publicar / Cancelar)
# ----------------------------
async def draft_keyboard_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle a press on one of the draft keyboard buttons.

    Field buttons put the draft into the matching 'waiting_*' state and ask
    the user for the value; "publish" hands over to finalize_and_post_draft;
    "cancel" discards the draft. Unknown actions are ignored.
    """
    query = update.callback_query
    await query.answer()

    data = query.data or ""
    if not data.startswith("draft:"):
        return

    if not get_draft(context):
        try:
            await query.message.edit_text("No hay borrador activo.")
        except Exception:
            pass
        return

    # action -> (state to enter, prompt sent to the user)
    prompts = {
        "title": (
            "waiting_title",
            "Escribe ahora el *título* de la publicación.",
        ),
        "description": (
            "waiting_description",
            "Escribe ahora la *descripción* de la publicación.",
        ),
        "hashtags": (
            "waiting_hashtags",
            "Escribe ahora los *hashtags* en un mensaje (por ejemplo: `#feminismo #sostenibilidad`).",
        ),
        "time": (
            "waiting_time",
            "Escribe ahora el *horario* en formato `HH:MM / HH:MM` (por ejemplo: `19:30 / 21:30`).",
        ),
        "date": (
            "waiting_date",
            "Escribe ahora la *fecha* en formato `DD/MM/AAAA` (por ejemplo: `13/04/2025`).",
        ),
    }

    # Everything after the first colon is the action name.
    action = data.partition(":")[2]

    if action in prompts:
        next_state, prompt = prompts[action]
        set_state(context, next_state)
        await query.message.reply_text(prompt, parse_mode="Markdown")
    elif action == "publish":
        await finalize_and_post_draft(update, context)
    elif action == "cancel":
        context.user_data.pop(DRAFT_KEY, None)
        set_state(context, "idle")
        try:
            await query.message.edit_text("Publicación cancelada.")
        except Exception:
            pass
# ----------------------------
# Manejo de los textos de título / descripción / hashtag / hora / fecha
# ----------------------------
async def handle_draft_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Store a text message into the draft field the user was asked for.

    The current state decides the target field: title, description and
    hashtags take the text as-is; time must be `HH:MM / HH:MM` with real
    clock values; date must be an existing `DD/MM/AAAA` calendar date.
    On invalid input the user is told and the state is kept so they can
    retry. After a successful update the state returns to 'idle' and the
    preview is refreshed. Messages are ignored when there is no draft, when
    no input is expected, or when they come from another chat.
    """
    from datetime import datetime  # local import: only the date check needs it

    msg = update.message
    if msg is None or not msg.text:
        return

    draft = get_draft(context)
    if not draft:
        return  # no active draft, ignore

    # The draft lives in user_data, which follows the user rather than the
    # chat; only accept input in the chat where the draft was started so text
    # written elsewhere does not end up in the publication.
    if msg.chat.id != draft.get("chat_id"):
        return

    state = get_state(context)
    text = msg.text.strip()

    # States whose value is stored verbatim -> draft field
    free_text_fields = {
        "waiting_title": "title",
        "waiting_description": "description",
        "waiting_hashtags": "hashtags",
    }

    if state in free_text_fields:
        draft[free_text_fields[state]] = text

    elif state == "waiting_time":
        # Hours 0-23 and minutes 00-59 on both sides, e.g. "19:30 / 21:30".
        clock = r"(?:[01]?\d|2[0-3]):[0-5]\d"
        m = re.match(rf"^\s*({clock})\s*/\s*({clock})\s*$", text)
        if not m:
            await msg.reply_text(
                "Formato de horario no válido. Usa `HH:MM / HH:MM`, por ejemplo `19:30 / 21:30`.",
                parse_mode="Markdown"
            )
            return
        draft["time_start"], draft["time_end"] = m.group(1), m.group(2)

    elif state == "waiting_date":
        valid = re.match(r"^\d{1,2}/\d{1,2}/\d{4}$", text) is not None
        if valid:
            # The pattern alone would accept 31/02/2025 or 99/99/2025.
            try:
                datetime.strptime(text, "%d/%m/%Y")
            except ValueError:
                valid = False
        if not valid:
            await msg.reply_text(
                "Formato de fecha no válido. Usa `DD/MM/AAAA`, por ejemplo `13/04/2025`.",
                parse_mode="Markdown"
            )
            return
        draft["date"] = text

    else:
        # Not waiting for any input
        return

    set_state(context, "idle")
    await refresh_draft_message(context)
# ----------------------------
# Publicar borrador (/publicar) realmente
# ----------------------------
async def finalize_and_post_draft(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Publish the active draft to Mastodon and Instagram and report the outcome.

    The post text is title and description (separated by a blank line),
    followed, after another blank line, by one line each for date, time and
    hashtags when present. The draft is discarded afterwards and the preview
    message is replaced by a success message or by a list of the networks
    that failed.
    """
    import asyncio  # local import: only needed to reach the running event loop

    query = update.callback_query
    draft = get_draft(context)
    if not draft:
        try:
            await query.message.edit_text("No hay borrador para publicar.")
        except Exception as e:
            log.warning(f"[TG] No se pudo editar el mensaje del borrador: {e}")
        return

    title = draft.get("title") or ""
    description = draft.get("description") or ""
    # Title and description, separated by a blank line when both are present.
    base_text = "\n\n".join(part for part in (title, description) if part)

    extra_lines = []
    if draft.get("date"):
        extra_lines.append(f"📅 {draft['date']}")
    ts = draft.get("time_start") or ""
    te = draft.get("time_end") or ""
    if ts and te:
        extra_lines.append(f"{ts} / {te}")
    elif ts:
        extra_lines.append(f"{ts}")
    if draft.get("hashtags"):
        extra_lines.append(draft["hashtags"])

    # Body and extras are separated by a blank line; empty sections are dropped.
    final_text = "\n\n".join(
        section for section in (base_text, "\n".join(extra_lines)) if section
    )

    payload = PostPayload(
        text=final_text,
        image_paths=draft.get("image_paths", []),
    )

    # The publishers do blocking HTTP calls; run them in the default executor
    # so the bot's event loop is not stalled while they wait on the network.
    loop = asyncio.get_running_loop()
    failed = []
    for network, publish in (("Mastodon", post_to_mastodon), ("Instagram", post_to_instagram)):
        try:
            await loop.run_in_executor(None, publish, payload)
        except Exception as e:
            log.exception(f"[{network}] Error: {e}")
            failed.append(network)

    context.user_data.pop(DRAFT_KEY, None)
    set_state(context, "idle")

    # Only claim success when no publisher raised.
    if failed:
        result_text = "⚠️ No se pudo publicar en: " + ", ".join(failed)
    else:
        result_text = "Publicado en redes ✅"
    try:
        await query.message.edit_text(result_text)
    except Exception as e:
        log.warning(f"[TG] No se pudo editar el mensaje del borrador: {e}")
# ----------------------------
# Handlers existentes /1212
# ----------------------------
async def handle_channel_post(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Cross-post a CHANNEL post whose text or caption starts with /1212.

    Posts from other channels are ignored when CHANNEL_ID is set. The text
    after the trigger and the attached photo (if any) are published to
    Mastodon and Instagram, and the post is answered with the outcome.
    """
    import asyncio  # local import: only needed to reach the running event loop

    msg = update.channel_post
    if msg is None:
        return

    # Debug: chat id and type
    log.info(f"[TG] channel_post en chat.id={msg.chat.id} (type=channel)")

    # Optionally restrict to one channel
    if CHANNEL_ID and msg.chat.id != CHANNEL_ID:
        return

    payload_text = extract_command_payload(msg)
    if payload_text is None:
        return  # does not start with /1212

    image_paths: List[str] = []
    if msg.photo:
        image_paths = await download_photos(context, msg.photo)
    payload = PostPayload(text=payload_text, image_paths=image_paths)

    # The publishers do blocking HTTP calls; run them in the default executor
    # so the bot's event loop is not stalled while they wait on the network.
    loop = asyncio.get_running_loop()
    failed = []
    for network, publish in (("Mastodon", post_to_mastodon), ("Instagram", post_to_instagram)):
        try:
            await loop.run_in_executor(None, publish, payload)
        except Exception as e:
            log.exception(f"[{network}] Error: {e}")
            failed.append(network)

    # Only claim success when no publisher raised.
    if failed:
        result_text = "⚠️ No se pudo publicar en: " + ", ".join(failed)
    else:
        result_text = "Publicado en redes ✅"
    try:
        await msg.reply_text(result_text, quote=True)
    except Exception as e:
        # The confirmation is best-effort, but a failure should be visible in the logs.
        log.warning(f"[TG] No se pudo enviar la confirmación: {e}")
async def handle_group_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Cross-post a GROUP/SUPERGROUP message whose text or caption starts with /1212.

    Messages from other groups are ignored when GROUP_ID is set. The text
    after the trigger and the attached photo (if any) are published to
    Mastodon and Instagram, and the message is answered with the outcome.
    """
    import asyncio  # local import: only needed to reach the running event loop

    msg = update.message
    if msg is None:
        return

    # Debug: chat id and type
    log.info(f"[TG] message en chat.id={msg.chat.id} (type={msg.chat.type})")

    # Optionally restrict to one group
    if GROUP_ID and msg.chat.id != GROUP_ID:
        return

    # extract_command_payload returns None unless the text or caption starts
    # with /1212, so no separate trigger check is needed before it.
    payload_text = extract_command_payload(msg)
    if payload_text is None:
        return

    image_paths: List[str] = []
    if msg.photo:
        image_paths = await download_photos(context, msg.photo)
    payload = PostPayload(text=payload_text, image_paths=image_paths)

    # The publishers do blocking HTTP calls; run them in the default executor
    # so the bot's event loop is not stalled while they wait on the network.
    loop = asyncio.get_running_loop()
    failed = []
    for network, publish in (("Mastodon", post_to_mastodon), ("Instagram", post_to_instagram)):
        try:
            await loop.run_in_executor(None, publish, payload)
        except Exception as e:
            log.exception(f"[{network}] Error: {e}")
            failed.append(network)

    # Only claim success when no publisher raised.
    if failed:
        result_text = "⚠️ No se pudo publicar en: " + ", ".join(failed)
    else:
        result_text = "Publicado en redes ✅"
    try:
        await msg.reply_text(result_text, quote=True)
    except Exception as e:
        # The confirmation is best-effort, but a failure should be visible in the logs.
        log.warning(f"[TG] No se pudo enviar la confirmación: {e}")
# ----------------------------
# main()
# ----------------------------
def main():
    """Build the Telegram application, register all handlers and start long polling."""
    app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()

    # /publicar command + draft flow
    app.add_handler(CommandHandler("publicar", start_publish))
    # CommandHandler does not handle commands that are part of a caption, so a
    # photo sent with "/publicar ..." as its caption needs its own handler;
    # without it the photo branch of start_publish is never reached.
    publish_caption_re = re.compile(r"^\s*/publicar(?:@\w+)?(?:\s|$)", re.IGNORECASE)
    app.add_handler(
        MessageHandler(
            filters.UpdateType.MESSAGE & filters.PHOTO & filters.CaptionRegex(publish_caption_re),
            start_publish,
        )
    )
    app.add_handler(CallbackQueryHandler(draft_keyboard_callback, pattern=r"^draft:"))

    # Plain texts that fill in title / description / hashtags / time / date
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_draft_text))

    # CHANNELS with /1212
    app.add_handler(MessageHandler(filters.ChatType.CHANNEL, handle_channel_post))

    # GROUPS/SUPERGROUPS with /1212 (in text or caption)
    group_filter = filters.ChatType.GROUPS & (filters.Regex(TRIGGER_RE) | filters.CaptionRegex(TRIGGER_RE))
    app.add_handler(MessageHandler(group_filter, handle_group_message))

    log.info("Bot escuchando canal/grupos…")
    app.run_polling(
        allowed_updates=["message", "channel_post", "callback_query"],
        drop_pending_updates=False,
    )


if __name__ == "__main__":
    main()