#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
#     Gustavo Padovan <gustavo.padovan@collabora.com>
#     Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""Send a job to LAVA, track it and collect log back"""

import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from io import StringIO
from os import environ, getenv, path
from typing import Any, Optional

import fire
from lava.exceptions import (
    MesaCIException,
    MesaCIParseException,
    MesaCIRetryError,
    MesaCITimeoutError,
)
from lava.utils import CONSOLE_LOG
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
from lava.utils import (
    GitlabSection,
    LAVAJob,
    LogFollower,
    LogSectionType,
    call_proxy,
    fatal_err,
    generate_lava_job_definition,
    hide_sensitive_data,
    print_log,
    setup_lava_proxy,
)
from lavacli.utils import flow_yaml as lava_yaml

# Initialize structural logging with a defaultdict; it can be swapped for a
# more sophisticated dict-like data abstraction.
STRUCTURAL_LOG = defaultdict(list)

try:
    from ci.structured_logger import StructuredLogger
except ImportError as e:
    print_log(
        f"Could not import StructuredLogger library: {e}. "
        "Falling back to defaultdict based structured logger."
    )

# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60))

# How many seconds the script should wait before trying a new polling iteration
# to check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
    getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)

# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
    getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 6)
)

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))

# How many retries should be made when a timeout happens.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
    getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)


def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
    """
    Investigate infrastructure errors from the job metadata.
    If it finds an error, raise it as MesaCIException.
    """
    if "result" not in metadata or metadata["result"] != "fail":
        return
    if "error_type" in metadata:
        error_type = metadata["error_type"]
        if error_type == "Infrastructure":
            raise MesaCIException(
                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
            )
        if error_type == "Job":
            # This happens when LAVA assumes that the job cannot terminate or
            # when the job definition is malformed. As we always validate the
            # jobs, only the former is likely to happen, e.g. when some LAVA
            # action times out more often than the job definition expects.
            raise MesaCIException(
                f"LAVA job {job_id} failed with JobError "
                "(possible LAVA timeout misconfiguration/bug). Retry."
            )
    if "case" in metadata and metadata["case"] == "validate":
        raise MesaCIException(
            f"LAVA job {job_id} failed validation (possible download error). Retry."
        )


def raise_lava_error(job) -> None:
    # Look for infrastructure errors and raise them, so the job gets retried.
    results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
    results = lava_yaml.load(results_yaml)
    for res in results:
        metadata = res["metadata"]
        raise_exception_from_metadata(metadata, job.job_id)

    # If we reach this point, the job ended without a result from the hwci
    # script and no LAVA infrastructure problem was found.
    job.status = "fail"


def show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
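    """Print the final LAVA job details inside a collapsed GitLab section.

    Waits a bounded amount of time for LAVA to finish post-processing the job
    before dumping whatever metadata is available.
    """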
    with GitlabSection(
        "job_data",
        "LAVA job info",
        type=LogSectionType.LAVA_POST_PROCESSING,
        start_collapsed=True,
        colour=colour,
    ):
        wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
        while not job.is_post_processed() and wait_post_processing_retries > 0:
            # Wait a little until LAVA finishes processing metadata
            time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
            wait_post_processing_retries -= 1

        if not job.is_post_processed():
            waited_for_sec: int = (
                WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
                * WAIT_FOR_LAVA_POST_PROCESSING_SEC
            )
            print_log(
                f"Waited for {waited_for_sec} seconds "
                "for LAVA to post-process the job, but it hasn't finished yet. "
                "Dumping its info anyway"
            )

        details: dict[str, str] = job.show()
        for field, value in details.items():
            print(f"{field:<15}: {value}")
        job.refresh_log()


def fetch_logs(job, max_idle_time, log_follower) -> None:
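    """Fetch, parse and print the new log lines of a running LAVA job.

    Raises MesaCITimeoutError (via is_job_hanging) when the job has been
    silent for longer than max_idle_time.
    """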
    is_job_hanging(job, max_idle_time)

    time.sleep(LOG_POLLING_TIME_SEC)
    new_log_lines = fetch_new_log_lines(job)
    parsed_lines = parse_log_lines(job, log_follower, new_log_lines)

    for line in parsed_lines:
        print_log(line)


def is_job_hanging(job, max_idle_time):
    # Poll to check for new logs; a prolonged period of silence means that
    # the device has died and the job should be retried.
    if datetime.now() - job.last_log_time > max_idle_time:
        max_idle_time_min = max_idle_time.total_seconds() / 60

        raise MesaCITimeoutError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_YELLOW']}"
            f"LAVA job {job.job_id} has not responded for {max_idle_time_min} "
            "minutes. Retry."
            f"{CONSOLE_LOG['RESET']}",
            timeout_duration=max_idle_time,
        )


def parse_log_lines(job, log_follower, new_log_lines):
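    """Feed the new log lines to the LogFollower and return the parsed lines.

    While the log is in the TEST_CASE or LAVA_POST_PROCESSING sections, the
    parsed lines are also scanned for the job result.
    """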
    if log_follower.feed(new_log_lines):
        # If we had non-empty log data, we can be sure that the device is alive.
        job.heartbeat()
    parsed_lines = log_follower.flush()

    # Only parse job results when the script reaches the end of the logs.
    # Depending on how much payload the scheduler.jobs.logs RPC call returns,
    # the log may already have reached the LAVA_POST_PROCESSING phase.
    if log_follower.current_section.type in (
        LogSectionType.TEST_CASE,
        LogSectionType.LAVA_POST_PROCESSING,
    ):
        parsed_lines = job.parse_job_result_from_log(parsed_lines)
    return parsed_lines


def fetch_new_log_lines(job):

    # The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
    # Retry the log fetching several times before exposing the error.
    for _ in range(5):
        with contextlib.suppress(MesaCIParseException):
            new_log_lines = job.get_logs()
            break
    else:
        raise MesaCIParseException
    return new_log_lines


def submit_job(job):
    try:
        job.submit()
    except Exception as mesa_ci_err:
        raise MesaCIException(
            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
        ) from mesa_ci_err


def wait_for_job_get_started(job):
    print_log(f"Waiting for job {job.job_id} to start.")
    while not job.is_started():
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    job.refresh_log()
    print_log(f"Job {job.job_id} started.")


def bootstrap_log_follower() -> LogFollower:
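    """Open the LAVA boot GitLab section and return a LogFollower starting in it."""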
    gl = GitlabSection(
        id="lava_boot",
        header="LAVA boot",
        type=LogSectionType.LAVA_BOOT,
        start_collapsed=True,
    )
    print(gl.start())
    return LogFollower(starting_section=gl)


def follow_job_execution(job, log_follower):
    with log_follower:
        max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
        # Start checking the job's health
        job.heartbeat()
        while not job.is_finished:
            fetch_logs(job, max_idle_time, log_follower)
            structural_log_phases(job, log_follower)

    # Mesa developers expect a simple pass/fail job result.
    # Anything else probably means that a LAVA infrastructure error happened.
    if job.status not in ["pass", "fail"]:
        raise_lava_error(job)

    # LogFollower does some cleanup after the early exit (triggered by the
    # `hwci: pass|fail` regex), so update the phases again after that cleanup.
    structural_log_phases(job, log_follower)


def structural_log_phases(job, log_follower):
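    """Record the start and end times of each log section in the job's structured log."""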
    phases: dict[str, Any] = {
        s.header.split(" - ")[0]: {
            k: str(getattr(s, k)) for k in ("start_time", "end_time")
        }
        for s in log_follower.section_history
    }
    job.log["dut_job_phases"] = phases


def print_job_final_status(job):
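    """Print the job's final status (marking still-running jobs as hung) and dump its data."""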
    if job.status == "running":
        job.status = "hung"

    color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
    print_log(
        f"{color}"
        f"LAVA Job finished with status: {job.status}"
        f"{CONSOLE_LOG['RESET']}"
    )

    job.refresh_log()
    show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")


def execute_job_with_retries(
    proxy, job_definition, retry_count, jobs_log
) -> Optional[LAVAJob]:
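    """Submit and follow the LAVA job, retrying on Mesa CI exceptions.

    Makes up to retry_count + 1 attempts. Returns the job from the first
    successful attempt, or the last failed attempt when every attempt failed.
    """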
    last_failed_job = None
    for attempt_no in range(1, retry_count + 2):
        # Fetch the logger value from its object to enable the autosave
        # features, if AutoSaveDict is enabled by the StructuredLogging module
        jobs_log.append({})
        job_log = jobs_log[-1]
        job = LAVAJob(proxy, job_definition, job_log)
        STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
        try:
            job_log["submitter_start_time"] = datetime.now().isoformat()
            submit_job(job)
            wait_for_job_get_started(job)
            log_follower: LogFollower = bootstrap_log_follower()
            follow_job_execution(job, log_follower)
            return job

        except (MesaCIException, KeyboardInterrupt) as exception:
            job.handle_exception(exception)

        finally:
            print_job_final_status(job)
            # If LAVA takes too long to post-process the job, the submitter
            # gives up and proceeds.
            job_log["submitter_end_time"] = datetime.now().isoformat()
            last_failed_job = job
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )

    return last_failed_job


def retriable_follow_job(proxy, job_definition) -> LAVAJob:
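    """Run the job with the configured number of retries.

    Raises MesaCIRetryError when every attempt ended with an exception.
    """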
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    last_attempted_job = execute_job_with_retries(
        proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
    )

    if last_attempted_job.exception is not None:
        # Infra failed in all attempts
        raise MesaCIRetryError(
            f"{CONSOLE_LOG['BOLD']}"
            f"{CONSOLE_LOG['FG_RED']}"
            "Job failed after it exceeded the number of "
            f"{number_of_retries} retries."
            f"{CONSOLE_LOG['RESET']}",
            retry_count=number_of_retries,
            last_job=last_attempted_job,
        )

    return last_attempted_job


@dataclass
class PathResolver:
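    """Base dataclass that resolves every pathlib.Path field into an absolute path."""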
    def __post_init__(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if not value:
                continue
            if field.type == pathlib.Path:
                value = pathlib.Path(value)
                setattr(self, field.name, value.resolve())


@dataclass
class LAVAJobSubmitter(PathResolver):
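    """Holds the LAVA job parameters and drives submission and follow-up.

    Each field doubles as a command line argument through the fire.Fire()
    call at the bottom of this script.
    """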
    boot_method: str
    ci_project_dir: str
    device_type: str
    job_timeout_min: int  # The job timeout in minutes
    build_url: str = None
    dtb_filename: str = None
    dump_yaml: bool = False  # Whether to dump the YAML payload to stdout
    first_stage_init: str = None
    jwt_file: pathlib.Path = None
    kernel_image_name: str = None
    kernel_image_type: str = ""
    kernel_url_prefix: str = None
    lava_tags: str = ""  # Comma-separated LAVA tags for the job
    mesa_job_name: str = "mesa_ci_job"
    pipeline_info: str = ""
    rootfs_url_prefix: str = None
    validate_only: bool = False  # Whether to only validate the job, not execute it
    visibility_group: str = None  # Only affects LAVA farm maintainers
    job_rootfs_overlay_url: str = None
    structured_log_file: pathlib.Path = None  # Log file path with structured LAVA log
    ssh_client_image: str = None  # x86_64 SSH client image to follow the job's output
    __structured_log_context = contextlib.nullcontext()  # Structured Logger context

    def __post_init__(self) -> None:
        super().__post_init__()
        # Keep only the first word of the mesa job name; spaces break the
        # lava-test-case command
        self.mesa_job_name = self.mesa_job_name.split(" ")[0]

        if not self.structured_log_file:
            return

        self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()
        self.proxy = setup_lava_proxy()

    def __prepare_submission(self) -> str:
        # Overwrite the timeout for the testcases with the value provided by the
        # user. The testcase running time should be at least 4 times greater than
        # the other sections (boot and setup), so we can safely ignore them.
        # If LAVA fails to stop the job at this stage, it will fall back to the
        # script section timeout with a reasonable delay.
        GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
            minutes=self.job_timeout_min
        )

        job_definition = generate_lava_job_definition(self)

        if self.dump_yaml:
            self.dump_job_definition(job_definition)

        validation_job = LAVAJob(self.proxy, job_definition)
        if errors := validation_job.validate():
            fatal_err(f"Error in LAVA job definition: {errors}")
        print_log("LAVA job definition validated successfully")

        return job_definition

    @classmethod
    def is_under_ci(cls):
        ci_envvar: str = getenv("CI", "false")
        return ci_envvar.lower() == "true"

    def dump_job_definition(self, job_definition) -> None:
        with GitlabSection(
            "yaml_dump",
            "LAVA job definition (YAML)",
            type=LogSectionType.LAVA_BOOT,
            start_collapsed=True,
        ):
            print(hide_sensitive_data(job_definition))

    def submit(self) -> None:
        """
        Prepares and submits the LAVA job.
        If `validate_only` is True, it validates the job without submitting it.
        If the job finishes with a non-pass status or encounters an exception,
        the program exits with a non-zero return code.
        """
        job_definition: str = self.__prepare_submission()

        if self.validate_only:
            return

        with self.__structured_log_context:
            last_attempt_job = None
            try:
                last_attempt_job = retriable_follow_job(self.proxy, job_definition)

            except MesaCIRetryError as retry_exception:
                last_attempt_job = retry_exception.last_job

            except Exception as exception:
                STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
                raise exception

            finally:
                self.finish_script(last_attempt_job)

    def print_log_artifact_url(self):
        base_url = "https://$CI_PROJECT_ROOT_NAMESPACE.pages.freedesktop.org/"
        artifacts_path = "-/$CI_PROJECT_NAME/-/jobs/$CI_JOB_ID/artifacts/"
        relative_log_path = self.structured_log_file.relative_to(pathlib.Path.cwd())
        full_path = f"{base_url}{artifacts_path}{relative_log_path}"
        artifact_url = path.expandvars(full_path)

        print_log(f"Structural Logging data available at: {artifact_url}")

    def finish_script(self, last_attempt_job):
        if self.is_under_ci() and self.structured_log_file:
            self.print_log_artifact_url()

        if not last_attempt_job:
            # No job was run, something bad happened
            STRUCTURAL_LOG["job_combined_status"] = "script_crash"
            current_exception = str(sys.exc_info()[0])
            STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
            raise SystemExit(1)

        STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status

        if last_attempt_job.status != "pass":
            raise SystemExit(1)


class StructuredLoggerWrapper:
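    """Set up the structured logging backend for a submission.

    Uses the StructuredLogger library when it is importable; otherwise falls
    back to dumping the global STRUCTURAL_LOG dict as JSON.
    """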
    def __init__(self, submitter: LAVAJobSubmitter) -> None:
        self.__submitter: LAVAJobSubmitter = submitter

    def _init_logger(self):
        STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
        STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
        STRUCTURAL_LOG["job_combined_fail_reason"] = None
        STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
        STRUCTURAL_LOG["dut_attempt_counter"] = 0

        # Initialize dut_jobs list to enable appends
        STRUCTURAL_LOG["dut_jobs"] = []

    @contextlib.contextmanager
    def _simple_logger_context(self):
        log_file = pathlib.Path(self.__submitter.structured_log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            # Truncate the file
            log_file.write_text("")
            yield
        finally:
            log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))

    def logger_context(self):
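        # Prefer the StructuredLogger backend; if its import failed above, the
        # name is undefined and the NameError below selects the simple JSON
        # fallback context instead.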
        context = contextlib.nullcontext()
        try:
            global STRUCTURAL_LOG
            STRUCTURAL_LOG = StructuredLogger(
                self.__submitter.structured_log_file, truncate=True
            ).data
        except NameError:
            context = self._simple_logger_context()

        self._init_logger()
        return context


if __name__ == "__main__":
    # Given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, it is safe to say we don't need
    # any more buffering.
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
    # The LAVA farm reports datetimes in the UTC timezone, so set it locally
    # for the script run.
    # Setting environ here does not affect the system time; the os.environ
    # change only lives for the duration of this script.
    environ["TZ"] = "UTC"
    time.tzset()

    fire.Fire(LAVAJobSubmitter)