forked from rucio/rucio
/
srmdumps.py
282 lines (227 loc) · 8.41 KB
/
srmdumps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser as ConfigParser
import datetime
import glob
import hashlib
import html.parser as HTMLParser
import logging
import operator
import os
import re
import gfal2
import requests
from rucio.common.config import get_config_dirs
from rucio.common.dumper import DUMPS_CACHE_DIR, ddmendpoint_url, gfal_download_to_file, http_download_to_file, temp_file
# Download chunk size in bytes (10 MiB).
CHUNK_SIZE = 10485760
# Candidate 'auditor' subdirectories under every Rucio configuration directory;
# built lazily, then narrowed to only the directories that exist on this host.
__DUMPERCONFIGDIRS = (os.path.join(confdir, 'auditor') for confdir in get_config_dirs())
__DUMPERCONFIGDIRS = list(filter(os.path.exists, __DUMPERCONFIGDIRS))
class Parser(ConfigParser.RawConfigParser):
    '''
    RawConfigParser subclass that doesn't modify the name of the options
    and removes any quotes around the string values.
    '''
    remove_quotes_re = re.compile(r"^'(.+)'$")
    remove_double_quotes_re = re.compile(r'^"(.+)"$')

    def optionxform(self, optionstr):
        '''
        Return the option name unchanged (RawConfigParser lowercases option
        names by default, which would break case-sensitive RSE names).
        '''
        return optionstr

    def get(self, section, option):
        '''
        Return the value of `option` in `section` with any surrounding
        single or double quotes stripped.
        '''
        value = super(Parser, self).get(section, option)
        if isinstance(value, str):
            value = self.remove_quotes_re.sub(r'\1', value)
            value = self.remove_double_quotes_re.sub(r'\1', value)
        return value

    def items(self, section):
        '''
        Return the (name, value) pairs of `section`, with values unquoted
        through get() above.
        '''
        return [(name, self.get(section, name)) for name in self.options(section)]
def mkdir(dir_):
    '''
    This function creates the `dir_` directory if it doesn't exist. If
    `dir_` already exists this function does nothing.

    :param dir_: path of the directory to create.
    :raises OSError: for any failure other than the directory already
        existing (e.g. missing parent, permission denied).
    '''
    try:
        os.mkdir(dir_)
    except FileExistsError:
        # Already present: nothing to do. The original code asserted
        # e.errno == 17 (EEXIST), which is stripped under `python -O` and
        # masked unrelated OSErrors as AssertionError.
        pass
def get_newest(base_url, url_pattern, links):
    '''
    Returns a tuple with the newest url in the `links` list matching the
    pattern `url_pattern` and a datetime object representing the creation
    date of the url.

    The creation date is extracted from the url using datetime.strptime().

    :raises RuntimeError: when no link matches the pattern.
    '''
    logger = logging.getLogger('auditor.srmdumps')

    pattern_components = url_pattern.split('/')
    # Only the first pattern component carries the date; the rest is a
    # fixed suffix appended to every matching link.
    date_pattern = '{0}/{1}'.format(base_url, pattern_components[0])
    postfix = '/' + '/'.join(pattern_components[1:]) if len(pattern_components) > 1 else ''

    candidates = []
    for link in links:
        try:
            parsed_date = datetime.datetime.strptime(link, date_pattern)
        except ValueError:
            continue
        candidates.append((str(link) + postfix, parsed_date))

    if not candidates:
        msg = 'No links found matching the pattern {0} in {1}'.format(date_pattern, links)
        logger.error(msg)
        raise RuntimeError(msg)

    # Newest dump == greatest parsed date.
    return max(candidates, key=operator.itemgetter(1))
def gfal_links(base_url):
    '''
    Returns a list of the urls contained in `base_url`.

    Lists the entries under `base_url` with a gfal2 context and joins each
    entry name onto the base URL with '/'.
    '''
    ctxt = gfal2.creat_context()  # pylint: disable=no-member
    return ['/'.join((base_url, f)) for f in ctxt.listdir(str(base_url))]
class _LinkCollector(HTMLParser.HTMLParser):
def __init__(self):
super(_LinkCollector, self).__init__()
self.links = []
def handle_starttag(self, tag, attrs):
if tag == 'a':
self.links.append(
next(value for key, value in attrs if key == 'href')
)
def http_links(base_url):
    '''
    Returns a list of the urls contained in `base_url`.

    Relative links are prefixed with `base_url`; absolute http(s) links
    are returned untouched.
    '''
    collector = _LinkCollector()
    collector.feed(requests.get(base_url).text)
    return [
        link if link.startswith(('http://', 'https://')) else '{0}/{1}'.format(base_url, link)
        for link in collector.links
    ]
# Dispatch table mapping a URL scheme to the function used to list the
# links under a URL ('links') and the function used to download a URL to
# a local file ('download'). Grid protocols go through gfal2; plain
# http(s) endpoints use the HTTP helpers from rucio.common.dumper.
protocol_funcs = {
    'davs': {
        'links': gfal_links,
        'download': gfal_download_to_file,
    },
    'gsiftp': {
        'links': gfal_links,
        'download': gfal_download_to_file,
    },
    'root': {
        'links': gfal_links,
        'download': gfal_download_to_file,
    },
    'srm': {
        'links': gfal_links,
        'download': gfal_download_to_file,
    },
    'http': {
        'links': http_links,
        'download': http_download_to_file,
    },
    'https': {
        'links': http_links,
        'download': http_download_to_file,
    },
}
def protocol(url):
    '''
    Given the URL `url` returns a string with the protocol part.

    :raises RuntimeError: when the scheme has no entry in `protocol_funcs`.
    '''
    scheme = url.partition('://')[0]
    if scheme in protocol_funcs:
        return scheme
    raise RuntimeError('Protocol {0} not supported'.format(scheme))
def get_links(base_url):
    '''
    Given the URL `base_url` returns the URLs linked or contained in it.

    Dispatches to the scheme-specific 'links' function from `protocol_funcs`.
    '''
    return protocol_funcs[protocol(base_url)]['links'](base_url)
def download(url, filename):
    '''
    Given the URL `url` downloads its contents on `filename`.

    Dispatches to the scheme-specific 'download' function from `protocol_funcs`.
    '''
    return protocol_funcs[protocol(url)]['download'](url, filename)
def parse_configuration(conf_dirs=__DUMPERCONFIGDIRS):
    '''
    Parses the configuration for the endpoints contained in `conf_dirs`.

    :param conf_dirs: list of directories searched for '*.cfg' files.
    :returns: a Parser (RawConfigParser subclass) instance with a
        'disabled' default of False.
    :raises RuntimeError: when `conf_dirs` is empty — consistent with the
        RuntimeError raised elsewhere in this module (was a bare Exception).
    '''
    logger = logging.getLogger('auditor.srmdumps')
    if not conf_dirs:
        logger.error('No configuration directory given to load SRM dumps paths')
        raise RuntimeError('No configuration directory given to load SRM dumps paths')

    configuration = Parser({
        'disabled': False,
    })

    for conf_dir in conf_dirs:
        configuration.read(glob.glob(conf_dir + '/*.cfg'))
    return configuration
def download_rse_dump(rse, configuration, date='latest', destdir=DUMPS_CACHE_DIR):
    '''
    Downloads the dump for the given ddmendpoint. If this endpoint does not
    follow the standarized method to publish the dumps it should have an
    entry in the `configuration` object describing how to download the dump.

    :param rse: the DDMEndpoint name.
    :param configuration: a RawConfigParser subclass.
    :param date: a datetime instance with the date of the desired dump, or
        the string 'latest' to download the newest available dump.
    :param destdir: directory where the dump is saved (its final path
        component is created if it doesn't exist).
    :returns: tuple of (local file path, datetime of the dump).
    '''
    logger = logging.getLogger('auditor.srmdumps')

    base_url, url_pattern = generate_url(rse, configuration)
    if date == 'latest':
        logger.debug('Looking for site dumps in: "%s"', base_url)
        # Resolves `date` to the datetime of the newest published dump.
        url, date = get_newest(base_url, url_pattern, get_links(base_url))
    else:
        url = '{0}/{1}'.format(base_url, date.strftime(url_pattern))

    if not os.path.isdir(destdir):
        os.mkdir(destdir)

    # Cache filename: endpoint + date + URL hash, sanitized to word chars.
    raw_name = '{0}_{1}_{2}_{3}'.format(
        'ddmendpoint',
        rse,
        date.strftime('%d-%m-%Y'),
        hashlib.sha1(url.encode()).hexdigest()
    )
    filename = re.sub(r'\W', '-', raw_name)
    path = os.path.join(destdir, filename)

    if not os.path.exists(path):
        logger.debug('Trying to download: "%s"', url)
        # temp_file renames to `filename` only on success, so a partial
        # download never shadows the cache entry.
        with temp_file(destdir, final_name=filename) as (dump_file, _):
            download(url, dump_file)

    return (path, date)
def generate_url(rse, config):
    '''
    :param rse: Name of the endpoint.
    :param config: RawConfigParser instance which may have configuration
        related to the endpoint.
    :returns: Tuple with the URL where the links can be queried to find new
        dumps and the pattern used to parse the date of the dump of the
        files/directories listed.
    '''
    site = rse.split('_')[0]
    if site in config.sections():
        components = config.get(site, rse).split('/')
        # The date pattern may sit anywhere in the path, not only in the
        # last component: split at the first component containing '%m'.
        split_at = next(idx for idx, comp in enumerate(components) if '%m' in comp)
        base_url = '/'.join(components[:split_at])
        url_pattern = '/'.join(components[split_at:])
    else:
        # No site-specific entry: fall back to the standard publication
        # layout under the endpoint URL.
        base_url = ddmendpoint_url(rse) + 'dumps'
        url_pattern = 'dump_%Y%m%d'
    return base_url, url_pattern