Module ogr.services.forgejo
Sub-modules
ogr.services.forgejo.flag
ogr.services.forgejo.issue
ogr.services.forgejo.label
ogr.services.forgejo.project
ogr.services.forgejo.pull_request
ogr.services.forgejo.release
ogr.services.forgejo.service
ogr.services.forgejo.user
ogr.services.forgejo.utils
Classes
class ForgejoIssue (raw_issue: pyforgejo.types.issue.Issue, project: forgejo.ForgejoProject)
-
Expand source code
class ForgejoIssue(BaseIssue):
    """Issue of a Forgejo project.

    Only the constructor is functional; the static factory methods are
    placeholders that raise `NotImplementedError`.
    """

    def __init__(
        self,
        raw_issue: PyforgejoIssue,
        project: "forgejo.ForgejoProject",
    ):
        """Hand the raw pyforgejo issue and its project over to the base class."""
        super().__init__(raw_issue, project)

    @staticmethod
    def create(
        project: "forgejo.ForgejoProject",
        title: str,
        body: str,
        private: Optional[bool] = None,
        labels: Optional[list[str]] = None,
        assignees: Optional[list] = None,
    ) -> "Issue":
        """Create a new issue. Not implemented yet.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TBD")

    @staticmethod
    def get(project: "forgejo.ForgejoProject", issue_id: int) -> "Issue":
        """Get a single issue by its ID. Not implemented yet.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TBD")

    @staticmethod
    def get_list(
        project: "forgejo.ForgejoProject",
        status: IssueStatus = IssueStatus.open,
        author: Optional[str] = None,
        assignee: Optional[str] = None,
        labels: Optional[list[str]] = None,
    ) -> list["Issue"]:
        """List issues of the project. Not implemented yet.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TBD")
Attributes
project
:GitProject
- Project of the issue.
Ancestors
Inherited members
class ForgejoProject (repo: str,
service: forgejo.ForgejoService,
namespace: str,
forgejo_repo: pyforgejo.types.repository.Repository | None = None,
**kwargs)-
Expand source code
class ForgejoProject(BaseGitProject):
    """Project (repository) hosted on a Forgejo instance.

    Repository endpoints are reached through the pyforgejo repository
    client (`api`); `owner` and `repo` of the calls are taken from
    `self.namespace` and `self.repo`.
    """

    service: "forgejo.ForgejoService"
    # Maps ogr access levels to the permission strings used with Forgejo.
    access_dict: ClassVar[dict] = {
        AccessLevel.pull: "read",
        AccessLevel.triage: "read",
        AccessLevel.push: "write",
        AccessLevel.admin: "admin",
        AccessLevel.maintain: "owner",
        None: "",
    }

    def __init__(
        self,
        repo: str,
        service: "forgejo.ForgejoService",
        namespace: str,
        forgejo_repo: Optional[Repository] = None,
        **kwargs,
    ):
        """
        Args:
            repo: Name of the project.
            service: Forgejo service instance.
            namespace: Namespace (owner) of the project.
            forgejo_repo: Already fetched pyforgejo repository. When given,
                the `forgejo_repo` property returns it instead of querying
                the API.
            **kwargs: Accepted for signature compatibility and ignored.
        """
        super().__init__(repo, service, namespace)
        self._forgejo_repo = forgejo_repo

    @property
    def api(self) -> RepositoryClient:
        """Returns a `RepositoryClient` from pyforgejo.

        Helper to save some typing.
        """
        return self.service.api.repository

    def partial_api(self, method, /, *args, **kwargs):
        """Returns a partial API call for `ForgejoProject`.

        Injects `owner` and `repo` for the calls to `/repository/`
        endpoints.

        Args:
            method: Specific method on the Pyforgejo API that is to be wrapped.
            *args: Positional arguments that get injected into every call.
            **kwargs: Keyword-arguments that get injected into every call.

        Returns:
            Callable with pre-injected parameters.
        """
        return partial(
            method,
            *args,
            **kwargs,
            owner=self.namespace,
            repo=self.repo,
        )

    @cached_property
    def forgejo_repo(self) -> types.Repository:
        """Pyforgejo repository object of this project.

        The repository handed to the constructor is preferred; the API is
        queried only when none was supplied. The result is cached on the
        instance.
        """
        # Reuse the object given to `__init__` so no extra request is made.
        if self._forgejo_repo is not None:
            return self._forgejo_repo
        return self.api.repo_get(
            owner=self.namespace,
            repo=self.repo,
        )

    def __str__(self) -> str:
        return (
            f'ForgejoProject(namespace="{self.namespace}", repo="{self.repo}", '
            f"service={self.service})"
        )

    def __eq__(self, o: object) -> bool:
        """Projects are equal when repo, namespace and service are equal."""
        return (
            isinstance(o, ForgejoProject)
            and self.repo == o.repo
            and self.namespace == o.namespace
            and self.service == o.service
        )

    @property
    def description(self) -> str:
        """Description of the repository; empty string when there is none."""
        return self.forgejo_repo.description or ""

    @description.setter
    def description(self, new_description: str) -> None:
        self.partial_api(self.api.repo_edit)(description=new_description)

    def delete(self) -> None:
        """Delete the repository through the API."""
        self.partial_api(self.api.repo_delete)()

    def exists(self) -> bool:
        """Return `False` when resolving the repository raises `NotFoundError`."""
        try:
            _ = self.forgejo_repo
            return True
        except NotFoundError:
            return False

    def is_private(self) -> bool:
        return self.forgejo_repo.private

    def is_forked(self) -> bool:
        """True when the repo is a fork owned by the authenticated user."""
        return (
            self.forgejo_repo.fork
            and self.forgejo_repo.owner.login == self.service.user.get_username()
        )

    @property
    def is_fork(self) -> bool:
        return self.forgejo_repo.fork

    @property
    def full_repo_name(self) -> str:
        return self.forgejo_repo.full_name

    @property
    def parent(self) -> Optional["GitProject"]:
        """Project this repository reports as its parent, or `None`."""
        if not self.forgejo_repo.parent:
            return None
        return ForgejoProject(
            service=self.service,
            repo=self.forgejo_repo.parent.name,
            namespace=self.forgejo_repo.parent.owner.username,
        )

    @property
    def has_issues(self) -> bool:
        return self.forgejo_repo.has_issues

    def get_branches(self) -> Iterable[str]:
        """Lazily yield names of the branches."""
        return (
            branch.name
            for branch in paginate(
                self.partial_api(self.api.repo_list_branches),
            )
        )

    @property
    def default_branch(self) -> str:
        return self.forgejo_repo.default_branch

    def get_commits(self, ref: Optional[str] = None) -> Iterable[str]:
        """Lazily yield commit hashes; `ref` is passed to the API as `sha`."""
        return (
            commit.sha
            for commit in paginate(
                self.partial_api(
                    self.api.repo_get_all_commits,
                    sha=ref,
                ),
            )
        )

    def get_description(self) -> str:
        return self.description

    def _construct_fork_project(self) -> Optional["ForgejoProject"]:
        """Look for a same-named repo in the authenticated user's namespace.

        Returns:
            The project, or `None` when resolving it raises `NotFoundError`.
        """
        login = self.service.user.get_username()
        try:
            project = ForgejoProject(
                repo=self.repo,
                service=self.service,
                namespace=login,
            )
            _ = project.forgejo_repo
            return project
        except NotFoundError:
            return None

    def get_fork(self, create: bool = True) -> Optional["GitProject"]:
        """Find the authenticated user's fork, optionally creating it.

        Args:
            create: Create the fork when none is found.

        Returns:
            The fork, or `None` when it does not exist and `create` is false.
        """
        # The cheapest check that assumes fork has the same repository name as
        # the upstream
        if fork := self._construct_fork_project():
            return fork

        # If not successful, the fork could still exist, but has a custom name
        username = self.service.user.get_username()
        for fork in self.get_forks():
            if fork.forgejo_repo.owner.login == username:
                return fork

        # We have not found any fork owned by the auth'd user
        if create:
            return self.fork_create()

        logger.info(
            f"Fork of {self.forgejo_repo.full_name}"
            " does not exist and we were asked not to create it.",
        )
        return None

    def get_owners(self) -> list[str]:
        return [self.forgejo_repo.owner.username]

    def _get_owner_or_org_collaborators(self) -> set[str]:
        """Collect the owner, or the members of all teams of an org-owned repo."""
        namespace = self.get_owners()[0]
        try:
            teams = self.api.repo_list_teams(
                owner=self.namespace,
                repo=self.repo,
            )
        except Exception as ex:
            # no teams, repo owned by regular user
            if "not owned by an organization" in str(ex):
                return {namespace}
            raise

        # repo owned by org, each org can have multiple teams with
        # different levels of access
        collaborators: set[str] = set()
        for team in teams:
            members = self.service.api.organization.org_list_team_members(team.id)
            collaborators.update(user.username for user in members)
        return collaborators

    def _get_collaborators(self) -> list[str]:
        """Usernames of direct collaborators plus owner/team members."""
        return [
            c.username
            for c in self.api.repo_list_collaborators(
                owner=self.namespace,
                repo=self.repo,
            )
        ] + list(self._get_owner_or_org_collaborators())

    def _get_collaborators_with_access(self) -> dict[str, str]:
        """Map every collaborator to the permission reported by the API."""
        return {
            c: self.api.repo_get_repo_permissions(
                owner=self.namespace,
                repo=self.repo,
                collaborator=c,
            ).permission
            for c in self._get_collaborators()
        }

    def get_contributors(self) -> set[str]:
        return set(self._get_collaborators())

    def users_with_write_access(self) -> set[str]:
        """Collaborators whose permission is owner, admin or write."""
        return {
            collaborator
            for collaborator, access in self._get_collaborators_with_access().items()
            if access in ("owner", "admin", "write")
        }

    def who_can_close_issue(self) -> set[str]:
        return self.users_with_write_access()

    def who_can_merge_pr(self) -> set[str]:
        return self.users_with_write_access()

    def can_merge_pr(self, username: str) -> bool:
        """True when the user's permission is owner, admin or write."""
        return self.api.repo_get_repo_permissions(
            owner=self.namespace,
            repo=self.repo,
            collaborator=username,
        ).permission in ("owner", "admin", "write")

    def get_users_with_given_access(self, access_levels: list[AccessLevel]) -> set[str]:
        """Collaborators whose permission matches any of `access_levels`."""
        access_levels_forgejo = [
            self.access_dict[access_level] for access_level in access_levels
        ]

        return {
            user
            for user, permission in self._get_collaborators_with_access().items()
            if permission in access_levels_forgejo
        }

    def add_user(self, user: str, access_level: AccessLevel) -> None:
        """Add a collaborator with the given access level.

        Raises:
            OperationNotSupported: For `AccessLevel.maintain` (maps to owner).
        """
        if access_level == AccessLevel.maintain:
            raise OperationNotSupported("Not possible to add a user as `owner`.")

        self.api.repo_add_collaborator(
            owner=self.namespace,
            repo=self.repo,
            collaborator=user,
            permission=self.access_dict[access_level],
        )

    def remove_user(self, user: str) -> None:
        self.api.repo_delete_collaborator(
            owner=self.namespace,
            repo=self.repo,
            collaborator=user,
        )

    def request_access(self) -> None:
        raise OperationNotSupported("Not possible on Forgejo")

    @indirect(ForgejoIssue.get_list)
    def get_issue_list(
        self,
        status: IssueStatus = IssueStatus.open,
        author: Optional[str] = None,
        assignee: Optional[str] = None,
        labels: Optional[list[str]] = None,
    ) -> list["Issue"]:
        pass

    @indirect(ForgejoIssue.get)
    def get_issue(self, issue_id: int) -> "Issue":
        pass

    @indirect(ForgejoIssue.create)
    def create_issue(
        self,
        title: str,
        body: str,
        private: Optional[bool] = None,
        labels: Optional[list[str]] = None,
        assignees: Optional[list[str]] = None,
    ) -> Issue:
        pass

    @indirect(ForgejoPullRequest.get_list)
    def get_pr_list(self, status: PRStatus = PRStatus.open) -> Iterable["PullRequest"]:
        pass

    @indirect(ForgejoPullRequest.get)
    def get_pr(self, pr_id: int) -> "PullRequest":
        pass

    def get_pr_files_diff(
        self,
        pr_id: int,
        retries: int = 0,
        wait_seconds: int = 3,
    ) -> dict:
        """
        Get files diff of a pull request.

        Args:
            pr_id: ID of the pull request.

        Returns:
            Dictionary representing files diff.
        """
        # [NOTE] Implemented only for Pagure, for details see
        # https://github.com/packit/ogr/issues/895
        raise NotImplementedError()

    def get_tags(self) -> Iterable["GitTag"]:
        """Lazily yield tags with their name and commit hash."""
        return (
            GitTag(
                name=tag.name,
                commit_sha=tag.commit.sha,
            )
            for tag in paginate(self.partial_api(self.api.repo_list_tags))
        )

    def get_sha_from_tag(self, tag_name: str) -> str:
        return self.partial_api(
            self.api.repo_get_tag,
            tag=tag_name,
        )().commit.sha

    @indirect(ForgejoRelease.get)
    def get_release(
        self,
        identifier: Optional[int] = None,
        name: Optional[str] = None,
        tag_name: Optional[str] = None,
    ) -> Release:
        pass

    @indirect(ForgejoRelease.get_latest)
    def get_latest_release(self) -> Optional[Release]:
        pass

    @indirect(ForgejoRelease.get_list)
    def get_releases(self) -> list[Release]:
        pass

    @indirect(ForgejoRelease.create)
    def create_release(
        self,
        tag: str,
        name: str,
        message: str,
        ref: Optional[str] = None,
    ) -> Release:
        pass

    @indirect(ForgejoPullRequest.create)
    def create_pr(
        self,
        title: str,
        body: str,
        target_branch: str,
        source_branch: str,
        fork_username: Optional[str] = None,
    ) -> "PullRequest":
        pass

    def commit_comment(
        self,
        commit: str,
        body: str,
        filename: Optional[str] = None,
        row: Optional[int] = None,
    ) -> "CommitComment":
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    def get_commit_comments(self, commit: str) -> list[CommitComment]:
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    def get_commit_comment(self, commit_sha: str, comment_id: int) -> CommitComment:
        raise OperationNotSupported("Forgejo doesn't support commit comments")

    @indirect(ForgejoCommitFlag.set)
    def set_commit_status(
        self,
        commit: str,
        state: Union[CommitStatus, str],
        target_url: str,
        description: str,
        context: str,
        trim: bool = False,
    ) -> "CommitFlag":
        pass

    @indirect(ForgejoCommitFlag.get)
    def get_commit_statuses(self, commit: str) -> list[CommitFlag]:
        pass

    def get_git_urls(self) -> dict[str, str]:
        return {
            "git": self.forgejo_repo.clone_url,
            "ssh": self.forgejo_repo.ssh_url,
        }

    def fork_create(self, namespace: Optional[str] = None) -> "GitProject":
        """Fork the repository.

        Args:
            namespace: Organization to fork into; when omitted, the fork is
                created without `organization` and the returned project uses
                the authenticated user's username as namespace.

        Returns:
            Project object pointing at the fork.
        """
        if namespace:
            self.api.create_fork(
                owner=self.namespace,
                repo=self.repo,
                organization=namespace,
            )
            return ForgejoProject(
                repo=self.repo,
                service=self.service,
                namespace=namespace,
            )

        self.api.create_fork(
            owner=self.namespace,
            repo=self.repo,
        )
        return ForgejoProject(
            repo=self.repo,
            service=self.service,
            namespace=self.service.user.get_username(),
        )

    def change_token(self, new_token: str) -> None:
        # [NOTE] API doesn't provide any method to change the token, and it's
        # embedded in the httpx client that's wrapped by pyforgejo wrapper to
        # avoid duplication between sync and async calls…
        raise NotImplementedError(
            "Not possible; requires recreation of the httpx client",
        )

    def get_file_content(self, path: str, ref: Optional[str] = None) -> str:
        """Return the content of a file decoded to a string.

        Args:
            path: Path of the file in the repository.
            ref: Git reference passed to the API.

        Raises:
            FileNotFoundError: When the API raises `NotFoundError`.
        """
        try:
            remote_file: types.ContentsResponse = self.partial_api(
                self.api.repo_get_contents,
                filepath=path,
                ref=ref,
            )()

            # [NOTE] If you touch this, good luck, have fun…
            # tl;dr ‹ContentsResponse› from the Pyforgejo contains the content
            # of the file that's (I hope always) base64-encoded, but it's stored
            # as a string, so here it's needed to convert the UTF-8 encoded
            # string back to bytes (duh, cause base64 is used for encoding raw
            # data), then decode the base64 bytes to just bytes and then decode
            # those to a UTF-8 string… EWWW…
            return codecs.decode(
                bytes(remote_file.content, "utf-8"),
                encoding=remote_file.encoding,
            ).decode("utf-8")
        except NotFoundError as ex:
            raise FileNotFoundError() from ex

    def __get_files(
        self,
        path: str,
        ref: str,
        recursive: bool,
    ) -> Iterable[str]:
        """Yield paths of files found under `path`.

        Args:
            path: Path where the listing starts.
            ref: Git reference passed to the API.
            recursive: Descend into subdirectories; when false, directories
                are skipped and only the entries directly under `path` are
                yielded.
        """
        contents: types.ContentsResponse | list[types.ContentsResponse]

        # Explicit work-list instead of relying on `IndexError` from `pop()`.
        pending = [path]
        while pending:
            current = pending.pop()
            contents = self.partial_api(
                self.api.repo_get_contents,
                filepath=current,
                ref=ref,
            )()

            if isinstance(contents, types.ContentsResponse):
                # singular file, return path and skip any further processing
                yield contents.path
                continue

            for file in contents:
                if file.type != "dir":
                    yield file.path
                elif recursive:
                    pending.append(file.path)

    def get_files(
        self,
        ref: Optional[str] = None,
        filter_regex: Optional[str] = None,
        recursive: bool = False,
    ) -> Iterable[str]:
        """List paths of files in the repository.

        Args:
            ref: Git reference; defaults to the default branch.
            filter_regex: When given, paths are passed through `filter_paths`.
            recursive: Whether subdirectories are listed too.
        """
        logger.warning(
            "‹ForgejoProject.get_files()› method can fail because of incorrect"
            " OpenAPI spec",
        )

        ref = ref or self.default_branch
        paths = self.__get_files(".", ref=ref, recursive=recursive)

        if filter_regex:
            return filter_paths(paths, filter_regex)

        return paths

    def get_forks(self) -> Iterable["ForgejoProject"]:
        """Lazily yield projects for the forks reported by the API."""
        return (
            ForgejoProject(
                namespace=fork.owner.login,
                repo=fork.name,
                service=self.service,
            )
            for fork in paginate(
                self.partial_api(self.api.list_forks),
            )
        )

    def get_web_url(self) -> str:
        return self.forgejo_repo.html_url

    def get_sha_from_branch(self, branch: str) -> Optional[str]:
        """Commit ID of the branch, or `None` on `NotFoundError`."""
        try:
            branch_info = self.partial_api(
                self.api.repo_get_branch,
                branch=branch,
            )()
            return branch_info.commit.id
        except NotFoundError:
            return None
Args
repo
- Name of the project.
service
- GitService instance.
namespace
-
Namespace of the project.
- GitHub: username or org name.
- GitLab: username or org name.
- Pagure: namespace (e.g.
"rpms"
).
In case of forks:
"fork/{username}/{namespace}"
.
Ancestors
Class variables
var access_dict : ClassVar[dict]
var service : ForgejoService
Instance variables
prop api : pyforgejo.repository.client.RepositoryClient
-
Expand source code
@property
def api(self) -> RepositoryClient:
    """Shortcut to the pyforgejo `RepositoryClient` of the owning service."""
    service_api = self.service.api
    return service_api.repository
Returns a
RepositoryClient
from pyforgejo. Helper to save some typing.
var forgejo_repo : pyforgejo.types.repository.Repository
-
Expand source code
@cached_property
def forgejo_repo(self) -> types.Repository:
    """Pyforgejo repository object of this project.

    The repository handed to the constructor (kept in `_forgejo_repo`) is
    preferred; the API is queried only when none was supplied.
    """
    # `getattr` keeps this safe even for instances lacking the attribute.
    preloaded = getattr(self, "_forgejo_repo", None)
    if preloaded is not None:
        return preloaded
    return self.api.repo_get(
        owner=self.namespace,
        repo=self.repo,
    )
Methods
def partial_api(self, method, /, *args, **kwargs)
-
Expand source code
def partial_api(self, method, /, *args, **kwargs):
    """Pre-bind `owner` and `repo` of this project to an API method.

    Args:
        method: Method of the Pyforgejo API to wrap.
        *args: Positional arguments bound to every call.
        **kwargs: Keyword arguments bound to every call.

    Returns:
        `functools.partial` object with the parameters pre-injected.
    """
    project_coordinates = {"owner": self.namespace, "repo": self.repo}
    # Two separate unpackings: a clashing `owner`/`repo` in `kwargs` still
    # raises `TypeError` instead of being silently overridden.
    return partial(method, *args, **kwargs, **project_coordinates)
Returns a partial API call for
ForgejoProject
. Injects
owner
and repo
for the calls to /repository/
endpoints.
Args
method
- Specific method on the Pyforgejo API that is to be wrapped.
*args
- Positional arguments that get injected into every call.
**kwargs
- Keyword-arguments that get injected into every call.
Returns
Callable with pre-injected parameters.
Inherited members
BaseGitProject
:add_group
add_user
can_merge_pr
change_token
commit_comment
create_issue
create_pr
create_release
default_branch
delete
description
exists
fork_create
full_repo_name
get_branches
get_commit_comment
get_commit_comments
get_commit_statuses
get_commits
get_contributors
get_description
get_file_content
get_files
get_fork
get_forks
get_git_urls
get_issue
get_issue_info
get_issue_list
get_latest_release
get_owners
get_pr
get_pr_files_diff
get_pr_list
get_release
get_releases
get_sha_from_branch
get_sha_from_tag
get_tags
get_users_with_given_access
get_web_url
has_issues
has_write_access
is_fork
is_forked
is_private
parent
remove_group
remove_user
request_access
set_commit_status
users_with_write_access
which_groups_can_merge_pr
who_can_close_issue
who_can_merge_pr
class ForgejoPullRequest (raw_pr: pyforgejo.types.pull_request.PullRequest,
project: forgejo.ForgejoProject)-
Expand source code
class ForgejoPullRequest(BasePullRequest):
    """Pull request of a Forgejo project, wrapping a raw pyforgejo object."""

    _target_project: "forgejo.ForgejoProject" = None
    _source_project: "forgejo.ForgejoProject" = None
    # `None` means "not computed yet"; an empty list is a valid cached value.
    _labels: list[PRLabel] = None

    def __init__(
        self,
        raw_pr: PyforgejoPullRequest,
        project: "forgejo.ForgejoProject",
    ):
        super().__init__(raw_pr, project)

    def __str__(self) -> str:
        return "Forgejo" + super().__str__()

    @property
    def title(self) -> str:
        return self._raw_pr.title

    @title.setter
    def title(self, new_title: str) -> None:
        self.update_info(title=new_title)

    @property
    def id(self) -> int:
        return self._raw_pr.number

    @property
    def status(self) -> PRStatus:
        """`merged` when the raw PR is merged, otherwise its raw state."""
        return PRStatus.merged if self._raw_pr.merged else PRStatus[self._raw_pr.state]

    @property
    def url(self) -> str:
        return self._raw_pr.url

    @property
    def description(self) -> str:
        return self._raw_pr.body

    @description.setter
    def description(self, new_description: str) -> None:
        self.update_info(description=new_description)

    @property
    def author(self) -> str:
        return self._raw_pr.user.login

    @property
    def source_branch(self) -> str:
        return self._raw_pr.head.ref

    @property
    def target_branch(self) -> str:
        return self._raw_pr.base.ref

    @property
    def created(self) -> datetime.datetime:
        return self._raw_pr.created_at

    @property
    def labels(self) -> list[PRLabel]:
        """Labels of the pull request, built once from the raw PR."""
        # Compare with `None` so that an empty result is cached as well.
        if self._labels is None:
            self._labels = (
                [ForgejoPRLabel(raw_label, self) for raw_label in self._raw_pr.labels]
                if self._raw_pr.labels
                else []
            )
        return self._labels

    @property
    def diff_url(self) -> str:
        return self._raw_pr.diff_url

    @property
    def patch(self) -> bytes:
        """Download the patch from the raw PR's `patch_url`.

        Raises:
            OgrNetworkError: When the HTTP response is not OK.
        """
        patch_url = self._raw_pr.patch_url
        response = requests.get(patch_url)

        if not response.ok:
            # Report exactly the URL that was requested.
            raise OgrNetworkError(
                f"Couldn't get patch from {patch_url} because {response.reason}.",
            )

        return response.content

    @property
    def head_commit(self) -> str:
        return self._raw_pr.head.sha

    @property
    def merge_commit_sha(self) -> Optional[str]:
        # this is None for non-merged PRs
        return self._raw_pr.merge_commit_sha

    @property
    def merge_commit_status(self) -> MergeCommitStatus:
        return (
            MergeCommitStatus.can_be_merged
            if self._raw_pr.mergeable
            else MergeCommitStatus.cannot_be_merged
        )

    @cached_property
    def source_project(self) -> "forgejo.ForgejoProject":
        """Project of the PR head, built from the repository in the raw PR."""
        pyforgejo_repo = self._raw_pr.head.repo
        return self._target_project.service.get_project(
            repo=pyforgejo_repo.name,
            namespace=pyforgejo_repo.owner.login,
            forgejo_repo=pyforgejo_repo,
        )

    @property
    def commits_url(self) -> str:
        return f"{self.url}/commits"

    @property
    def closed_by(self) -> Optional[str]:
        """Login of the user in `merged_by`, or `None` when it is unset."""
        return self._raw_pr.merged_by.login if self._raw_pr.merged_by else None

    @staticmethod
    def create(
        project: "forgejo.ForgejoProject",
        title: str,
        body: str,
        target_branch: str,
        source_branch: str,
        fork_username: Optional[str] = None,
    ) -> "PullRequest":
        """Create a pull request.

        Args:
            project: Project the method is called on.
            title: Title of the pull request.
            body: Description of the pull request.
            target_branch: Branch the changes are merged into.
            source_branch: Branch the changes come from.
            fork_username: Owner of the fork involved, see the inline comments
                for the handled scenarios.

        Raises:
            ForgejoAPIException: When the requested fork is not found.
        """
        target_project = project

        if project.is_fork and fork_username is None:
            # handles fork -> upstream (called on fork)
            source_branch = f"{project.namespace}:{source_branch}"
            target_project = project.parent  # type: ignore
        elif fork_username:
            if fork_username != project.namespace and project.parent is not None:
                # handles fork -> other_fork
                #   (username of other_fork owner specified by fork_username)
                forks = list(
                    filter(
                        lambda fork: fork.namespace == fork_username,
                        project.parent.get_forks(),
                    ),
                )
                if not forks:
                    raise ForgejoAPIException("Requested fork doesn't exist")
                target_project = forks[0]  # type: ignore
                source_branch = f"{project.namespace}:{source_branch}"
            else:
                # handles fork -> upstream
                #   (username of fork owner specified by fork_username)
                source_branch = f"{fork_username}:{source_branch}"

        logger.debug(f"Creating PR {target_branch}<-{source_branch}")

        pr = target_project.api.repo_create_pull_request(
            owner=target_project.namespace,
            repo=target_project.repo,
            base=target_branch,
            body=body,
            head=source_branch,
            title=title,
        )
        logger.info(f"PR {pr.id} created.")
        return ForgejoPullRequest(pr, target_project)

    @staticmethod
    def get(project: "forgejo.ForgejoProject", pr_id: int) -> "PullRequest":
        """Get a pull request by its ID.

        Raises:
            ForgejoAPIException: When the API raises `NotFoundError`.
        """
        try:
            raw_pr = project.api.repo_get_pull_request(
                owner=project.namespace,
                repo=project.repo,
                index=pr_id,
            )
        except NotFoundError as ex:
            raise ForgejoAPIException(f"No pull request with id {pr_id} found.") from ex
        return ForgejoPullRequest(raw_pr, project)

    @staticmethod
    def get_list(
        project: "forgejo.ForgejoProject",
        status: PRStatus = PRStatus.open,
    ) -> Iterable["PullRequest"]:
        """Lazily list pull requests; `merged` is requested as `closed`."""
        prs = paginate(
            partial(
                project.api.repo_list_pull_requests,
                owner=project.namespace,
                repo=project.repo,
                # Forgejo has just open/closed/all
                state=status.name if status != PRStatus.merged else "closed",
            ),
        )
        return (ForgejoPullRequest(pr, project) for pr in prs)

    def update_info(
        self,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "PullRequest":
        """Update title and/or description of the pull request.

        The current title is sent when no new title is given; the body is
        sent only when `description` is not `None`.

        Raises:
            ForgejoAPIException: When anything fails during the update.
        """
        try:
            data = {"title": title if title else self.title}

            if description is not None:
                data["body"] = description

            updated_pr = self._target_project.api.repo_edit_pull_request(
                owner=self.target_project.namespace,
                repo=self.target_project.repo,
                index=self.id,
                **data,
            )
            self._raw_pr = updated_pr
            return self
        except Exception as ex:
            raise ForgejoAPIException(
                f"There was an error while updating Forgejo PR: {ex}",
            ) from ex

    def close(self) -> "PullRequest":
        """Set the state of the pull request to closed."""
        self._raw_pr = self._target_project.api.repo_edit_pull_request(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            state="closed",
        )
        return self

    def merge(self) -> "PullRequest":
        """Merge the pull request and return a freshly fetched object."""
        self._target_project.api.repo_merge_pull_request(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            # options: merge, rebase, rebase-merge, squash, fast-forward-only, manually-merged
            do="merge",
        )
        return self.get(self._target_project, self.id)

    def add_label(self, *labels: str) -> None:
        """Add labels through the issue client and refresh the cached labels."""
        issue_client = self._target_project.service.api.issue
        new_labels = issue_client.add_label(
            owner=self.target_project.namespace,
            repo=self.target_project.repo,
            index=self.id,
            labels=list(labels),
        )
        self._labels = [ForgejoPRLabel(raw_label, self) for raw_label in new_labels]

    def get_all_commits(self) -> Iterable[str]:
        """Lazily yield hashes of the commits in the pull request."""
        return (
            commit.sha
            for commit in paginate(
                partial(
                    self._target_project.api.repo_get_pull_request_commits,
                    owner=self.target_project.namespace,
                    repo=self.target_project.repo,
                    index=self.id,
                ),
            )
        )

    def get_comments(
        self,
        filter_regex: Optional[str] = None,
        reverse: bool = False,
        author: Optional[str] = None,
    ) -> Union[list["PRComment"], Iterable["PRComment"]]:
        """
        Get list of pull request comments.

        Args:
            filter_regex: Filter the comments' content with `re.search`.

                Defaults to `None`, which means no filtering.
            reverse: Whether the comments are to be returned in
                reversed order.

                Defaults to `False`.
            author: Filter the comments by author.

                Defaults to `None`, which means no filtering.

        Returns:
            List of pull request comments.
        """
        raise NotImplementedError()

    def comment(
        self,
        body: str,
        commit: Optional[str] = None,
        filename: Optional[str] = None,
        row: Optional[int] = None,
    ) -> "PRComment":
        """
        Add new comment to the pull request.

        Args:
            body: Body of the comment.
            commit: Commit hash to which comment is related.

                Defaults to generic comment.
            filename: Path to the file to which comment is related.

                Defaults to no relation to the file.
            row: Line number to which the comment is related.

                Defaults to no relation to the line.

        Returns:
            Newly created comment.
        """
        raise NotImplementedError()

    def get_comment(self, comment_id: int) -> PRComment:
        """
        Returns a PR comment.

        Args:
            comment_id: id of comment

        Returns:
            Object representing a PR comment.
        """
        raise NotImplementedError()

    def get_statuses(self) -> Union[list[CommitFlag], Iterable[CommitFlag]]:
        """
        Returns statuses for latest commit on pull request.

        Returns:
            List of commit statuses of the latest commit.
        """
        raise NotImplementedError()
Attributes
project
:GitProject
- Project of the pull request.
Ancestors
Inherited members
class ForgejoService (instance_url: str = 'https://codeberg.org',
api_key: str | None = None,
**kwargs)-
Expand source code
@use_for_service("forgejo")
@use_for_service("codeberg.org")
class ForgejoService(BaseGitService):
    """Git service backed by a Forgejo instance (Codeberg by default)."""

    # Path of the API that gets appended to the instance URL.
    version = "/api/v1"

    def __init__(
        self,
        instance_url: str = "https://codeberg.org",
        api_key: Optional[str] = None,
        **kwargs,
    ):
        """
        Args:
            instance_url: URL of the Forgejo instance; the API path from
                `version` is appended to it.
            api_key: API key, stored prefixed with `token `.
            **kwargs: Accepted and ignored.
        """
        super().__init__()
        self.instance_url = instance_url + self.version
        self._token = f"token {api_key}"
        self._api = None

    @cached_property
    def api(self) -> PyforgejoApi:
        """Pyforgejo client for the instance, created on first access."""
        return PyforgejoApi(base_url=self.instance_url, api_key=self._token)

    def get_project(  # type: ignore[override]
        self,
        repo: str,
        namespace: str,
        **kwargs,
    ) -> "ForgejoProject":
        """Create a project object; extra keyword arguments are forwarded."""
        return ForgejoProject(
            repo=repo,
            namespace=namespace,
            service=self,
            **kwargs,
        )

    @property
    def user(self) -> GitUser:
        return ForgejoUser(self)

    def project_create(
        self,
        repo: str,
        namespace: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "ForgejoProject":
        """Create a new repository.

        Args:
            repo: Name of the repository.
            namespace: Organization to create the repository in; when not
                given, it is created for the current user.
            description: Description of the repository.

        Returns:
            Project object of the created repository.
        """
        if namespace:
            new_repo = self.api.organization.create_org_repo(
                org=namespace,
                name=repo,
                description=description,
            )
        else:
            new_repo = self.api.repository.create_current_user_repo(
                name=repo,
                description=description,
            )
        return ForgejoProject(
            repo=repo,
            namespace=namespace or self.user.get_username(),
            service=self,
            # `ForgejoProject` names this parameter `forgejo_repo`; any other
            # keyword would be silently swallowed by its `**kwargs`.
            forgejo_repo=new_repo,
        )

    def get_project_from_url(self, url: str) -> "ForgejoProject":
        """Build a project from the first two path segments of the URL.

        Raises:
            OgrException: When the URL path has fewer than two segments.
        """
        parsed_url = urlparse(url)
        path_parts = parsed_url.path.strip("/").split("/")

        if len(path_parts) < 2:
            raise OgrException(f"Invalid Forgejo URL: {url}")

        namespace = path_parts[0]
        repo = path_parts[1]

        return self.get_project(repo=repo, namespace=namespace)
Attributes
instance_url
:str
- URL of the git forge instance.
Ancestors
Class variables
var version
Instance variables
var api : pyforgejo.client.PyforgejoApi
-
Expand source code
@cached_property
def api(self) -> PyforgejoApi:
    """Pyforgejo client built from the instance URL and the stored token."""
    client = PyforgejoApi(
        base_url=self.instance_url,
        api_key=self._token,
    )
    return client
Inherited members