diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 13a00e3..dde32f5 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -56,4 +56,4 @@ jobs: VALIDATE_BASH: false VALIDATE_TERRAFORM_TERRASCAN: false VALIDATE_MARKDOWN: false - FILTER_REGEX_EXCLUDE: monitoring/.* + FILTER_REGEX_EXCLUDE: (monitoring/.*)|(dev/.*) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/dev/.gitignore b/dev/.gitignore new file mode 100644 index 0000000..8baa2d6 --- /dev/null +++ b/dev/.gitignore @@ -0,0 +1 @@ +.httpcache/ diff --git a/dev/README.md b/dev/README.md new file mode 100644 index 0000000..ab545fb --- /dev/null +++ b/dev/README.md @@ -0,0 +1,15 @@ +# developer scripts + +This is a collection of random scripts that are helpful to developing on TVM's CI. As one-off scripts these do not conform to the usual TVM quality standards which is why they are stored out of tree. + +## `determine_shards.py` + +Given a goal runtime for each test shard and a Jenkins job, print out the number of shards that should be used for each step. 
"""
Estimate how many Jenkins test shards each stage needs to hit a goal runtime.

Given a goal runtime (in minutes) and a Jenkins branch, fetch the most recent
completed build and print, per test stage, how many shards would be required,
optionally diffing against the shard counts currently in a Jenkinsfile template.

Usage:
    # print out number of shards per test step
    python determine_shards.py --runtime-goal-m 90 --branch PR-12473

    # see bottleneck steps individually
    python determine_shards.py --runtime-goal-m 90 --branch PR-12473 --list-steps
"""
import argparse
import asyncio
import math
import re
import statistics
from typing import *

# Explicit import: main() uses aiohttp.ClientSession directly. Previously this
# only worked because `from utils.forward import *` leaked the name.
import aiohttp
import rich

from utils import forward
from utils.forward import *
from utils.net import init
from utils.schema import Build, Stage
from utils.utils import init_log


def is_parallelizable(name: str, desc: str) -> bool:
    """Return True if the named Jenkins step is known to be shardable.

    `desc` is accepted for interface compatibility but is not currently used.
    """
    # Hard-coded allow-list of shardable step display names.
    descs = {
        "Run CPU integration tests",
        "Run Hexagon tests",
        "Run Python GPU integration tests",
        "Run Python GPU unit tests",
        "Run Python frontend tests",
        "Run Python unit tests",
        "Run VTA tests in FSIM",
        "Run VTA tests in TSIM",
        "Run i386 integration tests",
        "Run test_arm_compute_lib test",
        "Run TOPI tests",
        "Run microTVM tests",
    }
    return name in descs


def find_existing_shards(stage_name: str, template: str) -> int:
    """Return the num_shards currently configured for `stage_name` in the
    Jenkinsfile template at path `template`; exit with a message if absent."""
    with open(template) as f:
        content = f.read()

    # Raw f-string so \n and \d are regex escapes rather than (deprecated)
    # string escapes; re.escape guards stage names containing regex metachars.
    m = re.search(
        rf'name="{re.escape(stage_name)}"(.*\n)+?.*num_shards=(\d+)',
        content,
        flags=re.MULTILINE,
    )
    if m is None:
        print(f"Could not find {stage_name} in {template}, is that the right file?")
        exit(1)
    return int(m.group(2))


def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float, jenkins_dir: str):
    """Print how many shards `stage_name` needs to finish in `goal_runtime_m` minutes.

    `stages` are the per-shard Stage objects for one logical stage; `jenkins_dir`
    is the Jenkinsfile template consulted for the currently configured count.
    """
    # Bucket each step name across all shards of this stage.
    steps_across_shards: Dict[str, List[Any]] = {}
    for stage in stages:
        for step in stage.steps:
            steps_across_shards.setdefault(step.name, []).append(step)

    # Fixed runtime (setup etc.) is paid once per shard, so count it once via
    # the median across shards; parallelizable runtime is total work that can
    # be spread over shards.
    fixed_runtime_m = 0
    parallelizable_runtime_m = 0
    for name, steps in steps_across_shards.items():
        parallelizable = is_parallelizable(name, "")
        median_runtime_m = (
            statistics.median([step.duration_ms for step in steps]) / 1000.0 / 60.0
        )
        total_runtime_m = sum(step.duration_ms for step in steps) / 1000.0 / 60.0
        if parallelizable:
            parallelizable_runtime_m += total_runtime_m
        else:
            fixed_runtime_m += median_runtime_m

    parallel_part = goal_runtime_m - fixed_runtime_m
    print(stage_name)
    if parallel_part <= 0:
        print(
            f"    fixed runtime is too long ({round(fixed_runtime_m, 2)}), cannot reach goal time"
        )
        return

    num_shards = math.ceil(parallelizable_runtime_m / parallel_part)

    existing_shards = find_existing_shards(stage_name, jenkins_dir)

    print(f"    fixed runtime (m): {round(fixed_runtime_m, 2)}")
    print(f"    parallel runtime (m): {round(parallelizable_runtime_m, 2)}")
    if existing_shards == num_shards:
        print(f"    required shards: {num_shards} (no action required)")
    else:
        print(f"    required shards: change from {existing_shards} to {num_shards} in {jenkins_dir}")


def list_steps(build: Build):
    """Print every test step of `build`, slowest stages last, highlighting
    steps above the stage median (magenta) and above the upper-half median (red)."""

    def total_rt(stage: Stage):
        return sum(step.duration_ms for step in stage.steps)

    build.stages = sorted(build.stages, key=total_rt)
    print("For build at", build.blue_url)
    for stage in build.stages:
        # Skip the umbrella stages; only leaf test stages are interesting.
        if stage.name in {"Build", "Test", "Deploy"}:
            continue
        total = sum(step.duration_ms for step in stage.steps)
        if len(stage.steps) == 0:
            rich.print(f"{stage.name}: skipped")
            continue
        median = statistics.median([step.duration_ms for step in stage.steps])
        above_median = [
            step.duration_ms for step in stage.steps if step.duration_ms > median
        ]
        # Guard: when no step exceeds the median (e.g. a single step, or all
        # equal), statistics.median([]) would raise StatisticsError.
        m75 = statistics.median(above_median) if above_median else median
        rich.print(f"{stage.name}: {round(total / 1000.0 / 60.0)}m")
        for step in stage.steps:
            if step.duration_ms > m75:
                rich.print(
                    f"    [bold red]{step.name}[/bold red]: {round(step.duration_ms / 1000.0 / 60.0, 2)}"
                )
            elif step.duration_ms > median:
                rich.print(
                    f"    [magenta]{step.name}[/magenta]: {round(step.duration_ms / 1000.0 / 60.0, 2)}"
                )
            else:
                rich.print(
                    f"    {step.name}: {round(step.duration_ms / 1000.0 / 60.0, 2)}"
                )


def analyze(build: Build, goal_runtime_m: float, jenkins_template: str):
    """Group the build's test stages (merging 'Foo 1 of N' shards into 'Foo')
    and report the required shard count for each against `jenkins_template`."""
    # Test stages are everything between the "Test" and "Deploy" umbrella stages.
    test_stages: List[Stage] = []
    should_add = False
    for stage in build.stages:
        if stage.name == "Test":
            should_add = True
        elif stage.name == "Deploy":
            should_add = False
        elif should_add:
            test_stages.append(stage)

    # Merge shards of the same logical stage ("Foo 1 of 4" ... "Foo 4 of 4").
    merged_shards: Dict[str, List[Stage]] = {}
    for stage in test_stages:
        m = re.match(r"(.*) \d+ of \d+", stage.name)
        base_name = m.groups()[0] if m else stage.name
        merged_shards.setdefault(base_name, []).append(stage)

    for name, stages in merged_shards.items():
        analyze_stages(name, stages, goal_runtime_m, jenkins_template)


async def main(args):
    """Fetch branch data for the requested job/branch via a shared HTTP session."""
    async with aiohttp.ClientSession() as s:
        forward.SESSION = s
        data = await fetch_branch(job_name=args.job, name=args.branch)
        return data


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Determine number of Jenkins shards to use"
    )
    parser.add_argument("--runtime-goal-m", required=True)
    parser.add_argument("--list-steps", action="store_true")
    parser.add_argument("--job")
    parser.add_argument("--branch", default="main")
    parser.add_argument("--build", default="4082")
    parser.add_argument("--jenkins-template")
    args = parser.parse_args()
    init(dir=".httpcache")
    init_log()

    branch = asyncio.run(main(args))
    build = branch.builds[0]

    if args.list_steps:
        list_steps(build)
    else:
        if args.jenkins_template is None:
            # find_existing_shards() would crash trying to open None;
            # fail up front with a clear message instead.
            parser.error("--jenkins-template is required unless --list-steps is used")
        print(f"To reach goal runtime of {args.runtime_goal_m} for tests:")
        analyze(build, goal_runtime_m=float(args.runtime_goal_m), jenkins_template=args.jenkins_template)
"""Postgres helpers: connection strings, a lazily-created shared engine, upserts."""
import os

from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert


def connection_string(db="tvm"):
    """Build a postgresql:// URL from the db_host/db_user/db_password env vars.

    Pass db=None for a server-level URL with no database selected (needed e.g.
    to run CREATE DATABASE).
    """
    host = os.environ["db_host"]
    password = os.environ["db_password"]
    user = os.environ["db_user"]

    if db is None:
        return f"postgresql://{user}:{password}@{host}"
    else:
        return f"postgresql://{user}:{password}@{host}/{db}"


# Module-level engine singleton, created on first get_engine() call.
engine = None


def get_engine(connection_string: str):
    """Return the shared SQLAlchemy engine, creating it on first use.

    Note: the connection string is only honored on the first call; later calls
    return the cached engine regardless of the argument (see clear_engine()).
    """
    global engine
    if engine is None:
        # Treat ECHO unset / "" / "0" / "false" as off. The previous
        # bool(os.getenv("ECHO", False)) turned echo ON for ECHO=0,
        # because bool("0") is True.
        echo = os.getenv("ECHO", "0").lower() not in ("", "0", "false")
        engine = create_engine(connection_string, echo=echo)

    return engine


def clear_engine():
    """Drop the cached engine so the next get_engine() builds a fresh one."""
    global engine
    engine = None


def upsert(engine, model, insert_dict):
    """
    Insert or update a row via an engine backed by Postgres.

    `model` is a lightweight table object from schema.gen_table(); its `_pks`
    attribute lists the primary-key columns used for conflict detection.
    """
    inserted = insert(model).values(**insert_dict)
    # MySQL equivalent would be:
    # upserted = inserted.on_duplicate_key_update(
    #     **{k: inserted.inserted[k] for k, v in insert_dict.items()}
    # )

    # Postgres version:
    upserted = inserted.on_conflict_do_update(
        index_elements=model._pks,
        set_=insert_dict,
    )
    res = engine.execute(upserted)
    return res.lastrowid
"""
Scrape Jenkins, send build data to Loki and Postgres.

Fetches branch/build/stage/step data from the Jenkins Blue Ocean REST API at
ci.tlcpack.ai and materializes it into the dataclasses below. HTTP responses
are cached on disk via utils.net; callers must set the module-level SESSION
to an aiohttp.ClientSession before calling the fetchers.
"""
import argparse
import asyncio
import dataclasses
import datetime
import json
import logging
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import *

import aiohttp

from . import db, schema
from .net import *

# from sqlalchemy import select
from .utils import *

# Shared aiohttp session; must be assigned by the caller before fetching.
SESSION = None
DEBUG = os.getenv("DEBUG", "0") == "1"
SCHEMA_SCRIPT = Path(__file__).resolve().parent / "schema.py"
# LOKI_HOST = os.environ["loki_host"]


def walk(o, visitor):
    """Apply `visitor` to `o` and, recursively, to every nested dict value
    and list element (visitor is called for side effects; mutation allowed)."""
    visitor(o)
    if isinstance(o, dict):
        for k, v in o.items():
            walk(v, visitor)
    elif isinstance(o, list):
        for v in o:
            walk(v, visitor)


async def blue(job_name: str, url: str, use_cache: bool = True, no_slash: bool = False) -> Any:
    """GET a Blue Ocean REST path under the given job's branches and return
    the parsed JSON, stripped of the noisy `_links`/`_class` metadata keys.

    `url` is the path fragment after .../pipelines/{job_name}/branches/.
    A trailing slash is appended unless `no_slash` is set. Raises RuntimeError
    if the module-level SESSION has not been initialized.
    """
    if DEBUG:
        use_cache = True

    if not no_slash and not url.endswith("/"):
        url = url + "/"

    if SESSION is None:
        raise RuntimeError("SESSION is None")

    r = await aioget(
        f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{url}",
        session=SESSION,
        use_cache=use_cache,
    )
    r = json.loads(r)

    # These just clog up stuff for debugging
    def cleaner(o):
        if isinstance(o, dict):
            if "_links" in o:
                del o["_links"]
            if "_class" in o:
                del o["_class"]

    walk(r, cleaner)
    return r


@dataclasses.dataclass
class Step:
    """A single step within a Jenkins pipeline stage."""

    name: str
    id: int
    result: str
    started_at: datetime.datetime
    state: str
    description: str
    log_url: str
    duration_ms: int
    url: str
    log: str


@dataclasses.dataclass
class Stage:
    """A pipeline stage (node) containing zero or more Steps."""

    name: str
    id: int
    duration_ms: int
    state: str
    result: str
    started_at: datetime.datetime
    # NOTE(review): annotated Optional["Stage"] but fetch_stage() stores the
    # raw `firstParent` value from the API (an id), not a Stage — confirm.
    parent: Optional["Stage"]
    url: str
    steps: List[Step]


@dataclasses.dataclass
class Build:
    """One pipeline run, with timing, test-summary counts, and its stages."""

    causes: List[str]
    id: int
    url: str
    state: str
    result: str
    run_time_ms: int
    queue_time_ms: int
    queued_at: datetime.datetime
    started_at: datetime.datetime
    ended_at: datetime.datetime
    duration_ms: int
    commit: str
    blue_url: str
    failed_tests: int
    fixed_tests: int
    passed_tests: int
    regressed_tests: int
    skipped_tests: int
    total_tests: int
    stages: List[Stage]


@dataclasses.dataclass
class Branch:
    """A Jenkins branch and the Builds fetched for it."""

    name: str
    full_name: str
    url: str
    blue_url: str
    builds: List[Build]


# A branch has a number of builds which have nodes that are made of steps.
# Timestamp format used by the Blue Ocean API (ISO 8601 with ms and offset).
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def parse_date(d: str) -> datetime.datetime:
    """Parse a Blue Ocean timestamp string into an aware datetime."""
    return datetime.datetime.strptime(d, DATE_FORMAT)


async def fetch_stage(
    job_name: str, branch_name: str, build: Build, stage_data: Dict[str, Any]
) -> Stage:
    """Build a Stage from one node's API payload and fetch all of its steps."""
    stage = Stage(
        name=stage_data["displayName"],
        # startTime is null for stages that never ran (e.g. skipped shards).
        started_at=None
        if stage_data["startTime"] is None
        else parse_date(stage_data["startTime"]),
        duration_ms=int(stage_data["durationInMillis"]),
        state=stage_data["state"],
        result=stage_data["result"],
        id=stage_data["id"],
        parent=stage_data["firstParent"],
        url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build.id}/pipeline/{stage_data['id']}",
        steps=[],
    )

    steps_data = await blue(job_name, f"{branch_name}/runs/{build.id}/nodes/{stage.id}/steps")

    for step_data in steps_data:
        stage.steps.append(await fetch_step(job_name, branch_name, build, stage, step_data))

    return stage


async def fetch_step(
    job_name: str, branch_name: str, build: Build, stage: Stage, step_data: Dict[str, Any]
) -> Step:
    """Build a Step from one step's API payload.

    Log content is NOT fetched: the aioget call is commented out and `log`
    is a hard-coded placeholder ("dog"), presumably to avoid the expensive
    per-step download — confirm before relying on Step.log.
    """
    id = step_data["id"]
    log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{branch_name}/runs/{build.id}/nodes/{stage.id}/steps/{id}/log/"
    # log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{branch_name}/runs/{build.id}/steps/{id}/log/"
    # log = await aioget(log_url, session=SESSION)
    log = "dog"
    return Step(
        name=step_data["displayName"],
        id=step_data["id"],
        result=step_data["result"],
        # NOTE(review): unlike fetch_stage, no None-guard on startTime here —
        # a step that never started would raise in parse_date; confirm.
        started_at=parse_date(step_data["startTime"]),
        state=step_data["state"],
        description=step_data["displayDescription"],
        log_url=log_url,
        log=log,
        url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build.id}/pipeline/{stage.id}#step-{step_data['id']}",
        duration_ms=int(step_data["durationInMillis"]),
    )


async def fetch_build(job_name: str, branch_name: str, build_data: Dict[str, Any]) -> Build:
    """Build a Build from one run's API payload, including its test summary
    and all stages (and, transitively, all steps)."""
    queued_at = parse_date(build_data["enQueueTime"])
    started_at = parse_date(build_data["startTime"])
    ended_at = parse_date(build_data["endTime"])

    # Derived timings: time spent waiting in the queue vs. actually running.
    queue_time_ms = int((started_at - queued_at).total_seconds() * 1000)
    run_time_ms = int((ended_at - started_at).total_seconds() * 1000)
    causes = build_data["causes"]
    if causes is None:
        causes = []

    test_summary = await blue(job_name, f"{branch_name}/runs/{build_data['id']}/blueTestSummary")

    build = Build(
        causes=[c["shortDescription"] for c in causes],
        id=build_data["id"],
        url=f"https://ci.tlcpack.ai/job/{job_name}/job/{branch_name}/{build_data['id']}/",
        blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build_data['id']}/pipeline",
        state=build_data["state"],
        result=build_data["result"],
        queued_at=queued_at,
        started_at=started_at,
        ended_at=ended_at,
        run_time_ms=run_time_ms,
        queue_time_ms=queue_time_ms,
        duration_ms=int(build_data["durationInMillis"]),
        commit=build_data["commitId"],
        stages=[],
        failed_tests=test_summary["failed"],
        fixed_tests=test_summary["fixed"],
        passed_tests=test_summary["passed"],
        regressed_tests=test_summary["regressions"],
        skipped_tests=test_summary["skipped"],
        total_tests=test_summary["total"],
    )

    nodes_data = await blue(job_name, f"{branch_name}/runs/{build.id}/nodes")
    for stage_data in nodes_data:
        build.stages.append(await fetch_stage(job_name, branch_name, build, stage_data))

    return build


async def fetch_branch(job_name: str, name: str):
    """Fetch a Branch and its most recent FINISHED build.

    Note the `break` below: although all runs are listed (newest first), only
    the single most recent completed build is fetched into branch.builds.
    """
    logging.info(f"Fetching branch {name}")
    branch_data = await blue(job_name, f"{name}", use_cache=False)
    branch = Branch(
        name=name,
        full_name=branch_data["fullName"],
        url=f"https://ci.tlcpack.ai/job/{job_name}/job/{name}/",
        blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/activity?branch={name}",
        builds=[],
    )

    # Jenkins only fetches the last 100 by default
    builds = await blue(job_name, f"{name}/runs", use_cache=False)
    logging.info(f"Found {len(builds)} builds for branch {name}")
    builds = list(reversed(sorted(builds, key=lambda b: int(b["id"]))))
    for build_data in builds:
        if build_data["state"] != "FINISHED":
            # Only look at completed builds
            continue

        branch.builds.append(await fetch_build(job_name, name, build_data))
        break

    return branch
{url} [cache disabled]") + result = await session.get(url) + text = await result.text() + with open(cached, "w") as f: + f.write(text) + return text + + +async def aiogetc(url, session, use_cache=True): + if DEBUG: + use_cache = True + cache_name = url.replace("/", "-") + cached = CACHE_DIR / cache_name + if cached.exists() and use_cache: + logging.info(f"GET {url} [cached]") + with open(cached, "r") as f: + return f.read(), 200 + else: + if use_cache: + logging.info(f"GET {url} [cache miss]") + else: + logging.info(f"GET {url} [cache disabled]") + result = await session.get(url) + text = await result.text() + with open(cached, "w") as f: + f.write(text) + return text, result.status_code diff --git a/dev/utils/schema.py b/dev/utils/schema.py new file mode 100644 index 0000000..21ae5fc --- /dev/null +++ b/dev/utils/schema.py @@ -0,0 +1,158 @@ +from typing import Any, Dict + +import sqlalchemy +from sqlalchemy import ( + JSON, + Boolean, + Column, + DateTime, + Integer, + String, + Text, + column, + table, +) +from sqlalchemy.orm import declarative_base +from sqlalchemy.sql.sqltypes import Float + +from . 
"""SQLAlchemy schema for the scraped Jenkins data (branch/build/stage/step/testcase).

Tables are synthesized by gen_table(), which returns both a lightweight
table object (for inserts/upserts) and a declarative ORM class.
"""
from typing import Any, Dict

import sqlalchemy
from sqlalchemy import (
    JSON,
    Boolean,
    Column,
    DateTime,
    Integer,
    String,
    Text,
    column,
    table,
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql.sqltypes import Float

from . import db

Base = declarative_base()


def gen_table(name: str, columns: Dict[str, Any], base: Any) -> Any:
    """Create a table named `name` with the given Column mapping.

    Returns (model, the_class): `the_class` is a dynamically created
    declarative ORM class; `model` is a lightweight sqlalchemy.table()
    mirror of its columns, with a `_pks` attribute listing the primary-key
    column names (consumed by db.upsert for conflict resolution).
    """
    the_class = type(
        name,
        (base,),
        {
            "__tablename__": name,
            "__table_args__": {"extend_existing": True},
            **columns,
        },
    )
    name = the_class.__tablename__
    model_data = [name]
    # Mirror every public attribute of the ORM class as a column() of the
    # lightweight table object.
    for k in [k for k in the_class.__dict__.keys() if not k.startswith("_")]:
        model_data.append(column(k))
    model = table(*model_data)
    model._pks = [k for k, v in columns.items() if v.primary_key]
    return model, the_class


# NOTE: these ORM class names intentionally shadow the dataclasses of the
# same names in forward.py; keep the imports distinct at call sites.
branch, Branch = gen_table(
    "branch",
    {
        "name": Column(String(300), primary_key=True),
        "full_name": Column(String(300)),
        "url": Column(String(300)),
        "blue_url": Column(String(300)),
    },
    Base,
)

build, Build = gen_table(
    "build",
    {
        "causes": Column(Text),
        "id": Column(Integer, primary_key=True),
        "url": Column(String(300)),
        "blue_url": Column(String(300)),
        "state": Column(String(300)),
        "result": Column(String(300)),
        "queued_at": Column(DateTime),
        "started_at": Column(DateTime),
        "ended_at": Column(DateTime),
        "duration_ms": Column(Integer),
        "run_time_ms": Column(Integer),
        "queue_time_ms": Column(Integer),
        "failed_tests": Column(Integer),
        "fixed_tests": Column(Integer),
        "passed_tests": Column(Integer),
        "regressed_tests": Column(Integer),
        "skipped_tests": Column(Integer),
        "total_tests": Column(Integer),
        "commit": Column(String(300)),
        "branch_name": Column(String(300), primary_key=True),
    },
    Base,
)

stage, Stage = gen_table(
    "stage",
    {
        "name": Column(String(300)),
        "id": Column(Integer, primary_key=True),
        "duration_ms": Column(Integer),
        "state": Column(String(300)),
        "result": Column(String(300)),
        "started_at": Column(DateTime),
        "parent": Column(Integer),
        "url": Column(String(300)),
        "branch_name": Column(String(300), primary_key=True),
        "build_id": Column(Integer, primary_key=True),
    },
    Base,
)


step, Step = gen_table(
    "step",
    {
        "name": Column(Text),
        "id": Column(Integer, primary_key=True),
        "result": Column(String(300)),
        "started_at": Column(DateTime),
        "state": Column(String(300)),
        "description": Column(Text),
        "log_url": Column(String(300)),
        "duration_ms": Column(Integer),
        "url": Column(String(300)),
        "branch_name": Column(String(300), primary_key=True),
        "build_id": Column(Integer, primary_key=True),
        "stage_id": Column(Integer, primary_key=True),
    },
    Base,
)


testcase, TestCase = gen_table(
    "testcase",
    {
        "build_id": Column(Integer, primary_key=True),
        "branch_name": Column(String(300), primary_key=True),
        "blue_url": Column(String(300)),
        "status": Column(String(300)),
        "state": Column(String(300)),
        "duration_ms": Column(Integer),
        "stage": Column(String(500)),
        "node_id": Column(String(500), primary_key=True),
        "name": Column(String(500)),
        "parameterless_name": Column(String(500)),
        "file_name": Column(String(500)),
    },
    Base,
)


def create(db_name):
    """Create database `db_name` (ignoring 'already exists') and all tables.

    Connects at server level first because CREATE DATABASE cannot run inside
    a transaction — isolation level 0 is psycopg2 autocommit.
    NOTE(review): db_name is interpolated into raw SQL; only call with
    trusted, internal names.
    """
    connection = db.connection_string(db=None)
    print(connection)
    raw = db.get_engine(connection)
    from sqlalchemy.orm import sessionmaker

    session = sessionmaker(bind=raw)()
    session.connection().connection.set_isolation_level(0)
    try:
        session.execute(f"CREATE DATABASE {db_name}")
    except sqlalchemy.exc.ProgrammingError as e:
        if "already exists" not in str(e):
            raise e
    session.connection().connection.set_isolation_level(1)
    # Drop the server-level engine so the next get_engine() targets db_name.
    db.clear_engine()

    Base.metadata.create_all(db.get_engine(db.connection_string(db_name)))
    print("Done")


if __name__ == "__main__":
    create("tvm")
"""Misc dev-script helpers: relative-path logging, JSON printing, async throttling."""
import asyncio
import json
import logging
import sys
from pathlib import Path

# Directory containing this file; used to shorten paths in log output.
# NOTE(review): despite the name, this is the dev/utils directory, not the
# repository root — confirm before using it for anything but log formatting.
REPO_ROOT = Path(__file__).resolve().parent


class RelativePathFilter(logging.Filter):
    """Logging filter that attaches a `relativepath` attribute to each record,
    holding the record's source path relative to REPO_ROOT."""

    def filter(self, record):
        path = Path(record.pathname).resolve()
        try:
            record.relativepath = str(path.relative_to(REPO_ROOT))
        except ValueError:
            # Records can originate outside REPO_ROOT (stdlib, third-party
            # libraries); relative_to() raises there, which previously broke
            # logging entirely. Fall back to the full path.
            record.relativepath = record.pathname
        return True


def jprint(o):
    """Pretty-print `o` as indented JSON, stringifying non-serializable values."""
    print(json.dumps(o, indent=2, default=str))


def sprint(*args):
    """print(), but to stderr."""
    print(*args, file=sys.stderr)


async def gather_with_concurrency(n, *tasks):
    """asyncio.gather(*tasks), but with at most `n` tasks running concurrently."""
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))


def init_log():
    """Configure root logging with repo-relative source paths and eager flushing."""
    logging.basicConfig(
        format="[%(relativepath)s:%(lineno)d %(levelname)-1s] %(message)s",
        level=logging.WARN,
    )

    # Flush on every log call (logging and then calling subprocess.run can make
    # the output look confusing)
    logging.root.handlers[0].addFilter(RelativePathFilter())
    logging.root.handlers[0].flush = sys.stderr.flush