Skip to content

Commit

Permalink
Merge pull request #112 from dt3310321/s3
Browse files Browse the repository at this point in the history
S3
  • Loading branch information
dt3310321 authored Mar 23, 2020
2 parents f36c08b + 61fb5d8 commit ef3d9f9
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 35 deletions.
59 changes: 54 additions & 5 deletions qcloud_cos/cos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import base64
import os
import sys
import time
import copy
import json
import xml.dom.minidom
Expand All @@ -23,15 +24,15 @@
from .cos_exception import CosClientError
from .cos_exception import CosServiceError
from .version import __version__

from .select_event_stream import EventStream
logger = logging.getLogger(__name__)


class CosConfig(object):
"""config类,保存用户相关信息"""
def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token=None, Scheme=None, Timeout=None,
Access_id=None, Access_key=None, Secret_id=None, Secret_key=None, Endpoint=None, IP=None, Port=None,
Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None):
Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None, PoolConnections=10, PoolMaxSize=10):
"""初始化,保存用户的信息
:param Appid(string): 用户APPID.
Expand All @@ -53,6 +54,8 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token
:param Proxies(dict): 使用代理来访问COS
:param Domain(string): 使用自定义的域名来访问COS
:param ServiceDomain(string): 使用自定义的域名来访问cos service
:param PoolConnections(int): 连接池个数
:param PoolMaxSize(int): 连接池中最大连接数
"""
self._appid = to_unicode(Appid)
self._token = to_unicode(Token)
Expand All @@ -66,6 +69,8 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token
self._proxies = Proxies
self._domain = Domain
self._service_domain = ServiceDomain
self._pool_connections = PoolConnections
self._pool_maxsize = PoolMaxSize

if self._domain is None:
self._endpoint = format_endpoint(Endpoint, Region)
Expand Down Expand Up @@ -175,6 +180,8 @@ def __init__(self, conf, retry=1, session=None):
self._retry = retry # 重试的次数,分片上传时可适当增大
if session is None:
self._session = requests.session()
self._session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=self._conf._pool_connections, pool_maxsize=self._conf._pool_maxsize))
self._session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=self._conf._pool_connections, pool_maxsize=self._conf._pool_maxsize))
else:
self._session = session

Expand Down Expand Up @@ -235,6 +242,8 @@ def send_request(self, method, url, bucket, timeout=30, **kwargs):
kwargs['verify'] = False
for j in range(self._retry + 1):
try:
if j != 0:
time.sleep(j)
if method == 'POST':
res = self._session.post(url, timeout=timeout, proxies=self._conf._proxies, **kwargs)
elif method == 'GET':
Expand Down Expand Up @@ -948,7 +957,7 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs):
:param Bucket(string): 存储桶名称.
:param Key(string): COS路径.
:param RestoreRequest: 取回object的属性设置
:param RestoreRequest(dict): 取回object的属性设置
:param kwargs(dict): 设置请求headers.
:return: None.
"""
Expand All @@ -972,6 +981,46 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs):
params=params)
return None

def select_object_content(self, Bucket, Key, Expression, ExpressionType, InputSerialization, OutputSerialization, RequestProgress=None, **kwargs):
"""从指定文对象中检索内容
:param Bucket(string): 存储桶名称.
:param Key(string): 检索的路径.
:param Expression(string): 查询语句
:param ExpressionType(string): 查询语句的类型
:param RequestProgress(dict): 查询进度设置
:param InputSerialization(dict): 输入格式设置
:param OutputSerialization(dict): 输出格式设置
:param kwargs(dict): 设置请求headers.
:return(dict): 检索内容.
"""
params = {'select': '', 'select-type': 2}
headers = mapped(kwargs)
url = self._conf.uri(bucket=Bucket, path=Key)
logger.info("select object content, url=:{url} ,headers=:{headers}".format(
url=url,
headers=headers))
SelectRequest = {
'Expression': Expression,
'ExpressionType': ExpressionType,
'InputSerialization': InputSerialization,
'OutputSerialization': OutputSerialization
}
if RequestProgress is not None:
SelectRequest['RequestProgress'] = RequestProgress
xml_config = format_xml(data=SelectRequest, root='SelectRequest')
rt = self.send_request(
method='POST',
url=url,
stream=True,
bucket=Bucket,
data=xml_config,
auth=CosS3Auth(self._conf, Key, params=params),
headers=headers,
params=params)
data = {'Payload': EventStream(rt)}
return data

# s3 bucket interface begin
def create_bucket(self, Bucket, **kwargs):
"""创建一个bucket
Expand Down Expand Up @@ -2687,9 +2736,9 @@ def list_buckets(self, **kwargs):
)
"""
headers = mapped(kwargs)
url = 'https://service.cos.myqcloud.com/'
url = '{scheme}://service.cos.myqcloud.com/'.format(scheme=self._conf._scheme)
if self._conf._service_domain is not None:
url = 'https://{domain}/'.format(domain=self._conf._service_domain)
url = '{scheme}://{domain}/'.format(scheme=self._conf._scheme, domain=self._conf._service_domain)
rt = self.send_request(
method='GET',
url=url,
Expand Down
3 changes: 2 additions & 1 deletion qcloud_cos/cos_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
'SSECustomerKeyMD5': 'x-cos-server-side-encryption-customer-key-MD5',
'SSEKMSKeyId': 'x-cos-server-side-encryption-cos-kms-key-id',
'Referer': 'Referer',
'PicOperations': 'Pic-Operations'
'PicOperations': 'Pic-Operations',
'TrafficLimit': 'x-cos-traffic-limit',
}


Expand Down
10 changes: 8 additions & 2 deletions qcloud_cos/cos_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

class CosException(Exception):
def __init__(self, message):
Exception.__init__(self, message)
self._message = message

def __str__(self):
return str(self._message)


def digest_xml(data):
Expand Down Expand Up @@ -46,14 +49,17 @@ class CosServiceError(CosException):
"""COS Server端错误,可以获取特定的错误信息"""
def __init__(self, method, message, status_code):
CosException.__init__(self, message)
if method == 'HEAD': # 对HEAD进行特殊处理
if isinstance(message, dict):
self._origin_msg = ''
self._digest_msg = message
else:
self._origin_msg = message
self._digest_msg = digest_xml(message)
self._status_code = status_code

def __str__(self):
return str(self._digest_msg)

def get_origin_msg(self):
"""获取原始的XML格式错误信息"""
return self._origin_msg
Expand Down
97 changes: 97 additions & 0 deletions qcloud_cos/select_event_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- coding=utf-8
import os
import uuid
import struct
import logging
from .cos_comm import xml_to_dict
from .cos_comm import to_unicode
from .cos_exception import CosServiceError

logger = logging.getLogger(__name__)


class EventStream():
def __init__(self, rt):
self._rt = rt
self._raw = self._rt.raw
self._finish = False

def __iter__(self):
return self

def __next__(self):
return self.next_event()

next = __next__

def next_event(self):
"""获取下一个事件"""
if self._finish:
"""要把剩下的内容读完丢弃或者自己关连接,否则不会自动关连接"""
self._raw.read()
raise StopIteration
total_byte_length = struct.unpack('>I', bytes(self._raw.read(4)))[0] # message总长度
header_byte_length = struct.unpack('>I', bytes(self._raw.read(4)))[0] # header总长度
prelude_crc = struct.unpack('>I', bytes(self._raw.read(4)))[0]
# 处理headers
offset = 0
msg_headers = {}
while offset < header_byte_length:
header_name_length = struct.unpack('>B', bytes(self._raw.read(1)))[0]
header_name = to_unicode(self._raw.read(header_name_length))
header_value_type = struct.unpack('>B', bytes(self._raw.read(1)))[0]
header_value_length = struct.unpack('>H', bytes(self._raw.read(2)))[0]
header_value = to_unicode(self._raw.read(header_value_length))
msg_headers[header_name] = header_value
offset += 4 + header_name_length + header_value_length
# 处理payload(输出给用户的dict中也为bytes)
payload_byte_length = total_byte_length - header_byte_length - 16 # payload总长度
payload = self._raw.read(payload_byte_length)
message_crc = struct.unpack('>I', bytes(self._raw.read(4)))[0]
if ':message-type' in msg_headers and msg_headers[':message-type'] == 'event':
if ':event-type' in msg_headers and msg_headers[':event-type'] == "Records":
return {'Records': {'Payload': payload}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Stats":
return {'Stats': {'Details': xml_to_dict(payload)}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Progress":
return {'Progress': {'Details': xml_to_dict(payload)}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Cont":
return {'Cont': {}}
elif ':event-type' in msg_headers and msg_headers[':event-type'] == "End":
self._finish = True
return {'End': {}}
# 处理Error Message(抛出异常)
if ':message-type' in msg_headers and msg_headers[':message-type'] == 'error':
error_info = dict()
error_info['code'] = msg_headers[':error-code']
error_info['message'] = msg_headers[':error-message']
error_info['resource'] = self._rt.request.url
error_info['requestid'] = ''
error_info['traceid'] = ''
if 'x-cos-request-id' in self._rt.headers:
error_info['requestid'] = self._rt.headers['x-cos-request-id']
if 'x-cos-trace-id' in self._rt.headers:
error_info['traceid'] = self._rt.headers['x-cos-trace-id']
logger.error(error_info)
e = CosServiceError('POST', error_info, self._rt.status_code)
raise e

def get_select_result(self):
"""获取查询结果"""
data = b""
for event in self:
if 'Records' in event:
data += event['Records']['Payload']
return data

def get_select_result_to_file(self, file_name):
"""保存查询结果到文件"""
tmp_file_name = "{file_name}_{uuid}".format(file_name=file_name, uuid=uuid.uuid4().hex)
with open(tmp_file_name, 'wb') as fp:
for event in self:
if 'Records' in event:
data = event['Records']['Payload']
fp.write(data)
if os.path.exists(file_name):
os.remove(file_name)
os.rename(tmp_file_name, file_name)
7 changes: 7 additions & 0 deletions qcloud_cos/streambody.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ class StreamBody():
def __init__(self, rt):
self._rt = rt

def __iter__(self):
"""提供一个默认的迭代器"""
return self._rt.iter_content(1024)

def get_raw_stream(self):
"""提供原始流"""
return self._rt.raw

def get_stream(self, chunk_size=1024):
"""提供一个chunk可变的迭代器"""
return self._rt.iter_content(chunk_size=chunk_size)

def get_stream_to_file(self, file_name, auto_decompress=False):
"""保存流到本地文件"""
use_chunked = False
if 'Content-Length' in self._rt.headers:
content_len = int(self._rt.headers['Content-Length'])
Expand Down
2 changes: 1 addition & 1 deletion qcloud_cos/version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

__version__ = '5.1.7.6'
__version__ = '5.1.7.8'
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def long_description():

setup(
name='cos-python-sdk-v5',
version='1.7.7',
version='1.7.8',
url='https://www.qcloud.com/',
license='MIT',
author='tiedu, lewzylu, channingliu',
Expand Down
Loading

0 comments on commit ef3d9f9

Please sign in to comment.