-
Notifications
You must be signed in to change notification settings - Fork 357
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a length-delimited StreamChannelTransformer (#1)
This implements the packet scheme that the embedded protocol uses when communicating over stdin and stdout.
- Loading branch information
Showing
4 changed files
with
296 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Created by https://www.gitignore.io/api/dart | ||
# Edit at https://www.gitignore.io/?templates=dart | ||
|
||
### Dart ### | ||
# See https://www.dartlang.org/guides/libraries/private-files | ||
|
||
# Files and directories created by pub | ||
.dart_tool/ | ||
.packages | ||
build/ | ||
# If you're building an application, you may want to check-in your pubspec.lock | ||
pubspec.lock | ||
|
||
# Directory created by dartdoc | ||
# If you don't generate documentation locally you can remove this line. | ||
doc/api/ | ||
|
||
# Avoid committing generated Javascript files: | ||
*.dart.js | ||
*.info.json # Produced by the --dump-info flag. | ||
*.js # When generated by dart2js. Don't specify *.js if your | ||
# project includes source files written in JavaScript. | ||
*.js_ | ||
*.js.deps | ||
*.js.map | ||
|
||
# End of https://www.gitignore.io/api/dart | ||
|
||
# Generated protocol buffer files. | ||
*.pb*.dart |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Copyright 2019 Google Inc. Use of this source code is governed by an | ||
// MIT-style license that can be found in the LICENSE file or at | ||
// https://opensource.org/licenses/MIT. | ||
|
||
import 'dart:async'; | ||
import 'dart:math' as math; | ||
import 'dart:typed_data'; | ||
|
||
import 'package:async/async.dart'; | ||
import 'package:stream_channel/stream_channel.dart'; | ||
|
||
/// A [StreamChannelTransformer] that converts a channel that sends and receives | ||
/// arbitrarily-chunked binary data to one that sends and receives packets of | ||
/// set length using [lengthDelimitedEncoder] and [lengthDelimitedDecoder]. | ||
final StreamChannelTransformer<Uint8List, List<int>> lengthDelimited = | ||
StreamChannelTransformer<Uint8List, List<int>>(lengthDelimitedDecoder, | ||
StreamSinkTransformer.fromStreamTransformer(lengthDelimitedEncoder)); | ||
|
||
/// A transformer that converts an arbitrarily-chunked byte stream where each | ||
/// packet is prefixed with a 32-bit little-endian number indicating its length | ||
/// into a stream of packet contents. | ||
final lengthDelimitedDecoder = | ||
StreamTransformer<List<int>, Uint8List>.fromBind((stream) { | ||
// The buffer into which the four-byte little-endian length of the next packet | ||
// will be written. | ||
var lengthBuffer = Uint8List(4); | ||
|
||
// The index of the next byte to write to [lengthBuffer]. Once this is equal | ||
// to [lengthBuffer.length], the full length is available. | ||
var lengthBufferIndex = 0; | ||
|
||
// The length of the next message, in bytes, read from [lengthBuffer] once | ||
// it's full. | ||
int nextMessageLength; | ||
|
||
// The buffer into which the packet data itself is written. Initialized once | ||
// [nextMessageLength] is known. | ||
Uint8List buffer; | ||
|
||
// The index of the next byte to write to [buffer]. Once this is equal to | ||
// [buffer.length] (or equivalently [nextMessageLength]), the full packet is | ||
// available. | ||
int bufferIndex; | ||
|
||
// It seems a little silly to use a nested [StreamTransformer] here, but we | ||
// need the outer one to establish a closure context so we can share state | ||
// across different input chunks, and the inner one takes care of all the | ||
// boilerplate of creating a new stream based on [stream]. | ||
return stream | ||
.transform(StreamTransformer.fromHandlers(handleData: (chunk, sink) { | ||
// The index of the next byte to read from [chunk]. We have to track this | ||
// because the chunk may contain the length *and* the message, or even | ||
// multiple messages. | ||
var i = 0; | ||
|
||
// Adds bytes from [chunk] to [destination] at [destinationIndex] without | ||
// overflowing the bounds of [destination], and increments [i] for each byte | ||
// written. | ||
// | ||
// Returns the number of bytes written. | ||
int writeFromChunk(Uint8List destination, int destinationIndex) { | ||
var bytesToWrite = | ||
math.min(destination.length - destinationIndex, chunk.length - i); | ||
destination.setRange( | ||
destinationIndex, destinationIndex + bytesToWrite, chunk, i); | ||
i += bytesToWrite; | ||
return bytesToWrite; | ||
} | ||
|
||
while (i < chunk.length) { | ||
// We can be in one of two states here: | ||
// | ||
// * Both [nextMessageLength] and [buffer] are `null`, in which case we're | ||
// waiting until we have four bytes in [lengthBuffer] to know how big of | ||
// a buffer to allocate. | ||
// | ||
// * Neither [nextMessageLength] nor [buffer] are `null`, in which case | ||
// we're waiting for [buffer] to have [nextMessageLength] in it before | ||
// we send it to [queue.local.sink] and start waiting for the next | ||
// message. | ||
if (nextMessageLength == null) { | ||
lengthBufferIndex += writeFromChunk(lengthBuffer, lengthBufferIndex); | ||
if (lengthBufferIndex < 4) return; | ||
|
||
nextMessageLength = | ||
ByteData.view(lengthBuffer.buffer).getUint32(0, Endian.little); | ||
buffer = Uint8List(nextMessageLength); | ||
bufferIndex = 0; | ||
} | ||
|
||
bufferIndex += writeFromChunk(buffer, bufferIndex); | ||
if (bufferIndex < nextMessageLength) return; | ||
|
||
sink.add(Uint8List.view(buffer.buffer, 0, nextMessageLength)); | ||
lengthBufferIndex = 0; | ||
nextMessageLength = null; | ||
buffer = null; | ||
bufferIndex = null; | ||
} | ||
})); | ||
}); | ||
|
||
/// A transformer that adds 32-bit little-endian numbers indicating the length | ||
/// of each packet, so that they can safely be sent over a medium that doesn't | ||
/// preserve packet boundaries. | ||
final lengthDelimitedEncoder = | ||
StreamTransformer<Uint8List, List<int>>.fromHandlers( | ||
handleData: (message, sink) { | ||
var messageLength = Uint8List(4); | ||
ByteData.view(messageLength.buffer) | ||
.setUint32(0, message.length, Endian.little); | ||
sink.add(messageLength); | ||
sink.add(message); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
name: sass_embedded | ||
version: 1.0.0-dev | ||
description: An implementation of the Sass embedded protocol using Dart Sass. | ||
author: Sass Team | ||
homepage: https://github.com/sass/dart-sass-embedded | ||
|
||
environment: | ||
sdk: '>=2.4.0 <3.0.0' | ||
|
||
dependencies: | ||
async: ">=1.13.0 <3.0.0" | ||
stream_channel: ">=1.6.0 <3.0.0" | ||
|
||
dev_dependencies: | ||
test: ^1.0.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
// Copyright 2019 Google Inc. Use of this source code is governed by an | ||
// MIT-style license that can be found in the LICENSE file or at | ||
// https://opensource.org/licenses/MIT. | ||
|
||
import 'dart:async'; | ||
import 'dart:typed_data'; | ||
|
||
import 'package:sass_embedded/src/util/length_delimited_transformer.dart'; | ||
|
||
import 'package:async/async.dart'; | ||
import 'package:test/test.dart'; | ||
|
||
void main() { | ||
group("encoder", () { | ||
Sink<List<int>> sink; | ||
Stream<List<int>> stream; | ||
setUp(() { | ||
var controller = StreamController<List<int>>(); | ||
sink = controller.sink; | ||
stream = controller.stream.transform(lengthDelimitedEncoder); | ||
}); | ||
|
||
test("encodes an empty message", () { | ||
sink.add([]); | ||
sink.close(); | ||
expect(collectBytes(stream), completion(equals([0, 0, 0, 0]))); | ||
}); | ||
|
||
test("encodes a message of length 1", () { | ||
sink.add([123]); | ||
sink.close(); | ||
expect(collectBytes(stream), completion(equals([1, 0, 0, 0, 123]))); | ||
}); | ||
|
||
test("encodes a message of length greater than 256", () { | ||
sink.add(List.filled(300, 1)); | ||
sink.close(); | ||
expect(collectBytes(stream), | ||
completion(equals([44, 1, 0, 0, ...List.filled(300, 1)]))); | ||
}); | ||
|
||
test("encodes multiple messages", () { | ||
sink.add([10]); | ||
sink.add([20, 30]); | ||
sink.add([40, 50, 60]); | ||
sink.close(); | ||
expect( | ||
collectBytes(stream), | ||
completion(equals( | ||
[1, 0, 0, 0, 10, 2, 0, 0, 0, 20, 30, 3, 0, 0, 0, 40, 50, 60]))); | ||
}); | ||
}); | ||
|
||
group("decoder", () { | ||
Sink<List<int>> sink; | ||
StreamQueue<Uint8List> queue; | ||
setUp(() { | ||
var controller = StreamController<List<int>>(); | ||
sink = controller.sink; | ||
queue = StreamQueue(controller.stream.transform(lengthDelimitedDecoder)); | ||
}); | ||
|
||
group("decodes an empty message", () { | ||
test("from a single chunk", () { | ||
sink.add([0, 0, 0, 0]); | ||
expect(queue, emits(isEmpty)); | ||
}); | ||
|
||
test("from multiple chunks", () { | ||
sink.add([0, 0]); | ||
sink.add([0, 0]); | ||
expect(queue, emits(isEmpty)); | ||
}); | ||
|
||
test("from one chunk per byte", () { | ||
sink..add([0])..add([0])..add([0])..add([0]); | ||
expect(queue, emits(isEmpty)); | ||
}); | ||
|
||
test("from a chunk that contains more data", () { | ||
sink.add([0, 0, 0, 0, 1, 0, 0, 0, 100]); | ||
expect(queue, emits(isEmpty)); | ||
}); | ||
}); | ||
|
||
group("decodes a longer message", () { | ||
test("from a single chunk", () { | ||
sink.add([4, 0, 0, 0, 1, 2, 3, 4]); | ||
expect(queue, emits([1, 2, 3, 4])); | ||
}); | ||
|
||
test("from multiple chunks", () { | ||
sink..add([4, 0])..add([0, 0, 1, 2])..add([3, 4]); | ||
expect(queue, emits([1, 2, 3, 4])); | ||
}); | ||
|
||
test("from one chunk per byte", () { | ||
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4]) { | ||
sink.add([byte]); | ||
} | ||
expect(queue, emits([1, 2, 3, 4])); | ||
}); | ||
|
||
test("from a chunk that contains more data", () { | ||
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 1, 0, 0, 0]); | ||
expect(queue, emits([1, 2, 3, 4])); | ||
}); | ||
|
||
test("of length greater than 256", () { | ||
sink.add([44, 1, 0, 0, ...List.filled(300, 1)]); | ||
expect(queue, emits(List.filled(300, 1))); | ||
}); | ||
}); | ||
|
||
group("decodes multiple messages", () { | ||
test("from single chunk", () { | ||
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]); | ||
expect(queue, emits([1, 2, 3, 4])); | ||
expect(queue, emits([101, 102])); | ||
}); | ||
|
||
test("from multiple chunks", () { | ||
sink..add([4, 0])..add([0, 0, 1, 2, 3, 4, 2, 0])..add([0, 0, 101, 102]); | ||
expect(queue, emits([1, 2, 3, 4])); | ||
expect(queue, emits([101, 102])); | ||
}); | ||
|
||
test("from one chunk per byte", () { | ||
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) { | ||
sink.add([byte]); | ||
} | ||
expect(queue, emits([1, 2, 3, 4])); | ||
expect(queue, emits([101, 102])); | ||
}); | ||
}); | ||
}); | ||
} |