diff --git a/audit.py b/audit.py index 231f2f7..093960d 100644 --- a/audit.py +++ b/audit.py @@ -1,5 +1,5 @@ """Audit GitHub organizations.""" - +import abc import configparser import json from enum import Enum @@ -20,6 +20,13 @@ from yattag import SimpleDoc, Doc, indent +LARGE_NUMBER = 1_000_000_000_000 + + +def reverse_numeric_sort_order(number: int): + return LARGE_NUMBER - number + + CONFIG_FILE = ".audit.cfg" # supported: # [github.com] @@ -47,8 +54,176 @@ def unknown(what: str) -> None: output_option = typer.Option(None, "--output", "-o") -g = Github(config["github.com"]["token"]) -app = typer.Typer() +class ReportBase(abc.ABC): + + def __init__(self, started: datetime, output): + self.started = started + self.output = output + + @abc.abstractmethod + def begin_report(self, title: str): + pass + + @abc.abstractmethod + def end_report(self): + pass + + @abc.abstractmethod + def empty_line(self): + pass + + @abc.abstractmethod + def begin_table(self): + pass + + @abc.abstractmethod + def table_row(self, *args, **kwargs): + pass + + @abc.abstractmethod + def end_table(self): + pass + + +class TextReportBase(ReportBase): + + def __init__(self, started, output): + super().__init__(started, output) + self.console = Console(file=output) + self.table = None + + def begin_report(self, title: str): + self.console.print(title) + + def end_report(self): + self.console.print(format_generated_timestamp(self.started)) + + def empty_line(self): + self.console.print() + + def begin_table(self): + if self.table is not None: + raise RuntimeError("already building a table") + self.table = Table(box=box.SQUARE) + self._table_header() + + @abc.abstractmethod + def _table_header(self, headers): + if self.table is None: + raise RuntimeError("not building a table") + for header, kwargs in headers: + self.table.add_column(header, **kwargs) + + def table_row(self, *args, **kwargs): + self._table_row(*args, **kwargs) + + @abc.abstractmethod + def _table_row(self, *args, **kwargs): + if self.table is None: + raise RuntimeError("not building a table") + self.table.add_row(*args) + + def end_table(self): + if self.table is None: + raise RuntimeError("not building a table") + self.console.print() + self.console.print(self.table) + self.console.print() + self.table = None + + +class HtmlReportBase(ReportBase): + + def __init__(self, started, output): + super().__init__(started, output) + self.doc, self.tag, self.text = Doc().tagtext() + + def begin_report(self, title: str): + self.doc.asis("") + with self.tag("head"): + self.doc.asis('') + with self.tag("title"): + self.text(title) + with self.tag("style"): + self.text("\nbody {font-size: 100%; }") + self.text("\ndiv.footer {font-size: 50%; padding-top: 24px; }") + self.text("\ntable {border-spacing: 0; border-collapse: collapse; }") + self.text("\nth {vertical-align: bottom; }") + self.text("\ntd {vertical-align: top; }") + self.text("\n.centered {text-align: center; }") + self.text("\n.column-group-header {border-bottom: 1px solid black; }") + self.text("\n.first-row {border-top: 1px solid black; }") + self.text("\n.right {text-align: right; }") + self.doc.asis("") + with self.tag("h1"): + self.text(title) + self._prologue() + self._begin_main() + + def end_report(self): + self._end_main() + self._epilogue() + with self.doc.tag("div", klass="footer"): + self.doc.text(format_generated_timestamp(self.started)) + self.doc.asis("") + self.doc.asis("") + print(indent(self.doc.getvalue()), file=self.output) + + def _prologue(self): + pass + + def _epilogue(self): + pass + + def _command_options(self, options): + with self.tag("h2"): + self.text("Options") + with self.tag("table"): + for name, value in options.items(): + with self.tag("tr"): + with self.tag("td"): + self.text(name) + with self.tag("td"): + self.text(value) + + # main part of the report + + def _begin_main(self): + with self.tag("h2"): + self.text("Report") + + def _end_main(self): + pass + + # report content + + def empty_line(self): + self.doc.stag("br") + + # table creation + + def begin_table(self): + self.doc.asis("") + self._table_header() + + @abc.abstractmethod + def _table_header(self, *header_lines): + for headers in header_lines: + with self.tag("tr"): + for header, kwargs in headers: + with self.tag("th", **kwargs): + self.text(header) + + def table_row(self, *args, **kwargs): + with self.tag("tr"): + self._table_row(*args, **kwargs) + + @abc.abstractmethod + def _table_row(self, *args, **kwargs): + pass + + def end_table(self): + self.doc.asis("
") # Github API wrappers @@ -114,31 +289,6 @@ def format_member(member: dict) -> str: # HTML utilities -def create_html_head(doc: SimpleDoc, title: str) -> None: - """ - Standard HTML head segment - - sets text encoding for compatibility with Python - - sets document title - - defines styles for content - :param doc: yattag document - :param title: document title - """ - with doc.tag("head"): - doc.asis('') - with doc.tag("title"): - doc.text(title) - with doc.tag("style"): - doc.text("\nbody {font-size: 100%; }") - doc.text("\ndiv.footer {font-size: 50%; padding-top: 24px; }") - doc.text("\ntable {border-spacing: 0; border-collapse: collapse; }") - doc.text("\nth {vertical-align: bottom; }") - doc.text("\ntd {vertical-align: top; }") - doc.text("\n.centered {text-align: center; }") - doc.text("\n.column-group-header {border-bottom: 1px solid black; }") - doc.text("\n.first-row {border-top: 1px solid black; }") - doc.text("\n.right {text-align: right; }") - - def create_html_email_href(email: str) -> str: """ HTML version of an email address @@ -157,16 +307,6 @@ def create_html_url_href(url: str) -> str: return f'{url}' if url else "" -def create_html_footer(doc: SimpleDoc, started: datetime) -> None: - """ - Standard HTML footer for the body segment - :param doc: yattag document - :param started: when report creation started - """ - with doc.tag("div", klass="footer"): - doc.text(format_generated_timestamp(started)) - - # output creation def output_json(json_data: list, output: Optional[typer.FileTextWrite]) -> None: @@ -178,20 +318,104 @@ def output_json(json_data: list, output: Optional[typer.FileTextWrite]) -> None: typer.echo(json.dumps(json_data, indent=" "), file=output) -def output_text_elements(title: str, table: Table, started: datetime, output: Optional[typer.FileTextWrite]) -> None: - """ - Output the text elements - :param title: title above the table - :param table: data to report - :param started: when creating this report started - :param output: output file to write to (default: stdout) - """ - console = Console(file=output) - console.print(title) - console.print() - console.print(table) - console.print() - console.print(format_generated_timestamp(started)) +### +# APPLICATION COMMANDS + +g = Github(config["github.com"]["token"]) +app = typer.Typer() + + +class RepoTextReport(TextReportBase): + + def __init__(self, started, output, include_archived_repositories, include_forked_repositories): + super().__init__(started, output) + self.include_archived_repositories = include_archived_repositories + self.include_forked_repositories = include_forked_repositories + + def _table_header(self): + headers = [ + ("Name", {}), + ("Archived", dict(justify="center")) if self.include_archived_repositories else None, + ("Fork", dict(justify="center")) if self.include_forked_repositories else None, + ("Pushed at", {}), + ("Pull request title", {}), + ("Created at", {}), + ] + headers = [header for header in headers if header is not None] + super()._table_header(headers) + + def _table_row( + self, + repo_name=None, archived=None, fork=None, pushed_at=None, title=None, created_at=None, + rowspan=0, first=False + ): + row = [] + row.append(repo_name) + if self.include_archived_repositories: + row.append(format_bool(archived) if archived is not None else None) + if self.include_forked_repositories: + row.append(format_bool(fork) if fork is not None else None) + row.append(format_friendly_timestamp(pushed_at, self.started) if pushed_at is not None else None) + if title and created_at: + row.extend([title, format_friendly_timestamp(created_at, self.started)]) + super()._table_row(*row) + + +class RepoHtmlReport(HtmlReportBase): + + def __init__(self, started, output, include_archived_repositories, include_forked_repositories): + super().__init__(started, output) + self.include_archived_repositories = include_archived_repositories + self.include_forked_repositories = include_forked_repositories + + def _epilogue(self): + options = { + "Archived repositories": "included" if self.include_archived_repositories else "not included", + "Forked repositories": "included" if self.include_forked_repositories else "not included", + } + self._command_options(options) + + def _table_header(self): + headers_1 = [ + ("Name", dict(rowspan=2)), + ("Archived", dict(rowspan=2)) if self.include_archived_repositories else None, + ("Fork", dict(rowspan=2)) if self.include_forked_repositories else None, + ("Pushed at", dict(rowspan=2)), + ("Pull request", dict(colspan=2, klass="column-group-header")), + ] + headers_1 = [header for header in headers_1 if header is not None] + headers_2 = [ + ("Title", {}), + ("Created at", {}), + ] + super()._table_header(headers_1, headers_2) + + def _table_row( + self, + repo_name=None, archived=None, fork=None, pushed_at=None, title=None, created_at=None, + rowspan=0, first=False + ): + klass_first_row = {"klass": "first-row"} if first else {} + klass_first_row_centered = {"klass": "first-row centered"} if first else {} + if repo_name is not None: + with self.tag("td", **klass_first_row, rowspan=rowspan): + self.text(repo_name) + if archived is not None and self.include_archived_repositories: + with self.tag("td", **klass_first_row_centered, rowspan=rowspan): + self.doc.asis(format_bool(archived)) + if fork is not None and self.include_forked_repositories: + with self.tag("td", **klass_first_row_centered, rowspan=rowspan): + self.doc.asis(format_bool(fork)) + if pushed_at is not None: + with self.tag("td", **klass_first_row, rowspan=rowspan): + self.text(format_friendly_timestamp(pushed_at, self.started)) + if title and created_at: + with self.tag("td", **klass_first_row): + self.text(title) + with self.tag("td", **klass_first_row): + self.text(format_friendly_timestamp(created_at, self.started)) + else: + self.doc.stag("td", **klass_first_row, colspan=2) @app.command() @@ -226,94 +450,108 @@ def repos( if output_format == OutputFormat.json: output_json(repositories, output) + return - elif output_format == OutputFormat.html: - doc, tag, text = Doc().tagtext() - with tag("html"): - with tag("head"): - create_html_head(doc, title) - with tag("body"): - with tag("h1"): - text(title) - with tag("table"): - with tag("tr"): - with tag("th", rowspan=2): - text("Name") - if include_archived_repositories: - with tag("th", rowspan=2): - text("Archived") - if include_forked_repositories: - with tag("th", rowspan=2): - text("Fork") - with tag("th", rowspan=2): - text("Pushed at") - with tag("th", colspan=2, klass="column-group-header"): - text("Pull request") - with tag("tr"): - with tag("th"): - text("Title") - with tag("th"): - text("Created at") - for repo in sorted(repositories, key=lambda x: x['name'].lower()): - open_pull_requests = repo["open_pull_requests"] - with tag("tr"): - with tag("td", klass="first-row", rowspan=len(open_pull_requests)): - text(repo["name"]) - if include_archived_repositories: - with tag("td", klass="first-row centered", rowspan=len(open_pull_requests)): - doc.asis(format_bool(repo["archived"])) - if include_forked_repositories: - with tag("td", klass="first-row centered", rowspan=len(open_pull_requests)): - doc.asis(format_bool(repo["fork"])) - with tag("td", klass="first-row", rowspan=len(open_pull_requests)): - text(format_friendly_timestamp(repo["pushed_at"], started)) - if open_pull_requests: - first_pr = open_pull_requests[0] - with tag("td", klass="first-row"): - text(first_pr["title"]) - with tag("td", klass="first-row"): - text(format_friendly_timestamp(first_pr["created_at"], started)) - else: - doc.stag("td", klass="first-row", colspan=2) - for pr in open_pull_requests[1:]: - with tag("tr"): - with tag("td"): - text(pr["title"]) - with tag("td"): - text(format_friendly_timestamp(pr["created_at"], started)) - create_html_footer(doc, started) - print(indent(doc.getvalue()), file=output) - - elif output_format == OutputFormat.text: - table = Table("Name", box=box.SQUARE) - empty_columns = [None, None] # Name, Pushed at - if include_archived_repositories: - table.add_column("Archived", justify="center") - empty_columns.append(None) - if include_forked_repositories: - table.add_column("Fork", justify="center") - empty_columns.append(None) - table.add_column("Pushed at") - table.add_column("Pull request title") - table.add_column("Created at") - for repo in sorted(repositories, key=lambda x: x['name'].lower()): - repo_row = [repo["name"]] - if include_archived_repositories: - repo_row.append(format_bool(repo["archived"])) - if include_forked_repositories: - repo_row.append(format_bool(repo["fork"])) - repo_row.append(f'{format_friendly_timestamp(repo["pushed_at"], started)}') - if repo["open_pull_requests"]: - first_pr = repo["open_pull_requests"][0] - repo_row.extend([first_pr["title"], f'{format_friendly_timestamp(first_pr["created_at"], started)}']) - table.add_row(*repo_row) - for pr in repo["open_pull_requests"][1:]: - pr_row = empty_columns + [pr["title"], f'{format_friendly_timestamp(pr["created_at"], started)}'] - table.add_row(*pr_row) - output_text_elements(title, table, started, output) - - else: + report_class = { + OutputFormat.html: RepoHtmlReport, + OutputFormat.text: RepoTextReport, + }.get(output_format) + if report_class is None: OutputFormat.unknown(output_format) + return + + report = report_class(started, output, include_archived_repositories, include_forked_repositories) + report.begin_report(title) + report.begin_table() + for repo in sorted(repositories, key=lambda repo: repo['name'].lower()): + open_pull_requests = repo["open_pull_requests"] + columns = [ + repo["name"], + repo["archived"], + repo["fork"], + repo["pushed_at"], + open_pull_requests[0]["title"] if open_pull_requests else "", + open_pull_requests[0]["created_at"] if open_pull_requests else "", + ] + report.table_row(*columns, rowspan=len(open_pull_requests), first=True) + for pr in open_pull_requests[1:]: + report.table_row(title=pr["title"], created_at=pr["created_at"]) + report.end_table() + report.end_report() + + +class RepoContributionsTextReport(TextReportBase): + + def __init__(self, started, output, include_archived_repositories, include_forked_repositories): + super().__init__(started, output) + self.include_archived_repositories = include_archived_repositories + self.include_forked_repositories = include_forked_repositories + + def _table_header(self): + headers = [ + ("Name", {}), + ("Contributor", {}), + ("Nr. of contributions", dict(justify="right")), + ] + super()._table_header(headers) + + def _table_row(self, repo_name=None, contributor=None, rowspan=0, first=False): + row = [] + row.append(repo_name) + if contributor is not None: + row.append(format_member(contributor)) + row.append(format_int(contributor.get("contributions"))) + else: + row.extend((None, None)) + super()._table_row(*row) + + +class RepoContributionsHtmlReport(HtmlReportBase): + + def __init__(self, started, output, include_archived_repositories, include_forked_repositories): + super().__init__(started, output) + self.include_archived_repositories = include_archived_repositories + self.include_forked_repositories = include_forked_repositories + + def _epilogue(self): + options = { + "Archived repositories": "included" if self.include_archived_repositories else "not included", + "Forked repositories": "included" if self.include_forked_repositories else "not included", + } + self._command_options(options) + + def _table_header(self): + headers_1 = [ + ("Name", dict(rowspan=2)), + ("Contributor", dict(colspan=5, klass="column-group-header")), + ] + headers_2 = [ + ("Name", {}), + ("Login", {}), + ("Email", {}), + ("Profile", {}), + ("#contributions", {}), + ] + super()._table_header(headers_1, headers_2) + + def _table_row(self, repo_name=None, contributor=None, rowspan=0, first=False): + klass_first_row = {"klass": "first-row"} if first else {} + klass_first_row_right = {"klass": "first-row right"} if first else {"klass": "right"} + if repo_name is not None: + with self.tag("td", **klass_first_row, rowspan=rowspan): + self.text(repo_name) + if contributor is None: + contributor = {} + with self.tag("td", **klass_first_row): + self.text(contributor.get("name") or "") + with self.tag("td", **klass_first_row): + self.text(contributor.get("login") or "") + with self.tag("td", **klass_first_row): + self.doc.asis(create_html_email_href(contributor.get("email")) or "") + with self.tag("td", **klass_first_row): + self.doc.asis(create_html_url_href(contributor.get("url")) or "") + with self.tag("td", **klass_first_row_right): + self.text(format_int(contributor.get("contributions"))) @app.command() @@ -340,7 +578,10 @@ def repo_contributions( ) for contributor in sorted( get_contributors(repo), - key=lambda contributor: (1_000_000_000 - contributor.contributions, contributor.login.lower()) + key=lambda contributor: ( + reverse_numeric_sort_order(contributor.contributions), + contributor.login.lower(), + ) ) ] ) @@ -352,78 +593,83 @@ def repo_contributions( if output_format == OutputFormat.json: output_json(repositories, output) + return - elif output_format == OutputFormat.html: - doc, tag, text = Doc().tagtext() - with tag('html'): - create_html_head(doc, title) - with tag("body"): - with tag("h1"): - text(title) - with tag("table"): - with tag("tr"): - with tag("th", rowspan=2): - text("Name") - with tag("th", colspan=5, klass="column-group-header"): - text("Contributor") - with tag("tr"): - with tag("th"): - text("Name") - with tag("th"): - text("Login") - with tag("th"): - text("Email") - with tag("th"): - text("Profile") - with tag("th"): - text("#contributions") - - def contributor_row_cells(contributor, klass): - td_optional_klass_arg = dict(klass=klass) if klass else {} - name = contributor.get("name") or "" - login = contributor.get("login") or "" - email = contributor.get("email") or "" - url = contributor.get("url") or "" - with tag("td", **td_optional_klass_arg): - text(name) - with tag("td", **td_optional_klass_arg): - text(login) - with tag("td", **td_optional_klass_arg): - doc.asis(create_html_email_href(email)) - with tag("td", **td_optional_klass_arg): - doc.asis(create_html_url_href(url)) - with tag("td", klass=f"{klass} right" if klass else "right"): - text(format_int(contributor.get("contributions"))) - - for repo in repositories: - contributors = repo['contributors'] - if len(contributors) == 0: - contributors = [{}] - with tag("tr"): - with tag("td", klass="first-row", rowspan=len(contributors)): - text(repo["name"]) - contributor_row_cells(contributors[0], "first-row") - for contributor in contributors[1:]: - with tag("tr"): - contributor_row_cells(contributor, None) - create_html_footer(doc, started) - print(indent(doc.getvalue()), file=output) - - elif output_format == OutputFormat.text: - table = Table( - "Name", "Contributor", Column("Nr. of contributions", justify="right"), box=box.SQUARE - ) - for repo in sorted(repositories, key=lambda x: x['name'].lower()): - contributors = sorted(repo['contributors'], key=lambda x: (1_000_000_000 - x['contributions'], x['login'].lower())) - first_contributor = contributors[0] if contributors else dict() - first_row = [repo["name"], format_member(first_contributor), format_int(first_contributor.get("contributions"))] - table.add_row(*first_row) - for contributor in contributors[1:]: - table.add_row(None, format_member(contributor), format_int(contributor["contributions"])) - output_text_elements(title, table, started, output) - - else: + report_class = { + OutputFormat.html: RepoContributionsHtmlReport, + OutputFormat.text: RepoContributionsTextReport, + }.get(output_format) + if report_class is None: OutputFormat.unknown(output_format) + return + + report = report_class(started, output, include_archived_repositories, include_forked_repositories) + report.begin_report(title) + report.begin_table() + for repo in sorted(repositories, key=lambda repo: repo['name'].lower()): + contributors = list(sorted( + repo['contributors'], + key=lambda contributor: ( + reverse_numeric_sort_order(contributor['contributions']), + contributor['login'].lower() + ) + )) + if len(contributors) == 0: + contributors = [{}] + report.table_row(repo_name=repo["name"], contributor=contributors[0], first=True, rowspan=len(contributors)) + for contributor in contributors[1:]: + report.table_row(contributor=contributor) + report.end_table() + report.end_report() + + +class MembersTextReport(TextReportBase): + + def _table_header(self): + headers = [ + ("Member", {}), + ("Membership state", {}), + ("Membership role", {}), + ] + super()._table_header(headers) + + def _table_row(self, member): + super()._table_row(format_member(member), member['membership_state'], member['membership_role']) + + +class MembersHtmlReport(HtmlReportBase): + + def _table_header(self): + headers_1 = [ + ("Member", dict(colspan=4, klass="column-group-header")), + ("Membership", dict(colspan=2, klass="column-group-header")), + ] + headers_2 = [ + ("Name", {}), + ("Login", {}), + ("Email", {}), + ("Profile", {}), + ("", {}), + ("state", {}), + ("role", {}), + ] + super()._table_header(headers_1, headers_2) + + def _table_row(self, member): + email = member.get("email") + with self.tag("td"): + self.text(member.get("name") or "") + with self.tag("td"): + self.text(member.get("login") or "") + with self.tag("td"): + self.doc.asis(create_html_email_href(email) if email else "") + with self.tag("td"): + self.doc.asis(create_html_url_href(member.get("url") or "")) + self.doc.stag("td") + with self.tag("td"): + self.text(member['membership_state']) + with self.tag("td"): + self.text(member['membership_role']) @app.command() @@ -450,65 +696,23 @@ def members( if output_format == OutputFormat.json: output_json(member_info, output) + return - elif output_format == OutputFormat.html: - doc, tag, text = Doc().tagtext() - with tag('html'): - create_html_head(doc, title) - with tag("body"): - with tag("h1"): - text(title) - with tag("table"): - with tag("tr"): - with tag("th", colspan=4, klass="column-group-header"): - text("Member") - doc.stag("th") - with tag("th", colspan=2, klass="column-group-header"): - text("Membership") - with tag("tr"): - with tag("th"): - text("Name") - with tag("th"): - text("Login") - with tag("th"): - text("Email") - with tag("th"): - text("Profile") - doc.stag("th") - with tag("th"): - text("state") - with tag("th"): - text("role") - for member in member_info: - name = member.get("name") or "" - login = member.get("login") or "" - email = member.get("email") or "" - url = member.get("url") or "" - with tag("tr"): - with tag("td"): - text(name) - with tag("td"): - text(login) - with tag("td"): - doc.asis(create_html_email_href(email)) - with tag("td"): - doc.asis(create_html_url_href(url)) - doc.stag("td") - with tag("td"): - text(member['membership_state']) - with tag("td"): - text(member['membership_role']) - create_html_footer(doc, started) - print(indent(doc.getvalue()), file=output) - - elif output_format == OutputFormat.text: - table = Table("Member", "Membership state", "Membership role", box=box.SQUARE) - for member in member_info: - table.add_row(format_member(member), member['membership_state'], member['membership_role']) - output_text_elements(title, table, started, output) - - else: + report_class = { + OutputFormat.html: MembersHtmlReport, + OutputFormat.text: MembersTextReport, + }.get(output_format) + if report_class is None: OutputFormat.unknown(output_format) + return + + report = report_class(started, output) + report.begin_report(title) + report.begin_table() + for member in member_info: + report.table_row(member) + report.end_table() + report.end_report() if __name__ == "__main__":