Source code for snakeoil.data_source

"""
A far more minimal form of file protocol encapsulation location and encoding into it

The primary use for data_source's is to encapsulate the following issues into a single object:

* is the data actually on disk (thus can I use more efficient ops against the file?).
* what is the preferred encoding?
* py3k compatibility concerns (bytes versus text file handles)

Note that all file like handles returned from `text_fileobj()` and `bytes_fileobj()`
have a required additional attribute- *exceptions*, either a single Exception class, or a
tuple of Exception classes that can be thrown by that file handle during usage.

This requirement exists purely to allow the consuming code to avoid having to know anything
about the backing of the file like object.

The proper way to use such a filehandle is as follows:

>>> from snakeoil.data_source import data_source
>>> source = data_source("It's a fez. I wear a fez now. Fezes are cool.", mutable=False)
>>> handle = source.text_fileobj()
>>> handle.write("You graffitied the oldest cliff face in the universe.")
Traceback (most recent call last):
TypeError:
>>> # if this where a normal file, it would be an IOError- it's impossible to guess the
>>> # correct exception to intercept, so instead we rely on the handle telling us what
>>> # we should catch;
>>> try:
...   handle.write("You wouldn't answer your phone.")
... except handle.exceptions as e:
...   print("we caught the exception.")
we caught the exception.
"""

__all__ = (
    "base",
    "bz2_source",
    "data_source",
    "local_source",
    "text_data_source",
    "bytes_data_source",
    "invokable_data_source",
)

import errno
from functools import partial
import io

from . import compression, fileutils, klass, stringio
from .currying import post_curry


def _mk_writable_cls(base, name):
    """
    inline mixin of writable overrides

    while a normal mixin is preferable, this is required due to
    differing slot layouts between py2k/py3k base classes of
    stringio.
    """

    class kls(base):
        __doc__ = f"""
        writable {name.split("_")[0]} StringIO instance suitable for usage as a data_source filehandle

        This adds a callback for updating the original data source, and appropriate
        exceptions attribute
        """

        base_cls = base
        exceptions = (MemoryError,)
        __slots__ = ("_callback",)

        def __init__(self, callback, data):
            """
            :param callback: functor invoked when this data source is modified;
                the functor takes a single value, the full content of the StringIO
            :param data: initial data for this instance
            """
            if not callable(callback):
                raise TypeError("callback must be callable")
            self.base_cls.__init__(self, data)
            self._callback = callback

        def close(self):
            self.flush()
            if self._callback is not None:
                self.seek(0)
                self._callback(self.read())
                self._callback = None
            self.base_cls.close(self)

    kls.__name__ = name
    return kls


text_wr_StringIO = _mk_writable_cls(io.StringIO, "text_wr_StringIO")
bytes_wr_StringIO = _mk_writable_cls(io.BytesIO, "bytes_wr_StringIO")


class text_ro_StringIO(stringio.text_readonly):
    """
    readonly text mode StringIO usable as a filehandle for a data_source

    Specifically this adds the necessary `exceptions` attribute; see
    :py:class:`snakeoil.stringio.text_readonly` for methods details.
    """

    __slots__ = ()
    exceptions = (MemoryError, TypeError)


class bytes_ro_StringIO(stringio.bytes_readonly):
    """
    readonly bytes mode StringIO usable as a filehandle for a data_source

    Specifically this adds the necessary `exceptions` attribute; see
    :py:class:`snakeoil.stringio.bytes_readonly` for methods details.
    """

    __slots__ = ()
    exceptions = (MemoryError, TypeError)


# derive our file classes- we derive *strictly* to append
# the exceptions class attribute for consumer usage.
def open_file(*args, **kwds):
    handle = open(*args, **kwds)
    handle.exceptions = (EnvironmentError,)
    return handle


[docs] class base: """ base data_source class; implementations of the protocol are advised to derive from this. :ivar path: If None, no local path is available- else it's the ondisk path to the data """ __slots__ = ("weakref",) path = None
[docs] def text_fileobj(self, writable=False): """get a text level filehandle for for this data :param writable: whether or not we need to write to the handle :raise: TypeError if immutable and write is requested :return: file handle like object """ raise NotImplementedError(self, "text_fileobj")
[docs] def bytes_fileobj(self, writable=False): """get a bytes level filehandle for for this data :param writable: whether or not we need to write to the handle :raise: TypeError if immutable and write is requested :return: file handle like object """ raise NotImplementedError(self, "bytes_fileobj")
[docs] def transfer_to_path(self, path): return self.transfer_to_data_source( local_source(path, mutable=True, encoding=None) )
[docs] def transfer_to_data_source(self, write_source): read_f, m, write_f = None, None, None try: write_f = write_source.bytes_fileobj(True) if self.path is not None: m, read_f = fileutils.mmap_or_open_for_read(self.path) else: read_f = self.bytes_fileobj() if read_f is not None: transfer_between_files(read_f, write_f) else: write_f.write(m) finally: for x in (read_f, write_f, m): if x is None: continue try: x.close() except EnvironmentError: pass
[docs] class local_source(base): """locally accessible data source Literally a file on disk. """ __slots__ = ("path", "mutable", "encoding") buffering_window = 32768 def __init__(self, path, mutable=False, encoding=None): """ :param path: file path of the data source :param mutable: whether this data_source is considered modifiable or not :param encoding: the text encoding to force, if any """ base.__init__(self) self.path = path self.mutable = mutable self.encoding = encoding
[docs] @klass.steal_docs(base) def text_fileobj(self, writable=False): if writable and not self.mutable: raise TypeError("data source %s is immutable" % (self,)) if self.encoding: opener = open_file opener = post_curry( opener, buffering=self.buffering_window, encoding=self.encoding ) else: opener = post_curry(open_file, self.buffering_window) if not writable: return opener(self.path, "r") try: return opener(self.path, "r+") except IOError as ie: if ie.errno != errno.ENOENT: raise return opener(self.path, "w+")
[docs] @klass.steal_docs(base) def bytes_fileobj(self, writable=False): if not writable: return open_file(self.path, "rb", self.buffering_window) if not self.mutable: raise TypeError("data source %s is immutable" % (self,)) try: return open_file(self.path, "rb+", self.buffering_window) except IOError as ie: if ie.errno != errno.ENOENT: raise return open_file(self.path, "wb+", self.buffering_window)
[docs] class bz2_source(base): """ locally accessible bz2 archive Literally a bz2 file on disk. """ __slots__ = ("path", "mutable") def __init__(self, path, mutable=False): """ :param path: file path of the data source :param mutable: whether this data source is considered modifiable or not """ base.__init__(self) self.path = path self.mutable = mutable
[docs] def text_fileobj(self, writable=False): data = compression.decompress_data( "bzip2", fileutils.readfile_bytes(self.path) ).decode() if writable: if not self.mutable: raise TypeError(f"data source {self} is not mutable") return text_wr_StringIO(self._set_data, data) return text_ro_StringIO(data)
[docs] def bytes_fileobj(self, writable=False): data = compression.decompress_data("bzip2", fileutils.readfile_bytes(self.path)) if writable: if not self.mutable: raise TypeError(f"data source {self} is not mutable") return bytes_wr_StringIO(self._set_data, data) return bytes_ro_StringIO(data)
def _set_data(self, data): if isinstance(data, str): data = data.encode() with open(self.path, "wb") as f: f.write(compression.compress_data("bzip2", data))
[docs] class data_source(base): """ base class encapsulating a purely virtual data source lacking an on disk location. Whether this be due to transformation steps necessary (pulling the data out of an archive for example), or the data being generated on the fly, this classes's derivatives :py:class:`text_data_source` and :py:class:`bytes_data_source` are likely what you should be using for direct creation. :ivar data: the raw data- should either be a string or bytes depending on your derivative :ivar path: note that path is None for this class- no on disk location available. """ __slots__ = ("data", "mutable") def __init__(self, data, mutable=False): """ :param data: data to wrap :param mutable: should this data_source be updatable? """ base.__init__(self) self.data = data self.mutable = mutable def _convert_data(self, mode): if mode == "bytes": if isinstance(self.data, bytes): return self.data return self.data.encode() if isinstance(self.data, str): return self.data return self.data.decode()
[docs] @klass.steal_docs(base) def text_fileobj(self, writable=False): if writable: if not self.mutable: raise TypeError(f"data source {self} is not mutable") return text_wr_StringIO(self._reset_data, self._convert_data("text")) return text_ro_StringIO(self._convert_data("text"))
def _reset_data(self, data): if isinstance(self.data, bytes): if not isinstance(data, bytes): data = data.encode() elif not isinstance(data, str): data = data.decode() self.data = data
[docs] @klass.steal_docs(base) def bytes_fileobj(self, writable=False): if writable: if not self.mutable: raise TypeError(f"data source {self} is not mutable") return bytes_wr_StringIO(self._reset_data, self._convert_data("bytes")) return bytes_ro_StringIO(self._convert_data("bytes"))
[docs] class text_data_source(data_source): """Text data source. This does auto-conversion between bytes/text as needed. """ __slots__ = () @klass.steal_docs(data_source) def __init__(self, data, mutable=False): if not isinstance(data, str): raise TypeError("data must be a str") data_source.__init__(self, data, mutable=mutable) def _convert_data(self, mode): if mode != "bytes": return self.data return self.data.encode()
[docs] class bytes_data_source(data_source): """Bytes data source. This does auto-conversion between bytes/text as needed. """ __slots__ = () @klass.steal_docs(data_source) def __init__(self, data, mutable=False): if not isinstance(data, bytes): raise TypeError("data must be bytes") data_source.__init__(self, data, mutable=mutable) def _convert_data(self, mode): if mode == "bytes": return self.data return self.data.decode()
[docs] class invokable_data_source(data_source): """ data source that takes a callable instead of the actual data item The callable takes a single argument- a boolean, True if a text fileobj is requested, False if None Note that this instance is explicitly readonly. """ __slots__ = () def __init__(self, data): """ :param data: callable that accepts one argument- True if a text file obj was requested, False if a bytes file obj is requested. """ data_source.__init__(self, data, mutable=False)
[docs] @klass.steal_docs(data_source) def text_fileobj(self, writable=False): if writable: raise TypeError(f"data source {self} data is immutable") return self.data(True)
[docs] @klass.steal_docs(data_source) def bytes_fileobj(self, writable=False): if writable: raise TypeError(f"data source {self} data is immutable") return self.data(False)
[docs] @classmethod def wrap_function( cls, invokable, returns_text=True, returns_handle=False, encoding_hint=None ): """ Helper function to automatically convert a function that returns text or bytes into appropriate callable :param invokable: a callable that returns either text, or bytes, taking no args :param returns_text: True if the data returned is text/basestring, False if Not :param returns_handle: True if the object returned is a handle, False if not. Note that returns_text still has meaning here- returns_text indicates what sort of data the handle returns from read invocations. :param encoding_hint: the preferred encoding to use for encoding :return: invokable_data_source instance """ return cls( partial( cls._simple_wrapper, invokable, encoding_hint, returns_text, returns_handle, ) )
@staticmethod def _simple_wrapper( invokable, encoding_hint, returns_text, returns_handle, text_wanted ): data = invokable() if returns_text != text_wanted: if text_wanted: if returns_handle: data = data.read() if encoding_hint: # we have an encoding, its bytes data, and text is wanted data = data.decode(encoding_hint) else: data = data.decode() else: # bytes were wanted... if returns_handle: # pull in the data... data = data.read() if encoding_hint is None: # fallback to utf8 encoding_hint = "utf8" data = data.encode(encoding_hint) elif returns_handle: return data if text_wanted: return text_ro_StringIO(data) return bytes_ro_StringIO(data)
def transfer_between_files(read_file, write_file, bufsize=(32 * 1024)): while data := read_file.read(bufsize): write_file.write(data)