| # -*- coding: utf-8 -*- |
| # Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
| # See https://llvm.org/LICENSE.txt for license information. |
| # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| """ This module is a collection of methods commonly used in this project. """ |
| import collections |
| import functools |
| import json |
| import logging |
| import os |
| import os.path |
| import re |
| import shlex |
| import subprocess |
| import sys |
| |
| ENVIRONMENT_KEY = "INTERCEPT_BUILD" |
| |
| Execution = collections.namedtuple("Execution", ["pid", "cwd", "cmd"]) |
| |
| CtuConfig = collections.namedtuple( |
| "CtuConfig", ["collect", "analyze", "dir", "extdef_map_cmd"] |
| ) |
| |
| |
| def duplicate_check(method): |
| """Predicate to detect duplicated entries. |
| |
| Unique hash method can be use to detect duplicates. Entries are |
| represented as dictionaries, which has no default hash method. |
| This implementation uses a set datatype to store the unique hash values. |
| |
| This method returns a method which can detect the duplicate values.""" |
| |
| def predicate(entry): |
| entry_hash = predicate.unique(entry) |
| if entry_hash not in predicate.state: |
| predicate.state.add(entry_hash) |
| return False |
| return True |
| |
| predicate.unique = method |
| predicate.state = set() |
| return predicate |
| |
| |
| def run_build(command, *args, **kwargs): |
| """Run and report build command execution |
| |
| :param command: array of tokens |
| :return: exit code of the process |
| """ |
| environment = kwargs.get("env", os.environ) |
| logging.debug("run build %s, in environment: %s", command, environment) |
| exit_code = subprocess.call(command, *args, **kwargs) |
| logging.debug("build finished with exit code: %d", exit_code) |
| return exit_code |
| |
| |
| def run_command(command, cwd=None): |
| """Run a given command and report the execution. |
| |
| :param command: array of tokens |
| :param cwd: the working directory where the command will be executed |
| :return: output of the command |
| """ |
| |
| def decode_when_needed(result): |
| """check_output returns bytes or string depend on python version""" |
| return result.decode("utf-8") if isinstance(result, bytes) else result |
| |
| try: |
| directory = os.path.abspath(cwd) if cwd else os.getcwd() |
| logging.debug("exec command %s in %s", command, directory) |
| output = subprocess.check_output( |
| command, cwd=directory, stderr=subprocess.STDOUT |
| ) |
| return decode_when_needed(output).splitlines() |
| except subprocess.CalledProcessError as ex: |
| ex.output = decode_when_needed(ex.output).splitlines() |
| raise ex |
| |
| |
| def reconfigure_logging(verbose_level): |
| """Reconfigure logging level and format based on the verbose flag. |
| |
| :param verbose_level: number of `-v` flags received by the command |
| :return: no return value |
| """ |
| # Exit when nothing to do. |
| if verbose_level == 0: |
| return |
| |
| root = logging.getLogger() |
| # Tune logging level. |
| level = logging.WARNING - min(logging.WARNING, (10 * verbose_level)) |
| root.setLevel(level) |
| # Be verbose with messages. |
| if verbose_level <= 3: |
| fmt_string = "%(name)s: %(levelname)s: %(message)s" |
| else: |
| fmt_string = "%(name)s: %(levelname)s: %(funcName)s: %(message)s" |
| handler = logging.StreamHandler(sys.stdout) |
| handler.setFormatter(logging.Formatter(fmt=fmt_string)) |
| root.handlers = [handler] |
| |
| |
| def command_entry_point(function): |
| """Decorator for command entry methods. |
| |
| The decorator initialize/shutdown logging and guard on programming |
| errors (catch exceptions). |
| |
| The decorated method can have arbitrary parameters, the return value will |
| be the exit code of the process.""" |
| |
| @functools.wraps(function) |
| def wrapper(*args, **kwargs): |
| """Do housekeeping tasks and execute the wrapped method.""" |
| |
| try: |
| logging.basicConfig( |
| format="%(name)s: %(message)s", level=logging.WARNING, stream=sys.stdout |
| ) |
| # This hack to get the executable name as %(name). |
| logging.getLogger().name = os.path.basename(sys.argv[0]) |
| return function(*args, **kwargs) |
| except KeyboardInterrupt: |
| logging.warning("Keyboard interrupt") |
| return 130 # Signal received exit code for bash. |
| except Exception: |
| logging.exception("Internal error.") |
| if logging.getLogger().isEnabledFor(logging.DEBUG): |
| logging.error( |
| "Please report this bug and attach the output " "to the bug report" |
| ) |
| else: |
| logging.error( |
| "Please run this command again and turn on " |
| "verbose mode (add '-vvvv' as argument)." |
| ) |
| return 64 # Some non used exit code for internal errors. |
| finally: |
| logging.shutdown() |
| |
| return wrapper |
| |
| |
| def compiler_wrapper(function): |
| """Implements compiler wrapper base functionality. |
| |
| A compiler wrapper executes the real compiler, then implement some |
| functionality, then returns with the real compiler exit code. |
| |
| :param function: the extra functionality what the wrapper want to |
| do on top of the compiler call. If it throws exception, it will be |
| caught and logged. |
| :return: the exit code of the real compiler. |
| |
| The :param function: will receive the following arguments: |
| |
| :param result: the exit code of the compilation. |
| :param execution: the command executed by the wrapper.""" |
| |
| def is_cxx_compiler(): |
| """Find out was it a C++ compiler call. Compiler wrapper names |
| contain the compiler type. C++ compiler wrappers ends with `c++`, |
| but might have `.exe` extension on windows.""" |
| |
| wrapper_command = os.path.basename(sys.argv[0]) |
| return re.match(r"(.+)c\+\+(.*)", wrapper_command) |
| |
| def run_compiler(executable): |
| """Execute compilation with the real compiler.""" |
| |
| command = executable + sys.argv[1:] |
| logging.debug("compilation: %s", command) |
| result = subprocess.call(command) |
| logging.debug("compilation exit code: %d", result) |
| return result |
| |
| # Get relevant parameters from environment. |
| parameters = json.loads(os.environ[ENVIRONMENT_KEY]) |
| reconfigure_logging(parameters["verbose"]) |
| # Execute the requested compilation. Do crash if anything goes wrong. |
| cxx = is_cxx_compiler() |
| compiler = parameters["cxx"] if cxx else parameters["cc"] |
| result = run_compiler(compiler) |
| # Call the wrapped method and ignore it's return value. |
| try: |
| call = Execution( |
| pid=os.getpid(), |
| cwd=os.getcwd(), |
| cmd=["c++" if cxx else "cc"] + sys.argv[1:], |
| ) |
| function(result, call) |
| except: |
| logging.exception("Compiler wrapper failed complete.") |
| finally: |
| # Always return the real compiler exit code. |
| return result |
| |
| |
| def wrapper_environment(args): |
| """Set up environment for interpose compiler wrapper.""" |
| |
| return { |
| ENVIRONMENT_KEY: json.dumps( |
| { |
| "verbose": args.verbose, |
| "cc": shlex.split(args.cc), |
| "cxx": shlex.split(args.cxx), |
| } |
| ) |
| } |