binman: Move compression into binman

The compression functions are not actually used by patman, so we don't
need then in the tools module. Also we want to change them to use
bintools, which patman will not support.

Move these into a new comp_util module, within binman.

Signed-off-by: Simon Glass <sjg@chromium.org>
This commit is contained in:
Simon Glass 2022-01-09 20:14:03 -07:00
parent f75db1e996
commit ad35ce5466
6 changed files with 102 additions and 89 deletions

View File

@ -20,6 +20,7 @@ import io
import struct
import sys
from binman import comp_util
from binman import elf
from patman import command
from patman import tools
@ -240,9 +241,9 @@ class CbfsFile(object):
"""Handle decompressing data if necessary"""
indata = self.data
if self.compress == COMPRESS_LZ4:
data = tools.Decompress(indata, 'lz4', with_header=False)
data = comp_util.Decompress(indata, 'lz4', with_header=False)
elif self.compress == COMPRESS_LZMA:
data = tools.Decompress(indata, 'lzma', with_header=False)
data = comp_util.Decompress(indata, 'lzma', with_header=False)
else:
data = indata
self.memlen = len(data)
@ -361,9 +362,9 @@ class CbfsFile(object):
elif self.ftype == TYPE_RAW:
orig_data = data
if self.compress == COMPRESS_LZ4:
data = tools.Compress(orig_data, 'lz4', with_header=False)
data = comp_util.Compress(orig_data, 'lz4', with_header=False)
elif self.compress == COMPRESS_LZMA:
data = tools.Compress(orig_data, 'lzma', with_header=False)
data = comp_util.Compress(orig_data, 'lzma', with_header=False)
self.memlen = len(orig_data)
self.data_len = len(data)
attr = struct.pack(ATTR_COMPRESSION_FORMAT,

88
tools/binman/comp_util.py Normal file
View File

@ -0,0 +1,88 @@
# SPDX-License-Identifier: GPL-2.0+
# Copyright 2022 Google LLC
#
"""Utilities to compress and decompress data"""
import struct
import tempfile
from patman import tools
def Compress(indata, algo, with_header=True):
"""Compress some data using a given algorithm
Note that for lzma this uses an old version of the algorithm, not that
provided by xz.
This requires 'lz4' and 'lzma_alone' tools. It also requires an output
directory to be previously set up, by calling PrepareOutputDir().
Care is taken to use unique temporary files so that this function can be
called from multiple threads.
Args:
indata: Input data to compress
algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma')
Returns:
Compressed data
"""
if algo == 'none':
return indata
fname = tempfile.NamedTemporaryFile(prefix='%s.comp.tmp' % algo,
dir=tools.GetOutputDir()).name
tools.WriteFile(fname, indata)
if algo == 'lz4':
data = tools.Run('lz4', '--no-frame-crc', '-B4', '-5', '-c', fname,
binary=True)
# cbfstool uses a very old version of lzma
elif algo == 'lzma':
outfname = tempfile.NamedTemporaryFile(prefix='%s.comp.otmp' % algo,
dir=tools.GetOutputDir()).name
tools.Run('lzma_alone', 'e', fname, outfname, '-lc1', '-lp0', '-pb0',
'-d8')
data = tools.ReadFile(outfname)
elif algo == 'gzip':
data = tools.Run('gzip', '-c', fname, binary=True)
else:
raise ValueError("Unknown algorithm '%s'" % algo)
if with_header:
hdr = struct.pack('<I', len(data))
data = hdr + data
return data
def Decompress(indata, algo, with_header=True):
"""Decompress some data using a given algorithm
Note that for lzma this uses an old version of the algorithm, not that
provided by xz.
This requires 'lz4' and 'lzma_alone' tools. It also requires an output
directory to be previously set up, by calling PrepareOutputDir().
Args:
indata: Input data to decompress
algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma')
Returns:
Compressed data
"""
if algo == 'none':
return indata
if with_header:
data_len = struct.unpack('<I', indata[:4])[0]
indata = indata[4:4 + data_len]
fname = tools.GetOutputFilename('%s.decomp.tmp' % algo)
with open(fname, 'wb') as fd:
fd.write(indata)
if algo == 'lz4':
data = tools.Run('lz4', '-dc', fname, binary=True)
elif algo == 'lzma':
outfname = tools.GetOutputFilename('%s.decomp.otmp' % algo)
tools.Run('lzma_alone', 'd', fname, outfname)
data = tools.ReadFile(outfname, binary=True)
elif algo == 'gzip':
data = tools.Run('gzip', '-cd', fname, binary=True)
else:
raise ValueError("Unknown algorithm '%s'" % algo)
return data

View File

@ -11,6 +11,7 @@ import pathlib
import sys
from binman import bintool
from binman import comp_util
from dtoc import fdt_util
from patman import tools
from patman.tools import ToHex, ToHexSize
@ -1034,7 +1035,7 @@ features to produce new behaviours.
self.uncomp_data = indata
if self.compress != 'none':
self.uncomp_size = len(indata)
data = tools.Compress(indata, self.compress)
data = comp_util.Compress(indata, self.compress)
return data
@classmethod

View File

@ -13,6 +13,7 @@ import concurrent.futures
import re
import sys
from binman import comp_util
from binman.entry import Entry
from binman import state
from dtoc import fdt_util
@ -775,7 +776,7 @@ class Entry_section(Entry):
data = parent_data[offset:offset + child.size]
if decomp:
indata = data
data = tools.Decompress(indata, child.compress)
data = comp_util.Decompress(indata, child.compress)
if child.uncomp_size:
tout.Info("%s: Decompressing data size %#x with algo '%s' to data size %#x" %
(child.GetPath(), len(indata), child.compress,

View File

@ -23,6 +23,7 @@ import urllib.error
from binman import bintool
from binman import cbfs_util
from binman import cmdline
from binman import comp_util
from binman import control
from binman import elf
from binman import elf_test
@ -1926,7 +1927,7 @@ class TestFunctional(unittest.TestCase):
self._ResetDtbs()
def _decompress(self, data):
return tools.Decompress(data, 'lz4')
return comp_util.Decompress(data, 'lz4')
def testCompress(self):
"""Test compression of blobs"""
@ -2805,7 +2806,7 @@ class TestFunctional(unittest.TestCase):
def testExtractCbfsRaw(self):
"""Test extracting CBFS compressed data without decompressing it"""
data = self._RunExtractCmd('section/cbfs/u-boot-dtb', decomp=False)
dtb = tools.Decompress(data, 'lzma', with_header=False)
dtb = comp_util.Decompress(data, 'lzma', with_header=False)
self.assertEqual(EXTRACT_DTB_SIZE, len(dtb))
def testExtractBadEntry(self):
@ -4232,13 +4233,13 @@ class TestFunctional(unittest.TestCase):
# Check compressed data
section1 = self._decompress(rest)
expect1 = tools.Compress(COMPRESS_DATA + U_BOOT_DATA, 'lz4')
expect1 = comp_util.Compress(COMPRESS_DATA + U_BOOT_DATA, 'lz4')
self.assertEquals(expect1, rest[:len(expect1)])
self.assertEquals(COMPRESS_DATA + U_BOOT_DATA, section1)
rest1 = rest[len(expect1):]
section2 = self._decompress(rest1)
expect2 = tools.Compress(COMPRESS_DATA + COMPRESS_DATA, 'lz4')
expect2 = comp_util.Compress(COMPRESS_DATA + COMPRESS_DATA, 'lz4')
self.assertEquals(expect2, rest1[:len(expect2)])
self.assertEquals(COMPRESS_DATA + COMPRESS_DATA, section2)
rest2 = rest1[len(expect2):]

View File

@ -7,7 +7,6 @@ import glob
import os
import shlex
import shutil
import struct
import sys
import tempfile
import urllib.request
@ -518,84 +517,6 @@ def ToString(bval):
"""
return bval.decode('utf-8')
def Compress(indata, algo, with_header=True):
"""Compress some data using a given algorithm
Note that for lzma this uses an old version of the algorithm, not that
provided by xz.
This requires 'lz4' and 'lzma_alone' tools. It also requires an output
directory to be previously set up, by calling PrepareOutputDir().
Care is taken to use unique temporary files so that this function can be
called from multiple threads.
Args:
indata: Input data to compress
algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma')
Returns:
Compressed data
"""
if algo == 'none':
return indata
fname = tempfile.NamedTemporaryFile(prefix='%s.comp.tmp' % algo,
dir=outdir).name
WriteFile(fname, indata)
if algo == 'lz4':
data = Run('lz4', '--no-frame-crc', '-B4', '-5', '-c', fname,
binary=True)
# cbfstool uses a very old version of lzma
elif algo == 'lzma':
outfname = tempfile.NamedTemporaryFile(prefix='%s.comp.otmp' % algo,
dir=outdir).name
Run('lzma_alone', 'e', fname, outfname, '-lc1', '-lp0', '-pb0', '-d8')
data = ReadFile(outfname)
elif algo == 'gzip':
data = Run('gzip', '-c', fname, binary=True)
else:
raise ValueError("Unknown algorithm '%s'" % algo)
if with_header:
hdr = struct.pack('<I', len(data))
data = hdr + data
return data
def Decompress(indata, algo, with_header=True):
"""Decompress some data using a given algorithm
Note that for lzma this uses an old version of the algorithm, not that
provided by xz.
This requires 'lz4' and 'lzma_alone' tools. It also requires an output
directory to be previously set up, by calling PrepareOutputDir().
Args:
indata: Input data to decompress
algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma')
Returns:
Compressed data
"""
if algo == 'none':
return indata
if with_header:
data_len = struct.unpack('<I', indata[:4])[0]
indata = indata[4:4 + data_len]
fname = GetOutputFilename('%s.decomp.tmp' % algo)
with open(fname, 'wb') as fd:
fd.write(indata)
if algo == 'lz4':
data = Run('lz4', '-dc', fname, binary=True)
elif algo == 'lzma':
outfname = GetOutputFilename('%s.decomp.otmp' % algo)
Run('lzma_alone', 'd', fname, outfname)
data = ReadFile(outfname, binary=True)
elif algo == 'gzip':
data = Run('gzip', '-cd', fname, binary=True)
else:
raise ValueError("Unknown algorithm '%s'" % algo)
return data
def ToHex(val):
"""Convert an integer value (or None) to a string