-
-
Notifications
You must be signed in to change notification settings - Fork 395
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Malware bazaar injestor #2259
base: develop
Are you sure you want to change the base?
Malware bazaar injestor #2259
Conversation
fixed json serialization for types: bytes and File
in this way you can set an health checker if you want
…estor # Conflicts: # api_app/serializers/job.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some food for thought. View full project report here.
api_app/classes.py
Outdated
if all(isinstance(n, bytes) for n in report_content): | ||
report_content = [ | ||
base64.b64encode(b).decode("utf-8") for b in report_content | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elif
we do not want to check the content 2 times
api_app/classes.py
Outdated
base64.b64encode(b).decode("utf-8") for b in report_content | ||
] | ||
|
||
self.content = content |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not necessary
api_app/ingestors_manager/classes.py
Outdated
def after_run_success(self, content): | ||
super().after_run_success(content) | ||
self._config: IngestorConfig | ||
# exhaust generator | ||
deque( | ||
self._config.create_jobs( | ||
# every job created from an ingestor | ||
content, | ||
self.content, | ||
TLP.CLEAR.value, | ||
self._user, | ||
self._config.delay, | ||
), | ||
maxlen=0, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def after_run_success(self, content):
pre_parsing_content = content # <------------
super().after_run_success(content)
self._config: IngestorConfig
deque(
self._config.create_jobs(
# every job created from an ingestor
pre_parsing_content, # <-----------
TLP.CLEAR.value,
self._user,
self._config.delay,
),
maxlen=0,
)
```
|
||
class MalwareBazaar(Ingestor): | ||
url: str | ||
hours: int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add as comment the same description that you added in the Parameter
model?
Just so that if someone only reads this code, understand what is this parameter for
|
||
plugin = {'id': 2, 'python_module': {'health_check_schedule': None, 'update_schedule': {'minute': '0', 'hour': '*', 'day_of_week': '*', 'day_of_month': '*', 'month_of_year': '*'}, 'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'schedule': {'minute': '0', 'hour': '*', 'day_of_week': '*', 'day_of_month': '*', 'month_of_year': '*'}, 'periodic_task': {'crontab': {'minute': '0', 'hour': '*', 'day_of_week': '*', 'day_of_month': '*', 'month_of_year': '*'}, 'name': 'MalwarebazaarIngestor', 'task': 'intel_owl.tasks.execute_ingestor', 'kwargs': '{"config_name": "MalwareBazaar"}', 'queue': 'default', 'enabled': True}, 'user': {'username': 'MalwarebazaarIngestor', 'first_name': '', 'last_name': '', 'email': ''}, 'name': 'MalwareBazaar', 'description': 'MalwareBazaar ingestor', 'disabled': False, 'soft_time_limit': 60, 'routing_key': 'default', 'health_check_status': True, 'maximum_jobs': 30, 'delay': '00:00:30', 'health_check_task': None, 'playbook_to_execute': 2, 'model': 'ingestors_manager.IngestorConfig'} | ||
|
||
params = [{'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'url', 'type': 'str', 'description': 'API endpoint', 'is_secret': False, 'required': True}, {'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'hours', 'type': 'int', 'description': 'Download samples that are up to X hours old', 'is_secret': False, 'required': True}, {'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'signatures', 'type': 'list', 'description': 'Download samples from chosen signatures (aka malware families)', 'is_secret': True, 'required': True}] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense that signatures
is a secret?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you dont't want to revealt that information, dunno. Anyway I changed that as NOT secret parameter
class MalwareBazaar(Ingestor): | ||
url: str | ||
hours: int | ||
signatures: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the Parameter
called signatures
it is actually a secret, the config()
function would have assigned it to _signatures
in this class.
Idk how this is working on your side
class ThreatFox(Ingestor): | ||
url: str | ||
days: int | ||
|
||
BASE_URL = "https://threatfox-api.abuse.ch/api/v1/" | ||
|
||
def run(self) -> Iterable[Any]: | ||
result = requests.post( | ||
self.BASE_URL, json={"query": "get_iocs", "days": self.days} | ||
) | ||
result = requests.post(self.url, json={"query": "get_iocs", "days": self.days}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you change the URL? Why someone should be able to change the URL to something differently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the BASE_URL
to url
in order to enable the healthcheck feature (it's present, why disable it completely?).
I made it to do things the same way, as if it were a standard. If it doesn't go well I'll put it back as before.
…estor # Conflicts: # requirements/project-requirements.txt
fixed threatfox migration
if all(isinstance(n, File) for n in report_content): | ||
report_content = [ | ||
base64.b64encode(f.read()).decode("utf-8") for f in report_content | ||
] | ||
elif all(isinstance(n, bytes) for n in report_content): | ||
report_content = [ | ||
base64.b64encode(b).decode("utf-8") for b in report_content | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are iterating them anyway, why don't we convert each object to the correct base64?
report_content = []
for n in content:
if isinstance(n, File):
report_content.append(...)
elif isinstance(n, bytes):
report_content.append(...)
else: report_content.append(n)
result = requests.post( | ||
self.url, | ||
data={"query": "get_siginfo", "signature": signature, "limit": 100}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a timeout to the post request? Just in case
sample_archive = requests.post( | ||
self.url, | ||
data={ | ||
"query": "get_file", | ||
"sha256_hash": h, | ||
}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a timeout
# Download samples from chosen signatures (aka malware families) | ||
signatures: str | ||
|
||
def run(self) -> Iterable[Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function imho should be split in 3 parts:
- Where we are retrieving the results of each signatures
- Where we are parsing the results and adding stuff in the
hashes
variable - Where we are downloading the file from the hashes
I would split this function in these 3 parts, to decrease its complexity
Description
Addition of MalwareBazaar Ingestor and minor bug fixing
Type of change
Please delete options that are not relevant.
Checklist
develop
_monkeypatch()
was used in its class to apply the necessary decorators.dumpplugin
command and added it in the project as a data migration. ("How to share a plugin with the community")test_files.zip
and you added the default tests for that mimetype in test_classes.py.FREE_TO_USE_ANALYZERS
playbook by following this guide.MockUpResponse
of the_monkeypatch()
method. This serves us to provide a valid sample for testing.Black
,Flake
,Isort
) gave 0 errors. If you have correctly installed pre-commit, it does these checks and adjustments on your behalf.tests
folder). All the tests (new and old ones) gave 0 errors.DeepSource
,Django Doctors
or other third-party linters have triggered any alerts during the CI checks, I have solved those alerts.