blob: 2ae34bee865624b1610b4ae7a5040cfd78f24bdd [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import struct
import shutil
import subprocess
import sys
import tempfile
import zipfile
class PackedTuple(object):
  """A base class for a named tuple packed into a binary string.

  The layout is described as (struct format, member name) pairs and is always
  packed little-endian with no padding (the format is prefixed with '<').
  Every member becomes an instance attribute that can be read and reassigned
  before the tuple is packed again.
  """

  def __init__(self, members, data=None):
    """Initialize the instance with a given set of members.

    Args:
      members: The list of members of the tuple, as a pair of strings: struct
          encoding and member name.
      data: Optional byte string to decode. Only the first len(self) bytes
          are used. When None, every member is initialized to None.

    Raises:
      struct.error: if |data| holds fewer bytes than the packed size.
    """
    self._ptuple_fmt = '<' + ''.join(fldfmt for fldfmt, _ in members)
    self._ptuple_flds = [fld for _, fld in members]
    self._ptuple_size = struct.calcsize(self._ptuple_fmt)
    if data is None:
      values = [None] * len(self._ptuple_flds)
    else:
      values = struct.unpack(self._ptuple_fmt, data[:self._ptuple_size])
    for value, fld in zip(values, self._ptuple_flds):
      setattr(self, fld, value)

  def __len__(self):
    """Return the size in bytes of the packed representation."""
    return self._ptuple_size

  def pack(self):
    """Return the current member values packed into a byte string."""
    return struct.pack(self._ptuple_fmt,
                       *(getattr(self, fld) for fld in self._ptuple_flds))

  # Lets bytes(obj) work on Python 3, where __str__ must return text.
  __bytes__ = pack

  def __str__(self):
    # On Python 2 str *is* the byte string type, so str(obj) keeps returning
    # the packed data. On Python 3 returning bytes from __str__ raises a
    # TypeError, so fall back to the readable representation there.
    if str is bytes:
      return self.pack()
    return repr(self)

  def __repr__(self):
    return '<%s ' % (type(self).__name__) + ' '.join(
        '%s=%r' % (fld, getattr(self, fld)) for fld in self._ptuple_flds) + ' >'
class GPTPartitionEntry(PackedTuple):
  """A packed tuple representing a GPT partition entry.

  The members below pack to 128 bytes: two 16-byte strings, three unsigned
  64-bit integers and a 72-byte name field.
  """

  def __init__(self, data):
    """Decode a partition entry from |data| or create a blank one.

    Args:
      data: Byte string starting with the packed entry, or None to create an
          entry whose guid, uuid and name are empty byte strings and whose
          integer members are None.
    """
    members = (
        ('16s', 'guid'),
        ('16s', 'uuid'),
        ('Q', 'first_lba'),
        ('Q', 'last_lba'),
        ('Q', 'flags'),
        ('72s', 'name'),
    )
    super(GPTPartitionEntry, self).__init__(members, data)
    if data is None:
      # Byte string literals keep the defaults the same type as the decoded
      # fields (and what the 's' struct formats expect) on both Python 2
      # and Python 3.
      self.guid = b'\0' * 16
      self.uuid = b'\0' * 16
      self.name = b''
class GPTPartitionTable(PackedTuple):
  """A packed tuple representing the header of a GPT partition.

  The members below pack to 92 bytes.
  """

  def __init__(self, data):
    """Decode a GPT header from |data| or create a blank one.

    Args:
      data: Byte string starting with the packed header, or None to create a
          header with current_lba set to 1 and every other member set to
          None.
    """
    header_layout = (
        # Identification and integrity.
        ('8s', 'signature'),
        ('I', 'revision'),
        ('I', 'header_size'),
        ('I', 'crc32'),
        ('4s', '_pad'),
        # Location of the header copies and of the usable area.
        ('Q', 'current_lba'),
        ('Q', 'backup_lba'),
        ('Q', 'first_usable_lba'),
        ('Q', 'last_usable_lba'),
        ('16s', 'disk_guid'),
        # Description of the partition entry array.
        ('Q', 'part_entry_start_lba'),
        ('I', 'num_part_entries'),
        ('I', 'part_entry_size'),
        ('I', 'crc32_part_array'),
    )
    super(GPTPartitionTable, self).__init__(header_layout, data)
    if data is not None:
      return
    self.current_lba = 1
def SparseImageExtract(image_file):
  """Return a temporary file with the RAW image from the sparse image in
  |image_file|.

  If |image_file| isn't an Android sparse image returns None instead. The
  temporary file will be deleted once closed.

  Args:
    image_file: Open binary file object. Its read position is restored after
        the header has been inspected. The file's |name| attribute is handed
        to the converter, so it is only required for sparse images.

  Returns:
    A NamedTemporaryFile created next to |image_file| holding the converted
    image, or None if the header doesn't match a sparse image.

  Raises:
    subprocess.CalledProcessError: if the simg2img tool (overridable through
        the SIMG2IMG_BIN environment variable) exits with an error.
  """
  SPARSE_HEADER_FMT = '<I4H'
  # magic, major_version, minor_version, file_hdr_sz, chunk_hdr_sz
  # This is the only combination supported, so we used it to identify sparse
  # image files.
  SPARSE_HEADER = (0xED26FF3A, 1, 0, 28, 12)
  header_size = struct.calcsize(SPARSE_HEADER_FMT)
  try:
    raw_header = image_file.read(header_size)
    # Rewind by what was actually read; a short file yields fewer bytes.
    image_file.seek(-len(raw_header), os.SEEK_CUR)
  except IOError:
    # The header could not be inspected. As before, fall through and let
    # simg2img decide whether the file is a valid sparse image.
    pass
  else:
    # A file too short to hold the header cannot be a sparse image. Check
    # the length first: unpacking a short buffer raises struct.error.
    if len(raw_header) < header_size:
      return None
    if struct.unpack(SPARSE_HEADER_FMT, raw_header) != SPARSE_HEADER:
      return None

  temp_file = tempfile.NamedTemporaryFile(dir=os.path.dirname(image_file.name))
  simg2img_bin = os.environ.get('SIMG2IMG_BIN', 'simg2img')
  try:
    subprocess.check_call([simg2img_bin, image_file.name, temp_file.name])
  except Exception:
    # Don't leave the temporary file behind when the conversion fails.
    temp_file.close()
    raise
  return temp_file
def WritePartition(out_file, part_file, start_offset):
  """Copy a partition image into |out_file| at |start_offset|.

  Android sparse images are detected and expanded to their raw form before
  being copied; any other file is copied as is.

  Args:
    out_file: Seekable binary file object to write to.
    part_file: Open binary file object holding the partition image. It is
        left open; closing it is up to the caller.
    start_offset: Byte offset in |out_file| where the copy starts.

  Returns:
    The read position of the copied source once the copy is done, which is
    the number of bytes copied when the source was read from its beginning.
  """
  out_file.seek(start_offset)
  # Autodetect sparse images if possible.
  raw_file = SparseImageExtract(part_file)
  if not raw_file:
    shutil.copyfileobj(part_file, out_file)
    return part_file.tell()
  try:
    shutil.copyfileobj(raw_file, out_file)
    return raw_file.tell()
  finally:
    # The expanded image is a temporary file that is only removed once it
    # is closed, so release it as soon as it has been copied.
    raw_file.close()
def ExtractZips(zips, file_name):
  """Return |file_name| extracted from the first zip in |zips| containing it.

  Args:
    zips: Iterable of zip file paths, searched in order. Empty or None
        entries are skipped.
    file_name: Name of the archive member to look for.

  Returns:
    A temporary file holding a copy of the member, positioned at its
    beginning and deleted once closed, or None if no zip has |file_name|.
  """
  for zip_path in zips:
    if not zip_path:
      continue
    with zipfile.ZipFile(zip_path, 'r') as zip_file:
      if file_name not in zip_file.namelist():
        continue
      temp_file = tempfile.NamedTemporaryFile()
      try:
        with zip_file.open(file_name, 'r') as part_file:
          shutil.copyfileobj(part_file, temp_file)
        temp_file.seek(0)
      except Exception:
        # Don't leave a partially written temporary file behind.
        temp_file.close()
        raise
      return temp_file
  return None
# Size in bytes of one logical block (LBA) assumed for every GPT offset.
_SECTOR_SIZE = 512


def _ReadPartitionTable(args):
  """Return the raw MBR+GPT image bytes selected by |args|.

  The image comes from args.partition_table when given, otherwise from the
  first target_files zip providing IMAGES/partition-table.img, otherwise from
  partition-table.img in the factory files zip.

  Raises:
    AssertionError: if none of those sources provides a partition table.
  """
  if args.partition_table:
    with open(args.partition_table, 'rb') as gpt_file:
      return gpt_file.read()
  gpt_file = ExtractZips(args.tfzips, 'IMAGES/partition-table.img')
  if not gpt_file:
    gpt_file = ExtractZips([args.factory_files],
                           'partition-table.img')
  if not gpt_file:
    # Raised explicitly (instead of using assert) so the check is kept when
    # running with -O, while callers still see the same exception type.
    raise AssertionError('No partition_table provided')
  try:
    return gpt_file.read()
  finally:
    gpt_file.close()


def _ParsePartitionEntries(partition_table, gpt):
  """Return a dict mapping partition names to their GPTPartitionEntry.

  Entries with an empty name are dropped and only the first entry seen for a
  given name is kept.
  """
  gpt_partitions = {}
  for i in range(gpt.num_part_entries):
    offset = gpt.part_entry_start_lba * _SECTOR_SIZE + i * gpt.part_entry_size
    part = GPTPartitionEntry(partition_table[offset:])
    part_name = str(part.name.decode('utf-16le').rstrip('\0'))
    if not part_name:
      continue
    if part_name in gpt_partitions:
      print('Ignoring duplicate partition entry "%s"' % part_name)
    else:
      gpt_partitions[part_name] = part
  return gpt_partitions


def _OpenPartitionImage(args, part_name, part_name_no_suffix,
                        partition_overrides):
  """Return an open file with the image for |part_name|, or None.

  Overrides win, first by full partition name and then by the name without
  the slot suffix. "userdata" is only written when overridden. Everything
  else is looked up in the target_files zips and then in the factory files.
  """
  if part_name in partition_overrides:
    return open(partition_overrides[part_name], 'rb')
  if part_name_no_suffix in partition_overrides:
    return open(partition_overrides[part_name_no_suffix], 'rb')
  if part_name == 'userdata':
    return None
  part_file = ExtractZips(args.tfzips,
                          'IMAGES/%s.img' % part_name_no_suffix)
  if not part_file:
    part_file = ExtractZips([args.factory_files],
                            '%s.img' % part_name_no_suffix)
  return part_file


def GenerateEMMC(args):
  """Assemble an EMMC raw image."""
  # NOTE: the docstring above doubles as the command line description of the
  # script, so the details live in this comment instead.
  #
  # |args| is the parsed command line and provides:
  #   partition_table: optional path to the MBR+GPT image.
  #   tfzips: list of target_files zip paths.
  #   factory_files: optional path to the factory files zip.
  #   partition: optional list of (partition name, image path) overrides.
  #   all_slots: whether "_b" partitions are populated as well.
  #   out: path of the zip to create; the image is stored as "emmc.img".
  #
  # Returns 0. Raises AssertionError when no partition table is available or
  # when an image is bigger than its partition.
  partition_table = _ReadPartitionTable(args)
  # The GPT header is read from the second sector of the image.
  gpt = GPTPartitionTable(partition_table[_SECTOR_SIZE:])
  gpt_partitions = _ParsePartitionEntries(partition_table, gpt)

  # Everything before the first usable sector is copied to the start of the
  # image; the remainder of the table is placed right after the last usable
  # sector.
  gpt_head = partition_table[:gpt.first_usable_lba * _SECTOR_SIZE]
  gpt_tail = partition_table[gpt.first_usable_lba * _SECTOR_SIZE:]
  gpt_tail_offset = (gpt.last_usable_lba + 1) * _SECTOR_SIZE

  partition_overrides = {}
  if args.partition:
    partition_overrides = {name: path for name, path in args.partition}

  # The context manager guarantees the temporary image is removed even if
  # one of the steps below fails.
  with tempfile.NamedTemporaryFile() as out_file:
    print("Creating image: %s" % out_file.name)
    out_file.write(gpt_head)
    out_file.seek(gpt_tail_offset)
    out_file.write(gpt_tail)

    # Walk the partitions in on-disk order so the image is written
    # sequentially and the log is deterministic.
    for part_name, part in sorted(gpt_partitions.items(),
                                  key=lambda item: item[1].first_lba):
      part_offset = part.first_lba * _SECTOR_SIZE
      part_size = (part.last_lba - part.first_lba + 1) * _SECTOR_SIZE

      if part_name.endswith(('_a', '_b')):
        if not args.all_slots and part_name.endswith('_b'):
          continue
        part_name_no_suffix = part_name[:-2]
      else:
        part_name_no_suffix = part_name

      part_file = _OpenPartitionImage(args, part_name, part_name_no_suffix,
                                      partition_overrides)
      if not part_file:
        print(' Skipping partition "%s", no file provided' % (part_name,))
        continue

      print(' Copying partition "%s"' % (part_name))
      try:
        copied_size = WritePartition(out_file, part_file, part_offset)
      finally:
        part_file.close()
      if copied_size > part_size:
        # Explicit raise so the overflow check also survives -O.
        raise AssertionError(
            'Partition %s overflow; size is %d KiB but copied %d KiB' % (
                part_name, part_size // 1024, copied_size // 1024))
      print(' Partition "%s", copied size: %d KiB at offset %d KiB' %
            (part_name, copied_size // 1024, part_offset // 1024))

    # Make sure everything hit the file before it is read back by name.
    out_file.flush()
    with zipfile.ZipFile(args.out, 'w', zipfile.ZIP_DEFLATED, True) as out_zip:
      print("Zipping image: %s" % args.out)
      out_zip.write(out_file.name, 'emmc.img')
  return 0
if __name__ == '__main__':
  # Command line entry point: describe the options, parse them and use the
  # value returned by GenerateEMMC() as the exit status of the process.
  cli = argparse.ArgumentParser(description=GenerateEMMC.__doc__)
  cli.add_argument(
      '--out', metavar='OUTPUT', type=str, required=True,
      help='The output zipped emmc image.')
  cli.add_argument(
      '--partition_table',
      help='path to the MBR+GPT image.')
  # May be repeated; every occurrence adds one (name, path) pair.
  cli.add_argument(
      '--partition', nargs=2, metavar=('PARTITION_NAME','PATH'),
      action='append',
      help='override partition images.')
  cli.add_argument(
      '--factory_files',
      help='path to the factory_files or flashfiles zip')
  cli.add_argument(
      'tfzips', nargs='*', metavar='TARGET_FILES',
      help='path to target_files zip(s)')
  cli.add_argument(
      '--all-slots', default=False, action='store_true',
      help='copy the provided images to all slots')
  options = cli.parse_args()
  exit_status = GenerateEMMC(options)
  sys.exit(exit_status)