Source code for sheet_loader.sheet_loader

# -*- coding: utf-8 -*-
from __future__ import annotations

import csv
import inspect
import logging
from collections import abc
from enum import Enum, auto
from pathlib import PurePath
from typing import Any, Union

from .detecting_buffer import DetectingBuffer
from .helpers import LazyOpener, frame_chunker, pd
from .types import FastApiFile, FilePathType, GeneralPath

_LOG = logging.getLogger(__name__)

_csv_sniff = csv.Sniffer()
_csv_sniff.preferred = ["|", ",", "\t"]


[docs]class SheetFileType(Enum): CSV = auto() XLS = auto()
[docs]def clean_kwargs(function, kwargs: dict): try: sig = inspect.signature(function) if any(v.kind == inspect.Parameter.VAR_KEYWORD for v in sig.parameters.values()): # variable keywords allowed, return all return kwargs return {k: v for k, v in kwargs.items() if k in sig.parameters} except ValueError: # failsafe, return all return kwargs
[docs]class SheetLoader(abc.Iterable): # pylint: disable=too-many-instance-attributes def __iter__(self): return self _data: Union[None, Any] _file_data: Union[None, LazyOpener] _file: GeneralPath _type: SheetFileType def __init__(self, file: FilePathType | FastApiFile, **kwargs): self._chunker = None self._all_chunks_returned = False self._data = None self._file_data = None self.kwargs = kwargs self._cur_row = 0 if isinstance(file, FastApiFile): self._file = file.file if ( file.content_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" or file.filename.lower().startswith(".xls") ): self._type = SheetFileType.XLS else: self._type = SheetFileType.CSV else: self._file = file if hasattr(file, "suffix"): suffix = file.suffix # type: ignore else: suffix = PurePath(str(file)).suffix if suffix.lower().startswith(".xls"): self._type = SheetFileType.XLS else: self._type = SheetFileType.CSV
[docs] def load_xlsx(self, **pd_kwargs): if hasattr(self._file, "as_uri"): file = self._file.as_uri() else: file = self._file kwargs = {"dtype": "string"} kwargs.update(pd_kwargs) kwargs = clean_kwargs(pd.read_excel, kwargs) self._data = pd.read_excel(file.as_uri(), **kwargs)
[docs] def get_file_data(self, **kwargs): if self._file_data is None: self._file_data = LazyOpener(self._file, **kwargs) return self._file_data.open()
[docs] def close(self): if self._file_data is not None: self._file_data.close()
def __del__(self): self.close() # @contextmanager
[docs] def load_csv(self, csv_read_func=None, **pd_kwargs): if csv_read_func is None: csv_read_func = pd.read_csv db = DetectingBuffer(self._file) dialect = db.get_csv_dialect(sniffer=_csv_sniff) _LOG.debug( "Dialect: %s", {k: v for k, v in dialect.__dict__.items() if not k.startswith("_")}, ) preview = db.peek(1024 * 256) try: # header_line = next( # csv.reader(io.StringIO(buffer, newline=None), dialect=dialect) # ) # dtypes = {k: v for k, v in DTYPE_MAP.items() if k in header_line} # date_dtypes = [v for v in DTYPE_DATES if v in header_line] read_kwargs = { "dialect": dialect, "on_bad_lines": "warn", "dtype": "string", "header": 0, } read_kwargs.update(pd_kwargs) read_kwargs = clean_kwargs(csv_read_func, read_kwargs) self._data = csv_read_func(db, **read_kwargs) except Exception: _LOG.exception(preview, stack_info=True) raise
def _open(self): if self._type == SheetFileType.CSV: self.load_csv(**self.kwargs) elif self._type == SheetFileType.XLS: self.load_xlsx(**self.kwargs) @property def data(self): if self._data is None: self._open() return self._data def __next__(self): if self._all_chunks_returned: raise StopIteration if "chunksize" not in self.kwargs: self._all_chunks_returned = True return self.data if self._chunker is None: if self._type == SheetFileType.XLS: self._chunker = frame_chunker(self.data, self.kwargs["chunksize"]) else: self._chunker = self.data try: return next(self._chunker) except StopIteration: self._all_chunks_returned = True raise
[docs] def read(self): if "chunksize" in self.kwargs and self._type == SheetFileType.XLS: return frame_chunker(self.data, self.kwargs["chunksize"]) return self.data
[docs]def read_sheet(file: FilePathType | FastApiFile, **kwargs): sl = SheetLoader(file, **kwargs) if "chunksize" not in kwargs: return sl.data return sl