# Source code for benzina.utils.file

from collections import namedtuple

import numpy as np

from benzina.utils.mp4 import get_chunk_offset_at, get_name_at, \
    get_sample_size_at, get_shape_at, find_headers_at, \
    find_sample_table_at, find_video_configuration_at

# Record pairing a file's name with the mode it is opened in.
FileDesc = namedtuple("FileDesc", "name mode")
# Record for one parsed track: its label, its shape and the position of its
# sample table inside the file.
Trak = namedtuple("Trak", "label shape stbl_pos")


class FileReadMixin:
    """Minimal read interface that delegates to a wrapped file object.

    The wrapped object is kept in ``self._file``. Any additional positional
    or keyword arguments passed to the constructor are accepted and ignored.
    """

    def __init__(self, disk_file=None, *_args, **_kwargs):
        self._file = disk_file

    @property
    def closed(self):
        """True when nothing is wrapped, else the wrapped file's ``closed``."""
        wrapped = self._file
        if wrapped is None:
            return True
        return wrapped.closed

    def seek(self, offset):
        """Forward ``seek(offset)`` to the wrapped file; nothing is returned."""
        self._file.seek(offset)

    def read(self, size):
        """Return the result of ``read(size)`` on the wrapped file."""
        return self._file.read(size)


class FileProxy(FileReadMixin):
    """Re-openable stand-in for a disk file.

    When closed, this proxy can be serialized without problems.

    Args:
        disk_file: either a path (``str``) or an already created file object.
            A path is only remembered; nothing is opened until :meth:`open`.
            A file object is wrapped as-is and its ``name`` attribute, when it
            has one, is recorded so the file can be re-opened later.
        mode: mode string handed to the builtin ``open`` when (re)opening.
    """

    def __init__(self, disk_file, mode="rb"):
        super().__init__(None, mode=mode)
        self._mode = mode

        if isinstance(disk_file, str):
            # Path only: the underlying file stays unset until open().
            self._name = disk_file
            return

        # File-like object: remember its name if it exposes one.
        self._name = getattr(disk_file, "name", None)
        self._file = disk_file

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    @property
    def name(self):
        """Name (path) used to re-open the file, or ``None`` if unknown."""
        return self._name

    @property
    def mode(self):
        """Mode string used when opening the file."""
        return self._mode

    def open(self):
        """Open the file by name unless a usable file is already wrapped."""
        if not self.closed:
            return
        self._file = open(self._name, mode=self._mode)

    def close(self):
        """Close and drop the wrapped file.

        A wrapped file without a recorded name is left untouched, since it
        could not be re-opened afterwards.
        """
        if self.closed or self._name is None:
            return
        self._file.close()
        self._file = None
class File(FileReadMixin):
    """Box-structured file (or sub-file) whose tracks are indexed by label.

    Args:
        disk_file: a path (``str``), a file object or a :class:`FileProxy`.
            Anything that is not already a ``FileProxy`` gets wrapped in one
            opened in ``"rb"`` mode.
        offset: position in the disk file at which this file starts.

    When ``disk_file`` is a path, this instance owns the disk file: entering
    a ``with`` block opens it and leaving the block closes it.
    """

    def __init__(self, disk_file, offset=0):
        super().__init__(disk_file, offset=offset)
        self._owner = False
        self._offset = offset
        self._traks = None

        # If disk_file is a path, ownership of the disk file will be held by
        # this instance
        if isinstance(disk_file, str):
            self._owner = True

        if not isinstance(self._file, FileProxy):
            self._file = FileProxy(disk_file, mode="rb")

        # An already opened file can be indexed right away.
        if not self.closed:
            self._parse()

    def __enter__(self):
        # If self._owner is set, ownership of the disk file is held by this
        # instance and it will be opened here
        if self._owner:
            self.open()
        return self

    def __exit__(self, type, value, traceback):
        # If self._owner is set, ownership of the disk file is held by this
        # instance and the file should be closed here
        if self._owner:
            self.close()

    @property
    def name(self):
        """Name of the underlying disk file."""
        return self._file.name

    @property
    def offset(self):
        """Position in the disk file at which this file starts."""
        return self._offset

    def open(self):
        """Open the underlying file and index its tracks if not done yet."""
        self._file.open()
        if self._traks is None:
            self._parse()

    def close(self):
        """Close the underlying file proxy."""
        self._file.close()

    def trak(self, label):
        """Return the ``Trak`` registered under ``label``, or ``None``.

        Args:
            label: track label as ``str`` (encoded to UTF-8) or bytes. A
                single trailing NUL byte is ignored.

        Raises:
            RuntimeError: if no track index is available, i.e. the file has
                not been parsed or no ``moov`` box was found.
        """
        if self._traks is None:
            raise RuntimeError("File [{}] is missing a moov box"
                               .format(self._file.name))

        if isinstance(label, str):
            label = label.encode("utf-8")
        # Length guard: an empty label must not raise IndexError.
        if len(label) > 0 and label[-1] == 0:
            label = label[:-1]
        return self._traks.get(label, None)

    def subfile(self, offset):
        """Return a ``File`` sharing this file's proxy, starting at ``offset``."""
        return File(self._file, offset=offset)

    def _parse(self):
        """Locate the ``moov`` box and index every ``trak`` box it contains."""
        # A default for next() keeps a missing moov box from leaking
        # StopIteration out of this method.
        moov = next(find_headers_at(self._file, {b"moov"}, self._offset), None)
        if moov is None:
            # Leave the index unset so trak() reports the missing moov box.
            self._traks = None
            return

        moov_pos, box_size, _, header_size = moov
        self._file.seek(moov_pos)

        traks = {}
        for trak_pos, _, _, _ in \
                find_headers_at(self._file, {b"trak"},
                                moov_pos + header_size,
                                box_size - header_size):
            trak_label = get_name_at(self._file, trak_pos)
            # Drop a single trailing NUL; guard against an empty label.
            if len(trak_label) > 0 and trak_label[-1] == 0:
                trak_label = trak_label[:-1]
            trak_shape = get_shape_at(self._file, trak_pos)
            trak_stbl_pos, _, _, _ = find_sample_table_at(self._file, trak_pos)
            traks[trak_label] = Trak(trak_label, trak_shape, trak_stbl_pos)
        self._traks = traks


class Track:
    """Sequence-like access to the samples of one labelled track of a file.

    Args:
        f: a path (``str``) or a :class:`File`. When a path is given, this
            instance creates and owns the ``File``: entering a ``with`` block
            opens it and leaving the block closes it.
        label: label of the track to expose.
    """

    def __init__(self, f, label):
        self._file_path = None
        self._file = None
        self._label = label
        self._trak = None
        self._len = None
        self._co_buffer = None
        self._co = np.empty(0, np.uint64)
        self._sz_buffer = None
        self._sz = np.empty(0, np.uint32)

        # If file is a path, ownership of the file will be held by this
        # instance
        if isinstance(f, str):
            self._file_path = f
            self._file = File(self._file_path)
        else:
            self._file = f

        # An already opened file can be indexed right away.
        if not self._file.closed:
            self._parse()

    def __len__(self):
        return self._len

    def __getitem__(self, index):
        """Return the ``Sample`` at ``index``, or a list of them for a slice.

        Raises:
            IndexError: if an integer ``index`` is outside the track.
        """
        length = len(self)
        if isinstance(index, slice):
            return [Sample(self, i) for i in range(*index.indices(length))]

        # Normalise negative indices so both bounds are checked up front.
        if index < 0:
            index += length
        if not 0 <= index < length:
            raise IndexError("sample index out of range")
        return Sample(self, index)

    def __iter__(self):
        return (self[i] for i in range(len(self)))

    def __enter__(self):
        # If self._file_path is set, ownership of the file is held by this
        # instance and it will be opened here
        if self._file_path:
            self.open()
        return self

    def __exit__(self, type, value, traceback):
        # If self._file_path is set, ownership of the file is held by this
        # instance and it should be closed here
        if self._file_path:
            self.close()

    def open(self):
        """Open the underlying file and load the sample tables if needed."""
        self._file.open()
        if self._trak is None:
            self._parse()

    def close(self):
        """Close the underlying file."""
        self._file.close()

    @property
    def file(self):
        """The ``File`` this track reads from."""
        return self._file

    @property
    def label(self):
        """Label this track was requested with."""
        return self._label

    @property
    def shape(self):
        """Shape recorded for the track in the file's track index."""
        return self._trak.shape

    def sample_location(self, index):
        """Return ``(offset, size)`` of sample ``index`` as plain ints.

        The offset is the file's own offset plus the sample's chunk offset.
        """
        return int(self._file.offset + self._co[index]), int(self._sz[index])

    def sample_bytes(self, index):
        """Read and return the raw content of sample ``index``.

        The track is opened first, so this also works on a closed track.
        """
        self.open()
        offset, size = self.sample_location(index)
        self._file.seek(offset)
        return self._file.read(size)

    def sample_as_file(self, index):
        """Return a ``File`` starting at the location of sample ``index``."""
        offset, _ = self.sample_location(index)
        return self._file.subfile(offset)

    def video_configuration_location(self):
        """Return ``(offset, size)`` of the video configuration payload.

        Returns ``None`` when no video configuration is found for the track.
        The header is skipped: the offset points just past it and the size
        excludes it.
        """
        _vcC = find_video_configuration_at(self._file, self._trak.stbl_pos)
        if _vcC is None:
            return None
        pos, box_size, _, header_size = _vcC
        return pos + header_size, box_size - header_size

    def _parse(self):
        """Load the chunk-offset and sample-size tables of the track.

        Raises ``AttributeError`` if the file has no track with this label,
        because ``File.trak`` then returns ``None``.
        """
        trak = self._file.trak(self._label)

        self._co, self._co_buffer = \
            get_chunk_offset_at(self._file, trak.stbl_pos)
        self._sz, self._sz_buffer = \
            get_sample_size_at(self._file, trak.stbl_pos)
        # Bring both tables to a common shape so they can be indexed alike.
        self._co, self._sz = np.broadcast_arrays(self._co, self._sz)

        self._len = len(self._co)
        self._trak = trak


class Sample:
    """Handle on a single sample of a :class:`Track`, resolved on access."""

    def __init__(self, track, index):
        self._track = track
        self._index = index

    def __bytes__(self):
        return self._track.sample_bytes(self._index)

    @property
    def value(self):
        """Raw content of the sample, same as ``bytes(sample)``."""
        return bytes(self)

    @property
    def location(self):
        """``(offset, size)`` of the sample in the disk file."""
        return self._track.sample_location(self._index)

    def as_file(self):
        """Return a ``File`` starting at this sample's location."""
        return self._track.sample_as_file(self._index)