forked from rucio/rucio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rucio-judge-cleaner
executable file
·89 lines (69 loc) · 4.54 KB
/
rucio-judge-cleaner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Judge-Cleaner is a daemon to clean expired replication rules.
"""
import argparse
import signal
from rucio.daemons.judge.cleaner import run, stop
def get_parser():
"""
Returns the argparse parser.
"""
parser = argparse.ArgumentParser(description="The Judge-Cleaner daemon is responsible for cleaning expired replication rules. It deletes rules by checking if the 'expired_at' date property is older than the current timestamp. If the rule is expired, it will first remove one lock for the replica and parent datasets if the DID belongs to any. Then it will set a tombstone to the replica to mark it as deletable if there are no rules protecting the replica. After these steps, the rule gets deleted.", epilog='''
Upload a file to your RSE::
$ rucio upload --rse MOCK --scope mock --name file filename.txt
Set a replication rule for the file with a lifetime of one second::
$ rucio add-rule mock:file 1 MOCK2 --lifetime 1
Check the replication rules and the replicas::
$ rucio list-rules mock:file
ID ACCOUNT SCOPE:NAME STATE[OK/REPL/STUCK] RSE_EXPRESSION COPIES EXPIRES (UTC) CREATED (UTC)
-------------------------------- --------- ------------ ---------------------- ---------------- -------- ------------------- -------------------
c273c7ed75724143ad21c667e659456b root mock:file REPLICATING[0/1/0] MOCK2 1 2018-12-03 09:53:09 2018-12-03 09:53:08
06f012771b0546dca0c908441c048964 root mock:file OK[1/0/0] MOCK 1 2018-12-03 09:52:19
$ python
from rucio.db.sqla import session, models
from rucio.core.rse import get_rse_id
rse_id = get_rse_id('MOCK2')
session.get_session().query(models.RSEFileAssociation).filter_by(name='file', scope='mock', rse_id=rse_id).first().tombstone // None
session.get_session().query(models.RSEFileAssociation).filter_by(name='file', scope='mock', rse_id=rse_id).first().lock_cnt // 1
The first rule was created with an expiration date of one second after the creation date.
Run the daemon::
$ rucio-judge-cleaner --run-once
Check the replication rules and the replicas::
$ rucio list-rules mock:file
ID ACCOUNT SCOPE:NAME STATE[OK/REPL/STUCK] RSE_EXPRESSION COPIES EXPIRES (UTC) CREATED (UTC)
-------------------------------- --------- ------------ ---------------------- ---------------- -------- --------------- -------------------
06f012771b0546dca0c908441c048964 root mock:file OK[1/0/0] MOCK 1 2018-12-03 09:52:19
$ python
from rucio.db.sqla import session, models
from rucio.core.rse import get_rse_id
rse_id = get_rse_id('MOCK2')
session.get_session().query(models.RSEFileAssociation).filter_by(name='file', scope='mock', rse_id=rse_id).first().tombstone // datetime.datetime(1970, 1, 1, 0, 0)
session.get_session().query(models.RSEFileAssociation).filter_by(name='file', scope='mock', rse_id=rse_id).first().lock_cnt // 0
The rule we created before was deleted and the replica of the file on RSE MOCK2 got a tombstone because there is no protecting rule anymore.
''', formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--run-once", action="store_true", default=False, help='One iteration only')
parser.add_argument("--threads", action="store", default=1, type=int, help='Concurrency control: total number of threads on this process')
parser.add_argument('--sleep-time', action="store", default=60, type=int, help='Concurrency control: thread sleep time after each chunk of work')
return parser
if __name__ == "__main__":
signal.signal(signal.SIGTERM, stop)
parser = get_parser()
args = parser.parse_args()
try:
run(once=args.run_once, threads=args.threads, sleep_time=args.sleep_time)
except KeyboardInterrupt:
stop()