Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Multipart] Add the frame -> bytes serializer #73

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Sources/OpenAPIRuntime/Multipart/ByteUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ enum ASCII {
/// Two dash characters.
static let dashes: [UInt8] = [dash, dash]

/// The `<CR>` character follow by the `<LF>` character.
/// The `<CR>` character followed by the `<LF>` character.
static let crlf: [UInt8] = [cr, lf]

/// The colon character followed by the space character.
static let colonSpace: [UInt8] = [colon, space]

/// The characters that represent optional whitespace (OWS).
static let optionalWhitespace: Set<UInt8> = [space, tab]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftOpenAPIGenerator open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HTTPTypes

/// A sequence that serializes multipart frames into bytes.
struct MultipartFramesToBytesSequence<Upstream: AsyncSequence & Sendable>: Sendable
where Upstream.Element == MultipartFrame {

/// The source of multipart frames.
var upstream: Upstream

/// The boundary string used to separate multipart parts.
var boundary: String
}

extension MultipartFramesToBytesSequence: AsyncSequence {

/// The type of element produced by this asynchronous sequence.
typealias Element = ArraySlice<UInt8>
czechboy0 marked this conversation as resolved.
Show resolved Hide resolved

/// Creates the asynchronous iterator that produces elements of this
/// asynchronous sequence.
///
/// - Returns: An instance of the `AsyncIterator` type used to produce
/// elements of the asynchronous sequence.
func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator> {
Iterator(upstream: upstream.makeAsyncIterator(), boundary: boundary)
}

/// An iterator that pulls frames from the upstream iterator and provides
/// serialized byte chunks.
struct Iterator<UpstreamIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol
where UpstreamIterator.Element == MultipartFrame {

/// The iterator that provides the multipart frames.
private var upstream: UpstreamIterator

/// The multipart frame serializer.
private var serializer: MultipartSerializer

/// Creates a new iterator from the provided source of frames and a boundary string.
/// - Parameters:
/// - upstream: The iterator that provides the multipart frames.
/// - boundary: The boundary separating the multipart parts.
init(upstream: UpstreamIterator, boundary: String) {
self.upstream = upstream
self.serializer = .init(boundary: boundary)
}

/// Asynchronously advances to the next element and returns it, or ends the
/// sequence if there is no next element.
///
/// - Returns: The next element, if it exists, or `nil` to signal the end of
/// the sequence.
mutating func next() async throws -> ArraySlice<UInt8>? {
try await serializer.next { try await upstream.next() }
}
}
}
2 changes: 1 addition & 1 deletion Sources/OpenAPIRuntime/Multipart/MultipartParser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import Foundation
import HTTPTypes

/// A parser of mutlipart frames from bytes.
/// A parser of multipart frames from bytes.
struct MultipartParser {

/// The underlying state machine.
Expand Down
260 changes: 260 additions & 0 deletions Sources/OpenAPIRuntime/Multipart/MultipartSerializer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftOpenAPIGenerator open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import HTTPTypes

/// A serializer of multipart frames into bytes.
struct MultipartSerializer {

/// The boundary that separates parts.
private let boundary: ArraySlice<UInt8>

/// The underlying state machine.
private var stateMachine: StateMachine

/// The buffer of bytes ready to be written out.
private var outBuffer: [UInt8]

/// Creates a new serializer.
/// - Parameter boundary: The boundary that separates parts.
init(boundary: String) {
self.boundary = ArraySlice(boundary.utf8)
self.stateMachine = .init()
self.outBuffer = []
}
/// Requests the next byte chunk.
/// - Parameter fetchFrame: A closure that is called when the serializer is ready to serialize the next frame.
/// - Returns: A byte chunk.
/// - Throws: When a serialization error is encountered.
mutating func next(_ fetchFrame: () async throws -> MultipartFrame?) async throws -> ArraySlice<UInt8>? {

func flushedBytes() -> ArraySlice<UInt8> {
let outChunk = ArraySlice(outBuffer)
outBuffer.removeAll(keepingCapacity: true)
return outChunk
}

while true {
switch stateMachine.next() {
case .returnNil: return nil
case .emitStart:
emitStart()
return flushedBytes()
case .needsMore:
let frame = try await fetchFrame()
switch stateMachine.receivedFrame(frame) {
case .returnNil: return nil
case .emitEvents(let events):
for event in events {
switch event {
case .headerFields(let headerFields): emitHeaders(headerFields)
case .bodyChunk(let chunk): emitBodyChunk(chunk)
case .endOfPart: emitEndOfPart()
case .start: emitStart()
case .end: emitEnd()
}
}
return flushedBytes()
case .emitError(let error): throw SerializerError(error: error)
}
}
}
}
}

extension MultipartSerializer {

/// An error thrown by the serializer.
struct SerializerError: Swift.Error, CustomStringConvertible, LocalizedError {

/// The underlying error emitted by the state machine.
var error: StateMachine.ActionError

var description: String {
switch error {
case .noHeaderFieldsAtStart: return "No header fields found at the start of the multipart body."
}
}

var errorDescription: String? { description }
}
}

extension MultipartSerializer {

/// Writes the provided header fields into the buffer.
/// - Parameter headerFields: The header fields to serialize.
private mutating func emitHeaders(_ headerFields: HTTPFields) {
outBuffer.append(contentsOf: ASCII.crlf)
let sortedHeaders = headerFields.sorted { a, b in a.name.canonicalName < b.name.canonicalName }
for headerField in sortedHeaders {
outBuffer.append(contentsOf: headerField.name.canonicalName.utf8)
outBuffer.append(contentsOf: ASCII.colonSpace)
outBuffer.append(contentsOf: headerField.value.utf8)
outBuffer.append(contentsOf: ASCII.crlf)
}
outBuffer.append(contentsOf: ASCII.crlf)
}

/// Writes the part body chunk into the buffer.
/// - Parameter bodyChunk: The body chunk to write.
private mutating func emitBodyChunk(_ bodyChunk: ArraySlice<UInt8>) { outBuffer.append(contentsOf: bodyChunk) }

/// Writes an end of part boundary into the buffer.
private mutating func emitEndOfPart() {
outBuffer.append(contentsOf: ASCII.crlf)
outBuffer.append(contentsOf: ASCII.dashes)
outBuffer.append(contentsOf: boundary)
}

/// Writes the start boundary into the buffer.
private mutating func emitStart() {
outBuffer.append(contentsOf: ASCII.dashes)
outBuffer.append(contentsOf: boundary)
}

/// Writes the end double dash to the buffer.
private mutating func emitEnd() {
outBuffer.append(contentsOf: ASCII.dashes)
outBuffer.append(contentsOf: ASCII.crlf)
outBuffer.append(contentsOf: ASCII.crlf)
}
}

extension MultipartSerializer {

/// A state machine representing the multipart frame serializer.
struct StateMachine {

/// The possible states of the state machine.
enum State: Hashable {

/// Has not yet written any bytes.
case initial

/// Emitted start, but no frames yet.
case emittedStart

/// Finished, the terminal state.
case finished

/// Last emitted a header fields frame.
case emittedHeaders

/// Last emitted a part body chunk frame.
case emittedBodyChunk
}

/// The current state of the state machine.
private(set) var state: State

/// Creates a new state machine.
init() { self.state = .initial }

/// An error returned by the state machine.
enum ActionError: Hashable {

/// The first frame from upstream was not a header fields frame.
case noHeaderFieldsAtStart
}

/// An action returned by the `next` method.
enum NextAction: Hashable {

/// Return nil to the caller, no more bytes.
case returnNil

/// Emit the initial boundary.
case emitStart

/// Ready for the next frame.
case needsMore
}

/// Read the next byte chunk serialized from upstream frames.
/// - Returns: An action to perform.
mutating func next() -> NextAction {
switch state {
case .initial:
state = .emittedStart
return .emitStart
case .finished: return .returnNil
case .emittedStart, .emittedHeaders, .emittedBodyChunk: return .needsMore
}
}

/// An event to serialize to bytes.
enum Event: Hashable {

/// The header fields of a part.
case headerFields(HTTPFields)

/// A byte chunk of a part.
case bodyChunk(ArraySlice<UInt8>)

/// A boundary between parts.
case endOfPart

/// The initial boundary.
case start

/// The final dashes.
case end
}

/// An action returned by the `receivedFrame` method.
enum ReceivedFrameAction: Hashable {

/// Return nil to the caller, no more bytes.
case returnNil

/// Write the provided events as bytes.
case emitEvents([Event])

/// Throw the provided error.
case emitError(ActionError)
}

/// Ingest the provided frame.
/// - Parameter frame: A new frame. If `nil`, then the source of frames is finished.
/// - Returns: An action to perform.
mutating func receivedFrame(_ frame: MultipartFrame?) -> ReceivedFrameAction {
switch state {
case .initial: preconditionFailure("Invalid state: \(state)")
case .finished: return .returnNil
case .emittedStart, .emittedHeaders, .emittedBodyChunk: break
}
switch (state, frame) {
case (.initial, _), (.finished, _): preconditionFailure("Already handled above.")
case (_, .none):
state = .finished
return .emitEvents([.endOfPart, .end])
case (.emittedStart, .headerFields(let headerFields)):
state = .emittedHeaders
return .emitEvents([.headerFields(headerFields)])
case (.emittedStart, .bodyChunk):
state = .finished
return .emitError(.noHeaderFieldsAtStart)
case (.emittedHeaders, .headerFields(let headerFields)),
(.emittedBodyChunk, .headerFields(let headerFields)):
state = .emittedHeaders
return .emitEvents([.endOfPart, .headerFields(headerFields)])
case (.emittedHeaders, .bodyChunk(let bodyChunk)), (.emittedBodyChunk, .bodyChunk(let bodyChunk)):
state = .emittedBodyChunk
return .emitEvents([.bodyChunk(bodyChunk)])
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,12 @@ import Foundation

final class Test_MultipartBytesToFramesSequence: Test_Runtime {
func test() async throws {
var chunk = chunkFromStringLines([
let chunk = chunkFromStringLines([
"--__abcd__", #"Content-Disposition: form-data; name="name""#, "", "24", "--__abcd__",
#"Content-Disposition: form-data; name="info""#, "", "{}", "--__abcd__--",
])
let next: () async throws -> ArraySlice<UInt8>? = {
if let first = chunk.first {
let out: ArraySlice<UInt8> = [first]
chunk = chunk.dropFirst()
return out
} else {
return nil
}
}
let upstream = HTTPBody(AsyncThrowingStream(unfolding: next), length: .unknown, iterationBehavior: .single)
var iterator = chunk.makeIterator()
let upstream = AsyncStream { iterator.next().map { ArraySlice([$0]) } }
let sequence = MultipartBytesToFramesSequence(upstream: upstream, boundary: "__abcd__")
var frames: [MultipartFrame] = []
for try await frame in sequence { frames.append(frame) }
Expand Down
Loading