Module ogr.services.github
Sub-modules
ogr.services.github.auth_providers
ogr.services.github.check_run
ogr.services.github.comments
ogr.services.github.flag
ogr.services.github.issue
ogr.services.github.label
ogr.services.github.project
ogr.services.github.pull_request
ogr.services.github.release
ogr.services.github.service
ogr.services.github.user
Classes
class GithubCheckRun (project: ogr_github.GithubProject, raw_check_run: github.CheckRun.CheckRun)
-
Expand source code
class GithubCheckRun(OgrAbstractClass): def __init__( self, project: "ogr_github.GithubProject", raw_check_run: CheckRun, ) -> None: self.raw_check_run = raw_check_run self.project = project def __str__(self) -> str: return ( f"GithubCheckRun(project={self.project}, name='{self.name}', " f"commit_sha='{self.commit_sha}', " f"url='{self.url}', " f"external_id='{self.external_id}', " f"status={self.status.name}, " f"started_at={self.started_at}, " f"conclusion={self.conclusion}, " f"completed_at={self.completed_at}, " f"output={self.output}, " f"app={self.app})" ) @property def name(self) -> str: """Name of the check run.""" return self.raw_check_run.name @name.setter def name(self, name: str) -> None: self.raw_check_run.edit(name=name) @property def commit_sha(self) -> str: """Commit SHA that check run is related to.""" return self.raw_check_run.head_sha @property def url(self) -> Optional[str]: """URL with additional details.""" return self.raw_check_run.details_url @url.setter def url(self, url: str) -> None: self.raw_check_run.edit(details_url=url) @property def external_id(self) -> Optional[str]: """External ID that can be used internally by the integrator.""" return self.raw_check_run.external_id @external_id.setter def external_id(self, external_id: str) -> None: self.raw_check_run.edit(external_id=external_id) @property def status(self) -> GithubCheckRunStatus: """Current status of the check run.""" return GithubCheckRunStatus(self.raw_check_run.status) @property def started_at(self) -> Optional[datetime.datetime]: """Timestamp of start of the check run.""" return self.raw_check_run.started_at @started_at.setter def started_at(self, started_at: datetime.datetime) -> None: self.raw_check_run.edit(started_at=started_at) @property def conclusion(self) -> Optional[GithubCheckRunResult]: """Conclusion/result of the check run.""" return ( GithubCheckRunResult(self.raw_check_run.conclusion) if self.raw_check_run.conclusion else None ) @property def completed_at(self) -> Optional[datetime.datetime]: """Timestamp of completion of the check run.""" return self.raw_check_run.completed_at @property def output(self) -> CheckRunOutput: """Output of the check run.""" return self.raw_check_run.output @output.setter def output(self, output: GithubCheckRunOutput) -> None: self.raw_check_run.edit(output=output) @property def app(self) -> GithubApp: """Github App of the check run.""" return self.raw_check_run.app def change_status( self, status: Optional[GithubCheckRunStatus] = None, completed_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, ) -> None: """ Changes the status of the check run and checks the validity of new state. Args: status: Status of the check run to be set. If set to completed, you must provide conclusion. Defaults to `None`. completed_at: Timestamp of completion of the check run. If set, you must provide conclusion. Defaults to `None`. conclusion: Conclusion/result of the check run. If only conclusion is set, status is automatically set to completed. Defaults to `None`. Raises: OperationNotSupported, if given completed or timestamp of completed without conclusion. """ if not (status or completed_at or conclusion): return if ( status == GithubCheckRunStatus.completed or completed_at ) and conclusion is None: raise OperationNotSupported( "When provided completed status or completed at," " you need to provide conclusion.", ) self.raw_check_run.edit( status=value_or_NotSet(status.name if status else None), conclusion=value_or_NotSet(conclusion.name if conclusion else None), completed_at=value_or_NotSet(completed_at), ) @staticmethod def get_list( project: "ogr_github.GithubProject", commit_sha: str, name: Optional[str] = None, status: Optional[GithubCheckRunStatus] = None, ) -> list["GithubCheckRun"]: """ Returns list of GitHub check runs. Args: project: Project from which the check runs are retrieved. commit_sha: Commit to which are the check runs related to. name: Name of the check run for filtering. Defaults to `None`, no filtering. status: Status of the check runs to be returned. Defaults to `None`, no filtering. Returns: List of the check runs. """ check_runs = project.github_repo.get_commit(commit_sha).get_check_runs( check_name=value_or_NotSet(name), status=value_or_NotSet(status.name if status else None), ) return [GithubCheckRun(project, run) for run in check_runs] @staticmethod def get( project: "ogr_github.GithubProject", check_run_id: Optional[int] = None, commit_sha: Optional[str] = None, ) -> Optional["GithubCheckRun"]: """ Retrieves GitHub check run as ogr object. Args: project: Project from which the check run is retrieved. check_run_id: Check run ID. Defaults to `None`, i.e. is not used for query. commit_sha: Commit SHA from which the check run is to be retrieved. If set, returns latest check run for the commit. Defaults to `None`, i.e. is not used for query. Returns: GithubCheckRun object or `None` if no check run is found. Raises: OperationNotSupported, in case there is no parameter for query set or both are set. """ if check_run_id is not None and commit_sha: raise OperationNotSupported( "Cannot retrieve check run by both ID and commit hash", ) if not (check_run_id is not None or commit_sha): raise OperationNotSupported("Cannot retrieve check run by no criteria") if check_run_id is not None: return GithubCheckRun( project, project.github_repo.get_check_run(check_run_id), ) check_runs = project.github_repo.get_commit(commit_sha).get_check_runs() if check_runs.totalCount == 0: return None return GithubCheckRun(project, check_runs[0]) @staticmethod def create( project: "ogr_github.GithubProject", name: str, commit_sha: str, url: Optional[str] = None, external_id: Optional[str] = None, status: GithubCheckRunStatus = GithubCheckRunStatus.queued, started_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, completed_at: Optional[datetime.datetime] = None, output: Optional[GithubCheckRunOutput] = None, actions: Optional[list[dict[str, str]]] = None, ) -> "GithubCheckRun": """ Creates new check run. Args: project: Project where the check run is to be created. name: Name of the check run. commit_sha: Hash of the commit that check run is related to. url: URL with details of the run. Defaults to `None`. external_id: External ID that can be used internally by integrator. Defaults to `None`. status: Status of the check run. Defaults to queued. started_at: Timestamp of starting the check run. Defaults to `None`. conclusion: Conclusion of the check run. Should be set with status completed. Defaults to `None`. completed_at: Timestamp of completion of the check run. If set, you must provide conclusion. Defaults to `None`. output: Output of the check run. actions: List of possible follow-up actions for the check run. Returns: Created check run object. Raises: OperationNotSupported, if given completed status or completion timestamp and no conclusion. """ if ( completed_at or status == GithubCheckRunStatus.completed ) and conclusion is None: raise OperationNotSupported( "When provided completed_at or completed status, " "you need to provide conclusion.", ) created_check_run = project.github_repo.create_check_run( name=name, head_sha=commit_sha, details_url=value_or_NotSet(url), external_id=value_or_NotSet(external_id), status=status.name, started_at=value_or_NotSet(started_at), conclusion=value_or_NotSet(conclusion.name if conclusion else None), completed_at=value_or_NotSet(completed_at), output=value_or_NotSet(output), actions=value_or_NotSet(actions), ) return GithubCheckRun(project, created_check_run)
Ancestors
Static methods
def create(project: ogr_github.GithubProject,
name: str,
commit_sha: str,
url: str | None = None,
external_id: str | None = None,
status: GithubCheckRunStatus = GithubCheckRunStatus.queued,
started_at: datetime.datetime | None = None,
conclusion: GithubCheckRunResult | None = None,
completed_at: datetime.datetime | None = None,
output: dict[str, str | list[dict[str, str | int]]] | None = None,
actions: list[dict[str, str]] | None = None) ‑> GithubCheckRun-
Expand source code
@staticmethod def create( project: "ogr_github.GithubProject", name: str, commit_sha: str, url: Optional[str] = None, external_id: Optional[str] = None, status: GithubCheckRunStatus = GithubCheckRunStatus.queued, started_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, completed_at: Optional[datetime.datetime] = None, output: Optional[GithubCheckRunOutput] = None, actions: Optional[list[dict[str, str]]] = None, ) -> "GithubCheckRun": """ Creates new check run. Args: project: Project where the check run is to be created. name: Name of the check run. commit_sha: Hash of the commit that check run is related to. url: URL with details of the run. Defaults to `None`. external_id: External ID that can be used internally by integrator. Defaults to `None`. status: Status of the check run. Defaults to queued. started_at: Timestamp of starting the check run. Defaults to `None`. conclusion: Conclusion of the check run. Should be set with status completed. Defaults to `None`. completed_at: Timestamp of completion of the check run. If set, you must provide conclusion. Defaults to `None`. output: Output of the check run. actions: List of possible follow-up actions for the check run. Returns: Created check run object. Raises: OperationNotSupported, if given completed status or completion timestamp and no conclusion. """ if ( completed_at or status == GithubCheckRunStatus.completed ) and conclusion is None: raise OperationNotSupported( "When provided completed_at or completed status, " "you need to provide conclusion.", ) created_check_run = project.github_repo.create_check_run( name=name, head_sha=commit_sha, details_url=value_or_NotSet(url), external_id=value_or_NotSet(external_id), status=status.name, started_at=value_or_NotSet(started_at), conclusion=value_or_NotSet(conclusion.name if conclusion else None), completed_at=value_or_NotSet(completed_at), output=value_or_NotSet(output), actions=value_or_NotSet(actions), ) return GithubCheckRun(project, created_check_run)
Creates new check run.
Args
project
- Project where the check run is to be created.
name
- Name of the check run.
commit_sha
- Hash of the commit that check run is related to.
url
-
URL with details of the run.
Defaults to
None
. external_id
-
External ID that can be used internally by integrator.
Defaults to
None
. status
-
Status of the check run.
Defaults to queued.
started_at
-
Timestamp of starting the check run.
Defaults to
None
. conclusion
-
Conclusion of the check run. Should be set with status completed.
Defaults to
None
. completed_at
-
Timestamp of completion of the check run. If set, you must provide conclusion.
Defaults to
None
. output
- Output of the check run.
actions
- List of possible follow-up actions for the check run.
Returns
Created check run object.
Raises
OperationNotSupported, if given completed status or completion timestamp and no conclusion.
def get(project: ogr_github.GithubProject,
check_run_id: int | None = None,
commit_sha: str | None = None) ‑> GithubCheckRun | None-
Expand source code
@staticmethod def get( project: "ogr_github.GithubProject", check_run_id: Optional[int] = None, commit_sha: Optional[str] = None, ) -> Optional["GithubCheckRun"]: """ Retrieves GitHub check run as ogr object. Args: project: Project from which the check run is retrieved. check_run_id: Check run ID. Defaults to `None`, i.e. is not used for query. commit_sha: Commit SHA from which the check run is to be retrieved. If set, returns latest check run for the commit. Defaults to `None`, i.e. is not used for query. Returns: GithubCheckRun object or `None` if no check run is found. Raises: OperationNotSupported, in case there is no parameter for query set or both are set. """ if check_run_id is not None and commit_sha: raise OperationNotSupported( "Cannot retrieve check run by both ID and commit hash", ) if not (check_run_id is not None or commit_sha): raise OperationNotSupported("Cannot retrieve check run by no criteria") if check_run_id is not None: return GithubCheckRun( project, project.github_repo.get_check_run(check_run_id), ) check_runs = project.github_repo.get_commit(commit_sha).get_check_runs() if check_runs.totalCount == 0: return None return GithubCheckRun(project, check_runs[0])
Retrieves GitHub check run as ogr object.
Args
project
- Project from which the check run is retrieved.
check_run_id
-
Check run ID.
Defaults to
None
, i.e. is not used for query. commit_sha
-
Commit SHA from which the check run is to be retrieved. If set, returns latest check run for the commit.
Defaults to
None
, i.e. is not used for query.
Returns
GithubCheckRun object or
None
if no check run is found.Raises
OperationNotSupported, in case there is no parameter for query set or both are set.
def get_list(project: ogr_github.GithubProject,
commit_sha: str,
name: str | None = None,
status: GithubCheckRunStatus | None = None) ‑> list['GithubCheckRun']-
Expand source code
@staticmethod def get_list( project: "ogr_github.GithubProject", commit_sha: str, name: Optional[str] = None, status: Optional[GithubCheckRunStatus] = None, ) -> list["GithubCheckRun"]: """ Returns list of GitHub check runs. Args: project: Project from which the check runs are retrieved. commit_sha: Commit to which are the check runs related to. name: Name of the check run for filtering. Defaults to `None`, no filtering. status: Status of the check runs to be returned. Defaults to `None`, no filtering. Returns: List of the check runs. """ check_runs = project.github_repo.get_commit(commit_sha).get_check_runs( check_name=value_or_NotSet(name), status=value_or_NotSet(status.name if status else None), ) return [GithubCheckRun(project, run) for run in check_runs]
Returns list of GitHub check runs.
Args
project
- Project from which the check runs are retrieved.
commit_sha
- Commit to which are the check runs related to.
name
-
Name of the check run for filtering.
Defaults to
None
, no filtering. status
-
Status of the check runs to be returned.
Defaults to
None
, no filtering.
Returns
List of the check runs.
Instance variables
prop app : github.GithubApp.GithubApp
-
Expand source code
@property def app(self) -> GithubApp: """Github App of the check run.""" return self.raw_check_run.app
Github App of the check run.
prop commit_sha : str
-
Expand source code
@property def commit_sha(self) -> str: """Commit SHA that check run is related to.""" return self.raw_check_run.head_sha
Commit SHA that check run is related to.
prop completed_at : datetime.datetime | None
-
Expand source code
@property def completed_at(self) -> Optional[datetime.datetime]: """Timestamp of completion of the check run.""" return self.raw_check_run.completed_at
Timestamp of completion of the check run.
prop conclusion : GithubCheckRunResult | None
-
Expand source code
@property def conclusion(self) -> Optional[GithubCheckRunResult]: """Conclusion/result of the check run.""" return ( GithubCheckRunResult(self.raw_check_run.conclusion) if self.raw_check_run.conclusion else None )
Conclusion/result of the check run.
prop external_id : str | None
-
Expand source code
@property def external_id(self) -> Optional[str]: """External ID that can be used internally by the integrator.""" return self.raw_check_run.external_id
External ID that can be used internally by the integrator.
prop name : str
-
Expand source code
@property def name(self) -> str: """Name of the check run.""" return self.raw_check_run.name
Name of the check run.
prop output : github.CheckRunOutput.CheckRunOutput
-
Expand source code
@property def output(self) -> CheckRunOutput: """Output of the check run.""" return self.raw_check_run.output
Output of the check run.
prop started_at : datetime.datetime | None
-
Expand source code
@property def started_at(self) -> Optional[datetime.datetime]: """Timestamp of start of the check run.""" return self.raw_check_run.started_at
Timestamp of start of the check run.
prop status : GithubCheckRunStatus
-
Expand source code
@property def status(self) -> GithubCheckRunStatus: """Current status of the check run.""" return GithubCheckRunStatus(self.raw_check_run.status)
Current status of the check run.
prop url : str | None
-
Expand source code
@property def url(self) -> Optional[str]: """URL with additional details.""" return self.raw_check_run.details_url
URL with additional details.
Methods
def change_status(self,
status: GithubCheckRunStatus | None = None,
completed_at: datetime.datetime | None = None,
conclusion: GithubCheckRunResult | None = None) ‑> None-
Expand source code
def change_status( self, status: Optional[GithubCheckRunStatus] = None, completed_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, ) -> None: """ Changes the status of the check run and checks the validity of new state. Args: status: Status of the check run to be set. If set to completed, you must provide conclusion. Defaults to `None`. completed_at: Timestamp of completion of the check run. If set, you must provide conclusion. Defaults to `None`. conclusion: Conclusion/result of the check run. If only conclusion is set, status is automatically set to completed. Defaults to `None`. Raises: OperationNotSupported, if given completed or timestamp of completed without conclusion. """ if not (status or completed_at or conclusion): return if ( status == GithubCheckRunStatus.completed or completed_at ) and conclusion is None: raise OperationNotSupported( "When provided completed status or completed at," " you need to provide conclusion.", ) self.raw_check_run.edit( status=value_or_NotSet(status.name if status else None), conclusion=value_or_NotSet(conclusion.name if conclusion else None), completed_at=value_or_NotSet(completed_at), )
Changes the status of the check run and checks the validity of new state.
Args
status
-
Status of the check run to be set. If set to completed, you must provide conclusion.
Defaults to
None
. completed_at
-
Timestamp of completion of the check run. If set, you must provide conclusion.
Defaults to
None
. conclusion
-
Conclusion/result of the check run. If only conclusion is set, status is automatically set to completed.
Defaults to
None
.
Raises
OperationNotSupported, if given completed or timestamp of completed without conclusion.
class GithubIssue (raw_issue: github.Issue.Issue, project: ogr_github.GithubProject)
-
Expand source code
class GithubIssue(BaseIssue): raw_issue: _GithubIssue def __init__( self, raw_issue: _GithubIssue, project: "ogr_github.GithubProject", ) -> None: if raw_issue.pull_request: raise GithubAPIException( f"Requested issue #{raw_issue.number} is a pull request", ) super().__init__(raw_issue=raw_issue, project=project) @property def title(self) -> str: return self._raw_issue.title @title.setter def title(self, new_title: str) -> None: self._raw_issue.edit(title=new_title) @property def id(self) -> int: return self._raw_issue.number @property def status(self) -> IssueStatus: return IssueStatus[self._raw_issue.state] @property def url(self) -> str: return self._raw_issue.html_url @property def assignees(self) -> list: return self._raw_issue.assignees @property def description(self) -> str: return self._raw_issue.body @description.setter def description(self, new_description: str) -> None: self._raw_issue.edit(body=new_description) @property def author(self) -> str: return self._raw_issue.user.login @property def created(self) -> datetime.datetime: return self._raw_issue.created_at @property def labels(self) -> list[IssueLabel]: return [ GithubIssueLabel(raw_label, self) for raw_label in self._raw_issue.get_labels() ] def __str__(self) -> str: return "Github" + super().__str__() @staticmethod def create( project: "ogr_github.GithubProject", title: str, body: str, private: Optional[bool] = None, labels: Optional[list[str]] = None, assignees: Optional[list] = None, ) -> "Issue": if private: raise OperationNotSupported("Private issues are not supported by Github") if not project.has_issues: raise IssueTrackerDisabled() github_issue = project.github_repo.create_issue( title=title, body=body, labels=labels or [], assignees=assignees or [], ) return GithubIssue(github_issue, project) @staticmethod def get(project: "ogr_github.GithubProject", issue_id: int) -> "Issue": if not project.has_issues: raise IssueTrackerDisabled() try: issue = project.github_repo.get_issue(number=issue_id) except github.UnknownObjectException as ex: raise GithubAPIException(f"No issue with id {issue_id} found") from ex return GithubIssue(issue, project) @staticmethod def get_list( project: "ogr_github.GithubProject", status: IssueStatus = IssueStatus.open, author: Optional[str] = None, assignee: Optional[str] = None, labels: Optional[list[str]] = None, ) -> list["Issue"]: if not project.has_issues: raise IssueTrackerDisabled() parameters: dict[str, Union[str, list[str]]] = { "state": status.name, "sort": "updated", "direction": "desc", } if author: parameters["creator"] = author if assignee: parameters["assignee"] = assignee if labels: parameters["labels"] = [ project.github_repo.get_label(label) for label in labels ] issues = project.github_repo.get_issues(**parameters) try: return [ GithubIssue(issue, project) for issue in issues if not issue.pull_request ] except UnknownObjectException: return [] def _get_all_comments(self) -> list[IssueComment]: return [ GithubIssueComment(parent=self, raw_comment=raw_comment) for raw_comment in self._raw_issue.get_comments() ] def comment(self, body: str) -> IssueComment: comment = self._raw_issue.create_comment(body) return GithubIssueComment(parent=self, raw_comment=comment) def close(self) -> "Issue": self._raw_issue.edit(state="closed") return self def add_label(self, *labels: str) -> None: for label in labels: self._raw_issue.add_to_labels(label) def add_assignee(self, *assignees: str) -> None: try: self._raw_issue.edit(assignees=list(assignees)) except github.GithubException as ex: raise GithubAPIException("Failed to assign issue, unknown user") from ex def get_comment(self, comment_id: int) -> IssueComment: return GithubIssueComment(self._raw_issue.get_comment(comment_id))
Attributes
project
:GitProject
- Project of the issue.
Ancestors
Class variables
var raw_issue : github.Issue.Issue
Instance variables
prop assignees : list
-
Expand source code
@property def assignees(self) -> list: return self._raw_issue.assignees
Inherited members
class GithubIssueComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)-
Expand source code
class GithubIssueComment(GithubComment, IssueComment): def __str__(self) -> str: return "Github" + super().__str__()
Ancestors
Inherited members
class GithubPRComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)-
Expand source code
class GithubPRComment(GithubComment, PRComment): def __str__(self) -> str: return "Github" + super().__str__()
Ancestors
Inherited members
class GithubProject (repo: str,
service: ogr_github.GithubService,
namespace: str,
github_repo: github.Repository.Repository = None,
read_only: bool = False,
**unprocess_kwargs)-
Expand source code
class GithubProject(BaseGitProject): service: "ogr_github.GithubService" # Permission levels that can merge PRs CAN_MERGE_PERMS: ClassVar[set[str]] = {"admin", "write"} def __init__( self, repo: str, service: "ogr_github.GithubService", namespace: str, github_repo: Repository = None, read_only: bool = False, **unprocess_kwargs, ) -> None: if unprocess_kwargs: logger.warning( f"GithubProject will not process these kwargs: {unprocess_kwargs}", ) super().__init__(repo, service, namespace) self._github_repo = github_repo self.read_only = read_only self._github_instance = None @property def github_instance(self): if not self._github_instance: self._github_instance = self.service.get_pygithub_instance( self.namespace, self.repo, ) return self._github_instance @property def github_repo(self): if not self._github_repo: self._github_repo = self.github_instance.get_repo( full_name_or_id=f"{self.namespace}/{self.repo}", ) # Handle possible 301 if ( self._github_repo.owner.login != self.namespace or self._github_repo.name != self.repo ): (self.namespace, self.repo) = ( self._github_repo.owner.login, self._github_repo.name, ) return self._github_repo def __str__(self) -> str: return f'GithubProject(namespace="{self.namespace}", repo="{self.repo}")' def __eq__(self, o: object) -> bool: if not isinstance(o, GithubProject): return False return ( self.repo == o.repo and self.namespace == o.namespace and self.service == o.service and self.read_only == o.read_only ) @property def description(self) -> str: return self.github_repo.description @description.setter def description(self, new_description: str) -> None: self.github_repo.edit(description=new_description) @property def has_issues(self) -> bool: return self.github_repo.has_issues def _construct_fork_project(self) -> Optional["GithubProject"]: gh_user = self.github_instance.get_user() user_login = gh_user.login try: project = GithubProject( self.repo, self.service, namespace=user_login, read_only=self.read_only, ) if not project.github_repo: # The github_repo attribute is lazy. return None return project except github.GithubException as ex: logger.debug(f"Project {user_login}/{self.repo} does not exist: {ex}") return None def exists(self) -> bool: try: _ = self.github_repo return True except UnknownObjectException as ex: if "Not Found" in str(ex): return False raise GithubAPIException from ex def is_private(self) -> bool: return self.github_repo.private def is_forked(self) -> bool: return bool(self._construct_fork_project()) @property def is_fork(self) -> bool: return self.github_repo.fork @property def parent(self) -> Optional["GithubProject"]: return ( self.service.get_project_from_github_repository(self.github_repo.parent) if self.is_fork else None ) @property def default_branch(self): return self.github_repo.default_branch def get_branches(self) -> list[str]: return [branch.name for branch in self.github_repo.get_branches()] def get_commits(self, ref: Optional[str] = None) -> list[str]: ref = ref or self.github_repo.default_branch return [commit.sha for commit in self.github_repo.get_commits(sha=ref)] def get_description(self) -> str: return self.github_repo.description def add_user(self, user: str, access_level: AccessLevel) -> None: access_dict = { AccessLevel.pull: "Pull", AccessLevel.triage: "Triage", AccessLevel.push: "Push", AccessLevel.admin: "Admin", AccessLevel.maintain: "Maintain", } try: invitation = self.github_repo.add_to_collaborators( user, permission=access_dict[access_level], ) except Exception as ex: raise GithubAPIException(f"User {user} not found") from ex if invitation is None: raise GithubAPIException("User already added") def request_access(self): raise OperationNotSupported("Not possible on GitHub") def get_fork(self, create: bool = True) -> Optional["GithubProject"]: username = self.service.user.get_username() for fork in self.get_forks(): if fork.github_repo.owner.login == username: return fork if not self.is_forked(): if create: return self.fork_create() logger.info( f"Fork of {self.github_repo.full_name}" " does not exist and we were asked not to create it.", ) return None return self._construct_fork_project() def get_owners(self) -> list[str]: # in case of github, repository has only one owner return [self.github_repo.owner.login] def __get_collaborators(self) -> set[str]: collaborators = self._get_collaborators_with_permission() usernames = [] for login, permission in collaborators.items(): if permission in self.CAN_MERGE_PERMS: usernames.append(login) return set(usernames) def who_can_close_issue(self) -> set[str]: return self.__get_collaborators() def who_can_merge_pr(self) -> set[str]: return self.__get_collaborators() def can_merge_pr(self, username) -> bool: return ( self.github_repo.get_collaborator_permission(username) in self.CAN_MERGE_PERMS ) def _get_collaborators_with_permission(self) -> dict: """ Get all project collaborators in dictionary with permission association. Returns: Dictionary with logins of collaborators and their permission level. """ collaborators = {} users = self.github_repo.get_collaborators() for user in users: permission = self.github_repo.get_collaborator_permission(user) collaborators[user.login] = permission return collaborators @indirect(GithubIssue.get_list) def get_issue_list( self, status: IssueStatus = IssueStatus.open, author: Optional[str] = None, assignee: Optional[str] = None, labels: Optional[list[str]] = None, ) -> list[Issue]: pass @indirect(GithubIssue.get) def get_issue(self, issue_id: int) -> Issue: pass @indirect(GithubIssue.create) def create_issue( self, title: str, body: str, private: Optional[bool] = None, labels: Optional[list[str]] = None, assignees: Optional[list[str]] = None, ) -> Issue: pass def delete(self) -> None: self.github_repo.delete() @indirect(GithubPullRequest.get_list) def get_pr_list(self, status: PRStatus = PRStatus.open) -> list[PullRequest]: pass @indirect(GithubPullRequest.get) def get_pr(self, pr_id: int) -> PullRequest: pass def get_sha_from_tag(self, tag_name: str) -> str: # TODO: This is ugly. Can we do it better? all_tags = self.github_repo.get_tags() for tag in all_tags: if tag.name == tag_name: return tag.commit.sha raise GithubAPIException(f"Tag {tag_name} was not found.") def get_tag_from_tag_name(self, tag_name: str) -> Optional[GitTag]: """ Get a tag based on a tag name. Args: tag_name: Name of the tag. Returns: GitTag associated with the given tag name or `None`. """ all_tags = self.github_repo.get_tags() for tag in all_tags: if tag.name == tag_name: return GitTag(name=tag.name, commit_sha=tag.commit.sha) return None @if_readonly(return_function=GitProjectReadOnly.create_pr) @indirect(GithubPullRequest.create) def create_pr( self, title: str, body: str, target_branch: str, source_branch: str, fork_username: Optional[str] = None, ) -> PullRequest: pass @if_readonly( return_function=GitProjectReadOnly.commit_comment, log_message="Create Comment to commit", ) def commit_comment( self, commit: str, body: str, filename: Optional[str] = None, row: Optional[int] = None, ) -> CommitComment: github_commit: Commit = self.github_repo.get_commit(commit) if filename and row: comment = github_commit.create_comment( body=body, position=row, path=filename, ) else: comment = github_commit.create_comment(body=body) return self._commit_comment_from_github_object(comment) @staticmethod def _commit_comment_from_github_object( raw_commit_coment: _GithubCommitComment, ) -> CommitComment: return GithubCommitComment( raw_comment=raw_commit_coment, sha=raw_commit_coment.commit_id, ) def get_commit_comments(self, commit: str) -> list[CommitComment]: github_commit: Commit = self.github_repo.get_commit(commit) return [ self._commit_comment_from_github_object(comment) for comment in github_commit.get_comments() ] def get_commit_comment(self, commit_sha: str, comment_id: int) -> CommitComment: return self._commit_comment_from_github_object( self.github_repo.get_comment(comment_id), ) @if_readonly( return_function=GitProjectReadOnly.set_commit_status, log_message="Create a status on a commit", ) @indirect(GithubCommitFlag.set) def set_commit_status( self, commit: str, state: Union[CommitStatus, str], target_url: str, description: str, context: str, trim: bool = False, ): pass @indirect(GithubCommitFlag.get) def get_commit_statuses(self, commit: str) -> list[CommitFlag]: pass @indirect(GithubCheckRun.get) def get_check_run( self, check_run_id: Optional[int] = None, commit_sha: Optional[str] = None, ) -> Optional["GithubCheckRun"]: pass @indirect(GithubCheckRun.create) def create_check_run( self, name: str, commit_sha: str, url: Optional[str] = None, external_id: Optional[str] = None, status: GithubCheckRunStatus = GithubCheckRunStatus.queued, started_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, completed_at: Optional[datetime.datetime] = None, output: Optional[GithubCheckRunOutput] = None, actions: Optional[list[dict[str, str]]] = None, ) -> "GithubCheckRun": pass @indirect(GithubCheckRun.get_list) def get_check_runs( self, commit_sha: str, name: Optional[str] = None, status: Optional[GithubCheckRunStatus] = None, ) -> list["GithubCheckRun"]: pass def get_git_urls(self) -> dict[str, str]: return {"git": self.github_repo.clone_url, "ssh": self.github_repo.ssh_url} @if_readonly(return_function=GitProjectReadOnly.fork_create) def fork_create(self, namespace: Optional[str] = None) -> "GithubProject": fork_repo = ( self.github_repo.create_fork(organization=namespace) if namespace else self.github_repo.create_fork() ) fork = self.service.get_project_from_github_repository(fork_repo) logger.debug(f"Forked to {fork.namespace}/{fork.repo}") return fork def change_token(self, new_token: str): raise OperationNotSupported def get_file_content(self, path: str, ref=None) -> str: ref = ref or self.default_branch try: return self.github_repo.get_contents( path=path, ref=ref, ).decoded_content.decode() except (UnknownObjectException, GithubException) as ex: if ex.status == 404: raise FileNotFoundError(f"File '{path}' on {ref} not found") from ex raise GithubAPIException() from ex def get_files( self, ref: Optional[str] = None, filter_regex: Optional[str] = None, recursive: bool = False, ) -> list[str]: ref = ref or self.default_branch paths = [] contents = self.github_repo.get_contents(path="", ref=ref) if recursive: while contents: file_content = contents.pop(0) if file_content.type == "dir": contents.extend( self.github_repo.get_contents(path=file_content.path, ref=ref), ) else: paths.append(file_content.path) else: paths = [ file_content.path for file_content in contents if file_content.type != "dir" ] if filter_regex: paths = filter_paths(paths, filter_regex) return paths def get_labels(self): """ Get list of labels in the repository. Returns: List of labels in the repository. """ return list(self.github_repo.get_labels()) def update_labels(self, labels): """ Update the labels of the repository. (No deletion, only add not existing ones.) Args: labels: List of labels to be added. Returns: Number of added labels. """ current_label_names = [la.name for la in list(self.github_repo.get_labels())] changes = 0 for label in labels: if label.name not in current_label_names: color = self._normalize_label_color(color=label.color) self.github_repo.create_label( name=label.name, color=color, description=label.description or "", ) changes += 1 return changes @staticmethod def _normalize_label_color(color): if color.startswith("#"): return color[1:] return color @indirect(GithubRelease.get) def get_release(self, identifier=None, name=None, tag_name=None) -> GithubRelease: pass @indirect(GithubRelease.get_latest) def get_latest_release(self) -> Optional[GithubRelease]: pass @indirect(GithubRelease.get_list) def get_releases(self) -> list[Release]: pass @indirect(GithubRelease.create) def create_release(self, tag: str, name: str, message: str) -> GithubRelease: pass def get_forks(self) -> list["GithubProject"]: return [ self.service.get_project_from_github_repository(fork) for fork in self.github_repo.get_forks() if fork.owner ] def get_web_url(self) -> str: return self.github_repo.html_url def get_tags(self) -> list["GitTag"]: return [GitTag(tag.name, tag.commit.sha) for tag in self.github_repo.get_tags()] def get_sha_from_branch(self, branch: str) -> Optional[str]: try: return self.github_repo.get_branch(branch).commit.sha except GithubException as ex: if ex.status == 404: return None raise GithubAPIException from ex def get_contributors(self) -> set[str]: """ Returns: Logins of contributors to the project. """ return {c.login for c in self.github_repo.get_contributors()} def users_with_write_access(self) -> set[str]: return self.__get_collaborators()
Args
repo
- Name of the project.
service
- GitService instance.
namespace
-
Namespace of the project.
- GitHub: username or org name.
- GitLab: username or org name.
- Pagure: namespace (e.g.
"rpms"
).
In case of forks:
"fork/{username}/{namespace}"
.
Ancestors
Class variables
var CAN_MERGE_PERMS : ClassVar[set[str]]
var service : GithubService
Instance variables
prop github_instance
-
Expand source code
@property def github_instance(self): if not self._github_instance: self._github_instance = self.service.get_pygithub_instance( self.namespace, self.repo, ) return self._github_instance
prop github_repo
-
Expand source code
@property def github_repo(self): if not self._github_repo: self._github_repo = self.github_instance.get_repo( full_name_or_id=f"{self.namespace}/{self.repo}", ) # Handle possible 301 if ( self._github_repo.owner.login != self.namespace or self._github_repo.name != self.repo ): (self.namespace, self.repo) = ( self._github_repo.owner.login, self._github_repo.name, ) return self._github_repo
Methods
def create_check_run(self,
name: str,
commit_sha: str,
url: str | None = None,
external_id: str | None = None,
status: GithubCheckRunStatus = GithubCheckRunStatus.queued,
started_at: datetime.datetime | None = None,
conclusion: GithubCheckRunResult | None = None,
completed_at: datetime.datetime | None = None,
output: dict[str, str | list[dict[str, str | int]]] | None = None,
actions: list[dict[str, str]] | None = None) ‑> GithubCheckRun-
Expand source code
@indirect(GithubCheckRun.create) def create_check_run( self, name: str, commit_sha: str, url: Optional[str] = None, external_id: Optional[str] = None, status: GithubCheckRunStatus = GithubCheckRunStatus.queued, started_at: Optional[datetime.datetime] = None, conclusion: Optional[GithubCheckRunResult] = None, completed_at: Optional[datetime.datetime] = None, output: Optional[GithubCheckRunOutput] = None, actions: Optional[list[dict[str, str]]] = None, ) -> "GithubCheckRun": pass
def get_check_run(self, check_run_id: int | None = None, commit_sha: str | None = None) ‑> GithubCheckRun | None
-
Expand source code
@indirect(GithubCheckRun.get) def get_check_run( self, check_run_id: Optional[int] = None, commit_sha: Optional[str] = None, ) -> Optional["GithubCheckRun"]: pass
def get_check_runs(self,
commit_sha: str,
name: str | None = None,
status: GithubCheckRunStatus | None = None) ‑> list['GithubCheckRun']-
Expand source code
@indirect(GithubCheckRun.get_list) def get_check_runs( self, commit_sha: str, name: Optional[str] = None, status: Optional[GithubCheckRunStatus] = None, ) -> list["GithubCheckRun"]: pass
def get_contributors(self) ‑> set[str]
-
Expand source code
def get_contributors(self) -> set[str]: """ Returns: Logins of contributors to the project. """ return {c.login for c in self.github_repo.get_contributors()}
Returns
Logins of contributors to the project.
def get_labels(self)
-
Expand source code
def get_labels(self): """ Get list of labels in the repository. Returns: List of labels in the repository. """ return list(self.github_repo.get_labels())
Get list of labels in the repository.
Returns
List of labels in the repository.
def get_tag_from_tag_name(self, tag_name: str) ‑> GitTag | None
-
Expand source code
def get_tag_from_tag_name(self, tag_name: str) -> Optional[GitTag]: """ Get a tag based on a tag name. Args: tag_name: Name of the tag. Returns: GitTag associated with the given tag name or `None`. """ all_tags = self.github_repo.get_tags() for tag in all_tags: if tag.name == tag_name: return GitTag(name=tag.name, commit_sha=tag.commit.sha) return None
Get a tag based on a tag name.
Args
tag_name
- Name of the tag.
Returns
GitTag associated with the given tag name or
None
. def update_labels(self, labels)
-
Expand source code
def update_labels(self, labels): """ Update the labels of the repository. (No deletion, only add not existing ones.) Args: labels: List of labels to be added. Returns: Number of added labels. """ current_label_names = [la.name for la in list(self.github_repo.get_labels())] changes = 0 for label in labels: if label.name not in current_label_names: color = self._normalize_label_color(color=label.color) self.github_repo.create_label( name=label.name, color=color, description=label.description or "", ) changes += 1 return changes
Update the labels of the repository. (No deletion, only add not existing ones.)
Args
labels
- List of labels to be added.
Returns
Number of added labels.
Inherited members
BaseGitProject
:add_group
add_user
can_merge_pr
change_token
commit_comment
create_issue
create_pr
create_release
default_branch
delete
description
exists
fork_create
full_repo_name
get_branches
get_commit_comment
get_commit_comments
get_commit_statuses
get_commits
get_description
get_file_content
get_files
get_fork
get_forks
get_git_urls
get_issue
get_issue_info
get_issue_list
get_latest_release
get_owners
get_pr
get_pr_files_diff
get_pr_list
get_release
get_releases
get_sha_from_branch
get_sha_from_tag
get_tags
get_users_with_given_access
get_web_url
has_issues
has_write_access
is_fork
is_forked
is_private
parent
remove_group
remove_user
request_access
set_commit_status
users_with_write_access
which_groups_can_merge_pr
who_can_close_issue
who_can_merge_pr
class GithubPullRequest (raw_pr: Any, project: GitProject)
-
Expand source code
class GithubPullRequest(BasePullRequest): _raw_pr: _GithubPullRequest _target_project: "ogr_github.GithubProject" _source_project: "ogr_github.GithubProject" = None @property def title(self) -> str: return self._raw_pr.title @title.setter def title(self, new_title: str) -> None: self._raw_pr.edit(title=new_title) @property def id(self) -> int: return self._raw_pr.number @property def status(self) -> PRStatus: return ( PRStatus.merged if self._raw_pr.is_merged() else PRStatus[self._raw_pr.state] ) @property def url(self) -> str: return self._raw_pr.html_url @property def description(self) -> str: return self._raw_pr.body @description.setter def description(self, new_description: str) -> None: self._raw_pr.edit(body=new_description) @property def author(self) -> str: return self._raw_pr.user.login @property def source_branch(self) -> str: return self._raw_pr.head.ref @property def target_branch(self) -> str: return self._raw_pr.base.ref @property def created(self) -> datetime.datetime: return self._raw_pr.created_at @property def labels(self) -> list[PRLabel]: return [ GithubPRLabel(raw_label, self) for raw_label in self._raw_pr.get_labels() ] @property def diff_url(self) -> str: return f"{self._raw_pr.html_url}/files" @property def patch(self) -> bytes: response = requests.get(self._raw_pr.patch_url) if not response.ok: cls = OgrNetworkError if response.status_code >= 500 else GithubAPIException raise cls( f"Couldn't get patch from {self._raw_pr.patch_url} because {response.reason}.", ) return response.content @property def commits_url(self) -> str: return f"{self._raw_pr.html_url}/commits" @property def head_commit(self) -> str: return self._raw_pr.head.sha @property def merge_commit_sha(self) -> str: return self._raw_pr.merge_commit_sha @property def merge_commit_status(self) -> MergeCommitStatus: if self._raw_pr.mergeable: return MergeCommitStatus.can_be_merged return MergeCommitStatus.cannot_be_merged @property def source_project(self) -> "ogr_github.GithubProject": if self._source_project is None: self._source_project = ( self._target_project.service.get_project_from_github_repository( self._raw_pr.head.repo, ) ) return self._source_project def __str__(self) -> str: return "Github" + super().__str__() @staticmethod def create( project: "ogr_github.GithubProject", title: str, body: str, target_branch: str, source_branch: str, fork_username: Optional[str] = None, ) -> "PullRequest": """ The default behavior is the pull request is made to the immediate parent repository if the repository is a forked repository. If you want to create a pull request to the forked repo, please pass the `fork_username` parameter. """ github_repo = project.github_repo target_project = project if project.is_fork and fork_username is None: logger.warning(f"{project.full_repo_name} is fork, ignoring fork_repo.") source_branch = f"{project.namespace}:{source_branch}" github_repo = project.parent.github_repo target_project = project.parent elif fork_username: source_branch = f"{fork_username}:{source_branch}" if fork_username != project.namespace and project.parent is not None: github_repo = GithubPullRequest.__get_fork( fork_username, project.parent.github_repo, ) created_pr = github_repo.create_pull( title=title, body=body, base=target_branch, head=source_branch, ) logger.info(f"PR {created_pr.id} created: {target_branch}<-{source_branch}") return GithubPullRequest(created_pr, target_project) @staticmethod def __get_fork(fork_username: str, repo: _GithubRepository) -> _GithubRepository: forks = list( filter(lambda fork: fork.owner.login == fork_username, repo.get_forks()), ) if not forks: raise GithubAPIException("Requested fork doesn't exist") return forks[0] @staticmethod def get(project: "ogr_github.GithubProject", pr_id: int) -> "PullRequest": try: pr = project.github_repo.get_pull(number=pr_id) except github.UnknownObjectException as ex: raise GithubAPIException(f"No pull request with id {pr_id} found") from ex return GithubPullRequest(pr, project) @staticmethod def get_list( project: "ogr_github.GithubProject", status: PRStatus = PRStatus.open, ) -> list["PullRequest"]: prs = project.github_repo.get_pulls( # Github API has no status 'merged', just 'closed'/'opened'/'all' state=status.name if status != PRStatus.merged else "closed", sort="updated", direction="desc", ) if status == PRStatus.merged: prs = list(prs) # Github PaginatedList into list() for pr in prs: if not pr.is_merged(): # parse merged PRs prs.remove(pr) try: return [GithubPullRequest(pr, project) for pr in prs] except UnknownObjectException: return [] def update_info( self, title: Optional[str] = None, description: Optional[str] = None, ) -> "PullRequest": try: self._raw_pr.edit(title=title, body=description) logger.info(f"PR updated: {self._raw_pr.url}") return self except Exception as ex: raise GithubAPIException("there was an error while updating the PR") from ex def _get_all_comments(self) -> list[PRComment]: return [ GithubPRComment(parent=self, raw_comment=raw_comment) for raw_comment in self._raw_pr.get_issue_comments() ] def get_all_commits(self) -> list[str]: return [commit.sha for commit in self._raw_pr.get_commits()] def comment( self, body: str, commit: Optional[str] = None, filename: Optional[str] = None, row: Optional[int] = None, ) -> "PRComment": comment: Union[_GithubIssueComment, _GithubPullRequestComment] = None if not any([commit, filename, row]): comment = self._raw_pr.create_issue_comment(body) else: github_commit = self._target_project.github_repo.get_commit(commit) comment = self._raw_pr.create_comment(body, github_commit, filename, row) return GithubPRComment(parent=self, raw_comment=comment) def close(self) -> "PullRequest": self._raw_pr.edit(state=PRStatus.closed.name) return self def merge(self) -> "PullRequest": self._raw_pr.merge() return self def add_label(self, *labels: str) -> None: for label in labels: self._raw_pr.add_to_labels(label) def get_comment(self, comment_id: int) -> PRComment: return GithubPRComment(self._raw_pr.get_issue_comment(comment_id))
Attributes
project
:GitProject
- Project of the pull request.
Ancestors
Static methods
def create(project: ogr_github.GithubProject,
title: str,
body: str,
target_branch: str,
source_branch: str,
fork_username: str | None = None) ‑> PullRequest-
Expand source code
@staticmethod def create( project: "ogr_github.GithubProject", title: str, body: str, target_branch: str, source_branch: str, fork_username: Optional[str] = None, ) -> "PullRequest": """ The default behavior is the pull request is made to the immediate parent repository if the repository is a forked repository. If you want to create a pull request to the forked repo, please pass the `fork_username` parameter. """ github_repo = project.github_repo target_project = project if project.is_fork and fork_username is None: logger.warning(f"{project.full_repo_name} is fork, ignoring fork_repo.") source_branch = f"{project.namespace}:{source_branch}" github_repo = project.parent.github_repo target_project = project.parent elif fork_username: source_branch = f"{fork_username}:{source_branch}" if fork_username != project.namespace and project.parent is not None: github_repo = GithubPullRequest.__get_fork( fork_username, project.parent.github_repo, ) created_pr = github_repo.create_pull( title=title, body=body, base=target_branch, head=source_branch, ) logger.info(f"PR {created_pr.id} created: {target_branch}<-{source_branch}") return GithubPullRequest(created_pr, target_project)
The default behavior is the pull request is made to the immediate parent repository if the repository is a forked repository. If you want to create a pull request to the forked repo, please pass the
fork_username
parameter.
Inherited members
BasePullRequest
:add_label
author
close
closed_by
comment
commits_url
created
description
diff_url
get
get_all_commits
get_comment
get_list
head_commit
id
labels
merge
merge_commit_sha
merge_commit_status
patch
source_branch
source_project
status
target_branch
target_branch_head_commit
target_project
title
update_info
url
PullRequest
:
class GithubRelease (raw_release: Any, project: GitProject)
-
Expand source code
class GithubRelease(Release): _raw_release: PyGithubRelease project: "ogr_github.GithubProject" @staticmethod def _release_id_from_name( project: "ogr_github.GithubProject", name: str, ) -> Optional[int]: releases = project.github_repo.get_releases() for release in releases: if release.title == name: return release.id return None @staticmethod def _release_id_from_tag( project: "ogr_github.GithubProject", tag: str, ) -> Optional[int]: releases = project.github_repo.get_releases() for release in releases: if release.tag_name == tag: return release.id return None @property def title(self): return self._raw_release.title @property def body(self): return self._raw_release.body @property def git_tag(self) -> GitTag: return self.project.get_tag_from_tag_name(self.tag_name) @property def tag_name(self) -> str: return self._raw_release.tag_name @property def url(self) -> Optional[str]: return self._raw_release.html_url @property def created_at(self) -> datetime.datetime: return self._raw_release.created_at @property def tarball_url(self) -> str: return self._raw_release.tarball_url def __str__(self) -> str: return "Github" + super().__str__() @staticmethod def get( project: "ogr_github.GithubProject", identifier: Optional[int] = None, name: Optional[str] = None, tag_name: Optional[str] = None, ) -> "Release": if tag_name: identifier = GithubRelease._release_id_from_tag(project, tag_name) elif name: identifier = GithubRelease._release_id_from_name(project, name) if identifier is None: raise GithubAPIException("Release was not found.") release = project.github_repo.get_release(id=identifier) return GithubRelease(release, project) @staticmethod def get_latest(project: "ogr_github.GithubProject") -> Optional["Release"]: try: release = project.github_repo.get_latest_release() return GithubRelease(release, project) except GithubException as ex: if ex.status == 404: return None raise GithubAPIException from ex @staticmethod def get_list(project: "ogr_github.GithubProject") -> list["Release"]: releases = project.github_repo.get_releases() return [GithubRelease(release, project) for release in releases] @staticmethod def create( project: "ogr_github.GithubProject", tag: str, name: str, message: str, ref: Optional[str] = None, ) -> "Release": created_release = project.github_repo.create_git_release( tag=tag, name=name, message=message, ) return GithubRelease(created_release, project) def edit_release(self, name: str, message: str) -> None: """ Edit name and message of a release. Args: name: New name of the release. message: New message for the release. """ self._raw_release = self._raw_release.update_release(name=name, message=message)
Object that represents release.
Attributes
project
:GitProject
- Project on which the release is created.
Ancestors
Class variables
var project : GithubProject
Methods
def edit_release(self, name: str, message: str) ‑> None
-
Expand source code
def edit_release(self, name: str, message: str) -> None: """ Edit name and message of a release. Args: name: New name of the release. message: New message for the release. """ self._raw_release = self._raw_release.update_release(name=name, message=message)
Edit name and message of a release.
Args
name
- New name of the release.
message
- New message for the release.
Inherited members
class GithubService (token=None,
read_only=False,
github_app_id: str | None = None,
github_app_private_key: str | None = None,
github_app_private_key_path: str | None = None,
tokman_instance_url: str | None = None,
github_authentication: GithubAuthentication = None,
max_retries: int | urllib3.util.retry.Retry = 1,
**kwargs)-
Expand source code
@use_for_service("github.com") class GithubService(BaseGitService): # class parameter could be used to mock Github class api github_class: type[github.Github] instance_url = "https://github.com" def __init__( self, token=None, read_only=False, github_app_id: Optional[str] = None, github_app_private_key: Optional[str] = None, github_app_private_key_path: Optional[str] = None, tokman_instance_url: Optional[str] = None, github_authentication: GithubAuthentication = None, max_retries: Union[int, Retry] = 1, **kwargs, ): """ If multiple authentication methods are provided, they are prioritised: 1. Tokman 2. GithubApp 3. TokenAuthentication (which is also default one, that works without specified token) """ super().__init__() self.read_only = read_only self._default_auth_method = github_authentication self._other_auth_method: GithubAuthentication = None self._auth_methods: dict[AuthMethod, GithubAuthentication] = {} if isinstance(max_retries, Retry): self._max_retries = max_retries else: self._max_retries = Retry( total=int(max_retries), # Retry mechanism active for these HTTP methods: allowed_methods=["DELETE", "GET", "PATCH", "POST", "PUT"], # Only retry on following HTTP status codes status_forcelist=[500, 503, 403, 401], raise_on_status=True, ) if not self._default_auth_method: self.__set_authentication( token=token, github_app_id=github_app_id, github_app_private_key=github_app_private_key, github_app_private_key_path=github_app_private_key_path, tokman_instance_url=tokman_instance_url, max_retries=self._max_retries, ) if kwargs: logger.warning(f"Ignored keyword arguments: {kwargs}") def __set_authentication(self, **kwargs): auth_methods = [ (Tokman, AuthMethod.tokman), (GithubApp, AuthMethod.github_app), (TokenAuthentication, AuthMethod.token), ] for auth_class, auth_name in auth_methods: auth_inst = auth_class.try_create(**kwargs) self._auth_methods[auth_name] = auth_inst if not self._default_auth_method: self._default_auth_method = auth_inst return None if self._default_auth_method else TokenAuthentication(None) def set_auth_method(self, method: AuthMethod): if self._auth_methods[method]: logger.info("Forced Github auth method to %s", method) self._other_auth_method = self._auth_methods[method] else: raise GithubAPIException( f"Choosen authentication method ({method}) is not available", ) def reset_auth_method(self): logger.info("Reset Github auth method to the default") self._other_auth_method = None @property def authentication(self): return self._other_auth_method or self._default_auth_method @property def github(self): return self.authentication.pygithub_instance def __str__(self) -> str: readonly_str = ", read_only=True" if self.read_only else "" arguments = f", github_authentication={self.authentication!s}{readonly_str}" if arguments: # remove the first '- ' arguments = arguments[2:] return f"GithubService({arguments})" def __eq__(self, o: object) -> bool: if not issubclass(o.__class__, GithubService): return False return ( self.read_only == o.read_only # type: ignore and self.authentication == o.authentication # type: ignore ) def __hash__(self) -> int: return hash(str(self)) def get_project( self, repo=None, namespace=None, is_fork=False, **kwargs, ) -> "GithubProject": if is_fork: namespace = self.user.get_username() return GithubProject( repo=repo, namespace=namespace, service=self, read_only=self.read_only, **kwargs, ) def get_project_from_github_repository( self, github_repo: PyGithubRepository.Repository, ) -> "GithubProject": return GithubProject( repo=github_repo.name, namespace=github_repo.owner.login, github_repo=github_repo, service=self, read_only=self.read_only, ) @property def user(self) -> GitUser: return GithubUser(service=self) def change_token(self, new_token: str) -> None: self._default_auth_method = TokenAuthentication(new_token) def project_create( self, repo: str, namespace: Optional[str] = None, description: Optional[str] = None, ) -> "GithubProject": if namespace: try: owner = self.github.get_organization(namespace) except UnknownObjectException as ex: raise GithubAPIException(f"Group {namespace} not found.") from ex else: owner = self.github.get_user() try: new_repo = owner.create_repo( name=repo, description=description if description else github.GithubObject.NotSet, ) except github.GithubException as ex: raise GithubAPIException("Project creation failed") from ex return GithubProject( repo=repo, namespace=namespace or owner.login, service=self, github_repo=new_repo, ) def get_pygithub_instance(self, namespace: str, repo: str) -> PyGithubInstance: token = self.authentication.get_token(namespace, repo) return PyGithubInstance(login_or_token=token, retry=self._max_retries) def list_projects( self, namespace: Optional[str] = None, user: Optional[str] = None, search_pattern: Optional[str] = None, language: Optional[str] = None, ) -> list[GitProject]: search_query = "" if user: search_query += f"user:{user}" if language: search_query += f" language:{language}" projects: list[GitProject] projects = [ GithubProject( repo=repo.name, namespace=repo.owner.login, github_repo=repo, service=self, ) for repo in self.github.search_repositories(search_query, order="asc") ] if search_pattern: projects = [ project for project in projects if re.search(search_pattern, project.repo) ] return projects
Attributes
instance_url
:str
- URL of the git forge instance.
If multiple authentication methods are provided, they are prioritised: 1. Tokman 2. GithubApp 3. TokenAuthentication (which is also default one, that works without specified token)
Ancestors
Class variables
var github_class : type[github.MainClass.Github]
var instance_url : str | None
Instance variables
prop authentication
-
Expand source code
@property def authentication(self): return self._other_auth_method or self._default_auth_method
prop github
-
Expand source code
@property def github(self): return self.authentication.pygithub_instance
Methods
def get_project_from_github_repository(self, github_repo: github.Repository.Repository) ‑> GithubProject
-
Expand source code
def get_project_from_github_repository( self, github_repo: PyGithubRepository.Repository, ) -> "GithubProject": return GithubProject( repo=github_repo.name, namespace=github_repo.owner.login, github_repo=github_repo, service=self, read_only=self.read_only, )
def get_pygithub_instance(self, namespace: str, repo: str) ‑> github.MainClass.Github
-
Expand source code
def get_pygithub_instance(self, namespace: str, repo: str) -> PyGithubInstance: token = self.authentication.get_token(namespace, repo) return PyGithubInstance(login_or_token=token, retry=self._max_retries)
Inherited members
class GithubUser (service: ogr_github.GithubService)
-
Expand source code
class GithubUser(BaseGitUser): service: "ogr_github.GithubService" def __init__(self, service: "ogr_github.GithubService") -> None: super().__init__(service=service) def __str__(self) -> str: return f'GithubUser(username="{self.get_username()}")' @property def _github_user(self): return self.service.github.get_user() def get_username(self) -> str: return self.service.github.get_user().login def get_email(self) -> Optional[str]: user_email_property = self.service.github.get_user().email if user_email_property: return user_email_property user_emails = self.service.github.get_user().get_emails() if not user_emails: return None # To work around the braking change introduced by pygithub==1.55 # https://pygithub.readthedocs.io/en/latest/changes.html#version-1-55-april-26-2021 if isinstance(user_emails[0], dict): EmailData = namedtuple("EmailData", user_emails[0].keys()) # type: ignore for email in user_emails: if "EmailData" in locals(): email = EmailData(**email) # type: ignore if email.primary: return email.email # Return the first email we received return user_emails[0]["email"] def get_projects(self) -> list["ogr_github.GithubProject"]: raw_repos = self._github_user.get_repos(affiliation="owner") return [ GithubProject( repo=repo.name, namespace=repo.owner.login, github_repo=repo, service=self.service, ) for repo in raw_repos ] def get_forks(self) -> list["ogr_github.GithubProject"]: return [project for project in self.get_projects() if project.github_repo.fork]
Represents currently authenticated user through service.
Ancestors
Class variables
var service : GithubService
Inherited members