import requests import json from datetime import datetime import re import os from dotenv import load_dotenv import apprise # Lade Umgebungsvariablen aus .env Datei load_dotenv() TI_API_URL = "https://ti-lage.prod.ccs.gematik.solutions/lageapi/v2/tilage" STATE_FILE = "ti_status_state.json" # Apprise Konfiguration aus Umgebungsvariablen APPRISE_URL = os.getenv('APPRISE_URL') if not APPRISE_URL: raise ValueError("APPRISE_URL muss in der .env Datei definiert sein") def fetch_status_messages(): print(f"Rufe TI-Status-API ab: {TI_API_URL}") resp = requests.get(TI_API_URL) resp.raise_for_status() data = resp.json() print(f"API-Antwort erhalten. Anzahl Meldungen: {len(data.get('meldungen', []))}") messages = [] for meldung in data.get("meldungen", []): zeit = meldung.get("zeitpunkt", "") titel = meldung.get("titel", "") beschreibung = meldung.get("beschreibung", "") link = meldung.get("link", "") msg = f"{zeit}\n- {titel}: {beschreibung}\n{link}".strip() messages.append(msg) print(f"Verarbeite Meldung: {titel[:50]}...") print(f"Insgesamt {len(messages)} Meldungen verarbeitet") return messages def load_state(): try: with open(STATE_FILE, "r", encoding="utf-8") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {"messages": []} def save_state(state): with open(STATE_FILE, "w", encoding="utf-8") as f: json.dump(state, f, ensure_ascii=False, indent=2) def markdownify_message(message): # Datumsangaben fett hervorheben (z.B. 2025-06-23 oder 23.06.2025) message = re.sub(r"(\d{4}-\d{2}-\d{2})", r"**\1**", message) message = re.sub(r"(\d{2}\.\d{2}\.\d{4})", r"**\1**", message) # URLs als Links darstellen message = re.sub(r"(https?://\S+)", r"[\1](\1)", message) # Listenpunkte erkennen (z.B. mit - oder *) lines = message.splitlines() md_lines = [] for line in lines: line = line.strip() if line.startswith("-") or line.startswith("*"): md_lines.append(f"- {line[1:].strip()}") else: md_lines.append(line) # Absätze durch doppelte Zeilenumbrüche trennen md_message = "\n\n".join([l for l in md_lines if l]) return md_message def send_notification(message): md_message = markdownify_message(message) # Erstelle Apprise Objekt apobj = apprise.Apprise() # Füge die Apprise URL hinzu apobj.add(APPRISE_URL) # Erstelle die Nachricht title = "Neue TI-Status-Meldung" body = f"{md_message}\n\n[Zur Statusseite](https://fachportal.gematik.de/ti-status)\n_Gemeldet am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_" # Sende die Nachricht apobj.notify( title=title, body=body, body_format=apprise.NotifyFormat.MARKDOWN ) def main(): state = load_state() known_messages = set(state.get("messages", [])) print("Prüfe TI-Status-API auf neue Meldungen...") try: messages = fetch_status_messages() new_messages = [m for m in messages if m not in known_messages] for msg in new_messages: print(f"Neue Meldung gefunden: {msg}") send_notification(msg) known_messages.add(msg) if new_messages: save_state({"messages": list(known_messages)}) else: print(f"Keine neuen Meldungen ({datetime.now().strftime('%H:%M:%S')})") except Exception as e: print(f"Fehler: {e}") if __name__ == "__main__": main()