From 46905134adc41b70cbeb4e730f3624c1286be9d9 Mon Sep 17 00:00:00 2001 From: Ravi Teja Gudapati Date: Sat, 7 Jul 2018 15:43:26 +0200 Subject: [PATCH] Connection pool --- .gitignore | 9 ++ conn_pool/.gitignore | 10 +++ conn_pool/CHANGELOG.md | 3 + conn_pool/README.md | 22 +++++ conn_pool/analysis_options.yaml | 15 ++++ conn_pool/example/example.dart | 5 ++ conn_pool/lib/conn_pool.dart | 8 ++ conn_pool/lib/src/counted_set.dart | 97 +++++++++++++++++++++ conn_pool/lib/src/pool.dart | 85 +++++++++++++++++++ conn_pool/pubspec.yaml | 11 +++ conn_pool/test/conn_pool_test.dart | 12 +++ jaguar_mongo/.gitignore | 6 +- jaguar_mongo/pubspec.yaml | 15 ++-- jaguar_postgres/.gitignore | 3 +- jaguar_postgres/example/simple/main.dart | 103 ++++++++++++++++++----- jaguar_postgres/lib/src/interceptor.dart | 98 +++++++++------------ jaguar_postgres/lib/src/pool.dart | 79 +++++++++-------- jaguar_postgres/pubspec.yaml | 17 ++-- 18 files changed, 461 insertions(+), 137 deletions(-) create mode 100644 .gitignore create mode 100644 conn_pool/.gitignore create mode 100644 conn_pool/CHANGELOG.md create mode 100644 conn_pool/README.md create mode 100644 conn_pool/analysis_options.yaml create mode 100644 conn_pool/example/example.dart create mode 100644 conn_pool/lib/conn_pool.dart create mode 100644 conn_pool/lib/src/counted_set.dart create mode 100644 conn_pool/lib/src/pool.dart create mode 100644 conn_pool/pubspec.yaml create mode 100644 conn_pool/test/conn_pool_test.dart diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..54a39b9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +# Files and directories created by pub +.packages +.dart_tool +.pub/ +packages +# Remove the following pattern if you wish to check in your lock file +pubspec.lock +.idea +*.iml \ No newline at end of file diff --git a/conn_pool/.gitignore b/conn_pool/.gitignore new file mode 100644 index 0000000..070e840 --- /dev/null +++ b/conn_pool/.gitignore @@ -0,0 +1,10 @@ +# Files and directories created by pub +.dart_tool/ +.packages +.pub/ +build/ +# Remove the following pattern if you wish to check in your lock file +pubspec.lock + +# Directory created by dartdoc +doc/api/ diff --git a/conn_pool/CHANGELOG.md b/conn_pool/CHANGELOG.md new file mode 100644 index 0000000..687440b --- /dev/null +++ b/conn_pool/CHANGELOG.md @@ -0,0 +1,3 @@ +## 1.0.0 + +- Initial version, created by Stagehand diff --git a/conn_pool/README.md b/conn_pool/README.md new file mode 100644 index 0000000..c06401d --- /dev/null +++ b/conn_pool/README.md @@ -0,0 +1,22 @@ +# conn_pool + +A library for Dart developers. + +Created from templates made available by Stagehand under a BSD-style +[license](https://github.com/dart-lang/stagehand/blob/master/LICENSE). + +## Usage + +A simple usage example: + + import 'package:conn_pool/conn_pool.dart'; + + main() { + var awesome = new Awesome(); + } + +## Features and bugs + +Please file feature requests and bugs at the [issue tracker][tracker]. + +[tracker]: http://example.com/issues/replaceme diff --git a/conn_pool/analysis_options.yaml b/conn_pool/analysis_options.yaml new file mode 100644 index 0000000..97f0908 --- /dev/null +++ b/conn_pool/analysis_options.yaml @@ -0,0 +1,15 @@ +analyzer: + strong-mode: true +# exclude: +# - path/to/excluded/files/** + +# Lint rules and documentation, see http://dart-lang.github.io/linter/lints +linter: + rules: + - cancel_subscriptions + - hash_and_equals + - iterable_contains_unrelated_type + - list_remove_unrelated_type + - test_types_in_equals + - unrelated_type_equality_checks + - valid_regexps diff --git a/conn_pool/example/example.dart b/conn_pool/example/example.dart new file mode 100644 index 0000000..f500921 --- /dev/null +++ b/conn_pool/example/example.dart @@ -0,0 +1,5 @@ +import 'package:conn_pool/conn_pool.dart'; + +main() { + +} diff --git a/conn_pool/lib/conn_pool.dart b/conn_pool/lib/conn_pool.dart new file mode 100644 index 0000000..171f418 --- /dev/null +++ b/conn_pool/lib/conn_pool.dart @@ -0,0 +1,8 @@ +/// Support for doing something awesome. +/// +/// More dartdocs go here. +library conn_pool; + +export 'src/pool.dart'; + +// TODO: Export any libraries intended for clients of this package. diff --git a/conn_pool/lib/src/counted_set.dart b/conn_pool/lib/src/counted_set.dart new file mode 100644 index 0000000..0a5aa42 --- /dev/null +++ b/conn_pool/lib/src/counted_set.dart @@ -0,0 +1,97 @@ +import 'dart:collection'; + +class CountedSet { + final _set = SplayTreeMap>(); + + final _values = Map(); + + CountedSet(); + + Iterable get values => _values.keys; + + int get length => _values.length; + + void add(int count, T value) { + if (_values.containsKey(value)) throw Exception('Already present!'); + Set set = _set[count] ??= Set(); + _values[value] = count; + set.add(value); + } + + void remove(T value) { + if (!_values.containsKey(value)) throw Exception('Not present!'); + int count = _values[value]; + + Set set = _set[count]; + if (set == null) { + _values.remove(value); + throw Exception('$value is not located at $count!'); + } + + if (!set.remove(value)) { + _values.remove(value); + throw Exception('$value is not located at $count!'); + } + if (set.isEmpty) _set.remove(count); + + _values.remove(value); + } + + void inc(T value) { + int count = _values[value]; + if (count == null) throw Exception('$value is not present!'); + + Set set = _set[count]; + if (set == null) throw Exception('$value is not located at $count!'); + if (!set.remove(value)) + throw Exception('$value is not located at $count!'); + if(set.isEmpty) _set.remove(count); + + count++; + set = _set[count] ??= Set(); + _values[value] = count; + set.add(value); + } + + void dec(T value) { + int count = _values[value]; + if (count == null) throw Exception('$value is not present!'); + + Set set = _set[count]; + if (set == null) throw Exception('$value is not located at $count!'); + if (!set.remove(value)) + throw Exception('$value is not located at $count!'); + if(set.isEmpty) _set.remove(count); + + count--; + set = _set[count] ??= Set(); + _values[value] = count; + set.add(value); + } + + T get leastUsed { + if (_values.length == 0) return null; + int firstKey = _set.firstKey(); + return _set[firstKey].first; + } + + int numAt(int count) { + Set set = _set[count]; + if(set == null) return 0; + return set.length; + } + + List removeAllAt(int count) { + Set set = _set[count]; + if(set == null) return []; + + for(T t in set) _values.remove(t); + _set.remove(count); + + return set.toList(); + } + + bool contains(T value) => _values.containsKey(value); + + int countOf(T value) => _values[value]; +} diff --git a/conn_pool/lib/src/pool.dart b/conn_pool/lib/src/pool.dart new file mode 100644 index 0000000..0d4fe42 --- /dev/null +++ b/conn_pool/lib/src/pool.dart @@ -0,0 +1,85 @@ +import 'dart:async'; +import 'counted_set.dart'; + +class SharedPool implements Pool { + final ConnectionManager manager; + + final int minSize; + final int maxSize; + final _pool = CountedSet>(); + + SharedPool(this.manager, {int minSize: 10, int maxSize: 10}) + : minSize = minSize, + maxSize = maxSize, + _d = maxSize - minSize; + + Future> _createNew() async { + var conn = Connection._(this); + _pool.add(1, conn); + T n = await manager.open(); + conn._connection = n; + return conn; + } + + Future> get() async { + if (_pool.numAt(0) > 0 || _pool.length >= maxSize) { + var conn = _pool.leastUsed; + _pool.inc(conn); + return conn; + } + return _createNew(); + } + + final int _d; + + void release(Connection conn) { + int count = _pool.countOf(conn); + if (count == null || count == 0) return; + _pool.dec(conn); + if (_pool.length != maxSize) return; + if (_pool.numAt(0) < _d) return; + var removes = _pool.removeAllAt(0); + for (var r in removes) { + try { + if (r.isReleased) continue; + r.isReleased = true; + manager.close(r.connection); + } catch (_) {} + } + } +} + +class Connection { + /// The connection pool this connection belongs to. + final Pool pool; + + /// The underlying connection + T _connection; + + bool isReleased = false; + + Connection._(this.pool); + + T get connection => _connection; + + Future release() => pool.release(this); +} + +/// Interface to open and close the connection [C] +abstract class ConnectionManager { + /// Establishes and returns a new connection + Future open(); + + /// Closes provided[connection] + void close(C connection); +} + +abstract class Pool { + ConnectionManager get manager; + + Future> get(); + + FutureOr release(Connection connection); +} + +// TODO class ExclusivePool implements Pool {} diff --git a/conn_pool/pubspec.yaml b/conn_pool/pubspec.yaml new file mode 100644 index 0000000..dd1b9b3 --- /dev/null +++ b/conn_pool/pubspec.yaml @@ -0,0 +1,11 @@ +name: conn_pool +description: A starting point for Dart libraries or applications. +version: 2.1.1 +homepage: https://github.com/Jaguar-dart/db +author: Ravi Teja Gudapati + +environment: + sdk: '>=2.0.0-dev.55.0 <2.0.0' + +dev_dependencies: + test: ^1.2.0 diff --git a/conn_pool/test/conn_pool_test.dart b/conn_pool/test/conn_pool_test.dart new file mode 100644 index 0000000..71e0a46 --- /dev/null +++ b/conn_pool/test/conn_pool_test.dart @@ -0,0 +1,12 @@ +import 'package:conn_pool/conn_pool.dart'; +import 'package:test/test.dart'; + +void main() { + group('A group of tests', () { + setUp(() { + }); + + test('First Test', () { + }); + }); +} diff --git a/jaguar_mongo/.gitignore b/jaguar_mongo/.gitignore index 7c06731..f7bcb3e 100644 --- a/jaguar_mongo/.gitignore +++ b/jaguar_mongo/.gitignore @@ -1,9 +1,9 @@ # Files and directories created by pub .packages +.dart_tool .pub/ packages # Remove the following pattern if you wish to check in your lock file pubspec.lock - -.dart_tool -.idea \ No newline at end of file +.idea +*.iml \ No newline at end of file diff --git a/jaguar_mongo/pubspec.yaml b/jaguar_mongo/pubspec.yaml index 2de16f5..f05647c 100644 --- a/jaguar_mongo/pubspec.yaml +++ b/jaguar_mongo/pubspec.yaml @@ -1,5 +1,5 @@ name: jaguar_mongo -version: 1.3.1 +version: 2.1.1 description: Mongo interceptor for jaguar authors: - Ravi Teja Gudapati @@ -11,12 +11,11 @@ environment: sdk: ">=1.20.0 <2.0.0" dependencies: - mongo_dart: ^0.3.0 - connection_pool: ">=0.1.2 <0.2.0" - jaguar: ^1.3.4 + mongo_dart: ^0.3.2 + conn_pool: ^2.1.1 + jaguar: ^2.1.1 dev_dependencies: - # test: 0.12.15+10 - jaguar_client: ^0.2.0 - jaguar_reflect: - http: + test: 1.2.0 + jaguar_client: ^2.1.4 + jaguar_reflect: ^2.1.1 diff --git a/jaguar_postgres/.gitignore b/jaguar_postgres/.gitignore index ff8a920..54a39b9 100644 --- a/jaguar_postgres/.gitignore +++ b/jaguar_postgres/.gitignore @@ -1,8 +1,9 @@ # Files and directories created by pub .packages +.dart_tool .pub/ packages # Remove the following pattern if you wish to check in your lock file pubspec.lock .idea -.dart_tool \ No newline at end of file +*.iml \ No newline at end of file diff --git a/jaguar_postgres/example/simple/main.dart b/jaguar_postgres/example/simple/main.dart index d840007..7e6b11b 100644 --- a/jaguar_postgres/example/simple/main.dart +++ b/jaguar_postgres/example/simple/main.dart @@ -4,36 +4,97 @@ library jaguar.example.silly; import 'dart:async'; import 'package:jaguar_reflect/jaguar_reflect.dart'; import 'package:jaguar/jaguar.dart'; -import 'package:postgres/postgres.dart'; +import 'package:postgres/postgres.dart' as pg; +import 'package:conn_pool/conn_pool.dart'; import 'package:jaguar_postgres/jaguar_postgres.dart'; -@Api(path: '/api') +final postgresPool = PostgresPool('jaguar_learn', + password: 'dart_jaguar', minPoolSize: 5, maxPoolSize: 10); + +Future dbInterceptor(Context ctx) => postgresPool.newInterceptor(ctx); + +@Controller(path: '/post') +// NOTE: This is how Postgres interceptor is wrapped around a route. +@Intercept(const [dbInterceptor]) class PostgresExampleApi { - @Get(path: '/post') - // NOTE: This is how postgre interceptor is wrapped - // around a route. - @WrapOne(#postgresDb) - Future> mongoTest(Context ctx) async { - // NOTE: This is how the opened postgres connection is injected - // into routes - PostgreSQLConnection db = ctx.getInput(PostgresDb); - await db.execute("delete FROM posts"); - await db.execute("insert into posts values (1, 'holla', 'jon')"); - List value = (await db.query("select * from posts WHERE _id = 1")).first; - return Response.json({"Columns": value}); + Future _fetchById(pg.PostgreSQLConnection db, int id) async { + List>> values = + await db.mappedResultsQuery("SELECT * FROM posts WHERE id = $id;"); + if (values.isEmpty) return null; + return values.first.values.first; } - PostgresDb postgresDb(Context ctx) => - new PostgresDb('localhost', 5432, 'postgres', - username: 'postgres', password: 'dart_jaguar'); + @GetJson(path: '/:id') + Future readById(Context ctx) async { + int id = ctx.pathParams.getInt('id'); + pg.PostgreSQLConnection db = ctx.getVariable(); + return _fetchById(db, id); + } + + @GetJson() + Future> readAll(Context ctx) async { + pg.PostgreSQLConnection db = ctx.getVariable(); + List>> values = + await db.mappedResultsQuery("SELECT * FROM posts;"); + return values.map((m) => m.values.first).toList(); + } + + @PostJson() + Future create(Context ctx) async { + Map body = await ctx.bodyAsJsonMap(); + pg.PostgreSQLConnection db = ctx.getVariable(); + List> id = await db.query( + "INSERT INTO posts (name, age) VALUES ('${body['name']}', ${body['age']}) RETURNING id;"); + if (id.isEmpty || id.first.isEmpty) Response.json(null); + return _fetchById(db, id.first.first); + } + + @PutJson(path: '/:id') + Future update(Context ctx) async { + int id = ctx.pathParams.getInt('id'); + Map body = await ctx.bodyAsJsonMap(); + pg.PostgreSQLConnection db = ctx.getVariable(); + await db.execute( + "UPDATE posts SET name = '${body['name']}', age = ${body['age']} WHERE id = $id;"); + return _fetchById(db, id); + } + + @Delete(path: '/:id') + Future delete(Context ctx) async { + String id = ctx.pathParams['id']; + pg.PostgreSQLConnection db = ctx.getVariable(); + await db.execute("DELETE FROM posts WHERE id = $id;"); + } +} + +Future setup() async { + Connection conn; + conn = await postgresPool.pool.get(); // TODO handle open error + pg.PostgreSQLConnection db = conn.connection; + + try { + await db.execute("CREATE DATABSE jaguar_learn;"); + } catch (e) {} finally {} + + try { + await db.execute("DROP TABLE posts;"); + } catch (e) {} finally {} + + try { + await db.execute( + "CREATE TABLE posts (id SERIAL PRIMARY KEY, name VARCHAR(255), age INT);"); + } catch (e) {} finally { + if (conn != null) await conn.release(); + } } Future main(List args) async { - final api = new PostgresExampleApi(); + await setup(); - Jaguar server = new Jaguar(multiThread: false); - server.addApi(reflectJaguar(api)); + final server = new Jaguar(port: 10000); + server.add(reflect(PostgresExampleApi())); + server.log.onRecord.listen(print); - await server.serve(); + await server.serve(logRequests: true); } diff --git a/jaguar_postgres/lib/src/interceptor.dart b/jaguar_postgres/lib/src/interceptor.dart index 90d05d8..b7802b6 100644 --- a/jaguar_postgres/lib/src/interceptor.dart +++ b/jaguar_postgres/lib/src/interceptor.dart @@ -4,71 +4,51 @@ import 'dart:async'; import 'package:jaguar/jaguar.dart'; import 'package:postgres/postgres.dart'; -import 'package:connection_pool/connection_pool.dart'; +import 'package:conn_pool/conn_pool.dart'; import 'pool.dart'; -class PostgresDb extends Interceptor { - /// ID for the interceptor instance - final String id; - - final String host; - - final int port; - - final String databaseName; - - final String username; - - final String password; - - final bool useSSL; - - final int poolSize; - +class PostgresPool { /// The connection pool - PostgresDbPool _pool; - - /// The connection - ManagedConnection _managedConnection; - - /// Returns the mongodb connection - PostgreSQLConnection get conn => _managedConnection?.conn; - - PostgresDb(this.host, this.port, this.databaseName, - {this.username, - this.password, - this.useSSL: false, - this.poolSize: 10, - this.id}); - - Future pre(Context ctx) async { - _pool = new PostgresDbPool.Named(host, port, databaseName, - username: username, - password: password, - useSSL: useSSL, - poolSize: poolSize); - _managedConnection = await _pool.getConnection(); - return conn; - } - - /// Closes the connection on route exit - Null post(Context ctx, Response incoming) { - _releaseConn(); - return null; - } - - /// Closes the connection in case an exception occurs in route chain before - /// [post] is called - Future onException() async { - _releaseConn(); + final Pool pool; + + PostgresPool(String databaseName, + {String host: 'localhost', + int port: 5432, + String username: 'postgres', + String password, + bool useSsl: false, + int timeoutInSeconds: 30, + String timeZone: "UTC", + int minPoolSize: 10, + int maxPoolSize: 10}) + : pool = SharedPool( + PostgresManager(databaseName, + host: host, + port: port, + username: username, + password: password, + useSsl: useSsl, + timeoutInSeconds: timeoutInSeconds, + timeZone: timeZone), + minSize: minPoolSize, + maxSize: maxPoolSize); + + PostgresPool.fromPool({this.pool}); + + PostgresPool.fromManager({PostgresManager manager}) + : pool = SharedPool(manager); + + Future newInterceptor(Context ctx) async { + Connection conn = await pool.get(); + ctx.addVariable(conn.connection); + ctx.after.add((_) => _releaseConn(conn)); + ctx.onException.add((Context ctx, _1, _2) => _releaseConn(conn)); + return conn.connection; } /// Releases connection - void _releaseConn() { - if (_managedConnection != null) { - _pool.releaseConnection(_managedConnection); - _managedConnection = null; - } + Future _releaseConn(Connection conn) async { + if (!conn.isReleased) await conn.release(); } } diff --git a/jaguar_postgres/lib/src/pool.dart b/jaguar_postgres/lib/src/pool.dart index a6e0a32..c9c49f1 100644 --- a/jaguar_postgres/lib/src/pool.dart +++ b/jaguar_postgres/lib/src/pool.dart @@ -1,59 +1,58 @@ import 'dart:async'; -import 'package:connection_pool/connection_pool.dart'; +import 'package:conn_pool/conn_pool.dart'; import 'package:postgres/postgres.dart'; -class PostgresDbPool extends ConnectionPool { - String host; +class PostgresManager extends ConnectionManager { + /// Hostname of database this connection refers to. + final String host; - int port; + /// Port of database this connection refers to. + final int port; - String databaseName; + /// Name of database this connection refers to. + final String databaseName; - String username; + /// Username for authenticating this connection. + final String username; - String password; + /// Password for authenticating this connection. + final String password; - bool useSsl; + /// Whether or not this connection should connect securely. + final bool useSsl; - PostgresDbPool(this.host, this.port, this.databaseName, - {this.username, this.password, this.useSsl: false, int poolSize: 10}) - : super(poolSize); + /// The amount of time this connection will wait during connecting before + /// giving up. + final int timeoutInSeconds; - @override - void closeConnection(PostgreSQLConnection connection) { - connection.close(); - } + /// The timezone of this connection for date operations that don't specify a + /// timezone. + final String timeZone; + + PostgresManager(this.databaseName, + {this.host: 'localhost', + this.port: 5432, + this.username: 'postgres', + this.password, + this.useSsl: false, + this.timeoutInSeconds: 30, + this.timeZone: "UTC"}); @override - Future openNewConnection() async { - PostgreSQLConnection conn = new PostgreSQLConnection( - host, port, databaseName, - username: username, password: password, useSSL: useSsl); + Future open() async { + PostgreSQLConnection conn = PostgreSQLConnection(host, port, databaseName, + username: username, + password: password, + useSSL: useSsl, + timeoutInSeconds: timeoutInSeconds, + timeZone: timeZone); await conn.open(); return conn; } - /// Collection of named pools - static Map _pools = {}; - - /// Creates a named pool or returns an existing one if the pool with given - /// name already exists - factory PostgresDbPool.Named(String host, int port, String databaseName, - {String username, - String password, - bool useSSL: false, - int poolSize: 10}) { - final String name = - '$username:$password@$host:$port/$databaseName/$poolSize/$useSSL'; - if (_pools[name] == null) { - _pools[name] = new PostgresDbPool(host, port, databaseName, - username: username, - password: password, - useSsl: useSSL, - poolSize: poolSize); - } - - return _pools[name]; + @override + Future close(PostgreSQLConnection connection) { + return connection.close(); } } diff --git a/jaguar_postgres/pubspec.yaml b/jaguar_postgres/pubspec.yaml index e53276c..93758c0 100644 --- a/jaguar_postgres/pubspec.yaml +++ b/jaguar_postgres/pubspec.yaml @@ -1,5 +1,5 @@ name: jaguar_postgres -version: 0.5.0 +version: 2.1.1 description: A postgres interceptor for jaguar authors: @@ -8,9 +8,16 @@ authors: homepage: http://jaguar-dart.github.io dependencies: - postgres: '>=0.9.1 <0.10.0' - connection_pool: '>=0.1.2 <0.2.0' - jaguar: '>=0.5.3 <0.6.0' + postgres: ^1.0.0-beta.1 + jaguar: ^2.1.8 dev_dependencies: - jaguar_reflect: ^0.5.0 + jaguar_reflect: ^2.1.1 + +dependency_overrides: + jaguar: + path: ../../jaguar/jaguar + jaguar_reflect: + path: ../../jaguar/reflect + conn_pool: + path: ../conn_pool