This commit is contained in:
root
2023-07-11 15:41:57 +02:00
parent 7fb280ead8
commit c957d105e8
5 changed files with 175 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
# Rocket.Chat Notification Method
This method can be used to send notification via Rocket.Chat.
Configuration in WATO. You need to enter the URL including the token which is provided
by Rocket.Chat upon creating the respective webhook.

Binary file not shown.

11
src/info Normal file
View File

@@ -0,0 +1,11 @@
{'author': 'Mark-Tim Junghanns',
'description': 'Send notifications via Rocket.Chat',
'download_url': 'https://git.php-xpert.de/mtj/Checkmk-rocketchat-notification/',
'files': {'notifications': ['rocketchat'],
'web': ['plugins/wato/rocketchat.py']},
'name': 'rocketchat-notification',
'title': 'Rocket.Chat Notification Method',
'version': '1.0.0',
'version.min_required': '2.2.0p3',
'version.packaged': '2.2.0p3',
'version.usable_until': None}

124
src/notifications/rocketchat Executable file
View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Rocketchat
# SPDX-FileCopyrightText: 2013 Mathias Kettner <mk@mathias-kettner.de>
# SPDX-FileCopyrightText: 2021 Stefan Gehn <stefan+cmk@srcbox.net>
# SPDX-FileCopyrightText: 2023 Mark Junghanns <mark@markjunghanns.de>
#
# SPDX-License-Identifier: GPL-2.0-only
# Rocket.Chat notification based on asciimail notification from check_mk 1.2.6p16.
from __future__ import unicode_literals
import sys
import requests
from cmk.notification_plugins import utils
tmpl_host_text = """$HOSTSTATE_EMOJI$ `$HOSTNAME$`
```
Host: $HOSTNAME$
Event: $EVENT_TXT$
Output: $HOSTOUTPUT$
```"""
tmpl_service_text = """$SERVICESTATE_EMOJI$ `$HOSTNAME$/$SERVICEDESC$`
```
Host: $HOSTNAME$
Service: $SERVICEDESC$
Event: $EVENT_TXT$
Output: $SERVICEOUTPUT$
```"""
def hoststate_as_emoji(hoststate):
if hoststate == "UP":
return "\ud83d\udd35" # large blue circle
elif hoststate == "DOWN":
return "\ud83d\udd34" # large red circle
elif hoststate == "UNREACH":
return "\u26aa\ufe0f" # medium white circle
return hoststate
def servicestate_as_emoji(servicestate):
if servicestate == "OK":
return "\ud83c\udd97" # squared ok
elif servicestate == "WARN":
return "\u26a0\ufe0f" # warning sign
elif servicestate == "CRIT":
return "\u2757\ufe0f" # heavy exclamation mark symbol
elif servicestate == "UNKN":
return "\u2754" # white question mark ornament
return servicestate
def construct_message_text(context):
context["HOSTSTATE_EMOJI"] = hoststate_as_emoji(context.get("HOSTSHORTSTATE", ""))
context["SERVICESTATE_EMOJI"] = servicestate_as_emoji(
context.get("SERVICESHORTSTATE", "")
)
notification_type = context["NOTIFICATIONTYPE"]
if notification_type in ["PROBLEM", "RECOVERY"]:
txt_info = "$PREVIOUS@HARDSHORTSTATE$ -> $@SHORTSTATE$"
elif notification_type.startswith("FLAP"):
if "START" in notification_type:
txt_info = "Started Flapping"
else:
txt_info = "Stopped Flapping ($@SHORTSTATE$)"
elif notification_type.startswith("DOWNTIME"):
what = notification_type[8:].title()
txt_info = "Downtime " + what + " ($@SHORTSTATE$)"
elif notification_type == "ACKNOWLEDGEMENT":
txt_info = "Acknowledged ($@SHORTSTATE$)"
elif notification_type == "CUSTOM":
txt_info = "Custom Notification ($@SHORTSTATE$)"
else:
txt_info = notification_type # Should never happen
context["EVENT_TXT"] = utils.substitute_context(
txt_info.replace("@", context["WHAT"]), context
)
if context["WHAT"] == "HOST":
tmpl_text = tmpl_host_text
else:
tmpl_text = tmpl_service_text
return utils.substitute_context(tmpl_text, context)
def send_rocketchat_message(webhook_url, text):
url = format(webhook_url)
json = {
"text": text,
}
r = requests.post(url=url, json=json)
if r.status_code != 200:
sys.stderr.write(
"Failed to send Rocket.Chat message. Status: {}, Response: {}\n".format(
r.status_code, r.text
)
)
return 1 # Temporary error to make Checkmk retry
sys.stdout.write(
"Sent message to Rocket.Chat {}\n"
)
return 0
def main():
context = utils.collect_context()
webhook_url = context["PARAMETER_WEBHOOK_URL"]
text = construct_message_text(context)
return send_rocketchat_message(webhook_url, text)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2021 Stefan Gehn <stefan+cmk@srcbox.net>
# SPDX-FileCopyrightText: 2023 Mark-Tim Junghanns <mark@markjunghanns.de>
#
# SPDX-License-Identifier: GPL-2.0-only
from cmk.gui.valuespec import Dictionary, TextAscii
from cmk.gui.plugins.wato import notification_parameter_registry, NotificationParameter
@notification_parameter_registry.register
class NotificationParameterRocketchat(NotificationParameter):
@property
def ident(self):
return "rocketchat"
@property
def spec(self):
return Dictionary(
title=_("Create notification with the following parameters"),
required_keys=["webhook_url"],
elements=[
(
"webhook_url",
TextAscii(
title=_("Webhook URL"),
help=_("Webhook URL des Monitoring Users"),
size=146,
allow_empty=False,
),
),
],
)