Skip to content

Commit

Permalink
Add Json file support for OSS select API (#141)
Browse files Browse the repository at this point in the history
Add Json file support for OSS select API
  • Loading branch information
qixu001 authored and hangzws committed Feb 28, 2019
1 parent df1e8fa commit 4b1d079
Show file tree
Hide file tree
Showing 10 changed files with 10,931 additions and 121 deletions.
89 changes: 56 additions & 33 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def progress_callback(bytes_consumed, total_bytes):
.. _select_params:
指定OSS Select的CSV文件格式,支持如下Keys:
指定OSS Select的文件格式。
对于Csv文件,支持如下Keys:
>>> CsvHeaderInfo: None|Use|Ignore #None表示没有CSV Schema头,Use表示启用CSV Schema头,可以在Select语句中使用Name,Ignore表示有CSV Schema头,但忽略它(Select语句中不可以使用Name)
默认值是None
>>> CommentCharacter: Comment字符,默认值是#,不支持多个字符
Expand All @@ -153,6 +154,20 @@ def progress_callback(bytes_consumed, total_bytes):
>>> SplitRange: 指定查询CSV文件的Split范围,参见 `split_range`.
注意LineRange和SplitRange两种不能同时指定。若同时指定LineRange会被忽略。
>>> CompressionType: 文件的压缩格式,默认值是None, 支持GZIP。
>>> OutputRawData: 指定响应Body是否返回Raw数据,默认值是False.
>>> SkipPartialDataRecord: 当CSV行数据不完整时(select语句中出现的列在该行为空),是否跳过该行。默认是False。
>>> OutputHeader:是否输出CSV Header,默认是False.
>>> EnablePayloadCrc:是否启用对Payload的CRC校验,默认是False. 该选项不能和OutputRawData:True混用。
>>> MaxSkippedRecordsAllowed: 允许跳过的最大行数。默认值是0表示一旦有一行跳过就报错。当下列两种情况下该行CSV被跳过:1)当SkipPartialDataRecord为True时且该行不完整时 2)当该行的数据类型和SQL不匹配时
对于Json 文件, 支持如下Keys:
>>> Json_Type: DOCUMENT | LINES . DOCUMENT就是指一般的Json文件,LINES是指每一行是一个合法的JSON对象,文件由多行Json对象组成,整个文件本身不是合法的Json对象。
>>> LineRange: 指定查询JSON LINE文件的行范围,参见 `line_range`。注意该参数仅支持LINES类型
>>> SplitRange: 指定查询JSON LINE文件的Split范围,参见 `split_range`.注意该参数仅支持LINES类型
>>> CompressionType: 文件的压缩格式,默认值是None, 支持GZIP。
>>> OutputRawData: 指定响应Body是否返回Raw数据,默认值是False.
>>> SkipPartialDataRecord: 当一条JSON记录数据不完整时(select语句中出现的Key在该对象为空),是否跳过该Json记录。默认是False。
>>> EnablePayloadCrc:是否启用对Payload的CRC校验,默认是False. 该选项不能和OutputRawData:True混用。
>>> MaxSkippedRecordsAllowed: 允许跳过的最大Json记录数。默认值是0表示一旦有一条Json记录跳过就报错。当下列两种情况下该JSON被跳过:1)当SkipPartialDataRecord为True时且该条Json记录不完整时 2)当该记录的数据类型和SQL不匹配时
.. _select_meta_params:
Expand Down Expand Up @@ -292,7 +307,6 @@ def list_buckets(self, prefix='', marker='', max_keys=100):
logger.debug("List buckets done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_list_buckets, ListBucketsResult)


class Bucket(_Base):
"""用于Bucket和Object操作的类,诸如创建、删除Bucket,上传、下载Object等。
Expand Down Expand Up @@ -624,20 +638,25 @@ def get_object(self, key,
return GetObjectResult(resp, progress_callback, self.enable_crc)

def select_object(self, key, sql,
progress_callback=None,
select_params=None
):
"""Select一个CSV文件内容.
progress_callback=None,
select_params=None
):
"""Select一个文件内容,支持(Csv,Json Doc,Json Lines及其GZIP压缩文件).
用法 ::
对于Csv:
>>> result = bucket.select_object('access.log', 'select * from ossobject where _4 > 40')
>>> print(result.read())
'hello world'
对于Json Doc: { contacts:[{"firstName":"abc", "lastName":"def"},{"firstName":"abc1", "lastName":"def1"}]}
>>> result = bucket.select_object('sample.json', 'select s.firstName, s.lastName from ossobject.contacts[*] s', select_params = {"Json_Type":"DOCUMENT"})
对于Json Lines: {"firstName":"abc", "lastName":"def"},{"firstName":"abc1", "lastName":"def1"}
>>> result = bucket.select_object('sample.json', 'select s.firstName, s.lastName from ossobject s', select_params = {"Json_Type":"LINES"})
:param key: 文件名
:param sql: sql statement
:param select_params: select参数集合。参见 :ref:`select_params`
:param select_params: select参数集合,对于Json文件必须指定Json_Type类型。参见 :ref:`select_params`
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: file-like object
Expand All @@ -646,7 +665,9 @@ def select_object(self, key, sql,
"""
headers = http.CaseInsensitiveDict()
body = xml_utils.to_select_object(sql, select_params)
params = {'x-oss-process': 'csv/select'}
params = {'x-oss-process': 'csv/select'}
if select_params is not None and 'Json_Type' in select_params:
params['x-oss-process'] = 'json/select'

self.timeout = 3600
resp = self.__do_object('POST', key, data=body, headers=headers, params=params)
Expand Down Expand Up @@ -761,18 +782,18 @@ def get_object_with_url_to_file(self, sign_url,
return result

def select_object_to_file(self, key, filename, sql,
progress_callback=None,
select_params=None
):
"""Select Content from OSS file to a local file
progress_callback=None,
select_params=None
):
"""Select一个文件的内容到本地文件
:param key: OSS key name
:param filename: local file name。The parent directory should exist
:param key: OSS文件名
:param filename: 本地文件名。其父目录必须已经存在且有写权限。
:param progress_callback: progress callback。参考 :ref:`progress_callback`
:param progress_callback: 调用进度的callback。参考 :ref:`progress_callback`
:param select_params: select参数集合。参见 :ref:`select_params`
:return: If file does not exist, throw :class:`NoSuchKey <oss2.exceptions.NoSuchKey>`
:return: 如果文件不存在, 抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>`
"""
with open(to_unicode(filename), 'wb') as f:
result = self.select_object(key, sql, progress_callback=progress_callback,
Expand Down Expand Up @@ -809,34 +830,36 @@ def head_object(self, key, headers=None):
return HeadObjectResult(resp)

def create_select_object_meta(self, key, select_meta_params=None):
"""获取或创建CSV文件元信息。如果元信息存在,返回之;不然则创建后返回之
"""获取或创建CSV,JSON LINES 文件元信息。如果元信息存在,返回之;不然则创建后返回之
HTTP响应的头部包含了文件元信息,可以通过 `RequestResult` 的 `headers` 成员获得。
用法 ::
CSV文件用法 ::
>>> select_meta_params = { 'FieldDelimiter': ',',
'RecordDelimiter': '\r\n',
'QuoteCharacter': '"',
'OverwriteIfExists' : 'false'}
>>> result = bucket.create_select_object_meta('csv.txt', csv_params)
>>> print(result.content_type)
text/plain
:param key: object name
:param select_meta_params: the parameter dictionary. For the supported keys, refer to :ref:`csv_meta_params`
:return: :class:`GetSelectObjectMetaResult <oss2.models.HeadObjectResult>`.
Beside the csv_rows, csv_splits field, it also include x-oss-select-csv-rows, x-oss-select-csv-splits and x-oss-select-csv-columns headers.
csv_rows are the total lines of the csv file.
csv_splits are the total splits of the csv file. One split a bunch of rows and each split has very similar size.
Header x-oss-select-csv-rows and x-oss-select-csv-splits are the raw data for csv_rows and csv_splits.
x-oss-select-csv-columns header specifies the first line's column count.
>>> result = bucket.create_select_object_meta('csv.txt', select_meta_params)
>>> print(result.rows)
JSON LINES文件用法 ::
>>> select_meta_params = { 'Json_Type':'LINES', 'OverwriteIfExists':'False'}
>>> result = bucket.create_select_object_meta('jsonlines.json', select_meta_params)
:param key: 文件名
:param select_meta_params: 参数词典,可以是dict,参见 :ref:`select_meta_params`
:return: :class:`GetSelectObjectMetaResult <oss2.models.HeadObjectResult>`.
除了 rows 和splits 属性之外, 它也返回head object返回的其他属性。
rows表示该文件的总记录数。
splits表示该文件的总Split个数,一个Split包含若干条记录,每个Split的总字节数大致相当。用户可以以Split为单位进行分片查询。
:raises: If Bucket or object does not exist, throw:class:`NotFound <oss2.exceptions.NotFound>`
:raises: 如果Bucket不存在或者Object不存在,则抛出:class:`NotFound <oss2.exceptions.NotFound>`
"""
headers = http.CaseInsensitiveDict()

body = xml_utils.to_get_select_object_meta(select_meta_params)
params = {'x-oss-process': 'csv/meta'}
params = {'x-oss-process': 'csv/meta'}
if select_meta_params is not None and 'Json_Type' in select_meta_params:
params['x-oss-process'] = 'json/meta'

self.timeout = 3600
resp = self.__do_object('POST', key, data=body, headers=headers, params=params)
Expand Down
4 changes: 3 additions & 1 deletion oss2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,14 @@ class AccessDenied(ServerError):

class SelectOperationFailed(ServerError):
code = 'SelectOperationFailed'
def __init__(self, status, message):
def __init__(self, status, code, message):
self.status = status
self.code = code
self.message = message

def __str__(self):
error = {'status': self.status,
'code': self.code,
'details': self.message}
return str(error)

Expand Down
6 changes: 4 additions & 2 deletions oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ def __init__(self, resp):
for data in self.select_resp: # waiting the response body to finish
pass

self.csv_rows = self.select_resp.rows
self.csv_splits = self.select_resp.splits
self.csv_rows = self.select_resp.rows # to be compatible with previous version.
self.csv_splits = self.select_resp.splits # to be compatible with previous version.
self.rows = self.csv_rows
self.splits = self.csv_splits


class GetObjectMetaResult(RequestResult):
Expand Down
49 changes: 38 additions & 11 deletions oss2/select_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from .exceptions import SelectOperationClientError
from .exceptions import InconsistentError
from . import utils
import logging

logger = logging.getLogger(__name__)
"""
The adapter class for Select object's response.
The response consists of frames. Each frame has the following format:
Expand Down Expand Up @@ -39,6 +41,7 @@ class SelectResponseAdapter(object):
_DATA_FRAME_TYPE = 8388609
_END_FRAME_TYPE = 8388613
_META_END_FRAME_TYPE = 8388614
_JSON_META_END_FRAME_TYPE = 8388615
_FRAMES_FOR_PROGRESS_UPDATE = 10

def __init__(self, response, progress_callback = None, content_length = None, enable_crc = False):
Expand Down Expand Up @@ -140,8 +143,13 @@ def read_next_frame(self):
frame_type[0] = 0 #mask the version bit
utils.change_endianness_if_needed(frame_type) # convert to little endian
frame_type_val = struct.unpack("I", bytes(frame_type))[0]
if frame_type_val != SelectResponseAdapter._DATA_FRAME_TYPE and frame_type_val != SelectResponseAdapter._CONTINIOUS_FRAME_TYPE and frame_type_val != SelectResponseAdapter._END_FRAME_TYPE and frame_type_val != SelectResponseAdapter._META_END_FRAME_TYPE:
raise SelectOperationClientError(self.request_id, "Unexpected frame type:" + str(frame_type_val))
if (frame_type_val != SelectResponseAdapter._DATA_FRAME_TYPE and
frame_type_val != SelectResponseAdapter._CONTINIOUS_FRAME_TYPE and
frame_type_val != SelectResponseAdapter._END_FRAME_TYPE and
frame_type_val != SelectResponseAdapter._META_END_FRAME_TYPE and
frame_type_val != SelectResponseAdapter._JSON_META_END_FRAME_TYPE):
logger.warn("Unexpected frame type: {0}. RequestId:{1}. This could be due to the old version of client.".format(frame_type_val, self.request_id))
raise SelectOperationClientError(self.request_id, "Unexpected frame type:" + str(frame_type_val))

self.payload = self.read_raw(payload_length_val)
file_offset_bytes = bytearray(self.payload[0:8])
Expand All @@ -160,6 +168,7 @@ def read_next_frame(self):
crc32.update(self.payload)
checksum_calc = crc32.crc
if checksum_val != checksum_calc:
logger.warn("Incorrect checksum: Actual {0} and calculated {1}. RequestId:{2}".format(checksum_val, checksum_calc, self.request_id))
raise InconsistentError("Incorrect checksum: Actual" + str(checksum_val) + ". Calculated:" + str(checksum_calc), self.request_id)

elif frame_type_val == SelectResponseAdapter._CONTINIOUS_FRAME_TYPE:
Expand All @@ -173,18 +182,24 @@ def read_next_frame(self):
utils.change_endianness_if_needed(status_bytes)
status = struct.unpack("I", bytes(status_bytes))[0]
error_msg_size = payload_length_val - 20
error_msg=b'';
error_msg=b''
error_code = b''
if error_msg_size > 0:
error_msg = self.payload[20:error_msg_size + 20]
error_code_index = error_msg.find(b'.')
if error_code_index >= 0 and error_code_index < error_msg_size - 1:
error_code = error_msg[0:error_code_index]
error_msg = error_msg[error_code_index + 1:]

if status // 100 != 2:
raise SelectOperationFailed(status, error_msg)
raise SelectOperationFailed(status, error_code, error_msg)
self.frame_length = 0
if self.callback is not None:
self.callback(self.file_offset, self.content_length)
self.read_raw(4) # read the payload checksum
self.frame_length = 0
self.finished = 1
elif frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE:
elif frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE or frame_type_val == SelectResponseAdapter._JSON_META_END_FRAME_TYPE:
self.frame_off_set = 0
scanned_size_bytes = bytearray(self.payload[8:16])
status_bytes = bytearray(self.payload[16:20])
Expand All @@ -196,17 +211,29 @@ def read_next_frame(self):
lines_bytes = bytearray(self.payload[24:32])
utils.change_endianness_if_needed(lines_bytes)
self.rows = struct.unpack("Q", bytes(lines_bytes))[0]
column_bytes = bytearray(self.payload[32:36])
utils.change_endianness_if_needed(column_bytes)
self.columns = struct.unpack("I", bytes(column_bytes))[0]
error_size = payload_length_val - 36

error_index = 36
if frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE:
column_bytes = bytearray(self.payload[32:36])
utils.change_endianness_if_needed(column_bytes)
self.columns = struct.unpack("I", bytes(column_bytes))[0]
else:
error_index = 32

error_size = payload_length_val - error_index
error_msg = b''
error_code = b''
if (error_size > 0):
error_msg = self.payload[36:36 + error_size]
error_msg = self.payload[error_index:error_index + error_size]
error_code_index = error_msg.find(b'.')
if error_code_index >= 0 and error_code_index < error_size - 1:
error_code = error_msg[0:error_code_index]
error_msg = error_msg[error_code_index + 1:]

self.read_raw(4) # read the payload checksum
self.final_status = status
self.frame_length = 0
self.finished = 1
if (status / 100 != 2):
raise SelectOperationFailed(status, error_msg)
raise SelectOperationFailed(status, error_code, error_msg)

Loading

0 comments on commit 4b1d079

Please sign in to comment.