Python - 腾讯云云存储cos简单使用
2021-02-10 07:20
标签:云存储 ref 文件上传 while ogg rom prefix wan ppi 创建桶 上传文件 python上传实例代码 python创建桶的实例代码 python实例删除桶 获取cos上传临时凭证及删除 Python - 腾讯云云存储cos简单使用 标签:云存储 ref 文件上传 while ogg rom prefix wan ppi 原文地址:https://www.cnblogs.com/zhaoganggang/p/12743690.html获取cos上传临时凭证pip install -U cos-python-sdk-v5
# -*- coding=utf-8
# appid 已在配置中移除,请在参数 Bucket 中带上 appid。Bucket 由 BucketName-APPID 组成
# 1. 设置用户配置, 包括 secretId,secretKey 以及 Region
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import sys
import logging
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
secret_id = ‘COS_SECRETID‘ # 替换为用户的 secretId
secret_key = ‘COS_SECRETKEY‘ # 替换为用户的 secretKey
region = ‘ap-chengdu‘ # 替换为用户的 Region
token = None # 使用临时密钥需要传入 Token,默认为空,可不填
scheme = ‘https‘ # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
# 2. 获取客户端对象
client = CosS3Client(config)
response = client.create_bucket(
Bucket=‘examplebucket-1250000000‘
)
#### 高级上传接口(推荐)
response = client.upload_file(
Bucket=‘zhoagang-1301531359‘,
LocalFilePath=‘local.txt‘, #//代指本地文件路径
Key=‘picture.jpg‘, #//上传到桶之后的文件名
PartSize=1,
MAXThread=10,
EnableMD5=False
)
print(response[‘ETag‘])
# -*- coding=utf-8
# appid 已在配置中移除,请在参数 Bucket 中带上 appid。Bucket 由 BucketName-APPID 组成
# 1. 设置用户配置, 包括 secretId,secretKey 以及 Region
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import sys
secret_id = ‘自己的id‘ # 替换为用户的 secretId
secret_key = ‘自己的key‘ # 替换为用户的 secretKey
region = ‘ap-chengdu‘ # 替换为用户的 Region
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
# 2. 获取客户端对象
client = CosS3Client(config)
#文件上传
response = client.upload_file(
Bucket=‘zhoagang-1301531359‘,
LocalFilePath=‘code.png‘, #//代指本地文件路径
Key=‘p1.png‘, #//上传到桶之后的文件名
)
print(response[‘ETag‘])
# 1. 设置用户配置, 包括 secretId,secretKey 以及 Region
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import sys
secret_id = ‘自己的id‘ # 替换为用户的 secretId
secret_key = ‘自己的key‘ # 替换为用户的 secretKey
region = ‘ap-chengdu‘ # 替换为用户的 Region
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
client = CosS3Client(config)
response = client.create_bucket(
Bucket=‘examplebucket-1250000000‘,
ACL="public-read",# private 私有读私有写 /public-read 公共读 私有写 / public-read-write /公共读/公共写
)
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
secret_id = ‘555555‘ # 替换为用户的 secretId
secret_key = ‘6666666‘ # 替换为用户的 secretKey
region = ‘ap-chengdu‘ # 替换为用户的 Region
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
client = CosS3Client(config)
# client.delete_object(
# Bucket=‘wangyang-1251317460‘,
# Key=‘p1.png‘
# )
objects = {
"Quiet": "true",
"Object": [
{
"Key": "day2牛存果.py"
},
{
"Key": "小米CC9e.jpg"
}
]
}
client.delete_objects(
Bucket=‘wangyang-1251317460‘,
Delete=objects
)
def credential(bucket, region):
""" 获取cos上传临时凭证 """
from sts.sts import Sts
config = {
# 临时密钥有效时长,单位是秒(30分钟=1800秒)
‘duration_seconds‘: 5,
# 固定密钥 id
‘secret_id‘: settings.TENCENT_COS_ID,
# 固定密钥 key
‘secret_key‘: settings.TENCENT_COS_KEY,
# 换成你的 bucket
‘bucket‘: bucket,
# 换成 bucket 所在地区
‘region‘: region,
# 这里改成允许的路径前缀,可以根据自己网站的用户登录态判断允许上传的具体路径
# 例子: a.jpg 或者 a/* 或者 * (使用通配符*存在重大安全风险, 请谨慎评估使用)
‘allow_prefix‘: ‘*‘,
# 密钥的权限列表。简单上传和分片需要以下的权限,其他权限列表请看 https://cloud.tencent.com/document/product/436/31923
‘allow_actions‘: [
# "name/cos:PutObject",
# ‘name/cos:PostObject‘,
# ‘name/cos:DeleteObject‘,
# "name/cos:UploadPart",
# "name/cos:UploadPartCopy",
# "name/cos:CompleteMultipartUpload",
# "name/cos:AbortMultipartUpload",
"*",
],
}
sts = Sts(config)
result_dict = sts.get_credential()
return result_dict
def delete_bucket(bucket, region):
""" 删除桶 """
# 删除桶中所有文件
# 删除桶中所有碎片
# 删除桶
config = CosConfig(Region=region, SecretId=settings.TENCENT_COS_ID, SecretKey=settings.TENCENT_COS_KEY)
client = CosS3Client(config)
try:
# 找到文件 & 删除
while True:
part_objects = client.list_objects(bucket)
# 已经删除完毕,获取不到值
contents = part_objects.get(‘Contents‘)
if not contents:
break
# 批量删除
objects = {
"Quiet": "true",
"Object": [{‘Key‘: item["Key"]} for item in contents]
}
client.delete_objects(bucket, objects)
if part_objects[‘IsTruncated‘] == "false":
break
# 找到碎片 & 删除
while True:
part_uploads = client.list_multipart_uploads(bucket)
uploads = part_uploads.get(‘Upload‘)
if not uploads:
break
for item in uploads:
client.abort_multipart_upload(bucket, item[‘Key‘], item[‘UploadId‘])
if part_uploads[‘IsTruncated‘] == "false":
break
client.delete_bucket(bucket)
except CosServiceError as e:
pass