Skip to content

Commit

Permalink
Tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Prabhu Subramanian <prabhu@appthreat.com>
  • Loading branch information
prabhu committed Jul 31, 2024
1 parent 4409243 commit ec6fef3
Showing 1 changed file with 76 additions and 66 deletions.
142 changes: 76 additions & 66 deletions blint/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@
from rich.terminal_theme import MONOKAI

from blint.binary import parse

# pylint: disable-next=unused-import
from blint.checks import (check_nx, check_pie,
check_relro, check_canary, check_rpath,
check_virtual_size, check_authenticode,
check_dll_characteristics, check_codesign,
check_trust_info)
from blint.checks import (
check_nx,
check_pie,
check_relro,
check_canary,
check_rpath,
check_virtual_size,
check_authenticode,
check_dll_characteristics,
check_codesign,
check_trust_info,
)
from blint.config import FIRST_STAGE_WORDS, PII_WORDS, get_int_from_env
from blint.logger import LOG, console
from blint.utils import (create_findings_table, is_fuzzable_name, print_findings_table)
from blint.utils import create_findings_table, is_fuzzable_name, print_findings_table

try:
import importlib.resources # pylint: disable=ungrouped-imports
Expand All @@ -37,16 +45,12 @@
with contextlib.suppress(NameError, FileNotFoundError):
review_files = (
resource.name
for resource in importlib.resources.files(
"blint.data.annotations"
).iterdir()
for resource in importlib.resources.files("blint.data.annotations").iterdir()
if resource.is_file() and resource.name.endswith(".yml")
)
if not review_files:
review_methods_dir = Path(__file__).parent / "data" / "annotations"
review_files = [
p.as_posix() for p in Path(review_methods_dir).rglob("*.yml")
]
review_files = [p.as_posix() for p in Path(review_methods_dir).rglob("*.yml")]

rules_dict = {}
review_exe_dict = defaultdict(list)
Expand All @@ -56,16 +60,18 @@
review_entries_dict = defaultdict(list)
review_rules_cache = {
"PII_READ": {
"id": "PII_READ",
"title": "Detect PII Read Operations",
"summary": "Can Retrieve Sensitive PII data",
"description": "Contains logic to retrieve sensitive data such as names, email, passwords etc.",
"patterns": PII_WORDS
"patterns": PII_WORDS,
},
"LOADER_SYMBOLS": {
"id": "LOADER_SYMBOLS",
"title": "Detect Initial Loader",
"summary": "Behaves like a loader",
"description": "The binary behaves like a loader by downloading and executing additional payloads.",
"patterns": FIRST_STAGE_WORDS
"patterns": FIRST_STAGE_WORDS,
},
}

Expand All @@ -84,11 +90,7 @@ def get_resource(package, resource):
# If we're in the context of a module, we could also use
# ``__loader__.get_resource_reader(__name__).open_resource(resource)``.
# We use open_binary() because it is simple.
return (
importlib.resources.files(package)
.joinpath(resource)
.open("r", encoding="utf-8")
)
return importlib.resources.files(package).joinpath(resource).open("r", encoding="utf-8")

# Fall back to __file__.

Expand Down Expand Up @@ -318,7 +320,7 @@ def json_serializer(obj):
"""JSON serializer to help serialize problematic types such as bytes"""
if isinstance(obj, bytes):
try:
return obj.decode('utf-8')
return obj.decode("utf-8")
except UnicodeDecodeError:
return ""

Expand Down Expand Up @@ -346,23 +348,26 @@ def report(src_dir, reports_dir, findings, reviews, files, fuzzables):
print_findings_table(findings, files)
findings_file = Path(reports_dir) / "findings.json"
LOG.info(f"Findings written to {findings_file}")
output = orjson.dumps({**common_metadata, "findings": findings}, default=json_serializer).decode("utf-8",
"ignore")
output = orjson.dumps(
{**common_metadata, "findings": findings}, default=json_serializer
).decode("utf-8", "ignore")
with open(findings_file, mode="w", encoding="utf-8") as ffp:
ffp.write(output)
if reviews:
print_reviews_table(reviews, files)
reviews_file = Path(reports_dir) / "reviews.json"
LOG.info(f"Review written to {reviews_file}")
output = orjson.dumps({**common_metadata, "reviews": reviews}, default=json_serializer).decode("utf-8",
"ignore")
output = orjson.dumps(
{**common_metadata, "reviews": reviews}, default=json_serializer
).decode("utf-8", "ignore")
with open(reviews_file, mode="w", encoding="utf-8") as rfp:
rfp.write(output)
if fuzzables:
fuzzables_file = Path(reports_dir) / "fuzzables.json"
LOG.info(f"Fuzzables data written to {fuzzables_file}")
output = orjson.dumps({**common_metadata, "fuzzables": fuzzables}, default=json_serializer).decode("utf-8",
"ignore")
output = orjson.dumps(
{**common_metadata, "fuzzables": fuzzables}, default=json_serializer
).decode("utf-8", "ignore")
with open(fuzzables_file, mode="w", encoding="utf-8") as rfp:
rfp.write(output)
else:
Expand All @@ -385,8 +390,11 @@ def __init__(self):
self.reviews = []
self.fuzzables = []
self.progress = Progress(
transient=True, redirect_stderr=True,
redirect_stdout=True, refresh_per_second=1, )
transient=True,
redirect_stderr=True,
redirect_stdout=True,
refresh_per_second=1,
)
self.task = None
self.reviewer = None

Expand All @@ -410,7 +418,9 @@ def start(self, files, reports_dir, no_reviews=False, suggest_fuzzables=True):
with self.progress:
self.task = self.progress.add_task(
f"[green] BLinting {len(files)} binaries",
total=len(files), start=True, )
total=len(files),
start=True,
)
for f in files:
self._process_files(f, reports_dir, no_reviews, suggest_fuzzables)
return self.findings, self.reviews, self.fuzzables
Expand All @@ -425,20 +435,16 @@ def _process_files(self, f, reports_dir, no_reviews, suggest_fuzzables):
suggest_fuzzables (bool): Whether to suggest fuzzable targets or not.
"""
self.progress.update(
self.task, description=f"Processing [bold]{f}[/bold]")
self.progress.update(self.task, description=f"Processing [bold]{f}[/bold]")
metadata = parse(f)
exe_name = metadata.get("name", "")
# Store raw metadata
metadata_file = (Path(reports_dir) / f"{os.path.basename(exe_name)}"
f"-metadata.json")
metadata_file = Path(reports_dir) / f"{os.path.basename(exe_name)}" f"-metadata.json"
LOG.debug(f"Metadata written to {metadata_file}")
output = orjson.dumps(metadata, default=json_serializer).decode("utf-8", "ignore")
with open(metadata_file, mode="w", encoding="utf-8") as ffp:
ffp.write(output)
self.progress.update(
self.task,
description=f"Checking [bold]{f}[/bold] against rules")
self.progress.update(self.task, description=f"Checking [bold]{f}[/bold] against rules")
if finding := run_checks(f, metadata):
self.findings += finding
# Perform symbol reviews
Expand All @@ -457,8 +463,7 @@ def _process_files(self, f, reports_dir, no_reviews, suggest_fuzzables):

def do_review(self, exe_name, f, metadata):
"""Performs a review of the given file."""
self.progress.update(
self.task, description="Checking methods against review rules")
self.progress.update(self.task, description="Checking methods against review rules")
self.reviewer = ReviewRunner()
self.reviewer.run_review(metadata)
if self.reviewer.results:
Expand Down Expand Up @@ -498,11 +503,11 @@ def run_review(self, metadata):
self._gen_review_lists(exe_type)
# Check if reviews are available for this exe type
if (
self.review_methods_list
or self.review_exe_list
or self.review_symbols_list
or self.review_imports_list
or self.review_entries_list
self.review_methods_list
or self.review_exe_list
or self.review_symbols_list
or self.review_imports_list
or self.review_entries_list
):
return self._review_lists(metadata)
return self._review_loader_symbols(metadata)
Expand Down Expand Up @@ -552,9 +557,11 @@ def _review_entries(self, metadata):
Returns:
dict: The results of the review.
"""
entries_list = [f.get("name", "") for f in
metadata.get("dynamic_entries", []) if
f.get("tag") == "NEEDED"]
entries_list = [
f.get("name", "")
for f in metadata.get("dynamic_entries", [])
if f.get("tag") == "NEEDED"
]
LOG.debug(f"Reviewing {len(entries_list)} dynamic entries")
self.run_review_methods_symbols(self.review_entries_list, entries_list)

Expand Down Expand Up @@ -597,14 +604,11 @@ def _review_symbols_exe(self, metadata):
Args:
metadata (dict): The metadata to review.
"""
symbols_list = [f.get("name", "") for f in
metadata.get("dynamic_symbols", [])]
symbols_list += [f.get("name", "") for f in
metadata.get("symtab_symbols", [])]
symbols_list = [f.get("name", "") for f in metadata.get("dynamic_symbols", [])]
symbols_list += [f.get("name", "") for f in metadata.get("symtab_symbols", [])]
LOG.debug(f"Reviewing {len(symbols_list)} symbols")
if self.review_symbols_list:
self.run_review_methods_symbols(
self.review_symbols_list, symbols_list)
self.run_review_methods_symbols(self.review_symbols_list, symbols_list)
if self.review_exe_list:
self.run_review_methods_symbols(self.review_exe_list, symbols_list)

Expand All @@ -616,22 +620,19 @@ def _methods_or_exe(self, metadata):
Args:
metadata (dict): The metadata to review.
"""
functions_list = [re.sub(r"[*&()]", "", f.get("name", "")) for f in
metadata.get("functions", [])]
functions_list = [
re.sub(r"[*&()]", "", f.get("name", "")) for f in metadata.get("functions", [])
]
if metadata.get("magic", "").startswith("PE"):
functions_list += [f.get("name", "") for f in
metadata.get("symtab_symbols", [])]
functions_list += [f.get("name", "") for f in metadata.get("symtab_symbols", [])]
# If there are no function but static symbols use that instead
if not functions_list and metadata.get("symtab_symbols"):
functions_list = [f.get("name", "") for f in
metadata.get("symtab_symbols", [])]
functions_list = [f.get("name", "") for f in metadata.get("symtab_symbols", [])]
LOG.debug(f"Reviewing {len(functions_list)} functions")
if self.review_methods_list:
self.run_review_methods_symbols(
self.review_methods_list, functions_list)
self.run_review_methods_symbols(self.review_methods_list, functions_list)
if self.review_exe_list:
self.run_review_methods_symbols(
self.review_exe_list, functions_list)
self.run_review_methods_symbols(self.review_exe_list, functions_list)

def _gen_review_lists(self, exe_type):
"""
Expand All @@ -658,8 +659,12 @@ def process_review(self, f, exe_name):
if not self.results:
return {}
for cid, evidence in self.results.items():
aresult = {**review_rules_cache.get(cid), "evidence": evidence,
"filename": f, "exe_name": exe_name, }
aresult = {
**review_rules_cache.get(cid),
"evidence": evidence,
"filename": f,
"exe_name": exe_name,
}
del aresult["patterns"]
reviews.append(aresult)
return reviews
Expand Down Expand Up @@ -695,8 +700,13 @@ def run_review_methods_symbols(self, review_list, functions_list):
if found_pattern[apattern] > EVIDENCE_LIMIT or found_cid[cid] > EVIDENCE_LIMIT:
continue
for afun in functions_list:
if apattern.lower() in afun.lower() and not found_function.get(afun.lower()):
result = {"pattern": apattern, "function": afun, }
if apattern.lower() in afun.lower() and not found_function.get(
afun.lower()
):
result = {
"pattern": apattern,
"function": afun,
}
results[cid].append(result)
found_cid[cid] += 1
found_pattern[apattern] += 1
Expand Down

0 comments on commit ec6fef3

Please sign in to comment.