import os import tempfile from dataclasses import dataclass from typing import Optional, List from dotenv import load_dotenv from mastodon import Mastodon import requests from telegram import ( Update, Message, PhotoSize, InlineKeyboardButton, InlineKeyboardMarkup, ) from telegram.ext import ( ApplicationBuilder, ContextTypes, MessageHandler, CommandHandler, CallbackQueryHandler, filters, ) import logging import sys import re # --------------------------------- # Config & logging # --------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s" ) log = logging.getLogger("enresocial_bot") load_dotenv() def get_int_env(name: str, default: int = 0) -> int: """ Lee una variable de entorno como int. Si está vacía o mal formateada, devuelve default y escribe un warning. """ raw = os.getenv(name, "").strip() if not raw: return default try: return int(raw) except ValueError: log.warning(f"Variable de entorno {name}='{raw}' no es un entero válido. Usando {default}.") return default TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") # Si algún día quieres limitar /publicar a un chat concreto, # puedes usar estos IDs (de momento no se filtra por ellos). CHANNEL_ID = get_int_env("TELEGRAM_CHANNEL_ID", 0) GROUP_ID = get_int_env("TELEGRAM_GROUP_ID", 0) # Mastodon MASTODON_BASE_URL = os.getenv("MASTODON_BASE_URL") MASTODON_ACCESS_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN") # Instagram IG_BUSINESS_ACCOUNT_ID = os.getenv("IG_BUSINESS_ACCOUNT_ID") IG_ACCESS_TOKEN = os.getenv("IG_ACCESS_TOKEN") PUBLIC_UPLOAD_BASE_URL = os.getenv("PUBLIC_UPLOAD_BASE_URL") UPLOAD_DIR = os.getenv("UPLOAD_DIR", "./uploads") os.makedirs(UPLOAD_DIR, exist_ok=True) if not TELEGRAM_TOKEN: log.error("Falta TELEGRAM_TOKEN en el .env") sys.exit(1) @dataclass class PostPayload: text: str image_paths: List[str] async def download_photos(context: ContextTypes.DEFAULT_TYPE, photos: List[PhotoSize]) -> List[str]: """ Descarga la foto de mayor resolución a un archivo temporal y devuelve la lista de rutas. """ if not photos: return [] largest = max(photos, key=lambda p: p.width * p.height) file = await context.bot.get_file(largest.file_id) with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg", dir=UPLOAD_DIR) as tmp: await file.download_to_drive(custom_path=tmp.name) log.info(f"[TG] Foto descargada: {tmp.name}") return [tmp.name] def mastodon_client() -> Optional[Mastodon]: if not (MASTODON_BASE_URL and MASTODON_ACCESS_TOKEN): return None return Mastodon(api_base_url=MASTODON_BASE_URL, access_token=MASTODON_ACCESS_TOKEN) def post_to_mastodon(payload: PostPayload): m = mastodon_client() if not m: log.warning("[Mastodon] No configurado, omito.") return media_ids = [] for path in payload.image_paths: media = m.media_post(path, mime_type="image/jpeg") media_ids.append(media["id"]) log.info(f"[Mastodon] Media subida: {media['id']}") m.status_post(status=payload.text or "", media_ids=media_ids or None, visibility="public") log.info("[Mastodon] Publicado.") def ensure_public_url(local_path: str) -> str: """ Convierte una ruta local en una URL pública donde Instagram pueda descargar la imagen. """ if not PUBLIC_UPLOAD_BASE_URL: raise RuntimeError("Falta PUBLIC_UPLOAD_BASE_URL para Instagram.") filename = os.path.basename(local_path) return f"{PUBLIC_UPLOAD_BASE_URL.rstrip('/')}/{filename}" def post_to_instagram(payload: PostPayload): """ Publicación a Instagram Business usando Instagram Graph API. """ if not (IG_BUSINESS_ACCOUNT_ID and IG_ACCESS_TOKEN): log.warning("[Instagram] No configurado, omito.") return if not payload.image_paths: log.warning("[Instagram] No hay imagen para publicar (IG requiere imagen/video).") return img_url = ensure_public_url(payload.image_paths[0]) media_resp = requests.post( f"https://graph.facebook.com/v21.0/{IG_BUSINESS_ACCOUNT_ID}/media", data={ "image_url": img_url, "caption": payload.text or "", "access_token": IG_ACCESS_TOKEN, }, timeout=60, ) media_resp.raise_for_status() creation_id = media_resp.json().get("id") publish_resp = requests.post( f"https://graph.facebook.com/v21.0/{IG_BUSINESS_ACCOUNT_ID}/media_publish", data={"creation_id": creation_id, "access_token": IG_ACCESS_TOKEN}, timeout=60, ) publish_resp.raise_for_status() log.info("[Instagram] Publicado.") # ---------------------------- # Borradores para /publicar # ---------------------------- DRAFT_KEY = "current_draft" # 'idle', 'waiting_title', 'waiting_description', 'waiting_hashtags', # 'waiting_time', 'waiting_date', 'waiting_photo' DRAFT_STATE_KEY = "draft_state" def init_draft( context: ContextTypes.DEFAULT_TYPE, chat_id: int, title: str = "", description: str = "", image_paths: Optional[List[str]] = None, ): if image_paths is None: image_paths = [] context.user_data[DRAFT_KEY] = { "chat_id": chat_id, "title": title, "description": description, "hashtags": "", "time_start": "", "time_end": "", "date": "", "image_paths": image_paths, "message_id": None, # id del mensaje de preview } context.user_data[DRAFT_STATE_KEY] = "idle" def get_draft(context: ContextTypes.DEFAULT_TYPE) -> Optional[dict]: return context.user_data.get(DRAFT_KEY) def set_state(context: ContextTypes.DEFAULT_TYPE, state: str): context.user_data[DRAFT_STATE_KEY] = state def get_state(context: ContextTypes.DEFAULT_TYPE) -> str: return context.user_data.get(DRAFT_STATE_KEY, "idle") def build_draft_preview_text(draft: dict) -> str: title = draft.get("title") or "—" description = draft.get("description") or "—" hashtags = draft.get("hashtags") or "—" date = draft.get("date") or "—" ts = draft.get("time_start") or "" te = draft.get("time_end") or "" if ts and te: time_str = f"{ts} / {te}" elif ts: time_str = ts else: time_str = "—" has_image = "Sí ✅" if draft.get("image_paths") else "No —" text = ( "✏️ *Borrador de publicación*\n\n" f"📌 *Título*: {title}\n\n" f"📝 *Descripción:*\n{description}\n\n" f"🏷 Hashtags: {hashtags}\n" f"⏰ Horario: {time_str}\n" f"📅 Fecha: {date}\n" f"🖼 Imagen: {has_image}" ) return text def build_draft_keyboard() -> InlineKeyboardMarkup: keyboard = [ [ InlineKeyboardButton("Título", callback_data="draft:title"), InlineKeyboardButton("Descripción", callback_data="draft:description"), ], [ InlineKeyboardButton("Hashtag", callback_data="draft:hashtags"), InlineKeyboardButton("Horario", callback_data="draft:time"), InlineKeyboardButton("Fecha", callback_data="draft:date"), ], [ InlineKeyboardButton("📷 Adjuntar foto", callback_data="draft:photo"), ], [ InlineKeyboardButton("✅ Publicar", callback_data="draft:publish"), InlineKeyboardButton("❌ Cancelar", callback_data="draft:cancel"), ], ] return InlineKeyboardMarkup(keyboard) async def refresh_draft_message(context: ContextTypes.DEFAULT_TYPE): """ Elimina el mensaje de preview anterior (si existe) y envía uno nuevo con el borrador actualizado + botones. """ draft = get_draft(context) if not draft: return chat_id = draft["chat_id"] old_message_id = draft.get("message_id") if old_message_id: try: await context.bot.delete_message(chat_id=chat_id, message_id=old_message_id) except Exception as e: log.warning(f"No se pudo borrar el mensaje anterior del borrador: {e}") sent = await context.bot.send_message( chat_id=chat_id, text=build_draft_preview_text(draft), reply_markup=build_draft_keyboard(), parse_mode="Markdown", ) draft["message_id"] = sent.message_id # ---------------------------- # Comando /publicar # ---------------------------- async def start_publish(update: Update, context: ContextTypes.DEFAULT_TYPE): msg = update.message if msg is None: return full_text = msg.text or "" parts = full_text.split(" ", 1) # Texto que venga detrás de /publicar lo usamos como descripción inicial base_description = "" if len(parts) >= 2 and parts[1].strip(): base_description = parts[1].strip() image_paths: List[str] = [] # Si el usuario lanzó /publicar con una foto ya adjunta, también la usamos if msg.photo: image_paths = await download_photos(context, msg.photo) init_draft(context, msg.chat.id, title="", description=base_description, image_paths=image_paths) # Primer mensaje interactivo con los botones await refresh_draft_message(context) # ---------------------------- # Callback de botones # ---------------------------- async def draft_keyboard_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.callback_query await query.answer() data = query.data or "" if not data.startswith("draft:"): return draft = get_draft(context) if not draft: try: await query.message.edit_text("No hay borrador activo.") except Exception: pass return action = data.split(":", 1)[1] if action == "title": set_state(context, "waiting_title") await query.message.reply_text( "Escribe ahora el *título* de la publicación.", parse_mode="Markdown", ) elif action == "description": set_state(context, "waiting_description") await query.message.reply_text( "Escribe ahora la *descripción* de la publicación.", parse_mode="Markdown", ) elif action == "hashtags": set_state(context, "waiting_hashtags") await query.message.reply_text( "Escribe ahora los *hashtags* (por ejemplo: `#feminismo #sostenibilidad`).", parse_mode="Markdown", ) elif action == "time": set_state(context, "waiting_time") await query.message.reply_text( "Escribe ahora el *horario* en formato `HH:MM / HH:MM` (por ejemplo: `19:30 / 21:30`).", parse_mode="Markdown", ) elif action == "date": set_state(context, "waiting_date") await query.message.reply_text( "Escribe ahora la *fecha* en formato `DD/MM/AAAA` (por ejemplo: `13/04/2025`).", parse_mode="Markdown", ) elif action == "photo": set_state(context, "waiting_photo") await query.message.reply_text( "Ahora envía la *foto* como un mensaje normal (sin comandos). " "Se usará como imagen de la publicación.", parse_mode="Markdown", ) elif action == "publish": await finalize_and_post_draft(update, context) elif action == "cancel": context.user_data.pop(DRAFT_KEY, None) set_state(context, "idle") try: await query.message.edit_text("Publicación cancelada.") except Exception: pass # ---------------------------- # Manejo de texto para el borrador # ---------------------------- async def handle_draft_text(update: Update, context: ContextTypes.DEFAULT_TYPE): """ Aquí llega el texto que el usuario escribe DESPUÉS de pulsar un botón (Título, Descripción, Hashtags, Hora, Fecha). Guardamos el dato y volvemos a mostrar el borrador con los botones. """ msg = update.message if msg is None or not msg.text: return state = get_state(context) draft = get_draft(context) if not draft: return text = msg.text.strip() # Opcional: borrar el mensaje de texto del usuario para que el chat quede “limpio” try: await msg.delete() except Exception: pass if state == "waiting_title": draft["title"] = text set_state(context, "idle") await refresh_draft_message(context) return if state == "waiting_description": draft["description"] = text set_state(context, "idle") await refresh_draft_message(context) return if state == "waiting_hashtags": draft["hashtags"] = text set_state(context, "idle") await refresh_draft_message(context) return if state == "waiting_time": m = re.match(r"^\s*(\d{1,2}:\d{2})\s*/\s*(\d{1,2}:\d{2})\s*$", text) if not m: await msg.reply_text( "Formato de horario no válido. Usa `HH:MM / HH:MM`, por ejemplo `19:30 / 21:30`.", parse_mode="Markdown", ) return start, end = m.group(1), m.group(2) draft["time_start"] = start draft["time_end"] = end set_state(context, "idle") await refresh_draft_message(context) return if state == "waiting_date": if not re.match(r"^\d{1,2}/\d{1,2}/\d{4}$", text): await msg.reply_text( "Formato de fecha no válido. Usa `DD/MM/AAAA`, por ejemplo `13/04/2025`.", parse_mode="Markdown", ) return draft["date"] = text set_state(context, "idle") await refresh_draft_message(context) return # Si no estamos esperando nada, ignoramos return # ---------------------------- # Manejo de fotos para el borrador # ---------------------------- async def handle_draft_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): """ Aquí llega la foto cuando el usuario ha pulsado 'Adjuntar foto'. Descargamos la imagen, la guardamos en el borrador y mostramos de nuevo el borrador. """ msg = update.message if msg is None or not msg.photo: return state = get_state(context) draft = get_draft(context) if not draft: return if state != "waiting_photo": # Si no estamos esperando una foto, ignoramos (o podríamos decidir otra cosa) return # Opcional: borrar el mensaje de foto del usuario para limpiar el chat try: await msg.delete() except Exception: pass image_paths = await download_photos(context, msg.photo) draft["image_paths"] = image_paths set_state(context, "idle") await refresh_draft_message(context) # ---------------------------- # Publicar borrador # ---------------------------- async def finalize_and_post_draft(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.callback_query draft = get_draft(context) if not draft: try: await query.message.edit_text("No hay borrador para publicar.") except Exception: pass return title = draft.get("title") or "" description = draft.get("description") or "" base_text_parts: List[str] = [] if title: base_text_parts.append(title) if description: if title: base_text_parts.append("") base_text_parts.append(description) base_text = "\n".join(base_text_parts) if base_text_parts else "" extra_lines: List[str] = [] if draft.get("date"): extra_lines.append(f"📅 {draft['date']}") ts = draft.get("time_start") or "" te = draft.get("time_end") or "" if ts and te: extra_lines.append(f"⏰ {ts} / {te}") elif ts: extra_lines.append(f"⏰ {ts}") if draft.get("hashtags"): extra_lines.append(draft["hashtags"]) final_text = base_text if extra_lines: if final_text: final_text += "\n\n" + "\n".join(extra_lines) else: final_text = "\n".join(extra_lines) payload = PostPayload( text=final_text, image_paths=draft.get("image_paths", []), ) try: post_to_mastodon(payload) except Exception as e: log.exception(f"[Mastodon] Error: {e}") try: post_to_instagram(payload) except Exception as e: log.exception(f"[Instagram] Error: {e}") context.user_data.pop(DRAFT_KEY, None) set_state(context, "idle") try: await query.message.edit_text("Publicado en redes ✅") except Exception: pass # ---------------------------- # main() # ---------------------------- def main(): app = ApplicationBuilder().token(TELEGRAM_TOKEN).build() # Único flujo: /publicar + botones + textos + fotos del borrador app.add_handler(CommandHandler("publicar", start_publish)) app.add_handler(CallbackQueryHandler(draft_keyboard_callback, pattern=r"^draft:")) # Texto para título/descripcion/hashtags/hora/fecha app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_draft_text)) # Fotos para "Adjuntar foto" app.add_handler(MessageHandler(filters.PHOTO, handle_draft_photo)) log.info("Bot escuchando /publicar…") app.run_polling( allowed_updates=["message", "callback_query"], drop_pending_updates=False, ) if __name__ == "__main__": main()