add encrption module
fix bug in zstd compression warp code fix bug when showing compression inforamtion
This commit is contained in:
15
game_backuper/_zstd.pyi
Normal file
15
game_backuper/_zstd.pyi
Normal file
@@ -0,0 +1,15 @@
|
||||
def version() -> str: ...
|
||||
def maxCLevel() -> int: ...
|
||||
class ZSTDCompressor: # noqa: E302
|
||||
def __init__(self, compresslevel: int = 3): ...
|
||||
def compress(self, inp: bytes) -> bytes: ...
|
||||
def flush(self) -> bytes: ...
|
||||
class ZSTDDecompressor: # noqa: E302
|
||||
def __init__(self): ...
|
||||
def decompress(self, data: bytes, max_length: int = -1) -> bytes: ...
|
||||
@property
|
||||
def eof(self) -> bool: ...
|
||||
@property
|
||||
def unused_data(self) -> bytes: ...
|
||||
@property
|
||||
def needs_input(self) -> bool: ...
|
||||
@@ -174,7 +174,7 @@ cdef class ZSTDDecompressor:
|
||||
if self.finish and i.pos < i.size:
|
||||
obuf = (<char*> i.src) + i.pos
|
||||
self._unused_data = PyBytes_FromStringAndSize(obuf, i.size - i.pos)
|
||||
if max_length == 1 or len(b) <= max_length:
|
||||
if max_length == -1 or len(b) <= max_length:
|
||||
return b
|
||||
else:
|
||||
self._buff = b[max_length:]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
try:
|
||||
from bz2 import BZ2File
|
||||
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
|
||||
have_bz2 = True
|
||||
except ImportError:
|
||||
have_bz2 = False
|
||||
@@ -9,7 +9,7 @@ try:
|
||||
except ImportError:
|
||||
have_gzip = False
|
||||
try:
|
||||
from lzma import LZMAFile
|
||||
from lzma import LZMAFile, LZMACompressor, LZMADecompressor
|
||||
have_lzma = True
|
||||
except ImportError:
|
||||
have_lzma = False
|
||||
@@ -17,14 +17,21 @@ try:
|
||||
from lzip import (
|
||||
FileEncoder as LZIPFileEncoder,
|
||||
decompress_file_iter as LZIP_decompress_file_iter,
|
||||
level_to_dictionary_size_and_match_len_limit as lzip_level_dict,
|
||||
)
|
||||
from lzip_extension import (
|
||||
Decoder as LZIPDecoder,
|
||||
Encoder as LZIPEncoder,
|
||||
)
|
||||
have_lzip = True
|
||||
except ImportError:
|
||||
have_lzip = False
|
||||
try:
|
||||
from game_backuper.zstd import (
|
||||
ZSTDCompressor,
|
||||
ZSTDDecompressor,
|
||||
ZSTDFile,
|
||||
MAX_COMPRESS_LEVEL as ZSTD_MAX
|
||||
MAX_COMPRESS_LEVEL as ZSTD_MAX,
|
||||
)
|
||||
have_zstd = True
|
||||
except ImportError:
|
||||
@@ -55,6 +62,87 @@ from os import remove
|
||||
from game_backuper.file import mkdir_for_file, hydrate_file_if_needed
|
||||
|
||||
|
||||
if have_gzip:
|
||||
class GZCompressor:
|
||||
def __init__(self, compress_level: int = 9, file_obj=None):
|
||||
self.write_to_file = True
|
||||
self._fileobj = file_obj
|
||||
self._level = compress_level
|
||||
self._f = None
|
||||
|
||||
def close(self):
|
||||
self._f.close()
|
||||
|
||||
def compress(self, data: bytes) -> bytes:
|
||||
if self._f is None:
|
||||
self._f = GzipFile(mode="wb", compresslevel=self._level, fileobj=self._fileobj) # noqa: E501
|
||||
self._f.write(data)
|
||||
return b''
|
||||
|
||||
def flush(self) -> bytes:
|
||||
self._f.flush()
|
||||
return b''
|
||||
|
||||
class GZDecompressor:
|
||||
def __init__(self, file_obj=None):
|
||||
self.write_to_file = True
|
||||
self._fileobj = file_obj
|
||||
self._f = None
|
||||
|
||||
def close(self):
|
||||
self._f.close()
|
||||
|
||||
def read(self, len: int) -> bytes:
|
||||
if self._f is None:
|
||||
self._f = GzipFile(mode="rb", fileobj=self._fileobj)
|
||||
return self._f.read(len)
|
||||
|
||||
if have_lzip:
|
||||
class LZIPCompressor:
|
||||
def __init__(self, level: int = 6):
|
||||
d = lzip_level_dict[level]
|
||||
self._encoder = LZIPEncoder(d[0], d[1], 1 << 51)
|
||||
|
||||
def compress(self, data: bytes) -> bytes:
|
||||
return self._encoder.compress(data)
|
||||
|
||||
def flush(self) -> bytes:
|
||||
return self._encoder.finish()
|
||||
|
||||
class LZIPDecompressor:
|
||||
def __init__(self):
|
||||
self._decoder = LZIPDecoder(1)
|
||||
|
||||
def decompress(self, data: bytes) -> bytes:
|
||||
return self._decoder.decompress(data)
|
||||
|
||||
@property
|
||||
def eof(self):
|
||||
return False
|
||||
|
||||
if have_brotli:
|
||||
class BrotliCompressor2:
|
||||
def __init__(self, *k, **kw):
|
||||
self._c = BrotliCompressor(*k, **kw)
|
||||
|
||||
def compress(self, data: bytes) -> bytes:
|
||||
return self._c.process(data)
|
||||
|
||||
def flush(self) -> bytes:
|
||||
return self._c.finish()
|
||||
|
||||
class BrotliDecompressor2:
|
||||
def __init__(self, *k, **kw) -> None:
|
||||
self._c = BrotliDecompressor(*k, **kw)
|
||||
|
||||
def decompress(self, data: bytes) -> bytes:
|
||||
return self._c.process(data)
|
||||
|
||||
@property
|
||||
def eof(self):
|
||||
return self._c.is_finished()
|
||||
|
||||
|
||||
@unique
|
||||
class CompressMethod(IntEnum):
|
||||
BZIP2 = 0
|
||||
@@ -169,6 +257,41 @@ class CompressConfig:
|
||||
t = type(self)
|
||||
return f"<{t.__module__}.{t.__qualname__} object at {hex(id(self))}; method={repr(self._method)}, level={repr(self._level)}, ext={repr(self._ext)}>" # noqa: E501
|
||||
|
||||
def compressor(self, fileobj):
|
||||
if self._method == CompressMethod.BZIP2:
|
||||
return BZ2Compressor(self._level)
|
||||
elif self._method == CompressMethod.GZIP:
|
||||
return GZCompressor(self._level, fileobj)
|
||||
elif self._method == CompressMethod.LZMA:
|
||||
return LZMACompressor(preset=self._level)
|
||||
elif self._method == CompressMethod.LZIP:
|
||||
return LZIPCompressor(self._level)
|
||||
elif self._method == CompressMethod.ZSTD:
|
||||
return ZSTDCompressor(self._level)
|
||||
elif self._method == CompressMethod.SNAPPY:
|
||||
return Snappy_Compressor()
|
||||
elif self._method == CompressMethod.BROTLI:
|
||||
c = {}
|
||||
if self._level is not None:
|
||||
c['quality'] = self._level
|
||||
return BrotliCompressor2(**c)
|
||||
|
||||
def decompressor(self, fileobj):
|
||||
if self._method == CompressMethod.BZIP2:
|
||||
return BZ2Decompressor()
|
||||
elif self._method == CompressMethod.GZIP:
|
||||
return GZDecompressor(fileobj)
|
||||
elif self._method == CompressMethod.LZMA:
|
||||
return LZMADecompressor()
|
||||
elif self._method == CompressMethod.LZIP:
|
||||
return LZIPDecompressor()
|
||||
elif self._method == CompressMethod.ZSTD:
|
||||
return ZSTDDecompressor()
|
||||
elif self._method == CompressMethod.SNAPPY:
|
||||
return Snappy_Decompressor()
|
||||
elif self._method == CompressMethod.BROTLI:
|
||||
return BrotliDecompressor2()
|
||||
|
||||
@cached_property
|
||||
def chunk_size(self) -> int:
|
||||
return self._chunk_size
|
||||
@@ -212,6 +335,8 @@ def sizeof_fmt(num, suffix='B'):
|
||||
|
||||
|
||||
def compress_info(ori: int, re: int):
|
||||
if ori == 0:
|
||||
return f"{sizeof_fmt(ori)} -> {sizeof_fmt(re)} ({float('inf')})"
|
||||
return f"{sizeof_fmt(ori)} -> {sizeof_fmt(re)} ({re/ori*100:.2f}%)"
|
||||
|
||||
|
||||
|
||||
309
game_backuper/enc.py
Normal file
309
game_backuper/enc.py
Normal file
@@ -0,0 +1,309 @@
|
||||
from base64 import b85decode
|
||||
from io import RawIOBase
|
||||
from os import PathLike, urandom
|
||||
from typing import Union
|
||||
from zlib import crc32
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from game_backuper.compress import CompressConfig
|
||||
|
||||
|
||||
_MODE_CLOSED = 0
|
||||
_MODE_READ = 1
|
||||
_MODE_WRITE = 3
|
||||
|
||||
|
||||
class DecryptException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class EncFile(RawIOBase):
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, type, val, tb):
|
||||
if not self.closed:
|
||||
self.close()
|
||||
|
||||
def __init__(self, fn, mode: str, salt: Union[bytes, str],
|
||||
key: Union[bytes, str] = None, iv: Union[bytes, str] = None,
|
||||
length: int = None, crc32: Union[str, int] = None,
|
||||
compress: CompressConfig = None):
|
||||
self._fp = None
|
||||
self._mode = _MODE_CLOSED
|
||||
if mode in ["", "r", "rb"]:
|
||||
mode = "rb"
|
||||
mode_code = _MODE_READ
|
||||
elif mode in ["w", "wb"]:
|
||||
mode = "wb"
|
||||
mode_code = _MODE_WRITE
|
||||
elif mode in ["x", "xb"]:
|
||||
mode = "xb"
|
||||
mode_code = _MODE_WRITE
|
||||
elif mode in ["a", "ab"]:
|
||||
mode = "ab"
|
||||
mode_code = _MODE_WRITE
|
||||
if isinstance(fn, (str, bytes, PathLike)):
|
||||
self._fp = open(fn, mode)
|
||||
self._mode = mode_code
|
||||
else:
|
||||
raise TypeError('filename must be str, bytes or PathLike.')
|
||||
if self._mode == _MODE_WRITE:
|
||||
self._key = urandom(32)
|
||||
self._iv = urandom(16)
|
||||
self._cipher = Cipher(algorithms.AES(self._key), modes.CBC(self._iv)) # noqa: E501
|
||||
self._enc = self._cipher.encryptor()
|
||||
self._crc32 = 0
|
||||
if compress is not None:
|
||||
self._compressor = compress.compressor(self)
|
||||
if self._compressor is None:
|
||||
raise NotImplementedError('Unsupported compression type.')
|
||||
else:
|
||||
self._compressor = None
|
||||
if isinstance(salt, str):
|
||||
self._salt = b85decode(salt)
|
||||
else:
|
||||
self._salt = salt
|
||||
if self._mode == _MODE_READ:
|
||||
if isinstance(key, str):
|
||||
key = b85decode(key)
|
||||
if isinstance(iv, str):
|
||||
iv = b85decode(iv)
|
||||
if key is None or len(key) != 32:
|
||||
raise ValueError('A 256-bit key is required.')
|
||||
self._key = key
|
||||
if iv is None or len(iv) != 16:
|
||||
raise ValueError('A 128-bit initialization_vector is required.') # noqa: E501
|
||||
self._iv = iv
|
||||
self._cipher = Cipher(algorithms.AES(self.key), modes.CBC(self._iv)) # noqa: E501
|
||||
self._dec = self._cipher.decryptor()
|
||||
if length is None or not isinstance(length, int):
|
||||
raise ValueError("data's length is needed.")
|
||||
self._length = length
|
||||
self._buf = b''
|
||||
self._eof = False
|
||||
if crc32:
|
||||
if isinstance(crc32, str):
|
||||
self._crc32 = int(crc32, 16)
|
||||
else:
|
||||
self._crc32 = crc32
|
||||
self._crc32_checked = False
|
||||
else:
|
||||
self._crc32 = None
|
||||
self._decompressor = None
|
||||
if compress:
|
||||
self._decompressor = compress.decompressor(self)
|
||||
if self._decompressor is None:
|
||||
raise NotImplementedError('Unsupported compression type.')
|
||||
self._debuf = b''
|
||||
self._pos = 0
|
||||
self._crc_size = algorithms.AES.block_size * 8
|
||||
self._flushing = False
|
||||
|
||||
def _check_not_closed(self):
|
||||
if self.closed:
|
||||
raise ValueError("I/O operation on closed file")
|
||||
|
||||
def close(self):
|
||||
if self._mode == _MODE_CLOSED:
|
||||
return
|
||||
try:
|
||||
if self._mode == _MODE_READ:
|
||||
if hasattr(self._decompressor, 'close'):
|
||||
self._decompressor.write_to_file = False
|
||||
self._decompressor.close()
|
||||
self._decompressor.write_to_file = True
|
||||
if self._mode == _MODE_WRITE:
|
||||
if hasattr(self._compressor, 'close'):
|
||||
self._compressor.write_to_file = False
|
||||
self._compressor.close()
|
||||
self._compressor.write_to_file = True
|
||||
elif self._compressor:
|
||||
d = self._compressor.flush()
|
||||
if d:
|
||||
length = len(d)
|
||||
if self._pos < self._crc_size:
|
||||
le = min(length, self._crc_size - self._pos)
|
||||
self._crc32 = crc32(d[:le], self._crc32)
|
||||
self._fp.write(self._enc.update(d))
|
||||
self._pos += length
|
||||
if self._pos % algorithms.AES.block_size != 0:
|
||||
self._fp.write(self._enc.update(b"\x00" * (algorithms.AES.block_size - (self._pos % algorithms.AES.block_size)))) # noqa: E501
|
||||
self._fp.write(self._enc.finalize())
|
||||
finally:
|
||||
self._fp.close()
|
||||
self._dec = None
|
||||
self._enc = None
|
||||
self._cipher = None
|
||||
self._fp = None
|
||||
self._mode = _MODE_CLOSED
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._mode == _MODE_CLOSED
|
||||
|
||||
def check_crc32(self):
|
||||
while len(self._buf) < self._crc_size:
|
||||
if not self.__read():
|
||||
break
|
||||
if crc32(self._buf[:min(self._crc_size, self._length)]) == self._crc32:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@property
|
||||
def crc32(self):
|
||||
return '{:08x}'.format(self._crc32)
|
||||
|
||||
def __decompress(self):
|
||||
if len(self._buf) == 0:
|
||||
return False
|
||||
if self._decompressor:
|
||||
le = min(len(self._buf), self._length - self._pos)
|
||||
if le == 0 or (hasattr(self._decompressor, "eof") and self._decompressor.eof): # noqa: E501
|
||||
return False
|
||||
self._debuf += self._decompressor.decompress(self._buf[:le])
|
||||
self._pos += le
|
||||
self._buf = self._buf[le:]
|
||||
return True
|
||||
return False
|
||||
|
||||
@property
|
||||
def eof(self):
|
||||
return self._pos >= self._length
|
||||
|
||||
def fileno(self):
|
||||
self._check_not_closed()
|
||||
return self._fp.fileno()
|
||||
|
||||
def flush(self) -> None:
|
||||
if self._flushing:
|
||||
return
|
||||
self._flushing = True
|
||||
self._check_not_closed()
|
||||
if hasattr(self._compressor, 'write_to_file'):
|
||||
self._compressor.write_to_file = False
|
||||
self._compressor.flush()
|
||||
self._compressor.write_to_file = True
|
||||
self._fp.flush()
|
||||
self._flushing = False
|
||||
|
||||
def readable(self):
|
||||
self._check_not_closed()
|
||||
return self._mode == _MODE_READ
|
||||
|
||||
def __read(self):
|
||||
data = self._fp.read(algorithms.AES.block_size)
|
||||
if not data:
|
||||
if not self._eof:
|
||||
self._buf += self._dec.finalize()
|
||||
self._eof = True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
self._buf += self._dec.update(data)
|
||||
return True
|
||||
|
||||
def read(self, size: int = -1):
|
||||
if not self.readable():
|
||||
raise ValueError('File is not readable.')
|
||||
if self._crc32 is not None and not self._crc32_checked:
|
||||
if not self.check_crc32():
|
||||
raise DecryptException("crc32 check failed.")
|
||||
self._crc32_checked = True
|
||||
if size < 0:
|
||||
return self.readall()
|
||||
if self._decompressor and hasattr(self._decompressor, 'write_to_file') and self._decompressor.write_to_file: # noqa: E501
|
||||
self._decompressor.write_to_file = False
|
||||
d = self._decompressor.read(size)
|
||||
self._decompressor.write_to_file = True
|
||||
return d
|
||||
if self._decompressor and not hasattr(self._decompressor, 'write_to_file'): # noqa: E501
|
||||
if not size or (self.eof and len(self._debuf) == 0):
|
||||
return b""
|
||||
while True:
|
||||
if not self.__read():
|
||||
self.__decompress()
|
||||
break
|
||||
if not self.__decompress():
|
||||
break
|
||||
if size <= len(self._debuf):
|
||||
data = self._debuf[:size]
|
||||
self._debuf = self._debuf[size:]
|
||||
return data
|
||||
d = self._debuf[:size]
|
||||
self._debuf = self._debuf[size:]
|
||||
return d
|
||||
if not size or self.eof:
|
||||
return b""
|
||||
size = min(size, self._length - self._pos)
|
||||
if size <= len(self._buf):
|
||||
data = self._buf[:size]
|
||||
self._buf = self._buf[size:]
|
||||
self._pos += size
|
||||
return data
|
||||
while True:
|
||||
if not self.__read():
|
||||
break
|
||||
if size <= len(self._buf):
|
||||
data = self._buf[:size]
|
||||
self._buf = self._buf[size:]
|
||||
self._pos += size
|
||||
return data
|
||||
self._pos += min(len(self._buf), size)
|
||||
d = self._buf[:size]
|
||||
self._buf = self._buf[size:]
|
||||
return d
|
||||
|
||||
def readinto(self, b):
|
||||
with memoryview(b) as view, view.cast("B") as byte_view:
|
||||
data = self.read(len(byte_view))
|
||||
byte_view[:len(data)] = data
|
||||
return len(data)
|
||||
|
||||
def tell(self):
|
||||
return self._pos
|
||||
|
||||
def writable(self):
|
||||
self._check_not_closed()
|
||||
return self._mode == _MODE_WRITE
|
||||
|
||||
def write(self, data):
|
||||
if not self.writable():
|
||||
raise ValueError("File was not opened for writing")
|
||||
if isinstance(data, bytes):
|
||||
length = len(data)
|
||||
elif isinstance(data, str):
|
||||
data = data.encode()
|
||||
length = len(data)
|
||||
elif isinstance(data, bytearray):
|
||||
data = bytes(data)
|
||||
length = len(data)
|
||||
else:
|
||||
data = memoryview(data)
|
||||
length = data.nbytes
|
||||
data = data.tobytes()
|
||||
if self._compressor:
|
||||
if hasattr(self._compressor, 'write_to_file'):
|
||||
if self._compressor.write_to_file:
|
||||
self._compressor.write_to_file = False
|
||||
self._compressor.compress(data)
|
||||
self._compressor.write_to_file = True
|
||||
return
|
||||
else:
|
||||
data = self._compressor.compress(data)
|
||||
length = len(data)
|
||||
if self._pos < self._crc_size:
|
||||
le = min(length, self._crc_size - self._pos)
|
||||
self._crc32 = crc32(data[:le], self._crc32)
|
||||
self._fp.write(self._enc.update(data))
|
||||
self._pos += length
|
||||
|
||||
@property
|
||||
def key(self):
|
||||
if len(self._salt) < 32:
|
||||
self._salt = self._salt + b'\x00' * (32 - len(self._salt))
|
||||
return bytes(a ^ b for a, b in zip(self._salt, self._key))
|
||||
|
||||
@property
|
||||
def iv(self):
|
||||
return self._iv
|
||||
@@ -15,7 +15,11 @@ else:
|
||||
have_cfapi = False
|
||||
|
||||
|
||||
File = namedtuple('File', ['id', 'file', 'size', 'program', 'hash', 'type'])
|
||||
_File = namedtuple('File', ['id', 'file', 'size', 'program', 'hash', 'type'])
|
||||
|
||||
|
||||
class File(_File):
|
||||
pass
|
||||
|
||||
|
||||
def mkdir_for_file(p: str):
|
||||
|
||||
100
testenc.py
Normal file
100
testenc.py
Normal file
@@ -0,0 +1,100 @@
|
||||
from game_backuper.compress import CompressConfig
|
||||
from game_backuper.enc import DecryptException, EncFile
|
||||
from os import urandom, remove
|
||||
from hashlib import sha512
|
||||
from zlib import crc32
|
||||
|
||||
|
||||
datalen = 4096
|
||||
data = urandom(datalen)
|
||||
a = sha512()
|
||||
a.update(data)
|
||||
with EncFile('a.txt', 'wb', a.digest()) as f:
|
||||
f.write(data)
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
crc = f.crc32
|
||||
print(crc)
|
||||
crc2 = '{:08x}'.format(crc32(data[:1024]))
|
||||
print(crc2)
|
||||
assert crc == crc2
|
||||
with EncFile('a.txt', 'rb', a.digest(), key, iv, datalen, crc) as f:
|
||||
d = f.read()
|
||||
assert d == data
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, datalen, crc) as f:
|
||||
try:
|
||||
d = f.read()
|
||||
assert False
|
||||
except DecryptException:
|
||||
pass
|
||||
remove('a.txt')
|
||||
with EncFile('a.txt', 'wb', b'', compress=CompressConfig('gzip', 9)) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
le = f.tell()
|
||||
crc = f.crc32
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, le, crc, compress=CompressConfig('gzip')) as f: # noqa: E501
|
||||
assert data == f.read()
|
||||
remove('a.txt')
|
||||
with EncFile('a.txt', 'wb', b'', compress=CompressConfig('bzip2', 1)) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
le = f.tell()
|
||||
crc = f.crc32
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, le, crc, CompressConfig('bzip2', 1)) as f: # noqa: E501
|
||||
assert data == f.read()
|
||||
remove('a.txt')
|
||||
with EncFile('a.txt', 'wb', b'', compress=CompressConfig('lzma', 1)) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
le = f.tell()
|
||||
crc = f.crc32
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, le, crc, CompressConfig('lzma', 1)) as f: # noqa: E501
|
||||
assert data == f.read()
|
||||
remove('a.txt')
|
||||
with EncFile('a.txt', 'wb', b'', compress=CompressConfig('lzip', 1)) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
le = f.tell()
|
||||
crc = f.crc32
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, le, crc, CompressConfig('lzip', 1)) as f: # noqa: E501
|
||||
assert data == f.read()
|
||||
remove('a.txt')
|
||||
with EncFile('a.txt', 'wb', b'', compress=CompressConfig('zstd', 1)) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
le = f.tell()
|
||||
crc = f.crc32
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, le, crc, CompressConfig('zstd', 1)) as f: # noqa: E501
|
||||
assert data == f.read()
|
||||
remove('a.txt')
|
||||
with EncFile('a.txt', 'wb', b'', compress=CompressConfig('snappy', 1)) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
le = f.tell()
|
||||
crc = f.crc32
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, le, crc, CompressConfig('snappy', 1)) as f: # noqa: E501
|
||||
assert data == f.read()
|
||||
remove('a.txt')
|
||||
with EncFile('a.txt', 'wb', b'', compress=CompressConfig('brotli', 1)) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
key = f.key
|
||||
iv = f.iv
|
||||
le = f.tell()
|
||||
crc = f.crc32
|
||||
with EncFile('a.txt', 'rb', b'', key, iv, le, crc, CompressConfig('brotli', 1)) as f: # noqa: E501
|
||||
assert data == f.read()
|
||||
remove('a.txt')
|
||||
Reference in New Issue
Block a user