# mkcontrol-app/app.py
#
# 634 lines · 23 KiB · Python
#
# Repository viewer warning: this file contains ambiguous Unicode characters,
# i.e. characters that might be confused with other characters. If this is
# intentional, the warning can be safely ignored; the viewer's Escape button
# reveals them.

# app.py
import os
import json
import logging
import shutil
import threading
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime, timedelta
from flask import Flask, render_template, redirect, url_for, send_from_directory, request, jsonify
import pygame
# Initialise the pygame mixer once at import time (module-global audio state)
pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=512)
pygame.mixer.music.set_volume(0.8) # default volume: 80 %
# Log directory (must be defined BEFORE cleanup_old_log_dirs is called)
LOG_DIR = 'logs'
os.makedirs(LOG_DIR, exist_ok=True)
# ── Cleanup Log-Files ────────────────────────────────────────────────────────────────
def cleanup_old_log_dirs(max_age_days=90):
    """Delete month folders below ``LOG_DIR`` once all their logs are too old.

    Sub-directories are expected to be named ``YYYY-MM``. A folder is removed
    only when the *end* of its month lies at or before ``now - max_age_days``,
    so a folder is never deleted while it may still contain logs that are
    younger than the limit. Entries with any other name are skipped.

    Args:
        max_age_days: Minimum age in days before a month folder is removed.
    """
    if not os.path.isdir(LOG_DIR):
        return
    cutoff_date = datetime.now() - timedelta(days=max_age_days)
    for subdir in os.listdir(LOG_DIR):
        subdir_path = os.path.join(LOG_DIR, subdir)
        if not os.path.isdir(subdir_path):
            continue
        try:
            # subdir is e.g. "2025-11"
            month_start = datetime.strptime(subdir, '%Y-%m')
        except ValueError:
            # Not a month folder → skip
            continue
        # First instant of the following month: day 28 exists in every month,
        # adding 4 days always lands in the next month.
        month_end = (month_start.replace(day=28) + timedelta(days=4)).replace(day=1)
        if month_end > cutoff_date:
            continue
        try:
            logger.info(f"Lösche alten Log-Ordner: {subdir_path}")
            shutil.rmtree(subdir_path)
        except Exception as e:
            # Best effort: one failing folder must not stop the cleanup.
            logger.error(f"Fehler beim Löschen von {subdir_path}: {e}")
# ── Hilfsfunktion - globale Sounds laden ────────────────────────────────────────────────────────────────
def load_default_sounds():
    """Return the ``global_sounds`` list from ``default_sounds.json``.

    The file is looked up in the configured ``CONFIG_DIR``. An empty list is
    returned when the file is missing or cannot be loaded; both cases are
    logged.
    """
    sounds_file = os.path.join(app.config['CONFIG_DIR'], 'default_sounds.json')
    if not os.path.exists(sounds_file):
        logger.info("Keine default_sounds.json gefunden")
        return []
    try:
        with open(sounds_file, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
        entries = payload.get('global_sounds', [])
        logger.info(f"Globale Default-Sounds geladen: {len(entries)} Einträge")
        return entries
    except Exception as e:
        logger.error(f"Fehler beim Laden default_sounds.json: {e}")
        return []
# ── Bluetooth ────────────────────────────────────────────────────────────────
from mkconnect.mouldking.MouldKing import MouldKing
from mkconnect.tracer.TracerConsole import TracerConsole
from mkconnect.advertiser.AdvertiserBTSocket import AdvertiserBTSocket
# Flask application object; the JSON configuration files used by the routes
# in this file are stored in CONFIG_DIR, which is created on startup.
app = Flask(__name__)
app.config['CONFIG_DIR'] = 'configs'
os.makedirs(app.config['CONFIG_DIR'], exist_ok=True)
# ── Logging ────────────────────────────────────────────────────────────────
# One sub-folder per month (logs/YYYY-MM); the file names start with the day
# of month on which the process was started.
today = datetime.now().strftime('%Y-%m')
current_log_subdir = os.path.join(LOG_DIR, today)
os.makedirs(current_log_subdir, exist_ok=True)
# Base file name (without suffix)
base_filename = os.path.join(current_log_subdir, datetime.now().strftime('%d'))
# Handler for INFO and above.
# The encoding is explicit because the messages contain non-ASCII characters
# and admin_logs reads these files back as UTF-8.
info_handler = logging.FileHandler(f"{base_filename}-info.log", encoding='utf-8')
info_handler.setLevel(logging.INFO)
info_handler.setFormatter(logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
))
# Handler for WARNING and above (incl. ERROR, CRITICAL); also records the
# source location of each message.
error_handler = logging.FileHandler(f"{base_filename}-error.log", encoding='utf-8')
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(message)s\n%(pathname)s:%(lineno)d\n',
    datefmt='%Y-%m-%d %H:%M:%S'
))
# Configure the module logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # DEBUG so every record reaches the handlers, which filter by level
logger.handlers.clear()  # remove handlers left over from a previous initialisation
logger.addHandler(info_handler)
logger.addHandler(error_handler)
# Additionally mirror INFO+ to the console, using the info file format
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(info_handler.formatter)
logger.addHandler(console)
logger.info("Logging getrennt initialisiert: info.log + error.log")
# Bluetooth components (initialised once)
tracer = TracerConsole()
advertiser = AdvertiserBTSocket() # ← created without the tracer (as corrected)
# Global state (single-user / development setup; session or DB storage later)
current_config = None
current_hub: MouldKing | None = None
current_module = None # Module6_0 or Module4_0
current_device = None # Device0/1/2 depending on hub_id
import threading
import time
# Timestamp of the last successful connection check (optional)
last_successful_check = time.time()
def connection_monitor():
    """Background loop: probe the connected hub every 5 seconds.

    While a device is registered in ``current_device``, channel 0 is set to
    0.1 and, after a 50 ms pause, back to 0.0. If any of these calls raises,
    the device is disconnected (best effort) and the global connection state
    is reset to ``None``. The function never returns; it is meant to run in a
    daemon thread.
    """
    global current_device, current_module, current_hub, last_successful_check
    while True:
        # Work on a snapshot so a concurrent (re)connect from a request
        # handler cannot swap the device in the middle of a probe.
        device = current_device
        if device is not None:
            try:
                # Hard test: briefly set channel 0 to 0.1 and back to 0.0.
                # NOTE(review): this writes 0.0 to channel 0 on every cycle,
                # which presumably overrides a value set via /api/control —
                # confirm this is acceptable.
                device.SetChannel(0, 0.1)
                time.sleep(0.05)  # tiny pause between the two writes
                device.SetChannel(0, 0.0)
                last_successful_check = time.time()
                logger.debug("Monitor: Hub antwortet → SetChannel-Test OK")
            except Exception as e:
                logger.warning(f"Monitor: Hub scheint weg zu sein: {str(e)}")
                # Disconnect cleanly; failure here is expected when the hub
                # is already gone, so it is only logged at debug level.
                try:
                    device.Disconnect()
                except Exception as disconnect_error:
                    logger.debug("Monitor: Disconnect fehlgeschlagen: %s", disconnect_error)
                # Only clear the state if nobody replaced the device meanwhile.
                if current_device is device:
                    current_device = None
                    current_module = None
                    current_hub = None
                # Optional: notify the frontend (e.g. via WebSocket later)
        time.sleep(5)  # check interval: 5 seconds
# Start the monitor thread (daemon=True → it ends together with the app)
monitor_thread = threading.Thread(target=connection_monitor, daemon=True)
monitor_thread.start()
logger.info("Background-Monitor für Hub-Verbindung gestartet")
def load_configs():
    """Load every configuration file from ``CONFIG_DIR``.

    All ``*.json`` files (extension matched case-insensitively) except
    ``default_sounds.json`` are parsed. Each resulting dict gets a
    ``filename`` key and, if missing, a ``name`` derived from the file name
    without its extension. Files that cannot be read, parsed, or that do not
    contain a JSON object are logged and skipped.

    Returns:
        The loaded configuration dicts, sorted by their ``name``.
    """
    config_dir = app.config['CONFIG_DIR']
    configs = []
    for filename in os.listdir(config_dir):
        stem, extension = os.path.splitext(filename)
        if extension.lower() != '.json' or filename == 'default_sounds.json':
            continue
        path = os.path.join(config_dir, filename)
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError("Top-Level-Element ist kein JSON-Objekt")
            data['filename'] = filename
            # splitext removes only the real extension, whatever its case.
            data.setdefault('name', stem.replace('_', ' ').title())
            configs.append(data)
        except Exception as e:
            logger.error(f"Fehler beim Laden {filename}: {e}")
    # str() keeps the sort from failing when a config stores a non-string name.
    return sorted(configs, key=lambda x: str(x.get('name', '')))
@app.route('/')
def index():
    """Render the start page with the list of available configurations."""
    available_configs = load_configs()
    return render_template('index.html', configs=available_configs)
"""
@app.route('/load_config/<filename>')
def load_config(filename):
global current_config
path = os.path.join(app.config['CONFIG_DIR'], filename)
if not os.path.exists(path):
return "Datei nicht gefunden", 404
try:
with open(path, 'r', encoding='utf-8') as f:
current_config = json.load(f)
current_config['filename'] = filename
logger.info(f"Config geladen: {filename}")
return redirect(url_for('control_page'))
except Exception as e:
logger.error(f"Config-Ladefehler {filename}: {e}")
return "Fehler beim Laden", 500
"""
@app.route('/load_config/<filename>')
def load_config(filename):
    """Load a configuration file and make it the active configuration.

    The file is parsed and validated first; the global ``current_config`` is
    replaced only after that succeeded, so a broken file never leaves a
    half-initialised configuration behind.

    Args:
        filename: Name of the file inside ``CONFIG_DIR``.

    Returns:
        A redirect to the control page on success, otherwise an error text
        with status 404 (file missing) or 500 (unreadable / invalid content).
    """
    global current_config
    path = os.path.join(app.config['CONFIG_DIR'], filename)
    if not os.path.exists(path):
        logger.error(f"Config nicht gefunden: {path}")
        return "Konfiguration nicht gefunden", 404
    try:
        with open(path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
        # The rest of the app treats the configuration as a dict.
        if not isinstance(loaded, dict):
            raise ValueError("Top-Level-Element muss ein JSON-Objekt sein")
        loaded['filename'] = filename
        channel_count = len(loaded.get('channels', []))
    except json.JSONDecodeError as e:
        logger.error(f"Ungültiges JSON in {filename}: {e}")
        return "Ungültiges JSON-Format in der Konfigurationsdatei", 500
    except Exception as e:
        logger.error(f"Ladefehler {filename}: {e}")
        return "Fehler beim Laden der Konfiguration", 500
    current_config = loaded
    logger.info(f"Config erfolgreich geladen: {filename} mit {channel_count} Kanälen")
    return redirect(url_for('control_page'))
"""
@app.route('/control')
def control_page():
if current_config is None:
return redirect(url_for('index'))
return render_template('control.html', config=current_config)
"""
@app.route('/control')
def control_page():
    """Render the control page for the active configuration.

    Redirects to the index page when no configuration has been loaded. The
    global default sounds are loaded on every request and passed to the
    template together with the configuration.
    """
    if current_config is None:
        logger.warning("current_config ist None → Redirect zu index")
        return redirect(url_for('index'))
    # Always load the global sounds
    global_sounds = load_default_sounds()
    logger.info(f"Übergebe config an Template: {current_config}")
    # Diagnostic output goes through the logger instead of a stray print().
    logger.debug(
        "config hat channels? %s %s",
        'channels' in current_config,
        len(current_config.get('channels', [])),
    )
    return render_template(
        'control.html',
        config=current_config,
        global_sounds=global_sounds
    )
@app.route('/soundboard')
def soundboard():
    """Render the soundboard for the active configuration.

    Redirects to the index page when no configuration is loaded or the
    configuration has no ``sounds`` key.
    """
    has_sounds = current_config is not None and 'sounds' in current_config
    if not has_sounds:
        return redirect(url_for('index'))
    return render_template(
        'soundboard.html',
        sounds=current_config.get('sounds', []),
        config=current_config,
    )
# ── Admin ────────────────────────────────────────────────────────────────────
@app.route('/admin')
def admin():
    """Render the admin overview listing all configuration files."""
    return render_template('admin.html', configs=load_configs())
@app.route('/admin/edit/<filename>', methods=['GET', 'POST'])
def admin_edit_config(filename):
    """Show (GET) or save (POST) the raw JSON text of a configuration file.

    On POST the submitted ``config_content`` must be non-empty, valid JSON;
    it is then written to the file and the user is redirected to the admin
    page. On failure the form is rendered again with an error message and
    with the text the user submitted, so the edit is not lost.

    Args:
        filename: Name of the file inside ``CONFIG_DIR``.
    """
    path = os.path.join(app.config['CONFIG_DIR'], filename)
    if request.method == 'POST':
        new_content = request.form.get('config_content')
        try:
            if not new_content:
                raise ValueError("Kein Inhalt übermittelt")
            # Validation: the text must parse as JSON
            json.loads(new_content)
            with open(path, 'w', encoding='utf-8') as f:
                f.write(new_content)
            logger.info(f"Config {filename} erfolgreich gespeichert")
            return redirect(url_for('admin'))
        except json.JSONDecodeError as e:
            logger.error(f"Ungültiges JSON in {filename}: {e}")
            error_msg = f"Ungültiges JSON: {str(e)}"
        except Exception as e:
            logger.error(f"Speicherfehler {filename}: {e}")
            error_msg = f"Fehler beim Speichern: {str(e)}"
        # On error: back to the form with the error message. Prefer the
        # submitted text; fall back to the stored file only if it exists, so
        # this path cannot fail for a file that was never saved.
        if new_content:
            content = new_content
        elif os.path.isfile(path):
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()
        else:
            content = ''
        return render_template('admin_edit.html', filename=filename, content=content, error=error_msg)
    # GET: load the form
    if not os.path.exists(path):
        return "Konfiguration nicht gefunden", 404
    with open(path, 'r', encoding='utf-8') as f:
        content = f.read()
    return render_template('admin_edit.html', filename=filename, content=content)
@app.route('/admin/delete/<filename>', methods=['POST'])
def admin_delete_config(filename):
    """Delete a configuration file and report the outcome as JSON.

    Responds with 404 when the file does not exist and with 500 when the
    removal itself fails.
    """
    target = os.path.join(app.config['CONFIG_DIR'], filename)
    if not os.path.exists(target):
        return jsonify({"success": False, "message": "Datei nicht gefunden"}), 404
    try:
        os.remove(target)
        logger.info(f"Config gelöscht: {filename}")
        return jsonify({"success": True, "message": f"{filename} gelöscht"})
    except Exception as e:
        logger.error(f"Löschfehler {filename}: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/admin/logs')
@app.route('/admin/logs/<date>')
def admin_logs(date=None):
    """Show the info and error log of one day of the current month.

    Args:
        date: Two-digit day of month taken from the URL; defaults to today.

    The template receives the combined log text, the selected day, the list
    of days for which a log file exists in the current month (newest first,
    each day once) and the current month as ``today``.
    """
    now = datetime.now()
    if date is None:
        date = now.strftime('%d')
    current_month = now.strftime('%Y-%m')
    log_dir = os.path.join(LOG_DIR, current_month)
    if not os.path.isdir(log_dir):
        return render_template('admin_logs.html', logs="Keine Logs für diesen Tag.", date=date, dates=[], today=current_month)
    # Available days: a set, because every day usually has both an info and
    # an error file and must still be listed only once.
    dates = sorted(
        {
            f.split('-')[0] for f in os.listdir(log_dir)
            if f.endswith(('-info.log', '-error.log'))
        },
        reverse=True,
    )
    # Load the logs of the selected day
    sections = (
        ("=== INFO-LOG ===\n", os.path.join(log_dir, f"{date}-info.log")),
        ("=== ERROR-LOG ===\n", os.path.join(log_dir, f"{date}-error.log")),
    )
    logs = []
    for heading, log_path in sections:
        if os.path.exists(log_path):
            # errors='replace': an undecodable byte must not break the page.
            with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
                logs.append(heading + f.read())
    log_content = "\n\n".join(logs) if logs else "Keine Logs für diesen Tag."
    return render_template('admin_logs.html',
                           logs=log_content,
                           date=date,
                           dates=dates,
                           today=current_month)
@app.route('/configs/<path:filename>')
def serve_config_file(filename):
    """Send the requested file from the configuration directory."""
    config_dir = app.config['CONFIG_DIR']
    return send_from_directory(config_dir, filename)
# ── API ──────────────────────────────────────────────────────────────────────
@app.route('/api/connect', methods=['POST'])
def api_connect():
    """Connect to the hub described by the active configuration.

    An existing connection is stopped and disconnected first. The new
    module/device is built in local variables and published to the global
    state only after ``Connect()`` succeeded, so a failed attempt leaves the
    app in the "not connected" state.

    Returns:
        JSON with ``success`` and ``message``; status 400 when no
        configuration is loaded, 500 when connecting fails.
    """
    global current_hub, current_module, current_device
    if current_config is None:
        return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
    try:
        hub_id = int(current_config.get('hub_id', 0))
        hub_type = current_config.get('hub_type', '6channel')  # '4channel' or '6channel'
        # Cleanly tear down the old connection: stop first, then disconnect.
        old_device = current_device
        if old_device is not None:
            try:
                old_device.Stop()
            except Exception as e:
                logger.debug("Stop vor Disconnect fehlgeschlagen: %s", e)
            try:
                old_device.Disconnect()
            except Exception as e:
                logger.debug("Disconnect der alten Verbindung fehlgeschlagen: %s", e)
            current_device = None
            current_module = None
            current_hub = None
        # Re-initialise MouldKing
        mk = MouldKing()
        mk.SetAdvertiser(advertiser)
        try:
            mk.SetTracer(tracer)
        except Exception as e:
            # The tracer is optional; carry on without it.
            logger.debug("SetTracer fehlgeschlagen: %s", e)
        # Select the module; str() also accepts a numeric hub_type such as 6
        if str(hub_type).lower() in ('6channel', '6'):
            module = mk.Module6_0()
        else:
            module = mk.Module4_0()
        # Select the device (hub_id = 0,1,2 → Device0,1,2)
        device_attr = f'Device{hub_id}'
        if not hasattr(module, device_attr):
            raise ValueError(f"Hub-ID {hub_id} nicht unterstützt (max 0-2)")
        device = getattr(module, device_attr)
        # Connect, then publish the new state
        device.Connect()
        current_module = module
        current_device = device
        current_hub = mk  # kept in case it is needed later
        logger.info(f"Verbunden: {hub_type} Hub-ID {hub_id} → {device_attr}")
        return jsonify({"success": True, "message": f"Hub {hub_id} ({hub_type}) verbunden"})
    except Exception as e:
        logger.exception("Connect-Fehler")
        return jsonify({"success": False, "message": f"Verbindungsfehler: {str(e)}"}), 500
@app.route('/api/status', methods=['GET'])
def api_status():
    """Report as JSON whether a hub device is currently registered."""
    if current_device is None:
        return jsonify({
            "connected": False,
            "message": "Keine aktive Verbindung"
        }), 200
    return jsonify({
        "connected": True,
        "message": "Verbunden"
    })
@app.route('/api/control', methods=['POST'])
def api_control():
    """Set one channel of the connected hub.

    Expects a JSON body with ``port`` (letter ``A``-``F``, case-insensitive,
    or a numeric channel id) and ``value`` (number). If the active
    configuration has an entry for that channel, its ``invert``,
    ``negative_only`` and on/off settings are applied before the value is
    sent.

    Returns:
        JSON with ``success``; status 400 when not connected or when the body
        is missing required fields, 500 for any other failure.
    """
    if current_device is None:
        return jsonify({"success": False, "message": "Nicht verbunden"}), 400
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'port' not in data or 'value' not in data:
            return jsonify({"success": False, "message": "Ungültige Anfrage: 'port' und 'value' erforderlich"}), 400
        port = data['port']  # 'A', 'B', ...
        value = float(data['value'])
        # Port → channel id (A=0, B=1, ...)
        channel_map = {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5}

        def resolve_channel(raw_port):
            """Translate a port letter or numeric id into a channel index."""
            if isinstance(raw_port, str):
                key = raw_port.upper()
                if key not in channel_map:
                    raise ValueError(f"Ungültiger Port: {raw_port}")
                return channel_map[key]
            return int(raw_port)

        channel_id = resolve_channel(port)
        # Config-specific adjustments (invert, negative_only, on/off values).
        # Entries are matched by resolved channel id so that a numeric port in
        # the request finds the entry configured with the matching letter.
        ch_cfg = None
        for candidate in (current_config or {}).get('channels', []):
            try:
                if resolve_channel(candidate.get('port')) == channel_id:
                    ch_cfg = candidate
                    break
            except (AttributeError, TypeError, ValueError):
                # Malformed config entry → ignore it for matching
                continue
        if ch_cfg:
            if ch_cfg.get('invert', False):
                value = -value
            if ch_cfg.get('negative_only', False) and value >= 0:
                value = -abs(value)
            if ch_cfg['type'] != 'motor':
                # Non-motor channels are switched between two fixed values
                value = ch_cfg.get('on_value', 1.0) if value != 0 else ch_cfg.get('off_value', 0.0)
        # The actual call
        current_device.SetChannel(channel_id, value)
        logger.info(f"SetChannel({channel_id}, {value:.2f}) → Port {port}")
        return jsonify({"success": True})
    except Exception as e:
        logger.exception("Control-Fehler")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/stop_all', methods=['POST'])
def api_stop_all():
    """Stop all channels of the connected hub without disconnecting.

    Responds with 400 when no device is connected and with 500 when the stop
    call fails.
    """
    global current_device
    not_connected = current_device is None
    if not_connected:
        return jsonify({"success": False, "message": "Nicht verbunden"}), 400
    try:
        # Stop only — the connection itself stays open
        current_device.Stop()
        logger.info("Alle Kanäle gestoppt (Verbindung bleibt bestehen)")
        reply = {"success": True, "message": "Alle Kanäle gestoppt"}
        return jsonify(reply)
    except Exception as e:
        logger.exception("Stop-Fehler")
        failure = {"success": False, "message": str(e)}
        return jsonify(failure), 500
@app.route('/api/reconnect', methods=['POST'])
def api_reconnect():
    """Drop the current hub connection (if any) and connect again.

    The new module/device is built in local variables and published to the
    global state only after ``Connect()`` succeeded, so a failed attempt
    leaves the app in the "not connected" state.

    Returns:
        JSON with ``success`` and ``message``; status 400 when no
        configuration is loaded, 500 when reconnecting fails.
    """
    global current_device, current_module, current_hub
    if current_config is None:
        return jsonify({"success": False, "message": "Keine Konfiguration geladen"}), 400
    try:
        # Cleanly end the old connection if there is one
        old_device = current_device
        if old_device is not None:
            try:
                old_device.Disconnect()
            except Exception as e:
                logger.debug("Disconnect der alten Verbindung fehlgeschlagen: %s", e)
            current_device = None
            current_module = None
            current_hub = None
        # Connect again (same procedure as api_connect)
        hub_id = int(current_config.get('hub_id', 0))
        hub_type = current_config.get('hub_type', '6channel')
        mk = MouldKing()
        mk.SetAdvertiser(advertiser)
        try:
            mk.SetTracer(tracer)
        except Exception as e:
            # The tracer is optional; carry on without it.
            logger.debug("SetTracer fehlgeschlagen: %s", e)
        # str() also accepts a numeric hub_type such as 6
        if str(hub_type).lower() in ('6channel', '6'):
            module = mk.Module6_0()
        else:
            module = mk.Module4_0()
        device_attr = f'Device{hub_id}'
        if not hasattr(module, device_attr):
            raise ValueError(f"Hub-ID {hub_id} nicht unterstützt")
        device = getattr(module, device_attr)
        device.Connect()
        current_module = module
        current_device = device
        current_hub = mk
        logger.info(f"Re-Connect erfolgreich: Hub {hub_id}")
        return jsonify({"success": True, "message": "Verbindung wiederhergestellt"})
    except Exception as e:
        logger.exception("Re-Connect-Fehler")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/play_sound', methods=['POST'])
def api_play_sound():
    """Play a sound of the active configuration in a background thread.

    Expects a JSON body with ``sound_id``. The matching entry of
    ``current_config['sounds']`` names the file below ``sounds/``.

    Returns:
        JSON with ``success`` and ``message``; status 400 for a missing
        ``sound_id`` or a configuration without sounds, 404 for an unknown id
        or a missing file, 500 for any other failure.
    """
    try:
        # silent=True: a missing or malformed body is reported as 400 below
        data = request.get_json(silent=True)
        sound_id = data.get('sound_id') if isinstance(data, dict) else None
        if not sound_id:
            return jsonify({"success": False, "message": "Kein sound_id angegeben"}), 400
        # Look the sound up in the active configuration
        if current_config is None or 'sounds' not in current_config:
            return jsonify({"success": False, "message": "Keine Sounds in der aktuellen Konfiguration"}), 400
        sound_entry = next((s for s in current_config['sounds'] if s.get('id') == sound_id), None)
        if not sound_entry:
            return jsonify({"success": False, "message": f"Sound mit ID '{sound_id}' nicht gefunden"}), 404
        file_path = os.path.join('sounds', sound_entry['file'])
        if not os.path.exists(file_path):
            logger.error(f"Sound-Datei nicht gefunden: {file_path}")
            return jsonify({"success": False, "message": "Sound-Datei nicht gefunden"}), 404
        # Resolve the display name before playback starts, so a missing
        # 'name' cannot turn a started playback into an error response.
        sound_name = sound_entry.get('name', sound_id)

        # Play in a separate thread so the request does not block
        def play():
            try:
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()
                while pygame.mixer.music.get_busy():
                    time.sleep(0.1)
            except Exception as e:
                logger.error(f"Fehler beim Abspielen von {file_path}: {e}")

        threading.Thread(target=play, daemon=True).start()
        logger.info(f"Spiele Sound: {sound_name} ({sound_id})")
        return jsonify({"success": True, "message": f"Spiele: {sound_name}"})
    except Exception as e:
        logger.exception("Play-Sound-Fehler")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/set_volume', methods=['POST'])
def api_set_volume():
    """Set the music volume from a JSON body ``{"volume": <number>}``.

    The value is clamped to the range 0.0-1.0; ``volume`` defaults to 0.8
    when the key is absent.

    Returns:
        JSON with ``success``; status 400 for a missing/invalid body or a
        value that is not a number, 500 for any other failure.
    """
    import math

    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "Ungültige Anfrage"}), 400
        try:
            vol = float(data.get('volume', 0.8))
        except (TypeError, ValueError):
            return jsonify({"success": False, "message": "Ungültiger Lautstärkewert"}), 400
        # NaN would slip through the clamp below and end up as full volume.
        if math.isnan(vol):
            return jsonify({"success": False, "message": "Ungültiger Lautstärkewert"}), 400
        vol = max(0.0, min(1.0, vol))
        pygame.mixer.music.set_volume(vol)
        logger.info(f"Volume auf {vol} gesetzt")
        return jsonify({"success": True})
    except Exception as e:
        logger.error(f"Volume-Fehler: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/api/stop_sound', methods=['POST'])
def api_stop_sound():
    """Stop the music that is currently playing and report the result as JSON."""
    try:
        pygame.mixer.music.stop()
        logger.info("Aktueller Sound gestoppt")
        reply = {"success": True}
        return jsonify(reply)
    except Exception as e:
        logger.error(f"Stop-Sound-Fehler: {e}")
        failure = {"success": False, "message": str(e)}
        return jsonify(failure), 500
def daily_cleanup():
    """Remove old log folders now and then again every 24 hours, forever."""
    while True:
        cleanup_old_log_dirs(90)
        logger.info("Alte Log-Ordner bereinigt")
        time.sleep(86400)  # 24 hours


# Start the housekeeping thread BEFORE app.run(): app.run() blocks until the
# server shuts down, so anything placed after it would not run at startup.
# The first cleanup happens immediately inside the thread.
cleanup_thread = threading.Thread(target=daily_cleanup, daemon=True)
cleanup_thread.start()

if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive debugger and the
    # reloader while the server listens on all interfaces (0.0.0.0) —
    # confirm this is only used in a trusted development network.
    app.run(host='0.0.0.0', port=5000, debug=True)