Module ogr.services.base
Expand source code
# Copyright Contributors to the Packit project.
# SPDX-License-Identifier: MIT
from typing import Any, Optional
from urllib.request import urlopen
from ogr.abstract import (
CommitFlag,
CommitStatus,
GitProject,
GitService,
GitUser,
Issue,
IssueComment,
PullRequest,
Release,
)
from ogr.exceptions import OgrException
from ogr.parsing import parse_git_repo
from ogr.utils import filter_comments, search_in_comments
try:
    from functools import cached_property
except ImportError:
    # functools.cached_property is unavailable (Python < 3.8); supply a stand-in.
    # NOTE(review): annotations such as `list[Any]` elsewhere in this module are
    # evaluated at definition time and need Python 3.9+, so this branch is
    # probably unreachable in practice -- confirm and consider dropping it.
    from functools import wraps

    def cached_property(func):  # type: ignore
        """Fallback replacement for ``functools.cached_property``.

        Wraps ``func`` in a read-only ``property`` that calls it at most once per
        instance and keeps the result on the instance itself.  Unlike wrapping
        the method in ``lru_cache``, this neither requires instances to be
        hashable nor keeps them alive in a shared cache.

        Args:
            func: Zero-argument method (besides ``self``) whose result is cached.

        Returns:
            A ``property`` object serving the cached value.
        """
        cache_attr = f"_cached_{func.__name__}"
        missing = object()  # sentinel, so a cached ``None`` is still a hit

        @wraps(func)
        def getter(self):
            value = getattr(self, cache_attr, missing)
            if value is missing:
                value = func(self)
                setattr(self, cache_attr, value)
            return value

        return property(getter)
class BaseGitService(GitService):
    # URL-parsing helpers layered on top of GitService.  A `#` comment is used
    # (not a class docstring) so the docstring inherited from the parent class
    # is not shadowed.

    @cached_property
    def hostname(self) -> Optional[str]:
        """Hostname parsed out of ``self.instance_url``.

        Returns:
            The ``hostname`` attribute of the result of ``parse_git_repo``,
            or ``None`` when that result is falsy.
        """
        parsed = parse_git_repo(potential_url=self.instance_url)
        if not parsed:
            return None
        return parsed.hostname

    def get_project_from_url(self, url: str) -> "GitProject":
        """Build a project object from a repository URL.

        Args:
            url: URL handed to ``parse_git_repo``.

        Returns:
            Whatever ``self.get_project`` returns for the parsed ``repo`` and
            ``namespace``.

        Raises:
            OgrException: If ``parse_git_repo`` yields a falsy result.
        """
        parsed = parse_git_repo(potential_url=url)
        if parsed:
            return self.get_project(repo=parsed.repo, namespace=parsed.namespace)
        raise OgrException(f"Cannot parse project url: '{url}'")
class BaseGitProject(GitProject):
    # Adds a combined "namespace/repo" name on top of GitProject.  Kept as a
    # `#` comment so the inherited class docstring stays visible.

    @property
    def full_repo_name(self) -> str:
        """Return ``namespace`` and ``repo`` joined by a single ``/``."""
        namespace = self.namespace
        repo = self.repo
        return "{}/{}".format(namespace, repo)
class BasePullRequest(PullRequest):
    # Forge-independent pull-request helpers built on the PullRequest
    # interface.  Kept as a `#` comment so the inherited class docstring stays
    # visible.

    @property
    def target_branch_head_commit(self) -> str:
        """Result of ``get_sha_from_branch`` on the target project for the target branch."""
        project = self.target_project
        return project.get_sha_from_branch(self.target_branch)

    def get_comments(
        self,
        filter_regex: Optional[str] = None,
        reverse: bool = False,
        author: Optional[str] = None,
    ):
        """Return the pull-request comments after passing them through ``filter_comments``.

        Args:
            filter_regex: Forwarded to ``filter_comments`` as its second argument.
            reverse: Forwarded to ``filter_comments`` as its third argument.
            author: Forwarded to ``filter_comments`` as its fourth argument.

        Returns:
            Whatever ``filter_comments`` returns for ``self._get_all_comments()``.
        """
        return filter_comments(self._get_all_comments(), filter_regex, reverse, author)

    def search(
        self,
        filter_regex: str,
        reverse: bool = False,
        description: bool = True,
    ):
        """Run ``search_in_comments`` over the comments and, optionally, the description.

        Args:
            filter_regex: Pattern forwarded to ``search_in_comments``.
            reverse: Passed to ``get_comments``; also decides whether the
                description goes last (``True``) or first (``False``).
            description: When true, ``self.description`` is added to the
                searched items.

        Returns:
            The result of ``search_in_comments``.
        """
        searchable: list[Any] = self.get_comments(reverse=reverse)
        if description:
            body = self.description
            # The description is placed where the oldest entry would sit:
            # at the end for reversed order, at the front otherwise.
            if not reverse:
                searchable.insert(0, body)
            else:
                searchable.append(body)
        return search_in_comments(comments=searchable, filter_regex=filter_regex)

    def get_statuses(self) -> list[CommitFlag]:
        """Return the commit statuses of the last entry from ``get_all_commits()``.

        The statuses are looked up through ``self.target_project``.
        """
        commits = self.get_all_commits()
        last_commit = commits[-1]
        return self.target_project.get_commit_statuses(last_commit)
# Placeholder base class for user objects: it adds nothing to GitUser.
# Intentionally left without a docstring so the one inherited from the parent
# class keeps being shown.
class BaseGitUser(GitUser):
    pass
class BaseIssue(Issue):
    # Forge-independent issue helpers built on the Issue interface.  Kept as a
    # `#` comment so the inherited class docstring stays visible.

    def get_comments(
        self,
        filter_regex: Optional[str] = None,
        reverse: bool = False,
        author: Optional[str] = None,
    ) -> list[IssueComment]:
        """Return the issue comments after passing them through ``filter_comments``.

        Args:
            filter_regex: Forwarded to ``filter_comments`` as its second argument.
            reverse: Forwarded to ``filter_comments`` as its third argument.
            author: Forwarded to ``filter_comments`` as its fourth argument.

        Returns:
            Whatever ``filter_comments`` returns for ``self._get_all_comments()``.
        """
        comments: list[IssueComment] = self._get_all_comments()
        filtered = filter_comments(comments, filter_regex, reverse, author)
        return filtered

    def can_close(self, username: str) -> bool:
        """Tell whether ``username`` may close this issue.

        Args:
            username: Login to check.

        Returns:
            ``True`` if ``username`` equals ``self.author``; otherwise whether
            it is contained in ``self.project.who_can_close_issue()``.
        """
        if username == self.author:
            return True
        return username in self.project.who_can_close_issue()
class BaseCommitFlag(CommitFlag):
    # State-conversion helpers driven by the class-level `_states` mapping,
    # which is defined outside this block.  Kept as a `#` comment so the
    # inherited class docstring stays visible.

    @classmethod
    def _state_from_str(cls, state: str) -> CommitStatus:
        """Look up ``state`` in ``cls._states``.

        Args:
            state: Key expected to be present in ``cls._states``.

        Returns:
            The value stored under ``state``.

        Raises:
            ValueError: If ``state`` is not a key of ``cls._states``.
        """
        known_states = cls._states
        if state in known_states:
            return known_states[state]
        raise ValueError("Invalid state given")

    @classmethod
    def _validate_state(cls, state: CommitStatus) -> CommitStatus:
        """Return ``state`` unchanged after checking it is a value of ``cls._states``.

        Raises:
            ValueError: If ``state`` is not among ``cls._states.values()``.
        """
        if state in cls._states.values():
            return state
        raise ValueError("Invalid state given")
class BaseRelease(Release):
    # Adds tarball download support on top of Release.  Kept as a `#` comment
    # so the inherited class docstring stays visible.

    def save_archive(self, filename: str) -> None:
        """Download ``self.tarball_url`` and write its content to ``filename``.

        Args:
            filename: Path of the file to write; it is opened in ``"wb"`` mode,
                so an existing file is overwritten.
        """
        # Use the response as a context manager so the underlying connection
        # is closed even when reading fails.
        with urlopen(self.tarball_url) as response:
            data = response.read()
        # The file is opened only after the download succeeded, so a failed
        # download never leaves an empty or partial file behind.
        with open(filename, "wb") as file:
            file.write(data)
Classes
class BaseCommitFlag (raw_commit_flag: Optional[Any] = None, project: Optional[ForwardRef('GitProject')] = None, commit: Optional[str] = None, state: Optional[CommitStatus] = None, context: Optional[str] = None, comment: Optional[str] = None, uid: Optional[str] = None, url: Optional[str] = None)
-
Expand source code
class BaseCommitFlag(CommitFlag):
    # State-conversion helpers driven by the class-level `_states` mapping,
    # which is defined outside this block.  Kept as a `#` comment so the
    # inherited class docstring stays visible.

    @classmethod
    def _state_from_str(cls, state: str) -> CommitStatus:
        """Look up ``state`` in ``cls._states``.

        Args:
            state: Key expected to be present in ``cls._states``.

        Returns:
            The value stored under ``state``.

        Raises:
            ValueError: If ``state`` is not a key of ``cls._states``.
        """
        known_states = cls._states
        if state in known_states:
            return known_states[state]
        raise ValueError("Invalid state given")

    @classmethod
    def _validate_state(cls, state: CommitStatus) -> CommitStatus:
        """Return ``state`` unchanged after checking it is a value of ``cls._states``.

        Raises:
            ValueError: If ``state`` is not among ``cls._states.values()``.
        """
        if state in cls._states.values():
            return state
        raise ValueError("Invalid state given")
Ancestors
Subclasses
Inherited members
class BaseGitProject (repo: str, service: GitService, namespace: str)
-
Args
repo
- Name of the project.
service
- GitService instance.
namespace
- Namespace of the project.
  - GitHub: username or org name.
  - GitLab: username or org name.
  - Pagure: namespace (e.g. "rpms"). In case of forks: "fork/{username}/{namespace}".
Expand source code
class BaseGitProject(GitProject):
    # Adds a combined "namespace/repo" name on top of GitProject.  Kept as a
    # `#` comment so the inherited class docstring stays visible.

    @property
    def full_repo_name(self) -> str:
        """Return ``namespace`` and ``repo`` joined by a single ``/``."""
        namespace = self.namespace
        repo = self.repo
        return "{}/{}".format(namespace, repo)
Ancestors
Subclasses
Inherited members
GitProject:
add_group
add_user
can_merge_pr
change_token
commit_comment
create_issue
create_pr
create_release
default_branch
delete
description
exists
fork_create
full_repo_name
get_branches
get_commit_comments
get_commit_statuses
get_contributors
get_description
get_file_content
get_files
get_fork
get_forks
get_git_urls
get_issue
get_issue_info
get_issue_list
get_latest_release
get_owners
get_pr
get_pr_files_diff
get_pr_list
get_release
get_releases
get_sha_from_branch
get_sha_from_tag
get_tags
get_users_with_given_access
get_web_url
has_issues
has_write_access
is_fork
is_forked
is_private
parent
remove_group
remove_user
request_access
set_commit_status
users_with_write_access
which_groups_can_merge_pr
who_can_close_issue
who_can_merge_pr
class BaseGitService (**_: Any)
-
Attributes
instance_url: str
- URL of the git forge instance.
Expand source code
class BaseGitService(GitService):
    # URL-parsing helpers layered on top of GitService.  A `#` comment is used
    # (not a class docstring) so the docstring inherited from the parent class
    # is not shadowed.

    @cached_property
    def hostname(self) -> Optional[str]:
        """Hostname parsed out of ``self.instance_url``.

        Returns:
            The ``hostname`` attribute of the result of ``parse_git_repo``,
            or ``None`` when that result is falsy.
        """
        parsed = parse_git_repo(potential_url=self.instance_url)
        if not parsed:
            return None
        return parsed.hostname

    def get_project_from_url(self, url: str) -> "GitProject":
        """Build a project object from a repository URL.

        Args:
            url: URL handed to ``parse_git_repo``.

        Returns:
            Whatever ``self.get_project`` returns for the parsed ``repo`` and
            ``namespace``.

        Raises:
            OgrException: If ``parse_git_repo`` yields a falsy result.
        """
        parsed = parse_git_repo(potential_url=url)
        if parsed:
            return self.get_project(repo=parsed.repo, namespace=parsed.namespace)
        raise OgrException(f"Cannot parse project url: '{url}'")
Ancestors
Subclasses
Inherited members
class BaseGitUser (service: GitService)
-
Represents currently authenticated user through service.
Expand source code
# Placeholder base class for user objects: it adds nothing to GitUser.
# Intentionally left without a docstring so the one inherited from the parent
# class keeps being shown.
class BaseGitUser(GitUser): pass
Ancestors
Subclasses
Inherited members
class BaseIssue (raw_issue: Any, project: GitProject)
-
Attributes
project: GitProject
- Project of the issue.
Expand source code
class BaseIssue(Issue):
    # Forge-independent issue helpers built on the Issue interface.  Kept as a
    # `#` comment so the inherited class docstring stays visible.

    def get_comments(
        self,
        filter_regex: Optional[str] = None,
        reverse: bool = False,
        author: Optional[str] = None,
    ) -> list[IssueComment]:
        """Return the issue comments after passing them through ``filter_comments``.

        Args:
            filter_regex: Forwarded to ``filter_comments`` as its second argument.
            reverse: Forwarded to ``filter_comments`` as its third argument.
            author: Forwarded to ``filter_comments`` as its fourth argument.

        Returns:
            Whatever ``filter_comments`` returns for ``self._get_all_comments()``.
        """
        comments: list[IssueComment] = self._get_all_comments()
        filtered = filter_comments(comments, filter_regex, reverse, author)
        return filtered

    def can_close(self, username: str) -> bool:
        """Tell whether ``username`` may close this issue.

        Args:
            username: Login to check.

        Returns:
            ``True`` if ``username`` equals ``self.author``; otherwise whether
            it is contained in ``self.project.who_can_close_issue()``.
        """
        if username == self.author:
            return True
        return username in self.project.who_can_close_issue()
Ancestors
Subclasses
Inherited members
class BasePullRequest (raw_pr: Any, project: GitProject)
-
Attributes
project: GitProject
- Project of the pull request.
Expand source code
class BasePullRequest(PullRequest):
    # Forge-independent pull-request helpers built on the PullRequest
    # interface.  Kept as a `#` comment so the inherited class docstring stays
    # visible.

    @property
    def target_branch_head_commit(self) -> str:
        """Result of ``get_sha_from_branch`` on the target project for the target branch."""
        project = self.target_project
        return project.get_sha_from_branch(self.target_branch)

    def get_comments(
        self,
        filter_regex: Optional[str] = None,
        reverse: bool = False,
        author: Optional[str] = None,
    ):
        """Return the pull-request comments after passing them through ``filter_comments``.

        Args:
            filter_regex: Forwarded to ``filter_comments`` as its second argument.
            reverse: Forwarded to ``filter_comments`` as its third argument.
            author: Forwarded to ``filter_comments`` as its fourth argument.

        Returns:
            Whatever ``filter_comments`` returns for ``self._get_all_comments()``.
        """
        return filter_comments(self._get_all_comments(), filter_regex, reverse, author)

    def search(
        self,
        filter_regex: str,
        reverse: bool = False,
        description: bool = True,
    ):
        """Run ``search_in_comments`` over the comments and, optionally, the description.

        Args:
            filter_regex: Pattern forwarded to ``search_in_comments``.
            reverse: Passed to ``get_comments``; also decides whether the
                description goes last (``True``) or first (``False``).
            description: When true, ``self.description`` is added to the
                searched items.

        Returns:
            The result of ``search_in_comments``.
        """
        searchable: list[Any] = self.get_comments(reverse=reverse)
        if description:
            body = self.description
            # The description is placed where the oldest entry would sit:
            # at the end for reversed order, at the front otherwise.
            if not reverse:
                searchable.insert(0, body)
            else:
                searchable.append(body)
        return search_in_comments(comments=searchable, filter_regex=filter_regex)

    def get_statuses(self) -> list[CommitFlag]:
        """Return the commit statuses of the last entry from ``get_all_commits()``.

        The statuses are looked up through ``self.target_project``.
        """
        commits = self.get_all_commits()
        last_commit = commits[-1]
        return self.target_project.get_commit_statuses(last_commit)
Ancestors
Subclasses
Inherited members
PullRequest:
add_label
author
close
closed_by
comment
commits_url
create
created
description
diff_url
get
get_all_commits
get_comment
get_comments
get_list
get_statuses
head_commit
id
labels
merge
merge_commit_sha
merge_commit_status
patch
search
source_branch
source_project
status
target_branch
target_branch_head_commit
target_project
title
update_info
url
class BaseRelease (raw_release: Any, project: GitProject)
-
Object that represents release.
Attributes
project: GitProject
- Project on which the release is created.
Expand source code
class BaseRelease(Release):
    # Adds tarball download support on top of Release.  Kept as a `#` comment
    # so the inherited class docstring stays visible.

    def save_archive(self, filename: str) -> None:
        """Download ``self.tarball_url`` and write its content to ``filename``.

        Args:
            filename: Path of the file to write; it is opened in ``"wb"`` mode,
                so an existing file is overwritten.
        """
        # Use the response as a context manager so the underlying connection
        # is closed even when reading fails.
        with urlopen(self.tarball_url) as response:
            data = response.read()
        # The file is opened only after the download succeeded, so a failed
        # download never leaves an empty or partial file behind.
        with open(filename, "wb") as file:
            file.write(data)
Ancestors
Inherited members