fosfeno/backend/server.py
hacklab 30a09fdee6 FOSFENO: motor de visuales audio-reactivas para Raspberry Pi
Primera version. Cinco motores (projectM, Butterchurn, Hydra, Shaders GLSL y mezclador VJ con camara y video), panel de control web, deteccion de BPM propia, pantalla de conexion con codigo QR, instalador robusto para Raspberry Pi 4 y 5 y documentacion completa en docs/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 14:18:19 +02:00

488 lines
16 KiB
Python

#!/usr/bin/env python3
"""
FOSFENO - Motor de visuales audio-reactivas para Raspberry Pi OS.
Un unico proceso que:
- sirve el PANEL de control (http://<ip>/)
- sirve el ESCENARIO (http://<ip>/stage) -> lo abre Chromium en kiosko
- lanza/cierra Chromium y el visualizador nativo projectM
- mantiene el estado y lo sincroniza con panel y escenario via WebSocket
Motores: projectm (nativo), butterchurn, hydra, shaders (GLSL) y mixer
(mezclador VJ basado en Hydra: camara + video + efectos).
"""
import json
import shutil
import socket
import subprocess
import threading
import time
from pathlib import Path
from flask import Flask, send_from_directory
from flask_socketio import SocketIO
# --------------------------------------------------------------------------
# Rutas y configuracion
# --------------------------------------------------------------------------
BASE = Path(__file__).resolve().parent.parent # .../FOSFENO
WEB = BASE / "web"
DATA = BASE / "data"
VIDEOS = DATA / "videos"
VIDEO_EXT = (".mp4", ".webm", ".mov", ".m4v", ".ogv")
def load_json(path, default):
try:
with open(path, encoding="utf-8") as fh:
return json.load(fh)
except (OSError, json.JSONDecodeError) as exc:
print(f"[FOSFENO] No se pudo leer {path}: {exc}")
return default
CFG = load_json(BASE / "config.json", {})
HOST = CFG.get("server", {}).get("host", "0.0.0.0")
PORT = int(CFG.get("server", {}).get("port", 80))
app = Flask(__name__, static_folder=None)
socketio = SocketIO(app, async_mode="threading", cors_allowed_origins="*")
# --------------------------------------------------------------------------
# Datos cargados de disco
# --------------------------------------------------------------------------
HYDRA_SKETCHES = load_json(DATA / "hydra-sketches.json", {})
SHADERS = load_json(DATA / "shaders.json", {})
DEFAULTS = CFG.get("defaults", {})
ENGINES = ("projectm", "butterchurn", "hydra", "shaders", "mixer")
def first_code(table, key, field="code"):
"""Devuelve el codigo de una entrada de un diccionario de presets."""
entry = table.get(key) or next(iter(table.values()), {})
return entry.get(field, "") if isinstance(entry, dict) else ""
def list_videos():
if not VIDEOS.is_dir():
return []
return sorted(f.name for f in VIDEOS.iterdir()
if f.suffix.lower() in VIDEO_EXT)
# --------------------------------------------------------------------------
# Estado global (se difunde a todos los clientes)
# --------------------------------------------------------------------------
NOTIFY_HISTORY = [] # ultimos avisos mostrados en el panel
state = {
"engine": DEFAULTS.get("engine", "butterchurn"),
"power": True,
"sensitivity": float(DEFAULTS.get("sensitivity", 1.0)),
"audio": {"device": "", "bpm": 0},
"butterchurn": {
"preset": "", "shuffle": True,
"interval": 20, "intervalMode": "seconds", "blendTime": 2.7,
},
"hydra": {
"label": "",
"code": first_code(HYDRA_SKETCHES, DEFAULTS.get("hydraSketch", "")),
},
"shaders": {
"label": "",
"code": first_code(SHADERS, DEFAULTS.get("shader", "")),
},
"mixer": {
"source": "cam", "camOn": True, "cameraId": 0, "video": "",
"mix": 0.5, "blendMode": "blend",
"hue": 0.0, "saturate": 1.0, "contrast": 1.0, "brightness": 0.0,
"colorama": 0.0, "posterize": 0, "pixelate": 0, "kaleid": 0,
"rotate": 0.0, "feedback": 0.0, "invert": False, "beatPulse": False,
},
"projectm": {"available": False},
"meta": {
"butterchurnPresets": [],
"audioDevices": [],
"cameraDevices": [],
"videos": list_videos(),
},
"status": {"label": ""},
"notifications": NOTIFY_HISTORY,
"network": {"ip": "", "hostname": "", "port": PORT, "panelSeen": False},
}
lock = threading.Lock()
def broadcast():
socketio.emit("state", state)
def notify(level, message):
"""Registra un aviso y lo envia al panel.
level puede ser 'info', 'warn' o 'error'. Asi el usuario nunca se queda
sin saber que ha pasado: cualquier fallo aparece en la parte de arriba
del panel de control.
"""
entry = {"level": level, "message": str(message), "t": int(time.time())}
print(f"[FOSFENO] aviso({level}): {message}")
NOTIFY_HISTORY.append(entry)
while len(NOTIFY_HISTORY) > 20:
NOTIFY_HISTORY.pop(0)
try:
socketio.emit("notify", entry)
except Exception:
pass
def get_lan_ip():
"""Devuelve la IP de la Raspberry en la red local."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect(("8.8.8.8", 80))
return sock.getsockname()[0]
except OSError:
return "127.0.0.1"
finally:
sock.close()
def refresh_network():
"""Actualiza la IP y el nombre de red de la Raspberry. Devuelve True si la
IP ha cambiado, para avisar al panel y al escenario."""
new_ip = get_lan_ip()
changed = state["network"]["ip"] != new_ip
state["network"]["ip"] = new_ip
state["network"]["hostname"] = socket.gethostname()
return changed
# --------------------------------------------------------------------------
# Gestion de procesos (Chromium kiosko y projectM nativo)
# --------------------------------------------------------------------------
procs = {"chromium": None, "projectm": None}
def is_running(proc):
return proc is not None and proc.poll() is None
def start_chromium():
"""Abre el escenario en Chromium a pantalla completa."""
if is_running(procs["chromium"]):
return
kiosk = CFG.get("kiosk", {})
browser = shutil.which(kiosk.get("browser", "chromium-browser")) \
or shutil.which("chromium")
if not browser:
notify("error", "No se encontro Chromium. Las visuales web no pueden "
"mostrarse. Ejecuta install.sh para instalarlo.")
return
url = kiosk.get("url", f"http://localhost:{PORT}/stage")
procs["chromium"] = subprocess.Popen([
browser,
"--kiosk", f"--app={url}",
"--use-fake-ui-for-media-stream", # acepta micro y camara sin preguntar
"--autoplay-policy=no-user-gesture-required",
"--noerrdialogs", "--disable-infobars", "--disable-translate",
"--disable-features=Translate",
"--check-for-update-interval=31536000",
"--ozone-platform-hint=auto",
])
print("[FOSFENO] Chromium abierto en", url)
def stop_projectm():
proc = procs["projectm"]
if is_running(proc):
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
procs["projectm"] = None
def start_projectm():
"""Lanza projectM nativo; su ventana se superpone a Chromium."""
stop_projectm()
script = BASE / "scripts" / "start-projectm.sh"
try:
procs["projectm"] = subprocess.Popen(["bash", str(script)])
print("[FOSFENO] projectM lanzado.")
except OSError as exc:
notify("error", f"No se pudo lanzar projectM: {exc}. "
"Prueba con otro motor de visuales.")
def projectm_key(key):
"""Envia una tecla a projectM (solo funciona en sesion X11, no Wayland)."""
if not shutil.which("xdotool"):
return
subprocess.run(
["xdotool", "search", "--name", "projectM",
"key", "--clearmodifiers", key],
timeout=3, check=False,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
def apply_engine():
"""Arranca o detiene projectM segun el motor y el estado de encendido."""
use_native = state["engine"] == "projectm" and state["power"]
if state["engine"] == "projectm" and not state["projectm"]["available"]:
notify("warn", "projectM no esta instalado en esta Raspberry. "
"Elige otro motor o vuelve a compilarlo con "
"scripts/build-projectm.sh.")
if use_native and state["projectm"]["available"]:
start_projectm()
else:
stop_projectm()
# --------------------------------------------------------------------------
# Rutas HTTP (panel, escenario, librerias, datos)
# --------------------------------------------------------------------------
@app.route("/")
def panel_index():
return send_from_directory(str(WEB / "panel"), "index.html")
@app.route("/panel/<path:fname>")
def panel_files(fname):
return send_from_directory(str(WEB / "panel"), fname)
@app.route("/stage")
def stage_index():
return send_from_directory(str(WEB / "stage"), "index.html")
@app.route("/stage/<path:fname>")
def stage_files(fname):
return send_from_directory(str(WEB / "stage"), fname)
@app.route("/lib/<path:fname>")
def lib_files(fname):
return send_from_directory(str(WEB / "lib"), fname)
@app.route("/data/<path:fname>")
def data_files(fname):
return send_from_directory(str(DATA), fname)
# --------------------------------------------------------------------------
# Eventos WebSocket
# --------------------------------------------------------------------------
@socketio.on("connect")
def on_connect():
socketio.emit("state", state)
@socketio.on("hello")
def on_hello(data):
"""El panel se presenta al conectarse. Asi el escenario sabe que alguien
ya ha encontrado el panel y puede dejar de mostrar el codigo QR."""
if (data or {}).get("role") == "panel" and not state["network"]["panelSeen"]:
with lock:
state["network"]["panelSeen"] = True
broadcast()
@socketio.on("set_engine")
def on_set_engine(data):
engine = (data or {}).get("engine")
if engine not in ENGINES:
return
with lock:
state["engine"] = engine
apply_engine()
broadcast()
@socketio.on("set_power")
def on_set_power(data):
with lock:
state["power"] = bool((data or {}).get("on"))
apply_engine()
broadcast()
@socketio.on("set_sensitivity")
def on_set_sensitivity(data):
try:
value = float((data or {}).get("value", 1.0))
except (TypeError, ValueError):
return
with lock:
state["sensitivity"] = max(0.0, min(4.0, value))
broadcast()
@socketio.on("run_code")
def on_run_code(data):
"""Ejecuta codigo (Hydra o GLSL) escrito/pegado o un fragmento de la libreria."""
engine = (data or {}).get("engine")
code = (data or {}).get("code")
label = (data or {}).get("label", "codigo personalizado")
if engine in ("hydra", "shaders") and isinstance(code, str):
with lock:
state[engine]["code"] = code
state[engine]["label"] = label
state["status"]["label"] = label
broadcast()
@socketio.on("update_settings")
def on_update_settings(data):
"""Actualiza ajustes de un motor: butterchurn, mixer o audio."""
engine = (data or {}).get("engine")
patch = (data or {}).get("patch", {})
if engine in ("butterchurn", "mixer", "audio") and isinstance(patch, dict):
with lock:
state[engine].update(patch)
broadcast()
@socketio.on("engine_command")
def on_engine_command(data):
action = (data or {}).get("action")
if state["engine"] == "projectm":
projectm_key({"next": "n", "prev": "p"}.get(action, "n"))
else:
socketio.emit("stage_command", {"action": action,
"value": (data or {}).get("value")})
@socketio.on("stage_meta")
def on_stage_meta(data):
"""El escenario nos envia las listas que solo conoce el navegador."""
data = data or {}
with lock:
if isinstance(data.get("butterchurnPresets"), list):
state["meta"]["butterchurnPresets"] = data["butterchurnPresets"]
if isinstance(data.get("audioDevices"), list):
state["meta"]["audioDevices"] = data["audioDevices"]
if isinstance(data.get("cameraDevices"), list):
state["meta"]["cameraDevices"] = data["cameraDevices"]
broadcast()
@socketio.on("rescan_devices")
def on_rescan_devices():
"""Pide al escenario que vuelva a buscar microfonos y camaras."""
socketio.emit("stage_rescan")
@socketio.on("stage_status")
def on_stage_status(data):
if isinstance(data, dict):
with lock:
if "label" in data:
state["status"]["label"] = data["label"]
if "bpm" in data:
state["audio"]["bpm"] = data["bpm"]
socketio.emit("status", {"label": state["status"]["label"],
"bpm": state["audio"]["bpm"]})
@socketio.on("stage_notify")
def on_stage_notify(data):
"""El escenario (navegador) nos comunica un error para mostrarlo en el panel."""
data = data or {}
notify(data.get("level", "error"),
data.get("message", "Error en el escenario de visuales."))
@socketio.on("rescan_videos")
def on_rescan_videos():
with lock:
state["meta"]["videos"] = list_videos()
broadcast()
@socketio.on("system")
def on_system(data):
action = (data or {}).get("action")
if action == "reboot":
subprocess.Popen(["sudo", "reboot"])
elif action == "shutdown":
subprocess.Popen(["sudo", "poweroff"])
# --------------------------------------------------------------------------
# Arranque
# --------------------------------------------------------------------------
def setup_audio():
"""Marca el microfono USB como fuente de audio por defecto (best-effort)."""
match = CFG.get("audio", {}).get("matchSource", "usb").lower()
try:
out = subprocess.check_output(["pactl", "list", "short", "sources"],
text=True, timeout=5)
except (OSError, subprocess.SubprocessError):
return
for line in out.splitlines():
cols = line.split("\t")
if len(cols) >= 2 and match in cols[1].lower() \
and "monitor" not in cols[1].lower():
subprocess.run(["pactl", "set-default-source", cols[1]], check=False)
print("[FOSFENO] Microfono por defecto:", cols[1])
return
notify("warn", "No se detecto ningun microfono USB. Conecta uno y "
"reinicia, o elige la entrada a mano en el panel.")
def watchdog():
"""Reabre Chromium si se cierra (salvo cuando projectM esta activo)."""
while True:
time.sleep(10)
if refresh_network():
broadcast()
if state["engine"] != "projectm" and not is_running(procs["chromium"]):
notify("warn", "El navegador de las visuales se cerro. "
"Reabriendolo automaticamente.")
start_chromium()
def check_install():
"""Avisa al panel si la instalacion esta incompleta."""
needed = ["socket.io.min.js", "butterchurn.min.js",
"butterchurn-presets.min.js", "hydra-synth.js"]
missing = [f for f in needed if not (WEB / "lib" / f).exists()]
if missing:
notify("error", "Faltan librerias web (" + ", ".join(missing) +
"). Vuelve a ejecutar install.sh.")
if not (WEB / "lib" / "codemirror" / "codemirror.js").exists():
notify("warn", "Falta CodeMirror: el editor de codigo no funcionara. "
"Vuelve a ejecutar install.sh.")
def main():
binary = CFG.get("projectm", {}).get("binary", "projectMSDL")
state["projectm"]["available"] = shutil.which(binary) is not None
print(f"[FOSFENO] projectM nativo: "
f"{'disponible' if state['projectm']['available'] else 'no instalado'}")
print(f"[FOSFENO] Videos encontrados: {len(state['meta']['videos'])}")
check_install()
setup_audio()
refresh_network()
threading.Timer(3.0, start_chromium).start()
threading.Thread(target=watchdog, daemon=True).start()
net = state["network"]
suffix = "" if PORT == 80 else f":{PORT}"
print("[FOSFENO] ===================================================")
print("[FOSFENO] Panel de control disponible en:")
print(f"[FOSFENO] http://{net['ip']}{suffix}/")
print(f"[FOSFENO] http://{net['hostname']}.local{suffix}/")
print("[FOSFENO] ===================================================")
socketio.run(app, host=HOST, port=PORT, allow_unsafe_werkzeug=True)
if __name__ == "__main__":
main()