Skip to content

Commit

Permalink
Added some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eliasgranderubio committed Jan 20, 2017
1 parent 9f93707 commit a99cc01
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 1 deletion.
29 changes: 28 additions & 1 deletion tests/analysis/static/dependencies/test_dep_info_extractor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import unittest
import json

import os
from dagda.analysis.static.dependencies.dep_info_extractor import get_filtered_dependencies_info
from dagda.analysis.static.dependencies.dep_info_extractor import raw_info_to_json_array
from dagda.analysis.static.dependencies.dep_info_extractor import read_depcheck_output_file


# -- Test suite
Expand Down Expand Up @@ -32,6 +33,32 @@ def test_raw_info_to_json_array(self):
self.assertTrue('java#cxf#2.6.0' in filtered_dep)
self.assertTrue('java#navigator#4.08' in filtered_dep)

def test_read_depcheck_output_file_exception(self):
msg = ''
try:
read_depcheck_output_file('no_image_name')
except Exception as ex:
msg = ex.get_message()
self.assertEqual(msg, 'Depcheck output file [/tmp/depcheck/no_image_name] not found.')

def test_read_depcheck_empty_output_file(self):
# Prepare test
try:
os.makedirs('/tmp/depcheck')
created = True
except OSError:
created = False
with open('/tmp/depcheck/empty_output_file', 'w') as f:
None
# Run
raw_info = read_depcheck_output_file('empty_output_file')
# Clean up
os.remove('/tmp/depcheck/empty_output_file')
if created:
os.removedirs('/tmp/depcheck')
# Check
self.assertEqual(raw_info, '')


# -- Mock Constants

Expand Down
117 changes: 117 additions & 0 deletions tests/api/service/test_vuln.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import unittest
from unittest.mock import patch
from dagda.api.service.vuln import get_vulns_by_product_and_version
from dagda.api.service.vuln import get_products_by_cve
from dagda.api.service.vuln import get_products_by_bid
from dagda.api.service.vuln import get_products_by_exploit_id
from dagda.api.service.vuln import init_or_update_db
from dagda.api.service.vuln import get_init_or_update_db_status


# -- Test suite

class VulnApiTestCase(unittest.TestCase):

# -- Mock internal classes

class MockMongoDriverEmptyLists():
def get_vulnerabilities(self, product, version):
return []

def get_products_by_cve(self, cve_id):
return []

def get_products_by_bid(self, bid_id):
return []

def get_products_by_exploit_db_id(self, exploit_id):
return []

def get_init_db_process_status(self):
return {'timestamp': None}


class MockMongoDriverWithContent():
def get_vulnerabilities(self, product, version):
return ['CVE-2002-2002']

def get_products_by_cve(self, cve_id):
return [{'product':'product_name', 'version': '1.0.0'}]

def get_products_by_bid(self, bid_id):
return [{'product':'product_name', 'version': '1.0.0'}]

def get_products_by_exploit_db_id(self, exploit_id):
return [{'product':'product_name', 'version': '1.0.0'}]

def get_init_db_process_status(self):
return {'timestamp': 123456789}

class MockDagdaEdn():
def put(self, msg):
return

# -- Tests

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverEmptyLists())
def test_get_vulns_by_product_and_version_404(self, m):
response, code = get_vulns_by_product_and_version('product')
self.assertEqual(code, 404)

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverEmptyLists())
def test_get_products_by_cve_400(self, m):
response, code = get_products_by_cve('product')
self.assertEqual(code, 400)

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverEmptyLists())
def test_get_products_by_cve_404(self, m):
response, code = get_products_by_cve('CVE-2002-2002')
self.assertEqual(code, 404)

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverEmptyLists())
def test_get_products_by_bid_404(self, m):
response, code = get_products_by_bid(1)
self.assertEqual(code, 404)

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverEmptyLists())
def test_get_products_by_exploit_id_404(self, m):
response, code = get_products_by_exploit_id(1)
self.assertEqual(code, 404)

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverWithContent())
def test_get_vulns_by_product_and_version_200(self, m):
response = get_vulns_by_product_and_version('product')
self.assertEqual(response, '["CVE-2002-2002"]')

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverWithContent())
def test_get_products_by_cve_200(self, m):
response = get_products_by_cve('CVE-2002-2002')
self.assertEqual(response, '[{"product": "product_name", "version": "1.0.0"}]')

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverWithContent())
def test_get_products_by_bid_200(self, m):
response = get_products_by_bid(1)
self.assertEqual(response, '[{"product": "product_name", "version": "1.0.0"}]')

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverWithContent())
def test_get_products_by_exploit_id_200(self, m):
response = get_products_by_exploit_id(1)
self.assertEqual(response, '[{"product": "product_name", "version": "1.0.0"}]')

@patch('api.internal.internal_server.InternalServer.get_dagda_edn', return_value=MockDagdaEdn())
def test_init_or_update_db_202(self, m):
response, code = init_or_update_db()
self.assertEqual(code, 202)

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverEmptyLists())
def test_get_init_or_update_db_status_empty_timestamp(self, m):
response = get_init_or_update_db_status()
self.assertEqual(response, '{"timestamp": "-"}')

@patch('api.internal.internal_server.InternalServer.get_mongodb_driver', return_value=MockMongoDriverWithContent())
def test_get_init_or_update_db_status_with_timestamp(self, m):
response = get_init_or_update_db_status()
self.assertEqual(response, '{"timestamp": "1973-11-29 21:33:09"}')

if __name__ == '__main__':
unittest.main()

0 comments on commit a99cc01

Please sign in to comment.