-
Notifications
You must be signed in to change notification settings - Fork 0
/
slack_action.py
67 lines (55 loc) · 2.73 KB
/
slack_action.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import argparse
import logging
import pycountry
import requests
def main():
parser = argparse.ArgumentParser(description='Send fail2ban action messages to Slack.')
parser.add_argument('webhook_url', type=str, help='Your webhook URL generated via Slack.')
parser.add_argument('action_type', type=str, choices=['ban', 'unban', 'start', 'stop'],
help='The type of action from fail2ban.')
parser.add_argument('jail', type=str, help='The jail that the action belongs to.')
parser.add_argument('--ip', type=str, default='', help='The IP address that caused the action to occur.')
parser.add_argument('--num-failures', type=int, default=0, help='The number of failures by an IP address.')
args = parser.parse_args()
if args.action_type == 'ban':
msg = create_ban_msg(args)
elif args.action_type == 'unban':
msg = f'Removed {args.ip} from jail {args.jail}'
elif args.action_type == 'start':
msg = f'Jail \'{args.jail}\' has been started'
elif args.action_type == 'stop':
msg = f'Jail \'{args.jail}\' has been stopped'
else:
# Should not happen, args parser restricts choices so this case won't matter
msg = 'Unknown msg type'
try:
slack_response = requests.post(f'https://hooks.slack.com/services/{args.webhook_url}', json={
"channel": "#alerts",
"username": "fail2ban",
"text": msg
}, timeout=2)
if slack_response.status_code != 200:
slack_response.raise_for_status()
except requests.exceptions.RequestException as e:
logging.exception(e)
def create_ban_msg(args):
failure_txt = "failures" if args.num_failures != 1 else "failure"
# Fetch what country the IP hails from (for curiosity's sake)
try:
ipinfo_response = requests.get(f'https://ipinfo.io/{args.ip}/json', timeout=1)
if ipinfo_response.status_code == 200:
ip_json = ipinfo_response.json()
if 'country' in ip_json:
country_code = ip_json['country'].lower()
try:
country = pycountry.countries.lookup(country_code)
return f'Banned :flag-{country_code}: {args.ip} ({country.name}) in jail {args.jail} for ' \
f'{args.num_failures} {failure_txt}'
except LookupError:
return f'Banned :flag-{country_code}: {args.ip} (Unknown) in jail {args.jail} for ' \
f'{args.num_failures} {failure_txt}'
except requests.exceptions.RequestException as e:
logging.exception(e)
return f'Banned {args.ip} (Unknown) in jail {args.jail} for {args.num_failures} {failure_txt}'
if __name__ == "__main__":
main()