forked from rucio/rucio
/
extract.py
117 lines (98 loc) · 3.48 KB
/
extract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --- Message broker (STOMP) settings -----------------------------------------
# These were referenced by the __main__ block but never defined.
broker = ''            # hostname of the STOMP broker
broker_port = 61613    # NOTE(review): common ActiveMQ STOMP port -- confirm for your broker
broker_use_ssl = False
# Legacy misspelled alias, kept so older entry-point code reading it still works.
borker_use_ssl = broker_use_ssl
ssl_key_file = ''
ssl_cert_file = ''
queue = '/topic/rucio.events'
chunksize = 1          # number of buffered reports that triggers a flush to Elasticsearch
subscription_id = 1    # STOMP subscription id, also used when acknowledging messages
# --- Elasticsearch settings ---------------------------------------------------
# Despite the names, ``consumer``/``consumer_port`` are the Elasticsearch host and
# port handed to ElasticConn by AMQConsumer.
consumer = ''
consumer_port = 9200
es_username = ""
es_password = ""
import logging
from json import loads as jloads
from time import sleep

import elasticsearch as es
import stomp
class ElasticConn:
    """Thin wrapper around an Elasticsearch client used to index event payloads."""

    def __init__(self, host_port, auth):
        """Create the Elasticsearch client.

        :param host_port: ``(host, port)`` of the Elasticsearch node.
        :param auth: ``(username, password)`` passed to the client as ``http_auth``.
        """
        # The port belongs in the node specification. The previous
        # ``consumer_port=`` keyword is not an Elasticsearch client option, so
        # the configured port was never applied.
        node = {'host': host_port[0], 'port': host_port[1]}
        self.__es = es.Elasticsearch([node], http_auth=auth)

    def index_data(self, indexName, body):
        """Index ``body`` into the index ``indexName``.

        :returns: True if Elasticsearch reports that a new document was created.
        """
        res = self.__es.index(index=indexName, body=body)
        logging.debug('Elasticsearch index response: %s', res)
        return res['result'] == 'created'
class AMQConsumer(stomp.ConnectionListener):
    """STOMP listener that buffers Rucio event reports and forwards them to Elasticsearch.

    Reports are buffered until ``chunksize`` of them are pending. Each one is
    then indexed into ``rucio_transfer`` or ``rucio_deletion`` depending on its
    ``event_type``. A message is acknowledged once it has been indexed
    successfully, or when it is deliberately discarded: an unparseable body, or
    an event type that is neither transfer nor deletion.
    """

    def __init__(self, conn, chunksize, subscription_id):
        """
        :param conn: stomp connection used to acknowledge messages.
        :param chunksize: number of buffered reports that triggers a flush.
        :param subscription_id: subscription id passed to ``conn.ack``.
        """
        self.__conn = conn
        self.__chunksize = chunksize
        self.__subscription_id = subscription_id
        self.__ids = []
        self.__reports = []
        # The Elasticsearch node and credentials come from module-level settings.
        self.__esConn = ElasticConn(host_port=(consumer, consumer_port), auth=(es_username, es_password))

    def on_error(self, frame):
        """Log ERROR frames from the broker instead of silently discarding them."""
        logging.error('Received an error frame from the broker: %s', frame)

    def on_message(self, frame):
        """Parse one STOMP message and buffer it for indexing.

        Messages carrying a ``resubmitted`` header are skipped without being
        acknowledged. Bodies that are not valid JSON are acknowledged and dropped.
        """
        logging.debug('Received frame: %s', frame)
        msg_id = frame.headers['message-id']
        if 'resubmitted' in frame.headers:
            return
        try:
            report = jloads(frame.body)
        except (TypeError, ValueError):
            # Corrupt message: acknowledge it so it is dropped rather than left pending.
            self.__conn.ack(msg_id, self.__subscription_id)
            return
        self._normalize_report(report)
        self.__ids.append(msg_id)
        self.__reports.append({'id': msg_id, 'body': report})
        if len(self.__reports) >= self.__chunksize:
            self.__send_to_es()

    @staticmethod
    def _normalize_report(report):
        """Prepare a report's payload for indexing, in place and best-effort.

        Copies the top-level ``created_at`` and ``event_type`` into ``payload``
        when present. For every string-valued ``*_at`` payload field, it drops
        everything from the first '.' onward (presumably fractional seconds).
        Reports without a dict ``payload`` are left untouched.
        """
        if not isinstance(report, dict):
            return
        payload = report.get('payload')
        if not isinstance(payload, dict):
            return
        for key in ('created_at', 'event_type'):
            if key in report:
                payload[key] = report[key]
        for key, value in list(payload.items()):
            if key.endswith('_at') and isinstance(value, str) and value:
                payload[key] = value.split('.')[0]

    def __index_report(self, report):
        """Route one report to its index and return True when it may be acknowledged.

        ``transfer*`` events go to ``rucio_transfer`` and ``deletion*`` events go
        to ``rucio_deletion``. Any other event type, including a missing one, is
        not indexed. It is reported as handled so that it is acknowledged and
        dropped.
        """
        event_type = report.get('event_type') if isinstance(report, dict) else None
        event_type = str(event_type).lower()
        if event_type.startswith('transfer'):
            return self.__esConn.index_data('rucio_transfer', report['payload'])
        if event_type.startswith('deletion'):
            return self.__esConn.index_data('rucio_deletion', report['payload'])
        return True

    def __send_to_es(self):
        """Index every buffered report, acknowledging each one that was handled.

        A report whose indexing fails or raises stays unacknowledged. The
        buffer is cleared afterwards in all cases, so reports that were already
        indexed are never indexed or acknowledged a second time.
        """
        for msg in self.__reports:
            try:
                if self.__index_report(msg['body']):
                    self.__conn.ack(msg['id'], self.__subscription_id)
            except Exception:
                # Boundary: one malformed report must not block the rest of the batch.
                logging.exception('Failed to forward message %s to Elasticsearch', msg['id'])
        self.__reports = []
        self.__ids = []
if __name__ == "__main__":
    # Root logger at NOTSET: every log record is emitted (very verbose).
    logging.basicConfig(level=logging.NOTSET)
    conn = stomp.Connection(host_and_ports=[(broker, broker_port)], reconnect_attempts_max=5)
    if broker_use_ssl:
        # stomp.py applies SSL settings only to the hosts listed in ``for_hosts``.
        # Without it, the connection silently stays plain-text.
        conn.set_ssl(for_hosts=[(broker, broker_port)], key_file=ssl_key_file, cert_file=ssl_cert_file)
    conn.set_listener('', AMQConsumer(conn, chunksize, subscription_id))
    conn.connect(wait=True)
    conn.subscribe(destination=queue, ack='client-individual', id=subscription_id)
    try:
        # Message handling happens in the listener callbacks; just keep the process alive.
        while True:
            sleep(3600)
    except KeyboardInterrupt:
        pass
    finally:
        # Previously unreachable after the infinite loop.
        conn.disconnect()