ceph 对象存储 创建api

2021-06-28 10:05

阅读:360

标签:org   miss   admin   exist   ast   query   ror   url   one   

# -*- coding:utf-8 -*- import boto import boto.s3.connection import paramiko class Accountinfo(): """ 用法详见 http://docs.ceph.org.cn/man/8/radosgw-admin/ """ def __init__(self): self.hostname = ‘192.168.44.70‘ self.port = 22 self.username = ‘root‘ self.passwd = ‘123456‘ def new_connect(self): try: paramiko.util.log_to_file(‘paramiko.log‘) ssh = paramiko.SSHClient() ssh.load_system_host_keys() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=‘192.168.44.70‘,port=22,username=‘root‘,password=‘123456‘) return ssh except Exception as e: return ‘error‘ def user_manager(self,username,flag): """ 为s3访问创建radosgw用户 flag: c -> create,d -> delete """ ssh = self.new_connect() if not isinstance(ssh,str): c_command = ‘/usr/bin/radosgw-admin user create --uid="%s" --display-name="%s"‘ % (username, username.title()) d_command = ‘/usr/bin/radosgw-admin user rm --uid="%s"‘ % (username) if flag == ‘c‘: stdin, stdout, stderr = ssh.exec_command(c_command) outstr = stdout.read() errstr = stdout.read() ssh.close() return outstr,errstr elif flag == ‘d‘: stdin, stdout, stderr = ssh.exec_command(d_command) ssh.close() return ‘delete %s success‘% username # return stdout, stderr else: ssh.close() return ‘flag == c or d‘,‘flag error‘ return ‘connect error‘ class CephS3(): # 单例模式 __instance = None def __init__(self): self.access_key = "BKOLF8C5319QK2UIMQ09" self.secret_key = "jBiFwY3LeHh78tM9W6Y8oQUM2VNIbieGVViB3wEB" self.host = ‘192.168.44.70‘ self.port = 7480 self.conn = boto.connect_s3( aws_access_key_id= self.access_key, aws_secret_access_key= self.secret_key, host = self.host, port = self.port, is_secure=False, calling_format = boto.s3.connection.OrdinaryCallingFormat() ) @staticmethod def get_connect(): if CephS3.__instance: return CephS3.__instance else: CephS3.__instance = CephS3().conn return CephS3.__instance def list_all_buckets(self): con = CephS3.get_connect() all_buckets = con.get_all_buckets() for bucket in all_buckets: print("{name}\t{created}".format(name=bucket.name,created=bucket.creation_date)) def create_bucket(self,bucketname): # con = self.connect() con = CephS3.get_connect() all_bucket = con.get_all_buckets() all_bucket_name = [i.name for i in all_bucket] try: if bucketname not in all_bucket_name: bucket = con.create_bucket(bucketname) return ‘ok‘ # print(‘Bucket %s create success‘% bucketname) else: return ‘fail‘ # print(‘Bucket %s already exists!‘ % bucketname) except Exception as e: return str(e) def delete_bucket(self,bucketname): con = CephS3.get_connect() all_bucket = con.get_all_buckets() all_bucket_name = [i.name for i in all_bucket] try: if bucketname in all_bucket_name: bucket = con.delete_bucket(bucketname) return ‘ok‘ else: return ‘fail‘ except Exception as e: print(str(e)) def list_bucket_object(self,bucketname): con = CephS3.get_connect() bucket = con.get_bucket(bucketname) list_buckets = bucket.list() print("%-10s\t%-10s\t%-10s\n"%(‘Name‘,‘Size‘,‘Modified‘)) for key in list_buckets: print("{name:

ceph 对象存储 创建api

标签:org   miss   admin   exist   ast   query   ror   url   one   

原文地址:http://blog.51cto.com/wuyebamboo/2324451


评论


亲,登录后才可以留言!