diff --git a/qcloud_cos/cos_client.py b/qcloud_cos/cos_client.py index a866f85d..ab09452d 100644 --- a/qcloud_cos/cos_client.py +++ b/qcloud_cos/cos_client.py @@ -6,6 +6,7 @@ import base64 import os import sys +import time import copy import json import xml.dom.minidom @@ -23,7 +24,7 @@ from .cos_exception import CosClientError from .cos_exception import CosServiceError from .version import __version__ - +from .select_event_stream import EventStream logger = logging.getLogger(__name__) @@ -31,7 +32,7 @@ class CosConfig(object): """config类,保存用户相关信息""" def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token=None, Scheme=None, Timeout=None, Access_id=None, Access_key=None, Secret_id=None, Secret_key=None, Endpoint=None, IP=None, Port=None, - Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None): + Anonymous=None, UA=None, Proxies=None, Domain=None, ServiceDomain=None, PoolConnections=10, PoolMaxSize=10): """初始化,保存用户的信息 :param Appid(string): 用户APPID. @@ -53,6 +54,8 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token :param Proxies(dict): 使用代理来访问COS :param Domain(string): 使用自定义的域名来访问COS :param ServiceDomain(string): 使用自定义的域名来访问cos service + :param PoolConnections(int): 连接池个数 + :param PoolMaxSize(int): 连接池中最大连接数 """ self._appid = to_unicode(Appid) self._token = to_unicode(Token) @@ -66,6 +69,8 @@ def __init__(self, Appid=None, Region=None, SecretId=None, SecretKey=None, Token self._proxies = Proxies self._domain = Domain self._service_domain = ServiceDomain + self._pool_connections = PoolConnections + self._pool_maxsize = PoolMaxSize if self._domain is None: self._endpoint = format_endpoint(Endpoint, Region) @@ -175,6 +180,8 @@ def __init__(self, conf, retry=1, session=None): self._retry = retry # 重试的次数,分片上传时可适当增大 if session is None: self._session = requests.session() + self._session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=self._conf._pool_connections, pool_maxsize=self._conf._pool_maxsize)) + self._session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=self._conf._pool_connections, pool_maxsize=self._conf._pool_maxsize)) else: self._session = session @@ -235,6 +242,8 @@ def send_request(self, method, url, bucket, timeout=30, **kwargs): kwargs['verify'] = False for j in range(self._retry + 1): try: + if j != 0: + time.sleep(j) if method == 'POST': res = self._session.post(url, timeout=timeout, proxies=self._conf._proxies, **kwargs) elif method == 'GET': @@ -948,7 +957,7 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs): :param Bucket(string): 存储桶名称. :param Key(string): COS路径. - :param RestoreRequest: 取回object的属性设置 + :param RestoreRequest(dict): 取回object的属性设置 :param kwargs(dict): 设置请求headers. :return: None. """ @@ -972,6 +981,46 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs): params=params) return None + def select_object_content(self, Bucket, Key, Expression, ExpressionType, InputSerialization, OutputSerialization, RequestProgress=None, **kwargs): + """从指定文对象中检索内容 + + :param Bucket(string): 存储桶名称. + :param Key(string): 检索的路径. + :param Expression(string): 查询语句 + :param ExpressionType(string): 查询语句的类型 + :param RequestProgress(dict): 查询进度设置 + :param InputSerialization(dict): 输入格式设置 + :param OutputSerialization(dict): 输出格式设置 + :param kwargs(dict): 设置请求headers. + :return(dict): 检索内容. + """ + params = {'select': '', 'select-type': 2} + headers = mapped(kwargs) + url = self._conf.uri(bucket=Bucket, path=Key) + logger.info("select object content, url=:{url} ,headers=:{headers}".format( + url=url, + headers=headers)) + SelectRequest = { + 'Expression': Expression, + 'ExpressionType': ExpressionType, + 'InputSerialization': InputSerialization, + 'OutputSerialization': OutputSerialization + } + if RequestProgress is not None: + SelectRequest['RequestProgress'] = RequestProgress + xml_config = format_xml(data=SelectRequest, root='SelectRequest') + rt = self.send_request( + method='POST', + url=url, + stream=True, + bucket=Bucket, + data=xml_config, + auth=CosS3Auth(self._conf, Key, params=params), + headers=headers, + params=params) + data = {'Payload': EventStream(rt)} + return data + # s3 bucket interface begin def create_bucket(self, Bucket, **kwargs): """创建一个bucket @@ -2687,9 +2736,9 @@ def list_buckets(self, **kwargs): ) """ headers = mapped(kwargs) - url = 'https://service.cos.myqcloud.com/' + url = '{scheme}://service.cos.myqcloud.com/'.format(scheme=self._conf._scheme) if self._conf._service_domain is not None: - url = 'https://{domain}/'.format(domain=self._conf._service_domain) + url = '{scheme}://{domain}/'.format(scheme=self._conf._scheme, domain=self._conf._service_domain) rt = self.send_request( method='GET', url=url, diff --git a/qcloud_cos/cos_comm.py b/qcloud_cos/cos_comm.py index 446eeed0..00403708 100644 --- a/qcloud_cos/cos_comm.py +++ b/qcloud_cos/cos_comm.py @@ -56,7 +56,8 @@ 'SSECustomerKeyMD5': 'x-cos-server-side-encryption-customer-key-MD5', 'SSEKMSKeyId': 'x-cos-server-side-encryption-cos-kms-key-id', 'Referer': 'Referer', - 'PicOperations': 'Pic-Operations' + 'PicOperations': 'Pic-Operations', + 'TrafficLimit': 'x-cos-traffic-limit', } diff --git a/qcloud_cos/cos_exception.py b/qcloud_cos/cos_exception.py index c6f08411..bba32baa 100644 --- a/qcloud_cos/cos_exception.py +++ b/qcloud_cos/cos_exception.py @@ -5,7 +5,10 @@ class CosException(Exception): def __init__(self, message): - Exception.__init__(self, message) + self._message = message + + def __str__(self): + return str(self._message) def digest_xml(data): @@ -46,7 +49,7 @@ class CosServiceError(CosException): """COS Server端错误,可以获取特定的错误信息""" def __init__(self, method, message, status_code): CosException.__init__(self, message) - if method == 'HEAD': # 对HEAD进行特殊处理 + if isinstance(message, dict): self._origin_msg = '' self._digest_msg = message else: @@ -54,6 +57,9 @@ def __init__(self, method, message, status_code): self._digest_msg = digest_xml(message) self._status_code = status_code + def __str__(self): + return str(self._digest_msg) + def get_origin_msg(self): """获取原始的XML格式错误信息""" return self._origin_msg diff --git a/qcloud_cos/select_event_stream.py b/qcloud_cos/select_event_stream.py new file mode 100644 index 00000000..ae76a36b --- /dev/null +++ b/qcloud_cos/select_event_stream.py @@ -0,0 +1,97 @@ +# -*- coding=utf-8 +import os +import uuid +import struct +import logging +from .cos_comm import xml_to_dict +from .cos_comm import to_unicode +from .cos_exception import CosServiceError + +logger = logging.getLogger(__name__) + + +class EventStream(): + def __init__(self, rt): + self._rt = rt + self._raw = self._rt.raw + self._finish = False + + def __iter__(self): + return self + + def __next__(self): + return self.next_event() + + next = __next__ + + def next_event(self): + """获取下一个事件""" + if self._finish: + """要把剩下的内容读完丢弃或者自己关连接,否则不会自动关连接""" + self._raw.read() + raise StopIteration + total_byte_length = struct.unpack('>I', bytes(self._raw.read(4)))[0] # message总长度 + header_byte_length = struct.unpack('>I', bytes(self._raw.read(4)))[0] # header总长度 + prelude_crc = struct.unpack('>I', bytes(self._raw.read(4)))[0] + # 处理headers + offset = 0 + msg_headers = {} + while offset < header_byte_length: + header_name_length = struct.unpack('>B', bytes(self._raw.read(1)))[0] + header_name = to_unicode(self._raw.read(header_name_length)) + header_value_type = struct.unpack('>B', bytes(self._raw.read(1)))[0] + header_value_length = struct.unpack('>H', bytes(self._raw.read(2)))[0] + header_value = to_unicode(self._raw.read(header_value_length)) + msg_headers[header_name] = header_value + offset += 4 + header_name_length + header_value_length + # 处理payload(输出给用户的dict中也为bytes) + payload_byte_length = total_byte_length - header_byte_length - 16 # payload总长度 + payload = self._raw.read(payload_byte_length) + message_crc = struct.unpack('>I', bytes(self._raw.read(4)))[0] + if ':message-type' in msg_headers and msg_headers[':message-type'] == 'event': + if ':event-type' in msg_headers and msg_headers[':event-type'] == "Records": + return {'Records': {'Payload': payload}} + elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Stats": + return {'Stats': {'Details': xml_to_dict(payload)}} + elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Progress": + return {'Progress': {'Details': xml_to_dict(payload)}} + elif ':event-type' in msg_headers and msg_headers[':event-type'] == "Cont": + return {'Cont': {}} + elif ':event-type' in msg_headers and msg_headers[':event-type'] == "End": + self._finish = True + return {'End': {}} + # 处理Error Message(抛出异常) + if ':message-type' in msg_headers and msg_headers[':message-type'] == 'error': + error_info = dict() + error_info['code'] = msg_headers[':error-code'] + error_info['message'] = msg_headers[':error-message'] + error_info['resource'] = self._rt.request.url + error_info['requestid'] = '' + error_info['traceid'] = '' + if 'x-cos-request-id' in self._rt.headers: + error_info['requestid'] = self._rt.headers['x-cos-request-id'] + if 'x-cos-trace-id' in self._rt.headers: + error_info['traceid'] = self._rt.headers['x-cos-trace-id'] + logger.error(error_info) + e = CosServiceError('POST', error_info, self._rt.status_code) + raise e + + def get_select_result(self): + """获取查询结果""" + data = b"" + for event in self: + if 'Records' in event: + data += event['Records']['Payload'] + return data + + def get_select_result_to_file(self, file_name): + """保存查询结果到文件""" + tmp_file_name = "{file_name}_{uuid}".format(file_name=file_name, uuid=uuid.uuid4().hex) + with open(tmp_file_name, 'wb') as fp: + for event in self: + if 'Records' in event: + data = event['Records']['Payload'] + fp.write(data) + if os.path.exists(file_name): + os.remove(file_name) + os.rename(tmp_file_name, file_name) diff --git a/qcloud_cos/streambody.py b/qcloud_cos/streambody.py index 272eb2da..e373e807 100644 --- a/qcloud_cos/streambody.py +++ b/qcloud_cos/streambody.py @@ -7,13 +7,20 @@ class StreamBody(): def __init__(self, rt): self._rt = rt + def __iter__(self): + """提供一个默认的迭代器""" + return self._rt.iter_content(1024) + def get_raw_stream(self): + """提供原始流""" return self._rt.raw def get_stream(self, chunk_size=1024): + """提供一个chunk可变的迭代器""" return self._rt.iter_content(chunk_size=chunk_size) def get_stream_to_file(self, file_name, auto_decompress=False): + """保存流到本地文件""" use_chunked = False if 'Content-Length' in self._rt.headers: content_len = int(self._rt.headers['Content-Length']) diff --git a/qcloud_cos/version.py b/qcloud_cos/version.py index 9ec878f7..bfa56600 100644 --- a/qcloud_cos/version.py +++ b/qcloud_cos/version.py @@ -1,2 +1,2 @@ -__version__ = '5.1.7.6' +__version__ = '5.1.7.8' diff --git a/setup.py b/setup.py index 57f44f48..fb32a247 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ def long_description(): setup( name='cos-python-sdk-v5', - version='1.7.7', + version='1.7.8', url='https://www.qcloud.com/', license='MIT', author='tiedu, lewzylu, channingliu', diff --git a/ut/test.py b/ut/test.py index 8b46ac28..43f7c275 100644 --- a/ut/test.py +++ b/ut/test.py @@ -17,6 +17,7 @@ REGION = os.environ["REGION"] APPID = '1251668577' test_bucket = 'cos-python-v5-test-' + str(sys.version_info[0]) + '-' + str(sys.version_info[1]) + '-' + REGION + '-' + APPID +copy_test_bucket = 'copy-' + test_bucket test_object = "test.txt" special_file_name = "中文" + "→↓←→↖↗↙↘! \"#$%&'()*+,-./0123456789:;<=>@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~" conf = CosConfig( @@ -27,11 +28,22 @@ client = CosS3Client(conf, retry=3) -def _create_test_bucket(test_bucket): +def _create_test_bucket(test_bucket, create_region=None): try: - response = client.create_bucket( - Bucket=test_bucket, - ) + if create_region is None: + response = client.create_bucket( + Bucket=test_bucket, + ) + else: + bucket_conf = CosConfig( + Region=create_region, + Secret_id=SECRET_ID, + Secret_key=SECRET_KEY + ) + bucket_client = CosS3Client(bucket_conf) + response = bucket_client.create_bucket( + Bucket=test_bucket, + ) except Exception as e: if e.get_error_code() == 'BucketAlreadyOwnedByYou': print('BucketAlreadyOwnedByYou') @@ -40,6 +52,15 @@ def _create_test_bucket(test_bucket): return None +def _upload_test_file(test_bucket, test_key): + response = client.put_object( + Bucket=test_bucket, + Key=test_key, + Body='test' + ) + return None + + def get_raw_md5(data): m2 = hashlib.md5(data) etag = '"' + str(m2.hexdigest()) + '"' @@ -68,6 +89,8 @@ def setUp(): print ("start test...") print ("start create bucket " + test_bucket) _create_test_bucket(test_bucket) + _create_test_bucket(copy_test_bucket) + _upload_test_file(copy_test_bucket, test_object) def tearDown(): @@ -192,7 +215,7 @@ def test_get_object_acl(): def test_copy_object_diff_bucket(): """从另外的bucket拷贝object""" - copy_source = {'Bucket': 'test04-' + APPID, 'Key': '/test.txt', 'Region': 'ap-guangzhou'} + copy_source = {'Bucket': copy_test_bucket, 'Key': 'test.txt', 'Region': REGION} response = client.copy_object( Bucket=test_bucket, Key='test.txt', @@ -284,7 +307,7 @@ def test_upload_part_copy(): ) # upload part copy - copy_source = {'Bucket': 'test04-' + APPID, 'Key': '/test.txt', 'Region': 'ap-guangzhou'} + copy_source = {'Bucket': copy_test_bucket, 'Key': 'test.txt', 'Region': REGION} response = client.upload_part_copy( Bucket=test_bucket, Key='multipartfile.txt', @@ -522,6 +545,18 @@ def test_put_get_versioning(): def test_put_get_delete_replication(): """设置、获取、删除跨园区复制配置""" + replic_dest_bucket = 'replicationsh-' + APPID + _create_test_bucket(replic_dest_bucket, 'ap-shanghai') + sh_conf = CosConfig( + Region='ap-shanghai', + Secret_id=SECRET_ID, + Secret_key=SECRET_KEY + ) + sh_client = CosS3Client(sh_conf) + response = sh_client.put_bucket_versioning( + Bucket=replic_dest_bucket, + Status='Enabled' + ) replication_config = { 'Role': 'qcs::cam::uin/2779643970:uin/2779643970', 'Rule': [ @@ -530,7 +565,7 @@ def test_put_get_delete_replication(): 'Status': 'Enabled', 'Prefix': '中文', 'Destination': { - 'Bucket': 'qcs:id/0:cos:ap-shanghai:appid/1251668577:replicationsh' + 'Bucket': 'qcs::cos:ap-shanghai::' + replic_dest_bucket } } ] @@ -673,10 +708,10 @@ def test_upload_file_multithreading(): def test_copy_file_automatically(): """根据拷贝源文件的大小自动选择拷贝策略,不同园区,小于5G直接copy_object,大于5G分块拷贝""" - copy_source = {'Bucket': 'test01-' + APPID, 'Key': '/thread_1MB', 'Region': 'ap-guangzhou'} + copy_source = {'Bucket': copy_test_bucket, 'Key': 'test.txt', 'Region': REGION} response = client.copy( Bucket=test_bucket, - Key='copy_10G.txt', + Key='copy.txt', CopySource=copy_source, MAXThread=10 ) @@ -697,19 +732,6 @@ def test_upload_empty_file(): ) -def test_copy_10G_file_in_same_region(): - """同园区的拷贝,应该直接用copy_object接口,可以直接秒传""" - copy_source = {'Bucket': 'test01-' + APPID, 'Key': '10G.txt', 'Region': 'ap-guangzhou'} - copy_config = CosConfig(Region='ap-guangzhou', SecretId=SECRET_ID, SecretKey=SECRET_KEY) - copy_client = CosS3Client(copy_config) - response = copy_client.copy( - Bucket='test04-' + APPID, - Key='10G.txt', - CopySource=copy_source, - MAXThread=10 - ) - - def test_use_get_auth(): """测试利用get_auth方法直接生产签名,然后访问COS""" auth = client.get_auth( @@ -737,6 +759,7 @@ def test_upload_with_server_side_encryption(): def test_put_get_bucket_logging(): """测试bucket的logging服务""" logging_bucket = 'logging-beijing-' + APPID + _create_test_bucket(logging_bucket, 'ap-beijing') logging_config = { 'LoggingEnabled': { 'TargetBucket': logging_bucket, @@ -744,7 +767,7 @@ def test_put_get_bucket_logging(): } } beijing_conf = CosConfig( - Region="ap-beijing", + Region='ap-beijing', Secret_id=SECRET_ID, Secret_key=SECRET_KEY ) @@ -904,10 +927,13 @@ def test_put_get_gzip_file(): def test_put_get_delete_bucket_domain(): """测试设置获取删除bucket自定义域名""" + domain = 'tiedu-gz.coshelper.com' + if TRAVIS_FLAG == 'true': + domain = 'tiedu-ger.coshelper.com' domain_config = { 'DomainRule': [ { - 'Name': 'qq.com', + 'Name': domain, 'Type': 'REST', 'Status': 'ENABLED', }, @@ -925,6 +951,19 @@ def test_put_get_delete_bucket_domain(): ) domain_config['x-cos-domain-txt-verification'] = response['x-cos-domain-txt-verification'] assert domain_config == response + # test domain request + """ + domain_conf = CosConfig( + SecretId=SECRET_ID, + SecretKey=SECRET_KEY, + Domain=domain, + Scheme='http' + ) + domain_client = CosS3Client(domain_conf) + response = domain_client.head_bucket( + Bucket=test_bucket + ) + """ # delete domain response = client.delete_bucket_domain( Bucket=test_bucket @@ -1054,6 +1093,65 @@ def test_put_get_bucket_referer(): ) +def test_put_get_traffic_limit(): + """测试上传下载接口的单链接限速""" + traffic_test_key = 'traffic_test' + response = client.put_object( + Bucket=test_bucket, + Key=traffic_test_key, + Body='A'*1024*1024, + TrafficLimit='1048576' + ) + # 限速的单位为bit/s 1048576bit/s代表1Mb/s + response = client.get_object( + Bucket=test_bucket, + Key=traffic_test_key, + TrafficLimit='1048576' + ) + + +def test_select_object(): + """测试SQL检索COS对象(只支持国内)""" + select_obj = "select_test.json" + json_body = { + 'name': 'cos', + 'age': '999' + } + conf = CosConfig( + Region='ap-guangzhou', + SecretId=SECRET_ID, + SecretKey=SECRET_KEY, + ) + test_bucket = 'test-select-' + APPID + _create_test_bucket(test_bucket, 'ap-guangzhou') + client = CosS3Client(conf) + response = client.put_object( + Bucket=test_bucket, + Key=select_obj, + Body=(json.dumps(json_body)+'\n')*100 + ) + response = client.select_object_content( + Bucket=test_bucket, + Key=select_obj, + Expression='Select * from COSObject', + ExpressionType='SQL', + InputSerialization={ + 'CompressionType': 'NONE', + 'JSON': { + 'Type': 'LINES' + } + }, + OutputSerialization={ + 'CSV': { + 'RecordDelimiter': '\n' + } + } + ) + event_stream = response['Payload'] + for event in event_stream: + print(event) + + if __name__ == "__main__": setUp() """ @@ -1075,6 +1173,8 @@ def test_put_get_bucket_referer(): test_put_file_like_object() test_put_chunked_object() test_put_get_delete_bucket_inventory() + test_put_get_traffic_limit() + test_put_get_delete_bucket_domain() """ - test_put_get_bucket_referer() + test_select_object() tearDown()