blob: 4c4b20f5a28e3d2a4b991ac6931cf4969eecd5fb [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import itertools
import logging
import re
from google.appengine.ext import ndb
from webapp.src import vtslab_status as Status
from webapp.src.proto import model
from webapp.src.utils import logger
import webapp2
# Maximum number of characters emitted per logging call when
# ScheduleHandler.post flushes the collected scheduling log; longer output is
# split across several log entries.
MAX_LOG_CHARACTERS = 10000
# Retry interval in minutes used instead of the schedule period when the latest
# job of a schedule ended with a boot-up error (see
# ScheduleHandler.GetCorrectedPeriod).
BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60
# Result codes returned by ScheduleHandler.CreateJob; also used as the keys of
# the per-group result dictionary in ScheduleHandler.post.
CREATE_JOB_SUCCESS = "success"
CREATE_JOB_FAILED_NO_BUILD = "no_build"
CREATE_JOB_FAILED_NO_DEVICE = "no_device"
# Mask of the two least significant bits, which hold the version related part
# of a test type value.
_VERSION_TYPE_MASK = 1 | 1 << 1

# Matches a GCS URL and captures the version that follows "/v".
_GCS_VERSION_PATTERN = r"^gs://.*/v([0-9.]*)/.*"

# Ordered (branch name pattern, version) pairs. The patterns are applied with
# re.match, i.e. anchored at the beginning of the branch name only. Order
# matters: the o-mr1 pattern has to be tried before the generic o pattern,
# which would match the same branch names.
_BRANCH_VERSION_PATTERNS = (
    (r"(git_)?(aosp-)?q.*", "10.0"),
    (r"(git_)?(aosp-)?p.*", "9.0"),
    (r"(git_)?(aosp-)?o[^-]*-m.*", "8.1"),
    (r"(git_)?(aosp-)?o.*", "8.0"),
    (r"(git_)?(aosp-)?master", "master"),
)


def _GetBranchVersion(branch):
    """Maps a branch name or a GCS URL to a version string.

    Args:
        branch: a non-empty string, a branch name or a "gs://" URL.

    Returns:
        A string, the version captured from a GCS URL (may be empty), or
        one of "10.0", "9.0", "8.1", "8.0" and "master" for a recognized
        branch name. None if the branch is not recognized.
    """
    gcs_search = re.search(_GCS_VERSION_PATTERN, branch)
    if gcs_search:
        return gcs_search.group(1)
    for pattern, version in _BRANCH_VERSION_PATTERNS:
        if re.match(pattern, branch):
            return version
    return None


def GetTestVersionType(manifest_branch, gsi_branch, test_type=0):
    """Compares manifest branch and gsi branch to get test type.

    This function only completes two LSBs which represent version related
    test type. All other bits of the given test type are preserved.

    Args:
        manifest_branch: a string, manifest branch name.
        gsi_branch: a string, gsi branch name.
        test_type: an integer, previous test type value.

    Returns:
        An integer, test type value. The version bits are set to
        - unknown, if the manifest branch is empty or either branch cannot
          be mapped to a version,
        - ToT, if the gsi branch is empty or both versions are equal,
        - OTA, if the two versions differ.
    """
    # Clear the two version bits; a falsy test type (0 or None) becomes 0.
    value = (test_type & ~_VERSION_TYPE_MASK) if test_type else 0

    if not manifest_branch:
        logging.debug("manifest branch cannot be empty or None.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]
    if not gsi_branch:
        logging.debug("gsi_branch is empty.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]

    # Compare against None explicitly: a GCS URL may yield an empty version
    # string, which still counts as a recognized version.
    device_version = _GetBranchVersion(manifest_branch)
    if device_version is None:
        logging.debug("Unknown device version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    gsi_version = _GetBranchVersion(gsi_branch)
    if gsi_version is None:
        logging.debug("Unknown gsi version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    if device_version == gsi_version:
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]
    return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA]
class ScheduleHandler(webapp2.RequestHandler):
    """Background worker class for /worker/schedule_handler.

    This class pulls tasks from the 'queue-schedule' queue and processes them
    in the background service 'worker'.

    Attributes:
        logger: a logger.Logger instance, defined on the class and therefore
                shared by all handler instances.
    """
    logger = logger.Logger()

    def ReserveDevices(self, target_device_serials):
        """Reserves devices.

        Sets the scheduling status of every device whose serial is listed to
        "reserved" and stores the devices in a single batch.

        Args:
            target_device_serials: a list of strings, containing target device
                                   serial numbers.
        """
        # An IN filter over an empty list cannot be executed by ndb, and
        # there would be nothing to reserve anyway.
        if not target_device_serials:
            return

        devices = model.DeviceModel.query(
            model.DeviceModel.serial.IN(target_device_serials)).fetch()
        if not devices:
            return

        reserved = Status.DEVICE_SCHEDULING_STATUS_DICT["reserved"]
        for device in devices:
            device.scheduling_status = reserved
        ndb.put_multi(devices)

    def FindBuildId(self, artifact_type, manifest_branch, target,
                    signed=False):
        """Finds a designated build ID.

        Only builds whose timestamp is within the last 72 hours are
        considered. Among them the one with the greatest build ID (compared
        as stored, in descending order) is chosen.

        Args:
            artifact_type: a string, build artifact type.
            manifest_branch: a string, build manifest branch.
            target: a string which build target and type are joined by '-'.
                    If it contains more than one '-', the part after the
                    last '-' is taken as the build type.
            signed: a boolean to get a signed build.

        Return:
            string, build ID found. Empty string if the arguments are
            invalid or no matching build exists.
        """
        build_id = ""

        # Validate before splitting so that a missing target is reported
        # instead of raising on the "-" lookup.
        if not artifact_type or not manifest_branch or not target:
            self.logger.Println("The argument format is invalid.")
            return build_id

        if "-" in target:
            build_target, build_type = target.rsplit("-", 1)
        else:
            build_target = target
            build_type = ""

        if not build_target:
            self.logger.Println("The argument format is invalid.")
            return build_id

        builds = model.BuildModel.query(
            model.BuildModel.artifact_type == artifact_type,
            model.BuildModel.manifest_branch == manifest_branch,
            model.BuildModel.build_target == build_target,
            model.BuildModel.build_type == build_type).fetch()

        if builds:
            oldest_allowed = (
                datetime.datetime.now() - datetime.timedelta(hours=72))
            builds = [
                build for build in builds if build.timestamp > oldest_allowed
            ]

        if builds:
            self.logger.Println("-- Found build ID")
            builds.sort(key=lambda x: x.build_id, reverse=True)
            for build in builds:
                if not signed or build.signed:
                    build_id = build.build_id
                    break

        return build_id

    def post(self):
        """Creates jobs for the schedules which are due.

        If the request carries a "schedule_key" parameter, only that schedule
        is processed and the created job is marked as manual. Otherwise all
        schedules which are not suspended are processed. Schedules are
        grouped by product name and handled in priority order within each
        group. The collected log is written through the logging module at
        the end.
        """
        self.logger.Clear()
        manual_job = False
        schedule_key = self.request.get("schedule_key")
        if schedule_key:
            manual_job = True
            schedule = ndb.key.Key(urlsafe=schedule_key).get()
            if schedule is None:
                # The key does not resolve to a stored schedule.
                self.logger.Println(
                    "Cannot find a schedule for the given key.")
                schedules = []
            else:
                schedules = [schedule]
        else:
            # ndb builds query filters through operator overloading, so
            # "!= True" must not be rewritten as "is not True".
            schedules = model.ScheduleModel.query(
                model.ScheduleModel.suspended != True).fetch()

        if schedules:
            # filter out the schedules which are not updated within 72 hours.
            oldest_allowed = (
                datetime.datetime.now() - datetime.timedelta(hours=72))
            schedules = [
                schedule for schedule in schedules
                if schedule.timestamp > oldest_allowed
            ]
            schedules = self.FilterWithPeriod(schedules)

        if schedules:
            # itertools.groupby only groups adjacent items, hence the sort
            # by the same key first.
            schedules.sort(key=self.GetProductName)
            group_by_product = [
                list(g)
                for _, g in itertools.groupby(schedules, self.GetProductName)
            ]
            for group in group_by_product:
                group.sort(key=self._GetEffectivePriority)
                create_result = {
                    CREATE_JOB_SUCCESS: [],
                    CREATE_JOB_FAILED_NO_BUILD: [],
                    CREATE_JOB_FAILED_NO_DEVICE: []
                }
                for schedule in group:
                    self.logger.Println("")
                    self.logger.Println("Schedule: %s (branch: %s)" %
                                        (schedule.test_name,
                                         schedule.manifest_branch))
                    self.logger.Println(
                        "Build Target: %s" % schedule.build_target)
                    self.logger.Println("Device: %s" % schedule.device)
                    self.logger.Indent()
                    result, lab = self.CreateJob(schedule, manual_job)
                    # Successful results record the lab name, failed ones
                    # record the schedule itself.
                    if result == CREATE_JOB_SUCCESS:
                        create_result[result].append(lab)
                    else:
                        create_result[result].append(schedule)
                    self.logger.Unindent()
                self._BoostStarvedSchedules(create_result)

        self.logger.Println("Scheduling completed.")
        self._FlushLogs()

    def _GetEffectivePriority(self, schedule):
        """Returns the sort key used to order schedules within a group.

        Args:
            schedule: model.ScheduleModel instance.

        Returns:
            schedule.priority_value if it is set, otherwise the value
            derived from schedule.priority.
        """
        # Compare against None: 0 is a valid value (priority_value is
        # decremented down to 0 for starved schedules) and must not fall
        # back to the default priority.
        if schedule.priority_value is not None:
            return schedule.priority_value
        return Status.GetPriorityValue(schedule.priority)

    def _BoostStarvedSchedules(self, create_result):
        """Boosts schedules which could not get devices in a busy lab.

        If any schedule in the group created a job, the priority value of
        the schedules which could not create one due to a lack of devices in
        the same lab is decreased by one (not below 0), and the changed
        schedules are stored.

        Args:
            create_result: a dict which maps a job creation result code to a
                           list; lab names for CREATE_JOB_SUCCESS, schedules
                           for the failure codes.
        """
        schedules_to_put = []
        for lab in create_result[CREATE_JOB_SUCCESS]:
            for schedule in create_result[CREATE_JOB_FAILED_NO_DEVICE]:
                if schedule in schedules_to_put:
                    continue
                # Device entries have the form "lab/product"; compare the
                # lab part exactly rather than by substring.
                if not any(
                        target.split("/")[0] == lab
                        for target in schedule.device):
                    continue
                if schedule.priority_value is None:
                    schedule.priority_value = (
                        Status.GetPriorityValue(schedule.priority))
                if schedule.priority_value > 0:
                    schedule.priority_value -= 1
                    schedules_to_put.append(schedule)

        if schedules_to_put:
            ndb.put_multi(schedules_to_put)

    def _FlushLogs(self):
        """Writes the collected log lines through logging.info in chunks.

        Lines are stripped and joined with newlines. A new log entry is
        started whenever adding a line would exceed MAX_LOG_CHARACTERS
        (newline characters are not counted).
        """
        lines = [line.strip() for line in self.logger.Get()]
        outputs = []
        chars = 0
        for line in lines:
            chars += len(line)
            # Flush only when something is buffered, so that an oversized
            # first line does not produce an empty log entry.
            if outputs and chars > MAX_LOG_CHARACTERS:
                logging.info("\n".join(outputs))
                outputs = []
                chars = len(line)
            outputs.append(line)
        logging.info("\n".join(outputs))

    def CreateJob(self, schedule, manual_job=False):
        """Creates a job for given schedule.

        A job is only stored if a target host with enough devices is found
        and every artifact (device, gsi, test) which has a branch set
        resolves to a build. On success the selected devices are reserved,
        the job is appended to the children jobs of the schedule and the
        priority value of the schedule is reset.

        Args:
            schedule: model.ScheduleModel instance.
            manual_job: True if a job is created by a user, False otherwise.

        Returns:
            a string of job creation result message.
            a string of lab name if job is created, otherwise empty string.
        """
        target_host, target_device, target_device_serials = (
            self.SelectTargetLab(schedule))
        if not target_host:
            return CREATE_JOB_FAILED_NO_DEVICE, ""

        self.logger.Println("- Target host: %s" % target_host)
        self.logger.Println("- Target device: %s" % target_device)
        self.logger.Println("- Target serials: %s" % target_device_serials)

        # create job and add.
        new_job = model.JobModel()
        new_job.hostname = target_host
        new_job.priority = schedule.priority
        new_job.test_name = schedule.test_name
        new_job.require_signed_device_build = (
            schedule.require_signed_device_build)
        new_job.device = target_device
        new_job.period = schedule.period
        new_job.serial.extend(target_device_serials)
        new_job.build_storage_type = schedule.build_storage_type
        new_job.manifest_branch = schedule.manifest_branch
        new_job.build_target = schedule.build_target
        new_job.pab_account_id = schedule.device_pab_account_id
        new_job.shards = schedule.shards
        new_job.param = schedule.param
        new_job.retry_count = schedule.retry_count
        new_job.gsi_storage_type = schedule.gsi_storage_type
        new_job.gsi_branch = schedule.gsi_branch
        new_job.gsi_build_target = schedule.gsi_build_target
        new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
        new_job.gsi_vendor_version = schedule.gsi_vendor_version
        new_job.test_storage_type = schedule.test_storage_type
        new_job.test_branch = schedule.test_branch
        new_job.test_build_target = schedule.test_build_target
        new_job.test_pab_account_id = schedule.test_pab_account_id
        new_job.parent_schedule = schedule.key
        new_job.image_package_repo_base = schedule.image_package_repo_base
        new_job.required_host_equipment = schedule.required_host_equipment
        new_job.required_device_equipment = schedule.required_device_equipment
        new_job.has_bootloader_img = schedule.has_bootloader_img
        new_job.has_radio_img = schedule.has_radio_img
        new_job.report_bucket = schedule.report_bucket
        new_job.report_spreadsheet_id = schedule.report_spreadsheet_id
        new_job.report_persistent_url = schedule.report_persistent_url
        new_job.report_reference_url = schedule.report_reference_url

        # uses bit 0-1 to indicate version.
        test_type = GetTestVersionType(schedule.manifest_branch,
                                       schedule.gsi_branch)
        # further bits flag signed device builds and manually created jobs.
        if schedule.require_signed_device_build:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]
        if manual_job:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL]
        new_job.test_type = test_type

        new_job.build_id = ""
        new_job.gsi_build_id = ""
        new_job.test_build_id = ""
        for artifact_type in ["device", "gsi", "test"]:
            # The device artifact uses attribute names without a prefix,
            # the others are prefixed with the artifact type.
            if artifact_type == "device":
                storage_type_text = "build_storage_type"
                manifest_branch_text = "manifest_branch"
                build_target_text = "build_target"
                build_id_text = "build_id"
                signed = new_job.require_signed_device_build
            else:
                storage_type_text = artifact_type + "_storage_type"
                manifest_branch_text = artifact_type + "_branch"
                build_target_text = artifact_type + "_build_target"
                build_id_text = artifact_type + "_build_id"
                signed = False

            manifest_branch = getattr(new_job, manifest_branch_text)
            build_target = getattr(new_job, build_target_text)
            storage_type = getattr(new_job, storage_type_text)
            if storage_type == Status.STORAGE_TYPE_DICT["PAB"]:
                build_id = self.FindBuildId(
                    artifact_type=artifact_type,
                    manifest_branch=manifest_branch,
                    target=build_target,
                    signed=signed)
            elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]:
                # temp value to distinguish from empty values.
                build_id = "gcs"
            else:
                build_id = ""
                self.logger.Println(
                    "Unexpected storage type (%s)." % storage_type)
            setattr(new_job, build_id_text, build_id)

        # Every artifact which has a branch set needs a build ID (the "gcs"
        # placeholder counts as one).
        builds_resolved = (
            (not new_job.manifest_branch or new_job.build_id)
            and (not new_job.gsi_branch or new_job.gsi_build_id)
            and (not new_job.test_branch or new_job.test_build_id))
        if not builds_resolved:
            self.logger.Println("Cannot find builds to create a job.")
            self.logger.Println("- Device branch / build - {} / {}".format(
                new_job.manifest_branch, new_job.build_id))
            self.logger.Println("- GSI branch / build - {} / {}".format(
                new_job.gsi_branch, new_job.gsi_build_id))
            self.logger.Println("- Test branch / build - {} / {}".format(
                new_job.test_branch, new_job.test_build_id))
            return CREATE_JOB_FAILED_NO_BUILD, ""

        # Drop the temporary "gcs" placeholder before storing the job.
        new_job.build_id = new_job.build_id.replace("gcs", "")
        new_job.gsi_build_id = new_job.gsi_build_id.replace("gcs", "")
        new_job.test_build_id = new_job.test_build_id.replace("gcs", "")

        self.ReserveDevices(target_device_serials)
        new_job.status = Status.JOB_STATUS_DICT["ready"]
        new_job.timestamp = datetime.datetime.now()
        new_job_key = new_job.put()
        schedule.children_jobs.append(new_job_key)
        # A job has been created, so any priority boost is reset.
        schedule.priority_value = Status.GetPriorityValue(schedule.priority)
        schedule.put()
        self.logger.Println("A new job has been created.")

        labs = model.LabModel.query(
            model.LabModel.hostname == target_host).fetch()
        if labs:
            lab_name = labs[0].name
        else:
            # The job is already stored at this point; fall back to the lab
            # part of the "lab/product" target instead of failing.
            lab_name = target_device.split("/")[0]
        return CREATE_JOB_SUCCESS, lab_name

    def FilterWithPeriod(self, schedules):
        """Filters schedules with period.

        This method filters schedules if any children jobs are created within
        period time. A schedule without children jobs, or whose latest child
        job can no longer be loaded, is always kept.

        Args:
            schedules: a list of model.ScheduleModel instances. A single
                       instance is accepted as well.

        Returns:
            a list of model.ScheduleModel instances which need to create a new
            job.
        """
        ret_list = []
        if not schedules:
            return ret_list

        if not isinstance(schedules, list):
            schedules = [schedules]

        for schedule in schedules:
            if not schedule.children_jobs:
                ret_list.append(schedule)
                continue

            latest_job = schedule.children_jobs[-1].get()
            if latest_job is None:
                # The key does not resolve to a stored job any more, so
                # there is no recent job which would block a new one.
                ret_list.append(schedule)
                continue

            period = datetime.timedelta(
                minutes=self.GetCorrectedPeriod(schedule))
            if datetime.datetime.now() - latest_job.timestamp > period:
                ret_list.append(schedule)

        return ret_list

    def SelectTargetLab(self, schedule):
        """Find target host and devices to schedule a new job.

        For every "lab/product" entry of the schedule, the hosts of the lab
        which provide the required host equipment are searched for free
        devices of the product which provide the required device equipment.
        Among the hosts with at least as many such devices as shards, the
        one whose selected devices carry the least equipment is chosen.

        Args:
            schedule: a proto containing the information of a schedule.

        Returns:
            a string which represents hostname,
            a string containing target lab and product with '/' separator,
            a list of selected devices serial (see whether devices will be
            selected later when the job is picked up.)
            (None, None, []) if no host has enough devices.
        """
        required_host_equipment = set(schedule.required_host_equipment)
        required_device_equipment = set(schedule.required_device_equipment)

        available_devices = []
        for target_device in schedule.device:
            # Exactly one separator is expected; anything else cannot be
            # split into a lab and a product.
            if target_device.count("/") != 1:
                self.logger.Println(
                    "Device malformed - {}".format(target_device))
                continue

            target_lab, target_product_type = target_device.split("/")
            target_product = target_product_type.lower()
            self.logger.Println("- Lab %s" % target_lab)
            self.logger.Indent()
            target_hosts = model.LabModel.query(
                model.LabModel.name == target_lab).fetch()

            for host in target_hosts or []:
                if not required_host_equipment <= set(host.host_equipment):
                    continue
                self.logger.Println("- Host: %s" % host.hostname)
                self.logger.Indent()
                host_devices = model.DeviceModel.query(
                    model.DeviceModel.hostname == host.hostname,
                    model.DeviceModel.scheduling_status ==
                    Status.DEVICE_SCHEDULING_STATUS_DICT["free"],
                    model.DeviceModel.status.IN([
                        Status.DEVICE_STATUS_DICT["fastboot"],
                        Status.DEVICE_STATUS_DICT["online"],
                        Status.DEVICE_STATUS_DICT["ready"]
                    ])).fetch()
                host_devices = [
                    x for x in host_devices
                    if (x.product or "").lower() == target_product and
                    required_device_equipment <= set(x.device_equipment)
                ]
                if len(host_devices) < schedule.shards:
                    self.logger.Println(
                        "A host {} does not have enough devices. "
                        "# of devices = {}, shards = {}".format(
                            host.hostname, len(host_devices),
                            schedule.shards))
                    self.logger.Unindent()
                    continue
                # Prefer the devices with the least equipment.
                host_devices.sort(
                    key=lambda x: (len(x.device_equipment)
                                   if x.device_equipment else 0))
                available_devices.append((host_devices, target_device))
                self.logger.Unindent()
            # Balances the Indent() issued after the lab name was printed.
            self.logger.Unindent()

        if not available_devices:
            self.logger.Println("No hosts have enough devices for schedule!")
            return None, None, []

        # min() returns the first of equally ranked candidates, like taking
        # the head of a stable sort.
        selected_devices, selected_target = min(
            available_devices,
            key=lambda x: sum(
                len(y.device_equipment) for y in x[0][:schedule.shards]))
        return (selected_devices[0].hostname, selected_target,
                [x.serial for x in selected_devices[:schedule.shards]])

    def GetProductName(self, schedule):
        """Gets a product name from schedule instance.

        The product name is taken from the first device entry of the
        schedule, which is expected to have the form "lab/product".

        Args:
            schedule: a schedule instance.

        Returns:
            a string, product name in lowercase. Empty string if the
            schedule has no device or the first entry has no '/'.
        """
        if not schedule or not schedule.device:
            return ""

        first_device = schedule.device[0]
        if "/" not in first_device:
            return ""

        return first_device.split("/")[1].lower()

    def GetCorrectedPeriod(self, schedule):
        """Corrects and returns period value based on latest children jobs.

        The period is shortened to BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS if the
        schedule has a non-zero error count, its period is longer than that
        interval and its latest child job ended with a boot-up error.

        Args:
            schedule: a model.ScheduleModel instance containing schedule
                      information.

        Returns:
            an integer, corrected schedule period.
        """
        if not schedule.error_count or not schedule.children_jobs or (
                schedule.period <= BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS):
            return schedule.period

        latest_job = schedule.children_jobs[-1].get()
        # latest_job is None if the key does not resolve to a stored job.
        if (latest_job is not None and
                latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]):
            return BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS
        return schedule.period