-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Bind Event-Stream Encode Decode (#124)
- Loading branch information
Showing
13 changed files
with
666 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
87 changes: 87 additions & 0 deletions
87
Source/AwsCommonRuntimeKit/event-stream/EventStreamHeader.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0. | ||
|
||
import AwsCEventStream | ||
import Foundation | ||
|
||
public struct EventStreamHeader { | ||
/// max header name length is 127 bytes (Int8.max) | ||
public static let maxNameLength = AWS_EVENT_STREAM_HEADER_NAME_LEN_MAX | ||
|
||
public static let maxValueLength = Int16.max | ||
|
||
/// name.count can not be greater than EventStreamHeader.maxNameLength | ||
public var name: String | ||
|
||
/// value.count can not be greater than EventStreamHeader.maxValueLength for supported types. | ||
public var value: EventStreamHeaderValue | ||
} | ||
|
||
public enum EventStreamHeaderValue: Equatable { | ||
case bool(value: Bool) | ||
case byte(value: Int8) | ||
case int16(value: Int16) | ||
case int32(value: Int32) | ||
case int64(value: Int64) | ||
/// Data length can not be greater than EventStreamHeader.maxValueLength | ||
case byteBuf(value: Data) | ||
/// String length can not be greater than EventStreamHeader.maxValueLength | ||
case string(value: String) | ||
/// Date is only precise up to milliseconds. | ||
/// It will lose the sub-millisecond precision during encoding. | ||
case timestamp(value: Date) | ||
case uuid(value: UUID) | ||
} | ||
|
||
extension EventStreamHeaderValue { | ||
static func parseRaw(rawValue: UnsafeMutablePointer<aws_event_stream_header_value_pair>) -> EventStreamHeaderValue { | ||
let value: EventStreamHeaderValue | ||
switch rawValue.pointee.header_value_type { | ||
case AWS_EVENT_STREAM_HEADER_BOOL_TRUE: | ||
value = .bool( | ||
value: aws_event_stream_header_value_as_bool(rawValue) != 0) | ||
case AWS_EVENT_STREAM_HEADER_BOOL_FALSE: | ||
value = .bool( | ||
value: aws_event_stream_header_value_as_bool(rawValue) != 0) | ||
case AWS_EVENT_STREAM_HEADER_BYTE: | ||
value = .byte(value: aws_event_stream_header_value_as_byte(rawValue)) | ||
case AWS_EVENT_STREAM_HEADER_INT16: | ||
value = .int16( | ||
value: aws_event_stream_header_value_as_int16(rawValue)) | ||
case AWS_EVENT_STREAM_HEADER_INT32: | ||
value = .int32( | ||
value: aws_event_stream_header_value_as_int32(rawValue)) | ||
case AWS_EVENT_STREAM_HEADER_INT64: | ||
value = .int64( | ||
value: aws_event_stream_header_value_as_int64(rawValue)) | ||
case AWS_EVENT_STREAM_HEADER_BYTE_BUF: | ||
value = .byteBuf( | ||
value: aws_event_stream_header_value_as_bytebuf(rawValue).toData()) | ||
case AWS_EVENT_STREAM_HEADER_STRING: | ||
value = .string( | ||
value: aws_event_stream_header_value_as_string(rawValue).toString()) | ||
case AWS_EVENT_STREAM_HEADER_TIMESTAMP: | ||
value = .timestamp( | ||
value: Date( | ||
millisecondsSince1970: aws_event_stream_header_value_as_timestamp(rawValue))) | ||
case AWS_EVENT_STREAM_HEADER_UUID: | ||
let uuid = UUID(uuid: rawValue.pointee.header_value.static_val) | ||
value = .uuid(value: uuid) | ||
default: | ||
fatalError("Unexpected header value type found.") | ||
} | ||
return value | ||
} | ||
} | ||
|
||
extension EventStreamHeader: Equatable { | ||
public static func == (lhs: EventStreamHeader, rhs: EventStreamHeader) -> Bool { | ||
if case let EventStreamHeaderValue.timestamp(value1) = lhs.value, | ||
case let EventStreamHeaderValue.timestamp(value2) = rhs.value { | ||
return lhs.name == rhs.name && | ||
value1.millisecondsSince1970 == value2.millisecondsSince1970 | ||
} | ||
return lhs.name == rhs.name && | ||
lhs.value == rhs.value | ||
} | ||
} |
137 changes: 137 additions & 0 deletions
137
Source/AwsCommonRuntimeKit/event-stream/EventStreamMessage.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0. | ||
|
||
import AwsCEventStream | ||
import Foundation | ||
|
||
public struct EventStreamMessage { | ||
var headers: [EventStreamHeader] = [EventStreamHeader]() | ||
var payload: Data = Data() | ||
var allocator: Allocator = defaultAllocator | ||
|
||
/// Get the binary format of this message (i.e. for sending across the wire manually) | ||
/// - Returns: binary Data. | ||
public func getEncoded() throws -> Data { | ||
var rawValue = aws_event_stream_message() | ||
var rawHeaders = aws_array_list() | ||
defer { | ||
aws_event_stream_headers_list_cleanup(&rawHeaders) | ||
aws_event_stream_message_clean_up(&rawValue) | ||
} | ||
|
||
guard aws_event_stream_headers_list_init(&rawHeaders, allocator.rawValue) == AWS_OP_SUCCESS else { | ||
throw CommonRunTimeError.crtError(.makeFromLastError()) | ||
} | ||
try headers.forEach { | ||
try addHeader(header: $0, rawHeaders: &rawHeaders) | ||
} | ||
|
||
guard payload.withAWSByteBufPointer({ byteBuff in | ||
// TODO (optimization): we could avoid the extra copies of headers and data | ||
// if there were an API in C that let us encode everything directly into a pre-allocated buffer | ||
aws_event_stream_message_init(&rawValue, allocator.rawValue, &rawHeaders, byteBuff) | ||
}) == AWS_OP_SUCCESS else { | ||
throw CommonRunTimeError.crtError(.makeFromLastError()) | ||
} | ||
|
||
return Data( | ||
bytes: aws_event_stream_message_buffer(&rawValue), | ||
count: Int(aws_event_stream_message_total_length(&rawValue))) | ||
} | ||
} | ||
|
||
extension EventStreamMessage { | ||
func addHeader(header: EventStreamHeader, rawHeaders: UnsafeMutablePointer<aws_array_list>) throws { | ||
if header.name.count > EventStreamHeader.maxNameLength { | ||
throw CommonRunTimeError.crtError( | ||
.init( | ||
code: AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN.rawValue)) | ||
} | ||
let addCHeader: () throws -> Int32 = { | ||
return try header.name.withCString { headerName in | ||
switch header.value { | ||
case .bool(let value): | ||
return aws_event_stream_add_bool_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
Int8(value.uintValue)) | ||
case .byte(let value): | ||
return aws_event_stream_add_byte_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
value) | ||
case .int16(let value): | ||
return aws_event_stream_add_int16_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
value) | ||
case .int32(let value): | ||
return aws_event_stream_add_int32_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
value) | ||
case .int64(let value): | ||
return aws_event_stream_add_int64_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
value) | ||
case .byteBuf(var value): | ||
if value.count > EventStreamHeader.maxValueLength { | ||
throw CommonRunTimeError.crtError( | ||
.init( | ||
code: AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN.rawValue)) | ||
} | ||
return value.withUnsafeMutableBytes { | ||
let bytes = $0.bindMemory(to: UInt8.self).baseAddress! | ||
return aws_event_stream_add_bytebuf_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
bytes, | ||
UInt16($0.count), | ||
1) | ||
} | ||
case .string(let value): | ||
if value.count > EventStreamHeader.maxValueLength { | ||
throw CommonRunTimeError.crtError( | ||
.init( | ||
code: AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN.rawValue)) | ||
} | ||
return value.withCString { | ||
aws_event_stream_add_string_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
$0, | ||
UInt16(value.count), | ||
1) | ||
} | ||
case .timestamp(let value): | ||
return aws_event_stream_add_timestamp_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
Int64(value.millisecondsSince1970)) | ||
case .uuid(let value): | ||
return withUnsafeBytes(of: value) { | ||
let address = $0.baseAddress?.assumingMemoryBound(to: UInt8.self) | ||
return aws_event_stream_add_uuid_header( | ||
rawHeaders, | ||
headerName, | ||
UInt8(header.name.count), | ||
address) | ||
} | ||
} | ||
} | ||
} | ||
|
||
guard try addCHeader() == AWS_OP_SUCCESS else { | ||
throw CommonRunTimeError.crtError(.makeFromLastError()) | ||
} | ||
} | ||
} |
Oops, something went wrong.