-
Notifications
You must be signed in to change notification settings - Fork 0
/
nvd_patch_getter.py
256 lines (206 loc) · 8.67 KB
/
nvd_patch_getter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
'''
Further description of the code
This code only works for patches available from github, other websites are
ignored.
Takes in string of the example form, "CVE-2024-21665"
Use NVD API to go and search for available 'patch' tag
If available, default is to download locally to <curr-dir>/<cve_id>.patch
---- commented code is to save it to a folder named "patches" ----
Else, return False
'''
import requests
import os
import argparse
import json
import re
from ftplib import FTP
from urllib.parse import urlparse
from settings import logger
import settings
class Nvd_Patch_Getter:
def __init__(self, args):
config = settings.get_config()
self.apiKey = config['apiKey']
logger.info("Initializing...")
self.args = args
self.is_in_nvd = True
self.patch_text = ""
self.is_cve_public_bool = True
self.nvd_api_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
self.foldername = "patches"
def run(self):
cve_id = self.args.cve_id
self.local_dir_check_create()
cve_json = self.nvd_cve_id_check(cve_id)
if not cve_json:
logger.warning(f"CVE ID {cve_id} does not exist on NVD or is not publicly available")
return""
cve_patch_link_list = self.parse_patch(cve_json)
if cve_patch_link_list:
logger.info(f"List of CVE links for download: {cve_patch_link_list}")
self.patch_text = self.download_cve_patch(cve_patch_link_list, cve_id)
else:
logger.info(f"CVE ID: {cve_id} does not have patch")
logger.info("Exiting")
def download_cve_patch(self, cve_patch_link_list, cve_id):
'''
Returns empty string
Args:
cve_patch_link_list (list): list of patch links
cve_id (string): Current working cve id
Returns:
string: ""
'''
# include headers for NVD API key
for index, cve_patch_url in enumerate(cve_patch_link_list):
try:
response = requests.get(cve_patch_url)
if response.status_code == 200:
filename = "./" + self.foldername + '/' + cve_id + '_' + str(index) + ".patch"
with open(filename, 'w') as file:
file.write(response.text)
logger.info(f"Patch written to {filename}")
else:
logger.warning(f"Patch not found. Status code: {response.status_code}")
except:
logger.warning(f"Patch not found. URL: {cve_patch_url}")
return ""
def parse_patch(self, cve_json):
"""returns a list of patch urls
Args:
cve_json (_type_): _description_
Returns:
a list of patch urls or an empty list
_type_: _description_
"""
# get list of references
list_references = cve_json['vulnerabilities'][0]['cve']['references']
patch_urls = []
for reference_dict in list_references:
if "Vendor Advisory" in reference_dict.keys():
logger.info("Skip vendor advisory tag")
#skip all vendor advisory links
pass
else:
# go through every URL and search for commit word in link
any_url = reference_dict['url']
commit_url = self.is_url_contain_commit(any_url)
openssl_url = self.is_url_contain_openssl(any_url)
#curl.haxx.se/libcurl-contentencoding.patch
curl_url = self.is_url_contain_curl(any_url)
#https://sourceware.org/git/?p=glibc.git;a=commit;h=199eb0de8d673fb23aa127721054b4f1803d61f3
sourceware_url = self.is_url_contain_sourceware(any_url)
if commit_url:
# commit urls can come from github, savannah, openssl
downloadable_patch_url = self.conver_commit_patch(commit_url)
patch_urls.append(downloadable_patch_url)
elif openssl_url:
downloadable_patch_url = self.conver_openssl_patch(openssl_url)
patch_urls.append(downloadable_patch_url)
elif curl_url:
#no need for conversion
patch_urls.append(curl_url)
elif sourceware_url:
downloadable_patch_url = self.conver_sourceware_patch(sourceware_url)
patch_urls.append(downloadable_patch_url)
else:
pass
return patch_urls
def conver_sourceware_patch(self, sourceware_url):
commit_url = re.sub(r'%3B', ';', sourceware_url)
if 'commit' in sourceware_url:
patch_url = re.sub('commit', 'patch', commit_url)
elif ';' in sourceware_url:
#https://sourceware.org/git/gitweb.cgi?p=glibc.git;h=5460617d1567657621107d895ee2dd83bc1f88f2
#https://sourceware.org/git/?p=glibc.git;a=patch;h=5460617d1567657621107d895ee2dd83bc1f88f2
patch_url = re.sub(';', ';a=patch;', commit_url)
else:
patch_url = ""
return patch_url
def conver_openssl_patch(self, openssl_url):
openssl_url = re.sub('secadv_', 'secadv/', openssl_url)
return openssl_url
def conver_commit_patch(self, commit_url):
# if github url, add .patch to end url
if 'github.com' in commit_url:
patch_url = commit_url + '.patch'
return patch_url
# if starts with git. , highly likely swapping the word commit with
# patch will work
elif 'git.' in commit_url:
#convert ascii back to character, if any
commit_url = re.sub(r'%3B', ';', commit_url)
patch_url = re.sub('commit', 'patch', commit_url)
return patch_url
def is_url_contain_sourceware(self, any_url):
if 'sourceware.org' in any_url:
return any_url
return ""
def is_url_contain_commit(self, any_url):
if 'commit' in any_url:
return any_url
return ""
def is_url_contain_openssl(self, any_url):
if 'openssl.org' in any_url:
return any_url
return ""
def is_url_contain_curl(self, any_url):
if 'curl.' in any_url and '.html' not in any_url:
return any_url
return ""
def is_cve_public(self, cve_json):
try:
vulnerabilities = cve_json.get("vulnerabilities", [])
if vulnerabilities:
vuln_status = vulnerabilities[0].get("cve", {}).get("vulnStatus", "")
return vuln_status in ["Analyzed", "Modified"]
return False
except json.JSONDecodeError:
print("Error: Invalid JSON")
return False
def nvd_cve_id_check(self, cve_id):
# Check if api can return cve_id
# e.g https://services.nvd.nist.gov/rest/json/cves/2.0?cveId=CVE-2019-1010218
headers = {
"apiKey": f"{self.apiKey}"
}
cve_url = self.nvd_api_url + "?cveId=" + cve_id
response = requests.get(cve_url, headers=headers)
if response.status_code == 200:
data = response.json()
if not self.is_cve_public(data):
self.is_cve_public_bool = False
logger.info(f"CVE ID is not public, Error: {data}")
return""
return data
else:
logger.info(f"API key printed from config.json: {self.apiKey}")
logger.info(f"CVE ID does not exist on NVD, Error: {response.status_code}")
self.is_in_nvd = False
return ""
def local_dir_check_create(self):
# Check if current directory has folder named "patches"
# Create if it does not exist
curr_dir = os.getcwd()
target_dir = os.path.join(curr_dir, self.foldername)
print(target_dir)
if not os.path.exists(target_dir):
logger.info("Folder named patches does not exist! Creating...")
os.makedirs(target_dir)
logger.info("Folder patches created")
else:
logger.info("Folder patches exists")
def patch_text_getter(self):
return self.patch_text
def is_in_nvd_getter(self):
return self.is_in_nvd
def is_cve_public_getter(self):
return self.is_cve_public_bool
def parse_arguments():
parser = argparse.ArgumentParser(description='Takes in cve-id and downloads patch file from NVD website')
parser.add_argument('-id', '--cve_id', help='CVE ID input')
return parser.parse_args()
if __name__ == "__main__":
args = parse_arguments()
inst = Nvd_Patch_Getter(args)
inst.run()