- Neue Klasse OutageStatistics für automatische Störungsaufzeichnung
- Sammelt Häufigkeit und Dauer von Ausfällen
- Parameter --stats/-s zum Senden von Statistik-Berichten über Apprise
- Erweiterte Kommandozeilen-Argumente (--debug/-d)
- Automatische Aufzeichnung von Störungsbeginn und -ende
- Detaillierte Statistiken mit Trends und Service-spezifischen Daten
- Umfassende Dokumentation in README_STATISTICS.md
240 lines
10 KiB
Python
240 lines
10 KiB
Python
import json
import os
import statistics
from collections import Counter, defaultdict
from datetime import datetime, timedelta

# Default path of the JSON file used to persist outage statistics.
# A relative path is resolved against the current working directory.
STATS_FILE = "ti_outage_statistics.json"


class OutageStatistics:
    """Record service outages and derive statistics from them.

    All state lives in ``self.stats``, a JSON-serialisable dict with three
    top-level keys:

    * ``"outages"``: list of outage records. Each record holds the service
      name, the outage type, start/end time as ISO-8601 strings, a status
      (``"active"`` or ``"resolved"``) and the duration in whole minutes.
    * ``"services"``: per-service counters keyed by service name.
    * ``"summary"``: aggregate figures, rebuilt by ``update_summary_stats``.

    Timestamps are compared as naive local time; timezone-aware values are
    converted to local time before they are compared.
    """

    def __init__(self, stats_file=None):
        """Load existing statistics or start with an empty data set.

        Args:
            stats_file: Path of the JSON file to use. ``None`` (the default)
                selects the module-level ``STATS_FILE``.
        """
        # Resolved here rather than as a default argument so the module
        # constant is only looked up when no explicit path is given.
        self.stats_file = STATS_FILE if stats_file is None else stats_file
        self.stats = self.load_statistics()

    @staticmethod
    def _default_statistics():
        """Return the empty structure used when no usable file exists."""
        return {
            "outages": [],
            "services": {},
            "summary": {
                "total_outages": 0,
                "active_outages": 0,
                "total_duration_minutes": 0,
                "average_duration_minutes": 0,
                "longest_outage_minutes": 0,
                "most_affected_service": "",
                "last_updated": "",
            },
        }

    @staticmethod
    def _to_iso(timestamp):
        """Return *timestamp* as an ISO-8601 string (``None`` means now)."""
        if timestamp is None:
            return datetime.now().isoformat()
        if isinstance(timestamp, datetime):
            return timestamp.isoformat()
        return timestamp

    @staticmethod
    def _parse_timestamp(value):
        """Parse an ISO-8601 string into a naive local ``datetime``."""
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            # Normalise aware values so they can be compared with and
            # subtracted from the naive timestamps this class produces.
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def load_statistics(self):
        """Load the stored statistics from the JSON file.

        Returns:
            The parsed statistics dict. Missing or wrongly typed top-level
            sections are replaced by empty defaults. If the file does not
            exist, cannot be read or does not contain a JSON object, a fresh
            default structure is returned.
        """
        defaults = self._default_statistics()

        try:
            with open(self.stats_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # First run: nothing has been recorded yet.
            return defaults
        except (ValueError, OSError) as e:
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            print(f"Warnung: Konnte Statistiken nicht laden: {e}")
            return defaults

        if not isinstance(loaded, dict):
            print("Warnung: Konnte Statistiken nicht laden: unerwartetes Format")
            return defaults

        # Guarantee the sections every other method indexes into.
        for key, fallback in defaults.items():
            if not isinstance(loaded.get(key), type(fallback)):
                loaded[key] = fallback
        return loaded

    def save_statistics(self):
        """Write the statistics to the JSON file.

        I/O errors are reported on stdout and otherwise ignored, so a
        read-only location does not abort the caller.
        """
        # Serialise first: if the data cannot be encoded, the existing file
        # is left untouched instead of being truncated by open(..., "w").
        payload = json.dumps(self.stats, ensure_ascii=False, indent=2)
        try:
            with open(self.stats_file, "w", encoding="utf-8") as f:
                f.write(payload)
        except OSError as e:
            print(f"Fehler beim Speichern der Statistiken: {e}")

    def record_outage_start(self, service_name, outage_type, timestamp=None):
        """Record the beginning of an outage.

        Nothing is recorded when an outage of the same type is already
        active for the service.

        Args:
            service_name: Name of the affected service.
            outage_type: Kind of outage; used to de-duplicate active records.
            timestamp: Start time as ISO-8601 string or ``datetime``;
                defaults to the current local time.
        """
        timestamp = self._to_iso(timestamp)

        already_active = any(
            outage.get("service") == service_name
            and outage.get("status") == "active"
            and outage.get("type") == outage_type
            for outage in self.stats["outages"]
        )
        if already_active:
            return

        self.stats["outages"].append(
            {
                "service": service_name,
                "type": outage_type,
                "start_time": timestamp,
                "end_time": None,
                "status": "active",
                "duration_minutes": None,
            }
        )
        # Hand the start time on so "last_outage" matches the record above
        # instead of the moment this method happened to run.
        self.update_service_stats(service_name, "start", timestamp=timestamp)
        self.save_statistics()

    def record_outage_end(self, service_name, timestamp=None):
        """Record the end of the outage(s) of a service.

        Every active outage of the service is resolved. Starts are
        de-duplicated per outage type, so several may be active at once and
        this method has no way to address a single type.

        Args:
            service_name: Name of the recovered service.
            timestamp: End time as ISO-8601 string or ``datetime``;
                defaults to the current local time.
        """
        active = [
            outage
            for outage in self.stats["outages"]
            if outage.get("service") == service_name
            and outage.get("status") == "active"
        ]
        if not active:
            # Nothing changed, so there is nothing to persist.
            return

        timestamp = self._to_iso(timestamp)
        end_time = self._parse_timestamp(timestamp)

        for outage in active:
            start_time = self._parse_timestamp(outage["start_time"])
            elapsed_seconds = (end_time - start_time).total_seconds()
            # Clamp at zero: an end before the start (clock change, bad
            # input) must not subtract from the totals.
            minutes = max(0, int(elapsed_seconds / 60))

            outage["end_time"] = timestamp
            outage["status"] = "resolved"
            outage["duration_minutes"] = minutes
            self.update_service_stats(service_name, "end", minutes)

        self.save_statistics()

    def update_service_stats(self, service_name, event_type,
                             duration_minutes=None, timestamp=None):
        """Update the per-service counters.

        Args:
            service_name: Service whose counters are updated; an entry is
                created on first use.
            event_type: ``"start"`` counts a new outage, ``"end"`` adds a
                finished outage's duration. Other values only ensure the
                entry exists.
            duration_minutes: Duration of the finished outage; required for
                ``"end"`` (``0`` is a valid duration).
            timestamp: Start time stored as ``last_outage`` on ``"start"``;
                defaults to the current local time.
        """
        service_stats = self.stats["services"].setdefault(
            service_name,
            {
                "total_outages": 0,
                "total_duration_minutes": 0,
                "average_duration_minutes": 0,
                "longest_outage_minutes": 0,
                "last_outage": None,
            },
        )

        if event_type == "start":
            service_stats["total_outages"] += 1
            service_stats["last_outage"] = self._to_iso(timestamp)
        elif event_type == "end" and duration_minutes is not None:
            service_stats["total_duration_minutes"] += duration_minutes
            # An "end" can arrive for a service without a counted start
            # (e.g. a hand-edited file); never divide by zero.
            outage_count = max(service_stats["total_outages"], 1)
            service_stats["average_duration_minutes"] = (
                service_stats["total_duration_minutes"] / outage_count
            )
            if duration_minutes > service_stats["longest_outage_minutes"]:
                service_stats["longest_outage_minutes"] = duration_minutes

    def update_summary_stats(self):
        """Rebuild ``self.stats["summary"]`` from the outage records.

        ``total_outages`` counts resolved outages only; active ones are
        reported separately as ``active_outages``.
        """
        outages = self.stats["outages"]
        active_count = sum(1 for o in outages if o.get("status") == "active")
        resolved = [o for o in outages if o.get("status") == "resolved"]

        # Skip records without a duration, but keep genuine 0-minute outages.
        durations = [
            o["duration_minutes"]
            for o in resolved
            if o.get("duration_minutes") is not None
        ]

        self.stats["summary"] = {
            "total_outages": len(resolved),
            "active_outages": active_count,
            "total_duration_minutes": sum(durations),
            "average_duration_minutes": statistics.mean(durations) if durations else 0,
            "longest_outage_minutes": max(durations) if durations else 0,
            "most_affected_service": self.get_most_affected_service(),
            "last_updated": datetime.now().isoformat(),
        }

    def get_most_affected_service(self):
        """Return the name of the service with the most outages.

        Returns:
            The service name, or ``""`` when no service has been recorded.
            On a tie the service recorded first wins.
        """
        services = self.stats["services"]
        if not services:
            return ""
        return max(services, key=lambda name: services[name]["total_outages"])

    def get_recent_outages(self, days=30):
        """Return the resolved outages that ended within the last *days* days.

        Args:
            days: Size of the look-back window in days.

        Returns:
            Outage records ordered by end time, most recent first.
        """
        cutoff_date = datetime.now() - timedelta(days=days)

        dated = []
        for outage in self.stats["outages"]:
            if outage.get("status") != "resolved" or not outage.get("end_time"):
                continue
            ended = self._parse_timestamp(outage["end_time"])
            if ended >= cutoff_date:
                dated.append((ended, outage))

        # Sort on the parsed value so differently formatted ISO strings
        # still order chronologically.
        dated.sort(key=lambda pair: pair[0], reverse=True)
        return [outage for _, outage in dated]

    def get_outage_trends(self, days=30):
        """Analyse per-day trends of the recent resolved outages.

        Args:
            days: Size of the look-back window in days.

        Returns:
            A dict with ``daily_average_outages``, ``daily_average_duration``,
            ``total_days_with_outages`` and ``most_outages_in_day``. Both
            averages are taken over the days that had at least one outage,
            not over every day in the window.
        """
        # Group by the calendar date (YYYY-MM-DD) on which the outage ended.
        daily_outages = defaultdict(list)
        for outage in self.get_recent_outages(days):
            daily_outages[outage["end_time"][:10]].append(outage)

        daily_counts = [len(group) for group in daily_outages.values()]
        daily_durations = [
            sum(o.get("duration_minutes") or 0 for o in group)
            for group in daily_outages.values()
        ]

        return {
            "daily_average_outages": statistics.mean(daily_counts) if daily_counts else 0,
            "daily_average_duration": statistics.mean(daily_durations) if daily_durations else 0,
            "total_days_with_outages": len(daily_outages),
            "most_outages_in_day": max(daily_counts) if daily_counts else 0,
        }

    def generate_statistics_report(self):
        """Build a detailed, human-readable statistics report.

        The summary is refreshed first. The report then lists the summary,
        the ten services with the most outages, the 30-day trends and up to
        five outages of the last seven days.

        Returns:
            The report as a single newline-joined string.
        """
        self.update_summary_stats()
        summary = self.stats["summary"]

        report = [
            "📊 **TI-Status Ausfall-Statistiken**",
            "=" * 50,
            # Summary
            "**Zusammenfassung:**",
            f"• Gesamte Störungen: {summary['total_outages']}",
            f"• Aktive Störungen: {summary['active_outages']}",
            f"• Gesamtdauer: {summary['total_duration_minutes']} Minuten",
            f"• Durchschnittsdauer: {summary['average_duration_minutes']:.1f} Minuten",
            f"• Längste Störung: {summary['longest_outage_minutes']} Minuten",
            f"• Am stärksten betroffen: {summary['most_affected_service']}",
            "",
        ]

        # Per-service statistics, limited to the ten most affected services.
        services = self.stats["services"]
        if services:
            report.append("**Service-Statistiken:**")
            ranked = sorted(
                services.items(),
                key=lambda item: item[1]["total_outages"],
                reverse=True,
            )
            for service, stats in ranked[:10]:
                report.append(f"• **{service.upper()}**:")
                report.append(f" - Störungen: {stats['total_outages']}")
                report.append(f" - Gesamtdauer: {stats['total_duration_minutes']} Min")
                report.append(f" - Durchschnitt: {stats['average_duration_minutes']:.1f} Min")
                report.append(f" - Längste: {stats['longest_outage_minutes']} Min")
            report.append("")

        # Trends of the last 30 days.
        trends = self.get_outage_trends(30)
        report.append("**Trends (letzte 30 Tage):**")
        report.append(f"• Durchschnitt Störungen/Tag: {trends['daily_average_outages']:.1f}")
        report.append(f"• Durchschnitt Dauer/Tag: {trends['daily_average_duration']:.1f} Min")
        report.append(f"• Tage mit Störungen: {trends['total_days_with_outages']}")
        report.append(f"• Max. Störungen an einem Tag: {trends['most_outages_in_day']}")
        report.append("")

        # Most recent outages: last seven days, at most five entries.
        recent_outages = self.get_recent_outages(7)
        if recent_outages:
            report.append("**Letzte Störungen (7 Tage):**")
            for outage in recent_outages[:5]:
                duration = outage.get("duration_minutes", 0)
                start_time = self._parse_timestamp(outage["start_time"]).strftime("%d.%m. %H:%M")
                end_time = self._parse_timestamp(outage["end_time"]).strftime("%d.%m. %H:%M")
                report.append(f"• **{outage['service'].upper()}** ({outage['type']})")
                report.append(f" - {start_time} - {end_time} ({duration} Min)")

        report.append("")
        report.append(f"_Bericht generiert am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_")

        return "\n".join(report)