Files
2021-09-12 22:12:50 +08:00

178 lines
6.1 KiB
Python

try:
from game_backuper._zstd import (
ZSTDCompressor,
ZSTDDecompressor,
maxCLevel,
)
have_zstd = True
except ImportError:
have_zstd = False
if have_zstd:
from builtins import open as _builtin_open
from _compression import BaseStream, DecompressReader
import os
import io
# Internal state codes stored in ZSTDFile._mode.
_MODE_CLOSED = 0  # no open stream (initial state and state after close())
_MODE_READ = 1  # opened for reading (mode "", "r" or "rb")
_MODE_WRITE = 3  # opened for writing (modes "w", "x" and "a", with or without "b")
# Upper bound accepted for ``compresslevel``; the value comes from the _zstd
# extension's maxCLevel() (presumably zstd's maximum level -- TODO confirm).
MAX_COMPRESS_LEVEL = maxCLevel()
class ZSTDFile(BaseStream):
    """A binary file object that reads or writes Zstandard-compressed data.

    The interface follows the standard library's ``bz2.BZ2File``: data passed
    to ``write()`` is compressed before it reaches the underlying file, and
    data returned by the ``read*()`` methods has been decompressed.
    """

    def __init__(self, filename, mode="r", *, compresslevel=3):
        """Open a Zstandard-compressed file.

        Args:
            filename: A ``str``, ``bytes`` or ``os.PathLike`` path to open, or
                an already-open binary file object (anything exposing
                ``read`` or ``write``).  A file object supplied by the caller
                is used as-is and is NOT closed by ``close()``.
            mode: ``""``/``"r"``/``"rb"`` to read, or ``"w"``, ``"x"``, ``"a"``
                (optionally followed by ``"b"``) to write.
            compresslevel: Level handed to ``ZSTDCompressor`` in write modes;
                must lie between 1 and ``MAX_COMPRESS_LEVEL``.  It is
                validated in every mode, including read mode.

        Raises:
            ValueError: If ``compresslevel`` is out of range or ``mode`` is
                not one of the supported values.
            TypeError: If ``filename`` is neither a path nor a file object.
        """
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED
        # Created up front so the attributes exist whatever the mode is.
        self._buffer = None
        self._compressor = None

        if not (1 <= compresslevel <= MAX_COMPRESS_LEVEL):
            raise ValueError(f"compresslevel must be between 1 and {MAX_COMPRESS_LEVEL}")  # noqa: E501

        if mode in ("", "r", "rb"):
            mode = "rb"
            mode_code = _MODE_READ
        elif mode in ("w", "wb", "x", "xb", "a", "ab"):
            # All write flavours only differ in how the file itself is opened.
            mode = mode[0] + "b"
            mode_code = _MODE_WRITE
            self._compressor = ZSTDCompressor(compresslevel)
        else:
            raise ValueError("Invalid mode: %r" % (mode,))

        if isinstance(filename, (str, bytes, os.PathLike)):
            self._fp = _builtin_open(filename, mode)
            # We opened it, so we are responsible for closing it.
            self._closefp = True
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            # Existing file object: borrow it, leave its lifetime to the caller.
            self._fp = filename
        else:
            raise TypeError("filename must be a str, bytes, file or PathLike object")  # noqa: E501
        self._mode = mode_code

        if self._mode == _MODE_READ:
            raw = DecompressReader(self._fp, ZSTDDecompressor,
                                   trailing_error=OSError)
            self._buffer = io.BufferedReader(raw)
        else:
            # Number of uncompressed bytes written so far; reported by tell().
            self._pos = 0

    def close(self):
        """Flush pending compressed data and close the file.

        Calling this on an already-closed object does nothing.  The underlying
        file is closed only when this object opened it itself.
        """
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode == _MODE_READ:
                self._buffer.close()
            elif self._mode == _MODE_WRITE:
                # Emit whatever the compressor is still holding back.
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                # Reset state even when flushing or closing raised.
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED
                self._buffer = None

    @property
    def closed(self):
        """True once the file has been closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor of the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether seek() is supported (read mode on a seekable source)."""
        return self.readable() and self._buffer.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode == _MODE_READ

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    def peek(self, n=0):
        """Return buffered decompressed data without advancing the position.

        ``n`` is forwarded to the internal ``io.BufferedReader.peek``.
        """
        self._check_can_read()
        return self._buffer.peek(n)

    def read(self, size=-1):
        """Read up to ``size`` decompressed bytes (everything if negative)."""
        self._check_can_read()
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Read up to ``size`` decompressed bytes via the buffer's ``read1``.

        A negative ``size`` is replaced by ``io.DEFAULT_BUFFER_SIZE``.
        """
        self._check_can_read()
        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def readinto(self, b):
        """Read decompressed bytes into ``b``; return the number stored."""
        self._check_can_read()
        return self._buffer.readinto(b)

    def readline(self, size=-1):
        """Read one line of decompressed data.

        ``size`` may be an ``int`` or any object implementing ``__index__``;
        anything else raises ``TypeError``.
        """
        if not isinstance(size, int):
            if not hasattr(size, "__index__"):
                raise TypeError("Integer argument expected")
            size = size.__index__()
        self._check_can_read()
        return self._buffer.readline(size)

    def __iter__(self):
        """Iterate over the lines of the decompressed data."""
        self._check_can_read()
        return self._buffer.__iter__()

    def readlines(self, size=-1):
        """Read decompressed lines into a list.

        ``size`` may be an ``int`` or any object implementing ``__index__``;
        anything else raises ``TypeError``.
        """
        if not isinstance(size, int):
            if not hasattr(size, "__index__"):
                raise TypeError("Integer argument expected")
            size = size.__index__()
        self._check_can_read()
        return self._buffer.readlines(size)

    def write(self, data):
        """Compress ``data`` and write the result to the underlying file.

        Returns the number of UNCOMPRESSED bytes consumed, i.e. the size of
        ``data``, not the number of bytes that reached the file.
        """
        self._check_can_write()
        if isinstance(data, (bytes, bytearray)):
            length = len(data)
        else:
            # Other buffer types: len() may count items rather than bytes.
            data = memoryview(data)
            length = data.nbytes
        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        self._pos += length
        return length

    def writelines(self, seq):
        """Write each bytes-like item of ``seq`` using the base implementation."""
        return BaseStream.writelines(self, seq)

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the position in the decompressed stream (read mode only).

        Returns the value returned by the internal buffer's ``seek``.
        """
        self._check_can_seek()
        return self._buffer.seek(offset, whence)

    def tell(self):
        """Return the current position measured in uncompressed bytes."""
        self._check_not_closed()
        if self._mode == _MODE_READ:
            return self._buffer.tell()
        return self._pos
def open(filename, mode="rb", compresslevel=3, encoding=None, errors=None, newline=None):  # noqa: E501
    """Open a Zstandard-compressed file in binary or text mode.

    ``filename``, the mode (with any ``"t"`` removed) and ``compresslevel``
    are forwarded to ``ZSTDFile``.  When ``mode`` contains ``"t"`` the
    resulting binary file is wrapped in an ``io.TextIOWrapper`` configured
    with ``encoding``, ``errors`` and ``newline``; otherwise the ``ZSTDFile``
    itself is returned.

    Raises:
        ValueError: If ``mode`` contains both ``"t"`` and ``"b"``, or if any
            of ``encoding``, ``errors`` or ``newline`` is given without
            ``"t"`` in ``mode``.
    """
    wants_text = "t" in mode

    if wants_text and "b" in mode:
        raise ValueError("Invalid mode: %r" % (mode,))

    if not wants_text:
        # Text-only arguments make no sense for a binary stream; reject them
        # in the same order they appear in the signature.
        text_only_args = (
            (encoding, "Argument 'encoding' not supported in binary mode"),
            (errors, "Argument 'errors' not supported in binary mode"),
            (newline, "Argument 'newline' not supported in binary mode"),
        )
        for value, message in text_only_args:
            if value is not None:
                raise ValueError(message)

    raw_mode = mode.replace("t", "")
    stream = ZSTDFile(filename, raw_mode, compresslevel=compresslevel)
    if not wants_text:
        return stream
    return io.TextIOWrapper(stream, encoding, errors, newline)