blob: 2ebe6879cbf13e89024f145049886a4a072be47d [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License") + "\n</pre>");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from webapp.src import vtslab_status as Status
from webapp.src.proto import model
from webapp.src.scheduler import schedule_worker
import webapp2
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
PAGING_SIZE = 1000
DICT_MODELS = {
"build": model.BuildModel,
"device": model.DeviceModel,
"lab": model.LabModel,
"job": model.JobModel,
"schedule": model.ScheduleModel
}
class CreateIndex(webapp2.RequestHandler):
"""Cron class for /tasks/indexing/{model}."""
def get(self, arg):
"""Creates a task to re-index, with given URL format."""
index_list = []
if arg:
if arg.startswith("/") and arg[1:].lower() in DICT_MODELS.keys():
index_list.append(arg[1:].lower())
else:
self.response.write("<pre>Access Denied. Please visit "
"/tasks/indexing/{model}</pre>")
return
else:
# accessed by /tasks/indexing
index_list.extend(DICT_MODELS.keys())
self.response.write(
"<pre>Re-indexing task{} for {} {} going to be created.</pre>".
format("s"
if len(index_list) > 1 else "", ", ".join(index_list), "are"
if len(index_list) > 1 else "is"))
for model_type in index_list:
task = taskqueue.add(
url="/worker/indexing",
target="worker",
queue_name="queue-indexing",
transactional=False,
params={
"model_type": model_type
})
self.response.write(
"<pre>Re-indexing task for {} is created. ETA: {}</pre>".
format(model_type, task.eta))
class IndexingHandler(webapp2.RequestHandler):
"""Task queue handler class to re-index ndb model."""
def post(self):
"""Fetch entities and process model specific jobs."""
reload(model)
model_type = self.request.get("model_type")
num_updated = 0
next_cursor = None
more = True
while more:
query = DICT_MODELS[model_type].query()
entities, next_cursor, more = query.fetch_page(
PAGING_SIZE, start_cursor=next_cursor)
to_put = []
for entity in entities:
if model_type == "build":
pass
elif model_type == "device":
pass
elif model_type == "lab":
pass
elif model_type == "job":
# uses bits 0-1 to indicate version.
test_type = schedule_worker.GetTestVersionType(
entity.manifest_branch, entity.gsi_branch)
# uses bit 2
if entity.require_signed_device_build:
test_type |= (
Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED])
entity.test_type = test_type
if not entity.parent_schedule:
# finds and links to a parent schedule.
parent_schedule_query = model.ScheduleModel.query(
model.ScheduleModel.priority == entity.priority,
model.ScheduleModel.test_name == entity.test_name,
model.ScheduleModel.period == entity.period,
model.ScheduleModel.build_storage_type == (
entity.build_storage_type),
model.ScheduleModel.manifest_branch == (
entity.manifest_branch),
model.ScheduleModel.build_target == (
entity.build_target),
model.ScheduleModel.device_pab_account_id == (
entity.pab_account_id),
model.ScheduleModel.shards == entity.shards,
model.ScheduleModel.retry_count == (
entity.retry_count),
model.ScheduleModel.gsi_storage_type == (
entity.gsi_storage_type),
model.ScheduleModel.gsi_branch == (
entity.gsi_branch),
model.ScheduleModel.gsi_build_target == (
entity.gsi_build_target),
model.ScheduleModel.gsi_pab_account_id == (
entity.gsi_pab_account_id),
model.ScheduleModel.gsi_vendor_version == (
entity.gsi_vendor_version),
model.ScheduleModel.test_storage_type == (
entity.test_storage_type),
model.ScheduleModel.test_branch == (
entity.test_branch),
model.ScheduleModel.test_build_target == (
entity.test_build_target),
model.ScheduleModel.test_pab_account_id == (
entity.test_pab_account_id))
parent_schedules = parent_schedule_query.fetch()
if not parent_schedules:
logging.error("Parent not found.")
else:
parent_schedule = parent_schedules[0]
parent_schedule.children_jobs.append(entity.key)
entity.parent_schedule = parent_schedule.key
to_put.append(parent_schedule)
elif model_type == "schedule":
if entity.error_count is None:
entity.error_count = 0
if entity.suspended is None:
entity.suspended = False
if entity.build_storage_type is None:
entity.build_storage_type = Status.STORAGE_TYPE_DICT[
"PAB"]
# remove None children jobs.
if entity.children_jobs:
entity.children_jobs = [
x for x in entity.children_jobs if x]
else:
entity.children_jobs = []
for attr in ["has_bootloader_img", "has_radio_img"]:
if getattr(entity, attr, None) is None:
setattr(entity, attr, True)
# set priority_value for old schedules.
if entity.priority_value is None:
entity.priority_value = Status.GetPriorityValue(
entity.priority)
else:
pass
to_put.append(entity)
if to_put:
ndb.put_multi(to_put)
num_updated += len(to_put)
logging.info("{} indexing complete with {} updates!".format(
model_type, num_updated))