diff --git a/README.md b/README.md index 60b74922b..c886700af 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,132 @@ or visit our [Support and Help](https://www.ably.io/support) site to discuss privately. +## Running the example + +* Clone the repo +* cd to `example` folder +* run `flutter packages get` to install dependencies +* `flutter run` will start the application on connected android / iOS device + + +## Using the Realtime API + +##### Import the package + +```dart +import 'package:ably_flutter_plugin/ably.dart' as ably; +``` + +##### Create an Ably instance + +```dart +final ably.Ably ablyPlugin = ably.Ably(); +``` + +##### create a ClientOptions + +```dart +final clientOptions = ably.ClientOptions.fromKey(""); +clientOptions.logLevel = ably.LogLevel.verbose; //optional +``` + +##### Rest API + +```dart +ably.Rest rest = await ablyPlugin.createRest(options: clientOptions); +``` + +Getting a channel instance + +```dart +ably.RestChannel channel = rest.channels.get('test'); +``` + +Publish rest messages + +```dart +//passing both name and data +await channel.publish(name: "Hello", data: "Ably"); +//passing just name +await channel.publish(name: "Hello"); +//passing just data +await channel.publish(data: "Ably"); +//publishing an empty message +await channel.publish(); +``` + +##### Realtime API + +```dart +ably.Realtime realtime = await ablyPlugin.createRealtime(options: clientOptions); +``` + +Listen to connection state change event + +```dart +realtime.connection.on().listen((ably.ConnectionStateChange stateChange) async { + print('Realtime connection state changed: ${stateChange.event}'); + setState(() { _realtimeConnectionState = stateChange.current; }); +}); +``` + +Listening to a particular event: `connected` + +```dart +realtime.connection.on(ably.ConnectionEvent.connected).listen((ably.ConnectionStateChange stateChange) async { + print('Realtime connection state changed: ${stateChange.event}'); + setState(() { _realtimeConnectionState = stateChange.current; }); +}); +``` + +Connect and disconnect to a realtime instance + +```dart +realtime.connect(); //connect to realtime +realtime.disconnect(); //disconnect from realtime +``` + +## Caveats + +#### RTE6a compliance + +Using Streams based approach doesn't fully conform with [RTE6a](https://docs.ably.io/client-lib-development-guide/features/#RTE6a) from the spec. + +##### Problem + +```dart +StreamSubscription subscriptionToBeCancelled; + +//Listener registered 1st +realtime.connection.on().listen((ably.ConnectionStateChange stateChange) async { + if (stateChange.event == ably.ConnectionEvent.connected) { + await subscriptionToBeCancelled.cancel(); //Cancelling 2nd listener + } +}); + +//Listener registered 2nd +subscriptionToBeCancelled = realtime.connection.on().listen((ably.ConnectionStateChange stateChange) async { + print('State changed'); +}); +``` + +In the example above, 2nd listener is cancelled when 1st listener is notified about the "connected" event. +As per RTE6a, 2nd listener should also be triggered, but it will not be as 2nd listener is registered after 1st listener and stream subscription is cancelled immediately after first listener is triggered. + +This wouldn't have happened if 2nd listener was registered before the 1st. + +However, using a neat little trick will fix this. + +##### Suggestion - Cancelling using delay + +Instead of `await subscriptionToBeCancelled.cancel();`, use + +```dart +Future.delayed(Duration.zero, (){ + subscriptionToBeCancelled.cancel(); +}); +``` + ## Contributing We have some [Developer Notes](DeveloperNotes.md), but we're still learning too so they'll only help you so far, in fact there's probably a lot you can teach us! diff --git a/android/src/main/java/io/ably/flutter/plugin/AblyEventStreamHandler.java b/android/src/main/java/io/ably/flutter/plugin/AblyEventStreamHandler.java new file mode 100644 index 000000000..4139fb19b --- /dev/null +++ b/android/src/main/java/io/ably/flutter/plugin/AblyEventStreamHandler.java @@ -0,0 +1,117 @@ +package io.ably.flutter.plugin; + +import android.os.Handler; +import android.os.Looper; + +import io.ably.lib.realtime.ChannelStateListener; +import io.ably.lib.realtime.ConnectionStateListener; +import io.flutter.plugin.common.EventChannel; + + +/** + * Dart side can listen to Event Streams by pushing data to eventSink available in onListen method. + * Event listening can be cancelled when stream subscription is cancelled on dart side + * + * ref: https://api.flutter.dev/javadoc/io/flutter/plugin/common/EventChannel.StreamHandler.html + * */ +public class AblyEventStreamHandler implements EventChannel.StreamHandler { + private final AblyMethodCallHandler methodCallHandler; + + /** + * Constructor requiring methodCallHandler, as it is a singleton and has all instances stored + * Event listening can be started on an instance picked from the stored instances + * */ + AblyEventStreamHandler(AblyMethodCallHandler methodCallHandler){ + this.methodCallHandler = methodCallHandler; + } + + /** + * Refer to the comments on AblyMethodCallHandler.MethodResultWrapper + * on why this customized EventSink is required + * */ + private static class MainThreadEventSink implements EventChannel.EventSink { + private EventChannel.EventSink eventSink; + private Handler handler; + + MainThreadEventSink(EventChannel.EventSink eventSink) { + this.eventSink = eventSink; + handler = new Handler(Looper.getMainLooper()); + } + + @Override + public void success(final Object o) { + handler.post(() -> eventSink.success(o)); //lambda for new Runnable + } + + @Override + public void error(final String s, final String s1, final Object o) { + handler.post(() -> eventSink.error(s, s1, o)); + } + + @Override + public void endOfStream() { + //TODO work on this if required, or remove this TODO once all features are covered + } + } + + // Listeners + private PluginConnectionStateListener connectionStateListener; + private PluginChannelStateListener channelStateListener; + + private class Listener{ + EventChannel.EventSink eventSink; + Listener(EventChannel.EventSink eventSink){ this.eventSink = eventSink; } + } + + private class PluginConnectionStateListener extends Listener implements ConnectionStateListener { + PluginConnectionStateListener(EventChannel.EventSink eventSink){super(eventSink);} + public void onConnectionStateChanged(ConnectionStateChange stateChange){ + eventSink.success(stateChange); + } + } + + private class PluginChannelStateListener extends Listener implements ChannelStateListener { + PluginChannelStateListener(EventChannel.EventSink eventSink){super(eventSink);} + public void onChannelStateChanged(io.ably.lib.realtime.ChannelStateListener.ChannelStateChange stateChange){ + eventSink.success(stateChange); + } + } + + @Override + public void onListen(Object object, EventChannel.EventSink uiThreadEventSink) { + MainThreadEventSink eventSink = new MainThreadEventSink(uiThreadEventSink); + methodCallHandler.>ablyDo((AblyFlutterMessage)object, (ablyLibrary, message) -> { + String eventName = message.message; + switch(eventName) { + case "realtime_onConnectionStateChanged": + connectionStateListener = new PluginConnectionStateListener(eventSink); + ablyLibrary.getRealtime(message.handle).connection.on(connectionStateListener); + return; + case "realtime_onChannelStateChanged": + // channelStateListener = new PluginChannelStateListener(eventSink); + // ablyLibrary.getRealtime(message.handle).connection.on(channelStateListener); + // return; + default: + eventSink.error("unhandled event", null, null); + } + }); + } + + @Override + public void onCancel(Object object) { + if(object==null){ + System.out.println("Cannot process null input on cancel"); + return; + } + methodCallHandler.>ablyDo((AblyFlutterMessage)object, (ablyLibrary, message) -> { + String eventName = message.message; + switch (eventName) { + case "realtime_onConnectionStateChanged": + ablyLibrary.getRealtime(message.handle).connection.off(connectionStateListener); + case "realtime_onChannelStateChanged": + // ablyLibrary.getRealtime(handle).connection.off(connectionStateListener); + } + }); + } + +} diff --git a/android/src/main/java/io/ably/flutter/plugin/AblyFlutterPlugin.java b/android/src/main/java/io/ably/flutter/plugin/AblyFlutterPlugin.java index ba77af509..ec034982d 100644 --- a/android/src/main/java/io/ably/flutter/plugin/AblyFlutterPlugin.java +++ b/android/src/main/java/io/ably/flutter/plugin/AblyFlutterPlugin.java @@ -2,7 +2,9 @@ import androidx.annotation.NonNull; +import app.loup.streams_channel.StreamsChannel; import io.flutter.embedding.engine.plugins.FlutterPlugin; +import io.flutter.plugin.common.BinaryMessenger; import io.flutter.plugin.common.MethodChannel; import io.flutter.plugin.common.MethodCodec; import io.flutter.plugin.common.PluginRegistry.Registrar; @@ -15,8 +17,7 @@ private static MethodCodec createCodec() { @Override public void onAttachedToEngine(@NonNull FlutterPluginBinding flutterPluginBinding) { - final MethodChannel channel = new MethodChannel(flutterPluginBinding.getBinaryMessenger(), "ably_flutter_plugin", createCodec()); - channel.setMethodCallHandler(AblyMethodCallHandler.getInstance()); + setupChannels(flutterPluginBinding.getBinaryMessenger()); } // This static function is optional and equivalent to onAttachedToEngine. It supports the old @@ -29,8 +30,19 @@ public void onAttachedToEngine(@NonNull FlutterPluginBinding flutterPluginBindin // depending on the user's project. onAttachedToEngine or registerWith must both be defined // in the same class. public static void registerWith(Registrar registrar) { - final MethodChannel channel = new MethodChannel(registrar.messenger(), "ably_flutter_plugin", createCodec()); - channel.setMethodCallHandler(AblyMethodCallHandler.getInstance()); + AblyFlutterPlugin.setupChannels(registrar.messenger()); + } + + private static void setupChannels(BinaryMessenger messenger){ + MethodCodec codec = createCodec(); + AblyMethodCallHandler methodCallHandler = AblyMethodCallHandler.getInstance(); + + final MethodChannel channel = new MethodChannel(messenger, "io.ably.flutter.plugin", codec); + channel.setMethodCallHandler(methodCallHandler); + + final StreamsChannel streamsChannel = new StreamsChannel(messenger, "io.ably.flutter.stream", codec); + streamsChannel.setStreamHandlerFactory(arguments -> new AblyEventStreamHandler(methodCallHandler)); + } @Override diff --git a/android/src/main/java/io/ably/flutter/plugin/AblyLibrary.java b/android/src/main/java/io/ably/flutter/plugin/AblyLibrary.java index b031f4b09..dfdc23766 100644 --- a/android/src/main/java/io/ably/flutter/plugin/AblyLibrary.java +++ b/android/src/main/java/io/ably/flutter/plugin/AblyLibrary.java @@ -15,7 +15,6 @@ class AblyLibrary { // and as per this answer https://stackoverflow.com/a/31413003 private final LongSparseArray _restInstances = new LongSparseArray<>(); private final LongSparseArray _realtimeInstances = new LongSparseArray<>(); - private final LongSparseArray _connectionListeners = new LongSparseArray<>(); private void assertNotDisposed() { if (_disposed) { @@ -51,21 +50,6 @@ AblyRest getRest(final long handle){ return _restInstances.get(handle); } - long createConnectionListener(final AblyRealtime realtime) { - assertNotDisposed(); - - final AblyRealtimeConnectionListener listener = new AblyRealtimeConnectionListener(); - realtime.connection.on(listener); - _connectionListeners.put(_nextHandle, listener); - return _nextHandle++; - } - - AblyRealtimeConnectionListener getConnectionListener(final long handle) { - assertNotDisposed(); - - return _connectionListeners.get(handle); - } - void dispose() { assertNotDisposed(); diff --git a/android/src/main/java/io/ably/flutter/plugin/AblyMessageCodec.java b/android/src/main/java/io/ably/flutter/plugin/AblyMessageCodec.java index 2b0adf993..a6285fa12 100644 --- a/android/src/main/java/io/ably/flutter/plugin/AblyMessageCodec.java +++ b/android/src/main/java/io/ably/flutter/plugin/AblyMessageCodec.java @@ -2,32 +2,98 @@ import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import java.util.function.Consumer; +import io.ably.lib.realtime.ChannelEvent; +import io.ably.lib.realtime.ChannelState; +import io.ably.lib.realtime.ChannelStateListener; +import io.ably.lib.realtime.ConnectionEvent; +import io.ably.lib.realtime.ConnectionState; +import io.ably.lib.realtime.ConnectionStateListener; import io.ably.lib.rest.Auth; import io.ably.lib.rest.Auth.TokenDetails; import io.ably.lib.types.ClientOptions; -import io.ably.lib.types.Param; import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.Param; import io.flutter.plugin.common.StandardMessageCodec; public class AblyMessageCodec extends StandardMessageCodec { - private static final byte _valueClientOptions = (byte)128; - private static final byte _valueTokenDetails = (byte)129; + + //Ably flutter plugin protocol message + private static final byte _valueAblyMessage = (byte)128; + + //Other ably objects + private static final byte _valueClientOptions = (byte)129; + private static final byte _valueTokenDetails = (byte)130; private static final byte _errorInfo = (byte)144; - private static final byte _valueAblyMessage = (byte)255; - @Override - protected Object readValueOfType(final byte type, final ByteBuffer buffer) { - switch (type) { - case _valueClientOptions: - return readClientOptions(buffer); + // Events + private static final byte _connectionEvent = (byte)201; + private static final byte _connectionState = (byte)202; + private static final byte _connectionStateChange = (byte)203; + private static final byte _channelEvent = (byte)204; + private static final byte _channelState = (byte)205; + private static final byte _channelStateChange = (byte)206; + +// @FunctionalInterface + interface CodecEncoder{ void encode(ByteArrayOutputStream stream, T value); } +// @FunctionalInterface + interface CodecDecoder{ T decode(ByteBuffer buffer); } + class CodecPair{ + + final CodecEncoder encoder; + final CodecDecoder decoder; + CodecPair(CodecEncoder encoder, CodecDecoder decoder){ + this.encoder = encoder; + this.decoder = decoder; + } - case _valueTokenDetails: - return readTokenDetails(buffer); + void encode(final ByteArrayOutputStream stream, final Object value){ + if(this.encoder==null){ + System.out.println("Codec encoder not defined"); + return; + } + this.encoder.encode(stream, (T)value); + } - case _valueAblyMessage: - return readAblyFlutterMessage(buffer); + T decode(ByteBuffer buffer){ + if(this.decoder==null){ + System.out.println("Codec decoder not defined"); + return null; + } + return this.decoder.decode(buffer); + } + } + + private Map codecMap; + + AblyMessageCodec(){ + AblyMessageCodec self = this; + codecMap = new HashMap(){ + { + put(_valueAblyMessage, new CodecPair<>(null, self::readAblyFlutterMessage)); + put(_valueClientOptions, new CodecPair<>(null, self::readClientOptions)); + put(_valueTokenDetails, new CodecPair<>(null, self::readTokenDetails)); + put(_errorInfo, new CodecPair<>(self::writeErrorInfo, null)); + + put(_connectionEvent, new CodecPair<>(self::writeConnectionEvent, null)); + put(_connectionState, new CodecPair<>(self::writeConnectionState, null)); + put(_connectionStateChange, new CodecPair<>(self::writeConnectionStateChange, null)); + + put(_channelEvent, new CodecPair<>(self::writeChannelEvent, null)); + put(_channelState, new CodecPair<>(self::writeChannelState, null)); + put(_channelStateChange, new CodecPair<>(self::writeChannelStateChange, null)); + } + }; + } + + @Override + protected Object readValueOfType(final byte type, final ByteBuffer buffer) { + CodecPair pair = codecMap.get(type); + if(pair!=null){ + return pair.decode(buffer); } return super.readValueOfType(type, buffer); } @@ -39,6 +105,34 @@ private void readValue(final ByteBuffer buffer, final Consumer consumer) } } + @Override + protected void writeValue(ByteArrayOutputStream stream, Object value) { + Byte type = null; + if(value instanceof ErrorInfo){ + type = _errorInfo; + }else if(value instanceof ConnectionEvent){ + type = _connectionEvent; + }else if(value instanceof ConnectionState){ + type = _connectionState; + }else if(value instanceof ConnectionStateListener.ConnectionStateChange){ + type = _connectionStateChange; + }else if(value instanceof ChannelEvent){ + type = _channelEvent; + }else if(value instanceof ChannelState){ + type = _channelState; + }else if(value instanceof ChannelStateListener.ChannelStateChange){ + type = _channelStateChange; + } + if(type!=null){ + CodecPair pair = codecMap.get(type); + if(pair!=null) { + pair.encode(stream, value); + return; + } + } + super.writeValue(stream, value); + } + /** * Dart int types get delivered to Java as Integer, unless '32 bits not enough' in which case * they are delivered as Long. @@ -61,7 +155,7 @@ private AblyFlutterMessage readAblyFlutterMessage(final ByteBuffer buffer) { return new AblyFlutterMessage<>(handle, message); } - private Object readClientOptions(final ByteBuffer buffer) { + private ClientOptions readClientOptions(final ByteBuffer buffer) { final ClientOptions o = new ClientOptions(); // AuthOptions (super class of ClientOptions) @@ -102,7 +196,7 @@ private Object readClientOptions(final ByteBuffer buffer) { return o; } - private Object readTokenDetails(final ByteBuffer buffer) { + private TokenDetails readTokenDetails(final ByteBuffer buffer) { final TokenDetails o = new TokenDetails(); readValue(buffer, v -> o.token = (String)v); @@ -116,18 +210,8 @@ private Object readTokenDetails(final ByteBuffer buffer) { //HANDLING WRITE - - @Override - protected void writeValue(ByteArrayOutputStream stream, Object value) { - if(value instanceof ErrorInfo){ - stream.write(_errorInfo); - writeErrorInfo(stream, (ErrorInfo) value); - return; - } - super.writeValue(stream, value); - } - private void writeErrorInfo(ByteArrayOutputStream stream, ErrorInfo e){ + stream.write(_errorInfo); writeValue(stream, e.code); writeValue(stream, e.message); writeValue(stream, e.statusCode); @@ -135,4 +219,32 @@ private void writeErrorInfo(ByteArrayOutputStream stream, ErrorInfo e){ writeValue(stream, null); //requestId - not available in ably-java writeValue(stream, null); //cause - not available in ably-java } + + private void writeConnectionEvent(ByteArrayOutputStream stream, ConnectionEvent e){ writeEnum(stream, _connectionEvent, e); } + private void writeConnectionState(ByteArrayOutputStream stream, ConnectionState e){ writeEnum(stream, _connectionState, e); } + private void writeConnectionStateChange(ByteArrayOutputStream stream, ConnectionStateListener.ConnectionStateChange c){ + stream.write(_connectionStateChange); + writeValue(stream, c.current); + writeValue(stream, c.previous); + writeValue(stream, c.event); + writeValue(stream, c.retryIn); + writeValue(stream, c.reason); + } + + private void writeChannelEvent(ByteArrayOutputStream stream, ChannelEvent e){ writeEnum(stream, _channelEvent, e); } + private void writeChannelState(ByteArrayOutputStream stream, ChannelState e){ writeEnum(stream, _channelState, e); } + private void writeChannelStateChange(ByteArrayOutputStream stream, ChannelStateListener.ChannelStateChange c){ + stream.write(_channelStateChange); + writeValue(stream, c.current); + writeValue(stream, c.previous); + writeValue(stream, c.event); + writeValue(stream, c.resumed); + writeValue(stream, c.reason); + } + + private void writeEnum(ByteArrayOutputStream stream, int eventCode, Enum e){ + stream.write(eventCode); + writeValue(stream, e.ordinal()); + } + } diff --git a/android/src/main/java/io/ably/flutter/plugin/AblyMethodCallHandler.java b/android/src/main/java/io/ably/flutter/plugin/AblyMethodCallHandler.java index ada05a91c..190c4c07d 100644 --- a/android/src/main/java/io/ably/flutter/plugin/AblyMethodCallHandler.java +++ b/android/src/main/java/io/ably/flutter/plugin/AblyMethodCallHandler.java @@ -46,8 +46,8 @@ private AblyMethodCallHandler() { //Realtime _map.put("createRealtimeWithOptions", this::createRealtimeWithOptions); _map.put("connectRealtime", this::connectRealtime); - _map.put("createListener", this::createListener); - _map.put("eventOnce", this::eventOnce); + _map.put("closeRealtime", this::closeRealtime); + } // MethodChannel.Result wrapper that responds on the platform thread. @@ -193,43 +193,15 @@ private void connectRealtime(@NonNull MethodCall call, @NonNull MethodChannel.Re }); } - private void createListener(@NonNull MethodCall call, @NonNull MethodChannel.Result result) { + private void closeRealtime(@NonNull MethodCall call, @NonNull MethodChannel.Result result) { final AblyFlutterMessage message = (AblyFlutterMessage)call.arguments; - this.ablyDo(message, (ablyLibrary, innerMessage) -> { - final Object resultValue; - final int type = ((Number)innerMessage.message).intValue(); - switch (type) { - case 1: // connection - resultValue = ablyLibrary.createConnectionListener(ablyLibrary.getRealtime(innerMessage.handle)); - break; - default: - result.error("unhandled type", null, null); - return; - } - result.success(resultValue); - }); - } - - // https://flutter.dev/docs/development/platform-integration/platform-channels#jumping-to-the-ui-thread-in-android - private final Handler androidMainMessageQueue = new Handler(Looper.getMainLooper()); - - private void eventOnce(@NonNull MethodCall call, @NonNull MethodChannel.Result result) { - final AblyFlutterMessage message = (AblyFlutterMessage)call.arguments; - this.ablyDo(message, (ablyLibrary, listenerHandle) -> { - // TODO actually, the argument could be an AblyFlutterMessage if the user specifies the - // specific event they want to listen for. Either that needs refactor or we need to - // handle it here. - ablyLibrary.getConnectionListener(listenerHandle.longValue()).listen().thenAcceptAsync(connectionStateChange -> androidMainMessageQueue.post(new Runnable() { - @Override - public void run() { - // TODO consider serialising numerically - perhaps supporting this type in the codec - result.success(connectionStateChange.current.toString()); - } - })); + this.ablyDo(message, (ablyLibrary, realtimeHandle) -> { + ablyLibrary.getRealtime(realtimeHandle.longValue()).close(); + result.success(null); }); } - private void ablyDo(final AblyFlutterMessage message, final BiConsumer consumer) { + public void ablyDo(final AblyFlutterMessage message, final BiConsumer consumer) { if (!message.handle.equals(_ablyHandle)) { // TODO an error response, perhaps? or allow Dart side to understand null response? return; diff --git a/android/src/main/java/io/ably/flutter/plugin/AblyRealtimeConnectionListener.java b/android/src/main/java/io/ably/flutter/plugin/AblyRealtimeConnectionListener.java deleted file mode 100644 index 2daf0f5c3..000000000 --- a/android/src/main/java/io/ably/flutter/plugin/AblyRealtimeConnectionListener.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.ably.flutter.plugin; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import io.ably.lib.realtime.ConnectionStateListener; - -/** - * Not the most sophisticated of implementations but should be safe to call from multiple threads. - */ -public class AblyRealtimeConnectionListener implements ConnectionStateListener { - private final List> _once = new ArrayList<>(); - - @Override - public synchronized void onConnectionStateChanged(final ConnectionStateChange state) { - System.out.println("AblyRealtimeConnectionListener onConnectionStateChanged: " + state); - for (final CompletableFuture future : _once) { - future.complete(state); - } - _once.clear(); - } - - synchronized CompletableFuture listen() { - final CompletableFuture future = new CompletableFuture<>(); - _once.add(future); - return future; - } -} diff --git a/example/ios/Podfile.lock b/example/ios/Podfile.lock index f5785068c..284e27e98 100644 --- a/example/ios/Podfile.lock +++ b/example/ios/Podfile.lock @@ -75,11 +75,14 @@ PODS: - msgpack (0.3.1) - SAMKeychain (1.5.3) - SocketRocketAblyFork (0.5.2-ably-4) + - streams_channel (0.0.1): + - Flutter - ULID (1.1.0) DEPENDENCIES: - ably_flutter_plugin (from `.symlinks/plugins/ably_flutter_plugin/ios`) - Flutter (from `Flutter`) + - streams_channel (from `.symlinks/plugins/streams_channel/ios`) SPEC REPOS: trunk: @@ -95,6 +98,8 @@ EXTERNAL SOURCES: :path: ".symlinks/plugins/ably_flutter_plugin/ios" Flutter: :path: Flutter + streams_channel: + :path: ".symlinks/plugins/streams_channel/ios" SPEC CHECKSUMS: Ably: b8dd747b116787f0759844dc1a045046ef9a2c7a @@ -104,6 +109,7 @@ SPEC CHECKSUMS: msgpack: a14de9216d29cfd0a7aff5af5150601a27e899a4 SAMKeychain: 483e1c9f32984d50ca961e26818a534283b4cd5c SocketRocketAblyFork: ed717517ed5cb5217878987c84bc415890582bb3 + streams_channel: 16855ffd0568eded9563b2fec6a1bfa94420b353 ULID: b4714891a02819364faecd574a53e391c4c6de9d PODFILE CHECKSUM: 49ec7d4076524b7e225c38b98147173651ac4b9d diff --git a/example/lib/main.dart b/example/lib/main.dart index fc584fb0d..c471f4e57 100644 --- a/example/lib/main.dart +++ b/example/lib/main.dart @@ -29,6 +29,7 @@ class _MyAppState extends State { ably.Ably _ablyPlugin; ably.Realtime _realtime; ably.Rest _rest; + ably.ConnectionState _realtimeConnectionState; @override void initState() { @@ -73,7 +74,7 @@ class _MyAppState extends State { void provisionAbly() async { setState(() { _provisioningState = OpState.InProgress; }); - + provisioning.AppKey appKey; try { appKey = await provisioning.provision('sandbox-'); @@ -82,7 +83,7 @@ class _MyAppState extends State { setState(() { _provisioningState = OpState.Failed; }); return; } - + setState(() { _appKey = appKey; _provisioningState = OpState.Succeeded; @@ -101,7 +102,7 @@ class _MyAppState extends State { ably.Rest rest; try{ - rest = await _ablyPlugin.createRest(clientOptions); + rest = await _ablyPlugin.createRest(options: clientOptions); } catch (error) { print('Error creating Ably Rest: ${error}'); setState(() { _restCreationState = OpState.Failed; }); @@ -117,8 +118,9 @@ class _MyAppState extends State { dynamic data = "Flutter"; print('publishing messages... name "$name", message "$data"'); try { - await rest.channels.get('test').publish(name, data); - await rest.channels.get('test').publish(name); + await rest.channels.get('test').publish(name: name, data: data); + await rest.channels.get('test').publish(name: name); + await rest.channels.get('test').publish(data: data); await rest.channels.get('test').publish(); } on ably.AblyException catch(e) { print(e.errorInfo); @@ -131,7 +133,7 @@ class _MyAppState extends State { final clientOptions = ably.ClientOptions.fromKey(_appKey.toString()); clientOptions.environment = 'sandbox'; - clientOptions.logLevel = ably.LogLevel.verbose; + clientOptions.logLevel = ably.LogLevel.error; clientOptions.logHandler = ({String msg, ably.AblyException exception}){ print("Custom logger :: $msg $exception"); }; @@ -139,23 +141,69 @@ class _MyAppState extends State { ably.Realtime realtime; try { - realtime = await _ablyPlugin.createRealtime(clientOptions); + realtime = await _ablyPlugin.createRealtime(options: clientOptions); + + //One can listen from multiple listeners on the same event, + // and must cancel each subscription one by one + //RETAINING LISTENER - α + realtime.connection.on().listen((ably.ConnectionStateChange stateChange) async { + print('RETAINING LISTENER α :: Change event arrived!: ${stateChange.event}'); + setState(() { _realtimeConnectionState = stateChange.current; }); + }); + + //DISPOSE ON CONNECTED + Stream stream = realtime.connection.on(); + StreamSubscription subscription; + subscription = stream.listen((ably.ConnectionStateChange stateChange) async { + print('DISPOSABLE LISTENER ω :: Change event arrived!: ${stateChange.event}'); + if (stateChange.event == ably.ConnectionEvent.connected) { + await subscription.cancel(); + } + }); + + //RETAINING LISTENER - β + realtime.connection.on().listen((ably.ConnectionStateChange stateChange) async { + print('RETAINING LISTENER β :: Change event arrived!: ${stateChange.event}'); + //NESTED LISTENER - ξ + realtime.connection.on().listen((ably.ConnectionStateChange stateChangeX) async { + //k ξ listeners will be registered and each listener will be called `n-k` times respectively if listener β is called `n` times + print('NESTED LISTENER ξ: ${stateChangeX.event}'); + }); + }); + + StreamSubscription preZetaSubscription; + StreamSubscription postZetaSubscription; + preZetaSubscription = realtime.connection.on().listen((ably.ConnectionStateChange stateChangeX) async { + //This listener "pre ζ" will be cancelled from γ + print('NESTED LISTENER "pre ζ": ${stateChangeX.event}'); + }); + + + //RETAINING LISTENER - γ + realtime.connection.on().listen((ably.ConnectionStateChange stateChange) async { + print('RETAINING LISTENER γ :: Change event arrived!: ${stateChange.event}'); + if (stateChange.event == ably.ConnectionEvent.connected) { + await preZetaSubscription.cancel(); //by the time this cancel is triggered, preZeta will already have received current event. + await postZetaSubscription.cancel(); //by the time this cancel is triggered, postZeta hasn't received the event yet. And will never receive as it is cancelled. + } + }); + + postZetaSubscription = realtime.connection.on().listen((ably.ConnectionStateChange stateChangeX) async { + //This listener "post ζ" will be cancelled from γ + print('NESTED LISTENER "post ζ": ${stateChangeX.event}'); + }); + + setState(() { + _realtime = realtime; + _realtimeCreationState = OpState.Succeeded; + }); + } catch (error) { print('Error creating Ably Realtime: ${error}'); setState(() { _realtimeCreationState = OpState.Failed; }); - return; + rethrow; } - final listener = await realtime.connection.createListener(); - - setState(() { - _realtime = realtime; - _realtimeCreationState = OpState.Succeeded; - }); - - print('Awaiting one event...'); - final event = await listener.once(); - print('The one event arrived: $event'); } // https://github.com/dart-lang/sdk/issues/37498 @@ -193,20 +241,24 @@ class _MyAppState extends State { Widget createRestButton() => button(_restCreationState, createAblyRest, 'Create Ably Rest', 'Create Ably Rest', 'Ably Rest Created'); Widget createRealtimeButton() => button(_realtimeCreationState, createAblyRealtime, 'Create Ably Realtime', 'Creating Ably Realtime', 'Ably Realtime Created'); - Widget createConnectButton() => FlatButton( - onPressed: () async { - print('Calling connect...'); - await _realtime.connect(); - print('Connect call completed.'); - }, + Widget createRTCConnectButton() => FlatButton( + onPressed: _realtime?.connect, child: Text('Connect'), ); + Widget createRTCloseButton() => FlatButton( + onPressed: _realtime?.close, + child: Text('Close Connection'), + ); + int msgCounter = 0; Widget sendRestMessage() => FlatButton( onPressed: () async { print('Sendimg rest message...'); - await _rest.channels.get('test').publish('Hello', 'Flutter ${++msgCounter}'); + await _rest.channels.get('test').publish( + name: 'Hello', + data: 'Flutter ${++msgCounter}' + ); print('Rest message sent.'); setState(() {}); }, @@ -216,7 +268,6 @@ class _MyAppState extends State { @override Widget build(BuildContext context) { - print('widget build'); return MaterialApp( home: Scaffold( appBar: AppBar( @@ -234,7 +285,9 @@ class _MyAppState extends State { Divider(), createRealtimeButton(), Text('Realtime: ' + ((_realtime == null) ? 'Ably Realtime not created yet.' : _realtime.toString())), - createConnectButton(), + Text('Connection Status: $_realtimeConnectionState'), + createRTCConnectButton(), + createRTCloseButton(), Divider(), createRestButton(), Text('Rest: ' + ((_rest == null) ? 'Ably Rest not created yet.' : _rest.toString())), diff --git a/example/pubspec.lock b/example/pubspec.lock index 1980d34b6..996de5cc9 100644 --- a/example/pubspec.lock +++ b/example/pubspec.lock @@ -170,6 +170,13 @@ packages: url: "https://pub.dartlang.org" source: hosted version: "2.0.0" + streams_channel: + dependency: transitive + description: + name: streams_channel + url: "https://pub.dartlang.org" + source: hosted + version: "0.3.0" string_scanner: dependency: transitive description: diff --git a/ios/Classes/AblyFlutter.h b/ios/Classes/AblyFlutter.h index 6297e30e6..e158dba1d 100644 --- a/ios/Classes/AblyFlutter.h +++ b/ios/Classes/AblyFlutter.h @@ -1,8 +1,8 @@ @import Foundation; @class ARTRest; +@class ARTRealtime; @class ARTClientOptions; -@class AblyFlutterSurfaceRealtime; NS_ASSUME_NONNULL_BEGIN @@ -14,7 +14,7 @@ NS_ASSUME_NONNULL_BEGIN -(NSNumber *)createRealtimeWithOptions:(ARTClientOptions *)options; --(nullable AblyFlutterSurfaceRealtime *)realtimeWithHandle:(NSNumber *)handle; +-(nullable ARTRealtime *)realtimeWithHandle:(NSNumber *)handle; /** This method must be called from the main dispatch queue. It may only be called diff --git a/ios/Classes/AblyFlutter.m b/ios/Classes/AblyFlutter.m index c66e2d953..e8f070fbc 100644 --- a/ios/Classes/AblyFlutter.m +++ b/ios/Classes/AblyFlutter.m @@ -4,11 +4,10 @@ // @import Ably; #import "Ably.h" -#import "AblyFlutterSurfaceRealtime.h" @implementation AblyFlutter { BOOL _disposed; - NSMutableDictionary* _realtimeInstances; + NSMutableDictionary* _realtimeInstances; NSMutableDictionary* _restInstances; long long _nextHandle; } @@ -57,13 +56,12 @@ -(NSNumber *)createRealtimeWithOptions:(ARTClientOptions *const)options { ASSERT_NOT_DISPOSED ARTRealtime *const instance = [[ARTRealtime alloc] initWithOptions:options]; - AblyFlutterSurfaceRealtime *const realtime = [[AblyFlutterSurfaceRealtime alloc] initWithInstance:instance]; NSNumber *const handle = @(_nextHandle++); - [_realtimeInstances setObject:realtime forKey:handle]; + [_realtimeInstances setObject:instance forKey:handle]; return handle; } --(AblyFlutterSurfaceRealtime *)realtimeWithHandle:(NSNumber *const)handle { +-(ARTRealtime *)realtimeWithHandle:(NSNumber *const)handle { return [_realtimeInstances objectForKey:handle]; } diff --git a/ios/Classes/AblyFlutterPlugin.h b/ios/Classes/AblyFlutterPlugin.h index 318c1ba37..b4df808a9 100644 --- a/ios/Classes/AblyFlutterPlugin.h +++ b/ios/Classes/AblyFlutterPlugin.h @@ -1,4 +1,5 @@ #import +#import "AblyFlutter.h" NS_ASSUME_NONNULL_BEGIN @@ -7,6 +8,8 @@ NS_ASSUME_NONNULL_BEGIN +(instancetype)new NS_UNAVAILABLE; +(instancetype)init NS_UNAVAILABLE; +- (AblyFlutter *)ablyWithHandle:(NSNumber *)handle; + @end NS_ASSUME_NONNULL_END diff --git a/ios/Classes/AblyFlutterPlugin.m b/ios/Classes/AblyFlutterPlugin.m index 623469364..037cf235f 100644 --- a/ios/Classes/AblyFlutterPlugin.m +++ b/ios/Classes/AblyFlutterPlugin.m @@ -7,7 +7,8 @@ #import "codec/AblyFlutterReaderWriter.h" #import "AblyFlutterMessage.h" #import "AblyFlutter.h" -#import "AblyFlutterSurfaceRealtime.h" +#import "AblyFlutterStreamHandler.h" +#import "FlutterStreamsChannel.h" #define LOG(fmt, ...) NSLog((@"%@:%d " fmt), [[NSString stringWithUTF8String:__FILE__] lastPathComponent], __LINE__, ##__VA_ARGS__) @@ -88,7 +89,16 @@ -(nullable AblyFlutter *)ablyWithHandle:(NSNumber *)handle; // TODO if ably is nil here then an error response, perhaps? or allow Dart side to understand null response? NSNumber *const handle = message.message; [[ably realtimeWithHandle:handle] connect]; - result(nil); // success with void response + result(nil); +}; + +static FlutterHandler _closeRealtime = ^void(AblyFlutterPlugin *const plugin, FlutterMethodCall *const call, const FlutterResult result) { + AblyFlutterMessage *const message = call.arguments; + LOG(@"message for handle %@", message.handle); + AblyFlutter *const ably = [plugin ablyWithHandle:message.handle]; + NSNumber *const handle = message.message; + [[ably realtimeWithHandle:handle] close]; + result(nil); }; static FlutterHandler _dispose = ^void(AblyFlutterPlugin *const plugin, FlutterMethodCall *const call, const FlutterResult result) { @@ -106,14 +116,20 @@ @implementation AblyFlutterPlugin { +(void)registerWithRegistrar:(NSObject*)registrar { LOG(@"registrar: %@", [registrar class]); FlutterStandardReaderWriter *const readerWriter = [AblyFlutterReaderWriter new]; - FlutterStandardMethodCodec *const methodCodec = - [FlutterStandardMethodCodec codecWithReaderWriter:readerWriter]; + FlutterStandardMethodCodec *const methodCodec = [FlutterStandardMethodCodec codecWithReaderWriter:readerWriter]; + FlutterMethodChannel *const channel = - [FlutterMethodChannel methodChannelWithName:@"ably_flutter_plugin" - binaryMessenger:[registrar messenger] - codec:methodCodec]; - AblyFlutterPlugin *const plugin = [[AblyFlutterPlugin alloc] initWithChannel:channel]; + [FlutterMethodChannel methodChannelWithName:@"io.ably.flutter.plugin" binaryMessenger:[registrar messenger] codec:methodCodec]; + AblyFlutterPlugin *const plugin = + [[AblyFlutterPlugin alloc] initWithChannel:channel]; + [registrar addMethodCallDelegate:plugin channel:channel]; + + FlutterStreamsChannel *const eventChannel = [FlutterStreamsChannel streamsChannelWithName:@"io.ably.flutter.stream" binaryMessenger:registrar.messenger codec: methodCodec]; + [eventChannel setStreamHandlerFactory:^NSObject *(id arguments) { + return [[AblyFlutterStreamHandler alloc] initWithAbly: plugin]; + }]; + } -(instancetype)initWithChannel:(FlutterMethodChannel *const)channel { @@ -130,6 +146,7 @@ -(instancetype)initWithChannel:(FlutterMethodChannel *const)channel { @"publish": _publishRestMessage, @"createRealtimeWithOptions": _createRealtimeWithOptions, @"connectRealtime": _connectRealtime, + @"closeRealtime": _closeRealtime, @"dispose": _dispose, }; diff --git a/ios/Classes/AblyFlutterStreamHandler.h b/ios/Classes/AblyFlutterStreamHandler.h new file mode 100644 index 000000000..faabc6468 --- /dev/null +++ b/ios/Classes/AblyFlutterStreamHandler.h @@ -0,0 +1,20 @@ +@import Foundation; +@import Flutter; +#import "AblyFlutterPlugin.h" + +NS_ASSUME_NONNULL_BEGIN + +@interface AblyFlutterStreamHandler : NSObject + ++(instancetype)new NS_UNAVAILABLE; +-(instancetype)init NS_UNAVAILABLE; + +-(instancetype)initWithAbly:(AblyFlutterPlugin *const)plugin NS_DESIGNATED_INITIALIZER; + +@property(nonatomic, readonly) AblyFlutterPlugin * plugin; + +- (nullable FlutterError *)onListenWithArguments:(nullable id)arguments eventSink:(FlutterEventSink)eventSink; +- (nullable FlutterError *)onCancelWithArguments:(nullable id)arguments; +@end + +NS_ASSUME_NONNULL_END diff --git a/ios/Classes/AblyFlutterStreamHandler.m b/ios/Classes/AblyFlutterStreamHandler.m new file mode 100644 index 000000000..bc1236de9 --- /dev/null +++ b/ios/Classes/AblyFlutterStreamHandler.m @@ -0,0 +1,54 @@ +#import "AblyFlutterStreamHandler.h" +#import "AblyFlutterPlugin.h" +#import "AblyFlutterMessage.h" +#import "ARTRealtime.h" + +@implementation AblyFlutterStreamHandler{ + ARTEventListener *listener; +} + +@synthesize plugin = _plugin; + +- (instancetype)initWithAbly:(AblyFlutterPlugin *const)plugin { + _plugin = plugin; + listener = [ARTEventListener new]; + return self; +} + +- (nullable FlutterError *)onListenWithArguments:(nullable id)arguments eventSink:(FlutterEventSink)eventSink { + [self startListening:arguments emitter:eventSink]; + return nil; +} + +- (nullable FlutterError *)onCancelWithArguments:(nullable id)arguments { + [self cancelListening:arguments]; + return nil; +} + +- (void) startListening:(AblyFlutterMessage *const)message emitter:(FlutterEventSink)emitter { + AblyFlutter *const ably = [_plugin ablyWithHandle: message.handle]; + AblyFlutterMessage *const _message = message.message; + NSString *const eventName = _message.message; + + if([@"realtime_onConnectionStateChanged" isEqual: eventName]) { + listener = [[ably realtimeWithHandle: message.handle].connection on: ^(ARTConnectionStateChange * const stateChange) { + emitter(stateChange); + }]; + } else if([@"realtime_onChannelStateChanged" isEqual: eventName]) { + + } +} + +- (void) cancelListening:(AblyFlutterMessage *const)message { + AblyFlutter *const ably = [_plugin ablyWithHandle: message.handle]; + AblyFlutterMessage *const _message = message.message; + NSString *const eventName = _message.message; + + if([@"realtime_onConnectionStateChanged" isEqual: eventName]) { + [[ably realtimeWithHandle: message.handle].connection off: listener]; + } else if([@"realtime_onChannelStateChanged" isEqual: eventName]) { + + } +} + +@end diff --git a/ios/Classes/codec/AblyFlutterReader.h b/ios/Classes/codec/AblyFlutterReader.h index 45cff704b..20c0e4676 100644 --- a/ios/Classes/codec/AblyFlutterReader.h +++ b/ios/Classes/codec/AblyFlutterReader.h @@ -10,8 +10,19 @@ NS_ASSUME_NONNULL_END typedef NS_ENUM(UInt8, _Value) { - _valueClientOptions = 128, - _valueTokenDetails = 129, + //Ably flutter plugin protocol message + _valueAblyMessage = 128, + + //Other ably objects + _valueClientOptions = 129, + _valueTokenDetails = 130, _ValueErrorInfo = 144, - _valueAblyMessage = 255, + + //Events + _connectionEvent = 201, + _connectionState = 202, + _connectionStateChange = 203, + _channelEvent = 204, + _channelState = 205, + _channelStateChange = 206, }; diff --git a/ios/Classes/codec/AblyFlutterWriter.m b/ios/Classes/codec/AblyFlutterWriter.m index a43dc1d63..b0f1e502d 100644 --- a/ios/Classes/codec/AblyFlutterWriter.m +++ b/ios/Classes/codec/AblyFlutterWriter.m @@ -1,5 +1,6 @@ #import "AblyFlutterWriter.h" #import "Ably.h" +#import "ARTTypes.h" #import "AblyFlutterMessage.h" #import "AblyFlutterReader.h" @@ -8,15 +9,25 @@ @implementation AblyFlutterWriter - (void)writeValue:(id)value { if([value isKindOfClass:[ARTErrorInfo class]]){ - [self writeByte:_ValueErrorInfo]; [self writeErrorInfo: value]; return; + }else if([value isKindOfClass:[ARTConnectionStateChange class]]){ + [self writeConnectionStateChange: value]; + return; + }else if([value isKindOfClass:[ARTChannelStateChange class]]){ + [self writeConnectionStateChange: value]; + return; } - [super writeValue:value]; } -- (void) writeErrorInfo:(ARTErrorInfo *) e{ +- (void) writeEnum:(UInt8) type enumValue: (int const) enumValue{ + [self writeByte:type]; + [self writeValue: [NSNumber numberWithInt:enumValue]]; +} + +- (void) writeErrorInfo:(ARTErrorInfo *const) e{ + [self writeByte:_ValueErrorInfo]; [self writeValue: nil]; //code - not available in ably-cocoa [self writeValue: [e message]]; [self writeValue: @([e statusCode])]; @@ -25,4 +36,27 @@ - (void) writeErrorInfo:(ARTErrorInfo *) e{ [self writeValue: nil]; //cause - not available in ably-java } +- (void) writeConnectState:(ARTRealtimeConnectionState const) state{ [self writeEnum:_connectionState enumValue:state]; } +- (void) writeConnectEvent:(ARTRealtimeConnectionEvent const) event{ [self writeEnum:_connectionEvent enumValue:event]; } +- (void) writeConnectionStateChange:(ARTConnectionStateChange *const) stateChange{ + [self writeByte:_connectionStateChange]; + [self writeConnectState: [stateChange current]]; + [self writeConnectState: [stateChange previous]]; + [self writeConnectEvent: [stateChange event]]; + [self writeValue: @((int)([stateChange retryIn] * 1000))]; + [self writeValue: [stateChange reason]]; +} + +- (void) writeChannelState:(ARTRealtimeChannelState const) state{ [self writeEnum:_channelState enumValue:state]; } +- (void) writeChannelEvent:(ARTChannelEvent const) event{ [self writeEnum:_channelEvent enumValue:event]; } +- (void) writeChannelStateChange:(ARTChannelStateChange *const) stateChange{ + [self writeByte:_channelStateChange]; + [self writeChannelState: [stateChange current]]; + [self writeChannelState: [stateChange previous]]; + [self writeChannelEvent: [stateChange event]]; + [self writeValue: @([stateChange resumed])]; + [self writeValue: [stateChange reason]]; +} + + @end diff --git a/lib/src/ably_implementation.dart b/lib/src/ably_implementation.dart index daf73a58e..fdf045860 100644 --- a/lib/src/ably_implementation.dart +++ b/lib/src/ably_implementation.dart @@ -4,10 +4,11 @@ import 'package:ably_flutter_plugin/src/impl/message.dart'; import 'package:ably_flutter_plugin/src/interface.dart'; import 'package:flutter/services.dart'; +import 'package:streams_channel/streams_channel.dart'; import '../ably.dart'; import 'codec.dart'; import 'impl/platform_object.dart'; -import 'impl/realtime.dart'; +import 'impl/realtime/realtime.dart'; import 'impl/rest/rest.dart'; ///Extension to extract string name from PlatformMethod @@ -25,6 +26,9 @@ class AblyImplementation implements Ably { /// instance of method channel to interact with android/ios code final MethodChannel methodChannel; + /// instance of method channel to listen to android/ios events + final StreamsChannel streamsChannel; + /// Storing all platform objects, for easy references/cleanup final List _platformObjects = []; @@ -36,11 +40,14 @@ class AblyImplementation implements Ably { factory AblyImplementation() { /// Uses our custom message codec so that we can pass Ably types to the /// platform implementations. - final methodChannel = MethodChannel('ably_flutter_plugin', StandardMethodCodec(Codec())); - return AblyImplementation._constructor(methodChannel); + StandardMethodCodec codec = StandardMethodCodec(Codec()); + return AblyImplementation._constructor( + MethodChannel('io.ably.flutter.plugin', codec), + StreamsChannel('io.ably.flutter.stream', codec) + ); } - AblyImplementation._constructor(this.methodChannel); + AblyImplementation._constructor(this.methodChannel, this.streamsChannel); /// Registering instance with ably. /// On registration, older ably instance id destroyed! @@ -48,40 +55,46 @@ class AblyImplementation implements Ably { Future _register() async => (null != _handle) ? _handle : _handle = await methodChannel.invokeMethod(PlatformMethod.register.toName()); @override - Future createRealtime(final ClientOptions options) async { + Future createRealtime({ + ClientOptions options, + final String key + }) async { // TODO options.authCallback // TODO options.logHandler final handle = await _register(); final message = AblyMessage(handle, options); final r = RealtimePlatformObject( handle, - methodChannel, + this, await methodChannel.invokeMethod( PlatformMethod.createRealtimeWithOptions.toName(), message - ) + ), + options: options, + key: key ); _platformObjects.add(r); return r; } @override - Future createRestWithKey(final String key) async => createRest(ClientOptions.fromKey(key)); - - @override - Future createRest(final ClientOptions options) async { + Future createRest({ + ClientOptions options, + final String key + }) async { // TODO options.authCallback // TODO options.logHandler final handle = await _register(); final message = AblyMessage(handle, options); - final r = RestPlatformObject.fromOptions( + final r = RestPlatformObject( handle, - methodChannel, + this, await methodChannel.invokeMethod( PlatformMethod.createRestWithOptions.toName(), message ), - options + options: options, + key: key ); _platformObjects.add(r); return r; diff --git a/lib/src/codec.dart b/lib/src/codec.dart index 6ca8812e1..116eb5f0b 100644 --- a/lib/src/codec.dart +++ b/lib/src/codec.dart @@ -5,21 +5,21 @@ import 'package:flutter/services.dart'; import '../ably.dart'; -typedef CodecEncoder(final WriteBuffer buffer, final dynamic value); +typedef CodecEncoder(final WriteBuffer buffer, final T value); typedef T CodecDecoder(ReadBuffer buffer); class CodecPair{ - final Function encoder; - final CodecDecoder decoder; + final CodecEncoder encoder; + final CodecDecoder decoder; CodecPair(this.encoder, this.decoder); encode(final WriteBuffer buffer, final dynamic value){ - if(this.encoder==null) throw AblyException("Codec encoder not defined"); - return this.encoder(buffer, value); + if (this.encoder==null) throw AblyException("Codec encoder not defined"); + return this.encoder(buffer, value as T); } - decode(ReadBuffer buffer){ - if(this.decoder==null) throw AblyException("Codec decoder not defined"); + T decode(ReadBuffer buffer){ + if (this.decoder==null) throw AblyException("Codec decoder not defined"); return this.decoder(buffer); } @@ -36,19 +36,44 @@ class Codec extends StandardMessageCodec { // by a subtype value - perhaps of a wider type. // // https://api.flutter.dev/flutter/services/StandardMessageCodec/writeValue.html - static const _valueClientOptions = 128; - static const _valueTokenDetails = 129; + + //Ably flutter plugin protocol message + static const _valueAblyMessage = 128; + + //Other ably objects + static const _valueClientOptions = 129; + static const _valueTokenDetails = 130; static const _valueErrorInfo = 144; - static const _valueAblyMessage = 255; + + // Events + static const _connectionEvent = 201; + static const _connectionState = 202; + static const _connectionStateChange = 203; + static const _channelEvent = 204; + static const _channelState = 205; + static const _channelStateChange = 206; Map codecMap; Codec():super(){ this.codecMap = { + //Ably flutter plugin protocol message + _valueAblyMessage: CodecPair(encodeAblyMessage, decodeAblyMessage), + + //Other ably objects _valueClientOptions: CodecPair(encodeClientOptions, decodeClientOptions), _valueTokenDetails: CodecPair(encodeTokenDetails, decodeTokenDetails), _valueErrorInfo: CodecPair(null, decodeErrorInfo), - _valueAblyMessage: CodecPair(encodeAblyMessage, decodeAblyMessage), + + //Events - Connection + _connectionEvent: CodecPair(null, decodeConnectionEvent), + _connectionState: CodecPair(null, decodeConnectionState), + _connectionStateChange: CodecPair(null, decodeConnectionStateChange), + + //Events - Channel + _channelEvent: CodecPair(null, decodeChannelEvent), + _channelState: CodecPair(null, decodeChannelState), + _channelStateChange: CodecPair(null, decodeChannelStateChange), }; } @@ -67,9 +92,9 @@ class Codec extends StandardMessageCodec { @override void writeValue (final WriteBuffer buffer, final dynamic value) { int type = getCodecType(value); - if(type==null){ + if (type==null) { super.writeValue(buffer, value); - }else { + } else { buffer.putUint8(type); codecMap[type].encode(buffer, value); } @@ -77,16 +102,15 @@ class Codec extends StandardMessageCodec { dynamic readValueOfType(int type, ReadBuffer buffer) { CodecPair pair = codecMap[type]; - if(pair==null){ + if (pair==null) { return super.readValueOfType(type, buffer); - }else{ + } else { return pair.decode(buffer); } } - encodeClientOptions(final WriteBuffer buffer, final dynamic value){ - final ClientOptions v = value; - + // =========== ENCODERS =========== + encodeClientOptions(final WriteBuffer buffer, final ClientOptions v){ // AuthOptions (super class of ClientOptions) writeValue(buffer, v.authUrl); writeValue(buffer, v.authMethod); @@ -125,8 +149,7 @@ class Codec extends StandardMessageCodec { writeValue(buffer, v.transportParams); } - encodeTokenDetails(final WriteBuffer buffer, final dynamic value){ - final TokenDetails v = value; + encodeTokenDetails(final WriteBuffer buffer, final TokenDetails v){ writeValue(buffer, v.token); writeValue(buffer, v.expires); writeValue(buffer, v.issued); @@ -134,8 +157,7 @@ class Codec extends StandardMessageCodec { writeValue(buffer, v.clientId); } - encodeAblyMessage(final WriteBuffer buffer, final dynamic value){ - final AblyMessage v = value; + encodeAblyMessage(final WriteBuffer buffer, final AblyMessage v){ writeValue(buffer, v.registrationHandle); writeValue(buffer, v.message); } @@ -211,4 +233,27 @@ class Codec extends StandardMessageCodec { ); } + ConnectionEvent decodeConnectionEvent(ReadBuffer buffer) => ConnectionEvent.values[readValue(buffer) as int]; + ConnectionState decodeConnectionState(ReadBuffer buffer) => ConnectionState.values[readValue(buffer) as int]; + ChannelEvent decodeChannelEvent(ReadBuffer buffer) => ChannelEvent.values[readValue(buffer) as int]; + ChannelState decodeChannelState(ReadBuffer buffer) => ChannelState.values[readValue(buffer) as int]; + + ConnectionStateChange decodeConnectionStateChange(ReadBuffer buffer){ + ConnectionState current = readValue(buffer) as ConnectionState; + ConnectionState previous = readValue(buffer) as ConnectionState; + ConnectionEvent event = readValue(buffer) as ConnectionEvent; + int retryIn = readValue(buffer) as int; + ErrorInfo reason = readValue(buffer) as ErrorInfo; + return ConnectionStateChange(current, previous, event, retryIn: retryIn, reason: reason); + } + + ChannelStateChange decodeChannelStateChange(ReadBuffer buffer){ + ChannelState current = readValue(buffer) as ChannelState; + ChannelState previous = readValue(buffer) as ChannelState; + ChannelEvent event = readValue(buffer) as ChannelEvent; + bool resumed = readValue(buffer) as bool; + ErrorInfo reason = readValue(buffer) as ErrorInfo; + return ChannelStateChange(current, previous, event, resumed: resumed, reason: reason); + } + } diff --git a/lib/src/impl/connection.dart b/lib/src/impl/connection.dart deleted file mode 100644 index 83744f177..000000000 --- a/lib/src/impl/connection.dart +++ /dev/null @@ -1,53 +0,0 @@ -import 'platform_object.dart'; -import '../spec/spec.dart' show Connection, ConnectionEvent, EventListener, ErrorInfo, ConnectionState; -import 'event_listener.dart'; - - -class ConnectionIndirectPlatformObject extends IndirectPlatformObject implements Connection { - ConnectionIndirectPlatformObject(PlatformObject provider) : super(provider, IndirectPlatformObject.connection); - - @override - Future> createListener() async { - final handle = await provider.invoke(PlatformMethod.createListener, type); - return ConnectionListenerPlatformObject(provider.ablyHandle, provider.methodChannel, handle); - } - - @override - Future off() async { - await provider.invoke(PlatformMethod.eventsOff, type); - } - - @override - ErrorInfo errorReason; - - @override - String id; - - @override - String key; - - @override - String recoveryKey; - - @override - int serial; - - @override - ConnectionState state; - - @override - void close() { - // TODO: implement close - } - - @override - void connect() { - // TODO: implement connect - } - - @override - Future ping() { - // TODO: implement ping - return null; - } -} \ No newline at end of file diff --git a/lib/src/impl/event_listener.dart b/lib/src/impl/event_listener.dart deleted file mode 100644 index 1f33f0d1d..000000000 --- a/lib/src/impl/event_listener.dart +++ /dev/null @@ -1,43 +0,0 @@ - import 'platform_object.dart'; - import '../spec/spec.dart' show ConnectionEvent, EventListener; - import 'package:flutter/services.dart'; - - -class ConnectionListenerPlatformObject extends PlatformObject implements EventListener { - ConnectionListenerPlatformObject(int ablyHandle, MethodChannel methodChannel, int handle) - : super(ablyHandle, methodChannel, handle); - - @override - Future off() async { - await invoke(PlatformMethod.eventsOff); - } - - @override - Stream on([ConnectionEvent event]) async* { - // Based on: - // https://medium.com/flutter/flutter-platform-channels-ce7f540a104e#03ed - // https://dart.dev/tutorials/language/streams#transform-function - // TODO do we need to send a message to register first? - final stream = EventChannel('com.ably/$handle').receiveBroadcastStream(); - await for (final event in stream) { - yield event; - } - } - - @override - Future once([ConnectionEvent event]) async { - final result = await invoke(PlatformMethod.eventOnce, event); - switch (result) { - case 'initialized': return ConnectionEvent.initialized; - case 'connecting': return ConnectionEvent.connecting; - case 'connected': return ConnectionEvent.connected; - case 'disconnected': return ConnectionEvent.disconnected; - case 'suspended': return ConnectionEvent.suspended; - case 'closing': return ConnectionEvent.closing; - case 'closed': return ConnectionEvent.closed; - case 'failed': return ConnectionEvent.failed; - case 'update': return ConnectionEvent.update; - } - throw('Unhandled result "$result".'); - } -} \ No newline at end of file diff --git a/lib/src/impl/platform_object.dart b/lib/src/impl/platform_object.dart index ffb18a60f..4d855b96f 100644 --- a/lib/src/impl/platform_object.dart +++ b/lib/src/impl/platform_object.dart @@ -1,5 +1,7 @@ +import 'package:ably_flutter_plugin/src/ably_implementation.dart'; import 'package:ably_flutter_plugin/src/impl/message.dart'; import 'package:flutter/services.dart'; +import 'package:streams_channel/streams_channel.dart'; /// A method with a corresponding handler in platform code. @@ -15,15 +17,11 @@ enum PlatformMethod { /// Realtime createRealtimeWithOptions, connectRealtime, + closeRealtime, - /// Create an event listener. Called against a platform object (e.g. Realtime) with - /// the argument being the type of indirect platform object against which the - /// listener is to be created (e.g. Connection). - createListener, - - eventsOff, - eventsOn, - eventOnce, + ///Realtime events + realtime_onConnectionStateChanged, + realtime_onChannelStateChanged, } ///Extension to extract string name from PlatformMethod @@ -40,18 +38,19 @@ extension on PlatformMethod { /// implementation. abstract class PlatformObject { final int _ablyHandle; - final MethodChannel _methodChannel; + final AblyImplementation _ablyPlugin; final int _handle; - PlatformObject(this._ablyHandle, this._methodChannel, this._handle); + PlatformObject(this._ablyHandle, this._ablyPlugin, this._handle); @override String toString() => 'Ably Platform Object $_handle'; get handle => _handle; get ablyHandle => _ablyHandle; - get methodChannel => _methodChannel; + MethodChannel get methodChannel => _ablyPlugin.methodChannel; + StreamsChannel get eventChannel => _ablyPlugin.streamsChannel; static Future dispose() async { //TODO implement or convert to abstract! @@ -63,28 +62,11 @@ abstract class PlatformObject { final message = (null != argument) ? AblyMessage(_ablyHandle, AblyMessage(_handle, argument)) : AblyMessage(_ablyHandle, _handle); - return await _methodChannel.invokeMethod(method.toName(), message); + return await methodChannel.invokeMethod(method.toName(), message); } -} -/// An object which has a live counterpart in the Platform client library SDK, -/// where that live counterpart is only ever accessed by the plugin implementation -/// by reading a property on another platform object on demand. -abstract class IndirectPlatformObject { - // Ideally the constant value for connection would be grouped or typed more strongly. - // Possible approaches, albeit impossible (for now) with dart... - // 1) Dart enums are not as feature rich as other languages: - // https://github.com/dart-lang/language/issues/158 - // 2) The concept of 'type branding' might help but that's also not yet a thing: - // https://github.com/dart-lang/sdk/issues/2626#issuecomment-464638272 - static final int connection = 1; - - final PlatformObject _provider; - final int _type; - - IndirectPlatformObject(this._provider, this._type); - - PlatformObject get provider => _provider; - int get type => _type; + Stream listen(final PlatformMethod method){ + return eventChannel.receiveBroadcastStream(AblyMessage(_ablyHandle, AblyMessage(_handle, method.toName()))); + } } diff --git a/lib/src/impl/realtime.dart b/lib/src/impl/realtime.dart deleted file mode 100644 index df0b3515e..000000000 --- a/lib/src/impl/realtime.dart +++ /dev/null @@ -1,71 +0,0 @@ -import 'dart:async'; -import 'connection.dart'; -import 'package:flutter/services.dart'; -import '../../ably.dart'; -import '../spec/spec.dart' as spec; -import 'platform_object.dart'; - - - -class RealtimePlatformObject extends PlatformObject implements spec.Realtime { - // The _connection instance keeps a reference to this platform object. - // Ideally _connection would be final, but that would need 'late final' which is coming. - // https://stackoverflow.com/questions/59449666/initialize-a-final-variable-with-this-in-dart#comment105082936_59450231 - ConnectionIndirectPlatformObject _connection; - - RealtimePlatformObject(int ablyHandle, MethodChannel methodChannel, int handle) - : super(ablyHandle, methodChannel, handle) { - _connection = ConnectionIndirectPlatformObject(this); - } - - @override - void close() { - // TODO: implement close - } - - @override - Future connect() async { - await invoke(PlatformMethod.connectRealtime); - } - - @override - Connection get connection => _connection; - - @override - Auth auth; - - @override - String clientId; - - @override - ClientOptions options; - - @override - Push push; - - @override - void set connection(Connection _connection) { - // TODO: implement connection - } - - @override - Future request({String method, String path, Map params, body, Map headers}) { - // TODO: implement request - return null; - } - - @override - Future> stats([Map params]) { - // TODO: implement stats - return null; - } - - @override - Future time() { - // TODO: implement time - return null; - } - - @override - RealtimeChannels channels; -} \ No newline at end of file diff --git a/lib/src/impl/realtime/channels.dart b/lib/src/impl/realtime/channels.dart new file mode 100644 index 000000000..00b486819 --- /dev/null +++ b/lib/src/impl/realtime/channels.dart @@ -0,0 +1,127 @@ +import 'dart:async'; + +import 'package:ably_flutter_plugin/ably.dart'; +import 'package:ably_flutter_plugin/src/spec/push/channels.dart'; +import 'package:ably_flutter_plugin/src/spec/spec.dart' as spec; +import 'package:flutter/services.dart'; +import '../platform_object.dart'; + + +class RealtimePlatformChannel extends PlatformObject implements spec.RealtimeChannel{ + + @override + spec.AblyBase ably; + + @override + String name; + + @override + spec.ChannelOptions options; + + @override + spec.RealtimePresence presence; + + RealtimePlatformChannel(int ablyHandle, Ably ablyPlugin, int restHandle, this.ably, this.name, this.options) + : super(ablyHandle, ablyPlugin, restHandle); + + @override + Future> history([spec.RealtimeHistoryParams params]) { + // TODO: implement history + return null; + } + + @override + Future publish({ + spec.Message message, + List messages, + String name, + dynamic data + }) async { + try { + await this.invoke(PlatformMethod.publish, { + //TODO support Message and List + "channel": this.name, + "name": name, + "message": data + }); + } on PlatformException catch (pe) { + throw spec.AblyException(pe.code, pe.message, pe.details); + } + } + + @override + spec.ErrorInfo errorReason; + + @override + List modes; + + @override + Map params; + + @override + PushChannel push; + + @override + spec.ChannelState state; + + @override + Future attach() { + // TODO: implement attach + return null; + } + + @override + Future detach() { + // TODO: implement detach + return null; + } + + @override + Future off() { + // TODO: implement off + return null; + } + + @override + void setOptions(spec.ChannelOptions options) { + // TODO: implement setOptions + } + + @override + Stream on([ChannelEvent state]) { + // TODO: implement on + Stream stream = listen(PlatformMethod.realtime_onChannelStateChanged); + if (state!=null) { + return stream.takeWhile((ChannelStateChange _stateChange) => _stateChange.event==state); + } + return stream; + } + + @override + Future subscribe({String event, List events, spec.EventListener listener}) { + // TODO: implement subscribe + return null; + } + + @override + void unsubscribe({String event, List events, spec.EventListener listener}) { + // TODO: implement unsubscribe + } + +} + + +class RealtimePlatformChannels extends spec.RealtimeChannels{ + + int ablyHandle; + Ably ablyPlugin; + int restHandle; + + RealtimePlatformChannels(this.ablyHandle, this.ablyPlugin, this.restHandle, spec.AblyBase ably): super(ably); + + @override + RealtimePlatformChannel createChannel(name, options){ + return RealtimePlatformChannel(ablyHandle, ablyPlugin, restHandle, ably, name, options); + } + +} \ No newline at end of file diff --git a/lib/src/impl/realtime/connection.dart b/lib/src/impl/realtime/connection.dart new file mode 100644 index 000000000..1ba7b8747 --- /dev/null +++ b/lib/src/impl/realtime/connection.dart @@ -0,0 +1,63 @@ +import 'dart:async'; + +import 'package:ably_flutter_plugin/ably.dart'; + +import '../platform_object.dart'; +import '../../spec/spec.dart' show Connection, ConnectionState, ErrorInfo; + + +class ConnectionPlatformObject extends PlatformObject implements Connection { + + ConnectionPlatformObject(int ablyHandle, Ably ablyPlugin, int realtimeHandle) + : super(ablyHandle, ablyPlugin, realtimeHandle); + + @override + ErrorInfo errorReason; + + @override + String id; + + @override + String key; + + @override + String recoveryKey; + + @override + int serial; + + @override + ConnectionState state; + + @override + Stream on([ConnectionEvent state]) { + Stream stream = listen(PlatformMethod.realtime_onConnectionStateChanged).transform( + StreamTransformer.fromHandlers( + handleData: (dynamic value, EventSink sink){ + sink.add(value as ConnectionStateChange); + } + ) + ); + if (state!=null) { + return stream.takeWhile((ConnectionStateChange _stateChange) => _stateChange.event==state); + } + return stream; + } + + @override + void close() { + // TODO: implement close + } + + @override + void connect() { + // TODO: implement connect + } + + @override + Future ping() { + // TODO: implement ping + return null; + } + +} \ No newline at end of file diff --git a/lib/src/impl/realtime/realtime.dart b/lib/src/impl/realtime/realtime.dart new file mode 100644 index 000000000..2ac79553c --- /dev/null +++ b/lib/src/impl/realtime/realtime.dart @@ -0,0 +1,62 @@ +import 'dart:async'; + +import '../../../ably.dart'; +import '../../spec/spec.dart' as spec; +import '../platform_object.dart'; +import 'connection.dart'; + + +class RealtimePlatformObject extends PlatformObject implements spec.Realtime { + + RealtimePlatformObject(int ablyHandle, Ably ablyPlugin, int handle, { + ClientOptions options, + final String key + }) + :assert(options!=null || key!=null), + connection = ConnectionPlatformObject(ablyHandle, ablyPlugin, handle), + super(ablyHandle, ablyPlugin, handle) { + this.options = (options==null)?ClientOptions.fromKey(key):options; + } + + @override + final Connection connection; + + @override + Auth auth; + + @override + String clientId; + + @override + ClientOptions options; + + @override + Push push; + + @override + RealtimeChannels channels; + + @override + Future close() async => await invoke(PlatformMethod.closeRealtime); + + @override + Future connect() async => await invoke(PlatformMethod.connectRealtime); + + @override + Future request({String method, String path, Map params, body, Map headers}) { + // TODO: implement request + return null; + } + + @override + Future> stats([Map params]) { + // TODO: implement stats + return null; + } + + @override + Future time() { + // TODO: implement time + return null; + } +} \ No newline at end of file diff --git a/lib/src/impl/rest/channels.dart b/lib/src/impl/rest/channels.dart index ade305586..ed7e62218 100644 --- a/lib/src/impl/rest/channels.dart +++ b/lib/src/impl/rest/channels.dart @@ -1,11 +1,12 @@ import 'dart:async'; +import 'package:ably_flutter_plugin/ably.dart'; import 'package:ably_flutter_plugin/src/spec/spec.dart' as spec; import 'package:flutter/services.dart'; import '../platform_object.dart'; -class RestPlatformChannel extends PlatformObject implements spec.Channel{ +class RestPlatformChannel extends PlatformObject implements spec.RestChannel{ @override spec.AblyBase ably; @@ -19,8 +20,8 @@ class RestPlatformChannel extends PlatformObject implements spec.Channel{ @override spec.Presence presence; - RestPlatformChannel(int ablyHandle, MethodChannel methodChannel, int restHandle, this.ably, this.name, this.options) - : super(ablyHandle, methodChannel, restHandle); + RestPlatformChannel(int ablyHandle, Ably ablyPlugin, int restHandle, this.ably, this.name, this.options) + : super(ablyHandle, ablyPlugin, restHandle); @override Future> history([spec.RestHistoryParams params]) { @@ -29,11 +30,11 @@ class RestPlatformChannel extends PlatformObject implements spec.Channel{ } @override - Future publish([String name, dynamic data]) async { + Future publish({String name, dynamic data}) async { try { Map _map = { "channel": this.name, }; - if(name!=null) _map["name"] = name; - if(data!=null) _map["message"] = data; + if (name!=null) _map["name"] = name; + if (data!=null) _map["message"] = data; await this.invoke(PlatformMethod.publish, _map); } on PlatformException catch (pe) { throw spec.AblyException(pe.code, pe.message, pe.details); @@ -46,14 +47,14 @@ class RestPlatformChannel extends PlatformObject implements spec.Channel{ class RestPlatformChannels extends spec.RestChannels{ int ablyHandle; - MethodChannel methodChannel; int restHandle; + Ably ablyPlugin; - RestPlatformChannels(this.ablyHandle, this.methodChannel, this.restHandle, spec.AblyBase ably): super(ably); + RestPlatformChannels(this.ablyHandle, this.ablyPlugin, this.restHandle, spec.AblyBase ably): super(ably); @override RestPlatformChannel createChannel(name, options){ - return RestPlatformChannel(ablyHandle, methodChannel, restHandle, ably, name, options); + return RestPlatformChannel(ablyHandle, ablyPlugin, restHandle, ably, name, options); } } \ No newline at end of file diff --git a/lib/src/impl/rest/rest.dart b/lib/src/impl/rest/rest.dart index f66d5dcf7..ff4c46eb9 100644 --- a/lib/src/impl/rest/rest.dart +++ b/lib/src/impl/rest/rest.dart @@ -1,7 +1,5 @@ import 'dart:async'; -import 'package:flutter/services.dart'; - import '../../../ably.dart'; import '../../spec/spec.dart' as spec; import '../platform_object.dart'; @@ -10,15 +8,14 @@ import 'channels.dart'; class RestPlatformObject extends PlatformObject implements spec.Rest { - RestPlatformObject.fromOptions(int ablyHandle, MethodChannel methodChannel, int handle, this.options) - :super(ablyHandle, methodChannel, handle){ - this.channels = RestPlatformChannels(ablyHandle, methodChannel, handle, this); - } - - RestPlatformObject.fromKey(int ablyHandle, MethodChannel methodChannel, int handle, String key) - :super(ablyHandle, methodChannel, handle){ - this.options = ClientOptions.fromKey(key); - this.channels = RestPlatformChannels(ablyHandle, methodChannel, handle, this); + RestPlatformObject(int ablyHandle, Ably ablyPlugin, int handle, { + ClientOptions options, + final String key + }) + :assert(options!=null || key!=null), + super(ablyHandle, ablyPlugin, handle){ + this.options = (options==null)?ClientOptions.fromKey(key):options; + this.channels = RestPlatformChannels(ablyHandle, ablyPlugin, handle, this); } @override @@ -40,7 +37,7 @@ class RestPlatformObject extends PlatformObject implements spec.Rest time() { + Future time() { // TODO: implement time return null; } diff --git a/lib/src/interface.dart b/lib/src/interface.dart index c90305c20..d9a48a902 100644 --- a/lib/src/interface.dart +++ b/lib/src/interface.dart @@ -10,12 +10,18 @@ abstract class Ably { /// Returns ably library version Future get version; - /// Creates a [Realtime] instance with [options] - Future createRealtime(final ClientOptions options); + /// Creates a [Realtime] instance either with [options] or with [key] + /// obtained from Ably dashboard + Future createRealtime({ + ClientOptions options, + final String key + }); - /// Creates a [Rest] instance with [options] - Future createRest(final ClientOptions options); + /// Creates a [Rest] instance either with [options] or with [key] + /// obtained from Ably dashboard + Future createRest({ + ClientOptions options, + final String key + }); - /// Creates a [Rest] instance with [key] obtained from ably - Future createRestWithKey(final String key); } diff --git a/lib/src/spec/common.dart b/lib/src/spec/common.dart index 564638e0a..24c9ecef8 100644 --- a/lib/src/spec/common.dart +++ b/lib/src/spec/common.dart @@ -116,18 +116,20 @@ abstract class RestPresenceParams { String connectionId; } -abstract class RealtimePresenceParams { - bool waitForSync; //waitForSync; //?: bool; - String clientId; //clientId; //?: String; - String connectionId; //; //?: String; +class RealtimePresenceParams { + bool waitForSync; + String clientId; + String connectionId; + RealtimePresenceParams({this.waitForSync, this.clientId, this.connectionId}); } -abstract class RealtimeHistoryParams { +class RealtimeHistoryParams { int start; // start; //?: int; int end; // end; //?: int; int direction; // direction; //?: String; int limit; // limit; //?: int; bool untilAttach; // untilAttach; //?: bool; + RealtimeHistoryParams({this.start, this.end, this.direction, this.limit, this.untilAttach}); } @@ -147,18 +149,28 @@ class AblyException implements Exception { } -abstract class ChannelStateChange { - ChannelState current; - ChannelState previous; - ErrorInfo reason; //optional - bool resumed; +class ChannelStateChange { + final ChannelEvent event; + final ChannelState current; + final ChannelState previous; + ErrorInfo reason; + final bool resumed; + ChannelStateChange(this.current, this.previous, this.event, { + this.reason, + this.resumed=false + }); } -abstract class ConnectionStateChange { - ConnectionState current; - ConnectionState previous; - ErrorInfo reason; //optional - int retryIn; //optional +class ConnectionStateChange { + final ConnectionEvent event; + final ConnectionState current; + final ConnectionState previous; + ErrorInfo reason; + int retryIn; + ConnectionStateChange(this.current, this.previous, this.event, { + this.reason, + this.retryIn + }); } abstract class DevicePushDetails { @@ -217,12 +229,15 @@ abstract class EventListener { /// Interface implemented by Ably classes that can emit events, offering the capability /// to create listeners for those events. -abstract class EventEmitter { - /// Remove all listener registrations, irrespective of type. - Future off(); +/// [E] is type of event to listen for +/// [G] is the instance which will be passed back in streams +abstract class EventEmitter { + + // Remove all listener registrations, irrespective of type. + //Future off(); /// Create a listener, with which registrations may be made. - Future> createListener(); + Stream on([E event]); } abstract class PaginatedResult { @@ -255,19 +270,17 @@ class Stats { StatsRequestCount tokenRequests; } -class Channels { +abstract class Channels { Channels(this.ably); AblyBase ably; Map _channels = {}; - ChannelType createChannel(name, options){ - return Channel(ably, name, options) as ChannelType; - } + ChannelType createChannel(name, options); ChannelType get(String name, [ChannelOptions options]) { - if(_channels[name]==null){ + if (_channels[name]==null) { _channels[name] = createChannel(name, options); } return _channels[name]; diff --git a/lib/src/spec/connection.dart b/lib/src/spec/connection.dart index a3d48c1ba..f60de6a98 100644 --- a/lib/src/spec/connection.dart +++ b/lib/src/spec/connection.dart @@ -2,7 +2,7 @@ import 'enums.dart'; import 'common.dart'; -abstract class ConnectionBase extends EventEmitter { +abstract class Connection implements EventEmitter { ///current state of this connection ConnectionState state; @@ -25,10 +25,7 @@ abstract class ConnectionBase extends EventEmitter { int serial; void close(); void connect(); -} - -abstract class Connection extends ConnectionBase { Future ping(); -// Future whenState(ConnectionState targetState); //TODO remove? + } \ No newline at end of file diff --git a/lib/src/spec/constants.dart b/lib/src/spec/constants.dart index 4b7782b80..740b7c10f 100644 --- a/lib/src/spec/constants.dart +++ b/lib/src/spec/constants.dart @@ -1,8 +1,8 @@ class LogLevel{ - static final int none = 99; - static final int verbose = 2; - static final int debug = 3; - static final int info = 4; - static final int warn = 5; - static final int error = 6; + static const int none = 99; + static const int verbose = 2; + static const int debug = 3; + static const int info = 4; + static const int warn = 5; + static const int error = 6; } diff --git a/lib/src/spec/enums.dart b/lib/src/spec/enums.dart index f75024a31..f722a4db6 100644 --- a/lib/src/spec/enums.dart +++ b/lib/src/spec/enums.dart @@ -8,6 +8,13 @@ enum ChannelState { failed } +enum ChannelMode{ // TB2d + presence, + publish, + subscribe, + presence_subscribe +} + enum ChannelEvent { initialized, attaching, diff --git a/lib/src/spec/push/channels.dart b/lib/src/spec/push/channels.dart new file mode 100644 index 000000000..d585f3eeb --- /dev/null +++ b/lib/src/spec/push/channels.dart @@ -0,0 +1,9 @@ +import '../common.dart'; + +abstract class PushChannel{ + Future subscribeDevice(); // RSH7a + Future subscribeClient(); // RSH7b + Future unsubscribeDevice(); // RSH7c + Future unsubscribeClient(); // RSH7d + Future> listSubscriptions(); // RSH7e +} \ No newline at end of file diff --git a/lib/src/spec/realtime/channels.dart b/lib/src/spec/realtime/channels.dart index 08c379026..b8f4a255d 100644 --- a/lib/src/spec/realtime/channels.dart +++ b/lib/src/spec/realtime/channels.dart @@ -1,3 +1,4 @@ +import 'package:ably_flutter_plugin/src/spec/push/channels.dart'; import 'package:ably_flutter_plugin/src/spec/rest/ably_base.dart'; import 'package:ably_flutter_plugin/src/spec/rest/channels.dart'; @@ -7,83 +8,47 @@ import '../message.dart'; import 'presence.dart'; -abstract class RealtimeChannelBase extends EventEmitter { +abstract class RealtimeChannel extends EventEmitter { - RealtimeChannelBase(this.ably, this.name, this.options); + RealtimeChannel(this.ably, this.name, this.options); AblyBase ably; - String name; - ChannelOptions options; + + String name; //Not in IDL + ChannelOptions options; //Not in IDL + ErrorInfo errorReason; ChannelState state; - void setOptions(dynamic options); - void unsubscribe({ - String event, - List events, - EventListener listener //TODO check if this is the type that is expected - }); -} - -class RealtimeChannel extends RealtimeChannelBase { RealtimePresence presence; - - RealtimeChannel(AblyBase ably, String name, ChannelOptions options) : super(ably, name, options); - - Future attach() async { - //TODO impl - return; - } - Future detach() async { - //TODO impl - return; - } - Future> history([RealtimeHistoryParams params]) async { - //TODO impl - return null; - } +// ChannelProperties properties; + PushChannel push; + List modes; + Map params; + + Future attach(); + Future detach(); + Future> history([RealtimeHistoryParams params]); + Future publish({ + Message message, + List messages, + String name, + dynamic data + }); Future subscribe({ String event, List events, EventListener listener - }) async { - //TODO impl - return; - } - Future publish({String name, dynamic data}) async { - //TODO impl - return; - } - Future whenState(ChannelState targetState) async { - //TODO impl - return null; - } - - // Implement events - @override - Future> createListener() { - // TODO: implement createListener - return null; - } - - @override - Future off() { - // TODO: implement off - return null; - } - - @override - void setOptions(options) { - // TODO: implement setOptions - } - - @override - void unsubscribe({String event, List events, EventListener listener}) { - // TODO: implement unsubscribe - } + }); + void unsubscribe({ + String event, + List events, + EventListener listener //TODO check if this is the type that is expected + }); + void setOptions(ChannelOptions options); } -class RealtimeChannels extends Channels { +abstract class RealtimeChannels extends Channels { RealtimeChannels(AblyBase ably): super(ably); diff --git a/lib/src/spec/realtime/presence.dart b/lib/src/spec/realtime/presence.dart index 58cc14234..d70cdd297 100644 --- a/lib/src/spec/realtime/presence.dart +++ b/lib/src/spec/realtime/presence.dart @@ -3,19 +3,16 @@ import '../common.dart'; import '../message.dart'; -abstract class RealtimePresenceBase { +abstract class RealtimePresence { bool syncComplete; - void unsubscribe({ + Future> get([RealtimePresenceParams params]); + Future> history([RealtimeHistoryParams params]); + Future subscribe({ PresenceAction action, List actions, EventListener listener //TODO check if this is the type that is expected }); -} - -abstract class RealtimePresence extends RealtimePresenceBase { - Future> get([RealtimePresenceParams params]); - Future> history([RealtimeHistoryParams params]); - Future subscribe({ + void unsubscribe({ PresenceAction action, List actions, EventListener listener //TODO check if this is the type that is expected diff --git a/lib/src/spec/realtime/realtime.dart b/lib/src/spec/realtime/realtime.dart index 75e2a9dd2..4861350a5 100644 --- a/lib/src/spec/realtime/realtime.dart +++ b/lib/src/spec/realtime/realtime.dart @@ -1,54 +1,21 @@ -import 'package:flutter/foundation.dart'; - -import '../common.dart'; import '../connection.dart'; import '../rest/ably_base.dart'; import '../rest/options.dart'; import 'channels.dart'; -//REST BASE -abstract class RealtimeBase extends AblyBase { - RealtimeBase.fromOptions(ClientOptions options): super.fromOptions(options); - RealtimeBase.fromKey(String key): super.fromKey(key); +abstract class Realtime extends AblyBase { + + Realtime({ + ClientOptions options, + final String key + }): connection=null, //To be assigned as required on implementation + super(options: options, key: key); + String clientId; void close(); void connect(); -} - -class Realtime extends RealtimeBase { - Realtime.fromOptions(ClientOptions options): super.fromOptions(options){ - channels = RealtimeChannels(this); - } - Realtime.fromKey(String key): super.fromKey(key){ - channels = RealtimeChannels(this); - } - Connection connection; + final Connection connection; RealtimeChannels channels; - close(){ - //TODO implement - } - connect(){ - //TODO implement - } - Future> stats([Map params]){ - //TODO implement - return null; - } - Future request({ - @required String method, - @required String path, - Map params, - dynamic body, - Map headers - }){ - //TODO implement - return null; - } - - Future time(){ - //TODO implement - return null; - } } \ No newline at end of file diff --git a/lib/src/spec/rest/ably_base.dart b/lib/src/spec/rest/ably_base.dart index 4b7d04f57..ca972bd56 100644 --- a/lib/src/spec/rest/ably_base.dart +++ b/lib/src/spec/rest/ably_base.dart @@ -13,10 +13,14 @@ abstract class Crypto { ///io.ably.lib.rest.AblyBase abstract class AblyBase { - AblyBase.fromOptions(this.options); - AblyBase.fromKey(String key){ - this.options = ClientOptions.fromKey(key); + + AblyBase({ + ClientOptions options, + final String key + }):assert(options!=null || key!=null){ + this.options = (options==null)?ClientOptions.fromKey(key):options; } + ClientOptions options; static Crypto crypto; static MessageStatic message; @@ -31,5 +35,5 @@ abstract class AblyBase { dynamic body, Map headers }); - Future time(); + Future time(); } \ No newline at end of file diff --git a/lib/src/spec/rest/channels.dart b/lib/src/spec/rest/channels.dart index d0d6180ec..21f945ece 100644 --- a/lib/src/spec/rest/channels.dart +++ b/lib/src/spec/rest/channels.dart @@ -10,38 +10,22 @@ abstract class ChannelOptions { dynamic cipher; } -abstract class ChannelBase { +abstract class RestChannel { - ChannelBase(this.ably, this.name, this.options); - - AblyBase ably; - String name; - ChannelOptions options; - Presence presence; -} - -class Channel extends ChannelBase { - - @override - Channel(AblyBase ably, String name, ChannelOptions options): super(ably, name, options) { + RestChannel(this.ably, this.name, this.options){ this.presence = Presence(); } AblyBase ably; + String name; ChannelOptions options; Presence presence; - Future> history([RestHistoryParams params]){ - //TODO - return null; - } - Future publish([String name, dynamic data]){ - //TODO - return null; - } + Future> history([RestHistoryParams params]); + Future publish({String name, dynamic data}); } -class RestChannels extends Channels { +abstract class RestChannels extends Channels { RestChannels(ably) : super(ably); diff --git a/lib/src/spec/rest/options.dart b/lib/src/spec/rest/options.dart index f4b0a73b0..bec37b63b 100644 --- a/lib/src/spec/rest/options.dart +++ b/lib/src/spec/rest/options.dart @@ -1,3 +1,5 @@ +import 'package:ably_flutter_plugin/ably.dart'; + import '../enums.dart'; import '../common.dart'; @@ -17,9 +19,9 @@ abstract class AuthOptions { /// param [key]: the full key string as obtained from the dashboard AuthOptions.fromKey(String key){ assert(key!=null); - if(key.contains(':')){ + if (key.contains(':')) { this.key = key; - }else{ + } else { this.tokenDetails = TokenDetails(key); } } @@ -51,11 +53,11 @@ typedef void LogHandler({String msg, AblyException exception}); class ClientOptions extends AuthOptions { ClientOptions(){ - logLevel = 4; + logLevel = LogLevel.info; } ClientOptions.fromKey(String key): super.fromKey(key){ - logLevel = 4; + logLevel = LogLevel.info; } ///Optional clientId that can be used to specify the identity for this client. diff --git a/lib/src/spec/rest/rest.dart b/lib/src/spec/rest/rest.dart index 1eb0e50e2..93a379ffd 100644 --- a/lib/src/spec/rest/rest.dart +++ b/lib/src/spec/rest/rest.dart @@ -16,11 +16,14 @@ abstract class Rest extends AblyBase { Push push; C channels; - Rest.fromOptions(ClientOptions options): super.fromOptions(options); + Rest({ + ClientOptions options, + final String key + }): super(options: options, key: key); + /*{ channels = RestChannels(this); }*/ - Rest.fromKey(String key): super.fromKey(key); /*{ channels = RestChannels(this); }*/ diff --git a/pubspec.lock b/pubspec.lock index 0d9be71fc..7e99931aa 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -331,6 +331,13 @@ packages: url: "https://pub.dartlang.org" source: hosted version: "2.0.0" + streams_channel: + dependency: "direct main" + description: + name: streams_channel + url: "https://pub.dartlang.org" + source: hosted + version: "0.3.0" string_scanner: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index 3838631e8..f86d12860 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -9,6 +9,7 @@ environment: dependencies: flutter: sdk: flutter + streams_channel: ^0.3.0 dev_dependencies: flutter_test: @@ -16,10 +17,10 @@ dev_dependencies: # Specifying a slightly older version minimum for the 'pure' Dart test package # because the flutter_test package needs an older version of test_api. - test: ^1.14.2 + test: ^1.9.4 # Stricter Linting - pedantic: ^1.9.0 + pedantic: ^1.0.0 # For information on the generic Dart part of this file, see the # following page: https://dart.dev/tools/pub/pubspec diff --git a/test/ably_event_listener_test.dart b/test/ably_event_listener_test.dart new file mode 100644 index 000000000..dc9d8aaef --- /dev/null +++ b/test/ably_event_listener_test.dart @@ -0,0 +1,84 @@ +import 'dart:async'; + +import 'package:test/test.dart'; +import 'package:collection/collection.dart'; + + +typedef int Injector(int id); +Stream emitter(int id, Injector injector) async* { + int injectable = injector(id); + while (injectable!=null) { + yield injectable; + injectable = injector(id); + } +} + +class MockEmitter{ + + MockEmitter(this.streamsCount, this.injectables){ + streams = []; + for(int i=0; i indexes = {}; + List injectables; + List> streams; + + int emitCount = 0; + int injector(id){ + if(indexes[id] >= injectables.length) return null; + int ret = injectables[indexes[id]]; + indexes[id]++; + return ret; + } + +} + +Function eq = const ListEquality().equals; + +void main() { + + test('RTE6a: nested cancellation of a listener', () async { + //lists to store data received by listeners + List resultsDefault = []; + List resultsNestedPre = []; + List resultsNestedPost = []; + + MockEmitter emitter = MockEmitter(3, [1, 2, 3, 4, 5]); + List> streams = emitter.streams; + + StreamSubscription subscriptionPre; + StreamSubscription subscriptionPost; + + subscriptionPre = streams[1].listen((nestedValue){ + resultsNestedPre.add(nestedValue); + }); + + //subscription2 + streams[0].listen((int eventValue){ + resultsDefault.add(eventValue); + //cancelling subscriptionPre and subscriptionPost when value is 2 + if(eventValue==2){ + subscriptionPre.cancel(); + subscriptionPost.cancel(); + } + }); + + subscriptionPost = streams[2].listen((nestedValue){ + resultsNestedPost.add(nestedValue); + }); + + //Waiting for stream to end + await Future.delayed(Duration.zero); + + //Checking if data received by stream is same as expected + expect(ListEquality().equals(resultsDefault, emitter.injectables), true); + expect(ListEquality().equals(resultsNestedPre, [1, 2]), true); + expect(ListEquality().equals(resultsNestedPost, [1]), true); + }); + +} diff --git a/test/ably_flutter_plugin_test.dart b/test/ably_flutter_plugin_test.dart index fe31f5db5..0bb55a445 100644 --- a/test/ably_flutter_plugin_test.dart +++ b/test/ably_flutter_plugin_test.dart @@ -40,7 +40,6 @@ void main() { case "createrestWithKey": case "createRestWithOptions": case "createRealtimeWithOptions": - case "createListener": return ++counter; case "publish": @@ -70,21 +69,21 @@ void main() { ClientOptions o = ClientOptions(); String host = "http://rest.ably.io/"; o.restHost = host; - RestPlatformObject rest = await ably.createRest(o); + RestPlatformObject rest = await ably.createRest(options: o); expect(rest.ablyHandle, ablyCounter); expect(rest.handle, counter); expect(rest.options.restHost, host); }); test("createRestWithKey", () async { - RestPlatformObject rest = await ably.createRestWithKey('TEST-KEY'); + RestPlatformObject rest = await ably.createRest(key: 'TEST-KEY'); expect(rest.ablyHandle, ablyCounter); expect(rest.handle, counter); }); test("publishMessage", () async { - RestPlatformObject rest = await ably.createRestWithKey('TEST-KEY'); - await rest.channels.get('test').publish('name', 'data'); + RestPlatformObject rest = await ably.createRest(key: 'TEST-KEY'); + await rest.channels.get('test').publish(name: 'name', data: 'data'); expect(1, 1); });