-
-
Notifications
You must be signed in to change notification settings - Fork 392
/
spf.py
147 lines (109 loc) · 3.92 KB
/
spf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#
#
#
from logging import getLogger
from typing import List, Optional
import dns.resolver
from dns.resolver import Answer
from octodns.record.base import Record
from ..deprecation import deprecated
from .base import BaseProcessor, ProcessorException
class SpfValueException(ProcessorException):
pass
class SpfDnsLookupException(ProcessorException):
pass
class SpfDnsLookupProcessor(BaseProcessor):
'''
Validate that SPF values in TXT records are valid.
Example usage:
processors:
spf:
class: octodns.processor.spf.SpfDnsLookupProcessor
zones:
example.com.:
sources:
- config
processors:
- spf
targets:
- route53
The validation can be skipped for specific records by setting the lenient
flag, e.g.
_spf:
octodns:
lenient: true
ttl: 86400
type: TXT
value: v=spf1 ptr ~all
'''
log = getLogger('SpfDnsLookupProcessor')
def __init__(self, name):
self.log.debug(f"SpfDnsLookupProcessor: {name}")
deprecated(
'SpfDnsLookupProcessor is DEPRECATED in favor of the version relocated into octodns-spf and will be removed in 2.0',
stacklevel=99,
)
super().__init__(name)
def _get_spf_from_txt_values(
self, record: Record, values: List[str]
) -> Optional[str]:
self.log.debug(
f"_get_spf_from_txt_values: record={record.fqdn} values={values}"
)
# SPF values to validate will begin with 'v=spf1 '
spf = [value for value in values if value.startswith('v=spf1 ')]
# No SPF values in the TXT record
if len(spf) == 0:
return None
# More than one SPF value resolves as "permerror", https://datatracker.ietf.org/doc/html/rfc7208#section-4.5
if len(spf) > 1:
raise SpfValueException(
f"{record.fqdn} has more than one SPF value in the TXT record"
)
return spf[0]
def _process_answer(self, answer: Answer) -> List[str]:
values = []
for value in answer:
text_value = value.to_text()
processed_value = text_value[1:-1].replace('" "', '')
values.append(processed_value)
return values
def _check_dns_lookups(
self, record: Record, values: List[str], lookups: int = 0
) -> int:
self.log.debug(
f"_check_dns_lookups: record={record.fqdn} values={values} lookups={lookups}"
)
spf = self._get_spf_from_txt_values(record, values)
if spf is None:
return lookups
terms = spf[len('v=spf1 ') :].split(' ')
for term in terms:
if lookups > 10:
raise SpfDnsLookupException(
f"{record.fqdn} exceeds the 10 DNS lookup limit in the SPF record"
)
if term.startswith('ptr'):
raise SpfValueException(
f"{record.fqdn} uses the deprecated ptr mechanism"
)
# These mechanisms cost one DNS lookup each
if term.startswith(('a', 'mx', 'exists:', 'redirect', 'include:')):
lookups += 1
# The include mechanism can result in further lookups after resolving the DNS record
if term.startswith('include:'):
domain = term[len('include:') :]
answer = dns.resolver.resolve(domain, 'TXT')
answer_values = self._process_answer(answer)
lookups = self._check_dns_lookups(
record, answer_values, lookups
)
return lookups
def process_source_zone(self, zone, *args, **kwargs):
for record in zone.records:
if record._type != 'TXT':
continue
if record.lenient:
continue
self._check_dns_lookups(record, record.values, 0)
return zone