Skip to content

Commit

Permalink
add dart
Browse files Browse the repository at this point in the history
  • Loading branch information
Matrixbirds committed Dec 26, 2018
1 parent 960a3c2 commit 13b8d02
Show file tree
Hide file tree
Showing 15 changed files with 438 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ target/
.vs/

*.log
.packages
21 changes: 21 additions & 0 deletions dart/emit_log.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import "package:dart_amqp/dart_amqp.dart";

void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

String msg = arguments.isEmpty ? "Hello World!" : arguments[0];

client
.channel()
.then((Channel channel) =>
channel.exchange("logs", ExchangeType.FANOUT, durable: false))
.then((Exchange exchange) {
exchange.publish(msg, null);
print(" [x] Sent ${msg}");
return client.close();
});
}
23 changes: 23 additions & 0 deletions dart/emit_log_direct.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

String routingKey = arguments.length < 1 ? "info" : arguments[0];
String msg = arguments.length < 2 ? "Hello World!" : arguments[1];

client
.channel()
.then((Channel channel) =>
channel.exchange("direct_logs", ExchangeType.DIRECT,
durable: false))
.then((Exchange exchange) {
exchange.publish(msg, routingKey);
print(" [x] Sent ${routingKey}: ${msg}");
return client.close();
});
}
23 changes: 23 additions & 0 deletions dart/emit_log_topic.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

String routingKey = arguments.length < 1 ? "anonymous.info" : arguments[0];
String msg = arguments.length < 2 ? "Hello World!" : arguments[1];

client
.channel()
.then((Channel channel) =>
channel.exchange("topic_logs", ExchangeType.TOPIC,
durable: false))
.then((Exchange exchange) {
exchange.publish(msg, routingKey);
print(" [x] Sent ${routingKey}: ${msg}");
return client.close();
});
}
24 changes: 24 additions & 0 deletions dart/new_task.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

String consumeTag = "task_queue";
String msg = arguments.isEmpty ? "Hello World!" : arguments[0];

MessageProperties properties = new MessageProperties.persistentMessage();

client
.channel()
.then((Channel channel) =>
channel.queue(consumeTag, durable: true))
.then((Queue queue) {
queue.publish(msg, properties: properties);
print(" [x] Sent ${msg}");
return client.close();
});
}
19 changes: 19 additions & 0 deletions dart/pubspec.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by pub
# See https://www.dartlang.org/tools/pub/glossary#lockfile
packages:
dart_amqp:
dependency: "direct main"
description:
name: dart_amqp
url: "https://pub.dartlang.org"
source: hosted
version: "0.1.1"
logging:
dependency: transitive
description:
name: logging
url: "https://pub.dartlang.org"
source: hosted
version: "0.11.3+2"
sdks:
dart: ">=2.0.0-dev <3.0.0"
3 changes: 3 additions & 0 deletions dart/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name: dart_rabbitmq_example
dependencies:
dart_amqp: 0.1.1
34 changes: 34 additions & 0 deletions dart/receive.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});

String msg = arguments.isEmpty ? "Hello World!": arguments[0];

String queueTag = "hello";

client
.channel()
.then((Channel channel) => channel.queue(queueTag, durable: false))
.then((Queue queue) {
print(" [*] Waiting for messages in ${queueTag}. To Exit press CTRL+C");
return queue.consume(consumerTag: queueTag, noAck: true);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] Received ${event.payloadAsString}");
});
});
}
34 changes: 34 additions & 0 deletions dart/receive_logs.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main (List<String> arguments) {
ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});

String msg = arguments.isEmpty ? "Hello World!": arguments[0];

client
.channel()
.then((Channel channel) {
return channel.exchange("logs", ExchangeType.FANOUT, durable: false);
})
.then((Exchange exchange) {
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
return exchange.bindPrivateQueueConsumer(null);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] Received ${event.payloadAsString}");
});
});
}
40 changes: 40 additions & 0 deletions dart/receive_logs_direct.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main (List<String> arguments) {
if (arguments.isEmpty) {
print("Usage: receive_logs_direct.dart [info] [warning] [error]");
return;
}

ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});

List<String> routingKeys = arguments.sublist(0, 1);
client
.channel()
.then((Channel channel) {
return channel.exchange("direct_logs", ExchangeType.DIRECT, durable: false);
})
.then((Exchange exchange) {
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
return exchange.bindPrivateQueueConsumer(routingKeys,
consumerTag: "direct_logs", noAck: true
);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] ${event.routingKey}:'${event.payloadAsString}'");
});
});
}
40 changes: 40 additions & 0 deletions dart/receive_logs_topic.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main (List<String> arguments) {
if (arguments.isEmpty) {
print("Usage: receive_logs_direct.dart <facility>.<routingKey>");
return;
}

ConnectionSettings settings = new ConnectionSettings(
host: "localhost"
);

Client client = new Client(settings: settings);

ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
print("close client");
exit(0);
});
});

List<String> routingKeys = arguments.sublist(0, 1);
client
.channel()
.then((Channel channel) {
return channel.exchange("topic_logs", ExchangeType.TOPIC, durable: false);
})
.then((Exchange exchange) {
print(" [*] Waiting for messages in logs. To Exit press CTRL+C");
return exchange.bindPrivateQueueConsumer(routingKeys,
consumerTag: "topic_logs", noAck: true
);
})
.then((Consumer consumer) {
consumer.listen((AmqpMessage event) {
print(" [x] ${event.routingKey}:'${event.payloadAsString}'");
});
});
}
80 changes: 80 additions & 0 deletions dart/rpc_client.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import "dart:io";
import "dart:async";
import "dart:math";
import "package:dart_amqp/dart_amqp.dart";

var UUID = () => "${(new Random()).nextDouble()}";

class RPCClient {
Client client;
String queueTag;
String _replyQueueTag;
Completer contextChannel;
Map<String, Completer> _channels = new Map<String, Completer>();
Queue _queue;
RPCClient() : client = new Client(),
queueTag = "rpc_queue" {
contextChannel = new Completer();
client
.channel()
.then((Channel channel) => channel.queue(queueTag))
.then((Queue rpcQueue) {
_queue = rpcQueue;
return rpcQueue.channel.privateQueue();
})
.then((Queue rpcQueue) {
rpcQueue.consume(noAck: true)
.then((Consumer consumer) {
_replyQueueTag = consumer.queue.name;
consumer.listen(handler);
contextChannel.complete();
});
});
}

void handler (AmqpMessage event) {
if (!_channels
.containsKey(
event.properties.corellationId)) return;
print(" [.] Got ${event.payloadAsString}");
_channels
.remove(event.properties.corellationId)
.complete(event.payloadAsString);
}

Future<String> call(int n) {
return contextChannel.future
.then((_) {
String uuid = UUID();
Completer<String> channel = new Completer<String>();
MessageProperties properties = new MessageProperties()
..replyTo = _replyQueueTag
..corellationId = uuid;
_channels[uuid] = channel;
print(" [x] Requesting ${n}");
_queue.publish({"n": n}, properties: properties);
return channel.future;
});
}

Future close() {
_channels.forEach((_, var next) => next.complete("RPC client closed"));
_channels.clear();
client.close();
}
}

void main(List<String> arguments) {
if (arguments.isEmpty) {
print("Usage: rpc_client.dart num");
return;
}
RPCClient client = new RPCClient();
int n = arguments.isEmpty ? 30 : num.parse(arguments[0]);
client.call(n)
.then((String res) {
print(" [x] fib(${n}) = ${res}");
})
.then((_) => client.close())
.then((_) => null);
}
36 changes: 36 additions & 0 deletions dart/rpc_server.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

// Slow implementation of fib
int fib(int n) {
if (n >= 0 && n <= 1) {
return n;
} else
return fib(n - 1) + fib(n - 2);
}

void main(List<String> args) {

Client client = new Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
exit(0);
});
});

client
.channel()
.then((Channel channel) => channel.qos(0, 1))
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
print(" [x] Awaiting RPC request");
consumer.listen((AmqpMessage message) {
var n = message.payloadAsJson["n"];
print(" [.] fib(${n})");
message.reply(fib(n).toString());
});
});
}
Loading

0 comments on commit 13b8d02

Please sign in to comment.