Module ogr.services.forgejo
Sub-modules
ogr.services.forgejo.commentsogr.services.forgejo.flagogr.services.forgejo.issueogr.services.forgejo.labelogr.services.forgejo.projectogr.services.forgejo.pull_requestogr.services.forgejo.releaseogr.services.forgejo.serviceogr.services.forgejo.userogr.services.forgejo.utils
Classes
class ForgejoComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)-
Expand source code
class ForgejoComment(Comment): def _from_raw_comment(self, raw_comment: _ForgejoComment) -> None: self._raw_comment = raw_comment self._id = raw_comment.id self._author = raw_comment.user.login self._created = raw_comment.created_at self._edited = raw_comment.updated_at @property def body(self) -> str: return self._raw_comment.body @body.setter def body(self, new_body: str) -> None: self._raw_comment = self._client.issue.edit_comment( owner=self._parent.project.namespace, repo=self._parent.project.repo, id=self._id, body=new_body, ) @property def edited(self) -> datetime.datetime: return self._edited @property def _client(self): return self._parent.project.service.api def get_reactions(self) -> list[Reaction]: try: reactions = self._client.issue.get_comment_reactions( owner=self._parent.project.namespace, repo=self._parent.project.repo, id=self._id, ) # in case no reactions exist: # re-recording causes pyforgejo to raise pydantic's ValidationError # re-running tests causes pyforgejo's ApiError to be raised instead # both need to be caught except (ValidationError, ApiError): return [] return [ ForgejoReaction(raw_reaction=reaction, parent=self) for reaction in reactions ] def add_reaction(self, reaction: str) -> Reaction: raw_reaction = self._client.issue.post_comment_reaction( owner=self._parent.project.namespace, repo=self._parent.project.repo, id=self._id, content=reaction, ) return ForgejoReaction(raw_reaction=raw_reaction, parent=self)Ancestors
Subclasses
Inherited members
class ForgejoIssue (raw_issue: pyforgejo.types.issue.Issue, project: forgejo.ForgejoProject)-
Expand source code
class ForgejoIssue(BaseIssue): project: "forgejo.ForgejoProject" def __init__(self, raw_issue: _issue, project: "forgejo.ForgejoProject"): if raw_issue.pull_request: raise ForgejoAPIException( f"Requested issue #{raw_issue.number} is a pull request", ) super().__init__(raw_issue, project) @property def api(self): """Returns the issue API client from pyforgejo.""" return self.project.service.api.issue def partial_api(self, method, /, *args, **kwargs): """Returns a partial API call for ForgejoIssue. Injects owner and repo parameters for the calls to issue API endpoints. Args: method: Specific method on the Pyforgejo API that is to be wrapped. *args: Positional arguments that get injected into every call. **kwargs: Keyword-arguments that get injected into every call. Returns: Callable with pre-injected parameters. """ params = {"owner": self.project.namespace, "repo": self.project.repo} return partial(method, *args, **kwargs, **params) def __update_info(self) -> None: """Refresh the local issue object with the latest data from the server.""" self._raw_issue = self.partial_api(self.api.get_issue)(index=self.id) @property def title(self) -> str: return self._raw_issue.title @title.setter def title(self, new_title: str) -> None: self._raw_issue = self.partial_api(self.api.edit_issue)( title=new_title, index=self.id, ) @property def id(self) -> int: return self._raw_issue.number @property def url(self) -> str: return self._raw_issue.url @property def description(self) -> str: return self._raw_issue.body @description.setter def description(self, new_description: str): self._raw_issue = self.partial_api(self.api.edit_issue)( body=new_description, index=self.id, ) @property def author(self) -> str: return self._raw_issue.user.login @property def created(self) -> datetime: return self._raw_issue.created_at @property def status(self) -> IssueStatus: return IssueStatus[self._raw_issue.state] @property def assignees(self) -> list[PyforgejoUser]: return self._raw_issue.assignees or [] @property def labels(self) -> list[IssueLabel]: return [ ForgejoIssueLabel(raw_label, self) for raw_label in self._raw_issue.labels ] def __str__(self) -> str: return "Forgejo" + super().__str__() @staticmethod def create( project: "forgejo.ForgejoProject", title: str, body: str, private: Optional[bool] = None, labels: Optional[list[str]] = None, assignees: Optional[list[str]] = None, ) -> "Issue": if private: raise OperationNotSupported("Private issues are not supported by Forgejo") if not project.has_issues: raise IssueTrackerDisabled() # The API requires ids of labels in the create_issue method # which would lead to having to retrieve existing labels and # needing to find the ids of those we need to add to the issue; # A separate API call would also need to be made to create each # label that does not yet exist, potentially leading to many # API calls and unclear code, so labels are instead added seprately # below after creating a new issue without labels issue = project.service.api.issue.create_issue( owner=project.namespace, repo=project.repo, title=title, body=body, labels=[], assignees=assignees, ) forgejo_issue = ForgejoIssue(issue, project) if labels: forgejo_issue.add_label(*labels) return forgejo_issue @staticmethod def get(project: "forgejo.ForgejoProject", issue_id: int) -> "Issue": if not project.has_issues: raise IssueTrackerDisabled() try: issue = project.service.api.issue.get_issue( owner=project.namespace, repo=project.repo, index=issue_id, ) except NotFoundError as ex: raise ForgejoAPIException(f"Issue {issue_id} not found") from ex return ForgejoIssue(issue, project) @staticmethod def get_list( project: "forgejo.ForgejoProject", status: IssueStatus = IssueStatus.open, author: Optional[str] = None, assignee: Optional[str] = None, labels: Optional[list[str]] = None, ) -> list["Issue"]: if not project.has_issues: raise IssueTrackerDisabled() parameters: dict[str, Union[str, list[str], bool]] = { "state": status.name, "type": "issues", } if author: parameters["created_by"] = author if assignee: parameters["assigned_by"] = assignee if labels: parameters["labels"] = labels try: return [ ForgejoIssue(issue, project) for issue in paginate( project.service.api.issue.list_issues, owner=project.namespace, repo=project.repo, **parameters, ) ] except NotFoundError as ex: raise ForgejoAPIException("Failed to list issues") from ex def comment(self, body: str) -> IssueComment: comment = self.partial_api(self.api.create_comment)( body=body, index=self.id, ) return ForgejoIssueComment(parent=self, raw_comment=comment) def close(self) -> "Issue": self._raw_issue = self.partial_api(self.api.edit_issue)( state="closed", index=self.id, ) return self def _get_all_comments(self, reverse: bool = False) -> Iterable[IssueComment]: comments = self.partial_api(self.api.get_comments)( index=self.id, ) if reverse: comments = list(reversed(comments)) return ( ForgejoIssueComment(parent=self, raw_comment=raw_comment) for raw_comment in comments ) def get_comment(self, comment_id: int) -> ForgejoIssueComment: return ForgejoIssueComment( self.partial_api(self.api.get_comment)(id=comment_id), parent=self, ) def add_assignee(self, *assignees: str) -> None: current_assignees = [assignee.login for assignee in self.assignees] updated_assignees = set(itertools.chain(current_assignees, assignees)) try: self._raw_issue = self.partial_api(self.api.edit_issue)( assignees=updated_assignees, index=self.id, ) except ApiError as ex: raise ForgejoAPIException( "Failed to assign issue, unknown user", ) from ex def add_label(self, *labels: str) -> None: self.partial_api(self.api.add_label)(labels=labels, index=self.id) self.__update_info()Attributes
project:GitProject- Project of the issue.
Ancestors
Class variables
var project : ForgejoProject
Instance variables
prop api-
Expand source code
@property def api(self): """Returns the issue API client from pyforgejo.""" return self.project.service.api.issueReturns the issue API client from pyforgejo.
prop assignees : list[pyforgejo.types.user.User]-
Expand source code
@property def assignees(self) -> list[PyforgejoUser]: return self._raw_issue.assignees or []
Methods
def partial_api(self, method, /, *args, **kwargs)-
Expand source code
def partial_api(self, method, /, *args, **kwargs): """Returns a partial API call for ForgejoIssue. Injects owner and repo parameters for the calls to issue API endpoints. Args: method: Specific method on the Pyforgejo API that is to be wrapped. *args: Positional arguments that get injected into every call. **kwargs: Keyword-arguments that get injected into every call. Returns: Callable with pre-injected parameters. """ params = {"owner": self.project.namespace, "repo": self.project.repo} return partial(method, *args, **kwargs, **params)Returns a partial API call for ForgejoIssue.
Injects owner and repo parameters for the calls to issue API endpoints.
Args
method- Specific method on the Pyforgejo API that is to be wrapped.
*args- Positional arguments that get injected into every call.
**kwargs- Keyword-arguments that get injected into every call.
Returns
Callable with pre-injected parameters.
Inherited members
class ForgejoIssueComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)-
Expand source code
class ForgejoIssueComment(ForgejoComment, IssueComment): def __str__(self) -> str: return "Forgejo" + super().__str__()Ancestors
Inherited members
class ForgejoPRComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)-
Expand source code
class ForgejoPRComment(ForgejoComment, PRComment): def __str__(self) -> str: return "Forgejo" + super().__str__()Ancestors
Inherited members
class ForgejoProject (repo: str,
service: forgejo.ForgejoService,
namespace: str,
forgejo_repo: pyforgejo.types.repository.Repository | None = None,
**kwargs)-
Expand source code
class ForgejoProject(BaseGitProject): service: "forgejo.ForgejoService" access_dict: ClassVar[dict] = { AccessLevel.pull: "read", AccessLevel.triage: "read", AccessLevel.push: "write", AccessLevel.admin: "admin", AccessLevel.maintain: "owner", None: "", } def __init__( self, repo: str, service: "forgejo.ForgejoService", namespace: str, forgejo_repo: Optional[Repository] = None, **kwargs, ): super().__init__(repo, service, namespace) self._forgejo_repo = forgejo_repo @property def api(self) -> RepositoryClient: """Returns a `RepositoryClient` from pyforgejo. Helper to save some typing. """ return self.service.api.repository def partial_api(self, method, /, *args, **kwargs): """Returns a partial API call for `ForgejoProject`. Injects `owner` and `repo` for the calls to `/repository/` endpoints. Args: method: Specific method on the Pyforgejo API that is to be wrapped. *args: Positional arguments that get injected into every call. **kwargs: Keyword-arguments that get injected into every call. Returns: Callable with pre-injected parameters. """ return partial( method, *args, **kwargs, owner=self.namespace, repo=self.repo, ) @cached_property def forgejo_repo(self) -> types.Repository: return self.api.repo_get( owner=self.namespace, repo=self.repo, ) def __str__(self) -> str: return ( f'ForgejoProject(namespace="{self.namespace}", repo="{self.repo}", ' f"service={self.service})" ) def __eq__(self, o: object) -> bool: return ( isinstance(o, ForgejoProject) and self.repo == o.repo and self.namespace == o.namespace and self.service == o.service ) @property def description(self) -> str: return self.forgejo_repo.description or "" @description.setter def description(self, new_description: str) -> None: self.partial_api(self.api.repo_edit)(description=new_description) def delete(self) -> None: self.partial_api(self.api.repo_delete)() def exists(self) -> bool: try: _ = self.forgejo_repo return True except NotFoundError: return False def is_private(self) -> bool: return self.forgejo_repo.private def is_forked(self) -> bool: return ( self.forgejo_repo.fork and self.forgejo_repo.owner.login == self.service.user.get_username() ) @property def is_fork(self) -> bool: return self.forgejo_repo.fork @property def full_repo_name(self) -> str: return self.forgejo_repo.full_name @property def parent(self) -> Optional["GitProject"]: if not self.forgejo_repo.parent: return None return ForgejoProject( service=self.service, repo=self.forgejo_repo.parent.name, namespace=self.forgejo_repo.parent.owner.username, ) @property def has_issues(self) -> bool: return self.forgejo_repo.has_issues def get_branches(self) -> Iterable[str]: return ( branch.name for branch in paginate( self.partial_api(self.api.repo_list_branches), ) ) @property def default_branch(self) -> str: return self.forgejo_repo.default_branch def get_commits(self, ref: Optional[str] = None) -> Iterable[str]: return ( commit.sha for commit in paginate( self.partial_api( self.api.repo_get_all_commits, sha=ref, ), ) ) def get_description(self) -> str: return self.description def _construct_fork_project(self) -> Optional["ForgejoProject"]: login = self.service.user.get_username() try: project = ForgejoProject( repo=self.repo, service=self.service, namespace=login, ) _ = project.forgejo_repo return project except NotFoundError: return None def get_fork(self, create: bool = True) -> Optional["GitProject"]: # The cheapest check that assumes fork has the same repository name as # the upstream if fork := self._construct_fork_project(): return fork # If not successful, the fork could still exist, but has a custom name username = self.service.user.get_username() for fork in self.get_forks(): if fork.forgejo_repo.owner.login == username: return fork # We have not found any fork owned by the auth'd user if create: return self.fork_create() logger.info( f"Fork of {self.forgejo_repo.full_name}" " does not exist and we were asked not to create it.", ) return None def get_owners(self) -> list[str]: return [self.forgejo_repo.owner.username] def _get_owner_or_org_collaborators(self) -> set[str]: namespace = self.get_owners()[0] try: teams = self.api.repo_list_teams( owner=self.namespace, repo=self.repo, ) except Exception as ex: # no teams, repo owned by regular user if "not owned by an organization" in str(ex): return {namespace} raise # repo owned by org, each org can have multiple teams with # different levels of access collaborators: set[str] = set() for team in teams: members = self.service.api.organization.org_list_team_members(team.id) collaborators.update(user.username for user in members) return collaborators def _get_collaborators(self) -> list[str]: return [ c.username for c in self.api.repo_list_collaborators( owner=self.namespace, repo=self.repo, ) ] + list(self._get_owner_or_org_collaborators()) def _get_collaborators_with_access(self) -> dict[str, str]: return { c: self.api.repo_get_repo_permissions( owner=self.namespace, repo=self.repo, collaborator=c, ).permission for c in self._get_collaborators() } def get_contributors(self) -> set[str]: return set(self._get_collaborators()) def users_with_write_access(self) -> set[str]: return { collaborator for collaborator, access in self._get_collaborators_with_access().items() if access in ("owner", "admin", "write") } def who_can_close_issue(self) -> set[str]: return self.users_with_write_access() def who_can_merge_pr(self) -> set[str]: return self.users_with_write_access() def can_merge_pr(self, username: str) -> bool: return self.api.repo_get_repo_permissions( owner=self.namespace, repo=self.repo, collaborator=username, ).permission in ("owner", "admin", "write") def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]: access_levels_forgejo = [ self.access_dict[access_level] for access_level in access_levels ] return { user for user, permission in self._get_collaborators_with_access().items() if permission in access_levels_forgejo } def add_user(self, user: str, access_level: AccessLevel) -> None: if access_level == AccessLevel.maintain: raise OperationNotSupported("Not possible to add a user as `owner`.") self.api.repo_add_collaborator( owner=self.namespace, repo=self.repo, collaborator=user, permission=self.access_dict[access_level], ) def remove_user(self, user: str) -> None: self.api.repo_delete_collaborator( owner=self.namespace, repo=self.repo, collaborator=user, ) def request_access(self) -> None: raise OperationNotSupported("Not possible on Forgejo") @indirect(ForgejoIssue.get_list) def get_issue_list( self, status: IssueStatus = IssueStatus.open, author: Optional[str] = None, assignee: Optional[str] = None, labels: Optional[list[str]] = None, ) -> list["Issue"]: pass @indirect(ForgejoIssue.get) def get_issue(self, issue_id: int) -> "Issue": pass @indirect(ForgejoIssue.create) def create_issue( self, title: str, body: str, private: Optional[bool] = None, labels: Optional[list[str]] = None, assignees: Optional[list[str]] = None, ) -> Issue: pass @indirect(ForgejoPullRequest.get_list) def get_pr_list(self, status: PRStatus = PRStatus.open) -> Iterable["PullRequest"]: pass @indirect(ForgejoPullRequest.get) def get_pr(self, pr_id: int) -> "PullRequest": pass def get_pr_files_diff( self, pr_id: int, retries: int = 0, wait_seconds: int = 3, ) -> dict: """ Get files diff of a pull request. Args: pr_id: ID of the pull request. Returns: Dictionary representing files diff. """ # [NOTE] Implemented only for Pagure, for details see # https://github.com/packit/ogr/issues/895 raise NotImplementedError() def get_tags(self) -> Iterable["GitTag"]: return ( GitTag( name=tag.name, commit_sha=tag.commit.sha, ) for tag in paginate(self.partial_api(self.api.repo_list_tags)) ) def get_sha_from_tag(self, tag_name: str) -> str: return self.partial_api( self.api.repo_get_tag, tag=tag_name, )().commit.sha @indirect(ForgejoRelease.get) def get_release( self, identifier: Optional[int] = None, name: Optional[str] = None, tag_name: Optional[str] = None, ) -> Release: pass @indirect(ForgejoRelease.get_latest) def get_latest_release(self) -> Optional[Release]: pass @indirect(ForgejoRelease.get_list) def get_releases(self) -> list[Release]: pass @indirect(ForgejoRelease.create) def create_release( self, tag: str, name: str, message: str, ref: Optional[str] = None, ) -> Release: pass @indirect(ForgejoPullRequest.create) def create_pr( self, title: str, body: str, target_branch: str, source_branch: str, fork_username: Optional[str] = None, ) -> "PullRequest": pass def commit_comment( self, commit: str, body: str, filename: Optional[str] = None, row: Optional[int] = None, ) -> "CommitComment": raise OperationNotSupported("Forgejo doesn't support commit comments") def get_commit_comments(self, commit: str) -> list[CommitComment]: raise OperationNotSupported("Forgejo doesn't support commit comments") def get_commit_comment(self, commit_sha: str, comment_id: int) -> CommitComment: raise OperationNotSupported("Forgejo doesn't support commit comments") @indirect(ForgejoCommitFlag.set) def set_commit_status( self, commit: str, state: Union[CommitStatus, str], target_url: str, description: str, context: str, trim: bool = False, ) -> "CommitFlag": pass @indirect(ForgejoCommitFlag.get) def get_commit_statuses(self, commit: str) -> Iterable["CommitFlag"]: pass def get_git_urls(self) -> dict[str, str]: return { "git": self.forgejo_repo.clone_url, "ssh": self.forgejo_repo.ssh_url, } def fork_create(self, namespace: Optional[str] = None) -> "GitProject": if namespace: self.api.create_fork( owner=self.namespace, repo=self.repo, organization=namespace, ) return ForgejoProject( repo=self.repo, service=self.service, namespace=namespace, ) self.api.create_fork( owner=self.namespace, repo=self.repo, ) return ForgejoProject( repo=self.repo, service=self.service, namespace=self.service.user.get_username(), ) def change_token(self, new_token: str) -> None: # [NOTE] API doesn't provide any method to change the token, and it's # embedded in the httpx client that's wrapped by pyforgejo wrapper to # avoid duplication between sync and async calls… raise NotImplementedError( "Not possible; requires recreation of the httpx client", ) def get_file_content( self, path: str, ref: Optional[str] = None, headers: Optional[dict[str, str]] = None, ) -> str: try: remote_file: types.ContentsResponse = self.partial_api( self.api.repo_get_contents, filepath=path, ref=ref, )() # [NOTE] If you touch this, good luck, have fun… # tl;dr ‹ContentsResponse› from the Pyforgejo contains the content # of the file that's (I hope always) base64-encoded, but it's stored # as a string, so here it's needed to convert the UTF-8 encoded # string back to bytes (duh, cause base64 is used for encoding raw # data), then decode the base64 bytes to just bytes and then decode # those to a UTF-8 string… EWWW… return codecs.decode( bytes(remote_file.content, "utf-8"), encoding=remote_file.encoding, ).decode("utf-8") except NotFoundError as ex: raise FileNotFoundError() from ex def __get_files( self, path: str, ref: str, recursive: bool, ) -> Iterable[str]: contents: types.ContentsResponse | list[types.ContentsResponse] subdirectories = ["."] with contextlib.suppress(IndexError): while path := subdirectories.pop(): contents = self.partial_api( self.api.repo_get_contents, filepath=path, ref=ref, )() if isinstance(contents, types.ContentsResponse): # singular file, return path and skip any further processing yield contents.path continue for file in contents: if file.type == "dir": subdirectories.append(file.path) continue yield file.path def get_files( self, ref: Optional[str] = None, filter_regex: Optional[str] = None, recursive: bool = False, ) -> Iterable[str]: logger.warning( "‹ForgejoProject.get_files()› method can fail because of incorrect" " OpenAPI spec", ) ref = ref or self.default_branch paths = self.__get_files(".", ref=ref, recursive=recursive) if filter_regex: return filter_paths(paths, filter_regex) return paths def get_forks(self) -> Iterable["ForgejoProject"]: return ( ForgejoProject( namespace=fork.owner.login, repo=fork.name, service=self.service, ) for fork in paginate( self.partial_api(self.api.list_forks), ) ) def get_web_url(self) -> str: return self.forgejo_repo.html_url def get_sha_from_branch(self, branch: str) -> Optional[str]: try: branch_info = self.partial_api( self.api.repo_get_branch, branch=branch, )() return branch_info.commit.id except NotFoundError: return NoneArgs
repo- Name of the project.
service- GitService instance.
namespace-
Namespace of the project.
- GitHub: username or org name.
- GitLab: username or org name.
- Pagure: namespace (e.g.
"rpms").
In case of forks:
"fork/{username}/{namespace}".
Ancestors
Class variables
var access_dict : ClassVar[dict]var service : ForgejoService
Instance variables
prop api : pyforgejo.repository.client.RepositoryClient-
Expand source code
@property def api(self) -> RepositoryClient: """Returns a `RepositoryClient` from pyforgejo. Helper to save some typing. """ return self.service.api.repositoryReturns a
RepositoryClientfrom pyforgejo. Helper to save some typing. var forgejo_repo : pyforgejo.types.repository.Repository-
Expand source code
@cached_property def forgejo_repo(self) -> types.Repository: return self.api.repo_get( owner=self.namespace, repo=self.repo, )
Methods
def partial_api(self, method, /, *args, **kwargs)-
Expand source code
def partial_api(self, method, /, *args, **kwargs): """Returns a partial API call for `ForgejoProject`. Injects `owner` and `repo` for the calls to `/repository/` endpoints. Args: method: Specific method on the Pyforgejo API that is to be wrapped. *args: Positional arguments that get injected into every call. **kwargs: Keyword-arguments that get injected into every call. Returns: Callable with pre-injected parameters. """ return partial( method, *args, **kwargs, owner=self.namespace, repo=self.repo, )Returns a partial API call for
ForgejoProject.Injects
ownerandrepofor the calls to/repository/endpoints.Args
method- Specific method on the Pyforgejo API that is to be wrapped.
*args- Positional arguments that get injected into every call.
**kwargs- Keyword-arguments that get injected into every call.
Returns
Callable with pre-injected parameters.
Inherited members
BaseGitProject:add_groupadd_usercan_merge_prchange_tokencommit_commentcreate_issuecreate_prcreate_releasedefault_branchdeletedescriptionexistsfork_createfull_repo_nameget_branchesget_commit_commentget_commit_commentsget_commit_statusesget_commitsget_contributorsget_descriptionget_file_contentget_filesget_forkget_forksget_git_urlsget_issueget_issue_infoget_issue_listget_latest_releaseget_ownersget_prget_pr_files_diffget_pr_listget_releaseget_releasesget_sha_from_branchget_sha_from_tagget_tagsget_users_with_given_accessget_web_urlhas_issueshas_write_accessis_forkis_forkedis_privateparentremove_groupremove_userrequest_accessset_commit_statususers_with_write_accesswhich_groups_can_merge_prwho_can_close_issuewho_can_merge_pr
class ForgejoPullRequest (raw_pr: pyforgejo.types.pull_request.PullRequest,
project: forgejo.ForgejoProject)-
Expand source code
class ForgejoPullRequest(BasePullRequest): _target_project: "forgejo.ForgejoProject" = None _source_project: "forgejo.ForgejoProject" = None _labels: list[PRLabel] = None def __init__( self, raw_pr: PyforgejoPullRequest, project: "forgejo.ForgejoProject", ): super().__init__(raw_pr, project) self.project = project def __str__(self) -> str: return "Forgejo" + super().__str__() @property def api(self): """Returns the issue API client from pyforgejo.""" return self.project.service.api.issue @property def title(self) -> str: return self._raw_pr.title @title.setter def title(self, new_title: str) -> None: self.update_info(title=new_title) @property def id(self) -> int: return self._raw_pr.number @property def status(self) -> PRStatus: return PRStatus.merged if self._raw_pr.merged else PRStatus[self._raw_pr.state] @property def url(self) -> str: return self._raw_pr.url @property def description(self) -> str: return self._raw_pr.body @description.setter def description(self, new_description: str) -> None: self.update_info(description=new_description) @property def author(self) -> str: return self._raw_pr.user.login @property def source_branch(self) -> str: return self._raw_pr.head.ref @property def target_branch(self) -> str: return self._raw_pr.base.ref @property def created(self) -> datetime.datetime: return self._raw_pr.created_at @property def labels(self) -> list[PRLabel]: if self._labels is None: self._labels = ( [ForgejoPRLabel(raw_label, self) for raw_label in self._raw_pr.labels] if self._raw_pr.labels else [] ) return self._labels @property def diff_url(self) -> str: return self._raw_pr.diff_url @property def patch(self) -> bytes: patch_url = self._raw_pr.patch_url response = httpx.get(patch_url) if not response.is_success: raise OgrNetworkError( f"Couldn't get patch from {patch_url}.patch because {response.reason}.", ) return response.content @property def head_commit(self) -> str: return self._raw_pr.head.sha @property def merge_commit_sha(self) -> Optional[str]: # this is None for non-merged PRs return self._raw_pr.merge_commit_sha @property def merge_commit_status(self) -> MergeCommitStatus: return ( MergeCommitStatus.can_be_merged if self._raw_pr.mergeable else MergeCommitStatus.cannot_be_merged ) @cached_property def source_project(self) -> "forgejo.ForgejoProject": pyforgejo_repo = self._raw_pr.head.repo return self._target_project.service.get_project( repo=pyforgejo_repo.name, namespace=pyforgejo_repo.owner.login, forgejo_repo=pyforgejo_repo, ) @property def commits_url(self) -> str: return f"{self.url}/commits" @property def closed_by(self) -> Optional[str]: return self._raw_pr.merged_by.login if self._raw_pr.merged_by else None @staticmethod def create( project: "forgejo.ForgejoProject", title: str, body: str, target_branch: str, source_branch: str, fork_username: Optional[str] = None, ) -> "PullRequest": target_project = project if project.is_fork and fork_username is None: # handles fork -> upstream (called on fork) source_branch = f"{project.namespace}:{source_branch}" target_project = project.parent # type: ignore elif fork_username: if fork_username != project.namespace and project.parent is not None: # handles fork -> other_fork # (username of other_fork owner specified by fork_username) forks = list( filter( lambda fork: fork.namespace == fork_username, project.parent.get_forks(), ), ) if not forks: raise ForgejoAPIException("Requested fork doesn't exist") target_project = forks[0] # type: ignore source_branch = f"{project.namespace}:{source_branch}" else: # handles fork -> upstream # (username of fork owner specified by fork_username) source_branch = f"{fork_username}:{source_branch}" logger.debug(f"Creating PR {target_branch}<-{source_branch}") pr = target_project.api.repo_create_pull_request( owner=target_project.namespace, repo=target_project.repo, base=target_branch, body=body, head=source_branch, title=title, ) logger.info(f"PR {pr.id} created.") return ForgejoPullRequest(pr, target_project) @staticmethod def get(project: "forgejo.ForgejoProject", pr_id: int) -> "PullRequest": try: raw_pr = project.api.repo_get_pull_request( owner=project.namespace, repo=project.repo, index=pr_id, ) except NotFoundError as ex: raise ForgejoAPIException(f"No pull request with id {pr_id} found.") from ex return ForgejoPullRequest(raw_pr, project) @staticmethod def get_list( project: "forgejo.ForgejoProject", status: PRStatus = PRStatus.open, ) -> Iterable["PullRequest"]: prs = paginate( partial( project.api.repo_list_pull_requests, owner=project.namespace, repo=project.repo, # Forgejo has just open/closed/all state=status.name if status != PRStatus.merged else "closed", ), ) return (ForgejoPullRequest(pr, project) for pr in prs) def update_info( self, title: Optional[str] = None, description: Optional[str] = None, ) -> "PullRequest": try: data = {"title": title if title else self.title} if description is not None: data["body"] = description updated_pr = self._target_project.api.repo_edit_pull_request( owner=self.target_project.namespace, repo=self.target_project.repo, index=self.id, **data, ) self._raw_pr = updated_pr return self except Exception as ex: raise ForgejoAPIException( f"There was an error while updating Forgejo PR: {ex}", ) from ex def close(self) -> "PullRequest": self._raw_pr = self._target_project.api.repo_edit_pull_request( owner=self.target_project.namespace, repo=self.target_project.repo, index=self.id, state="closed", ) return self def merge(self) -> "PullRequest": self._target_project.api.repo_merge_pull_request( owner=self.target_project.namespace, repo=self.target_project.repo, index=self.id, # options: merge, rebase, rebase-merge, squash, fast-forward-only, manually-merged do="merge", ) return self.get(self._target_project, self.id) def add_label(self, *labels: str) -> None: issue_client = self._target_project.service.api.issue new_labels = issue_client.add_label( owner=self.target_project.namespace, repo=self.target_project.repo, index=self.id, labels=list(labels), ) self._labels = [ForgejoPRLabel(raw_label, self) for raw_label in new_labels] def get_all_commits(self) -> Iterable[str]: return ( commit.sha for commit in paginate( partial( self._target_project.api.repo_get_pull_request_commits, owner=self.target_project.namespace, repo=self.target_project.repo, index=self.id, ), ) ) def _get_all_comments(self, reverse: bool = False) -> Iterable[PRComment]: try: comments = self.api.get_comments( owner=self.project.namespace, repo=self.project.repo, index=self.id, ) except NotFoundError as ex: raise ForgejoAPIException( "There was an error when retrieving PR comments.", ) from ex if reverse: comments = list(reversed(comments)) return ( ForgejoPRComment(raw_comment=comment, parent=self) for comment in comments ) def comment( self, body: str, commit: Optional[str] = None, filename: Optional[str] = None, row: Optional[int] = None, ) -> PRComment: """ Add new comment to the pull request. Args: body: Body of the comment. commit: Commit hash to which comment is related. Defaults to generic comment. filename: Path to the file to which comment is related. Defaults to no relation to the file. row: Line number to which the comment is related. Defaults to no relation to the line. Returns: Newly created comment. """ if commit or filename or row: raise NotImplementedError comment = self.api.create_comment( owner=self.project.namespace, repo=self.project.repo, index=self.id, body=body, ) return ForgejoPRComment(raw_comment=comment, parent=self) def get_comment(self, comment_id: int) -> PRComment: """ Returns a PR comment. Args: comment_id: id of comment Returns: Object representing a PR comment. """ comment = self.project.service.api.issue.get_comment( owner=self.project.namespace, repo=self.project.repo, id=comment_id, ) return ForgejoPRComment(parent=self, raw_comment=comment) def get_statuses(self) -> Union[list[CommitFlag], Iterable[CommitFlag]]: """ Returns statuses for latest commit on pull request. Returns: List of commit statuses of the latest commit. """ raise NotImplementedError()Attributes
project:GitProject- Project of the pull request.
Ancestors
Instance variables
prop api-
Expand source code
@property def api(self): """Returns the issue API client from pyforgejo.""" return self.project.service.api.issueReturns the issue API client from pyforgejo.
Inherited members
class ForgejoRelease (raw_release: Any, project: GitProject)-
Expand source code
class ForgejoRelease(Release): _raw_release: PyforgejoRelease project: "ogr_forgejo.ForgejoProject" @property def title(self) -> str: return self._raw_release.name @property def body(self) -> str: return self._raw_release.body @cached_property def git_tag(self) -> GitTag: tag = self.project.api.repo_get_tag( owner=self.project.namespace, repo=self.project.repo, tag=self.tag_name, ) return GitTag(name=tag.name, commit_sha=tag.commit.sha) @property def tag_name(self) -> str: return self._raw_release.tag_name @property def url(self) -> Optional[str]: return self._raw_release.url @property def created_at(self) -> datetime.datetime: return self._raw_release.created_at @property def tarball_url(self) -> str: return self._raw_release.tarball_url @staticmethod def _release_id_from_name( project: "ogr_forgejo.ForgejoProject", name: str, ) -> Optional[int]: for release in paginate( partial( project.api.repo_list_releases, owner=project.namespace, repo=project.repo, ), ): if release.name == name: return release.id return None @staticmethod def get( project: "ogr_forgejo.ForgejoProject", identifier: Optional[int] = None, name: Optional[str] = None, tag_name: Optional[str] = None, ) -> "Release": if tag_name: release = project.api.repo_get_release_by_tag( owner=project.namespace, repo=project.repo, tag=tag_name, ) return ForgejoRelease(release, project) if name: identifier = ForgejoRelease._release_id_from_name(project, name) if identifier is None: raise ForgejoAPIException("Release was not found.") release = project.api.repo_get_release( owner=project.namespace, repo=project.repo, id=identifier, ) return ForgejoRelease(release, project) @staticmethod def get_latest(project: "ogr_forgejo.ForgejoProject") -> Optional["Release"]: releases = project.api.repo_list_releases( owner=project.namespace, repo=project.repo, page=1, limit=1, ) return ForgejoRelease(releases[0], project) if releases else None @staticmethod def get_list(project: "ogr_forgejo.ForgejoProject") -> list["Release"]: releases = paginate( partial( project.api.repo_list_releases, owner=project.namespace, repo=project.repo, ), ) return [ForgejoRelease(release, project) for release in releases] @staticmethod def create( project: "ogr_forgejo.ForgejoProject", tag: str, name: str, message: str, ref: Optional[str] = None, ) -> "Release": release = project.api.repo_create_release( owner=project.namespace, repo=project.repo, tag_name=tag, body=message, name=name, target_commitish=ref, ) return ForgejoRelease(release, project) def edit_release(self, name: str, message: str) -> None: try: data = {} if name is not None: data["name"] = name if message is not None: data["body"] = message updated_release = self.project.api.repo_edit_release( owner=self.project.namespace, repo=self.project.repo, id=self._raw_release.id, **data, ) self._raw_release = updated_release except Exception as ex: raise ForgejoAPIException( f"There was an error while updating Forgejo release: {ex}", ) from exObject that represents release.
Attributes
project:GitProject- Project on which the release is created.
Ancestors
Class variables
var project : ForgejoProject
Inherited members
class ForgejoService (instance_url: str = 'https://codeberg.org', token: str | None = None, **kwargs)-
Expand source code
@use_for_service("forgejo") @use_for_service("codeberg.org") class ForgejoService(BaseGitService): version = "/api/v1" def __init__( self, instance_url: str = "https://codeberg.org", token: Optional[str] = None, **kwargs, ): super().__init__() self.instance_url = instance_url + self.version self._token = f"token {token}" self._api = None @cached_property def api(self) -> PyforgejoApi: return PyforgejoApi(base_url=self.instance_url, api_key=self._token) def get_project( # type: ignore[override] self, repo: str, namespace: str, **kwargs, ) -> "ForgejoProject": return ForgejoProject( repo=repo, namespace=namespace, service=self, **kwargs, ) @property def user(self) -> GitUser: return ForgejoUser(self) def project_create( self, repo: str, namespace: Optional[str] = None, description: Optional[str] = None, ) -> "ForgejoProject": if namespace: new_repo = self.api.organization.create_org_repo( org=namespace, name=repo, description=description, ) else: new_repo = self.api.repository.create_current_user_repo( name=repo, description=description, ) return ForgejoProject( repo=repo, namespace=namespace or self.user.get_username(), service=self, github_repo=new_repo, ) def get_project_from_url(self, url: str) -> "ForgejoProject": parsed_url = urlparse(url) path_parts = parsed_url.path.strip("/").split("/") if len(path_parts) < 2: raise OgrException(f"Invalid Forgejo URL: {url}") namespace = path_parts[0] repo = path_parts[1] return self.get_project(repo=repo, namespace=namespace) def get_rate_limit_remaining( self, namespace: Optional[str] = None, repo: Optional[str] = None, ) -> Optional[int]: """ There is no way to check rate limit status from Forgejo API. """ return NoneAttributes
instance_url:str- URL of the git forge instance.
Ancestors
Class variables
var version
Instance variables
var api : pyforgejo.client.PyforgejoApi-
Expand source code
@cached_property def api(self) -> PyforgejoApi: return PyforgejoApi(base_url=self.instance_url, api_key=self._token)
Methods
def get_rate_limit_remaining(self, namespace: str | None = None, repo: str | None = None) ‑> int | None-
Expand source code
def get_rate_limit_remaining( self, namespace: Optional[str] = None, repo: Optional[str] = None, ) -> Optional[int]: """ There is no way to check rate limit status from Forgejo API. """ return NoneThere is no way to check rate limit status from Forgejo API.
Inherited members
class ForgejoUser (service: forgejo.ForgejoService)-
Expand source code
class ForgejoUser(BaseGitUser): service: "forgejo.ForgejoService" def __init__(self, service: "forgejo.ForgejoService") -> None: super().__init__(service=service) self._forgejo_user = None def __str__(self) -> str: return f'ForgejoUser(username="{self.get_username()}")' @cached_property def forgejo_user(self): return self.service.api.user.get_current() def get_username(self) -> str: return self.forgejo_user.login def get_email(self) -> str: return self.forgejo_user.email def get_projects(self) -> list["ForgejoProject"]: repos = self.service.api.user.current_list_repos() return [ ForgejoProject( repo=repo.name, namespace=repo.owner.login, service=self.service, forgejo_repo=repo, ) for repo in repos ] def get_forks(self) -> list["ForgejoProject"]: return [project for project in self.get_projects() if project.forgejo_repo.fork]Represents currently authenticated user through service.
Ancestors
Class variables
var service : ForgejoService
Instance variables
var forgejo_user-
Expand source code
@cached_property def forgejo_user(self): return self.service.api.user.get_current()
Inherited members