# telegrambot.py file
import asyncio
import telegram
import json
import traceback
# Telegram bot API token handed to telegram.Bot below (the value looks like a placeholder).
# NOTE(review): avoid committing a real token here; consider loading it from the environment.
TOKEN = "xxxxx"
# Identifier of the chat that send_message() posts to (the value looks like a placeholder).
chat_id = "-xxxxx"
# Module-level bot client shared by every send_message() call.
bot = telegram.Bot(token=TOKEN)
class FilterMsg(object):
    """Pick the interesting fields out of one cluster event record and render them.

    The record is a mapping with an ``EventValue`` entry (a JSON-encoded string
    describing the event) and an ``EventTags`` entry (a mapping carrying
    ``hostname`` and ``pod_name``).

    Attributes:
        event_value: The decoded ``EventValue`` JSON object.
        data: Flat dict of the extracted fields used by :meth:`convert`.
    """

    def __init__(self, text):
        """Decode ``text['EventValue']`` and collect the fields to report.

        Args:
            text: Mapping with the ``EventValue`` and ``EventTags`` entries.

        Raises:
            KeyError: If any expected key is missing from the record.
        """
        self.event_value = json.loads(text['EventValue'])
        involved_object = self.event_value['involvedObject']
        event_tags = text['EventTags']

        self.data = dict()
        self.data['kind'] = involved_object['kind']
        self.data['namespace'] = involved_object['namespace']
        self.data['reason'] = self.event_value['reason']
        self.data['message'] = self.event_value['message']
        self.data['first_timestamp'] = self.event_value['firstTimestamp']
        self.data['last_timestamp'] = self.event_value['lastTimestamp']
        self.data['count'] = self.event_value['count']
        self.data['type'] = self.event_value['type']
        self.data['event_time'] = self.event_value['eventTime']
        self.data['pod_hostname'] = event_tags['hostname']
        self.data['pod_name'] = event_tags['pod_name']

    @staticmethod
    def _escape_code(value):
        """Escape *value* for placement inside a MarkdownV2 inline-code span.

        Inside code entities only the backslash and the backtick are special;
        the backslash is escaped first so the escapes added for backticks are
        not doubled.
        """
        return str(value).replace('\\', '\\\\').replace('`', '\\`')

    def convert(self):
        """Return the event as a MarkdownV2 message, one inline-code span per field.

        Returns:
            str: The formatted text, starting and ending with a newline.
        """
        esc = self._escape_code
        d = self.data
        lines = [
            "",
            "*K8S Cluster Event*",
            f"`Kind: {esc(d['kind'])}`",
            f"`Namespace: {esc(d['namespace'])}`",
            f"`Reason: {esc(d['reason'])}`",
            f"`Timestamp: {esc(d['first_timestamp'])} to {esc(d['last_timestamp'])}`",
            f"`Count: {esc(d['count'])}`",
            f"`EventType: {esc(d['type'])}`",
            f"`EventTime: {esc(d['event_time'])}`",
            f"`PodHostname: {esc(d['pod_hostname'])}`",
            f"`PodName: {esc(d['pod_name'])}`",
            f"`Message: {esc(d['message'])}`",
            "",
        ]
        return "\n".join(lines)
async def send_message(text):
    """Format one raw event payload and post it to the configured Telegram chat.

    Args:
        text: Raw message value as UTF-8 encoded JSON bytes.

    Returns:
        The result of ``bot.send_message`` (awaited when it is awaitable), or
        ``None`` when decoding, formatting, or delivery fails.
    """
    import inspect

    async def _deliver(**kwargs):
        # bot.send_message may be a plain call or a coroutine depending on the
        # client in use; awaiting only when needed supports both.
        result = bot.send_message(chat_id=chat_id, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    try:
        # Core: get message from Kafka, and filter message.
        # Literal "\n" escape sequences are stripped from the text before parsing.
        convert_text = json.loads(text.decode('utf8').replace('\\n', ''))
        try:
            msg = FilterMsg(convert_text).convert()
        except KeyError:
            # The record lacks an expected field: send a plain notice instead.
            # This send sits inside the outer try so its failure is reported too.
            return await _deliver(text="Unknow message..")
        return await _deliver(text=msg, parse_mode='MarkdownV2')
    except Exception as e:
        print(e)
        #traceback.print_exc()
        print('send message to telegram failed,please check.')
        return None
if __name__ == '__main__':
    # Manual smoke test: put a raw event payload (UTF-8 JSON bytes) in `text`.
    text = b''
    if not text:
        # An empty payload cannot be parsed as JSON, so there is nothing to send.
        print('no sample payload configured,nothing to send.')
    else:
        # send_message() decodes and parses the bytes itself; pass them through as-is.
        send_result = asyncio.run(send_message(text))
        print(send_result)
# get_events.py file
from kafka import KafkaConsumer, TopicPartition
from telegrambot import send_message
import asyncio
class KConsumer(object):
    """Kafka consumer that forwards every polled record to ``send_message``."""

    def __init__(self, topic, group_id, bootstrap_servers, auto_offset_reset, enable_auto_commit=False):
        """Create the underlying ``KafkaConsumer`` for *topic*.

        :param topic: topic name handed to ``KafkaConsumer``.
        :param group_id: consumer group id handed to ``KafkaConsumer``.
        :param bootstrap_servers: broker address(es) handed to ``KafkaConsumer``.
        :param auto_offset_reset: offset reset policy handed to ``KafkaConsumer``.
        :param enable_auto_commit: handed to ``KafkaConsumer``; defaults to
            ``False``. ``start_consumer`` commits explicitly either way.
        """
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=enable_auto_commit,
            consumer_timeout_ms=10000,
        )
        # Partition 0 of the topic; not used by the methods of this class.
        self.tp = TopicPartition(topic, 0)

    def start_consumer(self):
        """Poll Kafka in an endless loop and send each record to Telegram.

        Any exception raised during one iteration is reported and the loop
        continues; this method does not return.
        """
        while True:
            try:
                # Pull messages manually (waiting up to 30 s per poll), then
                # manually commit the current offset.
                msg_list_dict = self.consumer.poll(timeout_ms=30000)
                for msg_list in msg_list_dict.values():
                    for msg in msg_list:
                        # Core operation: send the message to Telegram.
                        send_result = asyncio.run(send_message(msg.value))
                        print(send_result)
                if msg_list_dict:
                    # One commit per non-empty batch; an empty poll leaves
                    # nothing new to commit.
                    self.consumer.commit()
            except Exception as e:
                print('ERROR: get cluster events failed,please check.')
                # Show the cause as well, otherwise the failure is undiagnosable.
                print(e)

    def close_consumer(self):
        """Unsubscribe and close the consumer, reporting (not raising) failures."""
        try:
            self.consumer.unsubscribe()
            self.consumer.close()
        except Exception as e:
            # `except Exception` rather than a bare except, so KeyboardInterrupt
            # and SystemExit still propagate.
            print("consumer stop failed,please check.")
            print(e)
if __name__ == '__main__':
    # Environment / middleware configuration for the Kafka connection.
    topic = 'test_topic'
    bootstrap_servers = 'kafka-headless:9092'
    group_id = 'test.group'
    auto_offset_reset = 'earliest'
    enable_auto_commit = False

    # Start: build the consumer and enter its polling loop.
    consumer = KConsumer(
        topic,
        group_id,
        bootstrap_servers,
        auto_offset_reset,
        enable_auto_commit,
    )
    consumer.start_consumer()

    # Stop (currently disabled).
    #consumer.close_consumer()