Module ogr.services.forgejo

Sub-modules

ogr.services.forgejo.flag
ogr.services.forgejo.issue
ogr.services.forgejo.label
ogr.services.forgejo.project
ogr.services.forgejo.pull_request
ogr.services.forgejo.release
ogr.services.forgejo.service
ogr.services.forgejo.user
ogr.services.forgejo.utils

Classes

class ForgejoIssue (raw_issue: pyforgejo.types.issue.Issue, project: forgejo.ForgejoProject)
Expand source code
class ForgejoIssue(BaseIssue):
    """Issue wrapper for Forgejo.

    Only construction is wired up; `create`, `get` and `get_list` are
    placeholders that raise `NotImplementedError`.
    """

    def __init__(self, raw_issue: PyforgejoIssue, project: "forgejo.ForgejoProject"):
        # Plain delegation to the base class; nothing Forgejo-specific is stored.
        super().__init__(raw_issue, project)

    @staticmethod
    def create(
        project: "forgejo.ForgejoProject",
        title: str,
        body: str,
        private: Optional[bool] = None,
        labels: Optional[list[str]] = None,
        assignees: Optional[list[str]] = None,
    ) -> "Issue":
        """Create an issue (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TBD")

    @staticmethod
    def get(project: "forgejo.ForgejoProject", issue_id: int) -> "Issue":
        """Get a single issue (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TBD")

    @staticmethod
    def get_list(
        project: "forgejo.ForgejoProject",
        status: IssueStatus = IssueStatus.open,
        author: Optional[str] = None,
        assignee: Optional[str] = None,
        labels: Optional[list[str]] = None,
    ) -> list["Issue"]:
        """List issues of a project (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TBD")

Attributes

project : GitProject
Project of the issue.

Ancestors

Inherited members

class ForgejoProject (repo: str,
service: forgejo.ForgejoService,
namespace: str,
forgejo_repo: pyforgejo.types.repository.Repository | None = None,
**kwargs)
Expand source code
class ForgejoProject(BaseGitProject):
    service: "forgejo.ForgejoService"
    access_dict: ClassVar[dict] = {
        AccessLevel.pull: "read",
        AccessLevel.triage: "read",
        AccessLevel.push: "write",
        AccessLevel.admin: "admin",
        AccessLevel.maintain: "owner",
        None: "",
    }

    def __init__(
        self,
        repo: str,
        service: "forgejo.ForgejoService",
        namespace: str,
        forgejo_repo: Optional[Repository] = None,
        **kwargs,
    ) -> None:
        """Initialise the project.

        Args:
            repo: Name of the repository; sent as `repo` in API calls.
            service: Forgejo service this project belongs to.
            namespace: Owner of the repository; sent as `owner` in API calls.
            forgejo_repo: Optional repository object, stored in
                `_forgejo_repo`.
            **kwargs: Accepted but not used here.
        """
        super().__init__(repo, service, namespace)
        self._forgejo_repo = forgejo_repo

    @property
    def api(self) -> RepositoryClient:
        """Returns a `RepositoryClient` from pyforgejo. Helper to save some
        typing.

        It is the `repository` attribute of the service's API client.
        """
        return self.service.api.repository

    def partial_api(self, method, /, *args, **kwargs):
        """Bind this project's coordinates to a Pyforgejo API method.

        The returned callable already carries `owner` (the namespace) and
        `repo` (the repository name), plus whatever extra arguments are given
        here, so callers only supply the remaining parameters.

        Args:
            method: Pyforgejo API method to wrap.
            *args: Positional arguments bound to every call.
            **kwargs: Keyword arguments bound to every call.

        Returns:
            `functools.partial` object with the arguments pre-bound.

        """
        repo_scope = {"owner": self.namespace, "repo": self.repo}
        return partial(method, *args, **kwargs, **repo_scope)

    @cached_property
    def forgejo_repo(self) -> types.Repository:
        """Repository object describing this project.

        A repository handed to the constructor (kept in `_forgejo_repo`) is
        reused, which saves an API request; otherwise the repository is
        fetched with `repo_get`. As a `cached_property` the value is computed
        once per instance.

        Returns:
            Repository object of this project.
        """
        # `getattr` keeps this safe for instances created without `__init__`.
        preloaded = getattr(self, "_forgejo_repo", None)
        if preloaded is not None:
            return preloaded

        return self.api.repo_get(
            owner=self.namespace,
            repo=self.repo,
        )

    def __str__(self) -> str:
        """Render the project as `ForgejoProject(namespace=…, repo=…, service=…)`."""
        fields = ", ".join(
            (
                f'namespace="{self.namespace}"',
                f'repo="{self.repo}"',
                f"service={self.service}",
            ),
        )
        return f"ForgejoProject({fields})"

    def __eq__(self, o: object) -> bool:
        """Two projects are equal when repo, namespace and service all match."""
        if not isinstance(o, ForgejoProject):
            return False

        same_location = self.repo == o.repo and self.namespace == o.namespace
        return same_location and self.service == o.service

    @property
    def description(self) -> str:
        """Description of the repository; an empty string when none is set."""
        return self.forgejo_repo.description or ""

    @description.setter
    def description(self, new_description: str) -> None:
        """Change the description through the `repo_edit` endpoint.

        Args:
            new_description: New description of the repository.
        """
        self.partial_api(self.api.repo_edit)(description=new_description)

        # `forgejo_repo` is a `cached_property`; drop both the cached value and
        # any repository object kept from the constructor so that the next
        # read is fetched again and reflects the edit instead of stale data.
        self._forgejo_repo = None
        self.__dict__.pop("forgejo_repo", None)

    def delete(self) -> None:
        """Delete the repository by calling the `repo_delete` endpoint."""
        self.partial_api(self.api.repo_delete)()

    def exists(self) -> bool:
        """Tell whether the repository can be retrieved.

        Returns:
            `False` when reading `forgejo_repo` raises `NotFoundError`,
            `True` otherwise.
        """
        try:
            _ = self.forgejo_repo
        except NotFoundError:
            return False
        else:
            return True

    def is_private(self) -> bool:
        """Return the `private` flag of the repository object."""
        return self.forgejo_repo.private

    def is_forked(self) -> bool:
        """Tell whether this repository is a fork owned by the current user.

        Returns:
            The repository's falsy `fork` flag when it is not a fork;
            otherwise whether the owner's login equals the username of the
            authenticated user.
        """
        repository = self.forgejo_repo
        if not repository.fork:
            return repository.fork

        return repository.owner.login == self.service.user.get_username()

    @property
    def is_fork(self) -> bool:
        """The `fork` flag of the repository object."""
        return self.forgejo_repo.fork

    @property
    def full_repo_name(self) -> str:
        """The `full_name` of the repository object."""
        return self.forgejo_repo.full_name

    @property
    def parent(self) -> Optional["GitProject"]:
        """Project this repository reports as its parent.

        Returns:
            A new `ForgejoProject` built from the parent's name and owner, or
            `None` when the repository object has no parent.
        """
        upstream = self.forgejo_repo.parent
        if not upstream:
            return None

        return ForgejoProject(
            service=self.service,
            repo=upstream.name,
            namespace=upstream.owner.username,
        )

    @property
    def has_issues(self) -> bool:
        """The `has_issues` flag of the repository object."""
        return self.forgejo_repo.has_issues

    def get_branches(self) -> Iterable[str]:
        """Iterate over branch names.

        Returns:
            Lazy iterable with the `name` of every item produced by
            paginating the `repo_list_branches` endpoint.
        """
        list_branches = self.partial_api(self.api.repo_list_branches)
        return (branch.name for branch in paginate(list_branches))

    @property
    def default_branch(self) -> str:
        """The `default_branch` of the repository object."""
        return self.forgejo_repo.default_branch

    def get_commits(self, ref: Optional[str] = None) -> Iterable[str]:
        """Iterate over commit hashes.

        Args:
            ref: Passed to the `repo_get_all_commits` endpoint as `sha`.

                Defaults to `None`.

        Returns:
            Lazy iterable with the `sha` of every commit produced by the
            paginated endpoint.
        """
        list_commits = self.partial_api(self.api.repo_get_all_commits, sha=ref)
        return (commit.sha for commit in paginate(list_commits))

    def get_description(self) -> str:
        """Method form of the `description` property."""
        return self.description

    def _construct_fork_project(self) -> Optional["ForgejoProject"]:
        """Look for a same-named repository in the current user's namespace.

        Returns:
            Project for `<username>/<repo>` when its repository can be
            retrieved, `None` when the lookup raises `NotFoundError`.
        """
        current_user = self.service.user.get_username()
        try:
            candidate = ForgejoProject(
                repo=self.repo,
                service=self.service,
                namespace=current_user,
            )
            # Accessing the repository is what verifies it exists.
            _ = candidate.forgejo_repo
        except NotFoundError:
            return None
        else:
            return candidate

    def get_fork(self, create: bool = True) -> Optional["GitProject"]:
        """Find the authenticated user's fork of this project.

        Args:
            create: Whether to create the fork when none is found.

                Defaults to `True`.

        Returns:
            The fork, a newly created fork when `create` is set, or `None`
            when no fork exists and creating one was not requested.
        """
        # The cheapest check that assumes fork has the same repository name as
        # the upstream
        if fork := self._construct_fork_project():
            return fork

        # If not successful, the fork could still exist, but has a custom name
        username = self.service.user.get_username()
        for fork in self.get_forks():
            # `get_forks()` already stores the fork owner's login as the
            # namespace, so compare that instead of reading
            # `fork.forgejo_repo`, which costs one API request per fork.
            if fork.namespace == username:
                return fork

        # We have not found any fork owned by the auth'd user
        if create:
            return self.fork_create()

        logger.info(
            f"Fork of {self.forgejo_repo.full_name}"
            " does not exist and we were asked not to create it.",
        )
        return None

    def get_owners(self) -> list[str]:
        """Return a single-item list with the username of the repository owner."""
        return [self.forgejo_repo.owner.username]

    def _get_owner_or_org_collaborators(self) -> set[str]:
        """Collect the owner, or the members of all teams of the owning org.

        Returns:
            Set with just the owner when listing teams fails with a message
            containing "not owned by an organization"; otherwise usernames of
            the members of every team listed for the repository.
        """
        owner = self.get_owners()[0]
        try:
            teams = self.api.repo_list_teams(
                owner=self.namespace,
                repo=self.repo,
            )
        except Exception as ex:
            # no teams, repo owned by regular user
            if "not owned by an organization" not in str(ex):
                raise
            return {owner}

        # repo owned by org, each org can have multiple teams with
        # different levels of access
        organization = self.service.api.organization
        return {
            member.username
            for team in teams
            for member in organization.org_list_team_members(team.id)
        }

    def _get_collaborators(self) -> list[str]:
        """List direct collaborators followed by owner / team members.

        Returns:
            Usernames from `repo_list_collaborators`, extended with the result
            of `_get_owner_or_org_collaborators()`; duplicates are kept.
        """
        usernames = [
            collaborator.username
            for collaborator in self.api.repo_list_collaborators(
                owner=self.namespace,
                repo=self.repo,
            )
        ]
        usernames.extend(self._get_owner_or_org_collaborators())
        return usernames

    def _get_collaborators_with_access(self) -> dict[str, str]:
        """Map every collaborator to their permission on the repository.

        Returns:
            Dictionary from username to the `permission` reported by
            `repo_get_repo_permissions` for that user.
        """
        permissions: dict[str, str] = {}
        for username in self._get_collaborators():
            response = self.api.repo_get_repo_permissions(
                owner=self.namespace,
                repo=self.repo,
                collaborator=username,
            )
            permissions[username] = response.permission
        return permissions

    def get_contributors(self) -> set[str]:
        """Return the usernames from `_get_collaborators()` as a set."""
        return set(self._get_collaborators())

    def users_with_write_access(self) -> set[str]:
        """Collect collaborators whose permission is owner, admin or write.

        Returns:
            Set of usernames.
        """
        writable = ("owner", "admin", "write")
        permissions = self._get_collaborators_with_access()
        return {
            username for username, level in permissions.items() if level in writable
        }

    def who_can_close_issue(self) -> set[str]:
        """Return the same set as `users_with_write_access()`."""
        return self.users_with_write_access()

    def who_can_merge_pr(self) -> set[str]:
        """Return the same set as `users_with_write_access()`."""
        return self.users_with_write_access()

    def can_merge_pr(self, username: str) -> bool:
        """Tell whether the user's permission is owner, admin or write.

        Args:
            username: User whose permission on the repository is queried.

        Returns:
            `True` if the reported permission is one of the three levels.
        """
        granted = self.api.repo_get_repo_permissions(
            owner=self.namespace,
            repo=self.repo,
            collaborator=username,
        )
        return granted.permission in ("owner", "admin", "write")

    def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]:
        """Collect collaborators holding one of the requested access levels.

        Args:
            access_levels: Access levels to look for; each one is translated
                to a Forgejo permission name through `access_dict`.

        Returns:
            Set of usernames whose permission is among the translated names.
        """
        wanted = []
        for access_level in access_levels:
            wanted.append(self.access_dict[access_level])

        matching = set()
        for user, permission in self._get_collaborators_with_access().items():
            if permission in wanted:
                matching.add(user)
        return matching

    def add_user(self, user: str, access_level: AccessLevel) -> None:
        """Add a collaborator with the given access level.

        Args:
            user: Username of the collaborator.
            access_level: Access level, translated through `access_dict`.

        Raises:
            OperationNotSupported: If `AccessLevel.maintain` (mapped to
                `owner`) is requested.
        """
        if access_level == AccessLevel.maintain:
            raise OperationNotSupported("Not possible to add a user as `owner`.")

        request = {
            "owner": self.namespace,
            "repo": self.repo,
            "collaborator": user,
            "permission": self.access_dict[access_level],
        }
        self.api.repo_add_collaborator(**request)

    def remove_user(self, user: str) -> None:
        """Remove a collaborator via the `repo_delete_collaborator` endpoint.

        Args:
            user: Username of the collaborator to remove.
        """
        self.api.repo_delete_collaborator(
            owner=self.namespace,
            repo=self.repo,
            collaborator=user,
        )

    def request_access(self) -> None:
        """Not available on Forgejo.

        Raises:
            OperationNotSupported: Always.
        """
        raise OperationNotSupported("Not possible on Forgejo")

    @indirect(ForgejoIssue.get_list)
    def get_issue_list(
        self,
        status: IssueStatus = IssueStatus.open,
        author: Optional[str] = None,
        assignee: Optional[str] = None,
        labels: Optional[list[str]] = None,
    ) -> list["Issue"]:
        """List issues of the project.

        The body is intentionally empty: `@indirect` is given
        `ForgejoIssue.get_list` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoIssue.get)
    def get_issue(self, issue_id: int) -> "Issue":
        """Get an issue by its ID.

        The body is intentionally empty: `@indirect` is given
        `ForgejoIssue.get` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoIssue.create)
    def create_issue(
        self,
        title: str,
        body: str,
        private: Optional[bool] = None,
        labels: Optional[list[str]] = None,
        assignees: Optional[list[str]] = None,
    ) -> Issue:
        """Create an issue in the project.

        The body is intentionally empty: `@indirect` is given
        `ForgejoIssue.create` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoPullRequest.get_list)
    def get_pr_list(self, status: PRStatus = PRStatus.open) -> Iterable["PullRequest"]:
        """List pull requests of the project.

        The body is intentionally empty: `@indirect` is given
        `ForgejoPullRequest.get_list` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoPullRequest.get)
    def get_pr(self, pr_id: int) -> "PullRequest":
        """Get a pull request by its ID.

        The body is intentionally empty: `@indirect` is given
        `ForgejoPullRequest.get` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    def get_pr_files_diff(
        self,
        pr_id: int,
        retries: int = 0,
        wait_seconds: int = 3,
    ) -> dict:
        """
        Get files diff of a pull request.

        Args:
            pr_id: ID of the pull request.
            retries: Not used by this implementation.
            wait_seconds: Not used by this implementation.

        Returns:
            Dictionary representing files diff.

        Raises:
            NotImplementedError: Always; not implemented for Forgejo.
        """
        # [NOTE] Implemented only for Pagure, for details see
        # https://github.com/packit/ogr/issues/895
        raise NotImplementedError()

    def get_tags(self) -> Iterable["GitTag"]:
        """Iterate over tags of the repository.

        Returns:
            Lazy iterable of `GitTag` objects built from the name and commit
            SHA of every tag produced by the paginated `repo_list_tags`
            endpoint.
        """
        list_tags = self.partial_api(self.api.repo_list_tags)
        return (
            GitTag(name=raw_tag.name, commit_sha=raw_tag.commit.sha)
            for raw_tag in paginate(list_tags)
        )

    def get_sha_from_tag(self, tag_name: str) -> str:
        """Resolve a tag to the SHA of the commit it points to.

        Args:
            tag_name: Name of the tag, passed to `repo_get_tag` as `tag`.

        Returns:
            SHA of the tagged commit.
        """
        fetch_tag = self.partial_api(self.api.repo_get_tag, tag=tag_name)
        tag = fetch_tag()
        return tag.commit.sha

    @indirect(ForgejoRelease.get)
    def get_release(
        self,
        identifier: Optional[int] = None,
        name: Optional[str] = None,
        tag_name: Optional[str] = None,
    ) -> Release:
        """Get a single release.

        The body is intentionally empty: `@indirect` is given
        `ForgejoRelease.get` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoRelease.get_latest)
    def get_latest_release(self) -> Optional[Release]:
        """Get the latest release.

        The body is intentionally empty: `@indirect` is given
        `ForgejoRelease.get_latest` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoRelease.get_list)
    def get_releases(self) -> list[Release]:
        """List releases of the project.

        The body is intentionally empty: `@indirect` is given
        `ForgejoRelease.get_list` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoRelease.create)
    def create_release(
        self,
        tag: str,
        name: str,
        message: str,
        ref: Optional[str] = None,
    ) -> Release:
        """Create a release.

        The body is intentionally empty: `@indirect` is given
        `ForgejoRelease.create` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoPullRequest.create)
    def create_pr(
        self,
        title: str,
        body: str,
        target_branch: str,
        source_branch: str,
        fork_username: Optional[str] = None,
    ) -> "PullRequest":
        """Create a pull request.

        The body is intentionally empty: `@indirect` is given
        `ForgejoPullRequest.create` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    def commit_comment(
        self,
        commit: str,
        body: str,
        filename: Optional[str] = None,
        row: Optional[int] = None,
    ) -> "CommitComment":
        """Commenting on commits is not supported on Forgejo.

        Raises:
            OperationNotSupported: Always.
        """
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    def get_commit_comments(self, commit: str) -> list[CommitComment]:
        """Listing commit comments is not supported on Forgejo.

        Raises:
            OperationNotSupported: Always.
        """
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    def get_commit_comment(self, commit_sha: str, comment_id: int) -> CommitComment:
        """Getting a commit comment is not supported on Forgejo.

        Raises:
            OperationNotSupported: Always.
        """
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    @indirect(ForgejoCommitFlag.set)
    def set_commit_status(
        self,
        commit: str,
        state: Union[CommitStatus, str],
        target_url: str,
        description: str,
        context: str,
        trim: bool = False,
    ) -> "CommitFlag":
        """Set a status on a commit.

        The body is intentionally empty: `@indirect` is given
        `ForgejoCommitFlag.set` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    @indirect(ForgejoCommitFlag.get)
    def get_commit_statuses(self, commit: str) -> list[CommitFlag]:
        """Get statuses of a commit.

        The body is intentionally empty: `@indirect` is given
        `ForgejoCommitFlag.get` and presumably forwards the call to it
        (the decorator is defined outside this file — confirm).
        """
        pass

    def get_git_urls(self) -> dict[str, str]:
        """Return the clone URLs of the repository.

        Returns:
            Dictionary with `git` (the repository's `clone_url`) and `ssh`
            (its `ssh_url`).
        """
        repository = self.forgejo_repo
        urls = {"git": repository.clone_url}
        urls["ssh"] = repository.ssh_url
        return urls

    def fork_create(self, namespace: Optional[str] = None) -> "GitProject":
        """Fork this repository.

        Args:
            namespace: Organization to fork into; it is passed to the API as
                `organization`. When not given, that argument is left out and
                the returned project uses the authenticated user's username
                as its namespace.

        Returns:
            Project object pointing at the fork.
        """
        request = {"owner": self.namespace, "repo": self.repo}
        if namespace:
            request["organization"] = namespace
        self.api.create_fork(**request)

        return ForgejoProject(
            repo=self.repo,
            service=self.service,
            namespace=namespace or self.service.user.get_username(),
        )

    def change_token(self, new_token: str) -> None:
        """Changing the token is not possible for an existing project.

        Args:
            new_token: Ignored.

        Raises:
            NotImplementedError: Always.
        """
        # [NOTE] API doesn't provide any method to change the token, and it's
        # embedded in the httpx client that's wrapped by pyforgejo wrapper to
        # avoid duplication between sync and async calls…
        raise NotImplementedError(
            "Not possible; requires recreation of the httpx client",
        )

    def get_file_content(self, path: str, ref: Optional[str] = None) -> str:
        """Return the content of a file as text.

        Args:
            path: Path of the file, passed to the API as `filepath`.
            ref: Passed to the API as `ref`.

                Defaults to `None`.

        Returns:
            Decoded content of the file.

        Raises:
            FileNotFoundError: If the API call raises `NotFoundError`.
        """
        fetch_contents = self.partial_api(
            self.api.repo_get_contents,
            filepath=path,
            ref=ref,
        )
        try:
            remote_file: types.ContentsResponse = fetch_contents()
        except NotFoundError as ex:
            raise FileNotFoundError() from ex

        # The response carries the payload as a `str` in the encoding named by
        # its `encoding` field (presumably always base64): turn the string into
        # bytes, undo that encoding, then read the resulting bytes as UTF-8.
        encoded = bytes(remote_file.content, "utf-8")
        raw = codecs.decode(encoded, encoding=remote_file.encoding)
        return raw.decode("utf-8")

    def __get_files(
        self,
        path: str,
        ref: str,
        recursive: bool,
    ) -> Iterable[str]:
        """Yield paths of files found under `path`.

        Args:
            path: Path to start from.
            ref: Passed to the `repo_get_contents` endpoint as `ref`.
            recursive: Whether to descend into subdirectories. When `False`,
                only files directly inside `path` are yielded.

        Yields:
            Paths of the files, as reported by the API.
        """
        contents: types.ContentsResponse | list[types.ContentsResponse]

        # Explicit stack of directories still to visit; looping on its
        # emptiness avoids relying on (and hiding) `IndexError`.
        pending = [path]
        while pending:
            current = pending.pop()
            contents = self.partial_api(
                self.api.repo_get_contents,
                filepath=current,
                ref=ref,
            )()

            if isinstance(contents, types.ContentsResponse):
                # singular file, return path and skip any further processing
                yield contents.path
                continue

            for file in contents:
                if file.type != "dir":
                    yield file.path
                elif recursive:
                    pending.append(file.path)

    def get_files(
        self,
        ref: Optional[str] = None,
        filter_regex: Optional[str] = None,
        recursive: bool = False,
    ) -> Iterable[str]:
        """Get paths of files in the repository.

        Args:
            ref: Branch or commit to list; the default branch is used when
                nothing (or an empty value) is given.
            filter_regex: When given, paths are passed through
                `filter_paths` together with this pattern.
            recursive: Forwarded to the internal listing helper.

                Defaults to `False`.

        Returns:
            Iterable of file paths.
        """
        logger.warning(
            "‹ForgejoProject.get_files()› method can fail because of incorrect"
            " OpenAPI spec",
        )

        effective_ref = ref or self.default_branch
        found = self.__get_files(".", ref=effective_ref, recursive=recursive)
        return filter_paths(found, filter_regex) if filter_regex else found

    def get_forks(self) -> Iterable["ForgejoProject"]:
        """Iterate over forks of this repository.

        Returns:
            Lazy iterable of projects, one per item produced by the paginated
            `list_forks` endpoint; the namespace is the fork owner's login.
        """
        list_forks = self.partial_api(self.api.list_forks)
        return (
            ForgejoProject(
                namespace=raw_fork.owner.login,
                repo=raw_fork.name,
                service=self.service,
            )
            for raw_fork in paginate(list_forks)
        )

    def get_web_url(self) -> str:
        """Return the `html_url` of the repository object."""
        return self.forgejo_repo.html_url

    def get_sha_from_branch(self, branch: str) -> Optional[str]:
        """Resolve a branch to the ID of its commit.

        Args:
            branch: Name of the branch.

        Returns:
            ID of the commit reported for the branch, or `None` when the API
            call raises `NotFoundError`.
        """
        fetch_branch = self.partial_api(self.api.repo_get_branch, branch=branch)
        try:
            branch_info = fetch_branch()
        except NotFoundError:
            return None
        return branch_info.commit.id

Args

repo
Name of the project.
service
GitService instance.
namespace

Namespace of the project.

  • GitHub: username or org name.
  • GitLab: username or org name.
  • Pagure: namespace (e.g. "rpms").

In case of forks: "fork/{username}/{namespace}".

Ancestors

Class variables

var access_dict : ClassVar[dict]
var service : ForgejoService

Instance variables

prop api : pyforgejo.repository.client.RepositoryClient
Expand source code
@property
def api(self) -> RepositoryClient:
    """Returns a `RepositoryClient` from pyforgejo. Helper to save some
    typing.

    It is the `repository` attribute of the service's API client.
    """
    return self.service.api.repository

Returns a RepositoryClient from pyforgejo. Helper to save some typing.

var forgejo_repo : pyforgejo.types.repository.Repository
Expand source code
@cached_property
def forgejo_repo(self) -> types.Repository:
    """Repository object describing this project.

    A repository handed to the constructor (kept in `_forgejo_repo`) is
    reused, which saves an API request; otherwise the repository is fetched
    with `repo_get`. As a `cached_property` the value is computed once per
    instance.

    Returns:
        Repository object of this project.
    """
    # `getattr` keeps this safe for instances created without `__init__`.
    preloaded = getattr(self, "_forgejo_repo", None)
    if preloaded is not None:
        return preloaded

    return self.api.repo_get(
        owner=self.namespace,
        repo=self.repo,
    )

Methods

def partial_api(self, method, /, *args, **kwargs)
Expand source code
def partial_api(self, method, /, *args, **kwargs):
    """Bind this project's coordinates to a Pyforgejo API method.

    The returned callable already carries `owner` (the namespace) and `repo`
    (the repository name), plus whatever extra arguments are given here, so
    callers only supply the remaining parameters.

    Args:
        method: Pyforgejo API method to wrap.
        *args: Positional arguments bound to every call.
        **kwargs: Keyword arguments bound to every call.

    Returns:
        `functools.partial` object with the arguments pre-bound.

    """
    repo_scope = {"owner": self.namespace, "repo": self.repo}
    return partial(method, *args, **kwargs, **repo_scope)

Returns a partial API call for ForgejoProject.

Injects owner and repo for the calls to /repository/ endpoints.

Args

method
Specific method on the Pyforgejo API that is to be wrapped.
*args
Positional arguments that get injected into every call.
**kwargs
Keyword-arguments that get injected into every call.

Returns

Callable with pre-injected parameters.

Inherited members

class ForgejoPullRequest (raw_pr: pyforgejo.types.pull_request.PullRequest,
project: forgejo.ForgejoProject)
Expand source code
class ForgejoPullRequest(BasePullRequest):
    _target_project: "forgejo.ForgejoProject" = None
    _source_project: "forgejo.ForgejoProject" = None
    _labels: list[PRLabel] = None

    def __init__(
        self,
        raw_pr: PyforgejoPullRequest,
        project: "forgejo.ForgejoProject",
    ) -> None:
        """Wrap a pyforgejo pull request; both arguments go to the base class.

        Args:
            raw_pr: Pull request object from pyforgejo.
            project: Project the pull request is associated with.
        """
        super().__init__(raw_pr, project)

    def __str__(self) -> str:
        """Prefix the base class representation with `Forgejo`."""
        generic = super().__str__()
        return f"Forgejo{generic}"

    @property
    def title(self) -> str:
        """Title of the pull request, read from the raw pull request."""
        return self._raw_pr.title

    @title.setter
    def title(self, new_title: str) -> None:
        # Delegates to `update_info`, which sends the change to the API.
        self.update_info(title=new_title)

    @property
    def id(self) -> int:
        """Identifier of the pull request; this is the raw object's `number`."""
        return self._raw_pr.number

    @property
    def status(self) -> PRStatus:
        """Status of the pull request.

        `PRStatus.merged` when the raw pull request is flagged as merged,
        otherwise the `PRStatus` member named like the raw `state`.
        """
        raw = self._raw_pr
        if raw.merged:
            return PRStatus.merged
        return PRStatus[raw.state]

    @property
    def url(self) -> str:
        """The `url` of the raw pull request."""
        return self._raw_pr.url

    @property
    def description(self) -> str:
        """Description of the pull request; this is the raw object's `body`."""
        return self._raw_pr.body

    @description.setter
    def description(self, new_description: str) -> None:
        # Delegates to `update_info`, which sends the change to the API.
        self.update_info(description=new_description)

    @property
    def author(self) -> str:
        """Login of the user recorded on the raw pull request."""
        return self._raw_pr.user.login

    @property
    def source_branch(self) -> str:
        """Source branch; the `ref` of the raw pull request's `head`."""
        return self._raw_pr.head.ref

    @property
    def target_branch(self) -> str:
        """Target branch; the `ref` of the raw pull request's `base`."""
        return self._raw_pr.base.ref

    @property
    def created(self) -> datetime.datetime:
        """Creation time; the `created_at` of the raw pull request."""
        return self._raw_pr.created_at

    @property
    def labels(self) -> list[PRLabel]:
        """Labels of the pull request.

        The wrappers are built from the raw pull request on first access and
        then reused. `None` marks "not built yet", so a pull request without
        labels is cached as an empty list instead of being rebuilt on every
        access.
        """
        if self._labels is None:
            self._labels = [
                ForgejoPRLabel(raw_label, self)
                for raw_label in self._raw_pr.labels or []
            ]
        return self._labels

    @property
    def diff_url(self) -> str:
        """The `diff_url` of the raw pull request."""
        return self._raw_pr.diff_url

    @property
    def patch(self) -> bytes:
        """Patch of the pull request, downloaded from its `patch_url`.

        Returns:
            Raw body of the HTTP response.

        Raises:
            OgrNetworkError: If the response is not successful.
        """
        patch_url = self._raw_pr.patch_url
        # NOTE(review): no timeout is passed, so a stalled server blocks the
        # caller indefinitely — consider adding one.
        response = requests.get(patch_url)

        if not response.ok:
            # `patch_url` is already the full URL that was requested; do not
            # append another `.patch` to it in the message.
            raise OgrNetworkError(
                f"Couldn't get patch from {patch_url} because {response.reason}.",
            )

        return response.content

    @property
    def head_commit(self) -> str:
        """The `sha` of the raw pull request's `head`."""
        return self._raw_pr.head.sha

    @property
    def merge_commit_sha(self) -> Optional[str]:
        """The `merge_commit_sha` of the raw pull request."""
        # this is None for non-merged PRs
        return self._raw_pr.merge_commit_sha

    @property
    def merge_commit_status(self) -> MergeCommitStatus:
        """Mergeability derived from the raw pull request's `mergeable` flag."""
        if self._raw_pr.mergeable:
            return MergeCommitStatus.can_be_merged
        return MergeCommitStatus.cannot_be_merged

    @cached_property
    def source_project(self) -> "forgejo.ForgejoProject":
        """Project the pull request comes from.

        Built through the target project's service from the repository
        attached to the raw pull request's `head`; that repository object is
        handed over as `forgejo_repo`.
        """
        head_repo = self._raw_pr.head.repo
        service = self._target_project.service
        return service.get_project(
            repo=head_repo.name,
            namespace=head_repo.owner.login,
            forgejo_repo=head_repo,
        )

    @property
    def commits_url(self) -> str:
        """URL of the pull request with `/commits` appended."""
        return f"{self.url}/commits"

    @property
    def closed_by(self) -> Optional[str]:
        """Login of the user in the raw `merged_by` field, `None` if unset."""
        merger = self._raw_pr.merged_by
        if not merger:
            return None
        return merger.login

    @staticmethod
    def create(
        project: "forgejo.ForgejoProject",
        title: str,
        body: str,
        target_branch: str,
        source_branch: str,
        fork_username: Optional[str] = None,
    ) -> "PullRequest":
        """Create a pull request.

        Args:
            project: Project the method is called on.
            title: Title of the pull request.
            body: Description of the pull request.
            target_branch: Branch the changes should be merged into.
            source_branch: Branch with the changes; it gets prefixed with a
                namespace (`<namespace>:<branch>`) in the fork scenarios.
            fork_username: Namespace of a fork, see the branches below for
                how it selects the target project and the prefix.

                Defaults to `None`.

        Returns:
            Object wrapping the newly created pull request.

        Raises:
            ForgejoAPIException: If the fork given by `fork_username` is not
                among the forks of the project's parent.
        """
        target_project = project

        if project.is_fork and fork_username is None:
            # handles fork -> upstream (called on fork)
            source_branch = f"{project.namespace}:{source_branch}"
            target_project = project.parent  # type: ignore
        elif fork_username:
            if fork_username != project.namespace and project.parent is not None:
                # handles fork -> other_fork
                #   (username of other_fork owner specified by fork_username)
                # Stop at the first match instead of materialising every fork.
                other_fork = next(
                    (
                        fork
                        for fork in project.parent.get_forks()
                        if fork.namespace == fork_username
                    ),
                    None,
                )
                if other_fork is None:
                    raise ForgejoAPIException("Requested fork doesn't exist")
                target_project = other_fork  # type: ignore
                source_branch = f"{project.namespace}:{source_branch}"
            else:
                # handles fork -> upstream
                #   (username of fork owner specified by fork_username)
                source_branch = f"{fork_username}:{source_branch}"

        logger.debug(f"Creating PR {target_branch}<-{source_branch}")

        pr = target_project.api.repo_create_pull_request(
            owner=target_project.namespace,
            repo=target_project.repo,
            base=target_branch,
            body=body,
            head=source_branch,
            title=title,
        )
        # Log the `number`, which is what this class exposes as the PR `id`
        # and what the API calls use as `index`.
        logger.info(f"PR {pr.number} created.")

        return ForgejoPullRequest(pr, target_project)

    @staticmethod
    def get(project: "forgejo.ForgejoProject", pr_id: int) -> "PullRequest":
        """Get a pull request of the project.

        Args:
            project: Project the pull request belongs to.
            pr_id: ID of the pull request, passed to the API as `index`.

        Returns:
            Object wrapping the pull request.

        Raises:
            ForgejoAPIException: If the API call raises `NotFoundError`.
        """
        locator = {"owner": project.namespace, "repo": project.repo, "index": pr_id}
        try:
            raw_pr = project.api.repo_get_pull_request(**locator)
        except NotFoundError as ex:
            raise ForgejoAPIException(f"No pull request with id {pr_id} found.") from ex
        else:
            return ForgejoPullRequest(raw_pr, project)

    @staticmethod
    def get_list(
        project: "forgejo.ForgejoProject",
        status: PRStatus = PRStatus.open,
    ) -> Iterable["PullRequest"]:
        """List pull requests of the project.

        Args:
            project: Project whose pull requests are listed.
            status: Status to filter by; `PRStatus.merged` is requested from
                the API as `closed`, any other status by its own name.

                Defaults to `PRStatus.open`.

        Returns:
            Lazy iterable of wrapped pull requests.
        """
        # Forgejo has just open/closed/all
        state = "closed" if status == PRStatus.merged else status.name
        list_prs = partial(
            project.api.repo_list_pull_requests,
            owner=project.namespace,
            repo=project.repo,
            state=state,
        )
        return (ForgejoPullRequest(raw_pr, project) for raw_pr in paginate(list_prs))

    def update_info(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "PullRequest":
        """Update title and/or description of the pull request.

        Args:
            title: New title; the current title is sent when it is empty or
                not given.
            description: New description, sent as `body` unless it is `None`.

        Returns:
            This object, with the raw pull request replaced by the API
            response.

        Raises:
            ForgejoAPIException: If anything in the update fails.
        """
        try:
            changes = {"title": title or self.title}
            if description is not None:
                changes["body"] = description

            self._raw_pr = self._target_project.api.repo_edit_pull_request(
                owner=self.target_project.namespace,
                repo=self.target_project.repo,
                index=self.id,
                **changes,
            )
        except Exception as ex:
            raise ForgejoAPIException(
                f"There was an error while updating Forgejo PR: {ex}",
            ) from ex

        return self

    def close(self) -> "PullRequest":
        """Close the pull request.

        Returns:
            This object, with the raw pull request replaced by the API
            response.
        """
        edit = self._target_project.api.repo_edit_pull_request
        target = self.target_project
        self._raw_pr = edit(
            owner=target.namespace,
            repo=self.target_project.repo,
            index=self.id,
            state="closed",
        )
        return self

    def merge(self) -> "PullRequest":
        """Merge the pull request using the `merge` method.

        Returns:
            The pull request fetched again after the merge request; this is
            a new object, the current one is not updated.
        """
        self._target_project.api.repo_merge_pull_request(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            # options: merge, rebase, rebase-merge, squash, fast-forward-only, manually-merged
            do="merge",
        )
        return self.get(self._target_project, self.id)

    def add_label(self, *labels: str) -> None:
        """Add labels to the pull request.

        The call goes through the issue client of the service; the labels it
        returns replace the cached label wrappers.

        Args:
            *labels: Labels to add.
        """
        service_api = self._target_project.service.api
        returned = service_api.issue.add_label(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            labels=list(labels),
        )

        wrapped = []
        for raw_label in returned:
            wrapped.append(ForgejoPRLabel(raw_label, self))
        self._labels = wrapped

    def get_all_commits(self) -> Iterable[str]:
        """Iterate over hashes of the commits in the pull request.

        Returns:
            Lazy iterable with the `sha` of every commit produced by the
            paginated `repo_get_pull_request_commits` endpoint.
        """
        list_commits = partial(
            self._target_project.api.repo_get_pull_request_commits,
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
        )
        return (commit.sha for commit in paginate(list_commits))

    def get_comments(
        self,
        filter_regex: Optional[str] = None,
        reverse: bool = False,
        author: Optional[str] = None,
    ) -> Union[list["PRComment"], Iterable["PRComment"]]:
        """
        Get list of pull request comments.

        Args:
            filter_regex: Filter the comments' content with `re.search`.

                Defaults to `None`, which means no filtering.
            reverse: Whether the comments are to be returned in
                reversed order.

                Defaults to `False`.
            author: Filter the comments by author.

                Defaults to `None`, which means no filtering.

        Returns:
            List of pull request comments.

        Raises:
            NotImplementedError: Always; not implemented for Forgejo.
        """
        raise NotImplementedError()

    def comment(
        self,
        body: str,
        commit: Optional[str] = None,
        filename: Optional[str] = None,
        row: Optional[int] = None,
    ) -> "PRComment":
        """
        Add new comment to the pull request.

        Args:
            body: Body of the comment.
            commit: Commit hash to which comment is related.

                Defaults to generic comment.
            filename: Path to the file to which comment is related.

                Defaults to no relation to the file.
            row: Line number to which the comment is related.

                Defaults to no relation to the line.

        Returns:
            Newly created comment.

        Raises:
            NotImplementedError: Always; not implemented for Forgejo.
        """
        raise NotImplementedError()

    def get_comment(self, comment_id: int) -> PRComment:
        """
        Returns a PR comment.

        Args:
            comment_id: id of comment

        Returns:
            Object representing a PR comment.

        Raises:
            NotImplementedError: Always; not implemented for Forgejo.
        """
        raise NotImplementedError()

    def get_statuses(self) -> Union[list[CommitFlag], Iterable[CommitFlag]]:
        """
        Returns statuses for latest commit on pull request.

        Returns:
            List of commit statuses of the latest commit.

        Raises:
            NotImplementedError: Always; not implemented for Forgejo.
        """
        raise NotImplementedError()

Attributes

project : GitProject
Project of the pull request.

Ancestors

Inherited members

class ForgejoService (instance_url: str = 'https://codeberg.org',
api_key: str | None = None,
**kwargs)
Expand source code
@use_for_service("forgejo")
@use_for_service("codeberg.org")
class ForgejoService(BaseGitService):
    version = "/api/v1"

    def __init__(
        self,
        instance_url: str = "https://codeberg.org",
        api_key: Optional[str] = None,
        **kwargs,
    ):
        """Initialise the service.

        Args:
            instance_url: Base URL of the Forgejo instance; the API prefix
                from `version` is appended to it. A trailing slash is
                tolerated.

                Defaults to `https://codeberg.org`.
            api_key: API token, sent in the form `token <api_key>`.
            **kwargs: Accepted but not used here.
        """
        super().__init__()
        # Strip trailing slashes so that e.g. "https://host/" does not turn
        # into "https://host//api/v1".
        self.instance_url = instance_url.rstrip("/") + self.version
        # NOTE(review): without an `api_key` this becomes the literal string
        # "token None" — confirm whether unauthenticated use is intended.
        self._token = f"token {api_key}"
        self._api = None

    @cached_property
    def api(self) -> PyforgejoApi:
        """pyforgejo client for this instance.

        Created on first access from `instance_url` and the stored token,
        then cached on the instance.
        """
        return PyforgejoApi(base_url=self.instance_url, api_key=self._token)

    def get_project(  # type: ignore[override]
        self,
        repo: str,
        namespace: str,
        **kwargs,
    ) -> "ForgejoProject":
        """Create a project object bound to this service.

        Args:
            repo: Name of the repository.
            namespace: Namespace of the repository.
            **kwargs: Forwarded to the `ForgejoProject` constructor.

        Returns:
            Project object; no API call is made here.
        """
        return ForgejoProject(
            repo=repo,
            namespace=namespace,
            service=self,
            **kwargs,
        )

    @property
    def user(self) -> GitUser:
        """User object bound to this service; a new one on every access."""
        return ForgejoUser(self)

    def project_create(
        self,
        repo: str,
        namespace: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "ForgejoProject":
        """Create a new repository.

        Args:
            repo: Name of the repository.
            namespace: Organization to create the repository in; when not
                given, it is created for the authenticated user.
            description: Description of the repository.

        Returns:
            Project object for the new repository, carrying the repository
            object returned by the API.
        """
        if namespace:
            new_repo = self.api.organization.create_org_repo(
                org=namespace,
                name=repo,
                description=description,
            )
        else:
            new_repo = self.api.repository.create_current_user_repo(
                name=repo,
                description=description,
            )
        return ForgejoProject(
            repo=repo,
            namespace=namespace or self.user.get_username(),
            service=self,
            # `ForgejoProject` takes the repository as `forgejo_repo`; the
            # previous `github_repo` keyword was swallowed by its `**kwargs`
            # and the freshly created repository was thrown away.
            forgejo_repo=new_repo,
        )

    def get_project_from_url(self, url: str) -> "ForgejoProject":
        """Create a project object from a repository URL.

        The first two components of the URL path are taken as namespace and
        repository name; a `.git` suffix of the repository name (as found in
        clone URLs) is dropped.

        Args:
            url: URL of the repository.

        Returns:
            Project object for the repository.

        Raises:
            OgrException: If the path does not contain a non-empty namespace
                and repository name.
        """
        parsed_url = urlparse(url)
        path_parts = parsed_url.path.strip("/").split("/")

        if len(path_parts) < 2:
            raise OgrException(f"Invalid Forgejo URL: {url}")

        namespace = path_parts[0]
        repo = path_parts[1].removesuffix(".git")

        # Reject paths such as "/ns//repo" or "/ns/.git" that would produce
        # an empty component.
        if not namespace or not repo:
            raise OgrException(f"Invalid Forgejo URL: {url}")

        return self.get_project(repo=repo, namespace=namespace)

Attributes

instance_url : str
URL of the git forge instance.

Ancestors

Class variables

var version

Instance variables

var api : pyforgejo.client.PyforgejoApi
Expand source code
@cached_property
def api(self) -> PyforgejoApi:
    """pyforgejo client for this instance.

    Created on first access from `instance_url` and the stored token, then
    cached on the instance.
    """
    return PyforgejoApi(base_url=self.instance_url, api_key=self._token)

Inherited members