595 lines
17 KiB
Python
595 lines
17 KiB
Python
import os
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from typing import Optional, List
|
|
|
|
from dotenv import load_dotenv
|
|
from mastodon import Mastodon
|
|
import requests
|
|
|
|
from telegram import (
|
|
Update,
|
|
Message,
|
|
PhotoSize,
|
|
InlineKeyboardButton,
|
|
InlineKeyboardMarkup,
|
|
)
|
|
from telegram.ext import (
|
|
ApplicationBuilder,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
CommandHandler,
|
|
CallbackQueryHandler,
|
|
filters,
|
|
)
|
|
import logging
|
|
import sys
|
|
import re
|
|
|
|
# ---------------------------------
|
|
# Config & logging
|
|
# ---------------------------------
|
|
# Configure logging once at import time and create the bot's named logger.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
log = logging.getLogger("enresocial_bot")

# Load the .env file before any configuration value is read below.
load_dotenv()
|
|
|
|
|
|
def get_int_env(name: str, default: int = 0) -> int:
    """Read the environment variable *name* as an integer.

    Returns *default* when the variable is unset, blank, or not a valid
    integer; the invalid case is also reported with a warning.
    """
    raw = os.getenv(name, "").strip()
    if raw:
        try:
            return int(raw)
        except ValueError:
            log.warning(f"Variable de entorno {name}='{raw}' no es un entero válido. Usando {default}.")
    return default
|
|
|
|
|
|
_env = os.environ.get

TELEGRAM_TOKEN = _env("TELEGRAM_TOKEN")

# Chat IDs kept for a possible future restriction of /publicar to a single
# chat; nothing filters on them yet.
CHANNEL_ID = get_int_env("TELEGRAM_CHANNEL_ID", 0)
GROUP_ID = get_int_env("TELEGRAM_GROUP_ID", 0)

# Mastodon
MASTODON_BASE_URL = _env("MASTODON_BASE_URL")
MASTODON_ACCESS_TOKEN = _env("MASTODON_ACCESS_TOKEN")

# Instagram (plus the local upload directory and the public base URL used to
# build the image URL handed to Instagram)
IG_BUSINESS_ACCOUNT_ID = _env("IG_BUSINESS_ACCOUNT_ID")
IG_ACCESS_TOKEN = _env("IG_ACCESS_TOKEN")
PUBLIC_UPLOAD_BASE_URL = _env("PUBLIC_UPLOAD_BASE_URL")
UPLOAD_DIR = _env("UPLOAD_DIR", "./uploads")

# Make sure the upload directory exists before any photo is downloaded.
os.makedirs(UPLOAD_DIR, exist_ok=True)

# The bot cannot start without a Telegram token: log and abort.
if not TELEGRAM_TOKEN:
    log.error("Falta TELEGRAM_TOKEN en el .env")
    sys.exit(1)
|
|
|
|
|
|
@dataclass
class PostPayload:
    """Content of one publication handed to the network-specific posters."""

    # Full text / caption of the post.
    text: str
    # Local file paths of the images to attach (may be empty).
    image_paths: List[str]
|
|
|
|
|
|
async def download_photos(context: ContextTypes.DEFAULT_TYPE, photos: List[PhotoSize]) -> List[str]:
    """Download the highest-resolution photo into UPLOAD_DIR.

    Only the entry of *photos* with the largest ``width * height`` is fetched.

    Returns a one-element list with the path of the downloaded ``.jpg`` file,
    or an empty list when *photos* is empty.

    Any error raised while fetching or downloading propagates to the caller;
    the file reserved for the download is removed first so that failed
    downloads do not pile up in UPLOAD_DIR.
    """
    if not photos:
        return []

    largest = max(photos, key=lambda p: p.width * p.height)
    file = await context.bot.get_file(largest.file_id)

    # Reserve a unique file name and close our own handle *before* the
    # download writes to that path, so the file is never held open twice.
    fd, path = tempfile.mkstemp(suffix=".jpg", dir=UPLOAD_DIR)
    os.close(fd)
    try:
        await file.download_to_drive(custom_path=path)
    except BaseException:
        # Covers cancellation too: never leave an empty/partial file behind.
        try:
            os.remove(path)
        except OSError:
            pass
        raise

    log.info(f"[TG] Foto descargada: {path}")
    return [path]
|
|
|
|
|
|
def mastodon_client() -> Optional[Mastodon]:
    """Build a Mastodon client from the module-level configuration.

    Returns None when the base URL or the access token is missing.
    """
    if MASTODON_BASE_URL and MASTODON_ACCESS_TOKEN:
        return Mastodon(api_base_url=MASTODON_BASE_URL, access_token=MASTODON_ACCESS_TOKEN)
    return None
|
|
|
|
|
|
def post_to_mastodon(payload: PostPayload):
    """Publish *payload* on Mastodon as a public status.

    Every image in ``payload.image_paths`` is uploaded first (as JPEG) and
    attached to the status. When Mastodon is not configured the call only
    logs a warning and returns.
    """
    client = mastodon_client()
    if not client:
        log.warning("[Mastodon] No configurado, omito.")
        return

    uploaded_ids = []
    for image_path in payload.image_paths:
        media = client.media_post(image_path, mime_type="image/jpeg")
        uploaded_ids.append(media["id"])
        log.info(f"[Mastodon] Media subida: {media['id']}")

    # An empty list is sent as None so a text-only status carries no media.
    status_text = payload.text or ""
    attachments = uploaded_ids or None
    client.status_post(status=status_text, media_ids=attachments, visibility="public")
    log.info("[Mastodon] Publicado.")
|
|
|
|
|
|
def ensure_public_url(local_path: str) -> str:
    """Map a local file path to the public URL Instagram can download it from.

    The URL is PUBLIC_UPLOAD_BASE_URL (without trailing slashes) followed by
    the file's base name.

    Raises RuntimeError when PUBLIC_UPLOAD_BASE_URL is not configured.
    """
    if not PUBLIC_UPLOAD_BASE_URL:
        raise RuntimeError("Falta PUBLIC_UPLOAD_BASE_URL para Instagram.")

    base_url = PUBLIC_UPLOAD_BASE_URL.rstrip("/")
    _, filename = os.path.split(local_path)
    return "/".join((base_url, filename))
|
|
|
|
|
|
def post_to_instagram(payload: PostPayload):
    """Publish *payload* on an Instagram Business account via the Graph API.

    Two requests are made: one to ``/media`` to create a container from the
    public URL of the first image, and one to ``/media_publish`` to publish
    that container. Only ``payload.image_paths[0]`` is used.

    The call logs a warning and returns without posting when Instagram is not
    configured or the payload has no image.

    Raises:
        RuntimeError: PUBLIC_UPLOAD_BASE_URL is missing, or the ``/media``
            response does not contain a container id.
        requests.HTTPError: either request returned an error status.
    """
    if not (IG_BUSINESS_ACCOUNT_ID and IG_ACCESS_TOKEN):
        log.warning("[Instagram] No configurado, omito.")
        return

    if not payload.image_paths:
        log.warning("[Instagram] No hay imagen para publicar (IG requiere imagen/video).")
        return

    img_url = ensure_public_url(payload.image_paths[0])
    account_url = f"https://graph.facebook.com/v21.0/{IG_BUSINESS_ACCOUNT_ID}"

    media_resp = requests.post(
        f"{account_url}/media",
        data={
            "image_url": img_url,
            "caption": payload.text or "",
            "access_token": IG_ACCESS_TOKEN,
        },
        timeout=60,
    )
    media_resp.raise_for_status()

    # Without a container id the publish request below cannot succeed; fail
    # here with a clear message instead of an opaque HTTP error.
    creation_id = media_resp.json().get("id")
    if not creation_id:
        raise RuntimeError("[Instagram] La respuesta de /media no incluye 'id'; no se puede publicar.")

    publish_resp = requests.post(
        f"{account_url}/media_publish",
        data={"creation_id": creation_id, "access_token": IG_ACCESS_TOKEN},
        timeout=60,
    )
    publish_resp.raise_for_status()
    log.info("[Instagram] Publicado.")
|
|
|
|
|
|
# ----------------------------
|
|
# Borradores para /publicar
|
|
# ----------------------------
|
|
# user_data key under which the draft dict being edited is stored.
DRAFT_KEY = "current_draft"

# user_data key under which the editing state is stored. States used here:
# 'idle', 'waiting_title', 'waiting_description', 'waiting_hashtags',
# 'waiting_time', 'waiting_date', 'waiting_photo'
DRAFT_STATE_KEY = "draft_state"
|
|
|
|
|
|
def init_draft(
    context: ContextTypes.DEFAULT_TYPE,
    chat_id: int,
    title: str = "",
    description: str = "",
    image_paths: Optional[List[str]] = None,
):
    """Store a fresh draft in ``context.user_data`` and reset the state to idle.

    Any draft already stored under DRAFT_KEY is overwritten. Hashtags, times
    and date start empty; ``message_id`` (the id of the preview message)
    starts as None.
    """
    context.user_data[DRAFT_KEY] = dict(
        chat_id=chat_id,
        title=title,
        description=description,
        hashtags="",
        time_start="",
        time_end="",
        date="",
        image_paths=[] if image_paths is None else image_paths,
        message_id=None,  # id of the preview message
    )
    context.user_data[DRAFT_STATE_KEY] = "idle"
|
|
|
|
|
|
def get_draft(context: ContextTypes.DEFAULT_TYPE) -> Optional[dict]:
    """Return the draft stored in ``context.user_data``, or None if there is none."""
    stored = context.user_data
    return stored.get(DRAFT_KEY)
|
|
|
|
|
|
def set_state(context: ContextTypes.DEFAULT_TYPE, state: str):
    """Record *state* as the current draft-editing state in ``context.user_data``."""
    context.user_data.update({DRAFT_STATE_KEY: state})
|
|
|
|
|
|
def get_state(context: ContextTypes.DEFAULT_TYPE) -> str:
    """Return the current draft-editing state, ``"idle"`` when none is stored."""
    stored = context.user_data
    return stored.get(DRAFT_STATE_KEY, "idle")
|
|
|
|
|
|
def build_draft_preview_text(draft: dict) -> str:
    """Render the draft as the Markdown text of the preview message.

    Empty or missing fields are shown as an em dash. Values typed by the user
    are Markdown-escaped, because the preview is sent with
    ``parse_mode="Markdown"`` and an unbalanced ``_``, ``*``, backtick or
    ``[`` (e.g. the hashtag ``#foo_bar``) would otherwise make the message
    unparseable.
    """

    def shown(key: str) -> str:
        # Escaped user value, or the placeholder when the field is empty.
        value = draft.get(key)
        return _escape_legacy_markdown(str(value)) if value else "—"

    ts = draft.get("time_start") or ""
    te = draft.get("time_end") or ""
    if ts and te:
        time_str = _escape_legacy_markdown(f"{ts} / {te}")
    elif ts:
        time_str = _escape_legacy_markdown(ts)
    else:
        time_str = "—"

    has_image = "Sí ✅" if draft.get("image_paths") else "No —"

    return (
        "✏️ *Borrador de publicación*\n\n"
        f"📌 *Título*: {shown('title')}\n\n"
        f"📝 *Descripción:*\n{shown('description')}\n\n"
        f"🏷 Hashtags: {shown('hashtags')}\n"
        f"⏰ Horario: {time_str}\n"
        f"📅 Fecha: {shown('date')}\n"
        f"🖼 Imagen: {has_image}"
    )


def _escape_legacy_markdown(text: str) -> str:
    """Backslash-escape the characters special to Telegram's legacy Markdown."""
    return re.sub(r"([_*`\[])", r"\\\1", text)
|
|
|
|
|
|
def build_draft_keyboard() -> InlineKeyboardMarkup:
    """Build the inline keyboard shown under the draft preview.

    Each button carries a ``draft:<action>`` callback payload.
    """
    # One inner tuple per keyboard row: (button label, callback data).
    layout = (
        (
            ("Título", "draft:title"),
            ("Descripción", "draft:description"),
        ),
        (
            ("Hashtag", "draft:hashtags"),
            ("Horario", "draft:time"),
            ("Fecha", "draft:date"),
        ),
        (
            ("📷 Adjuntar foto", "draft:photo"),
        ),
        (
            ("✅ Publicar", "draft:publish"),
            ("❌ Cancelar", "draft:cancel"),
        ),
    )
    rows = [
        [InlineKeyboardButton(label, callback_data=payload) for label, payload in row]
        for row in layout
    ]
    return InlineKeyboardMarkup(rows)
|
|
|
|
|
|
async def refresh_draft_message(context: ContextTypes.DEFAULT_TYPE):
    """Replace the draft preview message with an up-to-date one.

    Deletes the previous preview (if its id is recorded in the draft), sends
    a new message with the current draft text and buttons, and stores the new
    message id in the draft. Does nothing when there is no draft.
    """
    draft = get_draft(context)
    if not draft:
        return

    bot = context.bot
    chat_id = draft["chat_id"]
    previous_id = draft.get("message_id")

    if previous_id:
        # Best effort: a preview that cannot be deleted must not block the new one.
        try:
            await bot.delete_message(chat_id=chat_id, message_id=previous_id)
        except Exception as e:
            log.warning(f"No se pudo borrar el mensaje anterior del borrador: {e}")

    preview = build_draft_preview_text(draft)
    keyboard = build_draft_keyboard()
    sent = await bot.send_message(
        chat_id=chat_id,
        text=preview,
        reply_markup=keyboard,
        parse_mode="Markdown",
    )
    draft["message_id"] = sent.message_id
|
|
|
|
|
|
# ----------------------------
|
|
# Comando /publicar
|
|
# ----------------------------
|
|
async def start_publish(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /publicar: create a new draft and show its interactive preview.

    Whatever follows the command becomes the initial description. If the
    message carries a photo, it is downloaded and attached to the draft.
    """
    msg = update.message
    if msg is None:
        return

    # A message with a photo carries its text in the caption, not in .text.
    # NOTE(review): whether photo+caption updates reach this handler depends
    # on how it is registered; confirm against the handler setup.
    full_text = msg.text or msg.caption or ""

    # Split on any whitespace so "/publicar\nTexto" (newline or tab after the
    # command) keeps its description, not only "/publicar Texto".
    parts = full_text.split(None, 1)
    base_description = parts[1].strip() if len(parts) == 2 else ""

    image_paths: List[str] = []
    if msg.photo:
        image_paths = await download_photos(context, msg.photo)

    init_draft(context, msg.chat.id, title="", description=base_description, image_paths=image_paths)

    # First interactive message with the buttons.
    await refresh_draft_message(context)
|
|
|
|
|
|
# ----------------------------
|
|
# Callback de botones
|
|
# ----------------------------
|
|
async def draft_keyboard_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a press on one of the ``draft:*`` inline buttons.

    Field buttons switch the draft state to the matching ``waiting_*`` value
    and prompt the user for input. "publish" delegates to
    finalize_and_post_draft. "cancel" discards the draft, removes the image
    files it had downloaded and resets the state to idle. Unknown actions are
    ignored.
    """
    query = update.callback_query
    await query.answer()

    data = query.data or ""
    if not data.startswith("draft:"):
        return

    draft = get_draft(context)
    if not draft:
        # Best effort: the message may no longer be editable.
        try:
            await query.message.edit_text("No hay borrador activo.")
        except Exception as e:
            log.debug(f"No se pudo editar el mensaje del borrador: {e}")
        return

    action = data.split(":", 1)[1]

    # action -> (state to enter, prompt sent to the user)
    prompts = {
        "title": (
            "waiting_title",
            "Escribe ahora el *título* de la publicación.",
        ),
        "description": (
            "waiting_description",
            "Escribe ahora la *descripción* de la publicación.",
        ),
        "hashtags": (
            "waiting_hashtags",
            "Escribe ahora los *hashtags* (por ejemplo: `#feminismo #sostenibilidad`).",
        ),
        "time": (
            "waiting_time",
            "Escribe ahora el *horario* en formato `HH:MM / HH:MM` (por ejemplo: `19:30 / 21:30`).",
        ),
        "date": (
            "waiting_date",
            "Escribe ahora la *fecha* en formato `DD/MM/AAAA` (por ejemplo: `13/04/2025`).",
        ),
        "photo": (
            "waiting_photo",
            "Ahora envía la *foto* como un mensaje normal (sin comandos). "
            "Se usará como imagen de la publicación.",
        ),
    }

    if action in prompts:
        state, prompt = prompts[action]
        set_state(context, state)
        await query.message.reply_text(prompt, parse_mode="Markdown")

    elif action == "publish":
        await finalize_and_post_draft(update, context)

    elif action == "cancel":
        context.user_data.pop(DRAFT_KEY, None)
        set_state(context, "idle")

        # The draft is gone for good: do not leave its images in UPLOAD_DIR.
        for path in draft.get("image_paths") or []:
            try:
                os.remove(path)
            except OSError as e:
                log.warning(f"No se pudo borrar la imagen del borrador {path}: {e}")

        try:
            await query.message.edit_text("Publicación cancelada.")
        except Exception as e:
            log.debug(f"No se pudo editar el mensaje del borrador: {e}")
|
|
|
|
|
|
# ----------------------------
|
|
# Manejo de texto para el borrador
|
|
# ----------------------------
|
|
async def handle_draft_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store the text the user typed after pressing a field button.

    Depending on the current state the text becomes the title, description,
    hashtags, time range or date of the draft; the state then returns to idle
    and the preview is refreshed.

    Text is ignored, and left untouched in the chat, when there is no draft
    or no text field is being awaited. Invalid time or date input is answered
    with an explanation and the state is kept so the user can retry.
    """
    msg = update.message
    if msg is None or not msg.text:
        return

    draft = get_draft(context)
    if not draft:
        return

    state = get_state(context)
    text = msg.text.strip()

    plain_fields = {
        "waiting_title": "title",
        "waiting_description": "description",
        "waiting_hashtags": "hashtags",
    }

    if state in plain_fields:
        changes = {plain_fields[state]: text}

    elif state == "waiting_time":
        time_range = _parse_time_range(text)
        if time_range is None:
            await msg.reply_text(
                "Formato de horario no válido. Usa `HH:MM / HH:MM`, por ejemplo `19:30 / 21:30`.",
                parse_mode="Markdown",
            )
            return
        changes = {"time_start": time_range[0], "time_end": time_range[1]}

    elif state == "waiting_date":
        if not _is_valid_date(text):
            await msg.reply_text(
                "Formato de fecha no válido. Usa `DD/MM/AAAA`, por ejemplo `13/04/2025`.",
                parse_mode="Markdown",
            )
            return
        changes = {"date": text}

    else:
        # Not waiting for text: this message is not an answer to the bot, so
        # it must not be deleted.
        return

    # Remove the user's answer only once it has been accepted, to keep the
    # chat clean. Doing it before validation would mean replying to a message
    # that no longer exists.
    try:
        await msg.delete()
    except Exception as e:
        log.debug(f"No se pudo borrar el mensaje del usuario: {e}")

    draft.update(changes)
    set_state(context, "idle")
    await refresh_draft_message(context)


def _parse_time_range(text: str) -> Optional[tuple]:
    """Return ``(start, end)`` for a valid ``HH:MM / HH:MM`` string, else None."""
    m = re.match(r"^\s*(\d{1,2}:\d{2})\s*/\s*(\d{1,2}:\d{2})\s*$", text)
    if not m:
        return None
    for value in m.groups():
        hours, minutes = value.split(":")
        if int(hours) > 23 or int(minutes) > 59:
            return None
    return m.group(1), m.group(2)


def _is_valid_date(text: str) -> bool:
    """Tell whether *text* is a real calendar date written as ``DD/MM/YYYY``."""
    import datetime

    if not re.match(r"^\d{1,2}/\d{1,2}/\d{4}$", text):
        return False
    day, month, year = (int(part) for part in text.split("/"))
    try:
        datetime.date(year, month, day)
    except ValueError:
        return False
    return True
|
|
|
|
|
|
# ----------------------------
|
|
# Manejo de fotos para el borrador
|
|
# ----------------------------
|
|
async def handle_draft_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Attach the photo the user sent after pressing "Adjuntar foto".

    Only acts when a draft exists and the state is ``waiting_photo``. The
    photo is downloaded and stored as the draft's image, replacing any
    previous one; the state returns to idle and the preview is refreshed.
    """
    msg = update.message
    if msg is None or not msg.photo:
        return

    draft = get_draft(context)
    if not draft:
        return

    if get_state(context) != "waiting_photo":
        # Not expecting a photo right now: ignore it.
        return

    # Download first: if it fails, the user's photo message is still in the
    # chat and the state still allows a retry.
    image_paths = await download_photos(context, msg.photo)

    # Optional clean-up of the chat, best effort.
    try:
        await msg.delete()
    except Exception as e:
        log.debug(f"No se pudo borrar el mensaje de foto del usuario: {e}")

    replaced = draft.get("image_paths") or []
    draft["image_paths"] = image_paths

    # The files of the image being replaced are no longer referenced by
    # anything; remove them so they do not accumulate in UPLOAD_DIR.
    for stale in replaced:
        if stale in image_paths:
            continue
        try:
            os.remove(stale)
        except OSError as e:
            log.warning(f"No se pudo borrar la imagen anterior {stale}: {e}")

    set_state(context, "idle")
    await refresh_draft_message(context)
|
|
|
|
|
|
# ----------------------------
|
|
# Publicar borrador
|
|
# ----------------------------
|
|
async def finalize_and_post_draft(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Publish the current draft on Mastodon and Instagram and report the result.

    The post text is composed from the draft fields. Each network is tried
    independently, so a failure in one does not stop the other. Afterwards
    the draft is discarded, the state returns to idle and the preview message
    is replaced by a result line that names the networks that failed, if any.

    A network that is merely not configured is skipped by its poster without
    raising, so it is not counted as a failure here.
    """
    import asyncio

    query = update.callback_query
    draft = get_draft(context)
    if not draft:
        try:
            await query.message.edit_text("No hay borrador para publicar.")
        except Exception as e:
            log.debug(f"No se pudo editar el mensaje del borrador: {e}")
        return

    payload = PostPayload(
        text=_compose_post_text(draft),
        image_paths=draft.get("image_paths") or [],
    )

    # Both posters use blocking HTTP clients; run them in the default
    # executor so the event loop is not blocked while they wait on the network.
    loop = asyncio.get_running_loop()
    failed: List[str] = []
    for network, publish in (("Mastodon", post_to_mastodon), ("Instagram", post_to_instagram)):
        try:
            await loop.run_in_executor(None, publish, payload)
        except Exception as e:
            log.exception(f"[{network}] Error: {e}")
            failed.append(network)

    context.user_data.pop(DRAFT_KEY, None)
    set_state(context, "idle")

    # Tell the user the truth: do not claim success when a network failed.
    if failed:
        result = "Publicación con errores ⚠️ (falló: " + ", ".join(failed) + ")"
    else:
        result = "Publicado en redes ✅"

    try:
        await query.message.edit_text(result)
    except Exception as e:
        log.debug(f"No se pudo editar el mensaje del borrador: {e}")


def _compose_post_text(draft: dict) -> str:
    """Build the text of the post from the draft: title, description, then extras."""
    title = draft.get("title") or ""
    description = draft.get("description") or ""
    # Title and description are separated by one blank line.
    headline = "\n\n".join(part for part in (title, description) if part)

    extra_lines: List[str] = []
    if draft.get("date"):
        extra_lines.append(f"📅 {draft['date']}")

    ts = draft.get("time_start") or ""
    te = draft.get("time_end") or ""
    if ts and te:
        extra_lines.append(f"⏰ {ts} / {te}")
    elif ts:
        extra_lines.append(f"⏰ {ts}")

    if draft.get("hashtags"):
        extra_lines.append(draft["hashtags"])

    # Date, time and hashtags go one per line, a blank line below the headline.
    sections = (headline, "\n".join(extra_lines))
    return "\n\n".join(section for section in sections if section)
|
|
|
|
|
|
# ----------------------------
|
|
# main()
|
|
# ----------------------------
|
|
def main():
    """Build the Telegram application, register the handlers and start polling."""
    app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()

    # Single flow: /publicar + buttons + texts + photos of the draft.
    handlers = (
        CommandHandler("publicar", start_publish),
        CallbackQueryHandler(draft_keyboard_callback, pattern=r"^draft:"),
        # Text for title / description / hashtags / time / date.
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_draft_text),
        # Photos for "Adjuntar foto".
        MessageHandler(filters.PHOTO, handle_draft_photo),
    )
    for handler in handlers:
        app.add_handler(handler)

    log.info("Bot escuchando /publicar…")
    app.run_polling(
        allowed_updates=["message", "callback_query"],
        drop_pending_updates=False,
    )


if __name__ == "__main__":
    main()
|
|
|