modularisierung von app.py und app.js - erster versuch

This commit is contained in:
oberon 2026-02-16 13:57:57 +01:00
parent f6fba252e4
commit 174e64e480
17 changed files with 1341 additions and 629 deletions

630
app.py
View File

@ -1,634 +1,8 @@
# app.py # app.py
import os from app import create_app
import json
import logging
import shutil
import threading
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime, timedelta
from flask import Flask, render_template, redirect, url_for, send_from_directory, request, jsonify
import pygame
# pygame einmalig initialisieren (am besten global)
pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=512)
pygame.mixer.music.set_volume(0.8) # Standard 80 %
# Logging-Verzeichnis (muss VOR cleanup_old_log_dirs definiert werden!)
LOG_DIR = 'logs'
os.makedirs(LOG_DIR, exist_ok=True)
# ── Cleanup Log-Files ────────────────────────────────────────────────────────────────
def cleanup_old_log_dirs(max_age_days=90):
"""Löscht Monatsordner in logs/, die älter als max_age_days sind"""
if not os.path.exists(LOG_DIR):
return
cutoff_date = datetime.now() - timedelta(days=max_age_days)
cutoff_str = cutoff_date.strftime('%Y-%m')
for subdir in os.listdir(LOG_DIR):
subdir_path = os.path.join(LOG_DIR, subdir)
if os.path.isdir(subdir_path):
try:
# subdir ist z. B. "2025-11"
subdir_date = datetime.strptime(subdir, '%Y-%m')
if subdir_date < cutoff_date:
logger.info(f"Lösche alten Log-Ordner: {subdir_path}")
shutil.rmtree(subdir_path)
except ValueError:
# Ungültiges Datumsformat → überspringen
pass
except Exception as e:
logger.error(f"Fehler beim Löschen von {subdir_path}: {e}")
# ── Hilfsfunktion - globale Sounds laden ────────────────────────────────────────────────────────────────
def load_default_sounds():
path = os.path.join(app.config['CONFIG_DIR'], 'default_sounds.json')
if os.path.exists(path):
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
# sounds = data.get('global_sounds', data.get('sounds', [])) # flexibel
sounds = data.get('global_sounds', [])
logger.info(f"Globale Default-Sounds geladen: {len(sounds)} Einträge")
return sounds
except Exception as e:
logger.error(f"Fehler beim Laden default_sounds.json: {e}")
return []
logger.info("Keine default_sounds.json gefunden")
return []
# ── Bluetooth ────────────────────────────────────────────────────────────────
from mkconnect.mouldking.MouldKing import MouldKing
from mkconnect.tracer.TracerConsole import TracerConsole
from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket
app = Flask(__name__)
app.config['CONFIG_DIR'] = 'configs'
os.makedirs(app.config['CONFIG_DIR'], exist_ok=True)
# ── Logging ────────────────────────────────────────────────────────────────
# Täglicher Unterordner
today = datetime.now().strftime('%Y-%m')
current_log_subdir = os.path.join(LOG_DIR, today)
os.makedirs(current_log_subdir, exist_ok=True)
# Basis-Dateiname (ohne Endung)
base_filename = os.path.join(current_log_subdir, datetime.now().strftime('%d'))
# Handler für INFO+
info_handler = logging.FileHandler(f"{base_filename}-info.log")
info_handler.setLevel(logging.INFO)
info_handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Handler für WARNING+ (inkl. ERROR, CRITICAL)
error_handler = logging.FileHandler(f"{base_filename}-error.log")
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s\n%(pathname)s:%(lineno)d\n',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Logger konfigurieren
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # DEBUG, damit alles durchkommt
logger.handlers.clear() # Alte Handler entfernen
logger.addHandler(info_handler)
logger.addHandler(error_handler)
# Optional: Auch in Konsole
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(info_handler.formatter)
logger.addHandler(console)
logger.info("Logging getrennt initialisiert: info.log + error.log")
# Bluetooth-Komponenten (einmalig initialisieren)
tracer = TracerConsole()
advertiser = AdvertiserBTSocket() # ← ohne tracer, wie korrigiert
# Globale Zustände (für Einzelbenutzer / Entwicklung später Session/DB)
current_config = None
current_hub: MouldKing | None = None
current_module = None # Module6_0 oder Module4_0
current_device = None # Device0/1/2 je nach hub_id
import threading
import time
# Globale Variable für den letzten erfolgreichen Check (optional)
last_successful_check = time.time()
def connection_monitor():
"""Background-Thread: Prüft alle 5 Sekunden, ob der Hub noch antwortet"""
global current_device, current_module, current_hub, last_successful_check
while True:
if current_device is not None:
try:
# Harter Test: Kanal 0 kurz auf 0.1 und zurück auf 0.0 setzen
current_device.SetChannel(0, 0.1)
time.sleep(0.05) # winzige Pause
current_device.SetChannel(0, 0.0)
last_successful_check = time.time()
logger.debug("Monitor: Hub antwortet → SetChannel-Test OK")
except Exception as e:
logger.warning(f"Monitor: Hub scheint weg zu sein: {str(e)}")
# Sauber trennen
try:
current_device.Disconnect()
except:
pass
current_device = None
current_module = None
current_hub = None
# Optional: Frontend benachrichtigen (z. B. über WebSocket später)
time.sleep(5) # Prüfintervall: 5 Sekunden
# Thread starten (daemon=True → beendet sich mit der App)
monitor_thread = threading.Thread(target=connection_monitor, daemon=True)
monitor_thread.start()
logger.info("Background-Monitor für Hub-Verbindung gestartet")
def load_configs():
configs = []
for filename in os.listdir(app.config['CONFIG_DIR']):
if filename.lower().endswith('.json') and filename != 'default_sounds.json':
path = os.path.join(app.config['CONFIG_DIR'], filename)
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
data['filename'] = filename
data.setdefault('name', filename.replace('.json', '').replace('_', ' ').title())
configs.append(data)
except Exception as e:
logger.error(f"Fehler beim Laden {filename}: {e}")
return sorted(configs, key=lambda x: x.get('name', ''))
@app.route('/')
def index():
return render_template('index.html', configs=load_configs())
"""
@app.route('/load_config/<filename>')
def load_config(filename):
global current_config
path = os.path.join(app.config['CONFIG_DIR'], filename)
if not os.path.exists(path):
return "Datei nicht gefunden", 404
try:
with open(path, 'r', encoding='utf-8') as f:
current_config = json.load(f)
current_config['filename'] = filename
logger.info(f"Config geladen: {filename}")
return redirect(url_for('control_page'))
except Exception as e:
logger.error(f"Config-Ladefehler {filename}: {e}")
return "Fehler beim Laden", 500
"""
@app.route('/load_config/<filename>')
def load_config(filename):
global current_config
path = os.path.join(app.config['CONFIG_DIR'], filename)
if not os.path.exists(path):
logger.error(f"Config nicht gefunden: {path}")
return "Konfiguration nicht gefunden", 404
try:
with open(path, 'r', encoding='utf-8') as f:
current_config = json.load(f)
current_config['filename'] = filename
logger.info(f"Config erfolgreich geladen: {filename} mit {len(current_config.get('channels', []))} Kanälen")
return redirect(url_for('control_page'))
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in {filename}: {e}")
return "Ungültiges JSON-Format in der Konfigurationsdatei", 500
except Exception as e:
logger.error(f"Ladefehler {filename}: {e}")
return "Fehler beim Laden der Konfiguration", 500
"""
@app.route('/control')
def control_page():
if current_config is None:
return redirect(url_for('index'))
return render_template('control.html', config=current_config)
"""
@app.route('/control')
def control_page():
if current_config is None:
logger.warning("current_config ist None → Redirect zu index")
return redirect(url_for('index'))
# Globale Sounds immer laden
global_sounds = load_default_sounds()
logger.info(f"Übergebe config an Template: {current_config}")
print("DEBUG: config hat channels?", 'channels' in current_config, len(current_config.get('channels', [])))
return render_template(
'control.html',
config=current_config,
global_sounds=global_sounds
)
@app.route('/soundboard')
def soundboard():
if current_config is None or 'sounds' not in current_config:
return redirect(url_for('index'))
sounds = current_config.get('sounds', [])
return render_template('soundboard.html', sounds=sounds, config=current_config)
# ── Admin ────────────────────────────────────────────────────────────────────
@app.route('/admin')
def admin():
configs = load_configs()
return render_template('admin.html', configs=configs)
@app.route('/admin/edit/<filename>', methods=['GET', 'POST'])
def admin_edit_config(filename):
path = os.path.join(app.config['CONFIG_DIR'], filename)
if request.method == 'POST':
try:
new_content = request.form.get('config_content')
if not new_content:
raise ValueError("Kein Inhalt übermittelt")
# Validierung: versuche zu parsen
json.loads(new_content)
with open(path, 'w', encoding='utf-8') as f:
f.write(new_content)
logger.info(f"Config {filename} erfolgreich gespeichert")
return redirect(url_for('admin'))
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in {filename}: {e}")
error_msg = f"Ungültiges JSON: {str(e)}"
except Exception as e:
logger.error(f"Speicherfehler {filename}: {e}")
error_msg = f"Fehler beim Speichern: {str(e)}"
# Bei Fehler: zurück zum Formular mit Fehlermeldung
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
return render_template('admin_edit.html', filename=filename, content=content, error=error_msg)
# GET: Formular laden
if not os.path.exists(path):
return "Konfiguration nicht gefunden", 404
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
return render_template('admin_edit.html', filename=filename, content=content)
@app.route('/admin/delete/<filename>', methods=['POST'])
def admin_delete_config(filename):
path = os.path.join(app.config['CONFIG_DIR'], filename)
if os.path.exists(path):
try:
os.remove(path)
logger.info(f"Config gelöscht: {filename}")
return jsonify({"success": True, "message": f"{filename} gelöscht"})
except Exception as e:
logger.error(f"Löschfehler {filename}: {e}")
return jsonify({"success": False, "message": str(e)}), 500
return jsonify({"success": False, "message": "Datei nicht gefunden"}), 404
@app.route('/admin/logs')
@app.route('/admin/logs/<date>')
def admin_logs(date=None):
if date is None:
date = datetime.now().strftime('%d')
today = datetime.now().strftime('%Y-%m') # ← Hier definieren!
current_month = datetime.now().strftime('%Y-%m')
log_dir = os.path.join(LOG_DIR, today)
if not os.path.exists(log_dir):
return render_template('admin_logs.html', logs="Keine Logs für diesen Tag.", date=date, dates=[], today=today)
# Verfügbare Tage ...
dates = sorted([
f.split('-')[0] for f in os.listdir(log_dir)
if f.endswith('-info.log') or f.endswith('-error.log')
], reverse=True)
# Logs laden ...
info_path = os.path.join(log_dir, f"{date}-info.log")
error_path = os.path.join(log_dir, f"{date}-error.log")
logs = []
if os.path.exists(info_path):
with open(info_path, 'r', encoding='utf-8') as f:
logs.append("=== INFO-LOG ===\n" + f.read())
if os.path.exists(error_path):
with open(error_path, 'r', encoding='utf-8') as f:
logs.append("=== ERROR-LOG ===\n" + f.read())
log_content = "\n\n".join(logs) if logs else "Keine Logs für diesen Tag."
return render_template('admin_logs.html',
logs=log_content,
date=date,
dates=dates,
today=current_month)
@app.route('/configs/<path:filename>')
def serve_config_file(filename):
return send_from_directory(app.config['CONFIG_DIR'], filename)
# ── API ──────────────────────────────────────────────────────────────────────
@app.route('/api/connect', methods=['POST'])
def api_connect():
global current_hub, current_module, current_device
if current_config is None:
return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
try:
hub_id = int(current_config.get('hub_id', 0))
hub_type = current_config.get('hub_type', '6channel') # '4channel' oder '6channel'
# Alte Verbindung sauber trennen
if current_device is not None:
try:
current_device.Disconnect()
except:
pass
if current_module is not None:
# ggf. Stop vor Disconnect
try:
current_device.Stop()
except:
pass
# MouldKing neu initialisieren
mk = MouldKing()
mk.SetAdvertiser(advertiser)
try:
mk.SetTracer(tracer)
except:
pass
# Modul holen
if hub_type.lower() in ['6channel', '6']:
current_module = mk.Module6_0()
else:
current_module = mk.Module4_0()
# Device auswählen (hub_id = 0,1,2 → Device0,1,2)
device_attr = f'Device{hub_id}'
if not hasattr(current_module, device_attr):
raise ValueError(f"Hub-ID {hub_id} nicht unterstützt (max 0-2)")
current_device = getattr(current_module, device_attr)
# Verbinden
current_device.Connect()
current_hub = mk # falls später noch gebraucht
logger.info(f"Verbunden: {hub_type} Hub-ID {hub_id}{device_attr}")
return jsonify({"success": True, "message": f"Hub {hub_id} ({hub_type}) verbunden"})
except Exception as e:
logger.exception("Connect-Fehler")
return jsonify({"success": False, "message": f"Verbindungsfehler: {str(e)}"}), 500
@app.route('/api/status', methods=['GET'])
def api_status():
global current_device
if current_device is not None:
return jsonify({
"connected": True,
"message": "Verbunden"
})
else:
return jsonify({
"connected": False,
"message": "Keine aktive Verbindung"
}), 200
@app.route('/api/control', methods=['POST'])
def api_control():
global current_device
if current_device is None:
return jsonify({"success": False, "message": "Nicht verbunden"}), 400
try:
data = request.get_json()
port = data['port'] # 'A', 'B', ...
value = float(data['value'])
# Port → channelId mappen (A=0, B=1, ...)
channel_map = {'A':0, 'B':1, 'C':2, 'D':3, 'E':4, 'F':5}
if isinstance(port, str):
port_upper = port.upper()
if port_upper in channel_map:
channel_id = channel_map[port_upper]
else:
raise ValueError(f"Ungültiger Port: {port}")
else:
channel_id = int(port)
# Config-spezifische Anpassungen (invert, negative_only, on/off-Werte)
ch_cfg = next((c for c in current_config['channels'] if c['port'].upper() == port.upper()), None)
if ch_cfg:
if ch_cfg.get('invert', False):
value = -value
if ch_cfg.get('negative_only', False) and value >= 0:
value = -abs(value)
if ch_cfg['type'] != 'motor':
value = ch_cfg.get('on_value', 1.0) if value != 0 else ch_cfg.get('off_value', 0.0)
# Echter Aufruf!
current_device.SetChannel(channel_id, value)
logger.info(f"SetChannel({channel_id}, {value:.2f}) → Port {port}")
return jsonify({"success": True})
except Exception as e:
logger.exception("Control-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/stop_all', methods=['POST'])
def api_stop_all():
global current_device
if current_device is None:
return jsonify({"success": False, "message": "Nicht verbunden"}), 400
try:
# Nur stoppen KEIN Disconnect!
current_device.Stop()
logger.info("Alle Kanäle gestoppt (Verbindung bleibt bestehen)")
return jsonify({"success": True, "message": "Alle Kanäle gestoppt"})
except Exception as e:
logger.exception("Stop-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/reconnect', methods=['POST'])
def api_reconnect():
global current_device, current_module, current_hub
if current_config is None:
return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
try:
# Alte Verbindung sauber beenden, falls nötig
if current_device is not None:
try:
current_device.Disconnect()
except:
pass
# Neu verbinden (kopierter Code aus api_connect)
hub_id = int(current_config.get('hub_id', 0))
hub_type = current_config.get('hub_type', '6channel')
mk = MouldKing()
mk.SetAdvertiser(advertiser)
try:
mk.SetTracer(tracer)
except:
pass
if hub_type.lower() in ['6channel', '6']:
current_module = mk.Module6_0()
else:
current_module = mk.Module4_0()
device_attr = f'Device{hub_id}'
if not hasattr(current_module, device_attr):
raise ValueError(f"Hub-ID {hub_id} nicht unterstützt")
current_device = getattr(current_module, device_attr)
current_device.Connect()
current_hub = mk
logger.info(f"Re-Connect erfolgreich: Hub {hub_id}")
return jsonify({"success": True, "message": "Verbindung wiederhergestellt"})
except Exception as e:
logger.exception("Re-Connect-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/play_sound', methods=['POST'])
def api_play_sound():
try:
data = request.get_json()
sound_id = data.get('sound_id')
if not sound_id:
return jsonify({"success": False, "message": "Kein sound_id angegeben"}), 400
# Sound aus aktueller Config suchen
if current_config is None or 'sounds' not in current_config:
return jsonify({"success": False, "message": "Keine Sounds in der aktuellen Konfiguration"}), 400
sound_entry = next((s for s in current_config['sounds'] if s['id'] == sound_id), None)
if not sound_entry:
return jsonify({"success": False, "message": f"Sound mit ID '{sound_id}' nicht gefunden"}), 404
file_path = os.path.join('sounds', sound_entry['file'])
if not os.path.exists(file_path):
logger.error(f"Sound-Datei nicht gefunden: {file_path}")
return jsonify({"success": False, "message": "Sound-Datei nicht gefunden"}), 404
# Sound in separatem Thread abspielen (blockiert nicht)
def play():
try:
pygame.mixer.music.load(file_path)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy():
time.sleep(0.1)
except Exception as e:
logger.error(f"Fehler beim Abspielen von {file_path}: {e}")
threading.Thread(target=play, daemon=True).start()
logger.info(f"Spiele Sound: {sound_entry['name']} ({sound_id})")
return jsonify({"success": True, "message": f"Spiele: {sound_entry['name']}"})
except Exception as e:
logger.exception("Play-Sound-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/set_volume', methods=['POST'])
def api_set_volume():
try:
data = request.get_json()
vol = float(data.get('volume', 0.8))
vol = max(0.0, min(1.0, vol))
pygame.mixer.music.set_volume(vol)
logger.info(f"Volume auf {vol} gesetzt")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Volume-Fehler: {e}")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/stop_sound', methods=['POST'])
def api_stop_sound():
try:
pygame.mixer.music.stop()
logger.info("Aktueller Sound gestoppt")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Stop-Sound-Fehler: {e}")
return jsonify({"success": False, "message": str(e)}), 500
app = create_app()
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True) app.run(host='0.0.0.0', port=5000, debug=True)
# Beim Start aufrufen
cleanup_old_log_dirs(90)
logger.info("Alte Log-Ordner bereinigt")
def daily_cleanup():
while True:
cleanup_old_log_dirs(90)
time.sleep(86400) # 24 Stunden
cleanup_thread = threading.Thread(target=daily_cleanup, daemon=True)
cleanup_thread.start()

45
app/__init__.py Normal file
View File

@ -0,0 +1,45 @@
# app/__init__.py
from flask import Flask
from .utils.logging import setup_logging
from .routes.main import main_bp
from .routes.admin import admin_bp
from .routes.api import api_bp
import threading
def create_app():
app = Flask(__name__)
# Config laden
app.config.from_object('config.Config')
# Logging zuerst (vor allem anderen)
from .utils.logging import setup_logging, cleanup_old_log_dirs
setup_logging(app)
# Täglicher Cleanup-Thread starten (einmalig)
def daily_cleanup():
while True:
cleanup_old_log_dirs(90)
time.sleep(86400) # 24 Stunden
cleanup_thread = threading.Thread(target=daily_cleanup, daemon=True)
cleanup_thread.start()
logger.info("Täglicher Log-Cleanup-Thread gestartet (24h Intervall)")
# Bluetooth initialisieren (Monitor-Thread startet automatisch)
from .bluetooth.manager import init_bluetooth
init_bluetooth()
# Blueprints registrieren
from .routes.main import main_bp
from .routes.admin import admin_bp
from .routes.api import api_bp
# Blueprints registrieren
app.register_blueprint(main_bp)
app.register_blueprint(admin_bp, url_prefix='/admin')
app.register_blueprint(api_bp, url_prefix='/api')
return app

View File

72
app/bluetooth/manager.py Normal file
View File

@ -0,0 +1,72 @@
# app/bluetooth/manager.py
"""
Bluetooth-Verbindungsmanagement: MouldKing-Instanz, Monitor-Thread, Cleanup
"""
import threading
import time
import logging
from mkconnect.mouldking.MouldKing import MouldKing
from mkconnect.tracer.TracerConsole import TracerConsole
from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket
logger = logging.getLogger(__name__)
# Globale Bluetooth-Komponenten (einmalig initialisiert)
tracer = TracerConsole()
advertiser = AdvertiserBTSocket()
# Globale Zustände (werden von den Routen gesetzt und hier überwacht)
current_hub = None
current_module = None
current_device = None
def init_bluetooth():
"""
Initialisiert die Bluetooth-Komponenten (einmalig beim App-Start).
"""
global tracer, advertiser
logger.info("Bluetooth-Komponenten initialisiert (Advertiser + Tracer)")
# Hier ggf. weitere Initialisierung (z. B. hci0 prüfen)
def connection_monitor():
"""
Background-Thread: Prüft alle 5 Sekunden, ob der Hub noch antwortet.
Bei Fehlern wird die Verbindung sauber getrennt.
"""
global current_device, current_module, current_hub
while True:
if current_device is not None:
try:
# Harter Test: Kanal 0 kurz auf 0.1 und zurück auf 0.0 setzen
current_device.SetChannel(0, 0.1)
time.sleep(0.05) # winzige Pause
current_device.SetChannel(0, 0.0)
logger.debug("Monitor: Hub antwortet → SetChannel-Test OK")
except Exception as e:
logger.warning(f"Monitor: Hub scheint weg zu sein: {str(e)}")
# Sauber trennen
try:
current_device.Disconnect()
except Exception as disconnect_err:
logger.debug(f"Disconnect fehlgeschlagen (harmlos): {disconnect_err}")
# Globale Zustände zurücksetzen
current_device = None
current_module = None
current_hub = None
# Optional: Frontend benachrichtigen (z. B. später über WebSocket)
time.sleep(5) # Prüfintervall: 5 Sekunden
# Thread beim Import starten (einmalig)
monitor_thread = threading.Thread(target=connection_monitor, daemon=True)
monitor_thread.start()
logger.info("Bluetooth-Monitor-Thread gestartet (5s Intervall)")

0
app/routes/__init__.py Normal file
View File

108
app/routes/admin.py Normal file
View File

@ -0,0 +1,108 @@
# app/routes/admin.py
from flask import Blueprint, render_template, request, jsonify, redirect, url_for
from .. import current_config
from app.utils.helpers import load_configs, load_default_sounds
admin_bp = Blueprint('admin', __name__)
@admin_bp.route('/')
def admin():
configs = load_configs()
return render_template('admin.html', configs=configs)
@app.route('/edit/<filename>', methods=['GET', 'POST'])
def admin_edit_config(filename):
path = os.path.join(app.config['CONFIG_DIR'], filename)
if request.method == 'POST':
try:
new_content = request.form.get('config_content')
if not new_content:
raise ValueError("Kein Inhalt übermittelt")
# Validierung: versuche zu parsen
json.loads(new_content)
with open(path, 'w', encoding='utf-8') as f:
f.write(new_content)
logger.info(f"Config {filename} erfolgreich gespeichert")
return redirect(url_for('admin'))
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in {filename}: {e}")
error_msg = f"Ungültiges JSON: {str(e)}"
except Exception as e:
logger.error(f"Speicherfehler {filename}: {e}")
error_msg = f"Fehler beim Speichern: {str(e)}"
# Bei Fehler: zurück zum Formular mit Fehlermeldung
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
return render_template('admin_edit.html', filename=filename, content=content, error=error_msg)
# GET: Formular laden
if not os.path.exists(path):
return "Konfiguration nicht gefunden", 404
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
return render_template('admin_edit.html', filename=filename, content=content)
pass
@app.route('/delete/<filename>', methods=['POST'])
def admin_delete_config(filename):
path = os.path.join(app.config['CONFIG_DIR'], filename)
if os.path.exists(path):
try:
os.remove(path)
logger.info(f"Config gelöscht: {filename}")
return jsonify({"success": True, "message": f"{filename} gelöscht"})
except Exception as e:
logger.error(f"Löschfehler {filename}: {e}")
return jsonify({"success": False, "message": str(e)}), 500
return jsonify({"success": False, "message": "Datei nicht gefunden"}), 404
pass
@app.route('/logs')
@app.route('/logs/<date>')
def admin_logs(date=None):
if date is None:
date = datetime.now().strftime('%d')
today = datetime.now().strftime('%Y-%m') # ← Hier definieren!
current_month = datetime.now().strftime('%Y-%m')
log_dir = os.path.join(LOG_DIR, today)
if not os.path.exists(log_dir):
return render_template('admin_logs.html', logs="Keine Logs für diesen Tag.", date=date, dates=[], today=today)
# Verfügbare Tage ...
dates = sorted([
f.split('-')[0] for f in os.listdir(log_dir)
if f.endswith('-info.log') or f.endswith('-error.log')
], reverse=True)
# Logs laden ...
info_path = os.path.join(log_dir, f"{date}-info.log")
error_path = os.path.join(log_dir, f"{date}-error.log")
logs = []
if os.path.exists(info_path):
with open(info_path, 'r', encoding='utf-8') as f:
logs.append("=== INFO-LOG ===\n" + f.read())
if os.path.exists(error_path):
with open(error_path, 'r', encoding='utf-8') as f:
logs.append("=== ERROR-LOG ===\n" + f.read())
log_content = "\n\n".join(logs) if logs else "Keine Logs für diesen Tag."
return render_template('admin_logs.html',
logs=log_content,
date=date,
dates=dates,
today=current_month)
pass

258
app/routes/api.py Normal file
View File

@ -0,0 +1,258 @@
# app/routes/api.py
from flask import Blueprint, jsonify, request
from .. import current_device, current_config, pygame
api_bp = Blueprint('api', __name__)
@app.route('/connect', methods=['POST'])
def api_connect():
global current_hub, current_module, current_device
if current_config is None:
return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
try:
hub_id = int(current_config.get('hub_id', 0))
hub_type = current_config.get('hub_type', '6channel') # '4channel' oder '6channel'
# Alte Verbindung sauber trennen
if current_device is not None:
try:
current_device.Disconnect()
except:
pass
if current_module is not None:
# ggf. Stop vor Disconnect
try:
current_device.Stop()
except:
pass
# MouldKing neu initialisieren
mk = MouldKing()
mk.SetAdvertiser(advertiser)
try:
mk.SetTracer(tracer)
except:
pass
# Modul holen
if hub_type.lower() in ['6channel', '6']:
current_module = mk.Module6_0()
else:
current_module = mk.Module4_0()
# Device auswählen (hub_id = 0,1,2 → Device0,1,2)
device_attr = f'Device{hub_id}'
if not hasattr(current_module, device_attr):
raise ValueError(f"Hub-ID {hub_id} nicht unterstützt (max 0-2)")
current_device = getattr(current_module, device_attr)
# Verbinden
current_device.Connect()
current_hub = mk # falls später noch gebraucht
logger.info(f"Verbunden: {hub_type} Hub-ID {hub_id}{device_attr}")
return jsonify({"success": True, "message": f"Hub {hub_id} ({hub_type}) verbunden"})
except Exception as e:
logger.exception("Connect-Fehler")
return jsonify({"success": False, "message": f"Verbindungsfehler: {str(e)}"}), 500
pass
@app.route('/reconnect', methods=['POST'])
def api_reconnect():
global current_device, current_module, current_hub
if current_config is None:
return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
try:
# Alte Verbindung sauber beenden, falls nötig
if current_device is not None:
try:
current_device.Disconnect()
except:
pass
# Neu verbinden (kopierter Code aus api_connect)
hub_id = int(current_config.get('hub_id', 0))
hub_type = current_config.get('hub_type', '6channel')
mk = MouldKing()
mk.SetAdvertiser(advertiser)
try:
mk.SetTracer(tracer)
except:
pass
if hub_type.lower() in ['6channel', '6']:
current_module = mk.Module6_0()
else:
current_module = mk.Module4_0()
device_attr = f'Device{hub_id}'
if not hasattr(current_module, device_attr):
raise ValueError(f"Hub-ID {hub_id} nicht unterstützt")
current_device = getattr(current_module, device_attr)
current_device.Connect()
current_hub = mk
logger.info(f"Re-Connect erfolgreich: Hub {hub_id}")
return jsonify({"success": True, "message": "Verbindung wiederhergestellt"})
except Exception as e:
logger.exception("Re-Connect-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
pass
@app.route('/control', methods=['POST'])
def api_control():
global current_device
if current_device is None:
return jsonify({"success": False, "message": "Nicht verbunden"}), 400
try:
data = request.get_json()
port = data['port'] # 'A', 'B', ...
value = float(data['value'])
# Port → channelId mappen (A=0, B=1, ...)
channel_map = {'A':0, 'B':1, 'C':2, 'D':3, 'E':4, 'F':5}
if isinstance(port, str):
port_upper = port.upper()
if port_upper in channel_map:
channel_id = channel_map[port_upper]
else:
raise ValueError(f"Ungültiger Port: {port}")
else:
channel_id = int(port)
# Config-spezifische Anpassungen (invert, negative_only, on/off-Werte)
ch_cfg = next((c for c in current_config['channels'] if c['port'].upper() == port.upper()), None)
if ch_cfg:
if ch_cfg.get('invert', False):
value = -value
if ch_cfg.get('negative_only', False) and value >= 0:
value = -abs(value)
if ch_cfg['type'] != 'motor':
value = ch_cfg.get('on_value', 1.0) if value != 0 else ch_cfg.get('off_value', 0.0)
# Echter Aufruf!
current_device.SetChannel(channel_id, value)
logger.info(f"SetChannel({channel_id}, {value:.2f}) → Port {port}")
return jsonify({"success": True})
except Exception as e:
logger.exception("Control-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
pass
@app.route('/stop_all', methods=['POST'])
def api_stop_all():
global current_device
if current_device is None:
return jsonify({"success": False, "message": "Nicht verbunden"}), 400
try:
# Nur stoppen KEIN Disconnect!
current_device.Stop()
logger.info("Alle Kanäle gestoppt (Verbindung bleibt bestehen)")
return jsonify({"success": True, "message": "Alle Kanäle gestoppt"})
except Exception as e:
logger.exception("Stop-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
pass
@app.route('/status', methods=['GET'])
def api_status():
global current_device
if current_device is not None:
return jsonify({
"connected": True,
"message": "Verbunden"
})
else:
return jsonify({
"connected": False,
"message": "Keine aktive Verbindung"
}), 200
pass
@app.route('/play_sound', methods=['POST'])
def api_play_sound():
try:
data = request.get_json()
sound_id = data.get('sound_id')
if not sound_id:
return jsonify({"success": False, "message": "Kein sound_id angegeben"}), 400
# Sound aus aktueller Config suchen
if current_config is None or 'sounds' not in current_config:
return jsonify({"success": False, "message": "Keine Sounds in der aktuellen Konfiguration"}), 400
sound_entry = next((s for s in current_config['sounds'] if s['id'] == sound_id), None)
if not sound_entry:
return jsonify({"success": False, "message": f"Sound mit ID '{sound_id}' nicht gefunden"}), 404
file_path = os.path.join('sounds', sound_entry['file'])
if not os.path.exists(file_path):
logger.error(f"Sound-Datei nicht gefunden: {file_path}")
return jsonify({"success": False, "message": "Sound-Datei nicht gefunden"}), 404
# Sound in separatem Thread abspielen (blockiert nicht)
def play():
try:
pygame.mixer.music.load(file_path)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy():
time.sleep(0.1)
except Exception as e:
logger.error(f"Fehler beim Abspielen von {file_path}: {e}")
threading.Thread(target=play, daemon=True).start()
logger.info(f"Spiele Sound: {sound_entry['name']} ({sound_id})")
return jsonify({"success": True, "message": f"Spiele: {sound_entry['name']}"})
except Exception as e:
logger.exception("Play-Sound-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
pass
@app.route('/stop_sound', methods=['POST'])
def api_stop_sound():
try:
pygame.mixer.music.stop()
logger.info("Aktueller Sound gestoppt")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Stop-Sound-Fehler: {e}")
return jsonify({"success": False, "message": str(e)}), 500
pass
@app.route('/set_volume', methods=['POST'])
def api_set_volume():
try:
data = request.get_json()
vol = float(data.get('volume', 0.8))
vol = max(0.0, min(1.0, vol))
pygame.mixer.music.set_volume(vol)
logger.info(f"Volume auf {vol} gesetzt")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Volume-Fehler: {e}")
return jsonify({"success": False, "message": str(e)}), 500
pass

68
app/routes/main.py Normal file
View File

@ -0,0 +1,68 @@
# app/routes/main.py
from flask import Blueprint, render_template, redirect, url_for, request, jsonify
from .. import current_config
from app.utils.helpers import load_configs, load_default_sounds
main_bp = Blueprint('main', __name__)
@main_bp.route('/')
def index():
configs = load_configs()
return render_template('index.html', configs=configs)
"""
@app.route('/')
def index():
return render_template('index.html', configs=load_configs())
"""
@app.route('/load_config/<filename>')
def load_config(filename):
global current_config
path = os.path.join(app.config['CONFIG_DIR'], filename)
if not os.path.exists(path):
logger.error(f"Config nicht gefunden: {path}")
return "Konfiguration nicht gefunden", 404
try:
with open(path, 'r', encoding='utf-8') as f:
current_config = json.load(f)
current_config['filename'] = filename
logger.info(f"Config erfolgreich geladen: {filename} mit {len(current_config.get('channels', []))} Kanälen")
return redirect(url_for('main.control_page'))
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in {filename}: {e}")
return "Ungültiges JSON-Format in der Konfigurationsdatei", 500
except Exception as e:
logger.error(f"Ladefehler {filename}: {e}")
return "Fehler beim Laden der Konfiguration", 500
@app.route('/control')
def control_page():
if current_config is None:
logger.warning("current_config ist None → Redirect zu index")
return redirect(url_for('main.index'))
# Globale Sounds immer laden
global_sounds = load_default_sounds()
logger.info(f"Übergebe config an Template: {current_config}")
print("DEBUG: config hat channels?", 'channels' in current_config, len(current_config.get('channels', [])))
return render_template(
'control.html',
config=current_config,
global_sounds=global_sounds
)
@app.route('/soundboard')
def soundboard():
if current_config is None or 'sounds' not in current_config:
return redirect(url_for('index'))
sounds = current_config.get('sounds', [])
return render_template('soundboard.html', sounds=sounds, config=current_config)
pass

0
app/utils/__init__.py Normal file
View File

58
app/utils/helpers.py Normal file
View File

@ -0,0 +1,58 @@
# app/utils/helpers.py
import os
import json
import logging
logger = logging.getLogger(__name__)
def load_configs(config_dir='configs'):
"""
Lädt alle .json-Konfigurationsdateien aus dem configs-Ordner,
außer default_sounds.json (die wird separat geladen).
"""
configs = []
if not os.path.exists(config_dir):
logger.warning(f"Config-Verzeichnis nicht gefunden: {config_dir}")
return configs
for filename in os.listdir(config_dir):
if filename.lower().endswith('.json') and filename != 'default_sounds.json':
path = os.path.join(config_dir, filename)
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
data['filename'] = filename
data.setdefault('name', filename.replace('.json', '').replace('_', ' ').title())
configs.append(data)
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in {filename}: {e}")
except Exception as e:
logger.error(f"Fehler beim Laden von {filename}: {e}")
return sorted(configs, key=lambda x: x.get('name', ''))
def load_default_sounds(config_dir='configs'):
"""
Lädt die globalen Standard-Sounds aus configs/default_sounds.json
Wird immer geladen, unabhängig von der aktuellen Lok-Konfiguration
"""
path = os.path.join(config_dir, 'default_sounds.json')
if not os.path.exists(path):
logger.info("Keine default_sounds.json gefunden → keine globalen Sounds")
return []
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Flexibel: entweder 'global_sounds' oder direkt 'sounds'
sounds = data.get('global_sounds', data.get('sounds', []))
logger.info(f"Globale Default-Sounds geladen: {len(sounds)} Einträge")
return sounds
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in default_sounds.json: {e}")
return []
except Exception as e:
logger.error(f"Fehler beim Laden default_sounds.json: {e}")
return []

87
app/utils/logging.py Normal file
View File

@ -0,0 +1,87 @@
# app/utils/logging.py
import logging
import os
import shutil
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime, timedelta
LOG_DIR = app.config['LOG_DIR']
os.makedirs(LOG_DIR, exist_ok=True)
def cleanup_old_log_dirs(max_age_days=90):
"""
Löscht Monatsordner in logs/, die älter als max_age_days sind.
Beispiel: logs/2025-11/ wird gelöscht, wenn älter als 90 Tage.
"""
if not os.path.exists(LOG_DIR):
return
cutoff_date = datetime.now() - timedelta(days=max_age_days)
cutoff_str = cutoff_date.strftime('%Y-%m')
deleted = 0
for subdir in os.listdir(LOG_DIR):
subdir_path = os.path.join(LOG_DIR, subdir)
if os.path.isdir(subdir_path):
try:
subdir_date = datetime.strptime(subdir, '%Y-%m')
if subdir_date < cutoff_date:
shutil.rmtree(subdir_path)
deleted += 1
logger.info(f"Alten Log-Ordner gelöscht: {subdir_path}")
except ValueError:
# Kein gültiges Datumsformat → überspringen
continue
except Exception as e:
logger.error(f"Fehler beim Löschen von {subdir_path}: {e}")
if deleted > 0:
logger.info(f"Insgesamt {deleted} alte Log-Ordner bereinigt")
else:
logger.debug("Keine alten Log-Ordner zum Bereinigen gefunden")
def setup_logging(app):
"""
Richtet das Logging ein: tägliche Dateien in Unterordnern + Trennung info/error.
Ruft auch einmalig cleanup auf.
"""
# Cleanup beim Start
cleanup_old_log_dirs(90)
today = datetime.now().strftime('%Y-%m')
subdir = os.path.join(LOG_DIR, today)
os.makedirs(subdir, exist_ok=True)
base = os.path.join(subdir, datetime.now().strftime('%d'))
# Info-Handler
info_handler = logging.FileHandler(f"{base}-info.log")
info_handler.setLevel(logging.INFO)
info_handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Error-Handler
error_handler = logging.FileHandler(f"{base}-error.log")
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s\n%(pathname)s:%(lineno)d\n',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Root-Logger konfigurieren
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
logger.addHandler(info_handler)
logger.addHandler(error_handler)
# Konsole
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(info_handler.formatter)
logger.addHandler(console)
logger.info("Logging eingerichtet tägliche Trennung info/error")
logger.info(f"Logs heute: {base}-info.log / {base}-error.log")

634
app_old.py Normal file
View File

@ -0,0 +1,634 @@
# app.py
import os
import json
import logging
import shutil
import threading
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime, timedelta
from flask import Flask, render_template, redirect, url_for, send_from_directory, request, jsonify
import pygame
# pygame einmalig initialisieren (am besten global)
pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=512)
pygame.mixer.music.set_volume(0.8) # Standard 80 %
# Logging-Verzeichnis (muss VOR cleanup_old_log_dirs definiert werden!)
LOG_DIR = 'logs'
os.makedirs(LOG_DIR, exist_ok=True)
# ── Cleanup Log-Files ────────────────────────────────────────────────────────────────
def cleanup_old_log_dirs(max_age_days=90):
"""Löscht Monatsordner in logs/, die älter als max_age_days sind"""
if not os.path.exists(LOG_DIR):
return
cutoff_date = datetime.now() - timedelta(days=max_age_days)
cutoff_str = cutoff_date.strftime('%Y-%m')
for subdir in os.listdir(LOG_DIR):
subdir_path = os.path.join(LOG_DIR, subdir)
if os.path.isdir(subdir_path):
try:
# subdir ist z. B. "2025-11"
subdir_date = datetime.strptime(subdir, '%Y-%m')
if subdir_date < cutoff_date:
logger.info(f"Lösche alten Log-Ordner: {subdir_path}")
shutil.rmtree(subdir_path)
except ValueError:
# Ungültiges Datumsformat → überspringen
pass
except Exception as e:
logger.error(f"Fehler beim Löschen von {subdir_path}: {e}")
# ── Hilfsfunktion - globale Sounds laden ────────────────────────────────────────────────────────────────
def load_default_sounds():
path = os.path.join(app.config['CONFIG_DIR'], 'default_sounds.json')
if os.path.exists(path):
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
# sounds = data.get('global_sounds', data.get('sounds', [])) # flexibel
sounds = data.get('global_sounds', [])
logger.info(f"Globale Default-Sounds geladen: {len(sounds)} Einträge")
return sounds
except Exception as e:
logger.error(f"Fehler beim Laden default_sounds.json: {e}")
return []
logger.info("Keine default_sounds.json gefunden")
return []
# ── Bluetooth ────────────────────────────────────────────────────────────────
from mkconnect.mouldking.MouldKing import MouldKing
from mkconnect.tracer.TracerConsole import TracerConsole
from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket
app = Flask(__name__)
app.config['CONFIG_DIR'] = 'configs'
os.makedirs(app.config['CONFIG_DIR'], exist_ok=True)
# ── Logging ────────────────────────────────────────────────────────────────
# Täglicher Unterordner
today = datetime.now().strftime('%Y-%m')
current_log_subdir = os.path.join(LOG_DIR, today)
os.makedirs(current_log_subdir, exist_ok=True)
# Basis-Dateiname (ohne Endung)
base_filename = os.path.join(current_log_subdir, datetime.now().strftime('%d'))
# Handler für INFO+
info_handler = logging.FileHandler(f"{base_filename}-info.log")
info_handler.setLevel(logging.INFO)
info_handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Handler für WARNING+ (inkl. ERROR, CRITICAL)
error_handler = logging.FileHandler(f"{base_filename}-error.log")
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s\n%(pathname)s:%(lineno)d\n',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Logger konfigurieren
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # DEBUG, damit alles durchkommt
logger.handlers.clear() # Alte Handler entfernen
logger.addHandler(info_handler)
logger.addHandler(error_handler)
# Optional: Auch in Konsole
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(info_handler.formatter)
logger.addHandler(console)
logger.info("Logging getrennt initialisiert: info.log + error.log")
# Bluetooth-Komponenten (einmalig initialisieren)
tracer = TracerConsole()
advertiser = AdvertiserBTSocket() # ← ohne tracer, wie korrigiert
# Globale Zustände (für Einzelbenutzer / Entwicklung später Session/DB)
current_config = None
current_hub: MouldKing | None = None
current_module = None # Module6_0 oder Module4_0
current_device = None # Device0/1/2 je nach hub_id
import threading
import time
# Globale Variable für den letzten erfolgreichen Check (optional)
last_successful_check = time.time()
def connection_monitor():
"""Background-Thread: Prüft alle 5 Sekunden, ob der Hub noch antwortet"""
global current_device, current_module, current_hub, last_successful_check
while True:
if current_device is not None:
try:
# Harter Test: Kanal 0 kurz auf 0.1 und zurück auf 0.0 setzen
current_device.SetChannel(0, 0.1)
time.sleep(0.05) # winzige Pause
current_device.SetChannel(0, 0.0)
last_successful_check = time.time()
logger.debug("Monitor: Hub antwortet → SetChannel-Test OK")
except Exception as e:
logger.warning(f"Monitor: Hub scheint weg zu sein: {str(e)}")
# Sauber trennen
try:
current_device.Disconnect()
except:
pass
current_device = None
current_module = None
current_hub = None
# Optional: Frontend benachrichtigen (z. B. über WebSocket später)
time.sleep(5) # Prüfintervall: 5 Sekunden
# Thread starten (daemon=True → beendet sich mit der App)
monitor_thread = threading.Thread(target=connection_monitor, daemon=True)
monitor_thread.start()
logger.info("Background-Monitor für Hub-Verbindung gestartet")
def load_configs():
configs = []
for filename in os.listdir(app.config['CONFIG_DIR']):
if filename.lower().endswith('.json') and filename != 'default_sounds.json':
path = os.path.join(app.config['CONFIG_DIR'], filename)
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
data['filename'] = filename
data.setdefault('name', filename.replace('.json', '').replace('_', ' ').title())
configs.append(data)
except Exception as e:
logger.error(f"Fehler beim Laden {filename}: {e}")
return sorted(configs, key=lambda x: x.get('name', ''))
@app.route('/')
def index():
return render_template('index.html', configs=load_configs())
"""
@app.route('/load_config/<filename>')
def load_config(filename):
global current_config
path = os.path.join(app.config['CONFIG_DIR'], filename)
if not os.path.exists(path):
return "Datei nicht gefunden", 404
try:
with open(path, 'r', encoding='utf-8') as f:
current_config = json.load(f)
current_config['filename'] = filename
logger.info(f"Config geladen: {filename}")
return redirect(url_for('control_page'))
except Exception as e:
logger.error(f"Config-Ladefehler {filename}: {e}")
return "Fehler beim Laden", 500
"""
@app.route('/load_config/<filename>')
def load_config(filename):
global current_config
path = os.path.join(app.config['CONFIG_DIR'], filename)
if not os.path.exists(path):
logger.error(f"Config nicht gefunden: {path}")
return "Konfiguration nicht gefunden", 404
try:
with open(path, 'r', encoding='utf-8') as f:
current_config = json.load(f)
current_config['filename'] = filename
logger.info(f"Config erfolgreich geladen: {filename} mit {len(current_config.get('channels', []))} Kanälen")
return redirect(url_for('control_page'))
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in {filename}: {e}")
return "Ungültiges JSON-Format in der Konfigurationsdatei", 500
except Exception as e:
logger.error(f"Ladefehler {filename}: {e}")
return "Fehler beim Laden der Konfiguration", 500
"""
@app.route('/control')
def control_page():
if current_config is None:
return redirect(url_for('index'))
return render_template('control.html', config=current_config)
"""
@app.route('/control')
def control_page():
if current_config is None:
logger.warning("current_config ist None → Redirect zu index")
return redirect(url_for('index'))
# Globale Sounds immer laden
global_sounds = load_default_sounds()
logger.info(f"Übergebe config an Template: {current_config}")
print("DEBUG: config hat channels?", 'channels' in current_config, len(current_config.get('channels', [])))
return render_template(
'control.html',
config=current_config,
global_sounds=global_sounds
)
@app.route('/soundboard')
def soundboard():
if current_config is None or 'sounds' not in current_config:
return redirect(url_for('index'))
sounds = current_config.get('sounds', [])
return render_template('soundboard.html', sounds=sounds, config=current_config)
# ── Admin ────────────────────────────────────────────────────────────────────
@app.route('/admin')
def admin():
configs = load_configs()
return render_template('admin.html', configs=configs)
@app.route('/admin/edit/<filename>', methods=['GET', 'POST'])
def admin_edit_config(filename):
path = os.path.join(app.config['CONFIG_DIR'], filename)
if request.method == 'POST':
try:
new_content = request.form.get('config_content')
if not new_content:
raise ValueError("Kein Inhalt übermittelt")
# Validierung: versuche zu parsen
json.loads(new_content)
with open(path, 'w', encoding='utf-8') as f:
f.write(new_content)
logger.info(f"Config {filename} erfolgreich gespeichert")
return redirect(url_for('admin'))
except json.JSONDecodeError as e:
logger.error(f"Ungültiges JSON in {filename}: {e}")
error_msg = f"Ungültiges JSON: {str(e)}"
except Exception as e:
logger.error(f"Speicherfehler {filename}: {e}")
error_msg = f"Fehler beim Speichern: {str(e)}"
# Bei Fehler: zurück zum Formular mit Fehlermeldung
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
return render_template('admin_edit.html', filename=filename, content=content, error=error_msg)
# GET: Formular laden
if not os.path.exists(path):
return "Konfiguration nicht gefunden", 404
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
return render_template('admin_edit.html', filename=filename, content=content)
@app.route('/admin/delete/<filename>', methods=['POST'])
def admin_delete_config(filename):
path = os.path.join(app.config['CONFIG_DIR'], filename)
if os.path.exists(path):
try:
os.remove(path)
logger.info(f"Config gelöscht: {filename}")
return jsonify({"success": True, "message": f"{filename} gelöscht"})
except Exception as e:
logger.error(f"Löschfehler {filename}: {e}")
return jsonify({"success": False, "message": str(e)}), 500
return jsonify({"success": False, "message": "Datei nicht gefunden"}), 404
@app.route('/admin/logs')
@app.route('/admin/logs/<date>')
def admin_logs(date=None):
if date is None:
date = datetime.now().strftime('%d')
today = datetime.now().strftime('%Y-%m') # ← Hier definieren!
current_month = datetime.now().strftime('%Y-%m')
log_dir = os.path.join(LOG_DIR, today)
if not os.path.exists(log_dir):
return render_template('admin_logs.html', logs="Keine Logs für diesen Tag.", date=date, dates=[], today=today)
# Verfügbare Tage ...
dates = sorted([
f.split('-')[0] for f in os.listdir(log_dir)
if f.endswith('-info.log') or f.endswith('-error.log')
], reverse=True)
# Logs laden ...
info_path = os.path.join(log_dir, f"{date}-info.log")
error_path = os.path.join(log_dir, f"{date}-error.log")
logs = []
if os.path.exists(info_path):
with open(info_path, 'r', encoding='utf-8') as f:
logs.append("=== INFO-LOG ===\n" + f.read())
if os.path.exists(error_path):
with open(error_path, 'r', encoding='utf-8') as f:
logs.append("=== ERROR-LOG ===\n" + f.read())
log_content = "\n\n".join(logs) if logs else "Keine Logs für diesen Tag."
return render_template('admin_logs.html',
logs=log_content,
date=date,
dates=dates,
today=current_month)
@app.route('/configs/<path:filename>')
def serve_config_file(filename):
return send_from_directory(app.config['CONFIG_DIR'], filename)
# ── API ──────────────────────────────────────────────────────────────────────
@app.route('/api/connect', methods=['POST'])
def api_connect():
global current_hub, current_module, current_device
if current_config is None:
return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
try:
hub_id = int(current_config.get('hub_id', 0))
hub_type = current_config.get('hub_type', '6channel') # '4channel' oder '6channel'
# Alte Verbindung sauber trennen
if current_device is not None:
try:
current_device.Disconnect()
except:
pass
if current_module is not None:
# ggf. Stop vor Disconnect
try:
current_device.Stop()
except:
pass
# MouldKing neu initialisieren
mk = MouldKing()
mk.SetAdvertiser(advertiser)
try:
mk.SetTracer(tracer)
except:
pass
# Modul holen
if hub_type.lower() in ['6channel', '6']:
current_module = mk.Module6_0()
else:
current_module = mk.Module4_0()
# Device auswählen (hub_id = 0,1,2 → Device0,1,2)
device_attr = f'Device{hub_id}'
if not hasattr(current_module, device_attr):
raise ValueError(f"Hub-ID {hub_id} nicht unterstützt (max 0-2)")
current_device = getattr(current_module, device_attr)
# Verbinden
current_device.Connect()
current_hub = mk # falls später noch gebraucht
logger.info(f"Verbunden: {hub_type} Hub-ID {hub_id}{device_attr}")
return jsonify({"success": True, "message": f"Hub {hub_id} ({hub_type}) verbunden"})
except Exception as e:
logger.exception("Connect-Fehler")
return jsonify({"success": False, "message": f"Verbindungsfehler: {str(e)}"}), 500
@app.route('/api/status', methods=['GET'])
def api_status():
global current_device
if current_device is not None:
return jsonify({
"connected": True,
"message": "Verbunden"
})
else:
return jsonify({
"connected": False,
"message": "Keine aktive Verbindung"
}), 200
@app.route('/api/control', methods=['POST'])
def api_control():
global current_device
if current_device is None:
return jsonify({"success": False, "message": "Nicht verbunden"}), 400
try:
data = request.get_json()
port = data['port'] # 'A', 'B', ...
value = float(data['value'])
# Port → channelId mappen (A=0, B=1, ...)
channel_map = {'A':0, 'B':1, 'C':2, 'D':3, 'E':4, 'F':5}
if isinstance(port, str):
port_upper = port.upper()
if port_upper in channel_map:
channel_id = channel_map[port_upper]
else:
raise ValueError(f"Ungültiger Port: {port}")
else:
channel_id = int(port)
# Config-spezifische Anpassungen (invert, negative_only, on/off-Werte)
ch_cfg = next((c for c in current_config['channels'] if c['port'].upper() == port.upper()), None)
if ch_cfg:
if ch_cfg.get('invert', False):
value = -value
if ch_cfg.get('negative_only', False) and value >= 0:
value = -abs(value)
if ch_cfg['type'] != 'motor':
value = ch_cfg.get('on_value', 1.0) if value != 0 else ch_cfg.get('off_value', 0.0)
# Echter Aufruf!
current_device.SetChannel(channel_id, value)
logger.info(f"SetChannel({channel_id}, {value:.2f}) → Port {port}")
return jsonify({"success": True})
except Exception as e:
logger.exception("Control-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/stop_all', methods=['POST'])
def api_stop_all():
global current_device
if current_device is None:
return jsonify({"success": False, "message": "Nicht verbunden"}), 400
try:
# Nur stoppen KEIN Disconnect!
current_device.Stop()
logger.info("Alle Kanäle gestoppt (Verbindung bleibt bestehen)")
return jsonify({"success": True, "message": "Alle Kanäle gestoppt"})
except Exception as e:
logger.exception("Stop-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/reconnect', methods=['POST'])
def api_reconnect():
global current_device, current_module, current_hub
if current_config is None:
return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
try:
# Alte Verbindung sauber beenden, falls nötig
if current_device is not None:
try:
current_device.Disconnect()
except:
pass
# Neu verbinden (kopierter Code aus api_connect)
hub_id = int(current_config.get('hub_id', 0))
hub_type = current_config.get('hub_type', '6channel')
mk = MouldKing()
mk.SetAdvertiser(advertiser)
try:
mk.SetTracer(tracer)
except:
pass
if hub_type.lower() in ['6channel', '6']:
current_module = mk.Module6_0()
else:
current_module = mk.Module4_0()
device_attr = f'Device{hub_id}'
if not hasattr(current_module, device_attr):
raise ValueError(f"Hub-ID {hub_id} nicht unterstützt")
current_device = getattr(current_module, device_attr)
current_device.Connect()
current_hub = mk
logger.info(f"Re-Connect erfolgreich: Hub {hub_id}")
return jsonify({"success": True, "message": "Verbindung wiederhergestellt"})
except Exception as e:
logger.exception("Re-Connect-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/play_sound', methods=['POST'])
def api_play_sound():
try:
data = request.get_json()
sound_id = data.get('sound_id')
if not sound_id:
return jsonify({"success": False, "message": "Kein sound_id angegeben"}), 400
# Sound aus aktueller Config suchen
if current_config is None or 'sounds' not in current_config:
return jsonify({"success": False, "message": "Keine Sounds in der aktuellen Konfiguration"}), 400
sound_entry = next((s for s in current_config['sounds'] if s['id'] == sound_id), None)
if not sound_entry:
return jsonify({"success": False, "message": f"Sound mit ID '{sound_id}' nicht gefunden"}), 404
file_path = os.path.join('sounds', sound_entry['file'])
if not os.path.exists(file_path):
logger.error(f"Sound-Datei nicht gefunden: {file_path}")
return jsonify({"success": False, "message": "Sound-Datei nicht gefunden"}), 404
# Sound in separatem Thread abspielen (blockiert nicht)
def play():
try:
pygame.mixer.music.load(file_path)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy():
time.sleep(0.1)
except Exception as e:
logger.error(f"Fehler beim Abspielen von {file_path}: {e}")
threading.Thread(target=play, daemon=True).start()
logger.info(f"Spiele Sound: {sound_entry['name']} ({sound_id})")
return jsonify({"success": True, "message": f"Spiele: {sound_entry['name']}"})
except Exception as e:
logger.exception("Play-Sound-Fehler")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/set_volume', methods=['POST'])
def api_set_volume():
try:
data = request.get_json()
vol = float(data.get('volume', 0.8))
vol = max(0.0, min(1.0, vol))
pygame.mixer.music.set_volume(vol)
logger.info(f"Volume auf {vol} gesetzt")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Volume-Fehler: {e}")
return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/stop_sound', methods=['POST'])
def api_stop_sound():
try:
pygame.mixer.music.stop()
logger.info("Aktueller Sound gestoppt")
return jsonify({"success": True})
except Exception as e:
logger.error(f"Stop-Sound-Fehler: {e}")
return jsonify({"success": False, "message": str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
# Beim Start aufrufen
cleanup_old_log_dirs(90)
logger.info("Alte Log-Ordner bereinigt")
def daily_cleanup():
while True:
cleanup_old_log_dirs(90)
time.sleep(86400) # 24 Stunden
cleanup_thread = threading.Thread(target=daily_cleanup, daemon=True)
cleanup_thread.start()

8
config.py Normal file
View File

@ -0,0 +1,8 @@
# config.py
import os
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-key-change-me'
CONFIG_DIR = 'configs'
LOG_DIR = 'logs'
SOUNDS_DIR = 'sounds'

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB