mirror of
https://gitlab.freedesktop.org/mesa/mesa.git
synced 2024-11-23 10:14:13 +08:00
99cd56a684
Add an argument to ci_run_n_monitor specifying certain stages to be excluded from consideration, defaulting to the one with post-merge and performance jobs. This allows, e.g., to run all Panfrost pre-merge jobs: ./ci_run_n_monitor.py --target 'panfrost.*' or to run all Freedreno pre-merge jobs: ./ci_run_n_monitor.py --target '.*' --include-stage freedreno Signed-off-by: Daniel Stone <daniels@collabora.com> Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/30784>
563 lines
20 KiB
Python
Executable File
563 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# For the dependencies, see the requirements.txt
|
|
|
|
import logging
|
|
import re
|
|
import traceback
|
|
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
|
|
from collections import OrderedDict
|
|
from copy import deepcopy
|
|
from dataclasses import dataclass, field
|
|
from itertools import accumulate
|
|
from pathlib import Path
|
|
from subprocess import check_output
|
|
from textwrap import dedent
|
|
from typing import Any, Iterable, Optional, Pattern, TypedDict, Union
|
|
|
|
import yaml
|
|
from filecache import DAY, filecache
|
|
from gitlab_common import get_token_from_default_dir
|
|
from gql import Client, gql
|
|
from gql.transport.requests import RequestsHTTPTransport
|
|
from graphql import DocumentNode
|
|
|
|
|
|
class DagNode(TypedDict):
|
|
needs: set[str]
|
|
stage: str
|
|
# `name` is redundant but is here for retro-compatibility
|
|
name: str
|
|
|
|
|
|
# see create_job_needs_dag function for more details
|
|
Dag = dict[str, DagNode]
|
|
|
|
|
|
StageSeq = OrderedDict[str, set[str]]
|
|
|
|
|
|
def get_project_root_dir():
|
|
root_path = Path(__file__).parent.parent.parent.resolve()
|
|
gitlab_file = root_path / ".gitlab-ci.yml"
|
|
assert gitlab_file.exists()
|
|
|
|
return root_path
|
|
|
|
|
|
@dataclass
|
|
class GitlabGQL:
|
|
_transport: Any = field(init=False)
|
|
client: Client = field(init=False)
|
|
url: str = "https://gitlab.freedesktop.org/api/graphql"
|
|
token: Optional[str] = None
|
|
|
|
def __post_init__(self) -> None:
|
|
self._setup_gitlab_gql_client()
|
|
|
|
def _setup_gitlab_gql_client(self) -> None:
|
|
# Select your transport with a defined url endpoint
|
|
headers = {}
|
|
if self.token:
|
|
headers["Authorization"] = f"Bearer {self.token}"
|
|
self._transport = RequestsHTTPTransport(url=self.url, headers=headers)
|
|
|
|
# Create a GraphQL client using the defined transport
|
|
self.client = Client(transport=self._transport, fetch_schema_from_transport=True)
|
|
|
|
def query(
|
|
self,
|
|
gql_file: Union[Path, str],
|
|
params: dict[str, Any] = {},
|
|
operation_name: Optional[str] = None,
|
|
paginated_key_loc: Iterable[str] = [],
|
|
disable_cache: bool = False,
|
|
) -> dict[str, Any]:
|
|
def run_uncached() -> dict[str, Any]:
|
|
if paginated_key_loc:
|
|
return self._sweep_pages(gql_file, params, operation_name, paginated_key_loc)
|
|
return self._query(gql_file, params, operation_name)
|
|
|
|
if disable_cache:
|
|
return run_uncached()
|
|
|
|
try:
|
|
# Create an auxiliary variable to deliver a cached result and enable catching exceptions
|
|
# Decorate the query to be cached
|
|
if paginated_key_loc:
|
|
result = self._sweep_pages_cached(
|
|
gql_file, params, operation_name, paginated_key_loc
|
|
)
|
|
else:
|
|
result = self._query_cached(gql_file, params, operation_name)
|
|
return result # type: ignore
|
|
except Exception as ex:
|
|
logging.error(f"Cached query failed with {ex}")
|
|
# print exception traceback
|
|
traceback_str = "".join(traceback.format_exception(ex))
|
|
logging.error(traceback_str)
|
|
self.invalidate_query_cache()
|
|
logging.error("Cache invalidated, retrying without cache")
|
|
finally:
|
|
return run_uncached()
|
|
|
|
def _query(
|
|
self,
|
|
gql_file: Union[Path, str],
|
|
params: dict[str, Any] = {},
|
|
operation_name: Optional[str] = None,
|
|
) -> dict[str, Any]:
|
|
# Provide a GraphQL query
|
|
source_path: Path = Path(__file__).parent
|
|
pipeline_query_file: Path = source_path / gql_file
|
|
|
|
query: DocumentNode
|
|
with open(pipeline_query_file, "r") as f:
|
|
pipeline_query = f.read()
|
|
query = gql(pipeline_query)
|
|
|
|
# Execute the query on the transport
|
|
return self.client.execute_sync(
|
|
query, variable_values=params, operation_name=operation_name
|
|
)
|
|
|
|
@filecache(DAY)
|
|
def _sweep_pages_cached(self, *args, **kwargs):
|
|
return self._sweep_pages(*args, **kwargs)
|
|
|
|
@filecache(DAY)
|
|
def _query_cached(self, *args, **kwargs):
|
|
return self._query(*args, **kwargs)
|
|
|
|
def _sweep_pages(
|
|
self, query, params, operation_name=None, paginated_key_loc: Iterable[str] = []
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Retrieve paginated data from a GraphQL API and concatenate the results into a single
|
|
response.
|
|
|
|
Args:
|
|
query: represents a filepath with the GraphQL query to be executed.
|
|
params: a dictionary that contains the parameters to be passed to the query. These
|
|
parameters can be used to filter or modify the results of the query.
|
|
operation_name: The `operation_name` parameter is an optional parameter that specifies
|
|
the name of the GraphQL operation to be executed. It is used when making a GraphQL
|
|
query to specify which operation to execute if there are multiple operations defined
|
|
in the GraphQL schema. If not provided, the default operation will be executed.
|
|
paginated_key_loc (Iterable[str]): The `paginated_key_loc` parameter is an iterable of
|
|
strings that represents the location of the paginated field within the response. It
|
|
is used to extract the paginated field from the response and append it to the final
|
|
result. The node has to be a list of objects with a `pageInfo` field that contains
|
|
at least the `hasNextPage` and `endCursor` fields.
|
|
|
|
Returns:
|
|
a dictionary containing the response from the query with the paginated field
|
|
concatenated.
|
|
"""
|
|
|
|
def fetch_page(cursor: str | None = None) -> dict[str, Any]:
|
|
if cursor:
|
|
params["cursor"] = cursor
|
|
logging.info(
|
|
f"Found more than 100 elements, paginating. "
|
|
f"Current cursor at {cursor}"
|
|
)
|
|
|
|
return self._query(query, params, operation_name)
|
|
|
|
# Execute the initial query
|
|
response: dict[str, Any] = fetch_page()
|
|
|
|
# Initialize an empty list to store the final result
|
|
final_partial_field: list[dict[str, Any]] = []
|
|
|
|
# Loop until all pages have been retrieved
|
|
while True:
|
|
# Get the partial field to be appended to the final result
|
|
partial_field = response
|
|
for key in paginated_key_loc:
|
|
partial_field = partial_field[key]
|
|
|
|
# Append the partial field to the final result
|
|
final_partial_field += partial_field["nodes"]
|
|
|
|
# Check if there are more pages to retrieve
|
|
page_info = partial_field["pageInfo"]
|
|
if not page_info["hasNextPage"]:
|
|
break
|
|
|
|
# Execute the query with the updated cursor parameter
|
|
response = fetch_page(page_info["endCursor"])
|
|
|
|
# Replace the "nodes" field in the original response with the final result
|
|
partial_field["nodes"] = final_partial_field
|
|
return response
|
|
|
|
def invalidate_query_cache(self) -> None:
|
|
logging.warning("Invalidating query cache")
|
|
try:
|
|
self._sweep_pages._db.clear()
|
|
self._query._db.clear()
|
|
except AttributeError as ex:
|
|
logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")
|
|
|
|
|
|
def insert_early_stage_jobs(stage_sequence: StageSeq, jobs_metadata: Dag) -> Dag:
|
|
pre_processed_dag: dict[str, set[str]] = {}
|
|
jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
|
|
for job_name, metadata in jobs_metadata.items():
|
|
final_needs: set[str] = deepcopy(metadata["needs"])
|
|
# Pre-process jobs that are not based on needs field
|
|
# e.g. sanity job in mesa MR pipelines
|
|
if not final_needs:
|
|
job_stage: str = jobs_metadata[job_name]["stage"]
|
|
stage_index: int = list(stage_sequence.keys()).index(job_stage)
|
|
if stage_index > 0:
|
|
final_needs |= jobs_from_early_stages[stage_index - 1]
|
|
pre_processed_dag[job_name] = final_needs
|
|
|
|
for job_name, needs in pre_processed_dag.items():
|
|
jobs_metadata[job_name]["needs"] = needs
|
|
|
|
return jobs_metadata
|
|
|
|
|
|
def traverse_dag_needs(jobs_metadata: Dag) -> None:
|
|
created_jobs = set(jobs_metadata.keys())
|
|
for job, metadata in jobs_metadata.items():
|
|
final_needs: set = deepcopy(metadata["needs"]) & created_jobs
|
|
# Post process jobs that are based on needs field
|
|
partial = True
|
|
|
|
while partial:
|
|
next_depth: set[str] = {n for dn in final_needs if dn in jobs_metadata for n in jobs_metadata[dn]["needs"]}
|
|
partial: bool = not final_needs.issuperset(next_depth)
|
|
final_needs = final_needs.union(next_depth)
|
|
|
|
jobs_metadata[job]["needs"] = final_needs
|
|
|
|
|
|
def extract_stages_and_job_needs(
|
|
pipeline_jobs: dict[str, Any], pipeline_stages: dict[str, Any]
|
|
) -> tuple[StageSeq, Dag]:
|
|
jobs_metadata = Dag()
|
|
# Record the stage sequence to post process deps that are not based on needs
|
|
# field, for example: sanity job
|
|
stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
|
|
for stage in pipeline_stages["nodes"]:
|
|
stage_sequence[stage["name"]] = set()
|
|
|
|
for job in pipeline_jobs["nodes"]:
|
|
stage_sequence[job["stage"]["name"]].add(job["name"])
|
|
dag_job: DagNode = {
|
|
"name": job["name"],
|
|
"stage": job["stage"]["name"],
|
|
"needs": set([j["node"]["name"] for j in job["needs"]["edges"]]),
|
|
}
|
|
jobs_metadata[job["name"]] = dag_job
|
|
|
|
return stage_sequence, jobs_metadata
|
|
|
|
|
|
def create_job_needs_dag(gl_gql: GitlabGQL, params, disable_cache: bool = True) -> Dag:
|
|
"""
|
|
This function creates a Directed Acyclic Graph (DAG) to represent a sequence of jobs, where each
|
|
job has a set of jobs that it depends on (its "needs") and belongs to a certain "stage".
|
|
The "name" of the job is used as the key in the dictionary.
|
|
|
|
For example, consider the following DAG:
|
|
|
|
1. build stage: job1 -> job2 -> job3
|
|
2. test stage: job2 -> job4
|
|
|
|
- The job needs for job3 are: job1, job2
|
|
- The job needs for job4 are: job2
|
|
- The job2 needs to wait all jobs from build stage to finish.
|
|
|
|
The resulting DAG would look like this:
|
|
|
|
dag = {
|
|
"job1": {"needs": set(), "stage": "build", "name": "job1"},
|
|
"job2": {"needs": {"job1", "job2", job3"}, "stage": "test", "name": "job2"},
|
|
"job3": {"needs": {"job1", "job2"}, "stage": "build", "name": "job3"},
|
|
"job4": {"needs": {"job2"}, "stage": "test", "name": "job4"},
|
|
}
|
|
|
|
To access the job needs, one can do:
|
|
|
|
dag["job3"]["needs"]
|
|
|
|
This will return the set of jobs that job3 needs: {"job1", "job2"}
|
|
|
|
Args:
|
|
gl_gql (GitlabGQL): The `gl_gql` parameter is an instance of the `GitlabGQL` class, which is
|
|
used to make GraphQL queries to the GitLab API.
|
|
params (dict): The `params` parameter is a dictionary that contains the necessary parameters
|
|
for the GraphQL query. It is used to specify the details of the pipeline for which the
|
|
job needs DAG is being created.
|
|
The specific keys and values in the `params` dictionary will depend on
|
|
the requirements of the GraphQL query being executed
|
|
disable_cache (bool): The `disable_cache` parameter is a boolean that specifies whether the
|
|
|
|
Returns:
|
|
The final DAG (Directed Acyclic Graph) representing the job dependencies sourced from needs
|
|
or stages rule.
|
|
"""
|
|
stages_jobs_gql = gl_gql.query(
|
|
"pipeline_details.gql",
|
|
params=params,
|
|
paginated_key_loc=["project", "pipeline", "jobs"],
|
|
disable_cache=disable_cache,
|
|
)
|
|
pipeline_data = stages_jobs_gql["project"]["pipeline"]
|
|
if not pipeline_data:
|
|
raise RuntimeError(f"Could not find any pipelines for {params}")
|
|
|
|
stage_sequence, jobs_metadata = extract_stages_and_job_needs(
|
|
pipeline_data["jobs"], pipeline_data["stages"]
|
|
)
|
|
# Fill the DAG with the job needs from stages that don't have any needs but still need to wait
|
|
# for previous stages
|
|
final_dag = insert_early_stage_jobs(stage_sequence, jobs_metadata)
|
|
# Now that each job has its direct needs filled correctly, update the "needs" field for each job
|
|
# in the DAG by performing a topological traversal
|
|
traverse_dag_needs(final_dag)
|
|
|
|
return final_dag
|
|
|
|
|
|
def filter_dag(
|
|
dag: Dag, job_name_regex: Pattern, include_stage_regex: Pattern, exclude_stage_regex: Pattern
|
|
) -> Dag:
|
|
filtered_jobs: Dag = Dag({})
|
|
for (job, data) in dag.items():
|
|
if not job_name_regex.fullmatch(job):
|
|
continue
|
|
if not include_stage_regex.fullmatch(data["stage"]):
|
|
continue
|
|
if exclude_stage_regex.fullmatch(data["stage"]):
|
|
continue
|
|
filtered_jobs[job] = data
|
|
return filtered_jobs
|
|
|
|
|
|
def print_dag(dag: Dag) -> None:
|
|
for job, data in sorted(dag.items()):
|
|
print(f"{job}:\n\t{' '.join(data['needs'])}\n")
|
|
|
|
|
|
def fetch_merged_yaml(gl_gql: GitlabGQL, params) -> dict[str, Any]:
|
|
params["content"] = dedent("""\
|
|
include:
|
|
- local: .gitlab-ci.yml
|
|
""")
|
|
raw_response = gl_gql.query("job_details.gql", params)
|
|
ci_config = raw_response["ciConfig"]
|
|
if merged_yaml := ci_config["mergedYaml"]:
|
|
return yaml.safe_load(merged_yaml)
|
|
if "errors" in ci_config:
|
|
for error in ci_config["errors"]:
|
|
print(error)
|
|
|
|
gl_gql.invalidate_query_cache()
|
|
raise ValueError(
|
|
"""
|
|
Could not fetch any content for merged YAML,
|
|
please verify if the git SHA exists in remote.
|
|
Maybe you forgot to `git push`? """
|
|
)
|
|
|
|
|
|
def recursive_fill(job, relationship_field, target_data, acc_data: dict, merged_yaml):
|
|
if relatives := job.get(relationship_field):
|
|
if isinstance(relatives, str):
|
|
relatives = [relatives]
|
|
|
|
for relative in relatives:
|
|
parent_job = merged_yaml[relative]
|
|
acc_data = recursive_fill(parent_job, acc_data, merged_yaml) # type: ignore
|
|
|
|
acc_data |= job.get(target_data, {})
|
|
|
|
return acc_data
|
|
|
|
|
|
def get_variables(job, merged_yaml, project_path, sha) -> dict[str, str]:
|
|
p = get_project_root_dir() / ".gitlab-ci" / "image-tags.yml"
|
|
image_tags = yaml.safe_load(p.read_text())
|
|
|
|
variables = image_tags["variables"]
|
|
variables |= merged_yaml["variables"]
|
|
variables |= job["variables"]
|
|
variables["CI_PROJECT_PATH"] = project_path
|
|
variables["CI_PROJECT_NAME"] = project_path.split("/")[1]
|
|
variables["CI_REGISTRY_IMAGE"] = "registry.freedesktop.org/${CI_PROJECT_PATH}"
|
|
variables["CI_COMMIT_SHA"] = sha
|
|
|
|
while recurse_among_variables_space(variables):
|
|
pass
|
|
|
|
return variables
|
|
|
|
|
|
# Based on: https://stackoverflow.com/a/2158532/1079223
|
|
def flatten(xs):
|
|
for x in xs:
|
|
if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
|
|
yield from flatten(x)
|
|
else:
|
|
yield x
|
|
|
|
|
|
def get_full_script(job) -> list[str]:
|
|
script = []
|
|
for script_part in ("before_script", "script", "after_script"):
|
|
script.append(f"# {script_part}")
|
|
lines = flatten(job.get(script_part, []))
|
|
script.extend(lines)
|
|
script.append("")
|
|
|
|
return script
|
|
|
|
|
|
def recurse_among_variables_space(var_graph) -> bool:
|
|
updated = False
|
|
for var, value in var_graph.items():
|
|
value = str(value)
|
|
dep_vars = []
|
|
if match := re.findall(r"(\$[{]?[\w\d_]*[}]?)", value):
|
|
all_dep_vars = [v.lstrip("${").rstrip("}") for v in match]
|
|
# print(value, match, all_dep_vars)
|
|
dep_vars = [v for v in all_dep_vars if v in var_graph]
|
|
|
|
for dep_var in dep_vars:
|
|
dep_value = str(var_graph[dep_var])
|
|
new_value = var_graph[var]
|
|
new_value = new_value.replace(f"${{{dep_var}}}", dep_value)
|
|
new_value = new_value.replace(f"${dep_var}", dep_value)
|
|
var_graph[var] = new_value
|
|
updated |= dep_value != new_value
|
|
|
|
return updated
|
|
|
|
|
|
def print_job_final_definition(job_name, merged_yaml, project_path, sha):
|
|
job = merged_yaml[job_name]
|
|
variables = get_variables(job, merged_yaml, project_path, sha)
|
|
|
|
print("# --------- variables ---------------")
|
|
for var, value in sorted(variables.items()):
|
|
print(f"export {var}={value!r}")
|
|
|
|
# TODO: Recurse into needs to get full script
|
|
# TODO: maybe create a extra yaml file to avoid too much rework
|
|
script = get_full_script(job)
|
|
print()
|
|
print()
|
|
print("# --------- full script ---------------")
|
|
print("\n".join(script))
|
|
|
|
if image := variables.get("MESA_IMAGE"):
|
|
print()
|
|
print()
|
|
print("# --------- container image ---------------")
|
|
print(image)
|
|
|
|
|
|
def from_sha_to_pipeline_iid(gl_gql: GitlabGQL, params) -> str:
|
|
result = gl_gql.query("pipeline_utils.gql", params)
|
|
|
|
return result["project"]["pipelines"]["nodes"][0]["iid"]
|
|
|
|
|
|
def parse_args() -> Namespace:
|
|
parser = ArgumentParser(
|
|
formatter_class=ArgumentDefaultsHelpFormatter,
|
|
description="CLI and library with utility functions to debug jobs via Gitlab GraphQL",
|
|
epilog=f"""Example:
|
|
{Path(__file__).name} --print-dag""",
|
|
)
|
|
parser.add_argument("-pp", "--project-path", type=str, default="mesa/mesa")
|
|
parser.add_argument("--sha", "--rev", type=str, default='HEAD')
|
|
parser.add_argument(
|
|
"--regex",
|
|
type=str,
|
|
required=False,
|
|
default=".*",
|
|
help="Regex pattern for the job name to be considered",
|
|
)
|
|
parser.add_argument(
|
|
"--include-stage",
|
|
type=str,
|
|
required=False,
|
|
default=".*",
|
|
help="Regex pattern for the stage name to be considered",
|
|
)
|
|
parser.add_argument(
|
|
"--exclude-stage",
|
|
type=str,
|
|
required=False,
|
|
default="^$",
|
|
help="Regex pattern for the stage name to be excluded",
|
|
)
|
|
mutex_group_print = parser.add_mutually_exclusive_group()
|
|
mutex_group_print.add_argument(
|
|
"--print-dag",
|
|
action="store_true",
|
|
help="Print job needs DAG",
|
|
)
|
|
mutex_group_print.add_argument(
|
|
"--print-merged-yaml",
|
|
action="store_true",
|
|
help="Print the resulting YAML for the specific SHA",
|
|
)
|
|
mutex_group_print.add_argument(
|
|
"--print-job-manifest",
|
|
metavar='JOB_NAME',
|
|
type=str,
|
|
help="Print the resulting job data"
|
|
)
|
|
parser.add_argument(
|
|
"--gitlab-token-file",
|
|
type=str,
|
|
default=get_token_from_default_dir(),
|
|
help="force GitLab token, otherwise it's read from $XDG_CONFIG_HOME/gitlab-token",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
args.gitlab_token = Path(args.gitlab_token_file).read_text().strip()
|
|
return args
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
gl_gql = GitlabGQL(token=args.gitlab_token)
|
|
|
|
sha = check_output(['git', 'rev-parse', args.sha]).decode('ascii').strip()
|
|
|
|
if args.print_dag:
|
|
iid = from_sha_to_pipeline_iid(gl_gql, {"projectPath": args.project_path, "sha": sha})
|
|
dag = create_job_needs_dag(
|
|
gl_gql, {"projectPath": args.project_path, "iid": iid}, disable_cache=True
|
|
)
|
|
|
|
dag = filter_dag(dag, re.compile(args.regex), re.compile(args.include_stage), re.compile(args.exclude_stage))
|
|
|
|
print_dag(dag)
|
|
|
|
if args.print_merged_yaml or args.print_job_manifest:
|
|
merged_yaml = fetch_merged_yaml(
|
|
gl_gql, {"projectPath": args.project_path, "sha": sha}
|
|
)
|
|
|
|
if args.print_merged_yaml:
|
|
print(yaml.dump(merged_yaml, indent=2))
|
|
|
|
if args.print_job_manifest:
|
|
print_job_final_definition(
|
|
args.print_job_manifest, merged_yaml, args.project_path, sha
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|