Source code for sheet_loader.detecting_buffer

# -*- coding: utf-8 -*-
from __future__ import annotations

import ctypes
import logging
import select
from collections import deque
from csv import Sniffer
from multiprocessing import Pipe
from threading import Thread
from typing import Optional

from .buffer_processor import MAGIC_BYTE_SEQUENCE, BufferProcessor
from .connection import NonBlockConnection
from .helpers import LazyOpener, catch_exceptions
from .types import FilePathType, Openable, Readable

CHUNK_SIZE = 32 * 1024
ENCODING_PREFIX_LENGTH = 16
CSV_SAMPLE_SIZE = 512 * 1024  # 512 KiB sample size


class DetectingBuffer(Readable):  # pylint: disable=too-many-instance-attributes
    def seek(self, __offset: int, __whence: int = ...) -> int:
        raise NotImplementedError

    def __init__(
        self,
        file: Readable | Openable | FilePathType,
        newline=None,
        **kwargs,
    ) -> None:
        super().__init__()
        self._newline = newline
        kwargs["mode"] = "rb"
        self._opener = LazyOpener(file, **kwargs)
        self._open_file = self._opener.open()
        self._in_read, self._in_write = Pipe(duplex=False)
        self._out_read, self._out_write = Pipe(duplex=False)
        self._reader = ReaderThread(
            self._open_file,
            NonBlockConnection.convert_connnection(self._in_write),
            kwargs.get("chunk_size", CHUNK_SIZE),
        )
        self._processor = BufferProcessor(
            in_reader=self._in_read,
            out_writer=NonBlockConnection.convert_connnection(self._out_write),
        )
        self._reader.start()
        self._processor.start()
        self._buffer: deque = deque()
        self._done = False
        self._leftovers = ""
        self._closed = False
        self._log = logging.getLogger(__name__).getChild(self.__class__.__name__)

    # noinspection PyMethodMayBeStatic
    def seekable(self) -> bool:
        return False

    def _fetch_chunk(self):
        if self._done:
            return ""
        try:
            chunk = self._out_read.recv()
            # self._log.debug("Received chunk: %s (%s bytes)", type(chunk), len(chunk))
        except EOFError:
            self._done_reading()
            return ""
        if chunk == MAGIC_BYTE_SEQUENCE:
            # The processor signals end-of-stream with the sentinel sequence.
            self._done_reading()
            return ""
        if self._newline is None:
            # Universal-newline mode: normalise "\r\n" and "\r" to "\n".
            chunk = chunk.replace("\r\n", "\n")
            chunk = chunk.replace("\r", "\n")
        return chunk

    def _done_reading(self):
        # Tear down the reader thread, the processor and both pipes exactly once.
        if self._closed:
            return
        self._reader.stop()
        self._processor.stop()
        catch_exceptions(self._out_read.close, [OSError])()
        catch_exceptions(self._out_write.close, [OSError])()
        catch_exceptions(self._in_read.close, [OSError])()
        catch_exceptions(self._in_write.close, [OSError])()
        self._reader.join()
        self._processor.join()
        self._log.debug("Joined all thread/processes")
        self._done = True
        self._opener.close()
        self._closed = True

    def __del__(self):
        self._done_reading()

    def peek(self, size: int | None = None) -> str:
        out = self.read(size)
        self._leftovers = out + self._leftovers
        return out

    def read(self, size: int | None = None) -> str:
        self._log.debug(
            "Trying to read %s bytes (already have %s bytes)", size, len(self._leftovers)
        )
        output = ""
        size = size or -1
        if size < 0:
            # Read everything: drain leftovers, then every remaining chunk.
            if self._leftovers:
                output += self._leftovers
                self._leftovers = ""
            while chunk := self._fetch_chunk():
                output += chunk
            return output
        while size > 0:
            if self._leftovers:
                # self._log.debug("%s: Using leftovers: %s", threading.currentThread(), len(self._leftovers))
                nc = self._leftovers[:size]
                self._leftovers = self._leftovers[size:]
                # self._log.debug("%s: Leftovers: %s", threading.currentThread(), len(self._leftovers))
                output += nc
                size -= len(nc)
                continue
            chunk = self._fetch_chunk()
            if chunk == "":
                break
            if len(chunk) > size:
                # Keep whatever exceeds the request as leftovers for the next read.
                self._leftovers = chunk[size:]
                output += chunk[:size]
                break
            output += chunk
            size -= len(chunk)
        # self._log.debug("Read %s bytes", len(output))
        return output

    def readline(self):
        str_buf = self.read(1024)
        if self._newline is None:
            nl = "\n"
        else:
            nl = self._newline
        while nl not in str_buf:
            r = self.read(1024)
            str_buf += r
            if r == "":
                break
        if nl in str_buf:
            ret = str_buf[: str_buf.index(nl) + 1]
            self._leftovers = str_buf[len(ret) :] + self._leftovers
        else:
            ret = str_buf
        return ret

    def __next__(self):
        r = self.readline()
        if r == "":
            raise StopIteration
        return r

    def __iter__(self):
        return self

    def get_csv_dialect(
        self,
        sniffer: Optional[Sniffer] = None,
        delimiters: Optional[str] = None,
        sample_size=CSV_SAMPLE_SIZE,
    ):
        sniffer = sniffer or Sniffer()
        buffer = self.read(sample_size)
        dialect = sniffer.sniff(buffer, delimiters=delimiters)
        # Push the sampled text back so subsequent reads start from the beginning.
        self._leftovers = buffer + self._leftovers
        return dialect

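# Illustrative usage sketch: the path "sample.csv" and the helper name
# _example_read_csv are placeholders, not part of the library API. It shows the
# intended flow: build a DetectingBuffer from a path (or any Readable/Openable),
# sniff the CSV dialect without consuming the sample, then iterate line by line.
def _example_read_csv(path: FilePathType = "sample.csv") -> None:
    import csv

    buf = DetectingBuffer(path)
    dialect = buf.get_csv_dialect()  # the sampled text is pushed back into the buffer
    for row in csv.reader(buf, dialect=dialect):
        print(row)
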
class StopThread(BaseException):
    pass

def pipe_full(conn, timeout=0.0):
    _, w, _ = select.select([], [conn], [], timeout)
    return len(w) == 0

class ReaderThread(Thread):
    def __init__(self, file: Readable, in_writer: NonBlockConnection, chunk_size=CHUNK_SIZE):
        super().__init__()
        self._file = file
        self._in_writer = in_writer
        self._chunk_size = chunk_size
        self._log = (
            logging.getLogger(__name__).getChild(self.__class__.__name__).getChild(self.name)
        )
        # self._log.debug(os.get_blocking(self._in_writer.fileno()))
        # os.set_blocking(self._in_writer.fileno(), False)
        # self._log.debug(os.get_blocking(self._in_writer.fileno()))

    def _reader(self):
        while chunk := self._file.read(self._chunk_size):
            # self._log.info(pipe_full(self._in_writer))
            self._in_writer.send_bytes(chunk)
            # self._log.debug("Sent %s bytes", len(chunk))
        # Signal end-of-stream to the processor with the sentinel sequence.
        # self._in_write_wrap.send_bytes_polled(MAGIC_BYTE_SEQUENCE)
        self._in_writer.send_bytes(MAGIC_BYTE_SEQUENCE)
        self._log.debug("Sent magic byte sequence")
        self._in_writer.close()
        self._log.debug("Closed input")

    def run(self):
        try:
            self._reader()
        except StopThread:
            self._log.debug("stopped")
        except Exception as e:  # pylint: disable=broad-except
            self._log.exception(e)
        finally:
            self._log.debug("exiting")

    def stop(self):
        # Asynchronously raise SystemExit in the reader thread; the exception is
        # delivered the next time that thread executes Python bytecode.
        thread_id = self.ident
        self._log.debug("Raising exception for thread %s", thread_id)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(thread_id), ctypes.py_object(SystemExit)
        )
        self._log.debug("Raised exception for thread %s: %s", thread_id, res)
        if res > 1:
            # More than one thread state was affected: undo and report the failure.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), 0)
            raise SystemError("PyThreadState_SetAsyncExc failed")

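# Illustrative sketch of the asynchronous-exception technique used by
# ReaderThread.stop(): PyThreadState_SetAsyncExc schedules an exception for the
# target thread, and it is raised the next time that thread executes Python
# bytecode (a thread blocked inside a long C call is only interrupted once the
# call returns). The _demo_async_stop helper and its worker are hypothetical.
def _demo_async_stop() -> None:
    import time

    def _spin():
        try:
            while True:  # keeps running Python bytecode, so the exception can land
                time.sleep(0.01)
        except SystemExit:
            pass  # graceful exit once the asynchronous exception arrives

    worker = Thread(target=_spin)
    worker.start()
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(worker.ident), ctypes.py_object(SystemExit)
    )
    if res > 1:
        # More than one thread state was modified: undo and report the failure.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(worker.ident), 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    worker.join()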