2026-02-19 09:22:38 +01:00

553 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/routes/api.py
import os
import time
import threading
import logging
import random
from collections import defaultdict, deque
import pygame
from flask import Blueprint, jsonify, request, current_app
import app.state as state
from app.bluetooth.manager import MouldKing, advertiser, tracer
from app.utils.helpers import load_default_sounds, load_soundboard_configs, load_soundboard_config
from config import Config
logger = logging.getLogger(__name__)
# Held around mixer initialisation and every play/stop/volume call below.
audio_lock = threading.Lock()
# Held whenever the auto-random timer, its parameters or the active flag are touched.
auto_random_lock = threading.Lock()
# threading.Timer for the next auto-random tick, or None when nothing is scheduled.
auto_random_timer = None
# Parameters of the running auto-random series (imin, imax, dmin, dmax, next_ts).
auto_random_params = {}
# Sound cache: file path -> pygame.mixer.Sound
loaded_sounds = {}
random_history = defaultdict(deque) # sound_id -> deque[timestamps]
# A sound is only eligible for random playback while it has fewer than
# MAX_PER_HOUR history entries younger than WINDOW_SECONDS.
MAX_PER_HOUR = 2
WINDOW_SECONDS = 3600
def _ensure_mixer():
    """Initialise the pygame mixer on first use.

    Returns immediately when ``pygame.mixer.get_init()`` reports an active
    mixer. Otherwise the mixer is pre-configured from the ``Config.AUDIO_*``
    settings, initialised, and its channel count set to
    ``Config.AUDIO_NUM_CHANNELS``.
    """
    from config import Config

    if pygame.mixer.get_init():
        return

    mixer_settings = {
        "frequency": Config.AUDIO_FREQ,
        "size": Config.AUDIO_SIZE,
        "channels": Config.AUDIO_CHANNELS,
        "buffer": Config.AUDIO_BUFFER,
    }
    pygame.mixer.pre_init(**mixer_settings)
    pygame.mixer.init()
    pygame.mixer.set_num_channels(Config.AUDIO_NUM_CHANNELS)
def _load_sound(file_path):
    """Return the ``pygame.mixer.Sound`` for *file_path*, loading it once.

    Loaded sounds are kept in the module-level ``loaded_sounds`` dict, keyed
    by path, so later calls for the same path reuse the same object.
    """
    if file_path not in loaded_sounds:
        loaded_sounds[file_path] = pygame.mixer.Sound(file_path)
    return loaded_sounds[file_path]
def _play_sound_entry(sound_entry, channel_req=None, loop_req=0, sounds_dir=None):
    """Play one configured sound on a pygame mixer channel.

    Args:
        sound_entry: Mapping describing the sound. ``file`` is required and is
            joined onto ``base_path`` (when present and truthy) or *sounds_dir*.
        channel_req: Channel index (anything ``int()`` accepts). ``None`` or
            ``""`` lets ``pygame.mixer.find_channel(True)`` choose a channel.
        loop_req: ``True``, ``1``, ``-1``, ``"1"``, ``"true"``, ``"True"`` or
            ``"loop"`` loop the sound endlessly; any other value plays it once.
        sounds_dir: Directory used when the entry has no ``base_path``.
            Defaults to the app's ``SOUNDS_DIR``, falling back to
            ``Config.SOUNDS_DIR``.

    Returns:
        ``None`` on success, otherwise a ``(response, status)`` tuple that a
        route can return directly (404 missing file, 400 invalid channel,
        503 no channel, 500 load or playback failure).
    """
    if sounds_dir is None:
        try:
            sounds_dir = current_app.config['SOUNDS_DIR']
        except Exception:
            # current_app is unusable without an application context, or the
            # key is missing; use the static configuration instead.
            sounds_dir = Config.SOUNDS_DIR
    base_path = sound_entry.get('base_path') or sounds_dir
    file_path = os.path.join(base_path, sound_entry['file'])
    if not os.path.exists(file_path):
        logger.error(f"Sound-Datei nicht gefunden: {file_path}")
        return jsonify({"success": False, "message": "Sound-Datei nicht gefunden"}), 404

    # Playback is serialized by the lock; several channels may still sound at once.
    with audio_lock:
        # Mixer start-up and decoding can fail (no audio device, corrupt
        # file). Report that as a JSON 500 like every other failure here
        # instead of letting the exception escape to the caller.
        try:
            _ensure_mixer()
            snd = _load_sound(file_path)  # served from the cache when possible
        except Exception as e:
            logger.exception(f"Fehler beim Laden von {file_path}")
            return jsonify({"success": False, "message": str(e)}), 500

        # Pick the target channel.
        if channel_req not in [None, ""]:
            try:
                ch = pygame.mixer.Channel(int(channel_req))
            except Exception as e:
                logger.error(f"Ungültiger channel-Wert '{channel_req}': {e}")
                return jsonify({"success": False, "message": "Ungültiger Channel"}), 400
        else:
            ch = pygame.mixer.find_channel(True)  # force: always hand back a channel
        if ch is None:
            return jsonify({"success": False, "message": "Kein freier Audio-Channel verfügbar"}), 503

        # Interpret the loop request: -1 loops forever, 0 plays once.
        loops = -1 if loop_req in [True, "true", "True", -1, "loop", "1", 1] else 0
        try:
            ch.play(snd, loops=loops)
        except Exception as e:
            logger.exception(f"Fehler beim Abspielen von {file_path}")
            return jsonify({"success": False, "message": str(e)}), 500
    return None  # success
def _pick_random_sound():
    """Choose a random sound from the current soundboard, honouring the rate limit.

    Candidates come from the soundboard's ``random_pool`` or, when that is
    missing or empty, from its ``sounds`` list. A sound is eligible while it
    has fewer than ``MAX_PER_HOUR`` entries left in ``random_history`` after
    pruning. The chosen sound's history gets the current timestamp, and the
    entry receives an ``id`` (its ``file``) if it had none.

    Returns:
        ``(sound_entry, None)`` on success, or ``(None, message)`` when no
        soundboard is loaded, no sounds are defined, or every sound has hit
        the limit. The limit message starts with "Limit".
    """
    if state.current_soundboard is None:
        return None, "Kein Soundboard geladen"
    pool = state.current_soundboard.get('random_pool') or state.current_soundboard.get('sounds', [])
    if not pool:
        return None, "Keine Random-Sounds definiert"

    now = time.time()
    candidates = []
    for sound in pool:
        sid = sound.get('id') or sound.get('file')
        if len(_prune_history(sid, now)) < MAX_PER_HOUR:
            candidates.append(sound)
    if not candidates:
        # Derive the number from the constant so the text cannot drift from
        # the actual limit.
        return None, f"Limit erreicht ({MAX_PER_HOUR}x pro Stunde)"

    sound_entry = random.choice(candidates)
    sid = sound_entry.get('id') or sound_entry.get('file')
    random_history[sid].append(now)
    sound_entry.setdefault('id', sid)
    return sound_entry, None
def _auto_random_tick():
    """Timer callback: play one random sound, then schedule the next tick.

    Does nothing when auto-random has been switched off. Otherwise it picks
    and plays a sound, draws a new delay between ``imin`` and ``imax``
    minutes, and arms a fresh daemon ``threading.Timer`` if auto-random is
    still active at that point.
    """
    global auto_random_timer
    with auto_random_lock:
        if not state.auto_random_active:
            return
        imin = auto_random_params.get('imin', 5)
        imax = auto_random_params.get('imax', 10)

    # Pick and play outside the lock.
    try:
        sound_entry, err = _pick_random_sound()
        if sound_entry:
            _play_sound_entry(sound_entry, sound_entry.get('channel'), sound_entry.get('loop'), Config.SOUNDS_DIR)
        else:
            logger.info(f"Auto-Random: kein Sound abgespielt ({err})")
    except Exception:
        # A failed tick must not end the series: without this handler the
        # rescheduling below would be skipped while the active flag stays
        # set. _play_sound_entry builds its error replies with jsonify, which
        # may raise here because this thread runs without a Flask
        # application context.
        logger.exception("Auto-Random-Tick fehlgeschlagen")

    # Schedule the next tick.
    delay = rand_between(imin, imax) * 60
    with auto_random_lock:
        if state.auto_random_active:
            auto_random_params['next_ts'] = time.time() + delay
            auto_random_timer = threading.Timer(delay, _auto_random_tick)
            auto_random_timer.daemon = True
            auto_random_timer.start()
def rand_between(a, b):
    """Return *a* plus a random fraction (from ``random.random()``) of ``b - a``."""
    span = b - a
    return a + random.random() * span
def _prune_history(sound_id, now):
    """Drop expired timestamps for *sound_id* and return its history deque.

    Entries are removed from the left while their age relative to *now*
    exceeds ``WINDOW_SECONDS``. The deque is created on first access because
    ``random_history`` is a ``defaultdict``.
    """
    history = random_history[sound_id]
    while history:
        age = now - history[0]
        if not age > WINDOW_SECONDS:
            break
        history.popleft()
    return history
# Blueprint that carries every route defined in this module.
api_bp = Blueprint('api', __name__)
@api_bp.route('/connect', methods=['POST'])
def api_connect():
    """Connect to the hub described by ``state.current_config``.

    Reads ``hub_id`` (default 0) and ``hub_type`` (default ``'6channel'``)
    from the loaded configuration, shuts down any previous device, creates a
    fresh ``MouldKing`` instance and connects ``Device<hub_id>`` of the
    matching module. The module, device and hub objects are stored on
    ``state``.

    Returns:
        JSON ``{"success": ..., "message": ...}``; 400 when no configuration
        is loaded, 500 when connecting fails.
    """
    if state.current_config is None:
        return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
    try:
        hub_id = int(state.current_config.get('hub_id', 0))
        hub_type = state.current_config.get('hub_type', '6channel')  # '4channel' or '6channel'

        # Shut down the previous connection: stop first, then disconnect.
        # Both steps are best effort; a failure must not block the new
        # connection, but it is logged instead of vanishing silently.
        old_device = state.current_device
        if old_device is not None:
            try:
                old_device.Stop()
            except Exception:
                logger.debug("Stop der alten Verbindung fehlgeschlagen", exc_info=True)
            try:
                old_device.Disconnect()
            except Exception:
                logger.debug("Disconnect der alten Verbindung fehlgeschlagen", exc_info=True)

        # Re-initialise MouldKing.
        mk = MouldKing()
        mk.SetAdvertiser(advertiser)
        try:
            mk.SetTracer(tracer)
        except Exception:
            # The tracer is optional; carry on without it.
            logger.debug("SetTracer fehlgeschlagen", exc_info=True)

        # Select the module; str() keeps a numeric hub_type such as 6 working.
        if str(hub_type).lower() in ['6channel', '6']:
            state.current_module = mk.Module6_0()
        else:
            state.current_module = mk.Module4_0()

        # Select the device (hub_id 0, 1, 2 -> Device0, Device1, Device2).
        device_attr = f'Device{hub_id}'
        if not hasattr(state.current_module, device_attr):
            raise ValueError(f"Hub-ID {hub_id} nicht unterstützt (max 0-2)")
        state.current_device = getattr(state.current_module, device_attr)

        # Connect.
        state.current_device.Connect()
        state.current_hub = mk  # kept in case it is needed later
        logger.info(f"Verbunden: {hub_type} Hub-ID {hub_id} → {device_attr}")
        return jsonify({"success": True, "message": f"Hub {hub_id} ({hub_type}) verbunden"})
    except Exception as e:
        logger.exception("Connect-Fehler")
        return jsonify({"success": False, "message": f"Verbindungsfehler: {str(e)}"}), 500
@api_bp.route('/reconnect', methods=['POST'])
def api_reconnect():
    """Drop the current hub connection and establish a new one.

    Disconnects the existing device (best effort), then builds a fresh
    ``MouldKing`` instance from ``hub_id`` / ``hub_type`` in
    ``state.current_config`` and connects ``Device<hub_id>``.

    Returns:
        JSON ``{"success": ..., "message": ...}``; 400 when no configuration
        is loaded, 500 when reconnecting fails.
    """
    if state.current_config is None:
        return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
    try:
        # Close the old connection if there is one. Failures are logged and
        # ignored so that a broken link can still be re-established.
        if state.current_device is not None:
            try:
                state.current_device.Disconnect()
            except Exception:
                logger.debug("Disconnect vor Re-Connect fehlgeschlagen", exc_info=True)

        # Connect again (same steps as api_connect).
        hub_id = int(state.current_config.get('hub_id', 0))
        hub_type = state.current_config.get('hub_type', '6channel')
        mk = MouldKing()
        mk.SetAdvertiser(advertiser)
        try:
            mk.SetTracer(tracer)
        except Exception:
            # The tracer is optional; carry on without it.
            logger.debug("SetTracer fehlgeschlagen", exc_info=True)

        # str() keeps a numeric hub_type such as 6 working.
        if str(hub_type).lower() in ['6channel', '6']:
            state.current_module = mk.Module6_0()
        else:
            state.current_module = mk.Module4_0()

        device_attr = f'Device{hub_id}'
        if not hasattr(state.current_module, device_attr):
            raise ValueError(f"Hub-ID {hub_id} nicht unterstützt")
        state.current_device = getattr(state.current_module, device_attr)
        state.current_device.Connect()
        state.current_hub = mk
        logger.info(f"Re-Connect erfolgreich: Hub {hub_id}")
        return jsonify({"success": True, "message": "Verbindung wiederhergestellt"})
    except Exception as e:
        logger.exception("Re-Connect-Fehler")
        return jsonify({"success": False, "message": str(e)}), 500
@api_bp.route('/control', methods=['POST'])
def api_control():
    """Set one output channel of the connected device.

    Expects a JSON body with ``port`` (a letter ``A``-``F`` in any case, or a
    numeric channel id) and ``value`` (convertible to ``float``). When the
    loaded configuration has a channel entry for that port, its ``invert``,
    ``negative_only`` and, for non-motor channels, ``on_value`` /
    ``off_value`` settings are applied before ``SetChannel`` is called.

    Returns:
        JSON ``{"success": True}``; 400 when no device is connected, 500 with
        a message on any error.
    """
    if state.current_device is None:
        return jsonify({"success": False, "message": "Nicht verbunden"}), 400
    try:
        data = request.get_json()
        port = data['port']  # 'A', 'B', ... or a numeric channel id
        value = float(data['value'])

        # Map the port to a channel id (A=0, B=1, ...).
        channel_map = {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5}
        if isinstance(port, str):
            port_key = port.upper()
            if port_key not in channel_map:
                raise ValueError(f"Ungültiger Port: {port}")
            channel_id = channel_map[port_key]
        else:
            channel_id = int(port)
            # Translate the number back to its letter for the config lookup;
            # calling .upper() on a non-string port used to fail every
            # numeric request.
            port_key = next((name for name, idx in channel_map.items() if idx == channel_id), None)

        # Per-channel adjustments from the config (invert, negative_only, on/off values).
        ch_cfg = next((c for c in state.current_config['channels'] if c['port'].upper() == port_key), None)
        if ch_cfg:
            if ch_cfg.get('invert', False):
                value = -value
            if ch_cfg.get('negative_only', False) and value >= 0:
                value = -abs(value)
            if ch_cfg['type'] != 'motor':
                # Non-motor outputs are switched: any non-zero request means "on".
                value = ch_cfg.get('on_value', 1.0) if value != 0 else ch_cfg.get('off_value', 0.0)

        # The actual hardware call.
        state.current_device.SetChannel(channel_id, value)
        logger.info(f"SetChannel({channel_id}, {value:.2f}) → Port {port}")
        return jsonify({"success": True})
    except Exception as e:
        logger.exception("Control-Fehler")
        return jsonify({"success": False, "message": str(e)}), 500
@api_bp.route('/stop_all', methods=['POST'])
def api_stop_all():
    """Stop all channels of the connected device while keeping the connection.

    Returns:
        JSON success message; 400 when no device is connected, 500 with the
        error text when ``Stop()`` fails.
    """
    if state.current_device is None:
        not_connected = {"success": False, "message": "Nicht verbunden"}
        return jsonify(not_connected), 400

    try:
        # Stop only, deliberately no Disconnect.
        state.current_device.Stop()
        logger.info("Alle Kanäle gestoppt (Verbindung bleibt bestehen)")
        stopped = {"success": True, "message": "Alle Kanäle gestoppt"}
        return jsonify(stopped)
    except Exception as e:
        logger.exception("Stop-Fehler")
        failure = {"success": False, "message": str(e)}
        return jsonify(failure), 500
@api_bp.route('/status', methods=['GET'])
def api_status():
    """Report whether a device object is currently stored in ``state``.

    Returns:
        JSON with ``connected`` (bool) and a matching ``message``.
    """
    if state.current_device is None:
        return jsonify({
            "connected": False,
            "message": "Keine aktive Verbindung"
        }), 200
    return jsonify({
        "connected": True,
        "message": "Verbunden"
    })
@api_bp.route('/play_sound', methods=['POST'])
def api_play_sound():
    """Play the sound whose ``id`` matches the requested ``sound_id``.

    JSON body: ``sound_id`` (required), ``channel`` (optional channel index)
    and ``loop`` (optional; ``-1`` or ``true`` loops endlessly). Sounds are
    looked up in this order: sounds of the loaded hub configuration, the
    global default sounds, then the loaded soundboard's ``backgrounds``,
    ``sounds`` and ``random_pool`` lists.

    Returns:
        JSON ``{"success": ..., "message": ...}``; 400 for a missing
        ``sound_id`` or when no sounds are configured, 404 for an unknown id,
        otherwise whatever ``_play_sound_entry`` reports.
    """
    try:
        # silent=True: a missing or malformed body becomes "no sound_id"
        # (400) rather than an exception reported as 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        sound_id = data.get('sound_id')
        channel_req = data.get('channel')  # optional: specific channel
        loop_req = data.get('loop', 0)  # optional: -1 or true for endless loop
        if not sound_id:
            return jsonify({"success": False, "message": "Kein sound_id angegeben"}), 400

        # Collect the candidates: config-specific sounds plus global defaults.
        config_sounds = state.current_config.get('sounds', []) if state.current_config else []
        global_sounds = load_default_sounds(current_app.config['CONFIG_DIR'])
        sb_sounds = []
        if state.current_soundboard:
            for key in ('backgrounds', 'sounds', 'random_pool'):
                sb_sounds.extend(state.current_soundboard.get(key, []))
        # Soundboard entries without an id are addressed by their file name.
        for s in sb_sounds:
            s.setdefault('id', s.get('file'))
        all_sounds = list(config_sounds) + list(global_sounds) + sb_sounds
        if not all_sounds:
            return jsonify({"success": False, "message": "Keine Sounds konfiguriert"}), 400

        sound_entry = next((s for s in all_sounds if s.get('id') == sound_id), None)
        if not sound_entry:
            return jsonify({"success": False, "message": f"Sound mit ID '{sound_id}' nicht gefunden"}), 404

        res = _play_sound_entry(sound_entry, channel_req, loop_req)
        if res is not None:
            return res  # error response built by the helper
        sound_name = sound_entry.get('name') or sound_entry.get('id') or sound_entry.get('file', '?')
        logger.info(f"Spiele Sound: {sound_name} ({sound_id})")
        return jsonify({"success": True, "message": f"Spiele: {sound_name}"})
    except Exception as e:
        logger.exception("Play-Sound-Fehler")
        return jsonify({"success": False, "message": str(e)}), 500
# ---------- Soundboard (theme-specific, independent of the hub) ----------
@api_bp.route('/soundboard/configs', methods=['GET'])
def api_soundboard_configs():
    """Return the result of ``load_soundboard_configs`` for the app's
    ``SOUNDBOARD_CONFIG_DIR`` as JSON."""
    config_dir = current_app.config['SOUNDBOARD_CONFIG_DIR']
    return jsonify(load_soundboard_configs(config_dir))
@api_bp.route('/soundboard/load', methods=['POST'])
def api_soundboard_load():
    """Load a soundboard configuration and make it the current one.

    JSON body: ``filename`` (required). Loading a different theme stops a
    running auto-random series; reloading the current theme leaves it
    running.

    Returns:
        JSON with ``success`` and the loaded ``soundboard``; 400 when
        ``filename`` is missing, 500 when loading fails.
    """
    # silent=True: a missing or malformed body is reported as a missing
    # filename (400) rather than raising before the handler below is reached.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    filename = data.get('filename')
    if not filename:
        return jsonify({"success": False, "message": "filename fehlt"}), 400
    try:
        # NOTE(review): filename is client-supplied; confirm that
        # load_soundboard_config refuses paths outside the config directory.
        sb = load_soundboard_config(filename,
                                    current_app.config['SOUNDBOARD_CONFIG_DIR'],
                                    current_app.config['SOUNDS_DIR'])
        # Reloading the same theme must not interrupt auto-random.
        same_theme = state.current_soundboard and state.current_soundboard.get('filename') == filename
        if not same_theme:
            _stop_auto_random_internal()
        state.current_soundboard = sb
        logger.info(f"Soundboard geladen: {filename}")
        return jsonify({"success": True, "soundboard": sb})
    except Exception as e:
        logger.exception("Soundboard laden fehlgeschlagen")
        return jsonify({"success": False, "message": str(e)}), 500
@api_bp.route('/soundboard/play_random', methods=['POST'])
def api_soundboard_play_random():
    """Play one randomly chosen sound from the current soundboard.

    Returns:
        JSON success message; 400 when no soundboard is loaded or no sound
        can be picked, 429 when the pick fails with a "Limit" message, or the
        error response produced by ``_play_sound_entry``.
    """
    if state.current_soundboard is None:
        return jsonify({"success": False, "message": "Kein Soundboard geladen"}), 400

    sound_entry, err = _pick_random_sound()
    if err:
        error_body = jsonify({"success": False, "message": err})
        status = 429 if "Limit" in err else 400
        return error_body, status

    failure = _play_sound_entry(sound_entry)
    if failure is not None:
        return failure  # already a finished response

    label = sound_entry.get('name', sound_entry.get('id'))
    return jsonify({"success": True, "message": f"Random: {label}"})
@api_bp.route('/soundboard/auto_start', methods=['POST'])
def api_soundboard_auto_start():
    """Start automatic random playback for the current soundboard.

    Optional JSON body, all values in minutes: ``interval_min`` /
    ``interval_max`` (defaults 5 / 10) bound the pause between sounds,
    ``delay_min`` / ``delay_max`` (defaults 3 / 12) bound the wait before the
    first sound. An upper bound below its lower bound is raised to the lower
    bound. Any running series is stopped first.

    Returns:
        JSON with ``success``, ``message`` and ``next_seconds`` (delay until
        the first sound); 400 when no soundboard is loaded or a value is not
        a finite, non-negative number.
    """
    global auto_random_timer
    import math

    if state.current_soundboard is None:
        return jsonify({"success": False, "message": "Kein Soundboard geladen"}), 400

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # Reject unusable numbers up front: a non-numeric value would raise an
    # unhandled error, and NaN/inf/negative values make no sense as a timer
    # delay.
    try:
        imin = float(data.get('interval_min', 5))
        imax = float(data.get('interval_max', 10))
        dmin = float(data.get('delay_min', 3))
        dmax = float(data.get('delay_max', 12))
    except (TypeError, ValueError):
        return jsonify({"success": False, "message": "Ungültige Intervall-Werte"}), 400
    if not all(math.isfinite(v) and v >= 0 for v in (imin, imax, dmin, dmax)):
        return jsonify({"success": False, "message": "Ungültige Intervall-Werte"}), 400

    if imax < imin:
        imax = imin
    if dmax < dmin:
        dmax = dmin
    delay = rand_between(dmin, dmax) * 60

    # Stop a running timer before taking the lock: the helper acquires the
    # same non-reentrant lock itself.
    _stop_auto_random_internal()
    with auto_random_lock:
        state.auto_random_active = True
        auto_random_params['imin'] = imin
        auto_random_params['imax'] = imax
        auto_random_params['dmin'] = dmin
        auto_random_params['dmax'] = dmax
        auto_random_params['next_ts'] = time.time() + delay
        auto_random_timer = threading.Timer(delay, _auto_random_tick)
        auto_random_timer.daemon = True
        auto_random_timer.start()
    logger.info(f"Auto-Random gestartet (delay {delay/60:.1f} min, intervall {imin}-{imax} min)")
    return jsonify({"success": True, "message": "Auto-Random gestartet",
                    "next_seconds": delay})
@api_bp.route('/soundboard/auto_stop', methods=['POST'])
def api_soundboard_auto_stop():
    """Stop automatic random playback.

    Returns:
        JSON with ``success``, a message and ``was_running``, which tells
        whether a timer was actually cancelled.
    """
    was_running = _stop_auto_random_internal()
    payload = {
        "success": True,
        "message": "Auto-Random gestoppt",
        "was_running": was_running,
    }
    return jsonify(payload)
@api_bp.route('/soundboard/current', methods=['GET'])
def api_soundboard_current():
    """Return the currently loaded soundboard, or ``null`` when there is none."""
    if not state.current_soundboard:
        return jsonify({"soundboard": None}), 200
    return jsonify({"soundboard": state.current_soundboard})
@api_bp.route('/soundboard/status', methods=['GET'])
def api_soundboard_status():
    """Report the auto-random state.

    Returns:
        JSON with ``active``, ``next_seconds`` (time until the next scheduled
        sound, never negative, or ``null``), the stored interval and delay
        bounds, and ``current_theme`` (file name of the loaded soundboard or
        ``null``).
    """
    # Take a consistent snapshot under the lock.
    with auto_random_lock:
        active = state.auto_random_active
        params = dict(auto_random_params)

    soundboard = state.current_soundboard
    current_theme = soundboard.get('filename') if soundboard else None

    next_ts = params.get('next_ts')
    if next_ts:
        next_seconds = max(0, next_ts - time.time())
    else:
        next_seconds = None

    return jsonify({
        "active": active,
        "next_seconds": next_seconds,
        "interval_min": params.get('imin'),
        "interval_max": params.get('imax'),
        "delay_min": params.get('dmin'),
        "delay_max": params.get('dmax'),
        "current_theme": current_theme
    })
def _stop_auto_random_internal():
    """Cancel the auto-random timer and reset the auto-random state.

    Under ``auto_random_lock``: cancels and forgets a pending timer, clears
    the active flag and empties the stored parameters.

    Returns:
        ``True`` when a timer existed and was cancelled, else ``False``.
    """
    global auto_random_timer
    with auto_random_lock:
        had_timer = bool(auto_random_timer)
        if had_timer:
            auto_random_timer.cancel()
            auto_random_timer = None
        state.auto_random_active = False
        auto_random_params.clear()
    return had_timer
@api_bp.route('/stop_sound', methods=['POST'])
def api_stop_sound():
    """Stop playback on one mixer channel or on all of them.

    Optional JSON body: ``channel`` (channel index). Without it every mixer
    channel and the music stream are stopped.

    Returns:
        JSON ``{"success": True}``; 400 for an invalid channel, 500 with the
        error text on any other failure.
    """
    try:
        payload = request.get_json(silent=True) or {}
        channel_req = payload.get('channel')
        stop_everything = channel_req in [None, ""]
        with audio_lock:
            _ensure_mixer()
            if stop_everything:
                # Stop every channel ...
                channel_count = pygame.mixer.get_num_channels()
                for index in range(channel_count):
                    pygame.mixer.Channel(index).stop()
                # ... and, to be safe, the music stream as well.
                pygame.mixer.music.stop()
            else:
                try:
                    target = pygame.mixer.Channel(int(channel_req))
                    target.stop()
                except Exception as e:
                    logger.error(f"Stop-Sound-Fehler (channel {channel_req}): {e}")
                    return jsonify({"success": False, "message": "Ungültiger Channel"}), 400
        logger.info("Sound(s) gestoppt")
        return jsonify({"success": True})
    except Exception as e:
        logger.error(f"Stop-Sound-Fehler: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@api_bp.route('/set_volume', methods=['POST'])
def api_set_volume():
    """Set the playback volume for one mixer channel or globally.

    Optional JSON body: ``volume`` (number, default 0.8, clamped to
    0.0-1.0) and ``channel`` (channel index). Without ``channel`` the volume
    is applied to every mixer channel and to the music stream.

    Returns:
        JSON ``{"success": True}``; 400 for an invalid volume or channel,
        500 with the error text on any other failure.
    """
    try:
        # silent=True, as in api_stop_sound: a missing or malformed body
        # falls back to the defaults instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        try:
            vol = float(data.get('volume', 0.8))
        except (TypeError, ValueError):
            return jsonify({"success": False, "message": "Ungültige Lautstärke"}), 400
        if vol != vol:
            # NaN would pass through the clamp below as 1.0, i.e. full volume.
            return jsonify({"success": False, "message": "Ungültige Lautstärke"}), 400
        vol = max(0.0, min(1.0, vol))
        channel_req = data.get('channel')
        with audio_lock:
            _ensure_mixer()
            if channel_req not in [None, ""]:
                # A bad channel is a client error, reported the same way as
                # in api_stop_sound.
                try:
                    ch_id = int(channel_req)
                    pygame.mixer.Channel(ch_id).set_volume(vol)
                except Exception as e:
                    logger.error(f"Volume-Fehler (channel {channel_req}): {e}")
                    return jsonify({"success": False, "message": "Ungültiger Channel"}), 400
            else:
                # Global: all channels plus the music stream.
                for ch_id in range(pygame.mixer.get_num_channels()):
                    pygame.mixer.Channel(ch_id).set_volume(vol)
                pygame.mixer.music.set_volume(vol)
        logger.info(f"Volume auf {vol} gesetzt")
        return jsonify({"success": True})
    except Exception as e:
        logger.error(f"Volume-Fehler: {e}")
        return jsonify({"success": False, "message": str(e)}), 500