Umgebaut auf REST API

This commit is contained in:
2025-06-27 11:18:35 +02:00
parent eab0a374ca
commit 5e2e03098c
2 changed files with 14 additions and 101 deletions

View File

@ -1,31 +1,24 @@
import requests import requests
import time
import json import json
from bs4 import BeautifulSoup
from datetime import datetime from datetime import datetime
import re import re
TI_STATUS_URL = "https://fachportal.gematik.de/ti-status" TI_API_URL = "https://ti-lage.prod.ccs.gematik.solutions/lageapi/v2/tilage"
WEBHOOK_URL = "https://mattermost.medisoftware.org/hooks/i67zgcgajifxxxtfwjxcxace7a" WEBHOOK_URL = "https://mattermost.medisoftware.org/hooks/i67zgcgajifxxxtfwjxcxace7a"
STATE_FILE = "ti_status_state.json" STATE_FILE = "ti_status_state.json"
def fetch_status_messages(): def fetch_status_messages():
resp = requests.get(TI_STATUS_URL) resp = requests.get(TI_API_URL)
resp.raise_for_status() resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser") data = resp.json()
# Annahme: Meldungen stehen in bestimmten HTML-Elementen (z.B. <div class="c-status-list__item">)
# Dies muss ggf. angepasst werden, wenn die Struktur anders ist!
messages = [] messages = []
for item in soup.find_all("div", class_="c-status-list__item"): for meldung in data.get("meldungen", []):
title = item.get_text(strip=True) zeit = meldung.get("zeitpunkt", "")
if title: titel = meldung.get("titel", "")
messages.append(title) beschreibung = meldung.get("beschreibung", "")
# Fallback: Wenn keine spezielle Klasse, alle sichtbaren Meldungen im Hauptbereich link = meldung.get("link", "")
if not messages: msg = f"{zeit}\n- {titel}: {beschreibung}\n{link}".strip()
for p in soup.find_all("p"): messages.append(msg)
text = p.get_text(strip=True)
if text and len(text) > 30:
messages.append(text)
return messages return messages
def load_state(): def load_state():
@ -40,7 +33,8 @@ def save_state(state):
json.dump(state, f, ensure_ascii=False, indent=2) json.dump(state, f, ensure_ascii=False, indent=2)
def markdownify_message(message): def markdownify_message(message):
# Datumsangaben fett hervorheben (z.B. 23.06.2025) # Datumsangaben fett hervorheben (z.B. 2025-06-23 oder 23.06.2025)
message = re.sub(r"(\d{4}-\d{2}-\d{2})", r"**\1**", message)
message = re.sub(r"(\d{2}\.\d{2}\.\d{4})", r"**\1**", message) message = re.sub(r"(\d{2}\.\d{2}\.\d{4})", r"**\1**", message)
# URLs als Links darstellen # URLs als Links darstellen
message = re.sub(r"(https?://\S+)", r"[\1](\1)", message) message = re.sub(r"(https?://\S+)", r"[\1](\1)", message)
@ -60,7 +54,7 @@ def markdownify_message(message):
def send_to_mattermost(message): def send_to_mattermost(message):
md_message = markdownify_message(message) md_message = markdownify_message(message)
payload = { payload = {
"text": f"#### Neue TI-Status-Meldung\n\n{md_message}\n\n[Zur Statusseite]({TI_STATUS_URL})\n_Gemeldet am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_" "text": f"#### Neue TI-Status-Meldung\n\n{md_message}\n\n[Zur Statusseite](https://fachportal.gematik.de/ti-status)\n_Gemeldet am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_"
} }
resp = requests.post(WEBHOOK_URL, json=payload) resp = requests.post(WEBHOOK_URL, json=payload)
resp.raise_for_status() resp.raise_for_status()
@ -68,7 +62,7 @@ def send_to_mattermost(message):
def main(): def main():
state = load_state() state = load_state()
known_messages = set(state.get("messages", [])) known_messages = set(state.get("messages", []))
print("Prüfe TI-Status-Seite auf neue Meldungen...") print("Prüfe TI-Status-API auf neue Meldungen...")
try: try:
messages = fetch_status_messages() messages = fetch_status_messages()
new_messages = [m for m in messages if m not in known_messages] new_messages = [m for m in messages if m not in known_messages]

View File

@ -1,81 +0,0 @@
import requests
import json
from datetime import datetime
import re
TI_API_URL = "https://ti-lage.prod.ccs.gematik.solutions/lageapi/v2/tilage"
WEBHOOK_URL = "https://mattermost.medisoftware.org/hooks/i67zgcgajifxxxtfwjxcxace7a"
STATE_FILE = "ti_status_state.json"
def fetch_status_messages():
resp = requests.get(TI_API_URL)
resp.raise_for_status()
data = resp.json()
messages = []
for meldung in data.get("meldungen", []):
zeit = meldung.get("zeitpunkt", "")
titel = meldung.get("titel", "")
beschreibung = meldung.get("beschreibung", "")
link = meldung.get("link", "")
msg = f"{zeit}\n- {titel}: {beschreibung}\n{link}".strip()
messages.append(msg)
return messages
def load_state():
try:
with open(STATE_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"messages": []}
def save_state(state):
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
def markdownify_message(message):
# Datumsangaben fett hervorheben (z.B. 2025-06-23 oder 23.06.2025)
message = re.sub(r"(\d{4}-\d{2}-\d{2})", r"**\1**", message)
message = re.sub(r"(\d{2}\.\d{2}\.\d{4})", r"**\1**", message)
# URLs als Links darstellen
message = re.sub(r"(https?://\S+)", r"[\1](\1)", message)
# Listenpunkte erkennen (z.B. mit - oder *)
lines = message.splitlines()
md_lines = []
for line in lines:
line = line.strip()
if line.startswith("-") or line.startswith("*"):
md_lines.append(f"- {line[1:].strip()}")
else:
md_lines.append(line)
# Absätze durch doppelte Zeilenumbrüche trennen
md_message = "\n\n".join([l for l in md_lines if l])
return md_message
def send_to_mattermost(message):
md_message = markdownify_message(message)
payload = {
"text": f"#### Neue TI-Status-Meldung\n\n{md_message}\n\n[Zur Statusseite](https://fachportal.gematik.de/ti-status)\n_Gemeldet am {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}_"
}
resp = requests.post(WEBHOOK_URL, json=payload)
resp.raise_for_status()
def main():
state = load_state()
known_messages = set(state.get("messages", []))
print("Prüfe TI-Status-API auf neue Meldungen...")
try:
messages = fetch_status_messages()
new_messages = [m for m in messages if m not in known_messages]
for msg in new_messages:
print(f"Neue Meldung gefunden: {msg}")
send_to_mattermost(msg)
known_messages.add(msg)
if new_messages:
save_state({"messages": list(known_messages)})
else:
print(f"Keine neuen Meldungen ({datetime.now().strftime('%H:%M:%S')})")
except Exception as e:
print(f"Fehler: {e}")
if __name__ == "__main__":
main()