Module ogr.services.forgejo

Sub-modules

ogr.services.forgejo.comments
ogr.services.forgejo.flag
ogr.services.forgejo.issue
ogr.services.forgejo.label
ogr.services.forgejo.project
ogr.services.forgejo.pull_request
ogr.services.forgejo.release
ogr.services.forgejo.service
ogr.services.forgejo.user
ogr.services.forgejo.utils

Classes

class ForgejoComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)
Expand source code
class ForgejoComment(Comment):
    """Comment on a Forgejo issue or pull request, backed by pyforgejo."""

    def _from_raw_comment(self, raw_comment: _ForgejoComment) -> None:
        """Populate the cached fields from a raw pyforgejo comment.

        Args:
            raw_comment: Comment object as returned by pyforgejo.
        """
        self._raw_comment = raw_comment
        self._id = raw_comment.id
        self._author = raw_comment.user.login
        self._created = raw_comment.created_at
        self._edited = raw_comment.updated_at

    @property
    def body(self) -> str:
        """Text of the comment, read from the raw pyforgejo comment."""
        return self._raw_comment.body

    @body.setter
    def body(self, new_body: str) -> None:
        """Edit the comment on the server and keep the local copy in sync.

        Args:
            new_body: New text of the comment.
        """
        updated_comment = self._client.issue.edit_comment(
            owner=self._parent.project.namespace,
            repo=self._parent.project.repo,
            id=self._id,
            body=new_body,
        )
        self._raw_comment = updated_comment

        # Keep `edited` consistent with the freshly stored raw comment;
        # previously it kept the timestamp captured at construction time.
        # Guarded in case the client hands back no payload.
        if updated_comment is not None:
            self._edited = updated_comment.updated_at

    @property
    def edited(self) -> datetime.datetime:
        """Timestamp of the last modification of the comment."""
        return self._edited

    @property
    def _client(self):
        """pyforgejo API client reached through the parent's project."""
        return self._parent.project.service.api

    def get_reactions(self) -> list[Reaction]:
        """List reactions attached to this comment.

        Returns:
            List of reactions; empty when fetching them fails with
            `ValidationError` or `ApiError`.
        """
        try:
            reactions = self._client.issue.get_comment_reactions(
                owner=self._parent.project.namespace,
                repo=self._parent.project.repo,
                id=self._id,
            )

        # in case no reactions exist:
        # re-recording causes pyforgejo to raise pydantic's ValidationError
        # re-running tests causes pyforgejo's ApiError to be raised instead
        # both need to be caught
        except (ValidationError, ApiError):
            return []

        return [
            ForgejoReaction(raw_reaction=reaction, parent=self)
            for reaction in reactions
        ]

    def add_reaction(self, reaction: str) -> Reaction:
        """Add a reaction to this comment.

        Args:
            reaction: Content of the reaction passed to the API.

        Returns:
            The newly created reaction.
        """
        raw_reaction = self._client.issue.post_comment_reaction(
            owner=self._parent.project.namespace,
            repo=self._parent.project.repo,
            id=self._id,
            content=reaction,
        )
        return ForgejoReaction(raw_reaction=raw_reaction, parent=self)

Ancestors

Subclasses

Inherited members

class ForgejoIssue (raw_issue: pyforgejo.types.issue.Issue, project: forgejo.ForgejoProject)
Expand source code
class ForgejoIssue(BaseIssue):
    """Issue of a Forgejo project, backed by pyforgejo's issue API."""

    project: "forgejo.ForgejoProject"

    def __init__(self, raw_issue: _issue, project: "forgejo.ForgejoProject"):
        """Wrap a raw pyforgejo issue.

        Args:
            raw_issue: Issue object as returned by pyforgejo.
            project: Project the issue belongs to.

        Raises:
            ForgejoAPIException: If the raw issue is actually a pull request.
        """
        if raw_issue.pull_request:
            raise ForgejoAPIException(
                f"Requested issue #{raw_issue.number} is a pull request",
            )

        super().__init__(raw_issue, project)

    @property
    def api(self):
        """Returns the issue API client from pyforgejo."""
        return self.project.service.api.issue

    def partial_api(self, method, /, *args, **kwargs):
        """Returns a partial API call for ForgejoIssue.

        Injects owner and repo parameters for the calls to issue API endpoints.

        Args:
            method: Specific method on the Pyforgejo API that is to be wrapped.
            *args: Positional arguments that get injected into every call.
            **kwargs: Keyword-arguments that get injected into every call.

        Returns:
            Callable with pre-injected parameters.
        """
        params = {"owner": self.project.namespace, "repo": self.project.repo}
        return partial(method, *args, **kwargs, **params)

    def __update_info(self) -> None:
        """Refresh the local issue object with the latest data from the server."""
        self._raw_issue = self.partial_api(self.api.get_issue)(index=self.id)

    @property
    def title(self) -> str:
        """Title of the issue."""
        return self._raw_issue.title

    @title.setter
    def title(self, new_title: str) -> None:
        """Change the title on the server and store the returned issue."""
        self._raw_issue = self.partial_api(self.api.edit_issue)(
            title=new_title,
            index=self.id,
        )

    @property
    def id(self) -> int:
        """Number of the issue within the project."""
        return self._raw_issue.number

    @property
    def url(self) -> str:
        """URL of the issue as reported by the raw issue."""
        return self._raw_issue.url

    @property
    def description(self) -> str:
        """Body text of the issue."""
        return self._raw_issue.body

    @description.setter
    def description(self, new_description: str):
        """Change the body on the server and store the returned issue."""
        self._raw_issue = self.partial_api(self.api.edit_issue)(
            body=new_description,
            index=self.id,
        )

    @property
    def author(self) -> str:
        """Login of the user who created the issue."""
        return self._raw_issue.user.login

    @property
    def created(self) -> datetime:
        """Creation timestamp of the issue."""
        return self._raw_issue.created_at

    @property
    def status(self) -> IssueStatus:
        """Status looked up in `IssueStatus` by the raw state name."""
        return IssueStatus[self._raw_issue.state]

    @property
    def assignees(self) -> list[PyforgejoUser]:
        """Users assigned to the issue; empty list when there are none."""
        return self._raw_issue.assignees or []

    @property
    def labels(self) -> list[IssueLabel]:
        """Labels of the issue; empty list when the raw issue has none."""
        # `or []` mirrors the handling in `assignees`: a missing label list
        # must not blow up the iteration.
        return [
            ForgejoIssueLabel(raw_label, self)
            for raw_label in self._raw_issue.labels or []
        ]

    def __str__(self) -> str:
        return "Forgejo" + super().__str__()

    @staticmethod
    def create(
        project: "forgejo.ForgejoProject",
        title: str,
        body: str,
        private: Optional[bool] = None,
        labels: Optional[list[str]] = None,
        assignees: Optional[list[str]] = None,
    ) -> "Issue":
        """Create a new issue in the project.

        Args:
            project: Project to create the issue in.
            title: Title of the issue.
            body: Description of the issue.
            private: Must be falsy; private issues are rejected.
            labels: Names of labels to attach after creation.
            assignees: Usernames to assign.

        Returns:
            The created issue.

        Raises:
            OperationNotSupported: If a private issue is requested.
            IssueTrackerDisabled: If the project has issues disabled.
        """
        if private:
            raise OperationNotSupported("Private issues are not supported by Forgejo")
        if not project.has_issues:
            raise IssueTrackerDisabled()

        # The API requires ids of labels in the create_issue method
        # which would lead to having to retrieve existing labels and
        # needing to find the ids of those we need to add to the issue;
        # A separate API call would also need to be made to create each
        # label that does not yet exist, potentially leading to many
        # API calls and unclear code, so labels are instead added separately
        # below after creating a new issue without labels
        issue = project.service.api.issue.create_issue(
            owner=project.namespace,
            repo=project.repo,
            title=title,
            body=body,
            labels=[],
            assignees=assignees,
        )

        forgejo_issue = ForgejoIssue(issue, project)

        if labels:
            forgejo_issue.add_label(*labels)

        return forgejo_issue

    @staticmethod
    def get(project: "forgejo.ForgejoProject", issue_id: int) -> "Issue":
        """Fetch a single issue.

        Args:
            project: Project to look the issue up in.
            issue_id: Number of the issue.

        Returns:
            The requested issue.

        Raises:
            IssueTrackerDisabled: If the project has issues disabled.
            ForgejoAPIException: If the issue is not found.
        """
        if not project.has_issues:
            raise IssueTrackerDisabled()

        try:
            issue = project.service.api.issue.get_issue(
                owner=project.namespace,
                repo=project.repo,
                index=issue_id,
            )

        except NotFoundError as ex:
            raise ForgejoAPIException(f"Issue {issue_id} not found") from ex
        return ForgejoIssue(issue, project)

    @staticmethod
    def get_list(
        project: "forgejo.ForgejoProject",
        status: IssueStatus = IssueStatus.open,
        author: Optional[str] = None,
        assignee: Optional[str] = None,
        labels: Optional[list[str]] = None,
    ) -> list["Issue"]:
        """List issues of the project, optionally filtered.

        Args:
            project: Project to list issues of.
            status: Status the issues must have.
            author: Only issues created by this user.
            assignee: Only issues assigned to this user.
            labels: Only issues carrying these labels.

        Returns:
            List of matching issues.

        Raises:
            IssueTrackerDisabled: If the project has issues disabled.
            ForgejoAPIException: If listing fails with a not-found error.
        """
        if not project.has_issues:
            raise IssueTrackerDisabled()

        parameters: dict[str, Union[str, list[str], bool]] = {
            "state": status.name,
            # restrict the listing to issues
            "type": "issues",
        }
        if author:
            parameters["created_by"] = author
        if assignee:
            parameters["assigned_by"] = assignee
        if labels:
            parameters["labels"] = labels

        try:
            return [
                ForgejoIssue(issue, project)
                for issue in paginate(
                    project.service.api.issue.list_issues,
                    owner=project.namespace,
                    repo=project.repo,
                    **parameters,
                )
            ]
        except NotFoundError as ex:
            raise ForgejoAPIException("Failed to list issues") from ex

    def comment(self, body: str) -> IssueComment:
        """Add a comment to the issue.

        Args:
            body: Text of the comment.

        Returns:
            The created comment.
        """
        comment = self.partial_api(self.api.create_comment)(
            body=body,
            index=self.id,
        )
        return ForgejoIssueComment(parent=self, raw_comment=comment)

    def close(self) -> "Issue":
        """Close the issue on the server.

        Returns:
            This issue, holding the issue object returned by the API.
        """
        self._raw_issue = self.partial_api(self.api.edit_issue)(
            state="closed",
            index=self.id,
        )

        return self

    def _get_all_comments(self, reverse: bool = False) -> Iterable[IssueComment]:
        """Iterate over all comments of the issue.

        Args:
            reverse: Yield the comments in reversed order.

        Returns:
            Lazy iterable of wrapped comments.
        """
        comments = self.partial_api(self.api.get_comments)(
            index=self.id,
        )

        if reverse:
            comments = list(reversed(comments))

        return (
            ForgejoIssueComment(parent=self, raw_comment=raw_comment)
            for raw_comment in comments
        )

    def get_comment(self, comment_id: int) -> ForgejoIssueComment:
        """Fetch a single comment by its ID.

        Args:
            comment_id: ID of the comment.

        Returns:
            The wrapped comment.
        """
        return ForgejoIssueComment(
            self.partial_api(self.api.get_comment)(id=comment_id),
            parent=self,
        )

    def add_assignee(self, *assignees: str) -> None:
        """Assign additional users to the issue.

        Users that are already assigned are kept; duplicates are dropped.

        Args:
            *assignees: Usernames to assign.

        Raises:
            ForgejoAPIException: If the API rejects the update.
        """
        current_assignees = (assignee.login for assignee in self.assignees)
        # De-duplicate while keeping a stable order and hand the API a real
        # sequence; a bare `set` has no deterministic order.
        updated_assignees = list(
            dict.fromkeys(itertools.chain(current_assignees, assignees)),
        )

        try:
            self._raw_issue = self.partial_api(self.api.edit_issue)(
                assignees=updated_assignees,
                index=self.id,
            )
        except ApiError as ex:
            raise ForgejoAPIException(
                "Failed to assign issue, unknown user",
            ) from ex

    def add_label(self, *labels: str) -> None:
        """Attach labels to the issue and refresh the local issue data.

        Args:
            *labels: Names of the labels to add.
        """
        self.partial_api(self.api.add_label)(labels=labels, index=self.id)
        self.__update_info()

Attributes

project : GitProject
Project of the issue.

Ancestors

Class variables

var project : ForgejoProject

Instance variables

prop api
Expand source code
@property
def api(self):
    """Issue API client of pyforgejo, reached through the project's service."""
    forgejo_client = self.project.service.api
    return forgejo_client.issue

Returns the issue API client from pyforgejo.

prop assignees : list[pyforgejo.types.user.User]
Expand source code
@property
def assignees(self) -> list[PyforgejoUser]:
    """Users assigned to the issue; an empty list when the raw value is falsy."""
    raw_assignees = self._raw_issue.assignees
    return raw_assignees if raw_assignees else []

Methods

def partial_api(self, method, /, *args, **kwargs)
Expand source code
def partial_api(self, method, /, *args, **kwargs):
    """Pre-bind the issue's repository coordinates to an API method.

    The owner and repo of the issue's project are injected as keyword
    arguments, so callers only supply the endpoint-specific parameters.

    Args:
        method: Specific method on the Pyforgejo API that is to be wrapped.
        *args: Positional arguments that get injected into every call.
        **kwargs: Keyword-arguments that get injected into every call.

    Returns:
        Callable with pre-injected parameters.
    """
    project = self.project
    return partial(
        method,
        *args,
        **kwargs,
        owner=project.namespace,
        repo=project.repo,
    )

Returns a partial API call for ForgejoIssue.

Injects owner and repo parameters for the calls to issue API endpoints.

Args

method
Specific method on the Pyforgejo API that is to be wrapped.
*args
Positional arguments that get injected into every call.
**kwargs
Keyword-arguments that get injected into every call.

Returns

Callable with pre-injected parameters.

Inherited members

class ForgejoIssueComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)
Expand source code
class ForgejoIssueComment(ForgejoComment, IssueComment):
    """Forgejo flavour of an issue comment."""

    def __str__(self) -> str:
        # Prefix the inherited representation with the forge name.
        inherited = super().__str__()
        return f"Forgejo{inherited}"

Ancestors

Inherited members

class ForgejoPRComment (raw_comment: Any | None = None,
parent: Any | None = None,
body: str | None = None,
id_: int | None = None,
author: str | None = None,
created: datetime.datetime | None = None,
edited: datetime.datetime | None = None)
Expand source code
class ForgejoPRComment(ForgejoComment, PRComment):
    """Forgejo flavour of a pull-request comment."""

    def __str__(self) -> str:
        # Prefix the inherited representation with the forge name.
        inherited = super().__str__()
        return f"Forgejo{inherited}"

Ancestors

Inherited members

class ForgejoProject (repo: str,
service: forgejo.ForgejoService,
namespace: str,
forgejo_repo: pyforgejo.types.repository.Repository | None = None,
**kwargs)
Expand source code
class ForgejoProject(BaseGitProject):
    """Git project hosted on a Forgejo instance, backed by pyforgejo."""

    service: "forgejo.ForgejoService"
    # Translation of ogr access levels to Forgejo permission names
    access_dict: ClassVar[dict] = {
        AccessLevel.pull: "read",
        AccessLevel.triage: "read",
        AccessLevel.push: "write",
        AccessLevel.admin: "admin",
        AccessLevel.maintain: "owner",
        None: "",
    }

    def __init__(
        self,
        repo: str,
        service: "forgejo.ForgejoService",
        namespace: str,
        forgejo_repo: Optional[Repository] = None,
        **kwargs,
    ):
        """Create the project wrapper.

        Args:
            repo: Name of the repository.
            service: Forgejo service the project lives on.
            namespace: Owner (user or organization) of the repository.
            forgejo_repo: Already fetched raw repository; when given, it is
                used instead of fetching the repository from the API.
            **kwargs: Accepted and ignored.
        """
        super().__init__(repo, service, namespace)
        self._forgejo_repo = forgejo_repo

        # Previously the provided repository was stored but never used, so
        # every project fetched it again. `cached_property` keeps its value
        # in the instance `__dict__`, hence seeding it there skips the call.
        if forgejo_repo is not None:
            self.__dict__["forgejo_repo"] = forgejo_repo

    @property
    def api(self) -> RepositoryClient:
        """Returns a `RepositoryClient` from pyforgejo. Helper to save some
        typing.
        """
        return self.service.api.repository

    def partial_api(self, method, /, *args, **kwargs):
        """Returns a partial API call for `ForgejoProject`.

        Injects `owner` and `repo` for the calls to `/repository/` endpoints.

        Args:
            method: Specific method on the Pyforgejo API that is to be wrapped.
            *args: Positional arguments that get injected into every call.
            **kwargs: Keyword-arguments that get injected into every call.

        Returns:
            Callable with pre-injected parameters.

        """
        return partial(
            method,
            *args,
            **kwargs,
            owner=self.namespace,
            repo=self.repo,
        )

    @cached_property
    def forgejo_repo(self) -> types.Repository:
        """Raw repository fetched from the API; cached after the first access."""
        return self.api.repo_get(
            owner=self.namespace,
            repo=self.repo,
        )

    def __str__(self) -> str:
        return (
            f'ForgejoProject(namespace="{self.namespace}", repo="{self.repo}", '
            f"service={self.service})"
        )

    def __eq__(self, o: object) -> bool:
        return (
            isinstance(o, ForgejoProject)
            and self.repo == o.repo
            and self.namespace == o.namespace
            and self.service == o.service
        )

    @property
    def description(self) -> str:
        """Description of the repository; empty string when unset."""
        return self.forgejo_repo.description or ""

    @description.setter
    def description(self, new_description: str) -> None:
        """Change the repository description on the server."""
        self.partial_api(self.api.repo_edit)(description=new_description)
        # Drop the cached repository so that the next read reflects the
        # change instead of returning the stale description.
        self.__dict__.pop("forgejo_repo", None)

    def delete(self) -> None:
        """Delete the repository on the server."""
        self.partial_api(self.api.repo_delete)()

    def exists(self) -> bool:
        """Check whether the repository can be fetched.

        Returns:
            `False` when fetching raises `NotFoundError`, `True` otherwise.
        """
        try:
            _ = self.forgejo_repo
            return True
        except NotFoundError:
            return False

    def is_private(self) -> bool:
        """Return the `private` flag of the repository."""
        return self.forgejo_repo.private

    def is_forked(self) -> bool:
        """Tell whether this repository is a fork owned by the current user."""
        # `bool()` guards against a missing `fork` flag leaking out as `None`
        return bool(
            self.forgejo_repo.fork
            and self.forgejo_repo.owner.login == self.service.user.get_username(),
        )

    @property
    def is_fork(self) -> bool:
        """The `fork` flag of the repository."""
        return self.forgejo_repo.fork

    @property
    def full_repo_name(self) -> str:
        """Full name of the repository as reported by the API."""
        return self.forgejo_repo.full_name

    @property
    def parent(self) -> Optional["GitProject"]:
        """Project this repository was forked from, `None` if there is none."""
        if not self.forgejo_repo.parent:
            return None

        return ForgejoProject(
            service=self.service,
            repo=self.forgejo_repo.parent.name,
            namespace=self.forgejo_repo.parent.owner.username,
        )

    @property
    def has_issues(self) -> bool:
        """The `has_issues` flag of the repository."""
        return self.forgejo_repo.has_issues

    def get_branches(self) -> Iterable[str]:
        """Lazily iterate over the names of all branches."""
        return (
            branch.name
            for branch in paginate(
                self.partial_api(self.api.repo_list_branches),
            )
        )

    @property
    def default_branch(self) -> str:
        """Name of the default branch."""
        return self.forgejo_repo.default_branch

    def get_commits(self, ref: Optional[str] = None) -> Iterable[str]:
        """Lazily iterate over commit hashes.

        Args:
            ref: Passed to the API as `sha` to select where to list from.

        Returns:
            Iterable of commit SHAs.
        """
        return (
            commit.sha
            for commit in paginate(
                self.partial_api(
                    self.api.repo_get_all_commits,
                    sha=ref,
                ),
            )
        )

    def get_description(self) -> str:
        """Return the repository description."""
        return self.description

    def _construct_fork_project(self) -> Optional["ForgejoProject"]:
        """Look for a fork named like this repo under the current user.

        Returns:
            The fork project, or `None` when no such repository is found.
        """
        login = self.service.user.get_username()
        try:
            project = ForgejoProject(
                repo=self.repo,
                service=self.service,
                namespace=login,
            )
            # Accessing the repository verifies that it exists
            _ = project.forgejo_repo
            return project
        except NotFoundError:
            return None

    def get_fork(self, create: bool = True) -> Optional["GitProject"]:
        """Find the current user's fork of this project.

        Args:
            create: Create the fork when none is found.

        Returns:
            The fork, or `None` when none exists and `create` is false.
        """
        # The cheapest check that assumes fork has the same repository name as
        # the upstream
        if fork := self._construct_fork_project():
            return fork

        # If not successful, the fork could still exist, but has a custom name
        username = self.service.user.get_username()
        for fork in self.get_forks():
            if fork.forgejo_repo.owner.login == username:
                return fork

        # We have not found any fork owned by the auth'd user
        if create:
            return self.fork_create()

        logger.info(
            f"Fork of {self.forgejo_repo.full_name}"
            " does not exist and we were asked not to create it.",
        )
        return None

    def get_owners(self) -> list[str]:
        """Return a single-element list with the repository owner."""
        return [self.forgejo_repo.owner.username]

    def _get_owner_or_org_collaborators(self) -> set[str]:
        """Collect the owner, or the members of all teams of the repository.

        Returns:
            Set holding just the owner when the team listing fails with the
            "not owned by an organization" error, otherwise the usernames of
            all members of the listed teams.
        """
        namespace = self.get_owners()[0]
        try:
            teams = self.api.repo_list_teams(
                owner=self.namespace,
                repo=self.repo,
            )
        except Exception as ex:
            # no teams, repo owned by regular user
            if "not owned by an organization" in str(ex):
                return {namespace}
            raise

        # repo owned by org, each org can have multiple teams with
        # different levels of access
        collaborators: set[str] = set()
        for team in teams:
            members = self.service.api.organization.org_list_team_members(team.id)
            collaborators.update(user.username for user in members)

        return collaborators

    def _get_collaborators(self) -> list[str]:
        """List usernames of direct collaborators plus owner/team members."""
        direct_collaborators = [
            c.username
            for c in self.api.repo_list_collaborators(
                owner=self.namespace,
                repo=self.repo,
            )
        ]
        # A user can be both a direct collaborator and a team member (or the
        # owner); de-duplicate in first-seen order so that callers don't look
        # the same user up twice.
        return list(
            dict.fromkeys(
                itertools.chain(
                    direct_collaborators,
                    self._get_owner_or_org_collaborators(),
                ),
            ),
        )

    def _get_collaborators_with_access(self) -> dict[str, str]:
        """Map every collaborator to the permission reported by the API."""
        return {
            c: self.api.repo_get_repo_permissions(
                owner=self.namespace,
                repo=self.repo,
                collaborator=c,
            ).permission
            for c in self._get_collaborators()
        }

    def get_contributors(self) -> set[str]:
        """Return the set of collaborator usernames."""
        return set(self._get_collaborators())

    def users_with_write_access(self) -> set[str]:
        """Return collaborators with `owner`, `admin` or `write` permission."""
        return {
            collaborator
            for collaborator, access in self._get_collaborators_with_access().items()
            if access in ("owner", "admin", "write")
        }

    def who_can_close_issue(self) -> set[str]:
        """Return the users with write access."""
        return self.users_with_write_access()

    def who_can_merge_pr(self) -> set[str]:
        """Return the users with write access."""
        return self.users_with_write_access()

    def can_merge_pr(self, username: str) -> bool:
        """Tell whether the user has `owner`, `admin` or `write` permission.

        Args:
            username: User to check.
        """
        return self.api.repo_get_repo_permissions(
            owner=self.namespace,
            repo=self.repo,
            collaborator=username,
        ).permission in ("owner", "admin", "write")

    def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]:
        """Return collaborators whose permission matches any given level.

        Args:
            access_levels: Access levels, translated through `access_dict`.
        """
        access_levels_forgejo = [
            self.access_dict[access_level] for access_level in access_levels
        ]

        return {
            user
            for user, permission in self._get_collaborators_with_access().items()
            if permission in access_levels_forgejo
        }

    def add_user(self, user: str, access_level: AccessLevel) -> None:
        """Add a collaborator with the given access level.

        Args:
            user: Username of the collaborator.
            access_level: Access level, translated through `access_dict`.

        Raises:
            OperationNotSupported: If `AccessLevel.maintain` is requested.
        """
        if access_level == AccessLevel.maintain:
            raise OperationNotSupported("Not possible to add a user as `owner`.")

        self.api.repo_add_collaborator(
            owner=self.namespace,
            repo=self.repo,
            collaborator=user,
            permission=self.access_dict[access_level],
        )

    def remove_user(self, user: str) -> None:
        """Remove a collaborator from the repository.

        Args:
            user: Username of the collaborator.
        """
        self.api.repo_delete_collaborator(
            owner=self.namespace,
            repo=self.repo,
            collaborator=user,
        )

    def request_access(self) -> None:
        """Not supported; always raises `OperationNotSupported`."""
        raise OperationNotSupported("Not possible on Forgejo")

    @indirect(ForgejoIssue.get_list)
    def get_issue_list(
        self,
        status: IssueStatus = IssueStatus.open,
        author: Optional[str] = None,
        assignee: Optional[str] = None,
        labels: Optional[list[str]] = None,
    ) -> list["Issue"]:
        pass

    @indirect(ForgejoIssue.get)
    def get_issue(self, issue_id: int) -> "Issue":
        pass

    @indirect(ForgejoIssue.create)
    def create_issue(
        self,
        title: str,
        body: str,
        private: Optional[bool] = None,
        labels: Optional[list[str]] = None,
        assignees: Optional[list[str]] = None,
    ) -> Issue:
        pass

    @indirect(ForgejoPullRequest.get_list)
    def get_pr_list(self, status: PRStatus = PRStatus.open) -> Iterable["PullRequest"]:
        pass

    @indirect(ForgejoPullRequest.get)
    def get_pr(self, pr_id: int) -> "PullRequest":
        pass

    def get_pr_files_diff(
        self,
        pr_id: int,
        retries: int = 0,
        wait_seconds: int = 3,
    ) -> dict:
        """
        Get files diff of a pull request.

        Args:
            pr_id: ID of the pull request.
            retries: Unused here; the method is not implemented.
            wait_seconds: Unused here; the method is not implemented.

        Returns:
            Dictionary representing files diff.

        Raises:
            NotImplementedError: Always.
        """
        # [NOTE] Implemented only for Pagure, for details see
        # https://github.com/packit/ogr/issues/895
        raise NotImplementedError()

    def get_tags(self) -> Iterable["GitTag"]:
        """Lazily iterate over the tags with their commit SHAs."""
        return (
            GitTag(
                name=tag.name,
                commit_sha=tag.commit.sha,
            )
            for tag in paginate(self.partial_api(self.api.repo_list_tags))
        )

    def get_sha_from_tag(self, tag_name: str) -> str:
        """Return the commit SHA the given tag points to.

        Args:
            tag_name: Name of the tag.
        """
        return self.partial_api(
            self.api.repo_get_tag,
            tag=tag_name,
        )().commit.sha

    @indirect(ForgejoRelease.get)
    def get_release(
        self,
        identifier: Optional[int] = None,
        name: Optional[str] = None,
        tag_name: Optional[str] = None,
    ) -> Release:
        pass

    @indirect(ForgejoRelease.get_latest)
    def get_latest_release(self) -> Optional[Release]:
        pass

    @indirect(ForgejoRelease.get_list)
    def get_releases(self) -> list[Release]:
        pass

    @indirect(ForgejoRelease.create)
    def create_release(
        self,
        tag: str,
        name: str,
        message: str,
        ref: Optional[str] = None,
    ) -> Release:
        pass

    @indirect(ForgejoPullRequest.create)
    def create_pr(
        self,
        title: str,
        body: str,
        target_branch: str,
        source_branch: str,
        fork_username: Optional[str] = None,
    ) -> "PullRequest":
        pass

    def commit_comment(
        self,
        commit: str,
        body: str,
        filename: Optional[str] = None,
        row: Optional[int] = None,
    ) -> "CommitComment":
        """Not supported; always raises `OperationNotSupported`."""
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    def get_commit_comments(self, commit: str) -> list[CommitComment]:
        """Not supported; always raises `OperationNotSupported`."""
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    def get_commit_comment(self, commit_sha: str, comment_id: int) -> CommitComment:
        """Not supported; always raises `OperationNotSupported`."""
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    @indirect(ForgejoCommitFlag.set)
    def set_commit_status(
        self,
        commit: str,
        state: Union[CommitStatus, str],
        target_url: str,
        description: str,
        context: str,
        trim: bool = False,
    ) -> "CommitFlag":
        pass

    @indirect(ForgejoCommitFlag.get)
    def get_commit_statuses(self, commit: str) -> Iterable["CommitFlag"]:
        pass

    def get_git_urls(self) -> dict[str, str]:
        """Return the clone URL under `git` and the SSH URL under `ssh`."""
        return {
            "git": self.forgejo_repo.clone_url,
            "ssh": self.forgejo_repo.ssh_url,
        }

    def fork_create(self, namespace: Optional[str] = None) -> "GitProject":
        """Fork this project.

        Args:
            namespace: Organization to fork into; when omitted, the fork is
                requested without an organization and the returned project
                uses the current user's username as its namespace.

        Returns:
            Project object of the fork.
        """
        # `organization` is only sent when a namespace was requested
        fork_options = {"organization": namespace} if namespace else {}
        self.api.create_fork(
            owner=self.namespace,
            repo=self.repo,
            **fork_options,
        )
        return ForgejoProject(
            repo=self.repo,
            service=self.service,
            namespace=namespace or self.service.user.get_username(),
        )

    def change_token(self, new_token: str) -> None:
        """Not implemented; always raises `NotImplementedError`."""
        # [NOTE] API doesn't provide any method to change the token, and it's
        # embedded in the httpx client that's wrapped by pyforgejo wrapper to
        # avoid duplication between sync and async calls…
        raise NotImplementedError(
            "Not possible; requires recreation of the httpx client",
        )

    def get_file_content(
        self,
        path: str,
        ref: Optional[str] = None,
        headers: Optional[dict[str, str]] = None,
    ) -> str:
        """Return the content of a file as text.

        Args:
            path: Path of the file within the repository.
            ref: Git reference to read the file from.
            headers: Unused.

        Returns:
            Decoded content of the file.

        Raises:
            FileNotFoundError: If the API reports the file as not found.
        """
        try:
            remote_file: types.ContentsResponse = self.partial_api(
                self.api.repo_get_contents,
                filepath=path,
                ref=ref,
            )()

            # [NOTE] If you touch this, good luck, have fun…
            # tl;dr ‹ContentsResponse› from the Pyforgejo contains the content
            # of the file that's (I hope always) base64-encoded, but it's stored
            # as a string, so here it's needed to convert the UTF-8 encoded
            # string back to bytes (duh, cause base64 is used for encoding raw
            # data), then decode the base64 bytes to just bytes and then decode
            # those to a UTF-8 string… EWWW…
            return codecs.decode(
                bytes(remote_file.content, "utf-8"),
                encoding=remote_file.encoding,
            ).decode("utf-8")

        except NotFoundError as ex:
            raise FileNotFoundError() from ex

    def __get_files(
        self,
        path: str,
        ref: str,
        recursive: bool,
    ) -> Iterable[str]:
        """Yield paths of files found under `path`.

        Args:
            path: Directory (or file) to start the listing from.
            ref: Git reference to list the files at.
            recursive: Descend into subdirectories; when false, only the
                files directly under `path` are yielded.

        Returns:
            Lazy iterable of file paths.
        """
        contents: types.ContentsResponse | list[types.ContentsResponse]

        # Explicit stack of directories that still need to be listed; the
        # previous `suppress(IndexError)` around the whole loop could also
        # hide an unrelated `IndexError` raised while listing.
        pending = [path]

        while pending:
            current = pending.pop()
            contents = self.partial_api(
                self.api.repo_get_contents,
                filepath=current,
                ref=ref,
            )()

            if isinstance(contents, types.ContentsResponse):
                # singular file, return path and skip any further processing
                yield contents.path
                continue

            for file in contents:
                if file.type != "dir":
                    yield file.path
                elif recursive:
                    # previously subdirectories were always descended into,
                    # regardless of the `recursive` flag
                    pending.append(file.path)

    def get_files(
        self,
        ref: Optional[str] = None,
        filter_regex: Optional[str] = None,
        recursive: bool = False,
    ) -> Iterable[str]:
        """List paths of files in the repository.

        Args:
            ref: Git reference to list at; defaults to the default branch.
            filter_regex: When given, the paths are passed through
                `filter_paths` with this expression.
            recursive: Include files from subdirectories.

        Returns:
            Iterable of file paths.
        """
        logger.warning(
            "‹ForgejoProject.get_files()› method can fail because of incorrect"
            " OpenAPI spec",
        )

        ref = ref or self.default_branch
        paths = self.__get_files(".", ref=ref, recursive=recursive)

        if filter_regex:
            return filter_paths(paths, filter_regex)

        return paths

    def get_forks(self) -> Iterable["ForgejoProject"]:
        """Lazily iterate over the forks of this project."""
        return (
            ForgejoProject(
                namespace=fork.owner.login,
                repo=fork.name,
                service=self.service,
            )
            for fork in paginate(
                self.partial_api(self.api.list_forks),
            )
        )

    def get_web_url(self) -> str:
        """Return the HTML URL of the repository."""
        return self.forgejo_repo.html_url

    def get_sha_from_branch(self, branch: str) -> Optional[str]:
        """Return the ID of the commit at the tip of the branch.

        Args:
            branch: Name of the branch.

        Returns:
            Commit ID, or `None` when the branch is not found.
        """
        try:
            branch_info = self.partial_api(
                self.api.repo_get_branch,
                branch=branch,
            )()
            return branch_info.commit.id
        except NotFoundError:
            return None

Args

repo
Name of the project.
service
GitService instance.
namespace

Namespace of the project.

  • GitHub: username or org name.
  • GitLab: username or org name.
  • Pagure: namespace (e.g. "rpms").

In case of forks: "fork/{username}/{namespace}".

Ancestors

Class variables

var access_dict : ClassVar[dict]
var service : ForgejoService

Instance variables

prop api : pyforgejo.repository.client.RepositoryClient
Expand source code
@property
def api(self) -> RepositoryClient:
    """Shortcut to the repository client of the service's pyforgejo API."""
    forgejo_client = self.service.api
    return forgejo_client.repository

Returns a RepositoryClient from pyforgejo. Helper to save some typing.

var forgejo_repo : pyforgejo.types.repository.Repository
Expand source code
@cached_property
def forgejo_repo(self) -> types.Repository:
    """Raw repository fetched through `repo_get` for this namespace and repo."""
    fetch_repository = self.api.repo_get
    return fetch_repository(owner=self.namespace, repo=self.repo)

Methods

def partial_api(self, method, /, *args, **kwargs)
Expand source code
def partial_api(self, method, /, *args, **kwargs):
    """Pre-bind this project's `owner` and `repo` to an API method.

    Saves repeating the repository coordinates on every call to the
    `/repository/` endpoints.

    Args:
        method: Specific method on the Pyforgejo API that is to be wrapped.
        *args: Positional arguments that get injected into every call.
        **kwargs: Keyword-arguments that get injected into every call.

    Returns:
        Callable with pre-injected parameters.

    """
    repo_coordinates = {"owner": self.namespace, "repo": self.repo}
    return partial(method, *args, **kwargs, **repo_coordinates)

Returns a partial API call for ForgejoProject.

Injects owner and repo for the calls to /repository/ endpoints.

Args

method
Specific method on the Pyforgejo API that is to be wrapped.
*args
Positional arguments that get injected into every call.
**kwargs
Keyword-arguments that get injected into every call.

Returns

Callable with pre-injected parameters.

Inherited members

class ForgejoPullRequest (raw_pr: pyforgejo.types.pull_request.PullRequest,
project: forgejo.ForgejoProject)
Expand source code
class ForgejoPullRequest(BasePullRequest):
    """Pull request of a Forgejo project, wrapping a raw pyforgejo pull request.

    Most properties are thin read-only views of the wrapped ``_raw_pr``;
    mutating operations call the pyforgejo API and refresh ``_raw_pr``.
    """

    _target_project: "forgejo.ForgejoProject" = None
    _source_project: "forgejo.ForgejoProject" = None
    # Lazily built by the `labels` property; `None` means "not built yet".
    _labels: list[PRLabel] = None

    def __init__(
        self,
        raw_pr: PyforgejoPullRequest,
        project: "forgejo.ForgejoProject",
    ):
        """
        Args:
            raw_pr: Raw pull request object as returned by pyforgejo.
            project: Project the pull request belongs to.
        """
        super().__init__(raw_pr, project)
        self.project = project

    def __str__(self) -> str:
        return "Forgejo" + super().__str__()

    @property
    def api(self):
        """Returns the issue API client from pyforgejo."""
        return self.project.service.api.issue

    @property
    def title(self) -> str:
        """Title of the pull request."""
        return self._raw_pr.title

    @title.setter
    def title(self, new_title: str) -> None:
        self.update_info(title=new_title)

    @property
    def id(self) -> int:
        """Number of the pull request (the raw PR's ``number`` field)."""
        return self._raw_pr.number

    @property
    def status(self) -> PRStatus:
        """``PRStatus.merged`` for merged PRs, otherwise the status named by
        the raw ``state`` field."""
        return PRStatus.merged if self._raw_pr.merged else PRStatus[self._raw_pr.state]

    @property
    def url(self) -> str:
        """URL of the pull request as reported by the raw PR."""
        return self._raw_pr.url

    @property
    def description(self) -> str:
        """Body text of the pull request."""
        return self._raw_pr.body

    @description.setter
    def description(self, new_description: str) -> None:
        self.update_info(description=new_description)

    @property
    def author(self) -> str:
        """Login of the user that opened the pull request."""
        return self._raw_pr.user.login

    @property
    def source_branch(self) -> str:
        """Ref of the head (source) of the pull request."""
        return self._raw_pr.head.ref

    @property
    def target_branch(self) -> str:
        """Ref of the base (target) of the pull request."""
        return self._raw_pr.base.ref

    @property
    def created(self) -> datetime.datetime:
        """Creation timestamp of the pull request."""
        return self._raw_pr.created_at

    @property
    def labels(self) -> list[PRLabel]:
        """Labels of the pull request, built on first access and then cached."""
        if self._labels is None:
            self._labels = (
                [ForgejoPRLabel(raw_label, self) for raw_label in self._raw_pr.labels]
                if self._raw_pr.labels
                else []
            )
        return self._labels

    @property
    def diff_url(self) -> str:
        """URL of the diff of the pull request."""
        return self._raw_pr.diff_url

    @property
    def patch(self) -> bytes:
        """Raw patch of the pull request, downloaded from its patch URL.

        Raises:
            OgrNetworkError: If the download does not succeed.
        """
        patch_url = self._raw_pr.patch_url
        response = httpx.get(patch_url)

        if not response.is_success:
            # httpx exposes the textual reason as `reason_phrase` (there is no
            # `reason` attribute as in requests); `patch_url` is already the
            # complete patch URL, so nothing is appended to it.
            raise OgrNetworkError(
                f"Couldn't get patch from {patch_url} "
                f"because {response.reason_phrase}.",
            )

        return response.content

    @property
    def head_commit(self) -> str:
        """SHA of the head commit of the pull request."""
        return self._raw_pr.head.sha

    @property
    def merge_commit_sha(self) -> Optional[str]:
        """SHA of the merge commit."""
        # this is None for non-merged PRs
        return self._raw_pr.merge_commit_sha

    @property
    def merge_commit_status(self) -> MergeCommitStatus:
        """Whether the pull request can be merged, per the raw ``mergeable`` flag."""
        return (
            MergeCommitStatus.can_be_merged
            if self._raw_pr.mergeable
            else MergeCommitStatus.cannot_be_merged
        )

    @cached_property
    def source_project(self) -> "forgejo.ForgejoProject":
        """Project that holds the head (source) branch of the pull request."""
        pyforgejo_repo = self._raw_pr.head.repo
        return self._target_project.service.get_project(
            repo=pyforgejo_repo.name,
            namespace=pyforgejo_repo.owner.login,
            forgejo_repo=pyforgejo_repo,
        )

    @property
    def commits_url(self) -> str:
        """URL listing the commits of the pull request."""
        return f"{self.url}/commits"

    @property
    def closed_by(self) -> Optional[str]:
        """Login of the user that merged the pull request, if any."""
        return self._raw_pr.merged_by.login if self._raw_pr.merged_by else None

    @staticmethod
    def create(
        project: "forgejo.ForgejoProject",
        title: str,
        body: str,
        target_branch: str,
        source_branch: str,
        fork_username: Optional[str] = None,
    ) -> "PullRequest":
        """
        Create a new pull request.

        Args:
            project: Project the method is called on.
            title: Title of the pull request.
            body: Description of the pull request.
            target_branch: Branch the changes are to be merged into.
            source_branch: Branch the changes come from.
            fork_username: Namespace of the fork involved, if any.

                Defaults to `None`.

        Returns:
            Newly created pull request.

        Raises:
            ForgejoAPIException: If the fork requested through
                `fork_username` is not among the forks of the parent project.
        """
        target_project = project

        if project.is_fork and fork_username is None:
            # handles fork -> upstream (called on fork)
            source_branch = f"{project.namespace}:{source_branch}"
            target_project = project.parent  # type: ignore
        elif fork_username:
            if fork_username != project.namespace and project.parent is not None:
                # handles fork -> other_fork
                #   (username of other_fork owner specified by fork_username)
                forks = list(
                    filter(
                        lambda fork: fork.namespace == fork_username,
                        project.parent.get_forks(),
                    ),
                )
                if not forks:
                    raise ForgejoAPIException("Requested fork doesn't exist")
                target_project = forks[0]  # type: ignore
                source_branch = f"{project.namespace}:{source_branch}"
            else:
                # handles fork -> upstream
                #   (username of fork owner specified by fork_username)
                source_branch = f"{fork_username}:{source_branch}"

        logger.debug(f"Creating PR {target_branch}<-{source_branch}")

        pr = target_project.api.repo_create_pull_request(
            owner=target_project.namespace,
            repo=target_project.repo,
            base=target_branch,
            body=body,
            head=source_branch,
            title=title,
        )
        # Log the PR number: it is what `ForgejoPullRequest.id` exposes and
        # what every API call here takes as `index`.
        logger.info(f"PR {pr.number} created.")

        return ForgejoPullRequest(pr, target_project)

    @staticmethod
    def get(project: "forgejo.ForgejoProject", pr_id: int) -> "PullRequest":
        """
        Get the pull request with the given number.

        Args:
            project: Project the pull request belongs to.
            pr_id: Number of the pull request.

        Returns:
            The requested pull request.

        Raises:
            ForgejoAPIException: If the API reports the pull request as not found.
        """
        try:
            raw_pr = project.api.repo_get_pull_request(
                owner=project.namespace,
                repo=project.repo,
                index=pr_id,
            )
        except NotFoundError as ex:
            raise ForgejoAPIException(f"No pull request with id {pr_id} found.") from ex
        return ForgejoPullRequest(raw_pr, project)

    @staticmethod
    def get_list(
        project: "forgejo.ForgejoProject",
        status: PRStatus = PRStatus.open,
    ) -> Iterable["PullRequest"]:
        """
        List pull requests of the project.

        Args:
            project: Project whose pull requests are listed.
            status: Status to request.

                Defaults to `PRStatus.open`.

        Returns:
            Lazy iterable of pull requests.
        """
        # NOTE: `PRStatus.merged` is requested as "closed" and the result is
        # not filtered any further here.
        prs = paginate(
            partial(
                project.api.repo_list_pull_requests,
                owner=project.namespace,
                repo=project.repo,
                # Forgejo has just open/closed/all
                state=status.name if status != PRStatus.merged else "closed",
            ),
        )
        return (ForgejoPullRequest(pr, project) for pr in prs)

    def update_info(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "PullRequest":
        """
        Update the title and/or description of the pull request.

        Args:
            title: New title; a falsy value keeps the current title.
            description: New description; `None` leaves it untouched.

        Returns:
            This pull request, with its raw data refreshed.

        Raises:
            ForgejoAPIException: If anything fails while updating.
        """
        try:
            # The title is always sent; fall back to the current one.
            data = {"title": title if title else self.title}

            if description is not None:
                data["body"] = description

            updated_pr = self._target_project.api.repo_edit_pull_request(
                owner=self.target_project.namespace,
                repo=self.target_project.repo,
                index=self.id,
                **data,
            )

            self._raw_pr = updated_pr
            return self
        except Exception as ex:
            raise ForgejoAPIException(
                f"There was an error while updating Forgejo PR: {ex}",
            ) from ex

    def close(self) -> "PullRequest":
        """
        Close the pull request.

        Returns:
            This pull request, with its raw data refreshed.
        """
        self._raw_pr = self._target_project.api.repo_edit_pull_request(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            state="closed",
        )
        return self

    def merge(self) -> "PullRequest":
        """
        Merge the pull request using the "merge" strategy.

        Returns:
            The pull request fetched again after the merge (a new object).
        """
        self._target_project.api.repo_merge_pull_request(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            # options: merge, rebase, rebase-merge, squash, fast-forward-only, manually-merged
            do="merge",
        )
        return self.get(self._target_project, self.id)

    def add_label(self, *labels: str) -> None:
        """
        Add labels to the pull request.

        The cached label list is replaced with the labels returned by the API.

        Args:
            *labels: Labels to be added.
        """
        issue_client = self._target_project.service.api.issue
        new_labels = issue_client.add_label(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            labels=list(labels),
        )
        self._labels = [ForgejoPRLabel(raw_label, self) for raw_label in new_labels]

    def get_all_commits(self) -> Iterable[str]:
        """
        Returns:
            Lazy iterable of SHAs of the commits in the pull request.
        """
        return (
            commit.sha
            for commit in paginate(
                partial(
                    self._target_project.api.repo_get_pull_request_commits,
                    owner=self.target_project.namespace,
                    repo=self.target_project.repo,
                    index=self.id,
                ),
            )
        )

    def _get_all_comments(self, reverse: bool = False) -> Iterable[PRComment]:
        """
        Get the comments of the pull request.

        Args:
            reverse: Whether to yield the comments in reversed order.

        Returns:
            Lazy iterable of PR comments.

        Raises:
            ForgejoAPIException: If the API reports not-found for the request.
        """
        try:
            comments = self.api.get_comments(
                owner=self.project.namespace,
                repo=self.project.repo,
                index=self.id,
            )

        except NotFoundError as ex:
            raise ForgejoAPIException(
                "There was an error when retrieving PR comments.",
            ) from ex

        if reverse:
            comments = list(reversed(comments))

        return (
            ForgejoPRComment(raw_comment=comment, parent=self) for comment in comments
        )

    def comment(
        self,
        body: str,
        commit: Optional[str] = None,
        filename: Optional[str] = None,
        row: Optional[int] = None,
    ) -> PRComment:
        """
        Add new comment to the pull request.

        Args:
            body: Body of the comment.
            commit: Commit hash to which comment is related.

                Defaults to generic comment.
            filename: Path to the file to which comment is related.

                Defaults to no relation to the file.
            row: Line number to which the comment is related.

                Defaults to no relation to the line.

        Returns:
            Newly created comment.

        Raises:
            NotImplementedError: If `commit`, `filename` or `row` is given;
                only generic comments are supported.
        """
        if commit or filename or row:
            raise NotImplementedError

        comment = self.api.create_comment(
            owner=self.project.namespace,
            repo=self.project.repo,
            index=self.id,
            body=body,
        )

        return ForgejoPRComment(raw_comment=comment, parent=self)

    def get_comment(self, comment_id: int) -> PRComment:
        """
        Returns a PR comment.

        Args:
            comment_id: id of comment

        Returns:
            Object representing a PR comment.
        """
        comment = self.api.get_comment(
            owner=self.project.namespace,
            repo=self.project.repo,
            id=comment_id,
        )
        return ForgejoPRComment(parent=self, raw_comment=comment)

    def get_statuses(self) -> Union[list[CommitFlag], Iterable[CommitFlag]]:
        """
        Returns statuses for latest commit on pull request.

        Returns:
            List of commit statuses of the latest commit.

        Raises:
            NotImplementedError: Always; not implemented for Forgejo.
        """
        raise NotImplementedError()

Attributes

project : GitProject
Project of the pull request.

Ancestors

Instance variables

prop api
Expand source code
@property
def api(self):
    """Issue API client of pyforgejo, reached through the project's service."""
    forgejo_api = self.project.service.api
    return forgejo_api.issue

Returns the issue API client from pyforgejo.

Inherited members

class ForgejoRelease (raw_release: Any, project: GitProject)
Expand source code
class ForgejoRelease(Release):
    """Release of a Forgejo project, wrapping the raw pyforgejo release."""

    _raw_release: PyforgejoRelease
    project: "ogr_forgejo.ForgejoProject"

    @property
    def title(self) -> str:
        """Name of the release."""
        raw = self._raw_release
        return raw.name

    @property
    def body(self) -> str:
        """Body text of the release."""
        raw = self._raw_release
        return raw.body

    @cached_property
    def git_tag(self) -> GitTag:
        """Tag of the release, looked up through the API on first access."""
        project = self.project
        raw_tag = project.api.repo_get_tag(
            owner=project.namespace,
            repo=project.repo,
            tag=self.tag_name,
        )
        return GitTag(name=raw_tag.name, commit_sha=raw_tag.commit.sha)

    @property
    def tag_name(self) -> str:
        """Name of the tag the release is attached to."""
        raw = self._raw_release
        return raw.tag_name

    @property
    def url(self) -> Optional[str]:
        """URL of the release as reported by the raw release."""
        raw = self._raw_release
        return raw.url

    @property
    def created_at(self) -> datetime.datetime:
        """Creation timestamp of the release."""
        raw = self._raw_release
        return raw.created_at

    @property
    def tarball_url(self) -> str:
        """URL of the release tarball."""
        raw = self._raw_release
        return raw.tarball_url

    @staticmethod
    def _release_id_from_name(
        project: "ogr_forgejo.ForgejoProject",
        name: str,
    ) -> Optional[int]:
        """Return the id of the first listed release called `name`, else `None`."""
        list_releases = partial(
            project.api.repo_list_releases,
            owner=project.namespace,
            repo=project.repo,
        )
        matching_ids = (
            candidate.id for candidate in paginate(list_releases) if candidate.name == name
        )
        return next(matching_ids, None)

    @staticmethod
    def get(
        project: "ogr_forgejo.ForgejoProject",
        identifier: Optional[int] = None,
        name: Optional[str] = None,
        tag_name: Optional[str] = None,
    ) -> "Release":
        """
        Get a single release.

        A truthy `tag_name` wins; otherwise a truthy `name` is resolved to an
        id (replacing `identifier`), and the release is fetched by id.

        Raises:
            ForgejoAPIException: If no id is available to fetch by.
        """
        owner, repo = project.namespace, project.repo

        if tag_name:
            by_tag = project.api.repo_get_release_by_tag(
                owner=owner,
                repo=repo,
                tag=tag_name,
            )
            return ForgejoRelease(by_tag, project)

        release_id = identifier
        if name:
            release_id = ForgejoRelease._release_id_from_name(project, name)

        if release_id is None:
            raise ForgejoAPIException("Release was not found.")

        by_id = project.api.repo_get_release(owner=owner, repo=repo, id=release_id)
        return ForgejoRelease(by_id, project)

    @staticmethod
    def get_latest(project: "ogr_forgejo.ForgejoProject") -> Optional["Release"]:
        """Return the first release of a one-item listing, or `None` if empty."""
        first_page = project.api.repo_list_releases(
            owner=project.namespace,
            repo=project.repo,
            page=1,
            limit=1,
        )

        if not first_page:
            return None
        return ForgejoRelease(first_page[0], project)

    @staticmethod
    def get_list(project: "ogr_forgejo.ForgejoProject") -> list["Release"]:
        """Return all releases of the project, following pagination."""
        list_releases = partial(
            project.api.repo_list_releases,
            owner=project.namespace,
            repo=project.repo,
        )
        wrapped = []
        for raw_release in paginate(list_releases):
            wrapped.append(ForgejoRelease(raw_release, project))
        return wrapped

    @staticmethod
    def create(
        project: "ogr_forgejo.ForgejoProject",
        tag: str,
        name: str,
        message: str,
        ref: Optional[str] = None,
    ) -> "Release":
        """
        Create a new release.

        Args:
            project: Project to create the release in.
            tag: Tag name of the release.
            name: Name of the release.
            message: Body of the release.
            ref: Passed to the API as `target_commitish`.

        Returns:
            Newly created release.
        """
        created = project.api.repo_create_release(
            owner=project.namespace,
            repo=project.repo,
            tag_name=tag,
            name=name,
            body=message,
            target_commitish=ref,
        )
        return ForgejoRelease(created, project)

    def edit_release(self, name: str, message: str) -> None:
        """
        Update the name and/or body of the release.

        Values that are `None` are not sent. On success the wrapped raw
        release is replaced by the one returned from the API.

        Raises:
            ForgejoAPIException: If anything fails while updating.
        """
        try:
            changes = {
                field: value
                for field, value in (("name", name), ("body", message))
                if value is not None
            }
            project = self.project
            self._raw_release = project.api.repo_edit_release(
                owner=project.namespace,
                repo=project.repo,
                id=self._raw_release.id,
                **changes,
            )
        except Exception as ex:
            raise ForgejoAPIException(
                f"There was an error while updating Forgejo release: {ex}",
            ) from ex

Object that represents release.

Attributes

project : GitProject
Project on which the release is created.

Ancestors

Class variables

var project : ForgejoProject

Inherited members

class ForgejoService (instance_url: str = 'https://codeberg.org', token: str | None = None, **kwargs)
Expand source code
@use_for_service("forgejo")
@use_for_service("codeberg.org")
class ForgejoService(BaseGitService):
    """Git service for Forgejo instances, backed by the pyforgejo client."""

    # Path of the REST API, appended to the instance URL.
    version = "/api/v1"

    def __init__(
        self,
        instance_url: str = "https://codeberg.org",
        token: Optional[str] = None,
        **kwargs,
    ):
        """
        Args:
            instance_url: Base URL of the Forgejo instance; the API path
                (`version`) is appended to it.
            token: API token, sent with a ``token `` prefix.
            **kwargs: Accepted but not used by this constructor.
        """
        super().__init__()
        self.instance_url = instance_url + self.version
        # NOTE(review): with `token=None` this yields the literal string
        # "token None" — confirm that is intended for anonymous access.
        self._token = f"token {token}"
        self._api = None

    @cached_property
    def api(self) -> PyforgejoApi:
        """pyforgejo client for this instance, created on first access."""
        return PyforgejoApi(base_url=self.instance_url, api_key=self._token)

    def get_project(  # type: ignore[override]
        self,
        repo: str,
        namespace: str,
        **kwargs,
    ) -> "ForgejoProject":
        """
        Get the project object for the given repository.

        Args:
            repo: Name of the repository.
            namespace: Namespace of the repository.
            **kwargs: Forwarded to the `ForgejoProject` constructor.

        Returns:
            Project bound to this service.
        """
        return ForgejoProject(
            repo=repo,
            namespace=namespace,
            service=self,
            **kwargs,
        )

    @property
    def user(self) -> GitUser:
        """User object bound to this service."""
        return ForgejoUser(self)

    def project_create(
        self,
        repo: str,
        namespace: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "ForgejoProject":
        """
        Create a new repository.

        Args:
            repo: Name of the new repository.
            namespace: Organization to create the repository in; when not
                given, the repository is created for the current user.
            description: Description of the new repository.

        Returns:
            Project object of the newly created repository.
        """
        if namespace:
            new_repo = self.api.organization.create_org_repo(
                org=namespace,
                name=repo,
                description=description,
            )
        else:
            new_repo = self.api.repository.create_current_user_repo(
                name=repo,
                description=description,
            )
        return ForgejoProject(
            repo=repo,
            namespace=namespace or self.user.get_username(),
            service=self,
            # Pass the raw repository under the keyword used by the other
            # `ForgejoProject` construction sites (`forgejo_repo`), not the
            # GitHub-specific `github_repo`.
            forgejo_repo=new_repo,
        )

    def get_project_from_url(self, url: str) -> "ForgejoProject":
        """
        Get the project for a repository URL.

        The first two segments of the URL path are taken as the namespace and
        the repository name, respectively.

        Args:
            url: URL of the repository.

        Returns:
            Project bound to this service.

        Raises:
            OgrException: If the URL path has fewer than two segments.
        """
        parsed_url = urlparse(url)
        path_parts = parsed_url.path.strip("/").split("/")

        if len(path_parts) < 2:
            raise OgrException(f"Invalid Forgejo URL: {url}")

        namespace = path_parts[0]
        repo = path_parts[1]

        return self.get_project(repo=repo, namespace=namespace)

    def get_rate_limit_remaining(
        self,
        namespace: Optional[str] = None,
        repo: Optional[str] = None,
    ) -> Optional[int]:
        """
        There is no way to check rate limit status from Forgejo API.
        """
        return None

Attributes

instance_url : str
URL of the git forge instance.

Ancestors

Class variables

var version

Instance variables

var api : pyforgejo.client.PyforgejoApi
Expand source code
@cached_property
def api(self) -> PyforgejoApi:
    """pyforgejo client built from the instance URL and token; created once."""
    client = PyforgejoApi(api_key=self._token, base_url=self.instance_url)
    return client

Methods

def get_rate_limit_remaining(self, namespace: str | None = None, repo: str | None = None) ‑> int | None
Expand source code
def get_rate_limit_remaining(
    self,
    namespace: Optional[str] = None,
    repo: Optional[str] = None,
) -> Optional[int]:
    """
    Always returns `None`: the Forgejo API offers no way to check the rate
    limit status. Both arguments are ignored.
    """
    remaining: Optional[int] = None
    return remaining

There is no way to check rate limit status from Forgejo API.

Inherited members

class ForgejoUser (service: forgejo.ForgejoService)
Expand source code
class ForgejoUser(BaseGitUser):
    """User as seen through a `ForgejoService` (the service's current user)."""

    service: "forgejo.ForgejoService"

    def __init__(self, service: "forgejo.ForgejoService") -> None:
        super().__init__(service=service)
        self._forgejo_user = None

    def __str__(self) -> str:
        username = self.get_username()
        return f'ForgejoUser(username="{username}")'

    @cached_property
    def forgejo_user(self):
        """Raw current-user object from the API, fetched on first access."""
        user_client = self.service.api.user
        return user_client.get_current()

    def get_username(self) -> str:
        """Login of the current user."""
        raw_user = self.forgejo_user
        return raw_user.login

    def get_email(self) -> str:
        """E-mail address of the current user."""
        raw_user = self.forgejo_user
        return raw_user.email

    def get_projects(self) -> list["ForgejoProject"]:
        """Projects for the repositories returned by `current_list_repos`."""
        projects = []
        for raw_repo in self.service.api.user.current_list_repos():
            projects.append(
                ForgejoProject(
                    repo=raw_repo.name,
                    namespace=raw_repo.owner.login,
                    service=self.service,
                    forgejo_repo=raw_repo,
                ),
            )
        return projects

    def get_forks(self) -> list["ForgejoProject"]:
        """Subset of `get_projects()` whose raw repository is flagged as a fork."""
        return list(
            filter(
                lambda candidate: candidate.forgejo_repo.fork,
                self.get_projects(),
            ),
        )

Represents currently authenticated user through service.

Ancestors

Class variables

var service : ForgejoService

Instance variables

var forgejo_user
Expand source code
@cached_property
def forgejo_user(self):
    """Raw current-user object from the API, fetched on first access."""
    user_client = self.service.api.user
    return user_client.get_current()

Inherited members