#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Conveyor is a daemon to manage file transfers.
"""
import argparse
import signal
from rucio.daemons.conveyor.submitter import run, stop
def get_parser():
    """
    Build the command-line parser for the Conveyor-Submitter daemon.

    The epilog is written as reStructuredText and is printed verbatim
    (``RawDescriptionHelpFormatter``), so the blank lines and the indentation
    of its literal blocks are significant and must be kept when editing it.

    :returns: the configured ``argparse.ArgumentParser``. No option is
              required, so an empty command line parses successfully.
    """
    description = (
        "The Conveyor-Submitter daemon is responsible for managing non-tape file transfers. "
        "It prepares transfer jobs and submits them to the transfertool."
    )
    # Kept flush-left on purpose: the text is emitted as-is, so any indentation
    # added here to follow the function body would show up in ``--help``.
    epilog = '''
Upload a file and create a replication rule::

  $ rucio upload --scope mock --rse MOCK --name file filename.txt
  $ rucio add-rule mock:file 1 MOCK2
  $ rucio-admin rse add-distance MOCK2 MOCK --distance 1

The rule should replicate the file from RSE MOCK to RSE MOCK2. Therefore a distance between these RSEs is needed.

Check transfer requests for the DID::

  $ python
  from rucio.db.sqla import session,models
  session.get_session().query(models.Request).filter_by(scope='mock', name='file').first()
  # {'request_type': TRANSFER, 'state': QUEUED, ...}

A queued request was created which can be picked up by the Conveyor-Submitter daemon.

Run the daemon::

  $ rucio-conveyor-submitter --run-once

Check again the transfer requests for the DID::

  $ python
  from rucio.db.sqla import session,models
  session.get_session().query(models.Request).filter_by(scope='mock', name='file').first()
  # {'request_type': TRANSFER, 'state': SUBMITTED, ...}

A transfer request got created by executing the transfer. Depending on the transfer submission, the request state can be different. In this example the transfer got submitted successfully.

When run in multi-VO mode, by default the daemon will run on RSEs from all VOs::

  $ rucio-conveyor-submitter --run-once
  2020-07-29 13:51:09,436 5784 INFO This instance will work on VOs: def, abc, xyz, 123
  2020-07-29 13:51:13,315 5784 INFO RSE selection: automatic for relevant VOs
  2020-07-29 13:51:13,316 5784 INFO starting submitter threads

By using the ``--vos`` argument only the VO or VOs specified will be affected::

  $ rucio-conveyor-submitter --run-once --vos abc xyz
  2020-07-29 13:51:09,436 5784 INFO This instance will work on VOs: abc, xyz
  2020-07-29 13:51:13,315 5784 INFO RSE selection: automatic for relevant VOs
  2020-07-29 13:51:13,316 5784 INFO starting submitter threads

Note that attempting to use the ``--vos`` argument when in single-VO mode will have no effect::

  $ rucio-conveyor-submitter --run-once --vos abc xyz
  2020-07-29 13:39:37,263 5752 INFO RSE selection: automatic
  2020-07-29 13:39:37,264 5752 INFO starting submitter threads
'''
    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Execution and batching controls.
    parser.add_argument("--run-once", action="store_true",
                        help='One iteration only')
    parser.add_argument("--total-threads", default=1, type=int,
                        help='Concurrency control: total number of threads per process')
    parser.add_argument("--bulk", default=100, type=int,
                        help='Bulk control: number of requests')
    parser.add_argument("--group-bulk", default=1, type=int,
                        help='Group control: number of requests per group')
    parser.add_argument("--group-policy", default='rule',
                        help='Group control: policy used to group. enum{rule, dest, src_dest, rule_src_dest, activity_dest, activity_src_dest}')
    parser.add_argument('--source-strategy', default=None,
                        help='Source strategy. Overload the strategy defined in config DB.')

    # RSE / VO / activity selection. The list-valued options stay None when
    # they are not given on the command line.
    parser.add_argument('--exclude-rses', default=None,
                        help='RSE expression to exclude')
    parser.add_argument('--include-rses', default=None,
                        help='RSE expression to include')
    parser.add_argument('--rses', nargs='+',
                        help='Explicit list of RSEs to include')
    parser.add_argument('--vos', nargs='+',
                        help='Optional list of VOs to consider. Only used in multi-VO mode.')
    parser.add_argument('--activities', nargs='+',
                        help='Explicit list of activities to include')
    parser.add_argument('--exclude-activities', nargs='+',
                        help='Explicit list of activities to exclude')
    parser.add_argument('--ignore-availability', action="store_true",
                        help='If True, will also try to submit transfers having blocklisted RSEs as sources')

    # Pacing and transfer-job tuning.
    parser.add_argument('--sleep-time', default=600, type=int,
                        help='Seconds to sleep if few requests')
    parser.add_argument('--max-sources', default=5, type=int,
                        help='Maximum source replicas per multi-source FTS job')
    parser.add_argument('--archive-timeout-override', default=None, type=int, metavar="INTEGER_SECONDS",
                        help='Override the archive_timeout parameter for any transfers with it set (0 to unset)')
    return parser
if __name__ == "__main__":
    # Route SIGTERM to the daemon's stop() handler before doing anything else.
    signal.signal(signal.SIGTERM, stop)

    options = get_parser().parse_args()

    # Map each command-line option onto the keyword argument run() expects.
    daemon_kwargs = {
        'once': options.run_once,
        'bulk': options.bulk,
        'group_bulk': options.group_bulk,
        'group_policy': options.group_policy,
        'include_rses': options.include_rses,
        'exclude_rses': options.exclude_rses,
        'rses': options.rses,
        'vos': options.vos,
        'source_strategy': options.source_strategy,
        'activities': options.activities,
        'exclude_activities': options.exclude_activities,
        'ignore_availability': options.ignore_availability,
        'sleep_time': options.sleep_time,
        'max_sources': options.max_sources,
        'archive_timeout_override': options.archive_timeout_override,
        'total_threads': options.total_threads,
    }

    try:
        run(**daemon_kwargs)
    except KeyboardInterrupt:
        # An interrupt from the keyboard is handled by calling stop() as well.
        stop()