diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index 3b7897b8a88a45..d8e3b671ec229f 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -66,18 +66,28 @@ class BaseTest(unittest.TestCase): EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00' BAD_DATA = b'this is not a valid bzip2 file' - # Some tests need more than one block of uncompressed data. Since one block - # is at least 100,000 bytes, we gather some data dynamically and compress it. - # Note that this assumes that compression works correctly, so we cannot - # simply use the bigger test data for all tests. + # Some tests need more than one block of data. The bz2 module does not + # support flushing a block during compression, so we must read in data until + # there are at least 2 blocks. Since different orderings of Python files may + # be compressed differently, we need to check the compression output for + # more than one bzip2 block header magic, a hex encoding of Pi + # (0x314159265359) + bz2_block_magic = bytes.fromhex('314159265359') test_size = 0 - BIG_TEXT = bytearray(128*1024) + BIG_TEXT = b'' + BIG_DATA = b'' + compressor = BZ2Compressor(1) for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')): with open(fname, 'rb') as fh: - test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:]) - if test_size > 128*1024: + data = fh.read() + BIG_DATA += compressor.compress(data) + BIG_TEXT += data + # TODO(emmatyping): if it is impossible for a block header to cross + # multiple outputs, we can just search the output of each compress call + # which should be more efficient + if BIG_DATA.count(bz2_block_magic) > 1: + BIG_DATA += compressor.flush() break - BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1) def setUp(self): fd, self.filename = tempfile.mkstemp()