Skip to content

ding.utils.orchestrator_launcher

ding.utils.orchestrator_launcher

OrchestratorLauncher

Bases: object

Overview

Object to manage di-orchestrator in existing k8s cluster

Interfaces: __init__, create_orchestrator, delete_orchestrator

__init__(version, name='di-orchestrator', cluster=None, registry='diorchestrator', cert_manager_version='v1.3.1', cert_manager_registry='quay.io/jetstack')

Overview

Initialize the OrchestratorLauncher object.

Arguments: - version (:obj:str): The version of di-orchestrator. - name (:obj:str): The name of di-orchestrator. - cluster (:obj:K8sLauncher): The k8s cluster to deploy di-orchestrator. - registry (:obj:str): The docker registry to pull images. - cert_manager_version (:obj:str): The version of cert-manager. - cert_manager_registry (:obj:str): The docker registry to pull cert-manager images.

create_orchestrator()

Overview

Create di-orchestrator in k8s cluster.

delete_orchestrator()

Overview

Delete di-orchestrator in k8s cluster.

create_components_from_config(config)

Overview

Create components from config file.

Arguments: - config (:obj:str): The config file.

wait_to_be_ready(namespace, component, timeout=120)

Overview

Wait for the component to be ready.

Arguments: - namespace (:obj:str): The namespace of the component. - component (:obj:str): The name of the component. - timeout (:obj:int): The timeout of waiting.

Full Source Code

../ding/utils/orchestrator_launcher.py

1import subprocess 2import time 3from ding.utils import K8sLauncher 4from .default_helper import one_time_warning 5 6 7class OrchestratorLauncher(object): 8 """ 9 Overview: 10 Object to manage di-orchestrator in existing k8s cluster 11 Interfaces: 12 ``__init__``, ``create_orchestrator``, ``delete_orchestrator`` 13 """ 14 15 def __init__( 16 self, 17 version: str, 18 name: str = 'di-orchestrator', 19 cluster: K8sLauncher = None, 20 registry: str = 'diorchestrator', 21 cert_manager_version: str = 'v1.3.1', 22 cert_manager_registry: str = 'quay.io/jetstack' 23 ) -> None: 24 """ 25 Overview: 26 Initialize the OrchestratorLauncher object. 27 Arguments: 28 - version (:obj:`str`): The version of di-orchestrator. 29 - name (:obj:`str`): The name of di-orchestrator. 30 - cluster (:obj:`K8sLauncher`): The k8s cluster to deploy di-orchestrator. 31 - registry (:obj:`str`): The docker registry to pull images. 32 - cert_manager_version (:obj:`str`): The version of cert-manager. 33 - cert_manager_registry (:obj:`str`): The docker registry to pull cert-manager images. 34 """ 35 36 self.name = name 37 self.version = version 38 self.cluster = cluster 39 self.registry = registry 40 self.cert_manager_version = cert_manager_version 41 self.cert_manager_registry = cert_manager_registry 42 43 self._namespace = 'di-system' 44 self._webhook = 'di-webhook' 45 self._cert_manager_namespace = 'cert-manager' 46 self._cert_manager_webhook = 'cert-manager-webhook' 47 48 self.installer = 'https://raw.githubusercontent.com/opendilab/' + \ 49 f'DI-orchestrator/{self.version}/config/di-manager.yaml' 50 self.cert_manager = 'https://github.com/jetstack/' + \ 51 f'cert-manager/releases/download/{self.cert_manager_version}/cert-manager.yaml' 52 53 self._images = [ 54 f'{self.registry}/di-operator:{self.version}', 55 f'{self.registry}/di-webhook:{self.version}', 56 f'{self.registry}/di-server:{self.version}', 57 f'{self.cert_manager_registry}/cert-manager-cainjector:{self.cert_manager_version}', 58 f'{self.cert_manager_registry}/cert-manager-controller:{self.cert_manager_version}', 59 f'{self.cert_manager_registry}/cert-manager-webhook:{self.cert_manager_version}', 60 ] 61 62 self._check_kubectl_tools() 63 64 def _check_kubectl_tools(self) -> None: 65 """ 66 Overview: 67 Check if kubectl tools is installed. 68 """ 69 70 args = ['which', 'kubectl'] 71 proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 72 out, _ = proc.communicate() 73 if out.decode('utf-8') == '': 74 raise FileNotFoundError( 75 "No kubectl tools found, please install by executing ./ding/scripts/install-k8s-tools.sh" 76 ) 77 78 def create_orchestrator(self) -> None: 79 """ 80 Overview: 81 Create di-orchestrator in k8s cluster. 82 """ 83 84 print('Creating orchestrator...') 85 if self.cluster is not None: 86 self.cluster.preload_images(self._images) 87 88 # create and wait for cert-manager to be available 89 create_components_from_config(self.cert_manager) 90 wait_to_be_ready(self._cert_manager_namespace, self._cert_manager_webhook) 91 92 # create and wait for di-orchestrator to be available 93 create_components_from_config(self.installer) 94 wait_to_be_ready(self._namespace, self._webhook) 95 96 def delete_orchestrator(self) -> None: 97 """ 98 Overview: 99 Delete di-orchestrator in k8s cluster. 100 """ 101 102 print('Deleting orchestrator...') 103 for item in [self.cert_manager, self.installer]: 104 args = ['kubectl', 'delete', '-f', f'{item}'] 105 proc = subprocess.Popen(args, stderr=subprocess.PIPE) 106 _, err = proc.communicate() 107 err_str = err.decode('utf-8').strip() 108 if err_str != '' and 'WARN' not in err_str and \ 109 'NotFound' not in err_str: 110 raise RuntimeError(f'Failed to delete di-orchestrator: {err_str}') 111 112 113def create_components_from_config(config: str) -> None: 114 """ 115 Overview: 116 Create components from config file. 117 Arguments: 118 - config (:obj:`str`): The config file. 119 """ 120 121 args = ['kubectl', 'create', '-f', f'{config}'] 122 proc = subprocess.Popen(args, stderr=subprocess.PIPE) 123 _, err = proc.communicate() 124 err_str = err.decode('utf-8').strip() 125 if err_str != '' and 'WARN' not in err_str: 126 if 'already exists' in err_str: 127 print(f'Components already exists: {config}') 128 else: 129 raise RuntimeError(f'Failed to launch components: {err_str}') 130 131 132def wait_to_be_ready(namespace: str, component: str, timeout: int = 120) -> None: 133 """ 134 Overview: 135 Wait for the component to be ready. 136 Arguments: 137 - namespace (:obj:`str`): The namespace of the component. 138 - component (:obj:`str`): The name of the component. 139 - timeout (:obj:`int`): The timeout of waiting. 140 """ 141 142 try: 143 from kubernetes import config, client, watch 144 except ModuleNotFoundError: 145 one_time_warning("You have not installed kubernetes package! Please try 'pip install DI-engine[k8s]'.") 146 exit(-1) 147 148 config.load_kube_config() 149 appv1 = client.AppsV1Api() 150 w = watch.Watch() 151 for event in w.stream(appv1.list_namespaced_deployment, namespace, timeout_seconds=timeout): 152 # print("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name)) 153 if event['object'].metadata.name.startswith(component) and \ 154 event['object'].status.ready_replicas is not None and \ 155 event['object'].status.ready_replicas >= 1: 156 print(f'component {component} is ready for serving') 157 w.stop()