Files
TI-Status-Bot/ti_statistics.py
Markus Busche f79fc24b3b feat: Statistik-Funktionalität für TI-Status Checker hinzugefügt
- Neue Klasse OutageStatistics für automatische Störungsaufzeichnung
- Sammelt Häufigkeit und Dauer von Ausfällen
- Parameter --stats/-s zum Senden von Statistik-Berichten über Apprise
- Erweiterte Kommandozeilen-Argumente (--debug/-d)
- Automatische Aufzeichnung von Störungsbeginn und -ende
- Detaillierte Statistiken mit Trends und Service-spezifischen Daten
- Umfassende Dokumentation in README_STATISTICS.md
2025-08-11 09:44:58 +02:00

240 lines
10 KiB
Python

import json
import os
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import statistics
STATS_FILE = "ti_outage_statistics.json"
class OutageStatistics:
def __init__(self):
self.stats_file = STATS_FILE
self.stats = self.load_statistics()
def load_statistics(self):
"""Lädt gespeicherte Statistiken aus der JSON-Datei"""
try:
if os.path.exists(self.stats_file):
with open(self.stats_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"Warnung: Konnte Statistiken nicht laden: {e}")
# Standard-Struktur für neue Statistiken
return {
"outages": [],
"services": {},
"summary": {
"total_outages": 0,
"total_duration_minutes": 0,
"average_duration_minutes": 0,
"longest_outage_minutes": 0,
"most_affected_service": "",
"last_updated": ""
}
}
def save_statistics(self):
"""Speichert Statistiken in die JSON-Datei"""
try:
with open(self.stats_file, "w", encoding="utf-8") as f:
json.dump(self.stats, f, ensure_ascii=False, indent=2)
except IOError as e:
print(f"Fehler beim Speichern der Statistiken: {e}")
def record_outage_start(self, service_name, outage_type, timestamp=None):
"""Zeichnet den Beginn einer Störung auf"""
if timestamp is None:
timestamp = datetime.now().isoformat()
# Prüfe ob bereits eine aktive Störung für diesen Service existiert
for outage in self.stats["outages"]:
if (outage["service"] == service_name and
outage["status"] == "active" and
outage["type"] == outage_type):
# Störung bereits aktiv, nicht erneut aufzeichnen
return
outage_record = {
"service": service_name,
"type": outage_type,
"start_time": timestamp,
"end_time": None,
"status": "active",
"duration_minutes": None
}
self.stats["outages"].append(outage_record)
self.update_service_stats(service_name, "start")
self.save_statistics()
def record_outage_end(self, service_name, timestamp=None):
"""Zeichnet das Ende einer Störung auf"""
if timestamp is None:
timestamp = datetime.now().isoformat()
# Finde die aktive Störung für diesen Service
for outage in self.stats["outages"]:
if outage["service"] == service_name and outage["status"] == "active":
outage["end_time"] = timestamp
outage["status"] = "resolved"
# Berechne Dauer
start_time = datetime.fromisoformat(outage["start_time"])
end_time = datetime.fromisoformat(timestamp)
duration = end_time - start_time
outage["duration_minutes"] = int(duration.total_seconds() / 60)
self.update_service_stats(service_name, "end", outage["duration_minutes"])
break
self.save_statistics()
def update_service_stats(self, service_name, event_type, duration_minutes=None):
"""Aktualisiert die Service-spezifischen Statistiken"""
if service_name not in self.stats["services"]:
self.stats["services"][service_name] = {
"total_outages": 0,
"total_duration_minutes": 0,
"average_duration_minutes": 0,
"longest_outage_minutes": 0,
"last_outage": None
}
service_stats = self.stats["services"][service_name]
if event_type == "start":
service_stats["total_outages"] += 1
service_stats["last_outage"] = datetime.now().isoformat()
elif event_type == "end" and duration_minutes:
service_stats["total_duration_minutes"] += duration_minutes
service_stats["average_duration_minutes"] = (
service_stats["total_duration_minutes"] / service_stats["total_outages"]
)
if duration_minutes > service_stats["longest_outage_minutes"]:
service_stats["longest_outage_minutes"] = duration_minutes
def update_summary_stats(self):
"""Aktualisiert die Zusammenfassungs-Statistiken"""
active_outages = [o for o in self.stats["outages"] if o["status"] == "active"]
resolved_outages = [o for o in self.stats["outages"] if o["status"] == "resolved"]
total_duration = sum(o.get("duration_minutes", 0) for o in resolved_outages)
durations = [o.get("duration_minutes", 0) for o in resolved_outages if o.get("duration_minutes")]
self.stats["summary"] = {
"total_outages": len(resolved_outages),
"active_outages": len(active_outages),
"total_duration_minutes": total_duration,
"average_duration_minutes": statistics.mean(durations) if durations else 0,
"longest_outage_minutes": max(durations) if durations else 0,
"most_affected_service": self.get_most_affected_service(),
"last_updated": datetime.now().isoformat()
}
def get_most_affected_service(self):
"""Ermittelt den am stärksten betroffenen Service"""
if not self.stats["services"]:
return ""
most_affected = max(
self.stats["services"].items(),
key=lambda x: x[1]["total_outages"]
)
return most_affected[0]
def get_recent_outages(self, days=30):
"""Gibt Störungen der letzten X Tage zurück"""
cutoff_date = datetime.now() - timedelta(days=days)
recent_outages = []
for outage in self.stats["outages"]:
if outage["status"] == "resolved" and outage["end_time"]:
end_time = datetime.fromisoformat(outage["end_time"])
if end_time >= cutoff_date:
recent_outages.append(outage)
return sorted(recent_outages, key=lambda x: x["end_time"], reverse=True)
def get_outage_trends(self, days=30):
"""Analysiert Trends in den Störungen"""
recent_outages = self.get_recent_outages(days)
# Gruppiere nach Datum
daily_outages = defaultdict(list)
for outage in recent_outages:
date = outage["end_time"][:10] # YYYY-MM-DD
daily_outages[date].append(outage)
# Berechne Durchschnitt pro Tag
daily_counts = [len(outages) for outages in daily_outages.values()]
daily_durations = []
for outages in daily_outages.values():
daily_duration = sum(o.get("duration_minutes", 0) for o in outages)
daily_durations.append(daily_duration)
return {
"daily_average_outages": statistics.mean(daily_counts) if daily_counts else 0,
"daily_average_duration": statistics.mean(daily_durations) if daily_durations else 0,
"total_days_with_outages": len(daily_outages),
"most_outages_in_day": max(daily_counts) if daily_counts else 0
}
def generate_statistics_report(self):
"""Generiert einen detaillierten Statistik-Bericht"""
self.update_summary_stats()
report = []
report.append("📊 **TI-Status Ausfall-Statistiken**")
report.append("=" * 50)
# Zusammenfassung
summary = self.stats["summary"]
report.append(f"**Zusammenfassung:**")
report.append(f"• Gesamte Störungen: {summary['total_outages']}")
report.append(f"• Aktive Störungen: {summary['active_outages']}")
report.append(f"• Gesamtdauer: {summary['total_duration_minutes']} Minuten")
report.append(f"• Durchschnittsdauer: {summary['average_duration_minutes']:.1f} Minuten")
report.append(f"• Längste Störung: {summary['longest_outage_minutes']} Minuten")
report.append(f"• Am stärksten betroffen: {summary['most_affected_service']}")
report.append("")
# Service-spezifische Statistiken
if self.stats["services"]:
report.append("**Service-Statistiken:**")
for service, stats in sorted(
self.stats["services"].items(),
key=lambda x: x[1]["total_outages"],
reverse=True
)[:10]: # Top 10 Services
report.append(f"• **{service.upper()}**:")
report.append(f" - Störungen: {stats['total_outages']}")
report.append(f" - Gesamtdauer: {stats['total_duration_minutes']} Min")
report.append(f" - Durchschnitt: {stats['average_duration_minutes']:.1f} Min")
report.append(f" - Längste: {stats['longest_outage_minutes']} Min")
report.append("")
# Trends der letzten 30 Tage
trends = self.get_outage_trends(30)
report.append("**Trends (letzte 30 Tage):**")
report.append(f"• Durchschnitt Störungen/Tag: {trends['daily_average_outages']:.1f}")
report.append(f"• Durchschnitt Dauer/Tag: {trends['daily_average_duration']:.1f} Min")
report.append(f"• Tage mit Störungen: {trends['total_days_with_outages']}")
report.append(f"• Max. Störungen an einem Tag: {trends['most_outages_in_day']}")
report.append("")
# Letzte Störungen
recent_outages = self.get_recent_outages(7) # Letzte 7 Tage
if recent_outages:
report.append("**Letzte Störungen (7 Tage):**")
for outage in recent_outages[:5]: # Top 5
duration = outage.get("duration_minutes", 0)
start_time = datetime.fromisoformat(outage["start_time"]).strftime("%d.%m. %H:%M")
end_time = datetime.fromisoformat(outage["end_time"]).strftime("%d.%m. %H:%M")
report.append(f"• **{outage['service'].upper()}** ({outage['type']})")
report.append(f" - {start_time} - {end_time} ({duration} Min)")
report.append("")
report.append(f"_Bericht generiert am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_")
return "\n".join(report)