k8s python api二次封装 例子

2020-12-13 13:41

阅读:542

标签:client   net   ini   yam   nod   self   exce   test   core   

k8s python api二次封装

pip install pprint   kubernetes
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml

class K8sApi(object):
    def __init__(self):
        # self.config = config.kube_config.load_kube_config()
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.configuration = client.Configuration()
        self.configuration.host = "https://192.168.100.111:6443"
        self.configuration.api_key[‘authorization‘] = ‘Bearer token‘
        self.configuration.verify_ssl = False
        self.k8s_apps_v1 = client.AppsV1Api(client.ApiClient(self.configuration))
        self.Api_Instance = client.CoreV1Api(client.ApiClient(self.configuration))
        self.Api_Instance_Extensions = client.ExtensionsV1beta1Api(client.ApiClient(self.configuration))

    ####################################################################################################################

    def list_deployment(self, namespace="default"):
        api_response = self.k8s_apps_v1.list_namespaced_deployment(namespace)
        return api_response

    def read_deployment(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.read_namespaced_deployment(name, namespace)
        return api_response

    def create_deployment(self, file="deploy-nginx.yaml"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.create_namespaced_deployment(
                body=dep, namespace="default")
            return resp

    def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.replace_namespaced_deployment(name, namespace,
                                                                  body=dep)
            return resp

    def delete_deployment(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)
        return api_response

    ####################################################################################################################

    def list_node(self):
        api_response = self.Api_Instance.list_node()
        data = {}
        for i in api_response.items:
            data[i.metadata.name] = {"name": i.metadata.name,
                                     "status": i.status.conditions[-1].type if i.status.conditions[
                                                                                   -1].status == "True" else "NotReady",
                                     "ip": i.status.addresses[0].address,
                                     "kubelet_version": i.status.node_info.kubelet_version,
                                     "os_image": i.status.node_info.os_image,
                                     }
        return data

    def list_pod(self):
        api_response = self.Api_Instance.list_pod_for_all_namespaces()
        data = {}
        for i in api_response.items:
            data[i.metadata.name] = {"ip": i.status.pod_ip, "namespace": i.metadata.namespace}
        return data

    def read_pod(self, name="nginx-pod", namespace="default"):
        api_response = self.Api_Instance.read_namespaced_pod(name, namespace)
        return api_response

    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance.create_namespaced_pod(namespace, body=dep)
            return api_response

    def replace_pod(self, name="nginx-pod", namespace="default"):
        dep = self.read_pod(name)
        api_response = self.Api_Instance.replace_namespaced_pod(name, namespace, body=dep)
        return api_response

    def delete_pod(self, name="nginx-pod", namespace="default"):
        api_response = self.Api_Instance.delete_namespaced_pod(name, namespace)
        return api_response

    ####################################################################################################################

    def list_service(self):
        api_response = self.Api_Instance.list_service_for_all_namespaces()
        return api_response

    def read_service(self, name="", namespace="default"):
        api_response = self.Api_Instance.read_namespaced_service(name, namespace)
        return api_response

    def create_service(self, file="service-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance.create_namespaced_service(namespace, body=dep)
            return api_response

    def replace_service(self, name="hequan", namespace="default"):
        dep = self.read_pod(name)
        api_response = self.Api_Instance.replace_namespaced_service(name, namespace, body=dep)
        return api_response

    def delete_service(self, name="hequan", namespace="default"):
        api_response = self.Api_Instance.delete_namespaced_service(name, namespace)
        return api_response

    ####################################################################################################################

    def list_ingress(self):
        api_response = self.Api_Instance_Extensions.list_ingress_for_all_namespaces()
        return api_response

    def read_ingress(self, name="", namespace="default"):
        api_response = self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)
        return api_response

    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=dep)
            return api_response

    def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance_Extensions.replace_namespaced_ingress(name=name, namespace=namespace,
                                                                                   body=dep)
            return api_response

    def delete_ingress(self, name="hequan", namespace="default"):
        api_response = self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)
        return api_response

    #####################################################################################################################

    def list_stateful(self):
        api_response = self.k8s_apps_v1.list_stateful_set_for_all_namespaces()
        return api_response

    def read_stateful(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)
        return api_response

    def create_stateful(self, file="deploy-nginx.yaml"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.create_namespaced_stateful_set(
                body=dep, namespace="default")
            return resp

    def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace,
                                                                    body=dep)
            return resp

    def delete_stateful(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)
        return api_response

    ####################################################################################################################

if __name__ == ‘__main__‘:
    def test():
        obj = K8sApi()
        ret = obj.list_deployment()
        pprint(ret)

    test()

k8s python api二次封装 例子

标签:client   net   ini   yam   nod   self   exce   test   core   

原文地址:https://blog.51cto.com/hequan/2438485


评论


亲,登录后才可以留言!