Anade 'bash install.sh --laptop' y el lanzador './fosfeno' para correr FOSFENO en portatiles Debian/Ubuntu/Mint sin Raspberry Pi: puerto 8080, sin arranque automatico ni cambios en el sistema. El servidor admite las variables FOSFENO_PORT y FOSFENO_NO_KIOSK. Nueva documentacion en docs/portatil.md. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
495 lines
16 KiB
Python
495 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
FOSFENO - Motor de visuales audio-reactivas para Raspberry Pi OS.
|
|
|
|
Un unico proceso que:
|
|
- sirve el PANEL de control (http://<ip>/)
|
|
- sirve el ESCENARIO (http://<ip>/stage) -> lo abre Chromium en kiosko
|
|
- lanza/cierra Chromium y el visualizador nativo projectM
|
|
- mantiene el estado y lo sincroniza con panel y escenario via WebSocket
|
|
|
|
Motores: projectm (nativo), butterchurn, hydra, shaders (GLSL) y mixer
|
|
(mezclador VJ basado en Hydra: camara + video + efectos).
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import socket
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from flask import Flask, send_from_directory
|
|
from flask_socketio import SocketIO
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Rutas y configuracion
|
|
# --------------------------------------------------------------------------
|
|
BASE = Path(__file__).resolve().parent.parent # .../FOSFENO
|
|
WEB = BASE / "web"
|
|
DATA = BASE / "data"
|
|
VIDEOS = DATA / "videos"
|
|
VIDEO_EXT = (".mp4", ".webm", ".mov", ".m4v", ".ogv")
|
|
|
|
|
|
def load_json(path, default):
|
|
try:
|
|
with open(path, encoding="utf-8") as fh:
|
|
return json.load(fh)
|
|
except (OSError, json.JSONDecodeError) as exc:
|
|
print(f"[FOSFENO] No se pudo leer {path}: {exc}")
|
|
return default
|
|
|
|
|
|
CFG = load_json(BASE / "config.json", {})
|
|
HOST = CFG.get("server", {}).get("host", "0.0.0.0")
|
|
# El puerto y el modo kiosko se pueden forzar por variable de entorno; asi el
|
|
# modo portatil (script 'fosfeno') usa el puerto 8080 sin tocar config.json.
|
|
PORT = int(os.environ.get("FOSFENO_PORT") or CFG.get("server", {}).get("port", 80))
|
|
NO_KIOSK = bool(os.environ.get("FOSFENO_NO_KIOSK"))
|
|
|
|
app = Flask(__name__, static_folder=None)
|
|
socketio = SocketIO(app, async_mode="threading", cors_allowed_origins="*")
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Datos cargados de disco
|
|
# --------------------------------------------------------------------------
|
|
HYDRA_SKETCHES = load_json(DATA / "hydra-sketches.json", {})
|
|
SHADERS = load_json(DATA / "shaders.json", {})
|
|
|
|
DEFAULTS = CFG.get("defaults", {})
|
|
ENGINES = ("projectm", "butterchurn", "hydra", "shaders", "mixer")
|
|
|
|
|
|
def first_code(table, key, field="code"):
|
|
"""Devuelve el codigo de una entrada de un diccionario de presets."""
|
|
entry = table.get(key) or next(iter(table.values()), {})
|
|
return entry.get(field, "") if isinstance(entry, dict) else ""
|
|
|
|
|
|
def list_videos():
|
|
if not VIDEOS.is_dir():
|
|
return []
|
|
return sorted(f.name for f in VIDEOS.iterdir()
|
|
if f.suffix.lower() in VIDEO_EXT)
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Estado global (se difunde a todos los clientes)
|
|
# --------------------------------------------------------------------------
|
|
NOTIFY_HISTORY = [] # ultimos avisos mostrados en el panel
|
|
|
|
state = {
|
|
"engine": DEFAULTS.get("engine", "butterchurn"),
|
|
"power": True,
|
|
"sensitivity": float(DEFAULTS.get("sensitivity", 1.0)),
|
|
"audio": {"device": "", "bpm": 0},
|
|
"butterchurn": {
|
|
"preset": "", "shuffle": True,
|
|
"interval": 20, "intervalMode": "seconds", "blendTime": 2.7,
|
|
},
|
|
"hydra": {
|
|
"label": "",
|
|
"code": first_code(HYDRA_SKETCHES, DEFAULTS.get("hydraSketch", "")),
|
|
},
|
|
"shaders": {
|
|
"label": "",
|
|
"code": first_code(SHADERS, DEFAULTS.get("shader", "")),
|
|
},
|
|
"mixer": {
|
|
"source": "cam", "camOn": True, "cameraId": 0, "video": "",
|
|
"mix": 0.5, "blendMode": "blend",
|
|
"hue": 0.0, "saturate": 1.0, "contrast": 1.0, "brightness": 0.0,
|
|
"colorama": 0.0, "posterize": 0, "pixelate": 0, "kaleid": 0,
|
|
"rotate": 0.0, "feedback": 0.0, "invert": False, "beatPulse": False,
|
|
},
|
|
"projectm": {"available": False},
|
|
"meta": {
|
|
"butterchurnPresets": [],
|
|
"audioDevices": [],
|
|
"cameraDevices": [],
|
|
"videos": list_videos(),
|
|
},
|
|
"status": {"label": ""},
|
|
"notifications": NOTIFY_HISTORY,
|
|
"network": {"ip": "", "hostname": "", "port": PORT, "panelSeen": False},
|
|
}
|
|
lock = threading.Lock()
|
|
|
|
|
|
def broadcast():
|
|
socketio.emit("state", state)
|
|
|
|
|
|
def notify(level, message):
|
|
"""Registra un aviso y lo envia al panel.
|
|
|
|
level puede ser 'info', 'warn' o 'error'. Asi el usuario nunca se queda
|
|
sin saber que ha pasado: cualquier fallo aparece en la parte de arriba
|
|
del panel de control.
|
|
"""
|
|
entry = {"level": level, "message": str(message), "t": int(time.time())}
|
|
print(f"[FOSFENO] aviso({level}): {message}")
|
|
NOTIFY_HISTORY.append(entry)
|
|
while len(NOTIFY_HISTORY) > 20:
|
|
NOTIFY_HISTORY.pop(0)
|
|
try:
|
|
socketio.emit("notify", entry)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def get_lan_ip():
|
|
"""Devuelve la IP de la Raspberry en la red local."""
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
try:
|
|
sock.connect(("8.8.8.8", 80))
|
|
return sock.getsockname()[0]
|
|
except OSError:
|
|
return "127.0.0.1"
|
|
finally:
|
|
sock.close()
|
|
|
|
|
|
def refresh_network():
|
|
"""Actualiza la IP y el nombre de red de la Raspberry. Devuelve True si la
|
|
IP ha cambiado, para avisar al panel y al escenario."""
|
|
new_ip = get_lan_ip()
|
|
changed = state["network"]["ip"] != new_ip
|
|
state["network"]["ip"] = new_ip
|
|
state["network"]["hostname"] = socket.gethostname()
|
|
return changed
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Gestion de procesos (Chromium kiosko y projectM nativo)
|
|
# --------------------------------------------------------------------------
|
|
procs = {"chromium": None, "projectm": None}
|
|
|
|
|
|
def is_running(proc):
|
|
return proc is not None and proc.poll() is None
|
|
|
|
|
|
def start_chromium():
|
|
"""Abre el escenario en Chromium a pantalla completa."""
|
|
if is_running(procs["chromium"]):
|
|
return
|
|
kiosk = CFG.get("kiosk", {})
|
|
browser = shutil.which(kiosk.get("browser", "chromium-browser")) \
|
|
or shutil.which("chromium")
|
|
if not browser:
|
|
notify("error", "No se encontro Chromium. Las visuales web no pueden "
|
|
"mostrarse. Ejecuta install.sh para instalarlo.")
|
|
return
|
|
url = f"http://localhost:{PORT}/stage"
|
|
procs["chromium"] = subprocess.Popen([
|
|
browser,
|
|
"--kiosk", f"--app={url}",
|
|
"--use-fake-ui-for-media-stream", # acepta micro y camara sin preguntar
|
|
"--autoplay-policy=no-user-gesture-required",
|
|
"--noerrdialogs", "--disable-infobars", "--disable-translate",
|
|
"--disable-features=Translate",
|
|
"--check-for-update-interval=31536000",
|
|
"--ozone-platform-hint=auto",
|
|
])
|
|
print("[FOSFENO] Chromium abierto en", url)
|
|
|
|
|
|
def stop_projectm():
|
|
proc = procs["projectm"]
|
|
if is_running(proc):
|
|
proc.terminate()
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
procs["projectm"] = None
|
|
|
|
|
|
def start_projectm():
|
|
"""Lanza projectM nativo; su ventana se superpone a Chromium."""
|
|
stop_projectm()
|
|
script = BASE / "scripts" / "start-projectm.sh"
|
|
try:
|
|
procs["projectm"] = subprocess.Popen(["bash", str(script)])
|
|
print("[FOSFENO] projectM lanzado.")
|
|
except OSError as exc:
|
|
notify("error", f"No se pudo lanzar projectM: {exc}. "
|
|
"Prueba con otro motor de visuales.")
|
|
|
|
|
|
def projectm_key(key):
|
|
"""Envia una tecla a projectM (solo funciona en sesion X11, no Wayland)."""
|
|
if not shutil.which("xdotool"):
|
|
return
|
|
subprocess.run(
|
|
["xdotool", "search", "--name", "projectM",
|
|
"key", "--clearmodifiers", key],
|
|
timeout=3, check=False,
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
|
)
|
|
|
|
|
|
def apply_engine():
|
|
"""Arranca o detiene projectM segun el motor y el estado de encendido."""
|
|
use_native = state["engine"] == "projectm" and state["power"]
|
|
if state["engine"] == "projectm" and not state["projectm"]["available"]:
|
|
notify("warn", "projectM no esta instalado en esta Raspberry. "
|
|
"Elige otro motor o vuelve a compilarlo con "
|
|
"scripts/build-projectm.sh.")
|
|
if use_native and state["projectm"]["available"]:
|
|
start_projectm()
|
|
else:
|
|
stop_projectm()
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Rutas HTTP (panel, escenario, librerias, datos)
|
|
# --------------------------------------------------------------------------
|
|
@app.route("/")
|
|
def panel_index():
|
|
return send_from_directory(str(WEB / "panel"), "index.html")
|
|
|
|
|
|
@app.route("/panel/<path:fname>")
|
|
def panel_files(fname):
|
|
return send_from_directory(str(WEB / "panel"), fname)
|
|
|
|
|
|
@app.route("/stage")
|
|
def stage_index():
|
|
return send_from_directory(str(WEB / "stage"), "index.html")
|
|
|
|
|
|
@app.route("/stage/<path:fname>")
|
|
def stage_files(fname):
|
|
return send_from_directory(str(WEB / "stage"), fname)
|
|
|
|
|
|
@app.route("/lib/<path:fname>")
|
|
def lib_files(fname):
|
|
return send_from_directory(str(WEB / "lib"), fname)
|
|
|
|
|
|
@app.route("/data/<path:fname>")
|
|
def data_files(fname):
|
|
return send_from_directory(str(DATA), fname)
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Eventos WebSocket
|
|
# --------------------------------------------------------------------------
|
|
@socketio.on("connect")
|
|
def on_connect():
|
|
socketio.emit("state", state)
|
|
|
|
|
|
@socketio.on("hello")
|
|
def on_hello(data):
|
|
"""El panel se presenta al conectarse. Asi el escenario sabe que alguien
|
|
ya ha encontrado el panel y puede dejar de mostrar el codigo QR."""
|
|
if (data or {}).get("role") == "panel" and not state["network"]["panelSeen"]:
|
|
with lock:
|
|
state["network"]["panelSeen"] = True
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("set_engine")
|
|
def on_set_engine(data):
|
|
engine = (data or {}).get("engine")
|
|
if engine not in ENGINES:
|
|
return
|
|
with lock:
|
|
state["engine"] = engine
|
|
apply_engine()
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("set_power")
|
|
def on_set_power(data):
|
|
with lock:
|
|
state["power"] = bool((data or {}).get("on"))
|
|
apply_engine()
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("set_sensitivity")
|
|
def on_set_sensitivity(data):
|
|
try:
|
|
value = float((data or {}).get("value", 1.0))
|
|
except (TypeError, ValueError):
|
|
return
|
|
with lock:
|
|
state["sensitivity"] = max(0.0, min(4.0, value))
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("run_code")
|
|
def on_run_code(data):
|
|
"""Ejecuta codigo (Hydra o GLSL) escrito/pegado o un fragmento de la libreria."""
|
|
engine = (data or {}).get("engine")
|
|
code = (data or {}).get("code")
|
|
label = (data or {}).get("label", "codigo personalizado")
|
|
if engine in ("hydra", "shaders") and isinstance(code, str):
|
|
with lock:
|
|
state[engine]["code"] = code
|
|
state[engine]["label"] = label
|
|
state["status"]["label"] = label
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("update_settings")
|
|
def on_update_settings(data):
|
|
"""Actualiza ajustes de un motor: butterchurn, mixer o audio."""
|
|
engine = (data or {}).get("engine")
|
|
patch = (data or {}).get("patch", {})
|
|
if engine in ("butterchurn", "mixer", "audio") and isinstance(patch, dict):
|
|
with lock:
|
|
state[engine].update(patch)
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("engine_command")
|
|
def on_engine_command(data):
|
|
action = (data or {}).get("action")
|
|
if state["engine"] == "projectm":
|
|
projectm_key({"next": "n", "prev": "p"}.get(action, "n"))
|
|
else:
|
|
socketio.emit("stage_command", {"action": action,
|
|
"value": (data or {}).get("value")})
|
|
|
|
|
|
@socketio.on("stage_meta")
|
|
def on_stage_meta(data):
|
|
"""El escenario nos envia las listas que solo conoce el navegador."""
|
|
data = data or {}
|
|
with lock:
|
|
if isinstance(data.get("butterchurnPresets"), list):
|
|
state["meta"]["butterchurnPresets"] = data["butterchurnPresets"]
|
|
if isinstance(data.get("audioDevices"), list):
|
|
state["meta"]["audioDevices"] = data["audioDevices"]
|
|
if isinstance(data.get("cameraDevices"), list):
|
|
state["meta"]["cameraDevices"] = data["cameraDevices"]
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("rescan_devices")
|
|
def on_rescan_devices():
|
|
"""Pide al escenario que vuelva a buscar microfonos y camaras."""
|
|
socketio.emit("stage_rescan")
|
|
|
|
|
|
@socketio.on("stage_status")
|
|
def on_stage_status(data):
|
|
if isinstance(data, dict):
|
|
with lock:
|
|
if "label" in data:
|
|
state["status"]["label"] = data["label"]
|
|
if "bpm" in data:
|
|
state["audio"]["bpm"] = data["bpm"]
|
|
socketio.emit("status", {"label": state["status"]["label"],
|
|
"bpm": state["audio"]["bpm"]})
|
|
|
|
|
|
@socketio.on("stage_notify")
|
|
def on_stage_notify(data):
|
|
"""El escenario (navegador) nos comunica un error para mostrarlo en el panel."""
|
|
data = data or {}
|
|
notify(data.get("level", "error"),
|
|
data.get("message", "Error en el escenario de visuales."))
|
|
|
|
|
|
@socketio.on("rescan_videos")
|
|
def on_rescan_videos():
|
|
with lock:
|
|
state["meta"]["videos"] = list_videos()
|
|
broadcast()
|
|
|
|
|
|
@socketio.on("system")
|
|
def on_system(data):
|
|
action = (data or {}).get("action")
|
|
if action == "reboot":
|
|
subprocess.Popen(["sudo", "reboot"])
|
|
elif action == "shutdown":
|
|
subprocess.Popen(["sudo", "poweroff"])
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Arranque
|
|
# --------------------------------------------------------------------------
|
|
def setup_audio():
|
|
"""Marca el microfono USB como fuente de audio por defecto (best-effort)."""
|
|
match = CFG.get("audio", {}).get("matchSource", "usb").lower()
|
|
try:
|
|
out = subprocess.check_output(["pactl", "list", "short", "sources"],
|
|
text=True, timeout=5)
|
|
except (OSError, subprocess.SubprocessError):
|
|
return
|
|
for line in out.splitlines():
|
|
cols = line.split("\t")
|
|
if len(cols) >= 2 and match in cols[1].lower() \
|
|
and "monitor" not in cols[1].lower():
|
|
subprocess.run(["pactl", "set-default-source", cols[1]], check=False)
|
|
print("[FOSFENO] Microfono por defecto:", cols[1])
|
|
return
|
|
notify("warn", "No se detecto ningun microfono USB. Conecta uno y "
|
|
"reinicia, o elige la entrada a mano en el panel.")
|
|
|
|
|
|
def watchdog():
|
|
"""Reabre Chromium si se cierra (salvo cuando projectM esta activo)."""
|
|
while True:
|
|
time.sleep(10)
|
|
if refresh_network():
|
|
broadcast()
|
|
if not NO_KIOSK and state["engine"] != "projectm" \
|
|
and not is_running(procs["chromium"]):
|
|
notify("warn", "El navegador de las visuales se cerro. "
|
|
"Reabriendolo automaticamente.")
|
|
start_chromium()
|
|
|
|
|
|
def check_install():
|
|
"""Avisa al panel si la instalacion esta incompleta."""
|
|
needed = ["socket.io.min.js", "butterchurn.min.js",
|
|
"butterchurn-presets.min.js", "hydra-synth.js"]
|
|
missing = [f for f in needed if not (WEB / "lib" / f).exists()]
|
|
if missing:
|
|
notify("error", "Faltan librerias web (" + ", ".join(missing) +
|
|
"). Vuelve a ejecutar install.sh.")
|
|
if not (WEB / "lib" / "codemirror" / "codemirror.js").exists():
|
|
notify("warn", "Falta CodeMirror: el editor de codigo no funcionara. "
|
|
"Vuelve a ejecutar install.sh.")
|
|
|
|
|
|
def main():
|
|
binary = CFG.get("projectm", {}).get("binary", "projectMSDL")
|
|
state["projectm"]["available"] = shutil.which(binary) is not None
|
|
print(f"[FOSFENO] projectM nativo: "
|
|
f"{'disponible' if state['projectm']['available'] else 'no instalado'}")
|
|
print(f"[FOSFENO] Videos encontrados: {len(state['meta']['videos'])}")
|
|
|
|
check_install()
|
|
setup_audio()
|
|
refresh_network()
|
|
# En modo portatil el script 'fosfeno' abre el navegador; el servidor no.
|
|
if not NO_KIOSK:
|
|
threading.Timer(3.0, start_chromium).start()
|
|
threading.Thread(target=watchdog, daemon=True).start()
|
|
|
|
net = state["network"]
|
|
suffix = "" if PORT == 80 else f":{PORT}"
|
|
print("[FOSFENO] ===================================================")
|
|
print("[FOSFENO] Panel de control disponible en:")
|
|
print(f"[FOSFENO] http://{net['ip']}{suffix}/")
|
|
print(f"[FOSFENO] http://{net['hostname']}.local{suffix}/")
|
|
print("[FOSFENO] ===================================================")
|
|
socketio.run(app, host=HOST, port=PORT, allow_unsafe_werkzeug=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|