Source code for aws_lambda_python_packager.dep_analyzer

from __future__ import annotations

import logging
import re
import shlex
import shutil
import subprocess  # nosec
import tempfile
from abc import ABC, abstractmethod
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Any, Iterable

import requests

from .util import PathType, chdir_cm

PackageInfo = namedtuple("PackageInfo", ["name", "version", "version_spec"])
PACKAGE_URL = "https://raw.githubusercontent.com/mumblepins/aws-get-lambda-python-pkg-versions/main/{region}-python{python_version}-{architecture}.json"


[docs]class CommandNotFoundError(Exception): pass
[docs]class ExtraLine(list): pass
[docs]class DepAnalyzer(ABC): # pylint: disable=too-many-instance-attributes project_root: Path analyzer_name: str # region init and teardown def __init__( self, project_root: PathType | None, python_version: str = "3.9", architecture: str = "x86_64", region: str = "us-east-1", ignore_packages=False, update_dependencies=False, additional_packages_to_ignore: dict | None = None, ): if additional_packages_to_ignore is None: self._additional_packages_to_ignore = {} else: self._additional_packages_to_ignore = additional_packages_to_ignore self._extra_lines: list[ExtraLine] | None = None self._exported_reqs = None self._reqs: dict[Any, PackageInfo] | None = None self._pkgs_to_ignore_dict = None if project_root is None: self.project_root = Path.cwd() else: self.project_root = Path(project_root) # self._pip = shutil.which("pip") # print(subprocess.check_output("which pip3", shell=True)) # self._pip = "/home/sullid2/.pyenv/versions/3.9.12/bin/pip3.9" # print(self._pip) # if self._pip is None: # raise CommandNotFoundError("pip not found, please install and add to PATH") self.python_version = python_version self.architecture = architecture self.region = region self.ignore_packages = ignore_packages self.update_dependencies = update_dependencies self._temp_proj_dir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with self._chdir = partial(chdir_cm, self._temp_proj_dir.name) self._target = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with self.log = logging.getLogger(self.__class__.__name__) def __del__(self): try: self._target.cleanup() self._temp_proj_dir.cleanup() except OSError: pass # endregion # region abstract methods @abstractmethod def _get_requirements(self) -> Iterable[PackageInfo | ExtraLine]: pass @abstractmethod def _update_dependency_file(self, pkgs_to_add: dict[str, PackageInfo]): pass
[docs] @abstractmethod def direct_dependencies(self) -> dict[str, str]: pass
# endregion # region properties @property def pkgs_to_ignore_dict(self): if not self.ignore_packages and not self._additional_packages_to_ignore: return {} if self._pkgs_to_ignore_dict is None: self._pkgs_to_ignore_dict = {} if self.ignore_packages: lambda_pkgs_to_ignore = self._get_packages_to_ignore() else: lambda_pkgs_to_ignore = {} for pk, pv in { **lambda_pkgs_to_ignore, **self._additional_packages_to_ignore, }.items(): pk = re.sub(r"\[[^\]]+\]$", "", pk) self._pkgs_to_ignore_dict[pk] = pv return self._pkgs_to_ignore_dict @property def pkgs_to_ignore(self): return [f"{k}=={v}" for k, v in self.pkgs_to_ignore_dict.items()] @property def pkgs_to_ignore_info(self): return {k: PackageInfo(k, v, f"{k}=={v}") for k, v in self.pkgs_to_ignore_dict.items()} @property def requirements(self) -> dict[str, PackageInfo]: if self._reqs is None: self.log.warning("Exporting requirements") reqs = self.update_dependency_file() if reqs is None: reqs = list(self.get_requirements()) if self._extra_lines is None: self._extra_lines = [r for r in reqs if isinstance(r, ExtraLine)] self._reqs = {r.name: r for r in reqs if not isinstance(r, ExtraLine)} return self._reqs @property def extra_lines(self): if self._extra_lines is None: _ = self.requirements return self._extra_lines # endregion # region private methods @contextmanager def _change_context(self): with self._chdir(): yield def _log_popen_output(self, output, level=logging.DEBUG, prefix=""): data = "" for line in output: o = line.decode("utf-8") self.log.log(level, prefix + o.rstrip()) data += o return data def _install_pip(self, *args, return_state=False, quiet=False, requirements_file=False): pip_command = [ "install", "--disable-pip-version-check", "--ignore-installed", "--no-compile", "--python-version", self.python_version, "--implementation", "cp", ] if not requirements_file: for el in self.extra_lines: pip_command.extend(el) if self.architecture == "arm64": pip_command.extend( [ "--platform", "manylinux2014_aarch64", ] ) elif self.architecture == "x86_64": pip_command.extend( [ "--platform", "manylinux2014_x86_64", ] ) pip_command.extend(args) return self.run_pip(*pip_command, return_state=return_state, quiet=quiet) def _get_packages_to_ignore(self): try: r = requests.get( PACKAGE_URL.format( region=self.region, architecture=self.architecture, python_version=self.python_version, ), timeout=30, ) r.raise_for_status() data = r.json() pkgs_to_ignore_dict = data except Exception as e: # pylint: disable=broad-except self.log.warning("Failed to get packages to ignore: %s", e, exc_info=True) pkgs_to_ignore_dict = {} return pkgs_to_ignore_dict # endregion # region public methods
[docs] def get_requirements(self) -> Iterable[PackageInfo | ExtraLine]: self.log.info("Getting requirements info using %s", self.analyzer_name) return self._get_requirements()
[docs] @classmethod def process_requirements(cls, requirements: Iterable[str]) -> Iterable[PackageInfo | ExtraLine]: for line in requirements: if line.startswith("#") or line.strip() == "": continue if line.startswith("-"): yield ExtraLine(shlex.split(line)) continue pkg_match = re.match(r"^([^= \n]*)(==)?([^\s;]*).*$", line) if pkg_match: pkg_name, _, pkg_version = pkg_match.groups() yield PackageInfo(pkg_name, pkg_version, line.rstrip())
[docs] def run_command( self, *args, return_state=False, quiet=False, prefix=None, context=None ) -> bool | tuple[str, str]: if prefix is None: prefix = Path(args[0]).name self.log.debug("Running command: %s", args) if context is None: context = self._change_context if quiet: loglevel = logging.DEBUG else: loglevel = logging.INFO with context(): with subprocess.Popen( # nosec [str(a) for a in args], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) as proc: with ThreadPoolExecutor(2) as pool: stdout_fut = pool.submit( self._log_popen_output, proc.stdout, loglevel, prefix + "(OUT)> ", ) stderr_fut = pool.submit( self._log_popen_output, proc.stderr, loglevel, prefix + "(ERR)> ", ) stdout, stderr = stdout_fut.result(), stderr_fut.result() proc.wait() if return_state: return not bool(proc.returncode) if proc.returncode: self.log.error("ERROR IN CALL: %s", args) self.log.error("STDOUT: %s", stdout) self.log.error("STDERR: %s", stderr) raise subprocess.CalledProcessError(proc.returncode, args, stdout, stderr) return stdout, stderr
[docs] def update_dependency_file(self): if not self.update_dependencies or not self.pkgs_to_ignore_dict: return None self.log.info( "Checking to see if any dependencies need to be changed in the dependency file to match the AWS Lambda environment" ) cur_requires = list(self.get_requirements()) pkgs_to_add = {} for pkg in cur_requires: if isinstance(pkg, ExtraLine): continue pkg_name, pkg_version, _ = pkg if ( pkg_name in self.pkgs_to_ignore_dict and pkg_version != self.pkgs_to_ignore_dict[pkg_name] ): self.log.warning( "%s is currently %s but should be %s", pkg_name, pkg_version, self.pkgs_to_ignore_dict[pkg_name], ) pkgs_to_add[pkg_name] = self.pkgs_to_ignore_info[pkg_name] if len(pkgs_to_add) > 0: self.log.info("Updating dependency file to add %s requirements", len(pkgs_to_add)) self._update_dependency_file(pkgs_to_add) return None self.log.info("No changes needed in the dependency file") return cur_requires
[docs] def export_requirements(self): strip_extras = re.compile(r"\[[^\]]+\]$") if self._exported_reqs is None: output = [] for pkg_name, pkg_version, pkg_spec in self.requirements.values(): if ( self.pkgs_to_ignore_dict.get(strip_extras.sub("", pkg_name), None) == pkg_version ): self.log.warning( "Ignoring %s as it should be in the AWS Lambda Environment already", pkg_spec.strip().split(";")[0], ) continue output.append(pkg_spec) self._exported_reqs = output return self._exported_reqs
[docs] def exported_requirements(self): ret = {} for r in self.process_requirements(self.export_requirements()): ret[r.name] = r return ret
[docs] def run_pip(self, *args, return_state=False, quiet=False, context=None): self.run_command("pip", *args, return_state=return_state, quiet=quiet, context=context)
[docs] def install_dependencies(self, quiet=True): pip_command = [ "--target", self._target.name, "--no-deps", ] reqs = self.export_requirements() if not reqs: self.log.warning("No dependencies to install with pip, skipping") return pip_command.extend(reqs) self.log.warning("Installing dependencies using pip") self._install_pip(*pip_command, quiet=quiet) self.log.warning("Installing dependencies done")
[docs] def install_root(self): src_path = self.project_root / "src" if src_path.exists(): if (src_path / "__init__.py").exists(): self.log.warning("src/__init__.py exists, installing as package in target") shutil.copytree(src_path, Path(self._target.name) / "src") else: self.log.warning( "src/__init__.py does not exist, installing files from src directly into target" ) shutil.copytree(src_path, Path(self._target.name), dirs_exist_ok=True) elif next(self.project_root.glob("*.py"), None): for f in self.project_root.glob("*.py"): self.log.warning("Copying %s to target", f) shutil.copy(f, self._target.name) else: self.log.warning( "No src/__init__.py or *.py files found, no root program is being installed" )
[docs] def get_layer_files(self): target_path = Path(self._target.name) return [a.relative_to(target_path) for a in target_path.iterdir()]
[docs] def copy_from_target(self, dst: PathType): self.log.warning("Copying %s from target to %s", self._target.name, dst) shutil.copytree(self._target.name, dst)
[docs] def copy_from_temp_dir(self, files: Iterable[str]): for f in files: fp = Path(self._temp_proj_dir.name) / f if fp.exists(): shutil.copy(fp, self.project_root)
[docs] def copy_to_temp_dir(self, files: Iterable[str]): for f in files: fp = Path(self.project_root) / f if fp.exists(): shutil.copy(fp, self._temp_proj_dir.name)
[docs] def backup_files(self, files: Iterable[str]): date_str = datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%SZ") for f in files: fp = Path(self.project_root) / f if fp.exists(): shutil.copy(fp, fp.with_suffix(f".{date_str}{fp.suffix}"))
# endregion