import json
import os
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import statistics

STATS_FILE = "ti_outage_statistics.json"


class OutageStatistics:
    """Record service outages and derive aggregate statistics from them.

    State lives in ``self.stats``, a JSON-serialisable dict with three
    sections: ``"outages"`` (list of individual outage records),
    ``"services"`` (per-service counters) and ``"summary"`` (global
    aggregates, rebuilt by :meth:`update_summary_stats`).  The dict is
    loaded from ``stats_file`` on construction and written back after every
    recorded start/end event.
    """

    def __init__(self, stats_file=STATS_FILE):
        """Load existing statistics from *stats_file*.

        Args:
            stats_file: Path of the JSON file used for persistence.
                Defaults to the module-level ``STATS_FILE``.
        """
        self.stats_file = stats_file
        self.stats = self.load_statistics()

    @staticmethod
    def _default_stats():
        """Return a fresh, empty statistics structure."""
        return {
            "outages": [],
            "services": {},
            "summary": {
                "total_outages": 0,
                "active_outages": 0,
                "total_duration_minutes": 0,
                "average_duration_minutes": 0,
                "longest_outage_minutes": 0,
                "most_affected_service": "",
                "last_updated": "",
            },
        }

    @staticmethod
    def _parse_timestamp(value):
        """Parse an ISO-8601 string into a naive local ``datetime``.

        Timezone-aware values are converted to local time and stripped of
        their tzinfo so they can be compared with and subtracted from the
        naive ``datetime.now()`` values used throughout this class.

        Raises:
            ValueError: If *value* is not a valid ISO-8601 timestamp.
        """
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def load_statistics(self):
        """Load stored statistics from the JSON file.

        Returns:
            The loaded statistics dict.  A missing file yields the empty
            default structure; an unreadable or malformed file does the
            same after printing a warning.  Top-level sections that are
            absent or of the wrong type are replaced by their defaults so
            the other methods can rely on them being present.
        """
        defaults = self._default_stats()
        try:
            with open(self.stats_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError("top-level JSON value is not an object")
        except FileNotFoundError:
            # First run: nothing stored yet, not worth a warning.
            return defaults
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Warnung: Konnte Statistiken nicht laden: {e}")
            return defaults

        for key, default in defaults.items():
            if not isinstance(loaded.get(key), type(default)):
                loaded[key] = default
        return loaded

    def save_statistics(self):
        """Write the statistics to the JSON file.

        The data is written to a sibling ``.tmp`` file first and then moved
        over the target, so an interrupted write cannot leave a truncated
        statistics file behind.  I/O errors are reported on stdout and
        otherwise ignored.
        """
        tmp_path = f"{self.stats_file}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.stats, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.stats_file)
        except IOError as e:
            print(f"Fehler beim Speichern der Statistiken: {e}")

    def record_outage_start(self, service_name, outage_type, timestamp=None):
        """Record the beginning of an outage.

        Nothing is recorded when an active outage with the same service and
        type already exists.

        Args:
            service_name: Name of the affected service.
            outage_type: Kind of outage; part of the duplicate check.
            timestamp: ISO-8601 start time; defaults to the current time.
        """
        if timestamp is None:
            timestamp = datetime.now().isoformat()

        already_active = any(
            outage["service"] == service_name
            and outage["status"] == "active"
            and outage["type"] == outage_type
            for outage in self.stats["outages"]
        )
        if already_active:
            return

        self.stats["outages"].append(
            {
                "service": service_name,
                "type": outage_type,
                "start_time": timestamp,
                "end_time": None,
                "status": "active",
                "duration_minutes": None,
            }
        )
        self.update_service_stats(service_name, "start", timestamp=timestamp)
        self.save_statistics()

    def record_outage_end(self, service_name, timestamp=None, outage_type=None):
        """Record the end of an outage.

        Closes the first active outage of *service_name* (optionally
        restricted to *outage_type*), stores its duration in whole minutes
        and updates the per-service counters.  The statistics are saved
        even when no matching outage was found.

        Args:
            service_name: Name of the recovered service.
            timestamp: ISO-8601 end time; defaults to the current time.
            outage_type: If given, only an active outage of this type is
                closed; otherwise the first active outage of the service.

        Raises:
            ValueError: If a start or end timestamp cannot be parsed.
        """
        if timestamp is None:
            timestamp = datetime.now().isoformat()

        for outage in self.stats["outages"]:
            if outage["service"] != service_name or outage["status"] != "active":
                continue
            if outage_type is not None and outage["type"] != outage_type:
                continue

            # Parse before mutating so a malformed timestamp cannot leave
            # the record marked as resolved without a duration.
            started = self._parse_timestamp(outage["start_time"])
            ended = self._parse_timestamp(timestamp)
            # Clamp to zero: an end time before the start must not subtract
            # from the accumulated totals.
            minutes = max(0, int((ended - started).total_seconds() / 60))

            outage["end_time"] = timestamp
            outage["status"] = "resolved"
            outage["duration_minutes"] = minutes
            self.update_service_stats(service_name, "end", minutes)
            break

        self.save_statistics()

    def update_service_stats(self, service_name, event_type,
                             duration_minutes=None, timestamp=None):
        """Update the per-service counters for a start or end event.

        Args:
            service_name: Service whose counters are updated; an entry is
                created on first use.
            event_type: ``"start"`` increments the outage count and sets
                ``last_outage``; ``"end"`` adds *duration_minutes* to the
                totals.  Any other value only ensures the entry exists.
            duration_minutes: Outage length for ``"end"`` events.  ``None``
                skips the update; ``0`` is a valid duration.
            timestamp: ISO-8601 start time stored as ``last_outage`` for
                ``"start"`` events; defaults to the current time.
        """
        services = self.stats["services"]
        if service_name not in services:
            services[service_name] = {
                "total_outages": 0,
                "total_duration_minutes": 0,
                "average_duration_minutes": 0,
                "longest_outage_minutes": 0,
                "last_outage": None,
            }
        service_stats = services[service_name]

        if event_type == "start":
            service_stats["total_outages"] += 1
            service_stats["last_outage"] = (
                timestamp if timestamp is not None else datetime.now().isoformat()
            )
        elif event_type == "end" and duration_minutes is not None:
            service_stats["total_duration_minutes"] += duration_minutes
            outage_count = service_stats["total_outages"]
            # An "end" without a recorded "start" would otherwise divide
            # by zero.
            service_stats["average_duration_minutes"] = (
                service_stats["total_duration_minutes"] / outage_count
                if outage_count
                else 0
            )
            if duration_minutes > service_stats["longest_outage_minutes"]:
                service_stats["longest_outage_minutes"] = duration_minutes

    def update_summary_stats(self):
        """Rebuild the ``"summary"`` section from the outage records.

        Duration figures cover resolved outages only; resolved records
        without a stored duration are left out of them.
        """
        outages = self.stats["outages"]
        active_outages = [o for o in outages if o["status"] == "active"]
        resolved_outages = [o for o in outages if o["status"] == "resolved"]
        durations = [
            o["duration_minutes"]
            for o in resolved_outages
            if o.get("duration_minutes") is not None
        ]

        self.stats["summary"] = {
            "total_outages": len(resolved_outages),
            "active_outages": len(active_outages),
            "total_duration_minutes": sum(durations),
            "average_duration_minutes": statistics.mean(durations) if durations else 0,
            "longest_outage_minutes": max(durations) if durations else 0,
            "most_affected_service": self.get_most_affected_service(),
            "last_updated": datetime.now().isoformat(),
        }

    def get_most_affected_service(self):
        """Return the service with the most recorded outages.

        Returns:
            The service name, or ``""`` when no service has been recorded.
        """
        services = self.stats["services"]
        if not services:
            return ""
        return max(services, key=lambda name: services[name]["total_outages"])

    def get_recent_outages(self, days=30):
        """Return resolved outages that ended within the last *days* days.

        Returns:
            The matching outage records, most recently ended first.
        """
        cutoff_date = datetime.now() - timedelta(days=days)

        dated = []
        for outage in self.stats["outages"]:
            if outage["status"] != "resolved" or not outage["end_time"]:
                continue
            ended = self._parse_timestamp(outage["end_time"])
            if ended >= cutoff_date:
                dated.append((ended, outage))

        # Sort on the parsed time only; the records themselves are dicts
        # and cannot be compared.
        dated.sort(key=lambda pair: pair[0], reverse=True)
        return [outage for _, outage in dated]

    def get_outage_trends(self, days=30):
        """Summarise per-day outage counts and durations.

        Outages are grouped by the date part of their stored end time.

        Returns:
            A dict with ``daily_average_outages``,
            ``daily_average_duration``, ``total_days_with_outages`` and
            ``most_outages_in_day``; all values are 0 without outages.
        """
        daily_outages = defaultdict(list)
        for outage in self.get_recent_outages(days):
            daily_outages[outage["end_time"][:10]].append(outage)  # YYYY-MM-DD

        daily_counts = [len(group) for group in daily_outages.values()]
        daily_durations = [
            sum(o.get("duration_minutes") or 0 for o in group)
            for group in daily_outages.values()
        ]

        return {
            "daily_average_outages": statistics.mean(daily_counts) if daily_counts else 0,
            "daily_average_duration": statistics.mean(daily_durations) if daily_durations else 0,
            "total_days_with_outages": len(daily_outages),
            "most_outages_in_day": max(daily_counts) if daily_counts else 0,
        }

    def generate_statistics_report(self):
        """Build the statistics report as a single text string.

        Refreshes the summary first, then lists the overall summary, the
        ten services with the most outages, 30-day trends and up to five
        outages from the last seven days.

        Returns:
            The report, lines joined with ``"\\n"``.
        """
        self.update_summary_stats()
        summary = self.stats["summary"]

        report = [
            "📊 **TI-Status Ausfall-Statistiken**",
            "=" * 50,
            "**Zusammenfassung:**",
            f"• Gesamte Störungen: {summary['total_outages']}",
            f"• Aktive Störungen: {summary['active_outages']}",
            f"• Gesamtdauer: {summary['total_duration_minutes']} Minuten",
            f"• Durchschnittsdauer: {summary['average_duration_minutes']:.1f} Minuten",
            f"• Längste Störung: {summary['longest_outage_minutes']} Minuten",
            f"• Am stärksten betroffen: {summary['most_affected_service']}",
            "",
        ]

        # Per-service statistics, limited to the ten most affected services.
        if self.stats["services"]:
            report.append("**Service-Statistiken:**")
            ranked = sorted(
                self.stats["services"].items(),
                key=lambda item: item[1]["total_outages"],
                reverse=True,
            )
            for service, stats in ranked[:10]:
                report.append(f"• **{service.upper()}**:")
                report.append(f" - Störungen: {stats['total_outages']}")
                report.append(f" - Gesamtdauer: {stats['total_duration_minutes']} Min")
                report.append(f" - Durchschnitt: {stats['average_duration_minutes']:.1f} Min")
                report.append(f" - Längste: {stats['longest_outage_minutes']} Min")
            report.append("")

        # Trends over the last 30 days.
        trends = self.get_outage_trends(30)
        report.append("**Trends (letzte 30 Tage):**")
        report.append(f"• Durchschnitt Störungen/Tag: {trends['daily_average_outages']:.1f}")
        report.append(f"• Durchschnitt Dauer/Tag: {trends['daily_average_duration']:.1f} Min")
        report.append(f"• Tage mit Störungen: {trends['total_days_with_outages']}")
        report.append(f"• Max. Störungen an einem Tag: {trends['most_outages_in_day']}")
        report.append("")

        # Most recent outages: last seven days, at most five entries.
        recent_outages = self.get_recent_outages(7)
        if recent_outages:
            report.append("**Letzte Störungen (7 Tage):**")
            for outage in recent_outages[:5]:
                duration = outage.get("duration_minutes", 0)
                start_time = datetime.fromisoformat(outage["start_time"]).strftime("%d.%m. %H:%M")
                end_time = datetime.fromisoformat(outage["end_time"]).strftime("%d.%m. %H:%M")
                report.append(f"• **{outage['service'].upper()}** ({outage['type']})")
                report.append(f" - {start_time} - {end_time} ({duration} Min)")
            report.append("")

        report.append(f"_Bericht generiert am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_")
        return "\n".join(report)