# Copyright (C) 2010-2012 Cuckoo Sandbox Developers.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.

import os
import sys
import string

try:
    import magic
    HAVE_MAGIC = True
except Exception:
    # Best effort: without the magic module, file type detection is skipped.
    HAVE_MAGIC = False

from lib.cuckoo.common.constants import CUCKOO_ROOT
from lib.cuckoo.common.abstracts import Processing
from lib.cuckoo.common.utils import convert_to_printable, File
import lib.pefile.pefile as pefile
import lib.pefile.peutils as peutils


# Partially taken from
# http://malwarecookbook.googlecode.com/svn/trunk/3/8/pescanner.py
class PortableExecutable:
    """PE analysis.
    @note: Partially taken from
           http://malwarecookbook.googlecode.com/svn/trunk/3/8/pescanner.py.
    """

    def __init__(self, file_path):
        """@param file_path: file path."""
        self.file_path = file_path
        # Parsed pefile.PE object; populated by run().
        self.pe = None

    def _get_filetype(self, data):
        """Gets filetype, uses libmagic if available.
        @param data: data to be analyzed.
        @return: file type or None.
        """
        if not HAVE_MAGIC:
            return None

        try:
            # First attempt: the magic.open() / load() / buffer() API.
            ms = magic.open(magic.MAGIC_NONE)
            try:
                ms.load()
                return ms.buffer(data)
            finally:
                # Always release the handle, even if load()/buffer() failed.
                ms.close()
        except Exception:
            # Second attempt: the magic.from_buffer() API.
            try:
                return magic.from_buffer(data)
            except Exception:
                return None

    def _get_peid_signatures(self):
        """Gets PEID signatures.
        @return: matched signatures or None.
        """
        if not self.pe:
            return None

        try:
            db_path = os.path.join(CUCKOO_ROOT, "data", "peutils", "UserDB.TXT")
            signatures = peutils.SignatureDatabase(db_path)
            return signatures.match(self.pe, ep_only=True)
        except Exception:
            # Signature matching is best effort; any failure yields None.
            return None

    def _get_imported_symbols(self):
        """Gets imported symbols.
        @return: list of dicts (keys "dll" and "imports"), or None if no PE
                 is loaded.
        """
        if not self.pe:
            return None

        imports = []

        if hasattr(self.pe, "DIRECTORY_ENTRY_IMPORT"):
            for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
                try:
                    symbols = []
                    for imported_symbol in entry.imports:
                        symbols.append({
                            "address": hex(imported_symbol.address),
                            "name": imported_symbol.name,
                        })

                    imports.append({
                        "dll": entry.dll,
                        "imports": symbols,
                    })
                except Exception:
                    # Skip import entries that cannot be processed.
                    continue

        return imports

    def _get_exported_symbols(self):
        """Gets exported symbols.
        @return: list of dicts (keys "address", "name", "ordinal"), or None
                 if no PE is loaded.
        """
        if not self.pe:
            return None

        exports = []

        if hasattr(self.pe, "DIRECTORY_ENTRY_EXPORT"):
            image_base = self.pe.OPTIONAL_HEADER.ImageBase
            for exported_symbol in self.pe.DIRECTORY_ENTRY_EXPORT.symbols:
                exports.append({
                    # Reported address is the image base plus the symbol's
                    # own address.
                    "address": hex(image_base + exported_symbol.address),
                    "name": exported_symbol.name,
                    "ordinal": exported_symbol.ordinal,
                })

        return exports

    def _get_sections(self):
        """Gets sections.
        @return: list of dicts describing each section, or None if no PE is
                 loaded.
        """
        if not self.pe:
            return None

        sections = []

        for entry in self.pe.sections:
            try:
                sections.append({
                    # Section names are NUL padded; strip the padding.
                    "name": convert_to_printable(entry.Name.strip("\x00")),
                    "virtual_address": hex(entry.VirtualAddress),
                    "virtual_size": hex(entry.Misc_VirtualSize),
                    "size_of_data": hex(entry.SizeOfRawData),
                    "entropy": entry.get_entropy(),
                })
            except Exception:
                # Skip sections that cannot be processed.
                continue

        return sections

    def _get_resources(self):
        """Get resources.
        @return: list of dicts, one per resource language entry, or None if
                 no PE is loaded.
        """
        if not self.pe:
            return None

        resources = []

        if not hasattr(self.pe, "DIRECTORY_ENTRY_RESOURCE"):
            return resources

        for resource_type in self.pe.DIRECTORY_ENTRY_RESOURCE.entries:
            try:
                if resource_type.name is not None:
                    name = "%s" % resource_type.name
                else:
                    # Look the numeric id up among the known resource types
                    # and fall back to the id itself when it is unknown.
                    type_name = pefile.RESOURCE_TYPE.get(resource_type.struct.Id)
                    if type_name is None:
                        name = "%d" % resource_type.struct.Id
                    else:
                        name = "%s" % type_name

                if not hasattr(resource_type, "directory"):
                    continue

                for resource_id in resource_type.directory.entries:
                    if not hasattr(resource_id, "directory"):
                        continue

                    for resource_lang in resource_id.directory.entries:
                        lang_data = resource_lang.data
                        offset = lang_data.struct.OffsetToData
                        size = lang_data.struct.Size
                        data = self.pe.get_data(offset, size)

                        # Build a fresh dict per entry so that entries in
                        # the result list never share state.
                        resources.append({
                            "name": name,
                            "offset": hex(offset),
                            "size": hex(size),
                            "filetype": self._get_filetype(data),
                            "language": pefile.LANG.get(lang_data.lang, None),
                            "sublanguage": pefile.get_sublang_name_for_lang(
                                lang_data.lang, lang_data.sublang),
                        })
            except Exception:
                # Skip resource types that cannot be processed; entries
                # already collected are kept.
                continue

        return resources

    def _get_versioninfo(self):
        """Get version info.
        @return: list of dicts (keys "name" and "value"), or None if no PE
                 is loaded.
        """
        if not self.pe:
            return None

        infos = []

        if not (hasattr(self.pe, "VS_VERSIONINFO") and
                hasattr(self.pe, "FileInfo")):
            return infos

        for entry in self.pe.FileInfo:
            try:
                if hasattr(entry, "StringTable"):
                    for st_entry in entry.StringTable:
                        for key, value in st_entry.entries.items():
                            infos.append({
                                "name": convert_to_printable(key),
                                "value": convert_to_printable(value),
                            })
                elif hasattr(entry, "Var"):
                    for var_entry in entry.Var:
                        if hasattr(var_entry, "entry"):
                            # Only the first key/value pair is reported.
                            key, value = list(var_entry.entry.items())[0]
                            infos.append({
                                "name": convert_to_printable(key),
                                "value": convert_to_printable(value),
                            })
            except Exception:
                # Skip FileInfo entries that cannot be processed.
                continue

        return infos

    def run(self):
        """Run analysis.
        @return: analysis results dict, or None if the file does not exist
                 or cannot be parsed as a PE.
        """
        if not os.path.exists(self.file_path):
            return None

        try:
            self.pe = pefile.PE(self.file_path)
        except pefile.PEFormatError:
            return None

        results = {}
        results["peid_signatures"] = self._get_peid_signatures()
        results["pe_imports"] = self._get_imported_symbols()
        results["pe_exports"] = self._get_exported_symbols()
        results["pe_sections"] = self._get_sections()
        results["pe_resources"] = self._get_resources()
        results["pe_versioninfo"] = self._get_versioninfo()
        # Count import entries that carry a non-None DLL name.
        results["imported_dll_count"] = len(
            [x for x in results["pe_imports"]
             if "dll" in x and x["dll"] is not None])

        return results


class Static(Processing):
    """Static analysis."""

    def run(self):
        """Run analysis.
        @return: results of the PE analysis when the file type contains
                 "PE32" (which may be None if that analysis fails),
                 otherwise an empty dict.
        """
        self.key = "static"
        static = {}

        file_type = File(self.file_path).get_type()
        if file_type and "PE32" in file_type:
            static = PortableExecutable(self.file_path).run()

        return static