Module ogr.services.github.service
Expand source code
# Copyright Contributors to the Packit project.
# SPDX-License-Identifier: MIT
import logging
import re
from typing import Optional, Union
import github
import github.GithubObject
from github import (
Github as PyGithubInstance,
)
from github import (
Repository as PyGithubRepository,
)
from github import (
UnknownObjectException,
)
from urllib3.util import Retry
from ogr.abstract import AuthMethod, GitUser
from ogr.exceptions import GithubAPIException
from ogr.factory import use_for_service
from ogr.services.base import BaseGitService, GitProject
from ogr.services.github.auth_providers import (
GithubApp,
GithubAuthentication,
TokenAuthentication,
Tokman,
)
from ogr.services.github.project import GithubProject
from ogr.services.github.user import GithubUser
logger = logging.getLogger(__name__)
@use_for_service("github.com")
class GithubService(BaseGitService):
# class parameter could be used to mock Github class api
github_class: type[github.Github]
instance_url = "https://github.com"
def __init__(
self,
token=None,
read_only=False,
github_app_id: Optional[str] = None,
github_app_private_key: Optional[str] = None,
github_app_private_key_path: Optional[str] = None,
tokman_instance_url: Optional[str] = None,
github_authentication: GithubAuthentication = None,
max_retries: Union[int, Retry] = 0,
**kwargs,
):
"""
If multiple authentication methods are provided, they are prioritised:
1. Tokman
2. GithubApp
3. TokenAuthentication (which is also default one, that works without specified token)
"""
super().__init__()
self.read_only = read_only
self._default_auth_method = github_authentication
self._other_auth_method: GithubAuthentication = None
self._auth_methods: dict[AuthMethod, GithubAuthentication] = {}
if isinstance(max_retries, Retry):
self._max_retries = max_retries
else:
self._max_retries = Retry(
total=int(max_retries),
read=0,
# Retry mechanism active for these HTTP methods:
allowed_methods=["DELETE", "GET", "PATCH", "POST", "PUT"],
# Only retry on following HTTP status codes
status_forcelist=[500, 503, 403, 401],
raise_on_status=False,
)
if not self._default_auth_method:
self.__set_authentication(
token=token,
github_app_id=github_app_id,
github_app_private_key=github_app_private_key,
github_app_private_key_path=github_app_private_key_path,
tokman_instance_url=tokman_instance_url,
max_retries=self._max_retries,
)
if kwargs:
logger.warning(f"Ignored keyword arguments: {kwargs}")
def __set_authentication(self, **kwargs):
auth_methods = [
(Tokman, AuthMethod.tokman),
(GithubApp, AuthMethod.github_app),
(TokenAuthentication, AuthMethod.token),
]
for auth_class, auth_name in auth_methods:
auth_inst = auth_class.try_create(**kwargs)
self._auth_methods[auth_name] = auth_inst
if not self._default_auth_method:
self._default_auth_method = auth_inst
return None if self._default_auth_method else TokenAuthentication(None)
def set_auth_method(self, method: AuthMethod):
if self._auth_methods[method]:
logger.info("Forced Github auth method to %s", method)
self._other_auth_method = self._auth_methods[method]
else:
raise GithubAPIException(
f"Choosen authentication method ({method}) is not available",
)
def reset_auth_method(self):
logger.info("Reset Github auth method to the default")
self._other_auth_method = None
@property
def authentication(self):
return self._other_auth_method or self._default_auth_method
@property
def github(self):
return self.authentication.pygithub_instance
def __str__(self) -> str:
readonly_str = ", read_only=True" if self.read_only else ""
arguments = f", github_authentication={self.authentication!s}{readonly_str}"
if arguments:
# remove the first '- '
arguments = arguments[2:]
return f"GithubService({arguments})"
def __eq__(self, o: object) -> bool:
if not issubclass(o.__class__, GithubService):
return False
return (
self.read_only == o.read_only # type: ignore
and self.authentication == o.authentication # type: ignore
)
def __hash__(self) -> int:
return hash(str(self))
def get_project(
self,
repo=None,
namespace=None,
is_fork=False,
**kwargs,
) -> "GithubProject":
if is_fork:
namespace = self.user.get_username()
return GithubProject(
repo=repo,
namespace=namespace,
service=self,
read_only=self.read_only,
**kwargs,
)
def get_project_from_github_repository(
self,
github_repo: PyGithubRepository.Repository,
) -> "GithubProject":
return GithubProject(
repo=github_repo.name,
namespace=github_repo.owner.login,
github_repo=github_repo,
service=self,
read_only=self.read_only,
)
@property
def user(self) -> GitUser:
return GithubUser(service=self)
def change_token(self, new_token: str) -> None:
self._default_auth_method = TokenAuthentication(new_token)
def project_create(
self,
repo: str,
namespace: Optional[str] = None,
description: Optional[str] = None,
) -> "GithubProject":
if namespace:
try:
owner = self.github.get_organization(namespace)
except UnknownObjectException as ex:
raise GithubAPIException(f"Group {namespace} not found.") from ex
else:
owner = self.github.get_user()
try:
new_repo = owner.create_repo(
name=repo,
description=description if description else github.GithubObject.NotSet,
)
except github.GithubException as ex:
raise GithubAPIException("Project creation failed") from ex
return GithubProject(
repo=repo,
namespace=namespace or owner.login,
service=self,
github_repo=new_repo,
)
def get_pygithub_instance(self, namespace: str, repo: str) -> PyGithubInstance:
token = self.authentication.get_token(namespace, repo)
return PyGithubInstance(login_or_token=token, retry=self._max_retries)
def list_projects(
self,
namespace: Optional[str] = None,
user: Optional[str] = None,
search_pattern: Optional[str] = None,
language: Optional[str] = None,
) -> list[GitProject]:
search_query = ""
if user:
search_query += f"user:{user}"
if language:
search_query += f" language:{language}"
projects: list[GitProject]
projects = [
GithubProject(
repo=repo.name,
namespace=repo.owner.login,
github_repo=repo,
service=self,
)
for repo in self.github.search_repositories(search_query, order="asc")
]
if search_pattern:
projects = [
project
for project in projects
if re.search(search_pattern, project.repo)
]
return projects
Classes
class GithubService (token=None, read_only=False, github_app_id: Optional[str] = None, github_app_private_key: Optional[str] = None, github_app_private_key_path: Optional[str] = None, tokman_instance_url: Optional[str] = None, github_authentication: GithubAuthentication = None, max_retries: Union[int, urllib3.util.retry.Retry] = 0, **kwargs)
-
Attributes
instance_url
:str
- URL of the git forge instance.
If multiple authentication methods are provided, they are prioritised: 1. Tokman 2. GithubApp 3. TokenAuthentication (which is also default one, that works without specified token)
Expand source code
@use_for_service("github.com") class GithubService(BaseGitService): # class parameter could be used to mock Github class api github_class: type[github.Github] instance_url = "https://github.com" def __init__( self, token=None, read_only=False, github_app_id: Optional[str] = None, github_app_private_key: Optional[str] = None, github_app_private_key_path: Optional[str] = None, tokman_instance_url: Optional[str] = None, github_authentication: GithubAuthentication = None, max_retries: Union[int, Retry] = 0, **kwargs, ): """ If multiple authentication methods are provided, they are prioritised: 1. Tokman 2. GithubApp 3. TokenAuthentication (which is also default one, that works without specified token) """ super().__init__() self.read_only = read_only self._default_auth_method = github_authentication self._other_auth_method: GithubAuthentication = None self._auth_methods: dict[AuthMethod, GithubAuthentication] = {} if isinstance(max_retries, Retry): self._max_retries = max_retries else: self._max_retries = Retry( total=int(max_retries), read=0, # Retry mechanism active for these HTTP methods: allowed_methods=["DELETE", "GET", "PATCH", "POST", "PUT"], # Only retry on following HTTP status codes status_forcelist=[500, 503, 403, 401], raise_on_status=False, ) if not self._default_auth_method: self.__set_authentication( token=token, github_app_id=github_app_id, github_app_private_key=github_app_private_key, github_app_private_key_path=github_app_private_key_path, tokman_instance_url=tokman_instance_url, max_retries=self._max_retries, ) if kwargs: logger.warning(f"Ignored keyword arguments: {kwargs}") def __set_authentication(self, **kwargs): auth_methods = [ (Tokman, AuthMethod.tokman), (GithubApp, AuthMethod.github_app), (TokenAuthentication, AuthMethod.token), ] for auth_class, auth_name in auth_methods: auth_inst = auth_class.try_create(**kwargs) self._auth_methods[auth_name] = auth_inst if not self._default_auth_method: self._default_auth_method = auth_inst return None if self._default_auth_method else TokenAuthentication(None) def set_auth_method(self, method: AuthMethod): if self._auth_methods[method]: logger.info("Forced Github auth method to %s", method) self._other_auth_method = self._auth_methods[method] else: raise GithubAPIException( f"Choosen authentication method ({method}) is not available", ) def reset_auth_method(self): logger.info("Reset Github auth method to the default") self._other_auth_method = None @property def authentication(self): return self._other_auth_method or self._default_auth_method @property def github(self): return self.authentication.pygithub_instance def __str__(self) -> str: readonly_str = ", read_only=True" if self.read_only else "" arguments = f", github_authentication={self.authentication!s}{readonly_str}" if arguments: # remove the first '- ' arguments = arguments[2:] return f"GithubService({arguments})" def __eq__(self, o: object) -> bool: if not issubclass(o.__class__, GithubService): return False return ( self.read_only == o.read_only # type: ignore and self.authentication == o.authentication # type: ignore ) def __hash__(self) -> int: return hash(str(self)) def get_project( self, repo=None, namespace=None, is_fork=False, **kwargs, ) -> "GithubProject": if is_fork: namespace = self.user.get_username() return GithubProject( repo=repo, namespace=namespace, service=self, read_only=self.read_only, **kwargs, ) def get_project_from_github_repository( self, github_repo: PyGithubRepository.Repository, ) -> "GithubProject": return GithubProject( repo=github_repo.name, namespace=github_repo.owner.login, github_repo=github_repo, service=self, read_only=self.read_only, ) @property def user(self) -> GitUser: return GithubUser(service=self) def change_token(self, new_token: str) -> None: self._default_auth_method = TokenAuthentication(new_token) def project_create( self, repo: str, namespace: Optional[str] = None, description: Optional[str] = None, ) -> "GithubProject": if namespace: try: owner = self.github.get_organization(namespace) except UnknownObjectException as ex: raise GithubAPIException(f"Group {namespace} not found.") from ex else: owner = self.github.get_user() try: new_repo = owner.create_repo( name=repo, description=description if description else github.GithubObject.NotSet, ) except github.GithubException as ex: raise GithubAPIException("Project creation failed") from ex return GithubProject( repo=repo, namespace=namespace or owner.login, service=self, github_repo=new_repo, ) def get_pygithub_instance(self, namespace: str, repo: str) -> PyGithubInstance: token = self.authentication.get_token(namespace, repo) return PyGithubInstance(login_or_token=token, retry=self._max_retries) def list_projects( self, namespace: Optional[str] = None, user: Optional[str] = None, search_pattern: Optional[str] = None, language: Optional[str] = None, ) -> list[GitProject]: search_query = "" if user: search_query += f"user:{user}" if language: search_query += f" language:{language}" projects: list[GitProject] projects = [ GithubProject( repo=repo.name, namespace=repo.owner.login, github_repo=repo, service=self, ) for repo in self.github.search_repositories(search_query, order="asc") ] if search_pattern: projects = [ project for project in projects if re.search(search_pattern, project.repo) ] return projects
Ancestors
Class variables
var github_class : type[github.MainClass.Github]
var instance_url : Optional[str]
Instance variables
var authentication
-
Expand source code
@property def authentication(self): return self._other_auth_method or self._default_auth_method
var github
-
Expand source code
@property def github(self): return self.authentication.pygithub_instance
Methods
def get_project_from_github_repository(self, github_repo: github.Repository.Repository) ‑> GithubProject
-
Expand source code
def get_project_from_github_repository( self, github_repo: PyGithubRepository.Repository, ) -> "GithubProject": return GithubProject( repo=github_repo.name, namespace=github_repo.owner.login, github_repo=github_repo, service=self, read_only=self.read_only, )
def get_pygithub_instance(self, namespace: str, repo: str) ‑> github.MainClass.Github
-
Expand source code
def get_pygithub_instance(self, namespace: str, repo: str) -> PyGithubInstance: token = self.authentication.get_token(namespace, repo) return PyGithubInstance(login_or_token=token, retry=self._max_retries)
Inherited members