# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hermes Test
"""
import time
from datetime import datetime
from json import loads
import pytest
import requests
import stomp
from rucio.common.config import config_get, config_get_int
from rucio.core.message import add_message, retrieve_messages, truncate_messages
from rucio.daemons.hermes import hermes
from rucio.tests.common import rse_name_generator, skip_missing_elasticsearch_influxdb_in_env
class MyListener:
    """Minimal STOMP listener that counts and records received JSON messages.

    Used by the hermes test to verify what the daemon published to ActiveMQ.
    """

    def __init__(self, conn):
        self.conn = conn        # the stomp connection this listener is attached to
        self.count = 0          # number of MESSAGE frames received so far
        self.messages = []      # decoded JSON payloads, in arrival order

    def reset(self):
        """Clear the received-message counter and buffer."""
        self.count = 0
        self.messages = []

    def on_error(self, frame):
        # Bug fix: stomp.py >= 5.0 passes a single Frame object to every
        # listener callback (as on_message below already assumes); the old
        # (headers, message) two-argument signature would raise a TypeError
        # when the broker sends an ERROR frame.
        print("received an error %s" % frame)

    def on_message(self, frame):
        print("received message %s" % frame)
        message = frame.body
        self.count += 1
        self.messages.append(loads(message))
@pytest.mark.noparallel(reason="fails when run in parallel")
@skip_missing_elasticsearch_influxdb_in_env
# Mocked core-config table: registers all four hermes delivery services and
# the endpoints/credentials the daemon needs to reach them (indirect fixture).
@pytest.mark.parametrize(
    "core_config_mock",
    [
        {
            "table_content": [
                ("hermes", "services_list", "influx,activemq,elastic,email"),
                (
                    "hermes",
                    "elastic_endpoint",
                    "http://localhost:9200/ddm_events/doc/_bulk",
                ),
                (
                    "hermes",
                    "influxdb_endpoint",
                    "http://localhost:8086/api/v2/write?org=rucio&bucket=rucio",
                ),
                ("hermes", "influxdb_token", "mytoken"),
                ("messaging-hermes", "destination", "/queue/events"),
                ("messaging-hermes", "brokers", "localhost"),
                ("messaging-hermes", "use_ssl", False),
                ("messaging-hermes", "username", "hermes"),
                ("messaging-hermes", "password", "supersecret"),
                ("messaging-hermes", "nonssl_port", 61613),
                ("messaging-hermes", "send_email", False),
            ]
        }
    ],
    indirect=True,
)
# The config cache region must be mocked so the table above is actually read.
@pytest.mark.parametrize(
    "caches_mock",
    [
        {
            "caches_to_mock": [
                "rucio.core.config.REGION",
            ]
        }
    ],
    indirect=True,
)
def test_hermes(core_config_mock, caches_mock):
    """HERMES (DAEMON): Test the messaging daemon.

    End-to-end check that hermes routes queued messages to the right
    services: email messages are delivered and dropped, unsupported
    event types are dropped per-service, and supported events end up in
    InfluxDB, Elasticsearch and ActiveMQ.  Requires live ActiveMQ,
    InfluxDB and Elasticsearch instances on localhost.
    """
    # Start from an empty message table so the counts asserted below
    # are deterministic.
    truncate_messages()
    mock_rse = rse_name_generator()
    file_size = 2       # bytes reported in each deletion-done payload
    nb_messages = 3     # deletion-done messages created in the second phase
    list_messages = []  # every payload we enqueue, for the final comparisons
    event_types = ["blahblah", "deletion-done"]
    # Start consumer
    host = config_get("messaging-hermes", "brokers")
    # NOTE(review): reads "port" while the mocked table only defines
    # "nonssl_port" -- presumably resolved from the real config file; verify.
    port = config_get_int("messaging-hermes", "port")
    user = config_get("messaging-hermes", "username")
    password = config_get("messaging-hermes", "password")
    destination = config_get("messaging-hermes", "destination")
    conn = stomp.Connection(host_and_ports=[(host, port)])
    listener = MyListener(conn)
    conn.set_listener("", listener)
    conn.connect(login=user, passcode=password)
    conn.subscribe(
        destination=destination,
        id=1,
        ack="auto",
        headers={
            "subscription-type": "MULTICAST",
            "durable-subscription-name": "someValue",
        },
    )
    # Wait up to ~20s for the STOMP connection to come up before producing.
    for _ in range(10):
        if conn.is_connected():
            break
        time.sleep(2)
    listener.reset()
    print("Waiting for messages...")
    # Create 3 messages of type blahblah registered to services influx, activemq and elastic
    # Create 3 messages of type email registered to service email
    for i in range(1, 4):
        event_type = event_types[0]
        # microsecond=0 so the value round-trips unchanged through the
        # RFC-1123-style timestamps parsed back with `pattern` below.
        message = {
            "bytes": 2,
            "rse": mock_rse,
            "created_at": datetime.utcnow().replace(microsecond=0),
        }
        add_message(event_type, message)
        add_message(
            "email",
            {
                "to": config_get("messaging-hermes", "email_test").split(","),
                "subject": "Half-Life %i" % i,
                "body": """
            Good morning, and welcome to the Black Mesa Transit System.
            This automated train is provided for the security and convenience of
            the Black Mesa Research Facility personnel. The time is eight-forty
            seven A.M... Current outside temperature is ninety three degrees with
            an estimated high of one hundred and five. Before exiting the train,
            be sure to check your area for personal belongings.
            Thank you, and have a very safe, and productive day.""",
            },
        )
        # Remember what we enqueued (tagged with its event type) so the
        # Elastic/ActiveMQ contents can be compared against it later.
        message["event_type"] = event_type
        list_messages.append(message)
    # Each non-email message is fanned out to influx, elastic and activemq;
    # old_mode=False returns one row per (message, service) pair.
    messages = retrieve_messages(50, old_mode=False)
    service_dict = {"influx": 0, "elastic": 0, "email": 0, "activemq": 0}
    for message in messages:
        service_dict[message["services"]] += 1
    assert service_dict["influx"] == 3
    assert service_dict["elastic"] == 3
    assert service_dict["activemq"] == 3
    assert service_dict["email"] == 3
    # Run Hermes
    # The messages of event_type email should be submitted and removed from the list
    # The messages of event-type blahblah should be removed from the list for service influx since this event-type is not supported by influx
    # The messages of event-type blahblah should be submitted to elastic
    # The messages of event-type blahblah should be submitted to ActiveMQ
    hermes.hermes(once=True)
    # After one pass the queue must be fully drained for every service.
    service_dict = {"influx": 0, "elastic": 0, "email": 0, "activemq": 0}
    messages = retrieve_messages(50, old_mode=False)
    for message in messages:
        service_dict[message["services"]] += 1
    assert service_dict["influx"] == 0
    assert service_dict["elastic"] == 0
    assert service_dict["activemq"] == 0
    assert service_dict["email"] == 0
    # Now add nb_messages more messages of event-type deletion-done associated to services influx, elastic and activemq
    for _ in range(nb_messages):
        event_type = event_types[1]
        message = {
            "bytes": file_size,
            "rse": mock_rse,
            "created_at": datetime.utcnow().replace(microsecond=0),
        }
        add_message(event_type, message)
        message["event_type"] = event_type
        list_messages.append(message)
    messages = retrieve_messages(50, old_mode=False)
    service_dict = {"influx": 0, "elastic": 0, "email": 0, "activemq": 0}
    for message in messages:
        service_dict[message["services"]] += 1
    # deletion-done is not routed to email (send_email is False).
    assert service_dict["influx"] == 3
    assert service_dict["elastic"] == 3
    assert service_dict["activemq"] == 3
    assert service_dict["email"] == 0
    # Run Hermes
    hermes.hermes(once=True)
    service_dict = {"influx": 0, "elastic": 0, "email": 0, "activemq": 0}
    messages = retrieve_messages(50, old_mode=False)
    for message in messages:
        service_dict[message["services"]] += 1
    time.sleep(20)  # Waiting that all the messages are consumed to check ActiveMQ
    # Checking influxDB
    assert service_dict["influx"] == 0
    res = requests.get(
        "http://localhost:8086/query?db=rucio",
        headers={"Authorization": "Token mytoken"},
        params={"q": "SELECT * FROM deletion"},
    )
    assert res.status_code == 200
    assert "results" in res.json()
    influx_res = res.json()["results"]
    assert "series" in influx_res[0]
    columns = influx_res[0]["series"][0]["columns"]
    rse_index = columns.index("rse")
    rse_included = False
    # Find the aggregated row for our RSE and check hermes summed the
    # deletion-done messages into count and byte totals.
    # NOTE(review): the loop variable shadows the `res` response above.
    for res in influx_res[0]["series"][0]["values"]:
        if res[rse_index] == mock_rse:
            rse_included = True
            nb_deletion_done = columns.index("nb_deletion_done")
            bytes_deletion_done = columns.index("bytes_deletion_done")
            assert res[nb_deletion_done] == nb_messages
            assert res[bytes_deletion_done] == nb_messages * file_size
    assert rse_included
    # Checking ElasticSearch
    # Timestamp format hermes uses when serialising created_at.
    pattern = "%a, %d %b %Y %H:%M:%S %Z"
    assert service_dict["elastic"] == 0
    data = ' { "query": { "match_all": {} } }'
    headers = {"Content-Type": "application/json"}
    response = requests.post(
        "http://localhost:9200/_search?size=1000", data=data, headers=headers
    )
    assert response.status_code == 200
    res = response.json()
    print(res)
    # Rebuild dicts in the same shape as list_messages entries so plain
    # `in` comparisons work below.
    elastic_messages = []
    for entry in res["hits"]["hits"]:
        message = entry["_source"]
        elastic_messages.append(
            {
                "created_at": datetime.strptime(
                    message["payload"]["created_at"], pattern
                ),
                "event_type": message["event_type"],
                "rse": message["payload"]["rse"],
                "bytes": message["payload"]["bytes"],
            }
        )
    # Every enqueued message (blahblah and deletion-done) must have reached
    # Elasticsearch.
    for message in list_messages:
        assert message in elastic_messages
    # Checking ActiveMQ
    assert service_dict["activemq"] == 0
    assert len(listener.messages) == len(list_messages)
    # Normalise the consumed frames into the list_messages shape.
    activemq_messages = []
    for message in listener.messages:
        message["payload"]["created_at"] = datetime.strptime(
            message["payload"]["created_at"], pattern
        )
        message["payload"]["event_type"] = message["event_type"]
        activemq_messages.append(message["payload"])
    for message in list_messages:
        assert message in activemq_messages
    # Checking email
    # Nothing to inspect beyond the drained queue: send_email is False.
    assert service_dict["email"] == 0