通过 Ansible API 获取主机信息
2021-07-12 00:10
标签:tde playbook err number dir core swa pen search 话不多说,还是直接上代码 # -*- encoding:utf-8 -*- import os # 因为我是在 django 中使用,所以我加上了下面这段代码。自己测试时不用加 import ansibleAPI DEVICES = namedtuple('mount', ['device', 'point', "total_size", "free_size"]) class Resource(object): if __name__ == '__main__': 通过 Ansible API 获取主机信息 标签:tde playbook err number dir core swa pen search 原文地址:http://blog.51cto.com/784687488/2173727
这一篇将使用 Ansible API 获取主机的详细信息,这是 CMDB 系统中最基本的功能。
import os
import sys

import django

# Django bootstrap. The original author notes this is only needed because the
# module is used inside a Django project; it can be left out when testing the
# class on its own.
# Three dirname() calls walk up from this file to the directory that is added
# to the import path.
Search_Path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(Search_Path)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'AutoOPPlatform.settings')
django.setup()

# Project-local Ansible wrapper; imported after the path/Django setup above so
# that it can be resolved from Search_Path.
import ansibleAPI
from collections import namedtuple
from functools import reduce  # builtin on Python 2 only; this works on 2 and 3

# One mounted filesystem, built from an entry of the ``ansible_mounts`` fact.
DEVICES = namedtuple('mount', ['device', 'point', "total_size", "free_size"])


class Resource(object):
    """Host information gathered through the project's ``ansibleAPI`` wrapper.

    Two modes are supported:

    * single host -- ``Resource('10.0.0.1', sshport=22, username=..., password=...)``
    * group       -- ``Resource(group1=dict(hosts=[...], vars={}))``; any keyword
      arguments are passed to ``ansibleAPI.AnsibleAPI`` unchanged and take
      precedence over ``hostname``.

    The ``setup`` module is run once in the constructor and its result is kept
    in ``self.info``.  In single-host mode ``info`` is ``{'success': ...}`` or
    ``{'error': ...}``; in group mode it is the raw ad-hoc result.  All the
    per-fact accessors read ``info['success'][hostname]['ansible_facts']`` and
    are therefore only usable in single-host mode after a successful run
    (they raise ``KeyError`` otherwise).
    """

    def __init__(self, hostname=None, sshport=22, username=None, password=None, **kwargs):
        """Store the connection settings and collect facts immediately.

        Raises:
            ValueError: if neither ``hostname`` nor group keyword arguments
                are supplied (previously this surfaced as an AttributeError).
        """
        # Define every attribute up front so later code never hits a missing one.
        self.hostname = hostname if hostname else None
        self.sshport = sshport
        self.username = username
        self.password = password
        self.group = None
        if kwargs:
            # Group mode wins over single-host mode.
            self.hostname = None
            self.group = kwargs
        if not self.hostname and not self.group:
            raise ValueError("Resource needs a hostname or a group definition")
        self.info = self._get_info()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _single_host(self):
        """Return the one-element host list used for single-host runs."""
        return [dict(hostname=self.hostname, port=self.sshport,
                     username=self.username, password=self.password)]

    @staticmethod
    def _run_adhoc(targets, module, *args):
        """Run one ad-hoc ``module`` against ``targets`` and return its result.

        ``ansibleAPI`` is imported here rather than at module level so the
        class can be imported without the project module being available.
        """
        import ansibleAPI
        execute = ansibleAPI.AnsibleAPI(targets)
        execute.PrivateAdHoc(module, *args)
        return execute.adhoc_result()

    def _facts(self):
        """Return the ``ansible_facts`` dict of the single host."""
        return self.info['success'][self.hostname]['ansible_facts']

    def _get_info(self):
        """Run the ``setup`` module and normalise the result (see class doc)."""
        if self.hostname:
            msg = self._run_adhoc(self._single_host(), 'setup')
            # Fixed priority instead of relying on dict key order.
            if msg.get('success'):
                return dict(success=msg['success'])
            for outcome in ('unreachable', 'failed'):
                if msg.get(outcome):
                    return dict(error=msg[outcome])
        if self.group:
            return self._run_adhoc(self.group, 'setup')
        return None

    # ------------------------------------------------------------------
    # raw results
    # ------------------------------------------------------------------
    def get_info(self):
        """Return the stored result of the ``setup`` run."""
        return self.info

    def get_failed(self):
        """Return ``info['failed']`` (present in the raw group-mode result)."""
        return self.info['failed']

    def get_unreachable(self):
        """Return ``info['unreachable']`` (present in the raw group-mode result)."""
        return self.info['unreachable']

    def get_success(self):
        """Return the keys of ``info['success']`` as a list."""
        return list(self.info['success'])

    # ------------------------------------------------------------------
    # network
    # ------------------------------------------------------------------
    def get_hostname(self):
        """Return the ``ansible_hostname`` fact."""
        return self._facts()['ansible_hostname']

    def ip(self):
        """Return the address of the default IPv4 interface."""
        return self._facts()['ansible_default_ipv4']['address']

    def netmask(self):
        """Return the netmask of the default IPv4 interface."""
        return self._facts()['ansible_default_ipv4']['netmask']

    def gateway(self):
        """Return the gateway of the default IPv4 interface."""
        return self._facts()['ansible_default_ipv4']['gateway']

    def broadcast(self):
        """Return the broadcast address of the default IPv4 interface."""
        return self._facts()['ansible_default_ipv4']['broadcast']

    def mac(self):
        """Return the MAC address of the default IPv4 interface."""
        return self._facts()['ansible_default_ipv4']['macaddress']

    # ------------------------------------------------------------------
    # memory
    # ------------------------------------------------------------------
    def memory_cards(self):
        """Count memory modules by running ``dmidecode`` on the target.

        Single-host mode returns the command's ``stdout`` on success and ``0``
        when the host is unreachable or the command failed.  Group mode
        returns the raw ad-hoc result.
        """
        command = 'dmidecode -t memory|grep Size|grep MB|wc -l'
        if self.hostname:
            msg = self._run_adhoc(self._single_host(), 'shell', command)
            if msg.get('success'):
                return msg['success'][self.hostname]['stdout']
            if msg.get('unreachable') or msg.get('failed'):
                return 0
        if self.group:
            return self._run_adhoc(self.group, 'shell', command)
        return None

    def memory_total(self):
        """Return the ``ansible_memtotal_mb`` fact."""
        return self._facts()['ansible_memtotal_mb']

    def memory_free(self):
        """Return the ``ansible_memfree_mb`` fact."""
        return self._facts()['ansible_memfree_mb']

    def memory_used(self):
        """Return ``ansible_memory_mb['real']['used']``."""
        return self._facts()['ansible_memory_mb']['real']['used']

    def swap_total(self):
        """Return the ``ansible_swaptotal_mb`` fact."""
        return self._facts()['ansible_swaptotal_mb']

    def swap_free(self):
        """Return the ``ansible_swapfree_mb`` fact."""
        return self._facts()['ansible_swapfree_mb']

    def swap_used(self):
        """Return ``ansible_memory_mb['swap']['used']``."""
        return self._facts()['ansible_memory_mb']['swap']['used']

    def memory(self):
        """Return the whole ``ansible_memory_mb`` fact."""
        return self._facts()['ansible_memory_mb']

    # ------------------------------------------------------------------
    # CPU / kernel
    # ------------------------------------------------------------------
    def cpu_brand_model(self):
        """Return the distinct CPU model string(s) from ``ansible_processor``.

        The ``'GenuineIntel'`` vendor marker is dropped.  Returns ``None`` if
        nothing is left, the single remaining entry if there is exactly one,
        otherwise all entries (sorted) each followed by a newline.
        """
        models = set(self._facts()['ansible_processor'])
        # discard(), not remove(): the marker is absent on non-Intel hosts.
        models.discard('GenuineIntel')
        if not models:
            return None
        if len(models) == 1:
            return next(iter(models))
        # Sorted so the output does not depend on set iteration order.
        return "".join(str(model) + "\n" for model in sorted(models, key=str))

    def cpu_number(self):
        """Return the ``ansible_processor_count`` fact."""
        return self._facts()['ansible_processor_count']

    def cpu_core(self):
        """Return the ``ansible_processor_cores`` fact."""
        return self._facts()['ansible_processor_cores']

    def cpu_thread(self):
        """Return the ``ansible_processor_vcpus`` fact."""
        return self._facts()['ansible_processor_vcpus']

    def kernel(self):
        """Return the ``ansible_kernel`` fact."""
        return self._facts()['ansible_kernel']

    # ------------------------------------------------------------------
    # storage
    # ------------------------------------------------------------------
    def device(self):
        """Return ``{device_name: size}`` for every device except ``sr0``."""
        return dict(
            (dev, info['size'])
            for dev, info in self._facts()['ansible_devices'].items()
            if dev != 'sr0'
        )

    def device_number(self):
        """Return how many devices ``device()`` reports."""
        return len(self.device())

    def device_total_size(self):
        """Combine all device sizes with ``+``.

        Raises ``TypeError`` when there are no devices (empty reduce).
        NOTE(review): if the sizes are strings (e.g. '20.00 GB') this
        concatenates them rather than adding -- confirm the fact format.
        """
        return reduce(lambda x, y: x + y, self.device().values())

    def partitions(self):
        """Return ``{device_name: {partition_name: size}}``, skipping ``sr0``."""
        result = dict()
        for dev, info in self._facts()['ansible_devices'].items():
            if dev == 'sr0':
                continue
            result[dev] = dict(
                (name, part['size']) for name, part in info['partitions'].items()
            )
        return result

    def mounts(self):
        """Return a list of ``DEVICES`` tuples, one per ``ansible_mounts`` entry."""
        return [
            DEVICES(device=entry['device'], point=entry['mount'],
                    total_size=entry['size_total'], free_size=entry['size_available'])
            for entry in self._facts()['ansible_mounts']
        ]

    # ------------------------------------------------------------------
    # system / hardware identity
    # ------------------------------------------------------------------
    def system(self):
        """Return the ``ansible_system`` fact."""
        return self._facts()['ansible_system']

    def architecture(self):
        """Return the ``ansible_machine`` fact."""
        return self._facts()['ansible_machine']

    def distribution(self):
        """Return ``"<distribution> <version>"``."""
        facts = self._facts()
        return "%s %s" % (facts['ansible_distribution'],
                          facts['ansible_distribution_version'])

    def uuid(self):
        """Return the ``ansible_product_uuid`` fact."""
        return self._facts()['ansible_product_uuid']

    def vendor(self):
        """Return the ``ansible_system_vendor`` fact."""
        return self._facts()['ansible_system_vendor']

    def product_name(self):
        """Return the ``ansible_product_name`` fact."""
        return self._facts()['ansible_product_name']

    def serial_number(self):
        """Return the ``ansible_product_serial`` fact."""
        return self._facts()['ansible_product_serial']
if __name__ == '__main__':
    # Single-host example: query one machine and print its memory-module count.
    # NOTE(review): sample address and credentials are hard-coded (and the
    # password is an int) -- replace them before running.
    host = Resource('10.10.181.132', sshport=20003, username='root', password=123456)
    print(host.memory_cards())

    # Group example: pass the group definition as a keyword argument.
    # hosts = [{'hostname': '10.0.2.6'}, {'hostname': '10.0.2.7'}, {'hostname': '10.0.2.8'}]
    # group = Resource(group1=dict(hosts=hosts, vars={}))
    # print(group.get_failed())
    # print(group.get_unreachable())
    # print(group.get_success())
文章标题:通过 Ansible API 获取主机信息
文章链接:http://soscw.com/index.php/essay/103921.html