|
8 | 8 | import os |
9 | 9 | import struct |
10 | 10 | import sys |
| 11 | +import tarfile |
11 | 12 | import unittest |
12 | 13 | from subprocess import PIPE, Popen |
13 | 14 | from unittest import mock |
@@ -790,6 +791,51 @@ def test_compress_mtime_default(self): |
790 | 791 | f.read(1) # to set mtime attribute |
791 | 792 | self.assertGreater(f.mtime, 1) |
792 | 793 |
|
| 794 | + def assertReproducibleGzipMetadata(self, datac, data_size): |
| 795 | + self.assertEqual(datac[:4], b"\x1f\x8b\x08\x00") |
| 796 | + self.assertEqual(struct.unpack("<I", datac[4:8])[0], 0) |
| 797 | + self.assertEqual(datac[9], 255) |
| 798 | + self.assertEqual(struct.unpack("<I", datac[-4:])[0], data_size) |
| 799 | + |
| 800 | + def test_reproducible_output_metadata(self): |
| 801 | + data = b"Hello world" |
| 802 | + |
| 803 | + def gzip_file(): |
| 804 | + buf = io.BytesIO() |
| 805 | + with gzip.GzipFile(fileobj=buf, mode="wb", mtime=0) as f: |
| 806 | + f.write(data) |
| 807 | + return buf.getvalue() |
| 808 | + |
| 809 | + def gzip_open(): |
| 810 | + buf = io.BytesIO() |
| 811 | + with gzip.open(buf, mode="wb", mtime=0) as f: |
| 812 | + f.write(data) |
| 813 | + return buf.getvalue() |
| 814 | + |
| 815 | + def gzipped_tarfile(): |
| 816 | + buf = io.BytesIO() |
| 817 | + tarinfo = tarfile.TarInfo("data") |
| 818 | + tarinfo.size = len(data) |
| 819 | + tarinfo.mtime = 0 |
| 820 | + with tarfile.open(fileobj=buf, mode="w:gz", mtime=0) as tar: |
| 821 | + tar.addfile(tarinfo, io.BytesIO(data)) |
| 822 | + return buf.getvalue() |
| 823 | + |
| 824 | + cases = [ |
| 825 | + ("compress default mtime", lambda: gzip.compress(data), len(data)), |
| 826 | + ("compress explicit mtime", lambda: gzip.compress(data, mtime=0), len(data)), |
| 827 | + ("GzipFile", gzip_file, len(data)), |
| 828 | + ("gzip.open", gzip_open, len(data)), |
| 829 | + ("gzipped tarfile", gzipped_tarfile, None), |
| 830 | + ] |
| 831 | + for name, make_gzip, data_size in cases: |
| 832 | + with self.subTest(name): |
| 833 | + datac = make_gzip() |
| 834 | + self.assertEqual(datac, make_gzip()) |
| 835 | + if data_size is None: |
| 836 | + data_size = len(gzip.decompress(datac)) |
| 837 | + self.assertReproducibleGzipMetadata(datac, data_size) |
| 838 | + |
793 | 839 | def test_compress_correct_level(self): |
794 | 840 | for mtime in (0, 42): |
795 | 841 | with self.subTest(mtime=mtime): |
|
0 commit comments