| #!/usr/bin/env python |
| # |
| # Copyright 2017 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import argparse |
| import os |
| import struct |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import zipfile |
| |
| |
class PackedTuple(object):
    """A base class for a named tuple packed into a binary string.

    Every member becomes an instance attribute named after the member. The
    bookkeeping attributes use a `_ptuple_` prefix so they are unlikely to
    collide with member names.
    """

    def __init__(self, members, data=None):
        """Initialize the instance with a given set of members.

        Args:
          members: The list of members of the tuple, as a pair of strings:
              struct encoding and member name.
          data: Optional binary string to decode the members from. Only the
              leading bytes covered by the packed layout are read; anything
              after them is ignored. When None, every member is set to None.

        Raises:
          struct.error: if |data| is shorter than the packed layout.
        """
        # '<' selects little-endian, standard sizes and no padding.
        self._ptuple_fmt = '<' + ''.join(fldfmt for fldfmt, _ in members)
        self._ptuple_flds = [fld for _, fld in members]
        self._ptuple_size = struct.calcsize(self._ptuple_fmt)

        if data is None:
            values = [None] * len(self._ptuple_flds)
        else:
            # unpack_from() tolerates trailing bytes, so no slice is needed.
            values = struct.unpack_from(self._ptuple_fmt, data)
        for fld, value in zip(self._ptuple_flds, values):
            setattr(self, fld, value)

    def __len__(self):
        """Return the size in bytes of the packed representation."""
        return self._ptuple_size

    def __bytes__(self):
        """Return the members serialized into the packed binary layout.

        Raises:
          struct.error: if a member holds a value its encoding cannot pack
              (for example a member that is still None).
        """
        return struct.pack(
            self._ptuple_fmt,
            *[getattr(self, fld) for fld in self._ptuple_flds])

    def __str__(self):
        """Return the packed binary string on Python 2.

        On Python 3 __str__ must return text (returning bytes raises
        TypeError), so the repr() is returned there; use bytes(obj) to get
        the packed form.
        """
        if str is bytes:
            return self.__bytes__()
        return repr(self)

    def __repr__(self):
        """Return a debugging representation listing every member."""
        fields = ' '.join(
            '%s=%r' % (fld, getattr(self, fld)) for fld in self._ptuple_flds)
        return '<%s %s >' % (type(self).__name__, fields)
| |
| |
class GPTPartitionEntry(PackedTuple):
    """A packed tuple representing a GPT partition entry."""

    def __init__(self, data=None):
        """Decode a partition entry from |data|, or create an empty one.

        Args:
          data: Binary string starting at the partition entry, or None to
              create an entry with zeroed identifiers and an empty name. The
              integer members of an empty entry are left as None.
        """
        members = (
            ('16s', 'guid'),
            ('16s', 'uuid'),
            ('Q', 'first_lba'),
            ('Q', 'last_lba'),
            ('Q', 'flags'),
            ('72s', 'name'),
        )
        super(GPTPartitionEntry, self).__init__(members, data)
        if data is None:
            # Byte literals: the 's' struct codes only pack byte strings on
            # Python 3, and b'...' is the plain str type on Python 2.
            self.guid = b'\0' * 16
            self.uuid = b'\0' * 16
            self.name = b''
| |
| |
class GPTPartitionTable(PackedTuple):
    """A packed tuple representing the header of a GPT partition table."""

    def __init__(self, data=None):
        """Decode a GPT header from |data|, or create a blank one.

        Args:
          data: Binary string starting at the GPT header, or None to create a
              header whose only initialized member is current_lba (set to 1);
              every other member is left as None.
        """
        members = (
            ('8s', 'signature'),
            ('I', 'revision'),
            ('I', 'header_size'),
            ('I', 'crc32'),
            ('4s', '_pad'),
            ('Q', 'current_lba'),
            ('Q', 'backup_lba'),
            ('Q', 'first_usable_lba'),
            ('Q', 'last_usable_lba'),
            ('16s', 'disk_guid'),
            ('Q', 'part_entry_start_lba'),
            ('I', 'num_part_entries'),
            ('I', 'part_entry_size'),
            ('I', 'crc32_part_array'),
        )
        super(GPTPartitionTable, self).__init__(members, data)
        if data is None:
            self.current_lba = 1
| |
| |
def SparseImageExtract(image_file):
    """Return a temporary file with the RAW image from the sparse image in
    |image_file|.

    If |image_file| isn't an Android sparse image returns None instead, and
    the read position of |image_file| is left where it was. The temporary
    file will be deleted once closed.

    The conversion is done by the external simg2img tool; the binary can be
    overridden with the SIMG2IMG_BIN environment variable.

    Args:
      image_file: A file object opened for binary reading. Its |name|
          attribute must be the path of the file on disk, since that path is
          what gets handed to simg2img.

    Raises:
      subprocess.CalledProcessError: if simg2img exits with an error.
    """
    SPARSE_HEADER_FMT = '<I4H'
    # magic, major_version, minor_version, file_hdr_sz, chunk_hdr_sz
    # This is the only combination supported, so it is used to identify
    # sparse image files.
    SPARSE_HEADER = (0xED26FF3A, 1, 0, 28, 12)
    header_size = struct.calcsize(SPARSE_HEADER_FMT)
    try:
        header_data = image_file.read(header_size)
        # Rewind by what was actually read; a short file yields fewer bytes
        # than requested.
        image_file.seek(-len(header_data), os.SEEK_CUR)
    except IOError:
        # The header could not be inspected. As before, fall through and let
        # simg2img decide, since it works from the file name.
        header_data = None

    if header_data is not None:
        if len(header_data) < header_size:
            # Too short to hold a sparse header (e.g. an empty image).
            return None
        if struct.unpack(SPARSE_HEADER_FMT, header_data) != SPARSE_HEADER:
            return None

    temp_file = tempfile.NamedTemporaryFile(
        dir=os.path.dirname(image_file.name))
    try:
        simg2img_bin = os.environ.get('SIMG2IMG_BIN', 'simg2img')
        subprocess.check_call([simg2img_bin, image_file.name, temp_file.name])
    except Exception:
        # Don't leave the temporary file behind when the conversion fails.
        temp_file.close()
        raise
    return temp_file
| |
| |
def WritePartition(out_file, part_file, start_offset):
    """Copy the partition image |part_file| into |out_file|.

    Android sparse images are detected and expanded to a RAW image before
    being copied.

    Args:
      out_file: Seekable file object the image is written to.
      part_file: File object with the partition image, read from its current
          position to the end. It is not closed by this function.
      start_offset: Byte offset in |out_file| where the copy starts.

    Returns:
      The read position of the copied source once it has been consumed,
      which is the number of bytes copied when the source started at
      position zero.
    """
    out_file.seek(start_offset)
    # Autodetect sparse images if possible.
    raw_file = SparseImageExtract(part_file)
    if raw_file is None:
        shutil.copyfileobj(part_file, out_file)
        return part_file.tell()
    try:
        shutil.copyfileobj(raw_file, out_file)
        return raw_file.tell()
    finally:
        # The expanded image is a temporary file owned by this function;
        # closing it deletes it.
        raw_file.close()
| |
| |
def ExtractZips(zips, file_name):
    """Extract |file_name| from the first zip in |zips| that contains it.

    Args:
      zips: Iterable of zip file paths. Empty or None entries are skipped.
      file_name: Name of the zip member to look for.

    Returns:
      A temporary file holding a copy of the member, positioned at the
      beginning, or None if no zip contains |file_name|. The temporary file
      is deleted once closed.
    """
    for zip_path in zips:
        if not zip_path:
            continue
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            if file_name not in zip_file.namelist():
                continue
            temp_file = tempfile.NamedTemporaryFile()
            try:
                with zip_file.open(file_name, 'r') as part_file:
                    shutil.copyfileobj(part_file, temp_file)
                temp_file.seek(0)
            except Exception:
                # Don't leave the temporary file behind on a failed copy.
                temp_file.close()
                raise
            return temp_file
    return None
| |
| |
# Size in bytes of the logical blocks (LBAs) used by the partition table.
_SECTOR_SIZE = 512


def _LoadPartitionTable(args):
    """Return the raw contents of the partition table image.

    The image comes from args.partition_table when given; otherwise it is
    looked up in the target_files zips and then in the factory files zip.
    """
    if args.partition_table:
        with open(args.partition_table, 'rb') as gpt_file:
            return gpt_file.read()

    gpt_file = ExtractZips(args.tfzips, 'IMAGES/partition-table.img')
    if not gpt_file:
        gpt_file = ExtractZips([args.factory_files], 'partition-table.img')
    if not gpt_file:
        # Raised explicitly (rather than with `assert`) so the check is not
        # stripped when running with `python -O`.
        raise AssertionError('No partition_table provided')
    try:
        return gpt_file.read()
    finally:
        gpt_file.close()


def _ParseGptPartitions(partition_table, gpt):
    """Return a dict mapping each named partition to its GPTPartitionEntry.

    Entries with an empty name are ignored; when a name shows up more than
    once only the first entry is kept.
    """
    gpt_partitions = {}
    for i in range(gpt.num_part_entries):
        offset = (gpt.part_entry_start_lba * _SECTOR_SIZE +
                  i * gpt.part_entry_size)
        part = GPTPartitionEntry(partition_table[offset:])
        part_name = str(part.name.decode('utf-16le').rstrip('\0'))
        if not part_name:
            continue
        if part_name in gpt_partitions:
            print('Ignoring duplicate partition entry "%s"' % part_name)
        else:
            gpt_partitions[part_name] = part
    return gpt_partitions


def _OpenPartitionImage(args, partition_overrides, part_name,
                        part_name_no_suffix):
    """Return an open image file for a partition, or None if there is none.

    Explicit overrides win, matched first by the full partition name and then
    by the name without its slot suffix. Otherwise the image is looked up in
    the target_files zips and then in the factory files zip.
    """
    for key in (part_name, part_name_no_suffix):
        if key in partition_overrides:
            return open(partition_overrides[key], 'rb')
    if part_name == 'userdata':
        # userdata is only written when explicitly overridden.
        return None
    part_file = ExtractZips(args.tfzips,
                            'IMAGES/%s.img' % part_name_no_suffix)
    if not part_file:
        part_file = ExtractZips([args.factory_files],
                                '%s.img' % part_name_no_suffix)
    return part_file


# NOTE: the docstring of GenerateEMMC is used verbatim as the --help
# description of the command line parser, so it is kept to a single line and
# the details live in this comment instead.
#
# |args| is the parsed command line and provides:
#   partition_table: path to the MBR+GPT image, or None to take it from zips.
#   tfzips: list of target_files zip paths.
#   factory_files: path to the factory files zip, or None.
#   partition: list of (partition name, image path) overrides, or None.
#   all_slots: when false, partitions whose name ends in "_b" are skipped.
#   out: path of the zip file to create; the image is stored as "emmc.img".
#
# Returns 0 on success. Raises AssertionError when no partition table is
# available or when an image does not fit in its partition.
def GenerateEMMC(args):
    """Assemble an EMMC raw image."""
    partition_table = _LoadPartitionTable(args)

    # The GPT header is parsed from the second sector of the image.
    gpt = GPTPartitionTable(partition_table[_SECTOR_SIZE:])
    gpt_partitions = _ParseGptPartitions(partition_table, gpt)

    # Everything before the first usable LBA goes at the start of the image;
    # the remainder of the table image goes right after the last usable LBA.
    first_usable_offset = gpt.first_usable_lba * _SECTOR_SIZE
    gpt_head = partition_table[:first_usable_offset]
    gpt_tail = partition_table[first_usable_offset:]
    gpt_tail_offset = (gpt.last_usable_lba + 1) * _SECTOR_SIZE

    partition_overrides = dict(args.partition) if args.partition else {}

    with tempfile.NamedTemporaryFile() as out_file:
        print("Creating image: %s" % out_file.name)
        out_file.write(gpt_head)
        out_file.seek(gpt_tail_offset)
        out_file.write(gpt_tail)

        for part_name, part in gpt_partitions.items():
            part_offset = part.first_lba * _SECTOR_SIZE
            part_size = (part.last_lba - part.first_lba + 1) * _SECTOR_SIZE

            if part_name.endswith(('_a', '_b')):
                if not args.all_slots and part_name.endswith('_b'):
                    continue
                part_name_no_suffix = part_name[:-2]
            else:
                part_name_no_suffix = part_name

            part_file = _OpenPartitionImage(
                args, partition_overrides, part_name, part_name_no_suffix)
            if not part_file:
                print(' Skipping partition "%s", no file provided' %
                      (part_name,))
                continue

            print(' Copying partition "%s"' % (part_name))
            try:
                copied_size = WritePartition(out_file, part_file, part_offset)
            finally:
                part_file.close()
            if copied_size > part_size:
                # Raised explicitly so the overflow check survives
                # `python -O`.
                raise AssertionError(
                    'Partition %s overflow; size is %d KiB but copied %d KiB'
                    % (part_name, part_size // 1024, copied_size // 1024))
            print(' Partition "%s", copied size: %d KiB at offset %d KiB' %
                  (part_name, copied_size // 1024, part_offset // 1024))

        # The zip is created from the file on disk, so flush buffered data
        # before it is read back by name.
        out_file.flush()
        with zipfile.ZipFile(args.out, 'w', zipfile.ZIP_DEFLATED,
                             allowZip64=True) as out_zip:
            print("Zipping image: %s" % args.out)
            out_zip.write(out_file.name, 'emmc.img')
    return 0
| |
| |
if __name__ == '__main__':
    # The one-line docstring of GenerateEMMC doubles as the --help
    # description.
    parser = argparse.ArgumentParser(description=GenerateEMMC.__doc__)
    parser.add_argument('--out', metavar='OUTPUT', type=str, required=True,
                        help='The output zipped emmc image.')
    parser.add_argument('--partition_table',
                        help='path to the MBR+GPT image.')
    # May be given several times; each use appends a [name, path] pair.
    parser.add_argument('--partition', nargs=2,
                        metavar=('PARTITION_NAME', 'PATH'),
                        action='append', help='override partition images.')
    parser.add_argument('--factory_files',
                        help='path to the factory_files or flashfiles zip')
    parser.add_argument('tfzips', nargs='*', metavar='TARGET_FILES',
                        help='path to target_files zip(s)')
    # Every other flag is spelled with underscores, so the underscore
    # spelling is accepted as an alias of the original hyphenated flag.
    parser.add_argument('--all-slots', '--all_slots', dest='all_slots',
                        default=False, action='store_true',
                        help='copy the provided images to all slots')

    sys.exit(GenerateEMMC(parser.parse_args()))