Module ogr.services.github.project
Expand source code
# Copyright Contributors to the Packit project.
# SPDX-License-Identifier: MIT
import datetime
import logging
from typing import ClassVar, Optional, Union
import github
from github import UnknownObjectException
from github.Commit import Commit
from github.CommitComment import CommitComment as GithubCommitComment
from github.GithubException import GithubException
from github.Repository import Repository
from ogr.abstract import (
AccessLevel,
CommitComment,
CommitFlag,
CommitStatus,
GitTag,
Issue,
IssueStatus,
PRStatus,
PullRequest,
Release,
)
from ogr.exceptions import GithubAPIException, OperationNotSupported
from ogr.read_only import GitProjectReadOnly, if_readonly
from ogr.services import github as ogr_github
from ogr.services.base import BaseGitProject
from ogr.services.github.check_run import (
GithubCheckRun,
GithubCheckRunOutput,
GithubCheckRunResult,
GithubCheckRunStatus,
)
from ogr.services.github.flag import GithubCommitFlag
from ogr.services.github.issue import GithubIssue
from ogr.services.github.pull_request import GithubPullRequest
from ogr.services.github.release import GithubRelease
from ogr.utils import filter_paths, indirect
logger = logging.getLogger(__name__)
class GithubProject(BaseGitProject):
service: "ogr_github.GithubService"
# Permission levels that can merge PRs
CAN_MERGE_PERMS: ClassVar[set[str]] = {"admin", "write"}
def __init__(
self,
repo: str,
service: "ogr_github.GithubService",
namespace: str,
github_repo: Repository = None,
read_only: bool = False,
**unprocess_kwargs,
) -> None:
if unprocess_kwargs:
logger.warning(
f"GithubProject will not process these kwargs: {unprocess_kwargs}",
)
super().__init__(repo, service, namespace)
self._github_repo = github_repo
self.read_only = read_only
self._github_instance = None
@property
def github_instance(self):
if not self._github_instance:
self._github_instance = self.service.get_pygithub_instance(
self.namespace,
self.repo,
)
return self._github_instance
@property
def github_repo(self):
if not self._github_repo:
self._github_repo = self.github_instance.get_repo(
full_name_or_id=f"{self.namespace}/{self.repo}",
)
return self._github_repo
def __str__(self) -> str:
return f'GithubProject(namespace="{self.namespace}", repo="{self.repo}")'
def __eq__(self, o: object) -> bool:
if not isinstance(o, GithubProject):
return False
return (
self.repo == o.repo
and self.namespace == o.namespace
and self.service == o.service
and self.read_only == o.read_only
)
@property
def description(self) -> str:
return self.github_repo.description
@description.setter
def description(self, new_description: str) -> None:
self.github_repo.edit(description=new_description)
@property
def has_issues(self) -> bool:
return self.github_repo.has_issues
def _construct_fork_project(self) -> Optional["GithubProject"]:
gh_user = self.github_instance.get_user()
user_login = gh_user.login
try:
project = GithubProject(
self.repo,
self.service,
namespace=user_login,
read_only=self.read_only,
)
if not project.github_repo:
# The github_repo attribute is lazy.
return None
return project
except github.GithubException as ex:
logger.debug(f"Project {user_login}/{self.repo} does not exist: {ex}")
return None
def exists(self) -> bool:
try:
_ = self.github_repo
return True
except UnknownObjectException as ex:
if "Not Found" in str(ex):
return False
raise GithubAPIException from ex
def is_private(self) -> bool:
return self.github_repo.private
def is_forked(self) -> bool:
return bool(self._construct_fork_project())
@property
def is_fork(self) -> bool:
return self.github_repo.fork
@property
def parent(self) -> Optional["GithubProject"]:
return (
self.service.get_project_from_github_repository(self.github_repo.parent)
if self.is_fork
else None
)
@property
def default_branch(self):
return self.github_repo.default_branch
def get_branches(self) -> list[str]:
return [branch.name for branch in self.github_repo.get_branches()]
def get_description(self) -> str:
return self.github_repo.description
def add_user(self, user: str, access_level: AccessLevel) -> None:
access_dict = {
AccessLevel.pull: "Pull",
AccessLevel.triage: "Triage",
AccessLevel.push: "Push",
AccessLevel.admin: "Admin",
AccessLevel.maintain: "Maintain",
}
try:
invitation = self.github_repo.add_to_collaborators(
user,
permission=access_dict[access_level],
)
except Exception as ex:
raise GithubAPIException(f"User {user} not found") from ex
if invitation is None:
raise GithubAPIException("User already added")
def request_access(self):
raise OperationNotSupported("Not possible on GitHub")
def get_fork(self, create: bool = True) -> Optional["GithubProject"]:
username = self.service.user.get_username()
for fork in self.get_forks():
if fork.github_repo.owner.login == username:
return fork
if not self.is_forked():
if create:
return self.fork_create()
logger.info(
f"Fork of {self.github_repo.full_name}"
" does not exist and we were asked not to create it.",
)
return None
return self._construct_fork_project()
def get_owners(self) -> list[str]:
# in case of github, repository has only one owner
return [self.github_repo.owner.login]
def __get_collaborators(self) -> set[str]:
try:
collaborators = self._get_collaborators_with_permission()
except github.GithubException:
logger.debug(
"Current Github token must have push access to view repository permissions.",
)
return set()
usernames = []
for login, permission in collaborators.items():
if permission in self.CAN_MERGE_PERMS:
usernames.append(login)
return set(usernames)
def who_can_close_issue(self) -> set[str]:
return self.__get_collaborators()
def who_can_merge_pr(self) -> set[str]:
return self.__get_collaborators()
def can_merge_pr(self, username) -> bool:
return (
self.github_repo.get_collaborator_permission(username)
in self.CAN_MERGE_PERMS
)
def _get_collaborators_with_permission(self) -> dict:
"""
Get all project collaborators in dictionary with permission association.
Returns:
Dictionary with logins of collaborators and their permission level.
"""
collaborators = {}
users = self.github_repo.get_collaborators()
for user in users:
permission = self.github_repo.get_collaborator_permission(user)
collaborators[user.login] = permission
return collaborators
@indirect(GithubIssue.get_list)
def get_issue_list(
self,
status: IssueStatus = IssueStatus.open,
author: Optional[str] = None,
assignee: Optional[str] = None,
labels: Optional[list[str]] = None,
) -> list[Issue]:
pass
@indirect(GithubIssue.get)
def get_issue(self, issue_id: int) -> Issue:
pass
@indirect(GithubIssue.create)
def create_issue(
self,
title: str,
body: str,
private: Optional[bool] = None,
labels: Optional[list[str]] = None,
assignees: Optional[list[str]] = None,
) -> Issue:
pass
def delete(self) -> None:
self.github_repo.delete()
@indirect(GithubPullRequest.get_list)
def get_pr_list(self, status: PRStatus = PRStatus.open) -> list[PullRequest]:
pass
@indirect(GithubPullRequest.get)
def get_pr(self, pr_id: int) -> PullRequest:
pass
def get_sha_from_tag(self, tag_name: str) -> str:
# TODO: This is ugly. Can we do it better?
all_tags = self.github_repo.get_tags()
for tag in all_tags:
if tag.name == tag_name:
return tag.commit.sha
raise GithubAPIException(f"Tag {tag_name} was not found.")
def get_tag_from_tag_name(self, tag_name: str) -> Optional[GitTag]:
"""
Get a tag based on a tag name.
Args:
tag_name: Name of the tag.
Returns:
GitTag associated with the given tag name or `None`.
"""
all_tags = self.github_repo.get_tags()
for tag in all_tags:
if tag.name == tag_name:
return GitTag(name=tag.name, commit_sha=tag.commit.sha)
return None
@if_readonly(return_function=GitProjectReadOnly.create_pr)
@indirect(GithubPullRequest.create)
def create_pr(
self,
title: str,
body: str,
target_branch: str,
source_branch: str,
fork_username: Optional[str] = None,
) -> PullRequest:
pass
@if_readonly(
return_function=GitProjectReadOnly.commit_comment,
log_message="Create Comment to commit",
)
def commit_comment(
self,
commit: str,
body: str,
filename: Optional[str] = None,
row: Optional[int] = None,
) -> CommitComment:
github_commit: Commit = self.github_repo.get_commit(commit)
if filename and row:
comment = github_commit.create_comment(
body=body,
position=row,
path=filename,
)
else:
comment = github_commit.create_comment(body=body)
return self._commit_comment_from_github_object(comment)
@staticmethod
def _commit_comment_from_github_object(
raw_commit_coment: GithubCommitComment,
) -> CommitComment:
return CommitComment(
body=raw_commit_coment.body,
author=raw_commit_coment.user.login,
sha=raw_commit_coment.commit_id,
)
def get_commit_comments(self, commit: str) -> list[CommitComment]:
github_commit: Commit = self.github_repo.get_commit(commit)
return [
self._commit_comment_from_github_object(comment)
for comment in github_commit.get_comments()
]
@if_readonly(
return_function=GitProjectReadOnly.set_commit_status,
log_message="Create a status on a commit",
)
@indirect(GithubCommitFlag.set)
def set_commit_status(
self,
commit: str,
state: Union[CommitStatus, str],
target_url: str,
description: str,
context: str,
trim: bool = False,
):
pass
@indirect(GithubCommitFlag.get)
def get_commit_statuses(self, commit: str) -> list[CommitFlag]:
pass
@indirect(GithubCheckRun.get)
def get_check_run(
self,
check_run_id: Optional[int] = None,
commit_sha: Optional[str] = None,
) -> Optional["GithubCheckRun"]:
pass
@indirect(GithubCheckRun.create)
def create_check_run(
self,
name: str,
commit_sha: str,
url: Optional[str] = None,
external_id: Optional[str] = None,
status: GithubCheckRunStatus = GithubCheckRunStatus.queued,
started_at: Optional[datetime.datetime] = None,
conclusion: Optional[GithubCheckRunResult] = None,
completed_at: Optional[datetime.datetime] = None,
output: Optional[GithubCheckRunOutput] = None,
actions: Optional[list[dict[str, str]]] = None,
) -> "GithubCheckRun":
pass
@indirect(GithubCheckRun.get_list)
def get_check_runs(
self,
commit_sha: str,
name: Optional[str] = None,
status: Optional[GithubCheckRunStatus] = None,
) -> list["GithubCheckRun"]:
pass
def get_git_urls(self) -> dict[str, str]:
return {"git": self.github_repo.clone_url, "ssh": self.github_repo.ssh_url}
@if_readonly(return_function=GitProjectReadOnly.fork_create)
def fork_create(self, namespace: Optional[str] = None) -> "GithubProject":
fork_repo = (
self.github_repo.create_fork(organization=namespace)
if namespace
else self.github_repo.create_fork()
)
fork = self.service.get_project_from_github_repository(fork_repo)
logger.debug(f"Forked to {fork.namespace}/{fork.repo}")
return fork
def change_token(self, new_token: str):
raise OperationNotSupported
def get_file_content(self, path: str, ref=None) -> str:
ref = ref or self.default_branch
try:
return self.github_repo.get_contents(
path=path,
ref=ref,
).decoded_content.decode()
except (UnknownObjectException, GithubException) as ex:
if ex.status == 404:
raise FileNotFoundError(f"File '{path}' on {ref} not found") from ex
raise GithubAPIException() from ex
def get_files(
self,
ref: Optional[str] = None,
filter_regex: Optional[str] = None,
recursive: bool = False,
) -> list[str]:
ref = ref or self.default_branch
paths = []
contents = self.github_repo.get_contents(path="", ref=ref)
if recursive:
while contents:
file_content = contents.pop(0)
if file_content.type == "dir":
contents.extend(
self.github_repo.get_contents(path=file_content.path, ref=ref),
)
else:
paths.append(file_content.path)
else:
paths = [
file_content.path
for file_content in contents
if file_content.type != "dir"
]
if filter_regex:
paths = filter_paths(paths, filter_regex)
return paths
def get_labels(self):
"""
Get list of labels in the repository.
Returns:
List of labels in the repository.
"""
return list(self.github_repo.get_labels())
def update_labels(self, labels):
"""
Update the labels of the repository. (No deletion, only add not existing ones.)
Args:
labels: List of labels to be added.
Returns:
Number of added labels.
"""
current_label_names = [la.name for la in list(self.github_repo.get_labels())]
changes = 0
for label in labels:
if label.name not in current_label_names:
color = self._normalize_label_color(color=label.color)
self.github_repo.create_label(
name=label.name,
color=color,
description=label.description or "",
)
changes += 1
return changes
@staticmethod
def _normalize_label_color(color):
if color.startswith("#"):
return color[1:]
return color
@indirect(GithubRelease.get)
def get_release(self, identifier=None, name=None, tag_name=None) -> GithubRelease:
pass
@indirect(GithubRelease.get_latest)
def get_latest_release(self) -> Optional[GithubRelease]:
pass
@indirect(GithubRelease.get_list)
def get_releases(self) -> list[Release]:
pass
@indirect(GithubRelease.create)
def create_release(self, tag: str, name: str, message: str) -> GithubRelease:
pass
def get_forks(self) -> list["GithubProject"]:
return [
self.service.get_project_from_github_repository(fork)
for fork in self.github_repo.get_forks()
if fork.owner
]
def get_web_url(self) -> str:
return self.github_repo.html_url
def get_tags(self) -> list["GitTag"]:
return [GitTag(tag.name, tag.commit.sha) for tag in self.github_repo.get_tags()]
def get_sha_from_branch(self, branch: str) -> Optional[str]:
try:
return self.github_repo.get_branch(branch).commit.sha
except GithubException as ex:
if ex.status == 404:
return None
raise GithubAPIException from ex
def get_contributors(self) -> set[str]:
"""
Returns:
Logins of contributors to the project.
"""
return {c.login for c in self.github_repo.get_contributors()}
def users_with_write_access(self) -> set[str]:
return self.__get_collaborators()
Classes
class GithubProject (repo: str, service: ogr_github.GithubService, namespace: str, github_repo: github.Repository.Repository = None, read_only: bool = False, **unprocess_kwargs)
-
Args
repo
- Name of the project.
service
- GitService instance.
namespace
-
Namespace of the project.
- GitHub: username or org name.
- GitLab: username or org name.
- Pagure: namespace (e.g.
"rpms"
).
In case of forks:
"fork/{username}/{namespace}"
.
Expand source code
class GithubProject(BaseGitProject): service: "ogr_github.GithubService" # Permission levels that can merge PRs CAN_MERGE_PERMS: ClassVar[set[str]] = {"admin", "write"} def __init__( self, repo: str, service: "ogr_github.GithubService", namespace: str, github_repo: Repository = None, read_only: bool = False, **unprocess_kwargs, ) -> None: if unprocess_kwargs: logger.warning( f"GithubProject will not process these kwargs: {unprocess_kwargs}", ) super().__init__(repo, service, namespace) self._github_repo = github_repo self.read_only = read_only self._github_instance = None @property def github_instance(self): if not self._github_instance: self._github_instance = self.service.get_pygithub_instance( self.namespace, self.repo, ) return self._github_instance @property def github_repo(self): if not self._github_repo: self._github_repo = self.github_instance.get_repo( full_name_or_id=f"{self.namespace}/{self.repo}", ) return self._github_repo def __str__(self) -> str: return f'GithubProject(namespace="{self.namespace}", repo="{self.repo}")' def __eq__(self, o: object) -> bool: if not isinstance(o, GithubProject): return False return ( self.repo == o.repo and self.namespace == o.namespace and self.service == o.service and self.read_only == o.read_only ) @property def description(self) -> str: return self.github_repo.description @description.setter def description(self, new_description: str) -> None: self.github_repo.edit(description=new_description) @property def has_issues(self) -> bool: return self.github_repo.has_issues def _construct_fork_project(self) -> Optional["GithubProject"]: gh_user = self.github_instance.get_user() user_login = gh_user.login try: project = GithubProject( self.repo, self.service, namespace=user_login, read_only=self.read_only, ) if not project.github_repo: # The github_repo attribute is lazy. return None return project except github.GithubException as ex: logger.debug(f"Project {user_login}/{self.repo} does not exist: {ex}") return None def exists(self) -> bool: try: _ = self.github_repo return True except UnknownObjectException as ex: if "Not Found" in str(ex): return False raise GithubAPIException from ex def is_private(self) -> bool: return self.github_repo.private def is_forked(self) -> bool: return bool(self._construct_fork_project()) @property def is_fork(self) -> bool: return self.github_repo.fork @property def parent(self) -> Optional["GithubProject"]: return ( self.service.get_project_from_github_repository(self.github_repo.parent) if self.is_fork else None ) @property def default_branch(self): return self.github_repo.default_branch def get_branches(self) -> list[str]: return [branch.name for branch in self.github_repo.get_branches()] def get_description(self) -> str: return self.github_repo.description def add_user(self, user: str, access_level: AccessLevel) -> None: access_dict = { AccessLevel.pull: "Pull", AccessLevel.triage: "Triage", AccessLevel.push: "Push", AccessLevel.admin: "Admin", AccessLevel.maintain: "Maintain", } try: invitation = self.github_repo.add_to_collaborators( user, permission=access_dict[access_level], ) except Exception as ex: raise GithubAPIException(f"User {user} not found") from ex if invitation is None: raise GithubAPIException("User already added") def request_access(self): raise OperationNotSupported("Not possible on GitHub") def get_fork(self, create: bool = True) -> Optional["GithubProject"]: username = self.service.user.get_username() for fork in self.get_forks(): if fork.github_repo.owner.login == username: return fork if not self.is_forked(): if create: return self.fork_create() logger.info( f"Fork of {self.github_repo.full_name}" " does not exist and we were asked not to create it.", ) return None return self._construct_fork_project() def get_owners(self) -> list[str]: # in case of github, repository has only one owner return [self.github_repo.owner.login] def __get_collaborators(self) -> set[str]: try: collaborators = self._get_collaborators_with_permission() except github.GithubException: logger.debug( "Current Github token must have push access to view repository permissions.", ) return set() usernames = [] for login, permission in collaborators.items(): if permission in self.CAN_MERGE_PERMS: usernames.append(login) return set(usernames) def who_can_close_issue(self) -> set[str]: return self.__get_collaborators() def who_can_merge_pr(self) -> set[str]: return self.__get_collaborators() def can_merge_pr(self, username) -> bool: return ( self.github_repo.get_collaborator_permission(username) in self.CAN_MERGE_PERMS ) def _get_collaborators_with_permission(self) -> dict: """ Get all project collaborators in dictionary with permission association. Returns: Dictionary with logins of collaborators and their permission level. """ collaborators = {} users = self.github_repo.get_collaborators() for user in users: permission = self.github_repo.get_collaborator_permission(user) collaborators[user.login] = permission return collaborators @indirect(GithubIssue.get_list) def get_issue_list( self, status: IssueStatus = IssueStatus.open, author: Optional[str] = None, assignee: Optional[str] = None, labels: Optional[list[str]] = None, ) -> list[Issue]: pass @indirect(GithubIssue.get) def get_issue(self, issue_id: int) -> Issue: pass @indirect(GithubIssue.create) def create_issue( self, title: str, body: str, private: Optional[bool] = None, labels: Optional[list[str]] = None, assignees: Optional[list[str]] = None, ) -> Issue: pass def delete(self) -> None: self.github_repo.delete() @indirect(GithubPullRequest.get_list) def get_pr_list(self, status: PRStatus = PRStatus.open) -> list[PullRequest]: pass @indirect(GithubPullRequest.get) def get_pr(self, pr_id: int) -> PullRequest: pass def get_sha_from_tag(self, tag_name: str) -> str: # TODO: This is ugly. Can we do it better? all_tags = self.github_repo.get_tags() for tag in all_tags: if tag.name == tag_name: return tag.commit.sha raise GithubAPIException(f"Tag {tag_name} was not found.") def get_tag_from_tag_name(self, tag_name: str) -> Optional[GitTag]: """ Get a tag based on a tag name. Args: tag_name: Name of the tag. Returns: GitTag associated with the given tag name or `None`. """ all_tags = self.github_repo.get_tags() for tag in all_tags: if tag.name == tag_name: return GitTag(name=tag.name, commit_sha=tag.commit.sha) return None @if_readonly(return_function=GitProjectReadOnly.create_pr) @indirect(GithubPullRequest.create) def create_pr( self, title: str, body: str, target_branch: str, source_branch: str, fork_username: Optional[str] = None, ) -> PullRequest: pass @if_readonly( return_function=GitProjectReadOnly.commit_comment, log_message="Create Comment to commit", ) def commit_comment( self, commit: str, body: str, filename: Optional[str] = None, row: Optional[int] = None, ) -> CommitComment: github_commit: Commit = self.github_repo.get_commit(commit) if filename and row: comment = github_commit.create_comment( body=body, position=row, path=filename, ) else: comment = github_commit.create_comment(body=body) return self._commit_comment_from_github_object(comment) @staticmethod def _commit_comment_from_github_object( raw_commit_coment: GithubCommitComment, ) -> CommitComment: return CommitComment( body=raw_commit_coment.body, author=raw_commit_coment.user.login, sha=raw_commit_coment.commit_id, ) def get_commit_comments(self, commit: str) -> list[CommitComment]: github_commit: Commit = self.github_repo.get_commit(commit) return [ self._commit_comment_from_github_object(comment) for comment in github_commit.get_comments() ] @if_readonly( return_function=GitProjectReadOnly.set_commit_status, log_message="Create a status on a commit", ) @indirect(GithubCommitFlag.set) def set_commit_status( self, commit: str, state: Union[CommitStatus, str], target_url: str, description: str, context: str, trim: bool = False, ): pass @indirect(GithubCommitFlag.get) def get_commit_statuses(self, commit: str) -> list[CommitFlag]: pass @indirect(GithubCheckRun.get) def get_check_run( self, check_run_id: Optional[int] = None, commit_sha: Optional[str] = None, ) -> Optional["GithubCheckRun"]: pass @indirect(GithubCheckRun.create) def create_check_run( self, name: str, commit_sha: str, url: Optional[str] = None, external_id: Optional[str] = None, status: GithubCheckRunStatus = GithubCheckRunStatus.queued, started_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, completed_at: Optional[datetime.datetime] = None, output: Optional[GithubCheckRunOutput] = None, actions: Optional[list[dict[str, str]]] = None, ) -> "GithubCheckRun": pass @indirect(GithubCheckRun.get_list) def get_check_runs( self, commit_sha: str, name: Optional[str] = None, status: Optional[GithubCheckRunStatus] = None, ) -> list["GithubCheckRun"]: pass def get_git_urls(self) -> dict[str, str]: return {"git": self.github_repo.clone_url, "ssh": self.github_repo.ssh_url} @if_readonly(return_function=GitProjectReadOnly.fork_create) def fork_create(self, namespace: Optional[str] = None) -> "GithubProject": fork_repo = ( self.github_repo.create_fork(organization=namespace) if namespace else self.github_repo.create_fork() ) fork = self.service.get_project_from_github_repository(fork_repo) logger.debug(f"Forked to {fork.namespace}/{fork.repo}") return fork def change_token(self, new_token: str): raise OperationNotSupported def get_file_content(self, path: str, ref=None) -> str: ref = ref or self.default_branch try: return self.github_repo.get_contents( path=path, ref=ref, ).decoded_content.decode() except (UnknownObjectException, GithubException) as ex: if ex.status == 404: raise FileNotFoundError(f"File '{path}' on {ref} not found") from ex raise GithubAPIException() from ex def get_files( self, ref: Optional[str] = None, filter_regex: Optional[str] = None, recursive: bool = False, ) -> list[str]: ref = ref or self.default_branch paths = [] contents = self.github_repo.get_contents(path="", ref=ref) if recursive: while contents: file_content = contents.pop(0) if file_content.type == "dir": contents.extend( self.github_repo.get_contents(path=file_content.path, ref=ref), ) else: paths.append(file_content.path) else: paths = [ file_content.path for file_content in contents if file_content.type != "dir" ] if filter_regex: paths = filter_paths(paths, filter_regex) return paths def get_labels(self): """ Get list of labels in the repository. Returns: List of labels in the repository. """ return list(self.github_repo.get_labels()) def update_labels(self, labels): """ Update the labels of the repository. (No deletion, only add not existing ones.) Args: labels: List of labels to be added. Returns: Number of added labels. """ current_label_names = [la.name for la in list(self.github_repo.get_labels())] changes = 0 for label in labels: if label.name not in current_label_names: color = self._normalize_label_color(color=label.color) self.github_repo.create_label( name=label.name, color=color, description=label.description or "", ) changes += 1 return changes @staticmethod def _normalize_label_color(color): if color.startswith("#"): return color[1:] return color @indirect(GithubRelease.get) def get_release(self, identifier=None, name=None, tag_name=None) -> GithubRelease: pass @indirect(GithubRelease.get_latest) def get_latest_release(self) -> Optional[GithubRelease]: pass @indirect(GithubRelease.get_list) def get_releases(self) -> list[Release]: pass @indirect(GithubRelease.create) def create_release(self, tag: str, name: str, message: str) -> GithubRelease: pass def get_forks(self) -> list["GithubProject"]: return [ self.service.get_project_from_github_repository(fork) for fork in self.github_repo.get_forks() if fork.owner ] def get_web_url(self) -> str: return self.github_repo.html_url def get_tags(self) -> list["GitTag"]: return [GitTag(tag.name, tag.commit.sha) for tag in self.github_repo.get_tags()] def get_sha_from_branch(self, branch: str) -> Optional[str]: try: return self.github_repo.get_branch(branch).commit.sha except GithubException as ex: if ex.status == 404: return None raise GithubAPIException from ex def get_contributors(self) -> set[str]: """ Returns: Logins of contributors to the project. """ return {c.login for c in self.github_repo.get_contributors()} def users_with_write_access(self) -> set[str]: return self.__get_collaborators()
Ancestors
Class variables
var CAN_MERGE_PERMS : ClassVar[set[str]]
var service : GithubService
Instance variables
var github_instance
-
Expand source code
@property def github_instance(self): if not self._github_instance: self._github_instance = self.service.get_pygithub_instance( self.namespace, self.repo, ) return self._github_instance
var github_repo
-
Expand source code
@property def github_repo(self): if not self._github_repo: self._github_repo = self.github_instance.get_repo( full_name_or_id=f"{self.namespace}/{self.repo}", ) return self._github_repo
Methods
def create_check_run(self, name: str, commit_sha: str, url: Optional[str] = None, external_id: Optional[str] = None, status: GithubCheckRunStatus = GithubCheckRunStatus.queued, started_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, completed_at: Optional[datetime.datetime] = None, output: Optional[dict[str, typing.Union[str, list[dict[str, typing.Union[str, int]]]]]] = None, actions: Optional[list[dict[str, str]]] = None) ‑> GithubCheckRun
-
Expand source code
@indirect(GithubCheckRun.create) def create_check_run( self, name: str, commit_sha: str, url: Optional[str] = None, external_id: Optional[str] = None, status: GithubCheckRunStatus = GithubCheckRunStatus.queued, started_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, completed_at: Optional[datetime.datetime] = None, output: Optional[GithubCheckRunOutput] = None, actions: Optional[list[dict[str, str]]] = None, ) -> "GithubCheckRun": pass
def get_check_run(self, check_run_id: Optional[int] = None, commit_sha: Optional[str] = None) ‑> Optional[GithubCheckRun]
-
Expand source code
@indirect(GithubCheckRun.get) def get_check_run( self, check_run_id: Optional[int] = None, commit_sha: Optional[str] = None, ) -> Optional["GithubCheckRun"]: pass
def get_check_runs(self, commit_sha: str, name: Optional[str] = None, status: Optional[GithubCheckRunStatus] = None) ‑> list['GithubCheckRun']
-
Expand source code
@indirect(GithubCheckRun.get_list) def get_check_runs( self, commit_sha: str, name: Optional[str] = None, status: Optional[GithubCheckRunStatus] = None, ) -> list["GithubCheckRun"]: pass
def get_contributors(self) ‑> set[str]
-
Returns
Logins of contributors to the project.
Expand source code
def get_contributors(self) -> set[str]: """ Returns: Logins of contributors to the project. """ return {c.login for c in self.github_repo.get_contributors()}
def get_labels(self)
-
Get list of labels in the repository.
Returns
List of labels in the repository.
Expand source code
def get_labels(self): """ Get list of labels in the repository. Returns: List of labels in the repository. """ return list(self.github_repo.get_labels())
def get_tag_from_tag_name(self, tag_name: str) ‑> Optional[GitTag]
-
Get a tag based on a tag name.
Args
tag_name
- Name of the tag.
Returns
GitTag associated with the given tag name or
None
.Expand source code
def get_tag_from_tag_name(self, tag_name: str) -> Optional[GitTag]: """ Get a tag based on a tag name. Args: tag_name: Name of the tag. Returns: GitTag associated with the given tag name or `None`. """ all_tags = self.github_repo.get_tags() for tag in all_tags: if tag.name == tag_name: return GitTag(name=tag.name, commit_sha=tag.commit.sha) return None
def update_labels(self, labels)
-
Update the labels of the repository. (No deletion, only add not existing ones.)
Args
labels
- List of labels to be added.
Returns
Number of added labels.
Expand source code
def update_labels(self, labels): """ Update the labels of the repository. (No deletion, only add not existing ones.) Args: labels: List of labels to be added. Returns: Number of added labels. """ current_label_names = [la.name for la in list(self.github_repo.get_labels())] changes = 0 for label in labels: if label.name not in current_label_names: color = self._normalize_label_color(color=label.color) self.github_repo.create_label( name=label.name, color=color, description=label.description or "", ) changes += 1 return changes
Inherited members
BaseGitProject
:add_group
add_user
can_merge_pr
change_token
commit_comment
create_issue
create_pr
create_release
default_branch
delete
description
exists
fork_create
full_repo_name
get_branches
get_commit_comments
get_commit_statuses
get_description
get_file_content
get_files
get_fork
get_forks
get_git_urls
get_issue
get_issue_info
get_issue_list
get_latest_release
get_owners
get_pr
get_pr_files_diff
get_pr_list
get_release
get_releases
get_sha_from_branch
get_sha_from_tag
get_tags
get_users_with_given_access
get_web_url
has_issues
has_write_access
is_fork
is_forked
is_private
parent
remove_group
remove_user
request_access
set_commit_status
users_with_write_access
which_groups_can_merge_pr
who_can_close_issue
who_can_merge_pr