# Source code for aws_lambda_python_packager.arrow_fetcher
import logging
import shutil
import tarfile
import tempfile
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import Generator, Optional, Union
import fsspec
import requests
from appdirs import user_cache_dir
# GitHub API URL template for the pyarrow-builder release whose tag is
# "<arrow_version>-py<python_version>"; fill it in with str.format().
PYARROW_BUILDER_RELEASES = (
    "https://api.github.com/repos/mumblepins/pyarrow-builder"
    "/releases/tags/{arrow_version}-py{python_version}"
)

# Logger named after this module.
LOG = logging.getLogger(__name__)
def get_arrow_version(arrow_version: str, python_version: str, arch: str) -> Optional[str]:
    """Find the download URL of a prebuilt pyarrow tarball.

    Requests the release tagged ``{arrow_version}-py{python_version}`` via
    ``PYARROW_BUILDER_RELEASES`` and scans its assets for the tarball that
    matches the requested architecture.

    Args:
        arrow_version: Arrow version used in the release tag and asset name.
        python_version: Python version used in the release tag and asset name.
        arch: Target architecture. Values starting with ``arm`` (any case) are
            mapped to ``aarch64`` and values starting with ``amd`` to
            ``x86_64``; anything else is used as given.

    Returns:
        The ``browser_download_url`` of the first asset whose name ends with
        ``{arrow_version}-py{python_version}-{arch}.tar.gz``, or ``None`` when
        the release lookup answers 404 or no asset matches.

    Raises:
        Whatever ``raise_for_status()`` raises for a non-404 error response.
    """
    release_url = PYARROW_BUILDER_RELEASES.format(
        python_version=python_version,
        arrow_version=arrow_version,
    )
    response = requests.get(release_url, timeout=10)

    # A missing release is an expected outcome, not an error.
    if response.status_code == 404:
        return None
    response.raise_for_status()
    release = response.json()

    # Normalise architecture aliases to the names used in the asset files.
    arch_key = arch.lower()
    if arch_key.startswith("arm"):
        arch = "aarch64"
    elif arch_key.startswith("amd"):
        arch = "x86_64"

    wanted_suffix = f"{arrow_version}-py{python_version}-{arch}.tar.gz"
    matching_urls = (
        asset["browser_download_url"]
        for asset in release["assets"]
        if asset["name"].endswith(wanted_suffix)
    )
    return next(matching_urls, None)
def _close_quietly(resource) -> None:
    """Close *resource* unless it is ``None``, ignoring KeyError/AttributeError."""
    if resource is None:
        return
    with suppress(KeyError, AttributeError):
        resource.close()


@contextmanager
def open_zip_file(url: str) -> Generator[tarfile.TarFile, None, None]:
    """Open the remote archive at *url* and yield it as a ``tarfile.TarFile``.

    Despite the name, the archive is opened with ``tarfile``. Two fsspec
    filesystems are tried in order:

    1. ``simplecache`` over ``http``, caching under
       ``user_cache_dir("lambda-packager")/simplecache``;
    2. plain ``http``.

    A filesystem is skipped when creating it, opening the URL, or opening the
    tar stream raises ``KeyError`` or ``AttributeError``.

    Args:
        url: Location of the tar archive.

    Yields:
        The opened ``tarfile.TarFile``. It and the underlying file object are
        closed when the ``with`` block exits.

    Raises:
        RuntimeError: If no filesystem could open the archive; the last
            ``KeyError``/``AttributeError`` is attached as ``__cause__``.
    """
    cache_dir = (Path(user_cache_dir("lambda-packager")) / "simplecache").resolve()
    filesystem_specs = (
        {
            "args": ("simplecache",),
            "kwargs": {
                "target_protocol": "http",
                "cache_storage": str(cache_dir),
            },
        },
        {
            "args": ("http",),
            "kwargs": {},
        },
    )

    # Opening (with fallback) is kept separate from the ``yield`` below.
    # A generator-based context manager must yield exactly once, and an
    # exception raised inside the caller's ``with`` body must propagate
    # unchanged instead of being mistaken for a failed open.
    handle = archive = None
    last_error = None
    for spec in filesystem_specs:
        handle = None
        try:
            fs = fsspec.filesystem(*spec["args"], **spec["kwargs"])
            handle = fs.open(url, "rb")
            archive = tarfile.open(fileobj=handle)
        except (KeyError, AttributeError) as err:
            # This filesystem is unusable here; release what was opened and
            # fall through to the next candidate.
            last_error = err
            _close_quietly(handle)
            handle = None
        except BaseException:
            # Any other failure is not a fallback case; do not leak the handle.
            _close_quietly(handle)
            raise
        else:
            break

    if archive is None:
        raise RuntimeError(
            f"Could not open {url!r} with any fsspec filesystem"
        ) from last_error

    try:
        yield archive
    finally:
        # Close independently so a failure on the archive cannot leave the
        # underlying file object open.
        _close_quietly(archive)
        _close_quietly(handle)
def fetch_arrow_package(
    output_dir: Union[str, Path],
    package_version: str,
    python_version: str = "3.9",
    arch: str = "x86_64",
) -> str:
    """Download a prebuilt pyarrow bundle and copy its contents into *output_dir*.

    The archive URL is resolved with ``get_arrow_version`` and opened with
    ``open_zip_file``. The archive is extracted into a temporary directory,
    and every entry of its top-level ``python`` directory is copied into
    *output_dir*, merging with directories that already exist there.

    Args:
        output_dir: Destination directory; created (with parents) if missing.
        package_version: Arrow version to fetch.
        python_version: Python version the bundle was built for.
        arch: Target architecture, passed through to ``get_arrow_version``.

    Returns:
        *package_version*, unchanged.

    Raises:
        ValueError: If ``get_arrow_version`` finds no matching archive.
    """
    if (pkg_url := get_arrow_version(package_version, python_version, arch)) is None:
        raise ValueError(f"Could not find package arrow with version {package_version}")

    destination = Path(output_dir)
    with open_zip_file(pkg_url) as archive, tempfile.TemporaryDirectory() as tmpdir:
        # The archive is downloaded content: where this Python supports
        # extraction filters, use the "data" filter so members cannot escape
        # the temporary directory.
        if hasattr(tarfile, "data_filter"):
            archive.extractall(tmpdir, filter="data")
        else:
            archive.extractall(tmpdir)

        destination.mkdir(parents=True, exist_ok=True)
        for entry in (Path(tmpdir) / "python").glob("*"):
            target = destination / entry.name
            if entry.is_dir():
                shutil.copytree(entry, target, dirs_exist_ok=True)
            else:
                # copytree only accepts directories; plain files at the top
                # level of ``python/`` would otherwise fail the whole fetch.
                shutil.copy2(entry, target)
    return package_version