Skip to content

ding.utils.k8s_helper

ding.utils.k8s_helper

K8sLauncher

Bases: object

Overview

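Object that manages a K8s cluster. When the configured type is k3s, it creates and deletes the cluster and preloads images through the k3d command-line tool; when the type is local, these operations do nothing.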

Interfaces: __init__, _load, create_cluster, _check_k3d_tools, delete_cluster, preload_images

__init__(config_path)

Overview

Initialize the K8sLauncher object.

Arguments: - config_path (:obj:str): The path of the config file.

create_cluster()

Overview

Create the k8s cluster.

delete_cluster()

Overview

Delete the k8s cluster.

preload_images(images)

Overview

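Preload images into the cluster by running k3d image import. Nothing is done unless the cluster type is k3s and the image list is non-empty.

Arguments: - images (:obj:list): The images to import into the cluster.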

get_operator_server_kwargs(cfg)

Overview

Get kwarg dict from config file

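Arguments: - cfg (:obj:EasyDict): System config; system_addr and api_version are read from it when present, otherwise environment variables are used.

Returns: - result (:obj:dict): A dict containing api_version, namespace, name, host and port.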

exist_operator_server()

Overview

Check if the 'KUBERNETES_SERVER_URL' environment variable exists.

pod_exec_command(kubeconfig, name, namespace, cmd)

Overview

Execute command in pod

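Arguments: - kubeconfig (:obj:str): The path of the kubeconfig file. - name (:obj:str): The name of the pod. - namespace (:obj:str): The namespace of the pod. - cmd (:obj:str): The command to execute in the pod; it is run through /bin/sh -c.

Returns: - code, message (:obj:Tuple[int, str]): The code and message fields parsed from the command's output, or -1 together with an error message when the pod does not exist, is not in the Running phase, or the pod lookup fails.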

Full Source Code

../ding/utils/k8s_helper.py

import os
import sys
import json
from typing import Tuple
from easydict import EasyDict
import yaml
import subprocess
from enum import Enum, unique
from ding.interaction.base import split_http_address
from .default_helper import one_time_warning

DEFAULT_NAMESPACE = 'default'
DEFAULT_POD_NAME = 'dijob-example-coordinator'
DEFAULT_API_VERSION = '/v1alpha1'

DEFAULT_K8S_COLLECTOR_PORT = 22270
DEFAULT_K8S_LEARNER_PORT = 22271
DEFAULT_K8S_AGGREGATOR_SLAVE_PORT = 22272
# NOTE(review): the coordinator and the aggregator master share port 22273 - confirm this is intended.
DEFAULT_K8S_COORDINATOR_PORT = 22273
DEFAULT_K8S_AGGREGATOR_MASTER_PORT = 22273


def get_operator_server_kwargs(cfg: EasyDict) -> dict:
    """
    Overview:
        Get kwarg dict from config file. Values missing from ``cfg`` are taken from the
        ``KUBERNETES_*`` environment variables, and finally from the module defaults.
    Arguments:
        - cfg (:obj:`EasyDict`) System config. ``system_addr`` and ``api_version`` are read when present.
    Returns:
        - result (:obj:`dict`) Containing ``api_version``, ``namespace``, ``name``, ``port``, ``host``.
    Raises:
        - AssertionError: Neither ``cfg.system_addr`` nor ``KUBERNETES_SERVER_URL`` provides a server url.
    """

    namespace = os.environ.get('KUBERNETES_POD_NAMESPACE', DEFAULT_NAMESPACE)
    name = os.environ.get('KUBERNETES_POD_NAME', DEFAULT_POD_NAME)
    url = cfg.get('system_addr', None) or os.environ.get('KUBERNETES_SERVER_URL', None)
    # Raised explicitly (instead of using ``assert``) so the check is not stripped under ``python -O``;
    # the exception type and message are the same as before.
    if not url:
        raise AssertionError('please set environment variable KUBERNETES_SERVER_URL in Kubenetes platform.')
    api_version = cfg.get('api_version', None) \
        or os.environ.get('KUBERNETES_SERVER_API_VERSION', DEFAULT_API_VERSION)
    try:
        # Fast path for a plain ``host:port`` address.
        parts = url.split(":")
        host, port = parts[0], int(parts[1])
    except (IndexError, ValueError):
        # No port part, or a non-numeric one (e.g. a url with a scheme such as ``http://host:port``):
        # delegate the parsing to the http address helper.
        host, port, _, _ = split_http_address(url)

    return {
        'api_version': api_version,
        'namespace': namespace,
        'name': name,
        'host': host,
        'port': port,
    }


def exist_operator_server() -> bool:
    """
    Overview:
        Check if the 'KUBERNETES_SERVER_URL' environment variable exists.
    Returns:
        - exist (:obj:`bool`) ``True`` if the variable is set in the environment, ``False`` otherwise.
    """

    return 'KUBERNETES_SERVER_URL' in os.environ


def pod_exec_command(kubeconfig: str, name: str, namespace: str, cmd: str) -> Tuple[int, str]:
    """
    Overview:
        Execute command in pod
    Arguments:
        - kubeconfig (:obj:`str`) The path of kubeconfig file
        - name (:obj:`str`) The name of pod
        - namespace (:obj:`str`) The namespace of pod
        - cmd (:obj:`str`) The command to execute; it is run through ``/bin/sh -c``
    Returns:
        - code (:obj:`int`) The ``code`` field parsed from the command output, or ``-1`` when the pod \
            lookup fails, the pod does not exist, or the pod is not in the ``Running`` phase.
        - message (:obj:`str`) The ``message`` field parsed from the command output, or an error description.
    """

    try:
        from kubernetes import config
        from kubernetes.client import CoreV1Api
        from kubernetes.client.rest import ApiException
        from kubernetes.stream import stream
    except ModuleNotFoundError:
        one_time_warning("You have not installed kubernetes package! Please try 'pip install DI-engine[k8s]'.")
        # ``sys.exit`` rather than the ``exit`` builtin: the latter is injected by the ``site`` module
        # and is not guaranteed to exist (e.g. ``python -S`` or frozen applications).
        sys.exit(-1)

    config.load_kube_config(config_file=kubeconfig)
    core_v1 = CoreV1Api()
    resp = None
    try:
        resp = core_v1.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        # A 404 leaves ``resp`` as None and is reported below as "does not exist".
        if e.status != 404:
            return -1, "Unknown error: %s" % e
    if not resp:
        return -1, f"Pod {name} does not exist."
    if resp.status.phase != 'Running':
        return -1, f"Pod {name} is not in Running."
    exec_command = ['/bin/sh', '-c', cmd]
    resp = stream(
        core_v1.connect_get_namespaced_pod_exec,
        name,
        namespace,
        command=exec_command,
        stderr=False,
        stdin=False,
        stdout=True,
        tty=False
    )
    # Rewrite the captured stdout into JSON before parsing: single quotes become double quotes,
    # ``None``/``True``/``False`` become ``null``/``1``/``0``, and the quotes around one known regex
    # value are escaped so that they do not terminate the surrounding JSON string.
    # NOTE(review): presumably the command prints a Python-dict style literal - confirm against callers.
    resp = resp.replace("\'", "\"") \
        .replace('None', 'null') \
        .replace(': False', ': 0') \
        .replace(': True', ': 1') \
        .replace('"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$"', '\\"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$\\"')
    resp = json.loads(resp)
    return resp['code'], resp['message']


@unique
class K8sType(Enum):
    """
    Overview:
        The kind of cluster handled by ``K8sLauncher``. The launcher only runs ``k3d`` commands for ``K3s``.
    """
    Local = 1
    K3s = 2


class K8sLauncher(object):
    """
    Overview:
        object to manage the K8s cluster
    Interfaces:
        ``__init__``, ``_load``, ``create_cluster``, ``_check_k3d_tools``, ``delete_cluster``, ``preload_images``
    """

    def __init__(self, config_path: str) -> None:
        """
        Overview:
            Initialize the K8sLauncher object.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        """

        # Defaults; each may be overridden by the config file in ``_load``.
        self.name = None
        self.servers = 1
        self.agents = 0
        self.type = K8sType.Local
        self._images = []

        self._load(config_path)
        self._check_k3d_tools()

    def _load(self, config_path: str) -> None:
        """
        Overview:
            Load the config file. Recognized keys are ``name``, ``servers``, ``agents``, ``type`` \
                (``'k3s'`` or ``'local'``) and ``preload_images``. Missing or falsy values keep the defaults.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        Raises:
            - TypeError: ``servers``/``agents`` is not an int, or ``preload_images`` is not a list.
            - ValueError: ``type`` is neither ``'k3s'`` nor ``'local'``.
        """

        with open(config_path, 'r') as f:
            data = yaml.safe_load(f)
        # ``yaml.safe_load`` returns None for an empty file; treat that as "no overrides".
        if data is None:
            data = {}

        self.name = data.get('name') if data.get('name') else self.name

        servers = data.get('servers')
        if servers:
            # Exact type check on purpose: ``isinstance`` would also accept YAML booleans.
            if type(servers) is not int:
                raise TypeError(f"servers' type is expected int, actual {type(servers)}")
            self.servers = servers

        agents = data.get('agents')
        if agents:
            if type(agents) is not int:
                raise TypeError(f"agents' type is expected int, actual {type(agents)}")
            self.agents = agents

        cluster_type = data.get('type')
        if cluster_type:
            if cluster_type == 'k3s':
                self.type = K8sType.K3s
            elif cluster_type == 'local':
                self.type = K8sType.Local
            else:
                raise ValueError(f"no type found for {cluster_type}")

        images = data.get('preload_images')
        if images:
            if type(images) is not list:
                raise TypeError(f"preload_images' type is expected list, actual {type(images)}")
            self._images = images

    def _check_k3d_tools(self) -> None:
        """
        Overview:
            Check if the k3d tools exist. Only performed for the ``K3s`` cluster type.
        Raises:
            - FileNotFoundError: ``which k3d`` printed nothing, i.e. ``k3d`` was not found.
        """

        if self.type != K8sType.K3s:
            return
        args = ['which', 'k3d']
        proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, _ = proc.communicate()
        if out.decode('utf-8') == '':
            raise FileNotFoundError(
                "No k3d tools found, please install by executing ./ding/scripts/install-k8s-tools.sh"
            )

    def create_cluster(self) -> None:
        """
        Overview:
            Create the k8s cluster. For the ``K3s`` type this runs ``k3d cluster create`` and then \
                preloads the images listed in the config; for other types nothing is created.
        Raises:
            - RuntimeError: ``k3d`` wrote an error to stderr other than "already exists".
        """

        print('Creating k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'create', f'{self.name}', f'--servers={self.servers}', f'--agents={self.agents}']
        proc = subprocess.Popen(args, stderr=subprocess.PIPE)
        _, err = proc.communicate()
        err_str = err.decode('utf-8').strip()
        # Any stderr output is treated as a failure unless it contains 'WARN'.
        if err_str != '' and 'WARN' not in err_str:
            if 'already exists' in err_str:
                print('K8s cluster already exists')
            else:
                raise RuntimeError(f'Failed to create cluster {self.name}: {err_str}')

        # preload images
        self.preload_images(self._images)

    def delete_cluster(self) -> None:
        """
        Overview:
            Delete the k8s cluster. For the ``K3s`` type this runs ``k3d cluster delete``; for other \
                types nothing is deleted.
        Raises:
            - RuntimeError: ``k3d`` wrote an error to stderr that contains neither 'WARN' nor 'NotFound'.
        """

        print('Deleting k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'delete', f'{self.name}']
        proc = subprocess.Popen(args, stderr=subprocess.PIPE)
        _, err = proc.communicate()
        err_str = err.decode('utf-8').strip()
        # Output containing 'NotFound' is ignored along with 'WARN' output.
        if err_str != '' and 'WARN' not in err_str and \
                'NotFound' not in err_str:
            raise RuntimeError(f'Failed to delete cluster {self.name}: {err_str}')

    def preload_images(self, images: list) -> None:
        """
        Overview:
            Preload images into the cluster by running ``k3d image import``. Does nothing unless the \
                cluster type is ``K3s`` and ``images`` is non-empty.
        Arguments:
            - images (:obj:`list`): The images to import.
        Raises:
            - RuntimeError: ``k3d`` wrote an error to stderr that does not contain 'WARN'.
        """

        if self.type != K8sType.K3s or not images:
            return
        args = ['k3d', 'image', 'import', f'--cluster={self.name}']
        args += images

        proc = subprocess.Popen(args, stderr=subprocess.PIPE)
        _, err = proc.communicate()
        err_str = err.decode('utf-8').strip()
        if err_str != '' and 'WARN' not in err_str:
            raise RuntimeError(f'Failed to preload images: {err_str}')