| #!/usr/bin/env python |
| # |
| # Copyright (C) 2018 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import datetime |
| import itertools |
| import logging |
| import re |
| |
| from google.appengine.ext import ndb |
| |
| from webapp.src import vtslab_status as Status |
| from webapp.src.proto import model |
| from webapp.src.utils import logger |
| import webapp2 |
| |
| MAX_LOG_CHARACTERS = 10000 # maximum number of characters per each log |
| BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60 # retry minutes when boot-up error is occurred |
| |
| CREATE_JOB_SUCCESS = "success" |
| CREATE_JOB_FAILED_NO_BUILD = "no_build" |
| CREATE_JOB_FAILED_NO_DEVICE = "no_device" |
| |
| |
| def GetTestVersionType(manifest_branch, gsi_branch, test_type=0): |
| """Compares manifest branch and gsi branch to get test type. |
| |
| This function only completes two LSBs which represent version related |
| test type. |
| |
| Args: |
| manifest_branch: a string, manifest branch name. |
| gsi_branch: a string, gsi branch name. |
| test_type: an integer, previous test type value. |
| |
| Returns: |
| An integer, test type value. |
| """ |
| if not test_type: |
| value = 0 |
| else: |
| # clear two bits |
| value = test_type & ~(1 | 1 << 1) |
| |
| if not manifest_branch: |
| logging.debug("manifest branch cannot be empty or None.") |
| return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN] |
| |
| if not gsi_branch: |
| logging.debug("gsi_branch is empty.") |
| return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT] |
| |
| gcs_pattern = "^gs://.*/v([0-9.]*)/.*" |
| q_pattern = "(git_)?(aosp-)?q.*" |
| p_pattern = "(git_)?(aosp-)?p.*" |
| o_mr1_pattern = "(git_)?(aosp-)?o[^-]*-m.*" |
| o_pattern = "(git_)?(aosp-)?o.*" |
| master_pattern = "(git_)?(aosp-)?master" |
| |
| gcs_search = re.search(gcs_pattern, manifest_branch) |
| if gcs_search: |
| device_version = gcs_search.group(1) |
| elif re.match(q_pattern, manifest_branch): |
| device_version = "10.0" |
| elif re.match(p_pattern, manifest_branch): |
| device_version = "9.0" |
| elif re.match(o_mr1_pattern, manifest_branch): |
| device_version = "8.1" |
| elif re.match(o_pattern, manifest_branch): |
| device_version = "8.0" |
| elif re.match(master_pattern, manifest_branch): |
| device_version = "master" |
| else: |
| logging.debug("Unknown device version.") |
| return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN] |
| |
| gcs_search = re.search(gcs_pattern, gsi_branch) |
| if gcs_search: |
| gsi_version = gcs_search.group(1) |
| elif re.match(q_pattern, gsi_branch): |
| gsi_version = "10.0" |
| elif re.match(p_pattern, gsi_branch): |
| gsi_version = "9.0" |
| elif re.match(o_mr1_pattern, gsi_branch): |
| gsi_version = "8.1" |
| elif re.match(o_pattern, gsi_branch): |
| gsi_version = "8.0" |
| elif re.match(master_pattern, gsi_branch): |
| gsi_version = "master" |
| else: |
| logging.debug("Unknown gsi version.") |
| return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN] |
| |
| if device_version == gsi_version: |
| return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT] |
| else: |
| return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA] |
| |
| |
| class ScheduleHandler(webapp2.RequestHandler): |
| """Background worker class for /worker/schedule_handler. |
| |
| This class pull tasks from 'queue-schedule' queue and processes in |
| background service 'worker'. |
| |
| Attributes: |
| logger: Logger class |
| """ |
| logger = logger.Logger() |
| |
| def ReserveDevices(self, target_device_serials): |
| """Reserves devices. |
| |
| Args: |
| target_device_serials: a list of strings, containing target device |
| serial numbers. |
| """ |
| device_query = model.DeviceModel.query( |
| model.DeviceModel.serial.IN(target_device_serials)) |
| devices = device_query.fetch() |
| devices_to_put = [] |
| for device in devices: |
| device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[ |
| "reserved"] |
| devices_to_put.append(device) |
| if devices_to_put: |
| ndb.put_multi(devices_to_put) |
| |
| def FindBuildId(self, artifact_type, manifest_branch, target, |
| signed=False): |
| """Finds a designated build ID. |
| |
| Args: |
| artifact_type: a string, build artifact type. |
| manifest_branch: a string, build manifest branch. |
| target: a string which build target and type are joined by '-'. |
| signed: a boolean to get a signed build. |
| |
| Return: |
| string, build ID found. |
| """ |
| build_id = "" |
| if "-" in target: |
| build_target, build_type = target.split("-") |
| else: |
| build_target = target |
| build_type = "" |
| if not artifact_type or not manifest_branch or not build_target: |
| self.logger.Println("The argument format is invalid.") |
| return build_id |
| build_query = model.BuildModel.query( |
| model.BuildModel.artifact_type == artifact_type, |
| model.BuildModel.manifest_branch == manifest_branch, |
| model.BuildModel.build_target == build_target, |
| model.BuildModel.build_type == build_type) |
| builds = build_query.fetch() |
| |
| if builds: |
| builds = [ |
| build for build in builds |
| if (build.timestamp > |
| datetime.datetime.now() - datetime.timedelta(hours=72)) |
| ] |
| |
| if builds: |
| self.logger.Println("-- Found build ID") |
| builds.sort(key=lambda x: x.build_id, reverse=True) |
| for build in builds: |
| if not signed or build.signed: |
| build_id = build.build_id |
| break |
| return build_id |
| |
| def post(self): |
| self.logger.Clear() |
| manual_job = False |
| schedule_key = self.request.get("schedule_key") |
| if schedule_key: |
| key = ndb.key.Key(urlsafe=schedule_key) |
| manual_job = True |
| schedules = [key.get()] |
| else: |
| schedule_query = model.ScheduleModel.query( |
| model.ScheduleModel.suspended != True) |
| schedules = schedule_query.fetch() |
| |
| if schedules: |
| # filter out the schedules which are not updated within 72 hours. |
| schedules = [ |
| schedule for schedule in schedules |
| if (schedule.timestamp > |
| datetime.datetime.now() - datetime.timedelta(hours=72)) |
| ] |
| schedules = self.FilterWithPeriod(schedules) |
| |
| if schedules: |
| schedules.sort(key=lambda x: self.GetProductName(x)) |
| group_by_product = [ |
| list(g) |
| for _, g in itertools.groupby(schedules, |
| lambda x: self.GetProductName(x)) |
| ] |
| for group in group_by_product: |
| group.sort(key=lambda x: x.priority_value if ( |
| x.priority_value) else Status.GetPriorityValue(x.priority)) |
| create_result = { |
| CREATE_JOB_SUCCESS: [], |
| CREATE_JOB_FAILED_NO_BUILD: [], |
| CREATE_JOB_FAILED_NO_DEVICE: [] |
| } |
| for schedule in group: |
| self.logger.Println("") |
| self.logger.Println("Schedule: %s (branch: %s)" % |
| (schedule.test_name, |
| schedule.manifest_branch)) |
| self.logger.Println( |
| "Build Target: %s" % schedule.build_target) |
| self.logger.Println("Device: %s" % schedule.device) |
| self.logger.Indent() |
| result, lab = self.CreateJob(schedule, manual_job) |
| if result == CREATE_JOB_SUCCESS: |
| create_result[result].append(lab) |
| else: |
| create_result[result].append(schedule) |
| self.logger.Unindent() |
| # if any schedule in group created a job, increase priority of |
| # the schedules which couldn't create due to out of devices. |
| schedules_to_put = [] |
| for lab in create_result[CREATE_JOB_SUCCESS]: |
| for schedule in create_result[CREATE_JOB_FAILED_NO_DEVICE]: |
| if any([lab in target for target in schedule.device |
| ]) and schedule not in schedules_to_put: |
| if schedule.priority_value is None: |
| schedule.priority_value = ( |
| Status.GetPriorityValue(schedule.priority)) |
| if schedule.priority_value > 0: |
| schedule.priority_value -= 1 |
| schedules_to_put.append(schedule) |
| if schedules_to_put: |
| ndb.put_multi(schedules_to_put) |
| |
| self.logger.Println("Scheduling completed.") |
| |
| lines = self.logger.Get() |
| lines = [line.strip() for line in lines] |
| outputs = [] |
| chars = 0 |
| for line in lines: |
| chars += len(line) |
| if chars > MAX_LOG_CHARACTERS: |
| logging.info("\n".join(outputs)) |
| outputs = [] |
| chars = len(line) |
| outputs.append(line) |
| logging.info("\n".join(outputs)) |
| |
| def CreateJob(self, schedule, manual_job=False): |
| """Creates a job for given schedule. |
| |
| Args: |
| schedule: model.ScheduleModel instance. |
| manual_job: True if a job is created by a user, False otherwise. |
| |
| Returns: |
| a string of job creation result message. |
| a string of lab name if job is created, otherwise empty string. |
| """ |
| target_host, target_device, target_device_serials = ( |
| self.SelectTargetLab(schedule)) |
| if not target_host: |
| return CREATE_JOB_FAILED_NO_DEVICE, "" |
| |
| self.logger.Println("- Target host: %s" % target_host) |
| self.logger.Println("- Target device: %s" % target_device) |
| self.logger.Println("- Target serials: %s" % target_device_serials) |
| |
| # create job and add. |
| new_job = model.JobModel() |
| new_job.hostname = target_host |
| new_job.priority = schedule.priority |
| new_job.test_name = schedule.test_name |
| new_job.require_signed_device_build = ( |
| schedule.require_signed_device_build) |
| new_job.device = target_device |
| new_job.period = schedule.period |
| new_job.serial.extend(target_device_serials) |
| new_job.build_storage_type = schedule.build_storage_type |
| new_job.manifest_branch = schedule.manifest_branch |
| new_job.build_target = schedule.build_target |
| new_job.pab_account_id = schedule.device_pab_account_id |
| new_job.shards = schedule.shards |
| new_job.param = schedule.param |
| new_job.retry_count = schedule.retry_count |
| new_job.gsi_storage_type = schedule.gsi_storage_type |
| new_job.gsi_branch = schedule.gsi_branch |
| new_job.gsi_build_target = schedule.gsi_build_target |
| new_job.gsi_pab_account_id = schedule.gsi_pab_account_id |
| new_job.gsi_vendor_version = schedule.gsi_vendor_version |
| new_job.test_storage_type = schedule.test_storage_type |
| new_job.test_branch = schedule.test_branch |
| new_job.test_build_target = schedule.test_build_target |
| new_job.test_pab_account_id = schedule.test_pab_account_id |
| new_job.parent_schedule = schedule.key |
| new_job.image_package_repo_base = schedule.image_package_repo_base |
| new_job.required_host_equipment = schedule.required_host_equipment |
| new_job.required_device_equipment = schedule.required_device_equipment |
| new_job.has_bootloader_img = schedule.has_bootloader_img |
| new_job.has_radio_img = schedule.has_radio_img |
| new_job.report_bucket = schedule.report_bucket |
| new_job.report_spreadsheet_id = schedule.report_spreadsheet_id |
| new_job.report_persistent_url = schedule.report_persistent_url |
| new_job.report_reference_url = schedule.report_reference_url |
| |
| # uses bit 0-1 to indicate version. |
| test_type = GetTestVersionType(schedule.manifest_branch, |
| schedule.gsi_branch) |
| # uses bit 2 |
| if schedule.require_signed_device_build: |
| test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED] |
| |
| if manual_job: |
| test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL] |
| |
| new_job.test_type = test_type |
| |
| new_job.build_id = "" |
| new_job.gsi_build_id = "" |
| new_job.test_build_id = "" |
| for artifact_type in ["device", "gsi", "test"]: |
| if artifact_type == "device": |
| storage_type_text = "build_storage_type" |
| manifest_branch_text = "manifest_branch" |
| build_target_text = "build_target" |
| build_id_text = "build_id" |
| signed = new_job.require_signed_device_build |
| else: |
| storage_type_text = artifact_type + "_storage_type" |
| manifest_branch_text = artifact_type + "_branch" |
| build_target_text = artifact_type + "_build_target" |
| build_id_text = artifact_type + "_build_id" |
| signed = False |
| |
| manifest_branch = getattr(new_job, manifest_branch_text) |
| build_target = getattr(new_job, build_target_text) |
| storage_type = getattr(new_job, storage_type_text) |
| if storage_type == Status.STORAGE_TYPE_DICT["PAB"]: |
| build_id = self.FindBuildId( |
| artifact_type=artifact_type, |
| manifest_branch=manifest_branch, |
| target=build_target, |
| signed=signed) |
| elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]: |
| # temp value to distinguish from empty values. |
| build_id = "gcs" |
| else: |
| build_id = "" |
| self.logger.Println( |
| "Unexpected storage type (%s)." % storage_type) |
| setattr(new_job, build_id_text, build_id) |
| |
| if ((not new_job.manifest_branch or new_job.build_id) |
| and (not new_job.gsi_branch or new_job.gsi_build_id) |
| and (not new_job.test_branch or new_job.test_build_id)): |
| new_job.build_id = new_job.build_id.replace("gcs", "") |
| new_job.gsi_build_id = (new_job.gsi_build_id.replace("gcs", "")) |
| new_job.test_build_id = (new_job.test_build_id.replace("gcs", "")) |
| self.ReserveDevices(target_device_serials) |
| new_job.status = Status.JOB_STATUS_DICT["ready"] |
| new_job.timestamp = datetime.datetime.now() |
| new_job_key = new_job.put() |
| schedule.children_jobs.append(new_job_key) |
| schedule.priority_value = Status.GetPriorityValue( |
| schedule.priority) |
| schedule.put() |
| self.logger.Println("A new job has been created.") |
| labs = model.LabModel.query( |
| model.LabModel.hostname == target_host).fetch() |
| return CREATE_JOB_SUCCESS, labs[0].name |
| else: |
| self.logger.Println("Cannot find builds to create a job.") |
| self.logger.Println("- Device branch / build - {} / {}".format( |
| new_job.manifest_branch, new_job.build_id)) |
| self.logger.Println("- GSI branch / build - {} / {}".format( |
| new_job.gsi_branch, new_job.gsi_build_id)) |
| self.logger.Println("- Test branch / build - {} / {}".format( |
| new_job.test_branch, new_job.test_build_id)) |
| return CREATE_JOB_FAILED_NO_BUILD, "" |
| |
| def FilterWithPeriod(self, schedules): |
| """Filters schedules with period. |
| |
| This method filters schedules if any children jobs are created within |
| period time. |
| |
| Args: |
| schedules: a list of model.ScheduleModel instances. |
| |
| Returns: |
| a list of model.ScheduleModel instances which need to create a new |
| job. |
| """ |
| ret_list = [] |
| if not schedules: |
| return ret_list |
| |
| if type(schedules) is not list: |
| schedules = [schedules] |
| |
| for schedule in schedules: |
| if not schedule.children_jobs: |
| ret_list.append(schedule) |
| continue |
| |
| latest_job_key = schedule.children_jobs[-1] |
| latest_job = latest_job_key.get() |
| |
| if datetime.datetime.now() - latest_job.timestamp > ( |
| datetime.timedelta( |
| minutes=self.GetCorrectedPeriod(schedule))): |
| ret_list.append(schedule) |
| |
| return ret_list |
| |
| def SelectTargetLab(self, schedule): |
| """Find target host and devices to schedule a new job. |
| |
| Args: |
| schedule: a proto containing the information of a schedule. |
| |
| Returns: |
| a string which represents hostname, |
| a string containing target lab and product with '/' separator, |
| a list of selected devices serial (see whether devices will be |
| selected later when the job is picked up.) |
| """ |
| |
| available_devices = [] |
| for target_device in schedule.device: |
| if "/" not in target_device: |
| self.logger.Println( |
| "Device malformed - {}".format(target_device)) |
| continue |
| |
| target_lab, target_product_type = target_device.split("/") |
| self.logger.Println("- Lab %s" % target_lab) |
| self.logger.Indent() |
| host_query = model.LabModel.query( |
| model.LabModel.name == target_lab) |
| target_hosts = host_query.fetch() |
| |
| if target_hosts: |
| for host in target_hosts: |
| if not (set(schedule.required_host_equipment) <= set( |
| host.host_equipment)): |
| continue |
| self.logger.Println("- Host: %s" % host.hostname) |
| self.logger.Indent() |
| device_query = model.DeviceModel.query( |
| model.DeviceModel.hostname == host.hostname, |
| model.DeviceModel.scheduling_status == |
| Status.DEVICE_SCHEDULING_STATUS_DICT["free"], |
| model.DeviceModel.status.IN([ |
| Status.DEVICE_STATUS_DICT["fastboot"], |
| Status.DEVICE_STATUS_DICT["online"], |
| Status.DEVICE_STATUS_DICT["ready"] |
| ])) |
| host_devices = device_query.fetch() |
| host_devices = [ |
| x for x in host_devices |
| if x.product.lower() == target_product_type.lower() and |
| (set(schedule.required_device_equipment) <= set( |
| x.device_equipment)) |
| ] |
| if len(host_devices) < schedule.shards: |
| self.logger.Println( |
| "A host {} does not have enough devices. " |
| "# of devices = {}, shards = {}".format( |
| host.hostname, len(host_devices), |
| schedule.shards)) |
| self.logger.Unindent() |
| continue |
| host_devices.sort( |
| key=lambda x: (len(x.device_equipment) |
| if x.device_equipment else 0)) |
| available_devices.append((host_devices, target_device)) |
| self.logger.Unindent() |
| |
| self.logger.Unindent() |
| |
| if not available_devices: |
| self.logger.Println("No hosts have enough devices for schedule!") |
| return None, None, [] |
| |
| available_devices.sort(key=lambda x: ( |
| sum([len(y.device_equipment) for y in x[0][:schedule.shards]]))) |
| selected_host_devices = available_devices[0] |
| return selected_host_devices[0][0].hostname, selected_host_devices[ |
| 1], [x.serial for x in selected_host_devices[0][:schedule.shards]] |
| |
| def GetProductName(self, schedule): |
| """Gets a product name from schedule instance. |
| |
| Args: |
| schedule: a schedule instance. |
| |
| Returns: |
| a string, product name in lowercase. |
| """ |
| if not schedule or not schedule.device: |
| return "" |
| |
| if "/" not in schedule.device[0]: |
| return "" |
| |
| return schedule.device[0].split("/")[1].lower() |
| |
| def GetCorrectedPeriod(self, schedule): |
| """Corrects and returns period value based on latest children jobs. |
| |
| Args: |
| schedule: a model.ScheduleModel instance containing schedule |
| information. |
| |
| Returns: |
| an integer, corrected schedule period. |
| """ |
| if not schedule.error_count or not schedule.children_jobs or ( |
| schedule.period <= BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS): |
| return schedule.period |
| |
| latest_job = schedule.children_jobs[-1].get() |
| |
| if latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]: |
| return BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS |
| else: |
| return schedule.period |