diff --git a/docs/source/pages/tesk/tesk.api.ga4gh.rst b/docs/source/pages/tesk/tesk.api.ga4gh.rst new file mode 100644 index 00000000..8c96efd5 --- /dev/null +++ b/docs/source/pages/tesk/tesk.api.ga4gh.rst @@ -0,0 +1,18 @@ +tesk.api.ga4gh package +====================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + tesk.api.ga4gh.tes + +Module contents +--------------- + +.. automodule:: tesk.api.ga4gh + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.api.ga4gh.tes.rst b/docs/source/pages/tesk/tesk.api.ga4gh.tes.rst new file mode 100644 index 00000000..90818435 --- /dev/null +++ b/docs/source/pages/tesk/tesk.api.ga4gh.tes.rst @@ -0,0 +1,37 @@ +tesk.api.ga4gh.tes package +========================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + tesk.api.ga4gh.tes.service_info + +Submodules +---------- + +tesk.api.ga4gh.tes.controllers module +------------------------------------- + +.. automodule:: tesk.api.ga4gh.tes.controllers + :members: + :undoc-members: + :show-inheritance: + +tesk.api.ga4gh.tes.models module +-------------------------------- + +.. automodule:: tesk.api.ga4gh.tes.models + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: tesk.api.ga4gh.tes + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.api.ga4gh.tes.service_info.rst b/docs/source/pages/tesk/tesk.api.ga4gh.tes.service_info.rst new file mode 100644 index 00000000..6df50f47 --- /dev/null +++ b/docs/source/pages/tesk/tesk.api.ga4gh.tes.service_info.rst @@ -0,0 +1,21 @@ +tesk.api.ga4gh.tes.service\_info package +======================================== + +Submodules +---------- + +tesk.api.ga4gh.tes.service\_info.service\_info module +----------------------------------------------------- + +.. 
automodule:: tesk.api.ga4gh.tes.service_info.service_info + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: tesk.api.ga4gh.tes.service_info + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.api.kubernetes.rst b/docs/source/pages/tesk/tesk.api.kubernetes.rst new file mode 100644 index 00000000..06de1229 --- /dev/null +++ b/docs/source/pages/tesk/tesk.api.kubernetes.rst @@ -0,0 +1,29 @@ +tesk.api.kubernetes package +=========================== + +Submodules +---------- + +tesk.api.kubernetes.constants module +------------------------------------ + +.. automodule:: tesk.api.kubernetes.constants + :members: + :undoc-members: + :show-inheritance: + +tesk.api.kubernetes.wrapper module +---------------------------------- + +.. automodule:: tesk.api.kubernetes.wrapper + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: tesk.api.kubernetes + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.api.rst b/docs/source/pages/tesk/tesk.api.rst new file mode 100644 index 00000000..90ba9353 --- /dev/null +++ b/docs/source/pages/tesk/tesk.api.rst @@ -0,0 +1,19 @@ +tesk.api package +================ + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + tesk.api.ga4gh + tesk.api.kubernetes + +Module contents +--------------- + +.. automodule:: tesk.api + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pages/tesk/tesk.rst b/docs/source/pages/tesk/tesk.rst index b023c8fc..9afd7a5b 100644 --- a/docs/source/pages/tesk/tesk.rst +++ b/docs/source/pages/tesk/tesk.rst @@ -7,8 +7,52 @@ Subpackages .. toctree:: :maxdepth: 4 + tesk.api tesk.services +Submodules +---------- + +tesk.app module +--------------- + +.. automodule:: tesk.app + :members: + :undoc-members: + :show-inheritance: + +tesk.constants module +--------------------- + +.. 
class Constants(BaseModel):
    """Constants related to job and tasks.

    Every attribute is a pydantic field carrying a default value and a
    human-readable description. Instantiating the model without arguments
    yields the defaults declared below.
    """

    # --- Taskmaster input ---------------------------------------------------
    taskmaster_input: str = Field(
        default="JSON_INPUT",
        description="ENV var that serves as taskmaster script input (JSON format)",
    )
    taskmaster_input_exec_key: str = Field(
        default="executors",
        description="Key in JSON taskmaster input, which holds list of executors",
    )
    volume_name: str = Field(default="PVC", description="Volume name")

    # --- Job naming ---------------------------------------------------------
    job_create_attempts_no: int = Field(
        default=5,
        description="Number of attempts of job creation in case of name collision",
    )
    job_name_taskm_prefix: str = Field(
        default="task-",
        description="Constant prefix of taskmaster's job name (== TES task ID)",
    )
    job_name_exec_prefix: str = Field(
        default="-ex-",
        description="Part of executor's job name, that follows taskmaster's name",
    )
    job_name_taskm_rand_part_length: int = Field(
        default=4,
        description=(
            "No of bytes of random part of task master's name (which end up "
            "encoded to hex)"
        ),
    )
    job_name_exec_no_length: int = Field(
        default=2,
        description="No of digits reserved for executor number in executor's job name."
        " Ends up padded with '0' for numbers < 10",
    )
    job_name_filer_suf: str = Field(
        default="-outputs-filer", description="Output filer name suffix"
    )

    # --- Annotation keys ----------------------------------------------------
    ann_testask_name_key: str = Field(
        default="tes-task-name",
        description=(
            "Key of the annotation, that stores name of TES task in both taskmaster's "
            "job and executor's jobs"
        ),
    )
    ann_json_input_key: str = Field(
        default="json-input",
        description="Key of the annotation, that stores whole input TES task serialized"
        " to JSON",
    )

    # --- Label keys and values ----------------------------------------------
    label_testask_id_key: str = Field(
        default="taskmaster-name",
        description="Key of the label, that stores taskmaster's name (==TES task "
        "generated ID) in executor jobs",
    )
    label_jobtype_key: str = Field(
        default="job-type",
        description="Key of the label, that stores type of a job (taskmaster or "
        "executor)",
    )
    label_jobtype_value_taskm: str = Field(
        default="taskmaster",
        description="Value of the label with taskmaster's job type",
    )
    label_jobtype_value_exec: str = Field(
        default="executor", description="Value of the label with executor's job type"
    )
    label_execno_key: str = Field(
        default="executor-no",
        description="Key of the label, that holds executor number for executor jobs",
    )
    label_taskstate_key: str = Field(
        default="task-status",
        description="Key of the label, that holds executor's state",
    )
    label_taskstate_value_canc: str = Field(
        default="Cancelled",
        description="Value of the label, that holds executor's Cancelled state",
    )
    label_userid_key: str = Field(
        default="creator-user-id", description="Key of the label, that holds user id"
    )
    label_groupname_key: str = Field(
        default="creator-group-name",
        description="Key of the label, that holds user's group name",
    )

    # --- Validation and resource defaults -----------------------------------
    absolute_path_regexp: str = Field(
        default="^\\/.*", description="Pattern to validate paths"
    )
    absolute_path_message: str = Field(
        default="must be an absolute path",
        description="Message for absolute path validation (to avoid "
        "message.properties)",
    )
    resource_disk_default: float = Field(
        default=0.1, description="Default resource disk value"
    )
    completed_states: Set[str] = Field(
        default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"},
        description="TES task states, indicating task is not running and cannot be "
        "cancelled",
    )

    # --- FTP credentials (names of ENV variables, not the secrets) ----------
    ftp_secret_username_env: str = Field(
        default="TESK_FTP_USERNAME",
        description="Name of taskmaster's ENV variable with username of FTP account "
        "used for storage",
    )
    ftp_secret_password_env: str = Field(
        default="TESK_FTP_PASSWORD",
        description="Name of taskmaster's ENV variable with password of FTP account "
        "used for storage",
    )

    # --- Cancellation and retries -------------------------------------------
    # The JSON below sets the same label key/value pair as the defaults of
    # label_taskstate_key / label_taskstate_value_canc above.
    cancel_patch: str = Field(
        default='{"metadata":{"labels":{"task-status":"Cancelled"}}}',
        description="Patch object passed to job API, when cancelling task",
    )
    # NOTE(review): the two defaults below look like ENV variable names rather
    # than numeric limits -- confirm against the code that consumes them.
    executor_backoff_limit: str = Field(
        default="EXECUTOR_BACKOFF_LIMIT",
        description="Set a number of retries of a job execution.",
    )
    filer_backoff_limit: str = Field(
        default="FILER_BACKOFF_LIMIT",
        description="Set a number of retries of a filer job execution.",
    )
class KubernetesClientWrapper:
    """Wrapper around the Kubernetes Python client API used by TESK.

    Every operation is scoped to ``self.namespace``. Errors reported by the
    Kubernetes API (``client.ApiException`` and its subclasses, which include
    ``tesk.exceptions.KubernetesError``) are logged and re-raised unless a
    method documents a different behaviour.
    """

    def __init__(self):
        """Initialize the Kubernetes client wrapper.

        Loads the kube config, creates the Batch and Core API clients, reads
        the namespace from ``TeskConstants.tesk_namespace`` and instantiates
        the job/task ``Constants``.
        """
        # NOTE(review): load_kube_config() reads a kubeconfig file; a pod
        # running inside the cluster may need config.load_incluster_config()
        # instead -- confirm against the deployment.
        config.load_kube_config()
        self.batch_api = client.BatchV1Api()
        self.core_api = client.CoreV1Api()
        self.namespace = TeskConstants.tesk_namespace
        self.constant = Constants()

    def _cancel_patch(self) -> dict:
        """Build the label patch that marks an object as cancelled."""
        return {
            "metadata": {
                "labels": {
                    self.constant.label_taskstate_key: (
                        self.constant.label_taskstate_value_canc
                    )
                }
            }
        }

    def create_job(self, job: V1Job) -> V1Job:
        """Create a job in the Kubernetes cluster.

        Args:
            job: Job object to create.

        Returns:
            Job object created in the Kubernetes cluster.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        try:
            v1_job: V1Job = self.batch_api.create_namespaced_job(
                namespace=self.namespace, body=job
            )
            return v1_job
        except client.ApiException as e:
            logger.error("Exception when creating job: %s", e)
            raise

    def create_config_map(self, config_map: V1ConfigMap) -> V1ConfigMap:
        """Create a config map in the Kubernetes cluster.

        Args:
            config_map: ConfigMap object to create.

        Returns:
            ConfigMap object created in the Kubernetes cluster.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        try:
            v1_config_map: V1ConfigMap = self.core_api.create_namespaced_config_map(
                namespace=self.namespace, body=config_map
            )
            return v1_config_map
        except client.ApiException as e:
            logger.error("Exception when creating config map: %s", e)
            raise

    def read_taskmaster_job(self, task_id: str) -> V1Job:
        """Read a taskmaster job from the Kubernetes cluster.

        Args:
            task_id: Task identifier.

        Returns:
            Job object read from the Kubernetes cluster.

        Raises:
            client.ApiException: If the API call fails with a status other
                than "not found".
            Exception: If no job with that name exists, or the job is not
                labelled as a taskmaster job.
        """
        try:
            job: V1Job = self.batch_api.read_namespaced_job(
                name=task_id, namespace=self.namespace
            )
        except client.ApiException as e:
            # A missing job falls through to the generic "not found" error
            # below; every other API failure is propagated unchanged.
            if e.status != NotFound.code:
                logger.error("Exception when reading job: %s", e)
                raise
        else:
            labels = (job.metadata.labels if job.metadata else None) or {}
            if (
                labels.get(self.constant.label_jobtype_key)
                == self.constant.label_jobtype_value_taskm
            ):
                return job
        raise Exception(f"Task {task_id} not found")

    def list_jobs(
        self, page_token: Optional[str] = None, label_selector=None, limit=None
    ) -> V1JobList:
        """List jobs in the Kubernetes cluster.

        Args:
            page_token: pageToken supplied by user (from previous result; points to
                next page of results)
            label_selector: Label selector to filter jobs.
            limit: Maximum number of jobs to return.

        Returns:
            Jobs of the namespace matching the selector.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        try:
            return self.batch_api.list_namespaced_job(
                namespace=self.namespace,
                label_selector=label_selector,
                limit=limit,
                _continue=page_token,
            )
        except client.ApiException as e:
            logger.error("Exception when listing jobs: %s", e)
            raise

    def list_limits(self, label_selector=None, limit=None) -> V1LimitRangeList:
        """List limit ranges in the Kubernetes cluster.

        Args:
            label_selector: Label selector to filter limit ranges.
            limit: Maximum number of limit ranges to return.

        Returns:
            Limit ranges of the namespace matching the selector.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        try:
            limits: V1LimitRangeList = self.core_api.list_namespaced_limit_range(
                namespace=self.namespace, label_selector=label_selector, limit=limit
            )
            return limits
        except client.ApiException as e:
            logger.error("Exception when listing limits: %s", e)
            raise

    def minimum_ram_gb(self) -> float:
        """Get the minimum amount of RAM in the cluster.

        Scans every limit range of the namespace and keeps the largest
        ``min`` memory value found.

        Returns:
            Minimum amount of RAM in the cluster in GB; ``0.0`` when no limit
            range defines a memory minimum or a quantity cannot be parsed.
        """
        try:
            min_ram = 0
            for limit in self.list_limits().items:
                if not limit.spec:
                    continue
                for item in limit.spec.limits:
                    if item.min and "memory" in item.min:
                        mem_bytes = self.quantity_to_bytes(item.min["memory"])
                        min_ram = max(min_ram, mem_bytes)
            # Explicit conversion so the annotated float is what is returned.
            return float(min_ram) / (1024**3)
        except (ValueError, TypeError) as e:
            logger.error("Error in minimum_ram_gb: %s", e)
            return 0.0
        except Exception as e:
            logger.error("Unexpected error in minimum_ram_gb: %s", e)
            raise

    def quantity_to_bytes(self, quantity: str) -> int:
        """Convert quantity(resource) to bytes.

        Args:
            quantity: Kubernetes resource quantity, e.g. ``"512Mi"``.

        Returns:
            The quantity as a whole number of bytes (fractions are truncated).
        """
        # parse_quantity yields a Decimal; convert to honour the int contract.
        return int(parse_quantity(quantity))

    def list_all_taskmaster_jobs_for_user(
        self,
        page_token: str,
        items_per_page: int,
        # user: str
    ) -> V1JobList:
        """Gets all Taskmaster job objects, a User is allowed to see.

        Args:
            page_token: pageToken supplied by user (from previous result; points to
                next page of results)
            items_per_page: Value submitted by user, limiting number of results.
            # user: User identifier.

        Returns:
            Job list of Taskmaster jobs that user is allowed to see.
        """
        # TODO: Implement this method when auth is implemented in FOCA.
        label_selector = (
            f"{self.constant.label_jobtype_key}"
            "="
            f"{self.constant.label_jobtype_value_taskm}"
        )
        # if user.get_label_selector():
        #     label_selector += f",{user.get_label_selector()}"

        result: V1JobList = self.list_jobs(page_token, label_selector, items_per_page)

        # if user.is_member_in_non_managed_groups():
        #     filtered_job_list = [
        #         job for job in result.items
        #         if user.is_group_manager(
        #             job.metadata.labels.get(self.constant.label_groupname_key)
        #         ) or user.get_username()
        #         == job.metadata.labels.get(self.constant.label_userid_key)
        #     ]
        #     result.items = filtered_job_list

        return result

    def list_single_task_executor_jobs(self, task_id: str) -> V1JobList:
        """List the executor jobs that belong to a single task.

        Args:
            task_id: Task identifier (the taskmaster's name).

        Returns:
            Jobs labelled with the given taskmaster name.
        """
        # Must be a plain string: a stray trailing comma previously turned
        # this selector into a 1-tuple.
        label_selector = f"{self.constant.label_testask_id_key}={task_id}"
        job_list: V1JobList = self.list_jobs(label_selector=label_selector)
        return job_list

    def get_single_task_output_filer_job(self, task_id: str) -> Optional[V1Job]:
        """Get single task output filer job.

        Args:
            task_id: Task identifier; the filer suffix is appended to it.

        Returns:
            The output filer job, or ``None`` if it does not exist.

        Raises:
            client.ApiException: If the API call fails with a status other
                than "not found".
        """
        try:
            job: V1Job = self.batch_api.read_namespaced_job(
                name=task_id + self.constant.job_name_filer_suf,
                namespace=self.namespace,
            )
            return job
        except client.ApiException as e:
            if e.status != NotFound.code:
                logger.error("Exception when reading output filer job: %s", e)
                raise
        return None

    def list_all_taskmaster_jobs(self) -> V1JobList:
        """List all taskmaster jobs in the Kubernetes cluster."""
        label_selector = (
            f"{self.constant.label_jobtype_key}"
            f"={self.constant.label_jobtype_value_taskm}"
        )
        job_list: V1JobList = self.list_jobs(label_selector=label_selector)
        return job_list

    def list_all_task_executor_jobs(self) -> V1JobList:
        """List all executor jobs in the Kubernetes cluster."""
        label_selector = (
            f"{self.constant.label_jobtype_key}"
            f"={self.constant.label_jobtype_value_exec}"
        )
        job_list: V1JobList = self.list_jobs(label_selector=label_selector)
        return job_list

    def list_all_filer_jobs(self) -> V1JobList:
        """List all output filer jobs in the Kubernetes cluster.

        Selects the jobs that do NOT carry the job-type label.
        """
        label_selector = "!" + self.constant.label_jobtype_key
        job_list: V1JobList = self.list_jobs(label_selector=label_selector)
        return job_list

    def list_single_job_pods(self, job: V1Job) -> V1PodList:
        """List pods associated with a single job.

        Args:
            job: Job object to list pods for.

        Returns:
            Pods matching the job's ``match_labels`` selector; an empty list
            when the job has no usable selector.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        selector = job.spec.selector if job.spec else None
        if not (
            selector
            and isinstance(selector, V1LabelSelector)
            and selector.match_labels
        ):
            logger.error("Job spec, selector, or match_labels is None or invalid")
            return V1PodList(items=[])

        label_selector = ",".join(
            f"{k}={v}" for k, v in selector.match_labels.items()
        )
        try:
            namespaced_pods: V1PodList = self.core_api.list_namespaced_pod(
                namespace=self.namespace, label_selector=label_selector
            )
            return namespaced_pods
        except client.ApiException as e:
            logger.error("Exception when listing pods: %s", e)
            raise

    def list_all_job_pods(self) -> V1PodList:
        """List all pods that carry a ``job-name`` label.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        label_selector = "job-name"
        try:
            return self.core_api.list_namespaced_pod(
                namespace=self.namespace, label_selector=label_selector
            )
        except client.ApiException as e:
            logger.error(
                "Couldn't list job of %s namespace. %s", self.namespace, e
            )
            raise

    def read_pod_log(self, pod_name: str) -> Optional[str]:
        """Read logs from a pod.

        Args:
            pod_name: Name of the pod to read logs from.

        Returns:
            The pod log, or ``None`` if the API call failed (best effort).
        """
        try:
            pod_log: str = self.core_api.read_namespaced_pod_log(
                name=pod_name, namespace=self.namespace
            )
            return pod_log
        except client.ApiException as e:
            logger.error("Exception when reading pod log: %s", e)
            return None

    def label_job_as_cancelled(self, task_id: str) -> None:
        """Label a job as cancelled.

        Args:
            task_id: Task identifier.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        try:
            self.batch_api.patch_namespaced_job(
                name=task_id, namespace=self.namespace, body=self._cancel_patch()
            )
        except client.ApiException as e:
            logger.error("Exception when labeling job as cancelled: %s", e)
            raise

    def label_pod_as_cancelled(self, pod_name: str) -> None:
        """Label a pod as cancelled.

        Args:
            pod_name: Pod name.

        Raises:
            client.ApiException: If the Kubernetes API rejects the request.
        """
        try:
            self.core_api.patch_namespaced_pod(
                name=pod_name, namespace=self.namespace, body=self._cancel_patch()
            )
        except client.ApiException as e:
            logger.error("Exception when labeling pod as cancelled: %s", e)
            raise
class KubernetesError(ApiException):
    """Kubernetes error.

    Subclass of the Kubernetes client's ``ApiException`` that adds
    TESK-specific helpers for interpreting the HTTP status of a failed call.
    """

    def is_object_name_duplicated(self) -> bool:
        """Check if object name is duplicated.

        Returns:
            True when the error status equals HTTP 409 (Conflict).
        """
        # NOTE(review): 409 is presumably what the API returns on a name
        # collision when creating an object -- confirm against the callers.
        return self.status == HTTPStatus.CONFLICT