From f5086ede8b90518cec6925792e95de5a136725a5 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Tue, 12 Aug 2014 15:11:50 +0400 Subject: [PATCH 01/13] Added multiple connections support. --- src/pg.cc | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/pg.cc b/src/pg.cc index df8e75f..0596436 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -20,7 +20,6 @@ pool_t * pool; - v8::Handle pg_init(const v8::Arguments &args) { v8::HandleScope scope; @@ -41,23 +40,29 @@ v8::Handle pg_init(const v8::Arguments &args) { pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, v8::Local::Cast(args[2])); - return scope.Close(v8::Undefined()); + return scope.Close(pool); } v8::Handle pg_exec(const v8::Arguments &args) { v8::HandleScope scope; - if (args.Length() < 1) { - return throw_type_error("First argument must be query request!"); + if (args.Length() < 1) { + return throw_type_error("First argument must be pool handle!"); + } + + if (args.Length() < 2) { + return throw_type_error("Second argument must be query request!"); } - if (args.Length() < 2 && !args[1]->IsFunction()) { - return throw_type_error("Second argument must be query callback!"); + if (args.Length() < 3 && !args[2]->IsFunction()) { + return throw_type_error("Third argument must be query callback!"); } + + pool = args[0]; - v8::String::Utf8Value str(args[0]->ToString()); + v8::String::Utf8Value str(args[1]->ToString()); - query_t * query = query_alloc(v8::Local::Cast(args[1]), *str); + query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); pool_exec(pool, query); @@ -68,6 +73,12 @@ v8::Handle pg_exec(const v8::Arguments &args) { v8::Handle pg_destroy(const v8::Arguments &args) { v8::HandleScope scope; + if (args.Length() < 1) { + return throw_type_error("First argument must be pool handle!"); + } + + pool = args[0]; + pool_destroy(pool); return scope.Close(v8::Undefined()); From 6aecfa7a77d673e25f2a60a30c172239f22cbee5 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Tue, 12 Aug 2014 16:16:44 +0400 Subject: [PATCH 02/13] Added pool handle validation. --- src/pg.cc | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/pg.cc b/src/pg.cc index 0596436..7c233bc 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -20,6 +20,18 @@ pool_t * pool; + +bool setPool(pool_t * newPool){ + pool_t * handle = (pool_t *) newPool->ToObject()->GetPointerFromInternalField(0); + + if (handle == NULL) { + return false; + } + + pool = newPool; + return true; +} + v8::Handle pg_init(const v8::Arguments &args) { v8::HandleScope scope; @@ -58,6 +70,9 @@ v8::Handle pg_exec(const v8::Arguments &args) { return throw_type_error("Third argument must be query callback!"); } + if (!setPool(args[0]])) + return throw_type_error("Invalid handle!") + pool = args[0]; v8::String::Utf8Value str(args[1]->ToString()); @@ -77,7 +92,8 @@ v8::Handle pg_destroy(const v8::Arguments &args) { return throw_type_error("First argument must be pool handle!"); } - pool = args[0]; + if (!setPool(args[0]])) + return throw_type_error("Invalid handle!") pool_destroy(pool); From 88b744b9a79e407f8c7585ed5a9d97f1f391a9d1 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Tue, 12 Aug 2014 18:47:19 +0400 Subject: [PATCH 03/13] Added pool as local var. --- src/pg.cc | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/src/pg.cc b/src/pg.cc index 7c233bc..3ab3c12 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -17,19 +17,10 @@ #include +pool_t getPool(v8::Handle * newPool){ + pool_t * pool = (pool_t *) newPool->ToObject()->GetPointerFromInternalField(0); -pool_t * pool; - - -bool setPool(pool_t * newPool){ - pool_t * handle = (pool_t *) newPool->ToObject()->GetPointerFromInternalField(0); - - if (handle == NULL) { - return false; - } - - pool = newPool; - return true; + return pool; } v8::Handle pg_init(const v8::Arguments &args) { @@ -48,7 +39,9 @@ v8::Handle pg_init(const v8::Arguments &args) { } v8::String::Utf8Value str(args[1]->ToString()); - + + pool_t * pool; + pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, v8::Local::Cast(args[2])); @@ -70,10 +63,10 @@ v8::Handle pg_exec(const v8::Arguments &args) { return throw_type_error("Third argument must be query callback!"); } - if (!setPool(args[0]])) - return throw_type_error("Invalid handle!") + pool_t * pool = getPool(args[0]); - pool = args[0]; + if (pool == NULL) + return throw_type_error("Invalid handle!") v8::String::Utf8Value str(args[1]->ToString()); @@ -92,7 +85,9 @@ v8::Handle pg_destroy(const v8::Arguments &args) { return throw_type_error("First argument must be pool handle!"); } - if (!setPool(args[0]])) + pool_t * pool = getPool(args[0]); + + if (pool == NULL) return throw_type_error("Invalid handle!") pool_destroy(pool); From f69ef875d2db78aec6c9787d8ca884236aac8f92 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Mon, 18 Aug 2014 11:01:57 +0400 Subject: [PATCH 04/13] Working with several pg accounts --- Makefile | 2 +- benchmarks/pg.js | 4 ++-- benchmarks/test_pg.js | 43 +++++++++++++++++++++++++++++++++++ lib/pg/pg.js | 10 ++++++--- src/pg.cc | 52 +++++++++++++++++++++++-------------------- src/pool.cc | 2 ++ src/pool.h | 2 ++ 7 files changed, 85 insertions(+), 30 deletions(-) create mode 100644 benchmarks/test_pg.js diff --git a/Makefile b/Makefile index 9915c2a..f818166 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -TOOLS_PATH = /home/livetex/livetex-tools/0.3.4 +TOOLS_PATH = /home/aleksey/work/Node-Pg/node_modules/livetex-tools/ include $(TOOLS_PATH)/rules/js.mk include $(TOOLS_PATH)/rules/cpp.mk \ No newline at end of file diff --git a/benchmarks/pg.js b/benchmarks/pg.js index 4e7fe33..9872014 100644 --- a/benchmarks/pg.js +++ b/benchmarks/pg.js @@ -1,6 +1,6 @@ var pg = require('../bin'); -pg.init(5, { +var first = pg.init(5, { 'dbname': 'relive', 'user': 'test', 'password': 'lttest', @@ -19,7 +19,7 @@ var t = Date.now(); var mem = 0; function exec() { - pg.exec(query, complete, cancel); + pg.exec(first, query, complete, cancel); } diff --git a/benchmarks/test_pg.js b/benchmarks/test_pg.js new file mode 100644 index 0000000..f882fc9 --- /dev/null +++ b/benchmarks/test_pg.js @@ -0,0 +1,43 @@ +var pg = require('../bin'); +var util = require('util'); + +//pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, v8::Local::Cast(args[2])); + +var first = pg.init(5, { + 'dbname': 'relive', + 'user': 'test', + 'password': 'lttest', + 'host': '192.168.48.14', + 'port': '5432' +}); + +var second = pg.init(5, { + 'dbname': 'relive', + 'user': 'test', + 'password': 'lttest', + 'host': '192.168.48.14', + 'port': '5432', + 'connect_timeout': '5' +}); + +console.log("first", first); + + //console.log(util.inspect(first)); + // console.log(util.inspect(second)); + + +pg.exec(first, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); +}, console.error); + +pg.exec(second, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); +}, console.error); + +pg.exec(first, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); +}, console.error); + +pg.exec(second, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); +}, console.error); \ No newline at end of file diff --git a/lib/pg/pg.js b/lib/pg/pg.js index 7f01c18..884cac4 100644 --- a/lib/pg/pg.js +++ b/lib/pg/pg.js @@ -142,12 +142,16 @@ pg.escapeArray = function(array) { * keys specified in documentation. * @param {!pg.ErrorHandler=} opt_errorHandler Connection error handler. *
console.error
will be used by default. + * return handle */ pg.init = function(size, options, opt_errorHandler) { - __pg.init(size, querystring.unescape( + return __pg.init(size, querystring.unescape( querystring.stringify(options, ' ')), opt_errorHandler || console.error); }; +// pg.initpool = function() { +// return __pg.initpool(); +// }; /** * SQL-query executing. @@ -158,8 +162,8 @@ pg.init = function(size, options, opt_errorHandler) { * @param {!pg.ResultHandler} complete Success result handler. * @param {!pg.ErrorHandler} cancel Execution error handler. */ -pg.exec = function(query, complete, cancel) { - __pg.exec(query, function(error, result) { +pg.exec = function(handle, query, complete, cancel) { + __pg.exec(handle, query, function(error, result) { if (error.length > 0) { cancel(error); } else { diff --git a/src/pg.cc b/src/pg.cc index 3ab3c12..f7be4cf 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -17,15 +18,14 @@ #include -pool_t getPool(v8::Handle * newPool){ - pool_t * pool = (pool_t *) newPool->ToObject()->GetPointerFromInternalField(0); - - return pool; -} +v8::Persistent handle_tmpl = v8::Persistent::New(v8::ObjectTemplate::New()); v8::Handle pg_init(const v8::Arguments &args) { + v8::HandleScope scope; + handle_tmpl->SetInternalFieldCount(1); + if (args.Length() < 1) { return throw_type_error("First argument must be max pool size!"); } @@ -34,27 +34,31 @@ v8::Handle pg_init(const v8::Arguments &args) { return throw_type_error("Second argument must be connection string!"); } - if (args.Length() < 3 && !args[2]->IsFunction()) { + if (args.Length() < 3 && !args[2]->IsFunction()) { return throw_type_error("Third argument must be error callback!"); } v8::String::Utf8Value str(args[1]->ToString()); - - pool_t * pool; - + + pool_t * pool = pool_alloc(); + pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, - v8::Local::Cast(args[2])); + v8::Local::Cast(args[2])); + + pool->data = v8::Persistent::New(handle_tmpl->NewInstance()); - return scope.Close(pool); + pool->data->SetPointerInInternalField(0, pool); + + return scope.Close(pool->data); } -v8::Handle pg_exec(const v8::Arguments &args) { +v8::Handle pg_exec(const v8::Arguments& args) { v8::HandleScope scope; if (args.Length() < 1) { return throw_type_error("First argument must be pool handle!"); } - + if (args.Length() < 2) { return throw_type_error("Second argument must be query request!"); } @@ -62,11 +66,11 @@ v8::Handle pg_exec(const v8::Arguments &args) { if (args.Length() < 3 && !args[2]->IsFunction()) { return throw_type_error("Third argument must be query callback!"); } - - pool_t * pool = getPool(args[0]); - + + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); + if (pool == NULL) - return throw_type_error("Invalid handle!") + return throw_type_error("Invalid handle!1"); v8::String::Utf8Value str(args[1]->ToString()); @@ -78,26 +82,26 @@ v8::Handle pg_exec(const v8::Arguments &args) { } -v8::Handle pg_destroy(const v8::Arguments &args) { +v8::Handle pg_destroy(const v8::Arguments& args) { v8::HandleScope scope; if (args.Length() < 1) { return throw_type_error("First argument must be pool handle!"); } - pool_t * pool = getPool(args[0]); - + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); + if (pool == NULL) - return throw_type_error("Invalid handle!") - + return throw_type_error("Invalid handle!2"); + pool_destroy(pool); - return scope.Close(v8::Undefined()); + //like this? + return scope.Close(pool->data); } void init (v8::Handle target) { - pool = pool_alloc(); v8::HandleScope scope; diff --git a/src/pool.cc b/src/pool.cc index 7e3227e..fe2400e 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -6,6 +6,7 @@ */ #include +#include "stddef.h" #include "pool.h" #include "connection.h" @@ -13,6 +14,7 @@ #include "utils.h" + void pool_spawn_connection(pool_t * pool) { connection_t * connection = connection_alloc(pool->connection_info, pool); diff --git a/src/pool.h b/src/pool.h index 92d6684..d080c12 100644 --- a/src/pool.h +++ b/src/pool.h @@ -22,6 +22,8 @@ typedef struct pool_ { query_t * query_queue; + v8::Persistent data; + struct connection_ * connection_queue; v8::Persistent error_callback; From 7f93b55e1f54336dd20f6129dc31d43b56a79939 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Mon, 18 Aug 2014 12:58:03 +0400 Subject: [PATCH 05/13] With correct destroy --- benchmarks/test_pg.js | 17 ++++++----------- lib/pg/pg.js | 8 ++------ src/pg.cc | 19 ++++++++++++------- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/benchmarks/test_pg.js b/benchmarks/test_pg.js index f882fc9..bc86772 100644 --- a/benchmarks/test_pg.js +++ b/benchmarks/test_pg.js @@ -1,5 +1,4 @@ var pg = require('../bin'); -var util = require('util'); //pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, v8::Local::Cast(args[2])); @@ -20,24 +19,20 @@ var second = pg.init(5, { 'connect_timeout': '5' }); -console.log("first", first); +var un; + - //console.log(util.inspect(first)); - // console.log(util.inspect(second)); pg.exec(first, "SELECT 1 AS value", function(table) { console.log('Result table:', table); + pg.destroy(first); }, console.error); pg.exec(second, "SELECT 1 AS value", function(table) { console.log('Result table:', table); + pg.destroy(second); }, console.error); -pg.exec(first, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); -}, console.error); - -pg.exec(second, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); -}, console.error); \ No newline at end of file + // pg.destroy(first); + // pg.destroy(second); \ No newline at end of file diff --git a/lib/pg/pg.js b/lib/pg/pg.js index 884cac4..b67a759 100644 --- a/lib/pg/pg.js +++ b/lib/pg/pg.js @@ -149,10 +149,6 @@ pg.init = function(size, options, opt_errorHandler) { querystring.stringify(options, ' ')), opt_errorHandler || console.error); }; -// pg.initpool = function() { -// return __pg.initpool(); -// }; - /** * SQL-query executing. * @@ -229,7 +225,7 @@ pg.prepareQuery = function(query, params) { /** * Destroy connection pool. */ -pg.destroy = function() { - __pg.destroy(); +pg.destroy = function(handle) { + return __pg.destroy(handle); }; diff --git a/src/pg.cc b/src/pg.cc index f7be4cf..d853c54 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -55,7 +55,7 @@ v8::Handle pg_init(const v8::Arguments &args) { v8::Handle pg_exec(const v8::Arguments& args) { v8::HandleScope scope; - if (args.Length() < 1) { + if ((args.Length() < 1) || (!args[0]->IsObject())) { return throw_type_error("First argument must be pool handle!"); } @@ -67,14 +67,17 @@ v8::Handle pg_exec(const v8::Arguments& args) { return throw_type_error("Third argument must be query callback!"); } + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); if (pool == NULL) - return throw_type_error("Invalid handle!1"); + return throw_type_error("Invalid handle!"); v8::String::Utf8Value str(args[1]->ToString()); - query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); + query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); pool_exec(pool, query); @@ -85,19 +88,21 @@ v8::Handle pg_exec(const v8::Arguments& args) { v8::Handle pg_destroy(const v8::Arguments& args) { v8::HandleScope scope; - if (args.Length() < 1) { + if ((args.Length() < 1) || (!args[0]->IsObject())) { return throw_type_error("First argument must be pool handle!"); } + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); if (pool == NULL) - return throw_type_error("Invalid handle!2"); + return throw_type_error("Invalid handle!"); pool_destroy(pool); - //like this? - return scope.Close(pool->data); + return scope.Close(v8::Undefined()); } From 4e1f2d5e37d8dc3a4c53480cbc11e51bbe8d161e Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Thu, 21 Aug 2014 17:15:07 +0400 Subject: [PATCH 06/13] Add timeout --- benchmarks/pg.js | 14 ++++- benchmarks/test_pg.js | 82 +++++++++++++++++++---------- lib/pg/pg.js | 5 +- src/connection.cc | 117 ++++++++++++++++++++++++++---------------- src/connection.h | 17 +++--- src/pg.cc | 57 ++++++++++---------- src/pool.cc | 19 +++---- src/pool.h | 1 + 8 files changed, 186 insertions(+), 126 deletions(-) diff --git a/benchmarks/pg.js b/benchmarks/pg.js index 9872014..108cc84 100644 --- a/benchmarks/pg.js +++ b/benchmarks/pg.js @@ -9,6 +9,15 @@ var first = pg.init(5, { 'connect_timeout': '5' }); +var second = pg.init(5, { + 'dbname': 'relive', + 'user': 'test', + 'password': 'lttest', + 'host': '192.168.48.14', + 'port': '5432', + 'connect_timeout': '5' +}); + var count = 0; var step = 1; var query = process.argv[3] || 'SELECT * FROM main.member LIMIT 100'; @@ -19,7 +28,7 @@ var t = Date.now(); var mem = 0; function exec() { - pg.exec(first, query, complete, cancel); + pg.exec(first, query, complete(first.name), cancel); } @@ -30,11 +39,12 @@ function cancel() { complete(); } -function complete() { +function complete(name) { mem += process.memoryUsage().heapUsed/1024/1024; if ((r += 1) === count) { console.log('[LIVETEX-NODE-PG] | R:', r, ' | E:', e, ' | T:', Date.now() - t, ' | M:', (Math.round(mem/r*10)/10)); + run(); } } diff --git a/benchmarks/test_pg.js b/benchmarks/test_pg.js index bc86772..63261ef 100644 --- a/benchmarks/test_pg.js +++ b/benchmarks/test_pg.js @@ -1,38 +1,68 @@ +var assert = require('assert'); var pg = require('../bin'); -//pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, v8::Local::Cast(args[2])); - -var first = pg.init(5, { - 'dbname': 'relive', - 'user': 'test', - 'password': 'lttest', - 'host': '192.168.48.14', - 'port': '5432' -}); - -var second = pg.init(5, { +var init_info = { 'dbname': 'relive', 'user': 'test', 'password': 'lttest', 'host': '192.168.48.14', 'port': '5432', 'connect_timeout': '5' -}); - -var un; - +} +var first, second; +//first = pg.init(5, init_info); +second = pg.init(5, init_info); -pg.exec(first, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); - pg.destroy(first); -}, console.error); - +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +//pg.exec(first, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); +pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); pg.exec(second, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); - pg.destroy(second); -}, console.error); - - // pg.destroy(first); - // pg.destroy(second); \ No newline at end of file + console.log('Result table:', table); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) { + console.log('Result table:', table); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); + }, console.error); + }, console.error); + }, console.error); + }, console.error); +}, console.error); \ No newline at end of file diff --git a/lib/pg/pg.js b/lib/pg/pg.js index b67a759..e1e451d 100644 --- a/lib/pg/pg.js +++ b/lib/pg/pg.js @@ -146,7 +146,7 @@ pg.escapeArray = function(array) { */ pg.init = function(size, options, opt_errorHandler) { return __pg.init(size, querystring.unescape( - querystring.stringify(options, ' ')), opt_errorHandler || console.error); + querystring.stringify(options, ' ')), opt_errorHandler); }; /** @@ -221,11 +221,10 @@ pg.prepareQuery = function(query, params) { return query.replace(pg.__PARAM_EXP, replacer); }; - /** * Destroy connection pool. */ pg.destroy = function(handle) { - return __pg.destroy(handle); + __pg.destroy(handle); }; diff --git a/src/connection.cc b/src/connection.cc index 94d7842..a722e4d 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -14,7 +14,6 @@ #include "queue.h" #include "utils.h" - void connection_connect_work(uv_work_t * work) { connection_t * connection = (connection_t *) work->data; @@ -26,7 +25,6 @@ void connection_connect_work(uv_work_t * work) { } } - void connection_exec_work(uv_work_t * work) { connection_t * connection = (connection_t *) work->data; @@ -40,27 +38,26 @@ void connection_exec_work(uv_work_t * work) { PGresult * result = PQexec(connection->descriptor, query->request); switch (PQresultStatus(result)) { - case PGRES_COMMAND_OK: { - PQclear(result); - break; - } - - case PGRES_TUPLES_OK: { - query->result = result; - break; - } - - default: { - query->error = copy_string(PQresultErrorMessage(result)); - PQclear(result); - break; - } + case PGRES_COMMAND_OK: { + PQclear(result); + break; + } + + case PGRES_TUPLES_OK: { + query->result = result; + break; + } + + default: { + query->error = copy_string(PQresultErrorMessage(result)); + PQclear(result); + break; + } } } } } - void connection_work_handler(uv_work_t * work) { connection_t * connection = (connection_t *) work->data; @@ -68,7 +65,8 @@ void connection_work_handler(uv_work_t * work) { pool_t * pool = connection->pool; if (connection->current_query != NULL) { - queue_unshift(pool->query_queue, connection->current_query); + queue_unshift(pool->query_queue, connection->current_query) + ; connection->current_query = NULL; } @@ -84,25 +82,27 @@ void connection_work_handler(uv_work_t * work) { free(work); } - void connection_queue_work(connection_t * connection, uv_work_cb work) { uv_work_t * work_item = (uv_work_t *) malloc(sizeof(uv_work_t)); work_item->data = connection; connection->activity_status = BUSY; + connection->readyForFree = false; uv_queue_work(uv_default_loop(), work_item, work, (uv_after_work_cb) connection_work_handler); } - void connection_fetch_query(connection_t * connection) { if (connection->current_query == NULL && connection->status != DESTROYING) { + queue_shift(connection->pool->query_queue, connection->current_query); if (connection->current_query != NULL) { + connection_queue_work(connection, connection_exec_work); } else { + connection_destroy(connection); } } @@ -126,23 +126,48 @@ connection_t * connection_alloc(char * connection_info, pool_t * pool) { connection->error = NULL; - queue_push(pool->connection_queue, connection); + connection->timer = (uv_timer_t *) malloc(sizeof(uv_timer_t)); + + queue_push(pool->connection_queue, connection) + ; return connection; } - void connection_init(connection_t * connection) { connection->status = INITIALIZING; + + uv_timer_init(uv_default_loop(), connection->timer); + connection->timer->data = connection; + connection_queue_work(connection, connection_connect_work); } - void connection_destroy(connection_t * connection) { - connection->status = DESTROYING; - connection_process(connection); + connection->status = WAIT; } +void cbk(uv_idle_t * handle, int status) { + + connection_t * connection = (connection_t *) handle->data; + + if (connection->activity_status == BUSY) { + return; + } + + connection_process(connection); + + if (connection->status == WAIT) { + if (connection->readyForFree) { + connection->status = DESTROYING; + connection_process(connection); + } else { + connection->readyForFree = true; + connection_process(connection); + } + + } +} void connection_process(connection_t * connection) { if (connection->activity_status == FREE) { @@ -150,30 +175,31 @@ void connection_process(connection_t * connection) { connection->current_query = NULL; switch (connection->status) { - case INITIALIZING: { - connection->status = ACTIVE; - connection_fetch_query(connection); - - break; - } + case INITIALIZING: { + uv_timer_start(connection->timer, (uv_timer_cb) cbk, 0, 100); + connection->status = ACTIVE; + connection_fetch_query(connection); - case ACTIVE: { - connection_fetch_query(connection); + break; + } - break; - } + case ACTIVE: + case WAIT: { + connection_fetch_query(connection); - case DESTROYING: { - connection_free(connection); + break; + } - break; - } + case DESTROYING: { + connection_free(connection); - case NEW: { - break; - } + break; } + case NEW: { + break; + } + } if (query != NULL) { query_apply(query); @@ -182,7 +208,6 @@ void connection_process(connection_t * connection) { } } - void connection_free(connection_t * connection) { if (connection->prev != NULL) { queue_remove(connection); @@ -196,6 +221,10 @@ void connection_free(connection_t * connection) { query_free(connection->current_query); } + uv_timer_stop(connection->timer); + + free(connection->timer); free(connection->connection_info); free(connection); + } diff --git a/src/connection.h b/src/connection.h index 7883fe0..2fb7d42 100644 --- a/src/connection.h +++ b/src/connection.h @@ -8,7 +8,6 @@ #ifndef CONNECTION_H_ #define CONNECTION_H_ - #include #include @@ -16,20 +15,14 @@ #include "query.h" #include "pool.h" - typedef enum { - NEW = 0, - INITIALIZING, - ACTIVE, - DESTROYING + NEW = 0, INITIALIZING, ACTIVE, WAIT, DESTROYING } entity_status_t; typedef enum { - BUSY = 0, - FREE + BUSY = 0, FREE } activity_status_t; - typedef struct connection_ { char * connection_info; @@ -45,11 +38,14 @@ typedef struct connection_ { entity_status_t status; activity_status_t activity_status; + uv_timer_t * timer; + + bool readyForFree; + char * error; } connection_t; - connection_t * connection_alloc(char * connection_info, struct pool_ * pool); void connection_init(connection_t * connection); @@ -60,5 +56,4 @@ void connection_destroy(connection_t * connection); void connection_free(connection_t * connection); - #endif /* CONNECTION_H_ */ diff --git a/src/pg.cc b/src/pg.cc index d853c54..ad96425 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -10,7 +10,6 @@ #include #include - #include "pool.h" #include "query.h" #include "connection.h" @@ -18,19 +17,20 @@ #include -v8::Persistent handle_tmpl = v8::Persistent::New(v8::ObjectTemplate::New()); +v8::Persistent handle_tmpl = v8::Persistent< + v8::ObjectTemplate>::New(v8::ObjectTemplate::New()); v8::Handle pg_init(const v8::Arguments &args) { - v8::HandleScope scope; + v8::HandleScope scope; - handle_tmpl->SetInternalFieldCount(1); + handle_tmpl->SetInternalFieldCount(1); - if (args.Length() < 1) { + if (args.Length() < 1 || (!args[0]->IsNumber())) { return throw_type_error("First argument must be max pool size!"); } - if (args.Length() < 2) { + if (args.Length() < 2 || (!args[1]->IsString())) { return throw_type_error("Second argument must be connection string!"); } @@ -49,28 +49,29 @@ v8::Handle pg_init(const v8::Arguments &args) { pool->data->SetPointerInInternalField(0, pool); - return scope.Close(pool->data); + return scope.Close(pool->data); } v8::Handle pg_exec(const v8::Arguments& args) { - v8::HandleScope scope; + v8::HandleScope scope; if ((args.Length() < 1) || (!args[0]->IsObject())) { return throw_type_error("First argument must be pool handle!"); } - if (args.Length() < 2) { + if (args.Length() < 2 || (!args[1]->IsString())) { return throw_type_error("Second argument must be query request!"); } - if (args.Length() < 3 && !args[2]->IsFunction()) { + if (args.Length() < 3 && !args[2]->IsFunction()) { return throw_type_error("Third argument must be query callback!"); } - if (args[0]->ToObject()->InternalFieldCount() < 1) - return throw_type_error("Invalid handle!"); + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); - pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( + 0); if (pool == NULL) return throw_type_error("Invalid handle!"); @@ -79,46 +80,44 @@ v8::Handle pg_exec(const v8::Arguments& args) { query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); - pool_exec(pool, query); + pool_exec(pool, query); - return scope.Close(v8::Undefined()); + return scope.Close(v8::Undefined()); } - v8::Handle pg_destroy(const v8::Arguments& args) { - v8::HandleScope scope; + v8::HandleScope scope; if ((args.Length() < 1) || (!args[0]->IsObject())) { return throw_type_error("First argument must be pool handle!"); } if (args[0]->ToObject()->InternalFieldCount() < 1) - return throw_type_error("Invalid handle!"); + return throw_type_error("Invalid handle!"); - pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( + 0); if (pool == NULL) return throw_type_error("Invalid handle!"); - pool_destroy(pool); + pool_destroy(pool); return scope.Close(v8::Undefined()); } - -void init (v8::Handle target) { +void init(v8::Handle target) { v8::HandleScope scope; - target->Set(v8::String::New("init"), - v8::FunctionTemplate::New(pg_init)->GetFunction()); + target->Set(v8::String::New("init"), + v8::FunctionTemplate::New(pg_init)->GetFunction()); - target->Set(v8::String::New("exec"), - v8::FunctionTemplate::New(pg_exec)->GetFunction()); + target->Set(v8::String::New("exec"), + v8::FunctionTemplate::New(pg_exec)->GetFunction()); - target->Set(v8::String::New("destroy"), - v8::FunctionTemplate::New(pg_destroy)->GetFunction()); + target->Set(v8::String::New("destroy"), + v8::FunctionTemplate::New(pg_destroy)->GetFunction()); } - NODE_MODULE(pg, init) diff --git a/src/pool.cc b/src/pool.cc index fe2400e..9ffe369 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -13,15 +13,12 @@ #include "queue.h" #include "utils.h" - - void pool_spawn_connection(pool_t * pool) { connection_t * connection = connection_alloc(pool->connection_info, pool); connection_init(connection); } - pool_t * pool_alloc() { pool_t * pool = (pool_t *) malloc(sizeof(pool_t)); @@ -33,29 +30,27 @@ pool_t * pool_alloc() { pool->query_queue = (query_t *) malloc(sizeof(query_t)); queue_init(pool->connection_queue); + queue_init(pool->query_queue); return pool; } - void pool_init(pool_t * pool, size_t max_size, const char * connection_info, - v8::Local error_callback) { - + v8::Local error_callback) { pool->connection_info = copy_string(connection_info); pool->max_size = max_size; pool->error_callback = v8::Persistent::New(error_callback); } - void pool_exec(pool_t * pool, query_t * query) { - queue_push(pool->query_queue, query); + queue_push(pool->query_queue, query) + ; pool_process(pool); } - void pool_handle_error(pool_t * pool, char * error) { v8::HandleScope scope; v8::Handle argv[1]; @@ -65,13 +60,13 @@ void pool_handle_error(pool_t * pool, char * error) { pool->error_callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); } - void pool_process(pool_t * pool) { size_t i = 0; connection_t * connection = pool->connection_queue->prev; connection_t * prev = NULL; while (connection != pool->connection_queue) { + prev = connection->prev; connection_process(connection); @@ -94,6 +89,9 @@ void pool_destroy(pool_t * pool) { pool->max_size = 0; pool->error_callback.Dispose(); + pool->data->SetPointerInInternalField(0, NULL); + pool->data.Dispose(); + free(pool->connection_info); pool->connection_info = NULL; } @@ -107,4 +105,3 @@ void pool_free(pool_t * pool) { free(pool); } - diff --git a/src/pool.h b/src/pool.h index d080c12..d1643dc 100644 --- a/src/pool.h +++ b/src/pool.h @@ -27,6 +27,7 @@ typedef struct pool_ { struct connection_ * connection_queue; v8::Persistent error_callback; + } pool_t; From 62a6500da706ae3f909f9a2a3eb2190caf8e2710 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Thu, 21 Aug 2014 17:34:52 +0400 Subject: [PATCH 07/13] Fix formatting --- benchmarks/test_pg.js | 67 ++------ src/connection.cc | 390 ++++++++++++++++++++++++------------------ src/connection.h | 50 +++--- src/pg.cc | 139 ++++++++------- src/pool.cc | 128 ++++++++------ 5 files changed, 414 insertions(+), 360 deletions(-) diff --git a/benchmarks/test_pg.js b/benchmarks/test_pg.js index 63261ef..c0c3891 100644 --- a/benchmarks/test_pg.js +++ b/benchmarks/test_pg.js @@ -12,57 +12,18 @@ var init_info = { var first, second; -//first = pg.init(5, init_info); -second = pg.init(5, init_info); +assert.throws( function() { first = pg.init(); }, Error, "Bad init" ); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -//pg.exec(first, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); -pg.exec(second, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) { - console.log('Result table:', table); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - pg.exec(second, "SELECT 1 AS value", function(table) {console.log('Result table:', table);}, console.error); - }, console.error); - }, console.error); - }, console.error); - }, console.error); -}, console.error); \ No newline at end of file +assert.throws( function() { pg.exec(first, first, function(table) {}, console.error); }, Error, "Incorrect req with bad handle" ); + +assert.throws( function() { pg.exec(second, second, function(table) {}, console.error); }, Error, "Incorrect req with good handle"); + +assert.throws( function() { pg.destroy(); }, Error, "Destroy without handle" ); + +assert.throws( function() { pg.destroy(first); }, Error, "Destroy with incorrect handle" ); + +assert.doesNotThrow( function() { second = pg.init(5, init_info); }, Error, "Good init" ); + +assert.doesNotThrow( function() { pg.exec(second, "SELECT 1 AS value", function(table) {}, console.error); }, Error, "Good req with good handle" ); + +assert.doesNotThrow( function() { pg.destroy(second); }, Error, "Good destroy with correct handle" ); diff --git a/src/connection.cc b/src/connection.cc index a722e4d..7e5d864 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -14,217 +14,269 @@ #include "queue.h" #include "utils.h" -void connection_connect_work(uv_work_t * work) { - connection_t * connection = (connection_t *) work->data; - - connection->descriptor = PQconnectdb(connection->connection_info); - - ConnStatusType status = PQstatus(connection->descriptor); - if (status != CONNECTION_OK) { - connection->error = copy_string(PQerrorMessage(connection->descriptor)); - } +void +connection_connect_work(uv_work_t * work) +{ + connection_t * connection = (connection_t *) work->data; + + connection->descriptor = PQconnectdb(connection->connection_info); + + ConnStatusType status = PQstatus(connection->descriptor); + if (status != CONNECTION_OK) + { + connection->error = copy_string(PQerrorMessage(connection->descriptor)); + } } -void connection_exec_work(uv_work_t * work) { - connection_t * connection = (connection_t *) work->data; - - ConnStatusType status = PQstatus(connection->descriptor); - if (status != CONNECTION_OK) { - connection->error = copy_string(PQerrorMessage(connection->descriptor)); - } else { - query_t * query = connection->current_query; - - if (query != NULL) { - PGresult * result = PQexec(connection->descriptor, query->request); - - switch (PQresultStatus(result)) { - case PGRES_COMMAND_OK: { - PQclear(result); - break; - } - - case PGRES_TUPLES_OK: { - query->result = result; - break; - } - - default: { - query->error = copy_string(PQresultErrorMessage(result)); - PQclear(result); - break; - } - } - } - } +void +connection_exec_work(uv_work_t * work) +{ + connection_t * connection = (connection_t *) work->data; + + ConnStatusType status = PQstatus(connection->descriptor); + if (status != CONNECTION_OK) + { + connection->error = copy_string(PQerrorMessage(connection->descriptor)); + } + else + { + query_t * query = connection->current_query; + + if (query != NULL) + { + PGresult * result = PQexec(connection->descriptor, query->request); + + switch (PQresultStatus(result)) + { + case PGRES_COMMAND_OK: + { + PQclear(result); + break; + } + + case PGRES_TUPLES_OK: + { + query->result = result; + break; + } + + default: + { + query->error = copy_string(PQresultErrorMessage(result)); + PQclear(result); + break; + } + } + } + } } -void connection_work_handler(uv_work_t * work) { - connection_t * connection = (connection_t *) work->data; +void +connection_work_handler(uv_work_t * work) +{ + connection_t * connection = (connection_t *) work->data; - if (connection->error != NULL) { - pool_t * pool = connection->pool; + if (connection->error != NULL) + { + pool_t * pool = connection->pool; - if (connection->current_query != NULL) { - queue_unshift(pool->query_queue, connection->current_query) - ; - connection->current_query = NULL; - } + if (connection->current_query != NULL) + { + queue_unshift(pool->query_queue, connection->current_query) + ; + connection->current_query = NULL; + } - pool_handle_error(pool, connection->error); - pool_process(pool); + pool_handle_error(pool, connection->error); + pool_process(pool); - connection_destroy(connection); - } + connection_destroy(connection); + } - connection->activity_status = FREE; - connection_process(connection); + connection->activity_status = FREE; + connection_process(connection); - free(work); + free(work); } -void connection_queue_work(connection_t * connection, uv_work_cb work) { - uv_work_t * work_item = (uv_work_t *) malloc(sizeof(uv_work_t)); - work_item->data = connection; +void +connection_queue_work(connection_t * connection, uv_work_cb work) +{ + uv_work_t * work_item = (uv_work_t *) malloc(sizeof(uv_work_t)); + work_item->data = connection; - connection->activity_status = BUSY; - connection->readyForFree = false; + connection->activity_status = BUSY; + connection->readyForFree = false; - uv_queue_work(uv_default_loop(), work_item, work, - (uv_after_work_cb) connection_work_handler); + uv_queue_work(uv_default_loop(), work_item, work, + (uv_after_work_cb) connection_work_handler); } -void connection_fetch_query(connection_t * connection) { - if (connection->current_query == NULL && connection->status != DESTROYING) { +void +connection_fetch_query(connection_t * connection) +{ + if (connection->current_query == NULL && connection->status != DESTROYING) + { - queue_shift(connection->pool->query_queue, connection->current_query); + queue_shift(connection->pool->query_queue, connection->current_query); - if (connection->current_query != NULL) { + if (connection->current_query != NULL) + { - connection_queue_work(connection, connection_exec_work); - } else { + connection_queue_work(connection, connection_exec_work); + } + else + { - connection_destroy(connection); - } - } + connection_destroy(connection); + } + } } -connection_t * connection_alloc(char * connection_info, pool_t * pool) { - connection_t * connection = (connection_t *) malloc(sizeof(connection_t)); - connection->status = NEW; - connection->activity_status = FREE; +connection_t * +connection_alloc(char * connection_info, pool_t * pool) +{ + connection_t * connection = (connection_t *) malloc(sizeof(connection_t)); + connection->status = NEW; + connection->activity_status = FREE; - connection->connection_info = copy_string(connection_info); + connection->connection_info = copy_string(connection_info); - connection->pool = pool; + connection->pool = pool; - connection->current_query = NULL; + connection->current_query = NULL; - connection->descriptor = NULL; + connection->descriptor = NULL; - connection->prev = NULL; - connection->next = NULL; + connection->prev = NULL; + connection->next = NULL; - connection->error = NULL; + connection->error = NULL; - connection->timer = (uv_timer_t *) malloc(sizeof(uv_timer_t)); + connection->timer = (uv_timer_t *) malloc(sizeof(uv_timer_t)); - queue_push(pool->connection_queue, connection) - ; + queue_push(pool->connection_queue, connection) + ; - return connection; + return connection; } -void connection_init(connection_t * connection) { - connection->status = INITIALIZING; +void +connection_init(connection_t * connection) +{ + connection->status = INITIALIZING; - uv_timer_init(uv_default_loop(), connection->timer); - connection->timer->data = connection; + uv_timer_init(uv_default_loop(), connection->timer); + connection->timer->data = connection; - connection_queue_work(connection, connection_connect_work); + connection_queue_work(connection, connection_connect_work); } -void connection_destroy(connection_t * connection) { - connection->status = WAIT; +void +connection_destroy(connection_t * connection) +{ + connection->status = WAIT; } -void cbk(uv_idle_t * handle, int status) { - - connection_t * connection = (connection_t *) handle->data; - - if (connection->activity_status == BUSY) { - return; - } - - connection_process(connection); - - if (connection->status == WAIT) { - if (connection->readyForFree) { - connection->status = DESTROYING; - connection_process(connection); - } else { - connection->readyForFree = true; - connection_process(connection); - } - - } +void +cbk(uv_idle_t * handle, int status) +{ + + connection_t * connection = (connection_t *) handle->data; + + if (connection->activity_status == BUSY) + { + return; + } + + connection_process(connection); + + if (connection->status == WAIT) + { + if (connection->readyForFree) + { + connection->status = DESTROYING; + connection_process(connection); + } + else + { + connection->readyForFree = true; + connection_process(connection); + } + + } } -void connection_process(connection_t * connection) { - if (connection->activity_status == FREE) { - query_t * query = connection->current_query; - connection->current_query = NULL; - - switch (connection->status) { - case INITIALIZING: { - uv_timer_start(connection->timer, (uv_timer_cb) cbk, 0, 100); - connection->status = ACTIVE; - connection_fetch_query(connection); - - break; - } - - case ACTIVE: - case WAIT: { - connection_fetch_query(connection); - - break; - } - - case DESTROYING: { - connection_free(connection); - - break; - } - - case NEW: { - break; - } - } - - if (query != NULL) { - query_apply(query); - query_free(query); - } - } +void +connection_process(connection_t * connection) +{ + if (connection->activity_status == FREE) + { + query_t * query = connection->current_query; + connection->current_query = NULL; + + switch (connection->status) + { + case INITIALIZING: + { + uv_timer_start(connection->timer, (uv_timer_cb) cbk, 0, 100); + connection->status = ACTIVE; + connection_fetch_query(connection); + + break; + } + + case ACTIVE: + case WAIT: + { + connection_fetch_query(connection); + + break; + } + + case DESTROYING: + { + connection_free(connection); + + break; + } + + case NEW: + { + break; + } + } + + if (query != NULL) + { + query_apply(query); + query_free(query); + } + } } -void connection_free(connection_t * connection) { - if (connection->prev != NULL) { - queue_remove(connection); - } - - if (connection->descriptor != NULL) { - PQfinish(connection->descriptor); - } - - if (connection->current_query != NULL) { - query_free(connection->current_query); - } - - uv_timer_stop(connection->timer); - - free(connection->timer); - free(connection->connection_info); - free(connection); +void +connection_free(connection_t * connection) +{ + if (connection->prev != NULL) + { + queue_remove(connection); + } + + if (connection->descriptor != NULL) + { + PQfinish(connection->descriptor); + } + + if (connection->current_query != NULL) + { + query_free(connection->current_query); + } + + uv_timer_stop(connection->timer); + + free(connection->timer); + free(connection->connection_info); + free(connection); } diff --git a/src/connection.h b/src/connection.h index 2fb7d42..ca0c271 100644 --- a/src/connection.h +++ b/src/connection.h @@ -15,45 +15,53 @@ #include "query.h" #include "pool.h" -typedef enum { - NEW = 0, INITIALIZING, ACTIVE, WAIT, DESTROYING +typedef enum +{ + NEW = 0, INITIALIZING, ACTIVE, WAIT, DESTROYING } entity_status_t; -typedef enum { - BUSY = 0, FREE +typedef enum +{ + BUSY = 0, FREE } activity_status_t; -typedef struct connection_ { - char * connection_info; +typedef struct connection_ +{ + char * connection_info; - PGconn * descriptor; + PGconn * descriptor; - struct pool_ * pool; + struct pool_ * pool; - struct connection_ * next; - struct connection_ * prev; + struct connection_ * next; + struct connection_ * prev; - struct query_ * current_query; + struct query_ * current_query; - entity_status_t status; - activity_status_t activity_status; + entity_status_t status; + activity_status_t activity_status; - uv_timer_t * timer; + uv_timer_t * timer; - bool readyForFree; + bool readyForFree; - char * error; + char * error; } connection_t; -connection_t * connection_alloc(char * connection_info, struct pool_ * pool); +connection_t * +connection_alloc(char * connection_info, struct pool_ * pool); -void connection_init(connection_t * connection); +void +connection_init(connection_t * connection); -void connection_process(connection_t * connection); +void +connection_process(connection_t * connection); -void connection_destroy(connection_t * connection); +void +connection_destroy(connection_t * connection); -void connection_free(connection_t * connection); +void +connection_free(connection_t * connection); #endif /* CONNECTION_H_ */ diff --git a/src/pg.cc b/src/pg.cc index ad96425..b228065 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -18,106 +18,121 @@ #include v8::Persistent handle_tmpl = v8::Persistent< - v8::ObjectTemplate>::New(v8::ObjectTemplate::New()); + v8::ObjectTemplate>::New(v8::ObjectTemplate::New()); -v8::Handle pg_init(const v8::Arguments &args) { +v8::Handle +pg_init(const v8::Arguments &args) +{ - v8::HandleScope scope; + v8::HandleScope scope; - handle_tmpl->SetInternalFieldCount(1); + handle_tmpl->SetInternalFieldCount(1); - if (args.Length() < 1 || (!args[0]->IsNumber())) { - return throw_type_error("First argument must be max pool size!"); - } + if (args.Length() < 1 || (!args[0]->IsNumber())) + { + return throw_type_error("First argument must be max pool size!"); + } - if (args.Length() < 2 || (!args[1]->IsString())) { - return throw_type_error("Second argument must be connection string!"); - } + if (args.Length() < 2 || (!args[1]->IsString())) + { + return throw_type_error("Second argument must be connection string!"); + } - if (args.Length() < 3 && !args[2]->IsFunction()) { - return throw_type_error("Third argument must be error callback!"); - } + if (args.Length() < 3 && !args[2]->IsFunction()) + { + return throw_type_error("Third argument must be error callback!"); + } - v8::String::Utf8Value str(args[1]->ToString()); + v8::String::Utf8Value str(args[1]->ToString()); - pool_t * pool = pool_alloc(); + pool_t * pool = pool_alloc(); - pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, - v8::Local::Cast(args[2])); + pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, + v8::Local::Cast(args[2])); - pool->data = v8::Persistent::New(handle_tmpl->NewInstance()); + pool->data = v8::Persistent::New(handle_tmpl->NewInstance()); - pool->data->SetPointerInInternalField(0, pool); + pool->data->SetPointerInInternalField(0, pool); - return scope.Close(pool->data); + return scope.Close(pool->data); } -v8::Handle pg_exec(const v8::Arguments& args) { - v8::HandleScope scope; +v8::Handle +pg_exec(const v8::Arguments& args) +{ + v8::HandleScope scope; - if ((args.Length() < 1) || (!args[0]->IsObject())) { - return throw_type_error("First argument must be pool handle!"); - } + if ((args.Length() < 1) || (!args[0]->IsObject())) + { + return throw_type_error("First argument must be pool handle!"); + } - if (args.Length() < 2 || (!args[1]->IsString())) { - return throw_type_error("Second argument must be query request!"); - } + if (args.Length() < 2 || (!args[1]->IsString())) + { + return throw_type_error("Second argument must be query request!"); + } - if (args.Length() < 3 && !args[2]->IsFunction()) { - return throw_type_error("Third argument must be query callback!"); - } + if (args.Length() < 3 && !args[2]->IsFunction()) + { + return throw_type_error("Third argument must be query callback!"); + } - if (args[0]->ToObject()->InternalFieldCount() < 1) - return throw_type_error("Invalid handle!"); + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); - pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( - 0); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( + 0); - if (pool == NULL) - return throw_type_error("Invalid handle!"); + if (pool == NULL) + return throw_type_error("Invalid handle!"); - v8::String::Utf8Value str(args[1]->ToString()); + v8::String::Utf8Value str(args[1]->ToString()); - query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); + query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); - pool_exec(pool, query); + pool_exec(pool, query); - return scope.Close(v8::Undefined()); + return scope.Close(v8::Undefined()); } -v8::Handle pg_destroy(const v8::Arguments& args) { - v8::HandleScope scope; +v8::Handle +pg_destroy(const v8::Arguments& args) +{ + v8::HandleScope scope; - if ((args.Length() < 1) || (!args[0]->IsObject())) { - return throw_type_error("First argument must be pool handle!"); - } + if ((args.Length() < 1) || (!args[0]->IsObject())) + { + return throw_type_error("First argument must be pool handle!"); + } - if (args[0]->ToObject()->InternalFieldCount() < 1) - return throw_type_error("Invalid handle!"); + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); - pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( - 0); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( + 0); - if (pool == NULL) - return throw_type_error("Invalid handle!"); + if (pool == NULL) + return throw_type_error("Invalid handle!"); - pool_destroy(pool); + pool_destroy(pool); - return scope.Close(v8::Undefined()); + return scope.Close(v8::Undefined()); } -void init(v8::Handle target) { +void +init(v8::Handle target) +{ - v8::HandleScope scope; + v8::HandleScope scope; - target->Set(v8::String::New("init"), - v8::FunctionTemplate::New(pg_init)->GetFunction()); + target->Set(v8::String::New("init"), + v8::FunctionTemplate::New(pg_init)->GetFunction()); - target->Set(v8::String::New("exec"), - v8::FunctionTemplate::New(pg_exec)->GetFunction()); + target->Set(v8::String::New("exec"), + v8::FunctionTemplate::New(pg_exec)->GetFunction()); - target->Set(v8::String::New("destroy"), - v8::FunctionTemplate::New(pg_destroy)->GetFunction()); + target->Set(v8::String::New("destroy"), + v8::FunctionTemplate::New(pg_destroy)->GetFunction()); } NODE_MODULE(pg, init) diff --git a/src/pool.cc b/src/pool.cc index 9ffe369..c8426a3 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -13,95 +13,113 @@ #include "queue.h" #include "utils.h" -void pool_spawn_connection(pool_t * pool) { - connection_t * connection = connection_alloc(pool->connection_info, pool); +void +pool_spawn_connection(pool_t * pool) +{ + connection_t * connection = connection_alloc(pool->connection_info, pool); - connection_init(connection); + connection_init(connection); } -pool_t * pool_alloc() { - pool_t * pool = (pool_t *) malloc(sizeof(pool_t)); +pool_t * +pool_alloc() +{ + pool_t * pool = (pool_t *) malloc(sizeof(pool_t)); - pool->connection_info = NULL; + pool->connection_info = NULL; - pool->max_size = 0; + pool->max_size = 0; - pool->connection_queue = (connection_t *) malloc(sizeof(connection_t)); - pool->query_queue = (query_t *) malloc(sizeof(query_t)); + pool->connection_queue = (connection_t *) malloc(sizeof(connection_t)); + pool->query_queue = (query_t *) malloc(sizeof(query_t)); - queue_init(pool->connection_queue); + queue_init(pool->connection_queue); - queue_init(pool->query_queue); + queue_init(pool->query_queue); - return pool; + return pool; } -void pool_init(pool_t * pool, size_t max_size, const char * connection_info, - v8::Local error_callback) { - pool->connection_info = copy_string(connection_info); - pool->max_size = max_size; +void +pool_init(pool_t * pool, size_t max_size, const char * connection_info, + v8::Local error_callback) +{ + pool->connection_info = copy_string(connection_info); + pool->max_size = max_size; - pool->error_callback = v8::Persistent::New(error_callback); + pool->error_callback = v8::Persistent::New(error_callback); } -void pool_exec(pool_t * pool, query_t * query) { - queue_push(pool->query_queue, query) - ; +void +pool_exec(pool_t * pool, query_t * query) +{ + queue_push(pool->query_queue, query) + ; - pool_process(pool); + pool_process(pool); } -void pool_handle_error(pool_t * pool, char * error) { - v8::HandleScope scope; - v8::Handle argv[1]; +void +pool_handle_error(pool_t * pool, char * error) +{ + v8::HandleScope scope; + v8::Handle argv[1]; - argv[0] = v8::String::New(error); + argv[0] = v8::String::New(error); - pool->error_callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); + pool->error_callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); } -void pool_process(pool_t * pool) { - size_t i = 0; +void +pool_process(pool_t * pool) +{ + size_t i = 0; - connection_t * connection = pool->connection_queue->prev; - connection_t * prev = NULL; - while (connection != pool->connection_queue) { + connection_t * connection = pool->connection_queue->prev; + connection_t * prev = NULL; + while (connection != pool->connection_queue) + { - prev = connection->prev; - connection_process(connection); + prev = connection->prev; + connection_process(connection); - connection = prev; - i++; - } + connection = prev; + i++; + } - if (!queue_is_empty(pool->query_queue) && i < pool->max_size) { - pool_spawn_connection(pool); - } + if (!queue_is_empty(pool->query_queue) && i < pool->max_size) + { + pool_spawn_connection(pool); + } } -void pool_destroy(pool_t * pool) { - connection_t * connection; - query_t * query; +void +pool_destroy(pool_t * pool) +{ + connection_t * connection; + query_t * query; - queue_flush(pool->connection_queue, connection, connection_destroy); - queue_flush(pool->query_queue, query, query_free); + queue_flush(pool->connection_queue, connection, connection_destroy); + queue_flush(pool->query_queue, query, query_free); - pool->max_size = 0; - pool->error_callback.Dispose(); + pool->max_size = 0; + pool->error_callback.Dispose(); - pool->data->SetPointerInInternalField(0, NULL); - pool->data.Dispose(); + pool->data->SetPointerInInternalField(0, NULL); + pool->data.Dispose(); - free(pool->connection_info); - pool->connection_info = NULL; + free(pool->connection_info); + pool->connection_info = NULL; } -void pool_free(pool_t * pool) { - pool_destroy(pool); +void +pool_free(pool_t * pool) +{ + pool_destroy(pool); - free(pool->connection_queue); - free(pool->query_queue); + free(pool->connection_queue); + free(pool->query_queue); - free(pool); + free(pool); } From 3b166c6a876d76680adedf26f7150eaa34687a26 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Mon, 25 Aug 2014 15:26:21 +0400 Subject: [PATCH 08/13] Restore formatting and moving timer to pool --- lib/pg/pg.js | 4 +- src/connection.cc | 370 ++++++++++++++++++---------------------------- src/connection.h | 40 +++-- src/pg.cc | 144 +++++++++--------- src/pool.cc | 174 +++++++++++++--------- src/pool.h | 14 +- src/query.cc | 4 - src/query.h | 2 - 8 files changed, 342 insertions(+), 410 deletions(-) diff --git a/lib/pg/pg.js b/lib/pg/pg.js index e1e451d..5ab86f8 100644 --- a/lib/pg/pg.js +++ b/lib/pg/pg.js @@ -144,8 +144,8 @@ pg.escapeArray = function(array) { *
console.error
will be used by default. * return handle */ -pg.init = function(size, options, opt_errorHandler) { - return __pg.init(size, querystring.unescape( +pg.init = function(size, connection_lifetime, options, opt_errorHandler) { + return __pg.init(size, connection_lifetime, querystring.unescape( querystring.stringify(options, ' ')), opt_errorHandler); }; diff --git a/src/connection.cc b/src/connection.cc index 7e5d864..b67b2f5 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -14,269 +14,189 @@ #include "queue.h" #include "utils.h" -void -connection_connect_work(uv_work_t * work) -{ - connection_t * connection = (connection_t *) work->data; - - connection->descriptor = PQconnectdb(connection->connection_info); - - ConnStatusType status = PQstatus(connection->descriptor); - if (status != CONNECTION_OK) - { - connection->error = copy_string(PQerrorMessage(connection->descriptor)); - } +void connection_connect_work(uv_work_t * work) { + connection_t * connection = (connection_t *) work->data; + + connection->descriptor = PQconnectdb(connection->connection_info); + + ConnStatusType status = PQstatus(connection->descriptor); + if (status != CONNECTION_OK) { + connection->error = copy_string(PQerrorMessage(connection->descriptor)); + } } -void -connection_exec_work(uv_work_t * work) -{ - connection_t * connection = (connection_t *) work->data; - - ConnStatusType status = PQstatus(connection->descriptor); - if (status != CONNECTION_OK) - { - connection->error = copy_string(PQerrorMessage(connection->descriptor)); - } - else - { - query_t * query = connection->current_query; - - if (query != NULL) - { - PGresult * result = PQexec(connection->descriptor, query->request); - - switch (PQresultStatus(result)) - { - case PGRES_COMMAND_OK: - { - PQclear(result); - break; - } - - case PGRES_TUPLES_OK: - { - query->result = result; - break; - } - - default: - { - query->error = copy_string(PQresultErrorMessage(result)); - PQclear(result); - break; - } - } - } - } +void connection_exec_work(uv_work_t * work) { + connection_t * connection = (connection_t *) work->data; + + ConnStatusType status = PQstatus(connection->descriptor); + if (status != CONNECTION_OK) { + connection->error = copy_string(PQerrorMessage(connection->descriptor)); + } else { + query_t * query = connection->current_query; + + if (query != NULL) { + PGresult * result = PQexec(connection->descriptor, query->request); + + switch (PQresultStatus(result)) { + case PGRES_COMMAND_OK: { + PQclear(result); + break; + } + + case PGRES_TUPLES_OK: { + query->result = result; + break; + } + + default: { + query->error = copy_string(PQresultErrorMessage(result)); + PQclear(result); + break; + } + } + } + } } -void -connection_work_handler(uv_work_t * work) -{ - connection_t * connection = (connection_t *) work->data; +void connection_work_handler(uv_work_t * work) { + connection_t * connection = (connection_t *) work->data; - if (connection->error != NULL) - { - pool_t * pool = connection->pool; + if (connection->error != NULL) { + pool_t * pool = connection->pool; - if (connection->current_query != NULL) - { - queue_unshift(pool->query_queue, connection->current_query) - ; - connection->current_query = NULL; - } + if (connection->current_query != NULL) { + queue_unshift(pool->query_queue, connection->current_query) + ; + connection->current_query = NULL; + } - pool_handle_error(pool, connection->error); - pool_process(pool); + pool_handle_error(pool, connection->error); + pool_process(pool); - connection_destroy(connection); - } + connection_destroy_req(connection); + } - connection->activity_status = FREE; - connection_process(connection); + connection->activity_status = FREE; + connection_process(connection); - free(work); + free(work); } -void -connection_queue_work(connection_t * connection, uv_work_cb work) -{ - uv_work_t * work_item = (uv_work_t *) malloc(sizeof(uv_work_t)); - work_item->data = connection; +void connection_queue_work(connection_t * connection, uv_work_cb work) { + uv_work_t * work_item = (uv_work_t *) malloc(sizeof(uv_work_t)); + work_item->data = connection; - connection->activity_status = BUSY; - connection->readyForFree = false; + connection->activity_status = BUSY; + connection->readyForFree = false; - uv_queue_work(uv_default_loop(), work_item, work, - (uv_after_work_cb) connection_work_handler); + uv_queue_work(uv_default_loop(), work_item, work, + (uv_after_work_cb) connection_work_handler); } -void -connection_fetch_query(connection_t * connection) -{ - if (connection->current_query == NULL && connection->status != DESTROYING) - { - - queue_shift(connection->pool->query_queue, connection->current_query); +void connection_fetch_query(connection_t * connection) { + if (connection->current_query == NULL && connection->status != DESTROYING) { - if (connection->current_query != NULL) - { + queue_shift(connection->pool->query_queue, connection->current_query); - connection_queue_work(connection, connection_exec_work); - } - else - { - - connection_destroy(connection); - } - } + if (connection->current_query != NULL) { + connection_queue_work(connection, connection_exec_work); + } else { + connection_destroy_req(connection); + } + } } connection_t * -connection_alloc(char * connection_info, pool_t * pool) -{ - connection_t * connection = (connection_t *) malloc(sizeof(connection_t)); - connection->status = NEW; - connection->activity_status = FREE; - - connection->connection_info = copy_string(connection_info); +connection_alloc(char * connection_info, pool_t * pool) { + connection_t * connection = (connection_t *) malloc(sizeof(connection_t)); + connection->status = NEW; + connection->activity_status = FREE; - connection->pool = pool; + connection->connection_info = copy_string(connection_info); - connection->current_query = NULL; + connection->pool = pool; - connection->descriptor = NULL; + connection->current_query = NULL; - connection->prev = NULL; - connection->next = NULL; + connection->descriptor = NULL; - connection->error = NULL; + connection->prev = NULL; + connection->next = NULL; - connection->timer = (uv_timer_t *) malloc(sizeof(uv_timer_t)); + connection->error = NULL; - queue_push(pool->connection_queue, connection) - ; + queue_push(pool->connection_queue, connection) + ; - return connection; + return connection; } -void -connection_init(connection_t * connection) -{ - connection->status = INITIALIZING; +void connection_init(connection_t * connection) { + connection->status = INITIALIZING; + connection_queue_work(connection, connection_connect_work); +} - uv_timer_init(uv_default_loop(), connection->timer); - connection->timer->data = connection; +void connection_destroy_req(connection_t * connection) { + if (!connection->readyForFree) { + connection->readyForFree = true; + connection->downtimeStarting = time(NULL); + } - connection_queue_work(connection, connection_connect_work); + connection->status = WAITFORDESTROY; } -void -connection_destroy(connection_t * connection) -{ - connection->status = WAIT; -} +void connection_process(connection_t * connection) { -void -cbk(uv_idle_t * handle, int status) -{ - - connection_t * connection = (connection_t *) handle->data; - - if (connection->activity_status == BUSY) - { - return; - } - - connection_process(connection); - - if (connection->status == WAIT) - { - if (connection->readyForFree) - { - connection->status = DESTROYING; - connection_process(connection); - } - else - { - connection->readyForFree = true; - connection_process(connection); - } - - } -} + if (connection->activity_status == FREE) { + query_t * query = connection->current_query; + connection->current_query = NULL; + + switch (connection->status) { + case INITIALIZING: { + connection->status = ACTIVE; + connection_fetch_query(connection); -void -connection_process(connection_t * connection) -{ - if (connection->activity_status == FREE) - { - query_t * query = connection->current_query; - connection->current_query = NULL; - - switch (connection->status) - { - case INITIALIZING: - { - uv_timer_start(connection->timer, (uv_timer_cb) cbk, 0, 100); - connection->status = ACTIVE; - connection_fetch_query(connection); - - break; - } - - case ACTIVE: - case WAIT: - { - connection_fetch_query(connection); - - break; - } - - case DESTROYING: - { - connection_free(connection); - - break; - } - - case NEW: - { - break; - } - } - - if (query != NULL) - { - query_apply(query); - query_free(query); - } - } + break; + } + + case ACTIVE: + case WAITFORDESTROY: { + connection_fetch_query(connection); + + break; + } + + case DESTROYING: { + connection_free(connection); + break; + } + + case NEW: { + break; + } + } + + if (query != NULL) { + query_apply(query); + query_free(query); + } + } } -void -connection_free(connection_t * connection) -{ - if (connection->prev != NULL) - { - queue_remove(connection); - } - - if (connection->descriptor != NULL) - { - PQfinish(connection->descriptor); - } - - if (connection->current_query != NULL) - { - query_free(connection->current_query); - } - - uv_timer_stop(connection->timer); - - free(connection->timer); - free(connection->connection_info); - free(connection); +void connection_free(connection_t * connection) { + if (connection->prev != NULL) { + queue_remove(connection); + } + + if (connection->descriptor != NULL) { + PQfinish(connection->descriptor); + } + + if (connection->current_query != NULL) { + query_free(connection->current_query); + } + + free(connection->connection_info); + free(connection); } diff --git a/src/connection.h b/src/connection.h index ca0c271..82e9ff0 100644 --- a/src/connection.h +++ b/src/connection.h @@ -8,6 +8,8 @@ #ifndef CONNECTION_H_ #define CONNECTION_H_ +#include + #include #include @@ -15,37 +17,33 @@ #include "query.h" #include "pool.h" -typedef enum -{ - NEW = 0, INITIALIZING, ACTIVE, WAIT, DESTROYING +typedef enum { + NEW = 0, INITIALIZING, ACTIVE, WAITFORDESTROY, DESTROYING } entity_status_t; -typedef enum -{ - BUSY = 0, FREE +typedef enum { + BUSY = 0, FREE } activity_status_t; -typedef struct connection_ -{ - char * connection_info; - - PGconn * descriptor; +typedef struct connection_ { + char * connection_info; - struct pool_ * pool; + PGconn * descriptor; - struct connection_ * next; - struct connection_ * prev; + struct pool_ * pool; - struct query_ * current_query; + struct connection_ * next; + struct connection_ * prev; - entity_status_t status; - activity_status_t activity_status; + struct query_ * current_query; - uv_timer_t * timer; + entity_status_t status; + activity_status_t activity_status; - bool readyForFree; + time_t downtimeStarting; + bool readyForFree; - char * error; + char * error; } connection_t; @@ -59,7 +57,7 @@ void connection_process(connection_t * connection); void -connection_destroy(connection_t * connection); +connection_destroy_req(connection_t * connection); void connection_free(connection_t * connection); diff --git a/src/pg.cc b/src/pg.cc index b228065..4e26bc9 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -18,121 +18,111 @@ #include v8::Persistent handle_tmpl = v8::Persistent< - v8::ObjectTemplate>::New(v8::ObjectTemplate::New()); + v8::ObjectTemplate>::New(v8::ObjectTemplate::New()); -v8::Handle -pg_init(const v8::Arguments &args) -{ +v8::Handle pg_init(const v8::Arguments &args) { - v8::HandleScope scope; + v8::HandleScope scope; - handle_tmpl->SetInternalFieldCount(1); + handle_tmpl->SetInternalFieldCount(1); - if (args.Length() < 1 || (!args[0]->IsNumber())) - { - return throw_type_error("First argument must be max pool size!"); - } + if (args.Length() < 1 || (!args[0]->IsNumber())) { + return throw_type_error("First argument must be max pool size!"); + } - if (args.Length() < 2 || (!args[1]->IsString())) - { - return throw_type_error("Second argument must be connection string!"); - } + if (args.Length() < 2 || (!args[1]->IsNumber())) { + return throw_type_error("Second argument must be connection lifetime!"); + } - if (args.Length() < 3 && !args[2]->IsFunction()) - { - return throw_type_error("Third argument must be error callback!"); - } + if (args.Length() < 3 || (!args[2]->IsString())) { + return throw_type_error("Third argument must be connection string!"); + } - v8::String::Utf8Value str(args[1]->ToString()); + if (args.Length() < 4 && !args[3]->IsFunction()) { + return throw_type_error("Fourth argument must be error callback!"); + } - pool_t * pool = pool_alloc(); + v8::String::Utf8Value str(args[2]->ToString()); - pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, - v8::Local::Cast(args[2])); + pool_t * pool = pool_alloc(); - pool->data = v8::Persistent::New(handle_tmpl->NewInstance()); + pool_init(pool, args[0]->ToInteger()->Int32Value(), + args[1]->ToInteger()->Int32Value(), *str, + v8::Local::Cast(args[3])); - pool->data->SetPointerInInternalField(0, pool); + pool->data = v8::Persistent::New(handle_tmpl->NewInstance()); - return scope.Close(pool->data); + pool->data->SetPointerInInternalField(0, pool); + + return scope.Close(pool->data); } -v8::Handle -pg_exec(const v8::Arguments& args) -{ - v8::HandleScope scope; +v8::Handle pg_exec(const v8::Arguments& args) { + v8::HandleScope scope; - if ((args.Length() < 1) || (!args[0]->IsObject())) - { - return throw_type_error("First argument must be pool handle!"); - } + if ((args.Length() < 1) || (!args[0]->IsObject())) { + return throw_type_error("First argument must be pool handle!"); + } - if (args.Length() < 2 || (!args[1]->IsString())) - { - return throw_type_error("Second argument must be query request!"); - } + if (args.Length() < 2 || (!args[1]->IsString())) { + return throw_type_error("Second argument must be query request!"); + } - if (args.Length() < 3 && !args[2]->IsFunction()) - { - return throw_type_error("Third argument must be query callback!"); - } + if (args.Length() < 3 && !args[2]->IsFunction()) { + return throw_type_error("Third argument must be query callback!"); + } - if (args[0]->ToObject()->InternalFieldCount() < 1) - return throw_type_error("Invalid handle!"); + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); - pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( - 0); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( + 0); - if (pool == NULL) - return throw_type_error("Invalid handle!"); + if (pool == NULL) + return throw_type_error("Invalid handle!"); - v8::String::Utf8Value str(args[1]->ToString()); + v8::String::Utf8Value str(args[1]->ToString()); - query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); + query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); - pool_exec(pool, query); + pool_exec(pool, query); - return scope.Close(v8::Undefined()); + return scope.Close(v8::Undefined()); } -v8::Handle -pg_destroy(const v8::Arguments& args) -{ - v8::HandleScope scope; +v8::Handle pg_destroy(const v8::Arguments& args) { + v8::HandleScope scope; - if ((args.Length() < 1) || (!args[0]->IsObject())) - { - return throw_type_error("First argument must be pool handle!"); - } + if ((args.Length() < 1) || (!args[0]->IsObject())) { + return throw_type_error("First argument must be pool handle!"); + } - if (args[0]->ToObject()->InternalFieldCount() < 1) - return throw_type_error("Invalid handle!"); + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); - pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( - 0); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( + 0); - if (pool == NULL) - return throw_type_error("Invalid handle!"); + if (pool == NULL) + return throw_type_error("Invalid handle!"); - pool_destroy(pool); + pool_destroy(pool); - return scope.Close(v8::Undefined()); + return scope.Close(v8::Undefined()); } -void -init(v8::Handle target) -{ +void init(v8::Handle target) { - v8::HandleScope scope; + v8::HandleScope scope; - target->Set(v8::String::New("init"), - v8::FunctionTemplate::New(pg_init)->GetFunction()); + target->Set(v8::String::New("init"), + v8::FunctionTemplate::New(pg_init)->GetFunction()); - target->Set(v8::String::New("exec"), - v8::FunctionTemplate::New(pg_exec)->GetFunction()); + target->Set(v8::String::New("exec"), + v8::FunctionTemplate::New(pg_exec)->GetFunction()); - target->Set(v8::String::New("destroy"), - v8::FunctionTemplate::New(pg_destroy)->GetFunction()); + target->Set(v8::String::New("destroy"), + v8::FunctionTemplate::New(pg_destroy)->GetFunction()); } NODE_MODULE(pg, init) diff --git a/src/pool.cc b/src/pool.cc index c8426a3..846c26b 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -7,119 +7,149 @@ #include #include "stddef.h" +#include #include "pool.h" #include "connection.h" #include "queue.h" #include "utils.h" -void -pool_spawn_connection(pool_t * pool) -{ - connection_t * connection = connection_alloc(pool->connection_info, pool); +size_t cbk_repeat = 2000; - connection_init(connection); +void pool_spawn_connection(pool_t * pool) { + connection_t * connection = connection_alloc(pool->connection_info, pool); + + connection_init(connection); } pool_t * -pool_alloc() -{ - pool_t * pool = (pool_t *) malloc(sizeof(pool_t)); +pool_alloc() { + pool_t * pool = (pool_t *) malloc(sizeof(pool_t)); + + pool->connection_info = NULL; + + pool->max_size = 0; + + pool->lifetime = 0; + + pool->connection_queue = (connection_t *) malloc(sizeof(connection_t)); + pool->query_queue = (query_t *) malloc(sizeof(query_t)); + + queue_init(pool->connection_queue); + + pool->timer = (uv_timer_t *) malloc(sizeof(uv_timer_t)); - pool->connection_info = NULL; + queue_init(pool->query_queue); - pool->max_size = 0; + return pool; +} + +void cbk(uv_idle_t * handle, int status) { + + pool_t * pool = (pool_t *) handle->data; + + if (!queue_is_empty(pool->query_queue)) { + pool_process(pool); + return; + } - pool->connection_queue = (connection_t *) malloc(sizeof(connection_t)); - pool->query_queue = (query_t *) malloc(sizeof(query_t)); + connection_t * connection = pool->connection_queue->prev; + connection_t * prev = NULL; - queue_init(pool->connection_queue); +// if (connection == pool->connection_queue) { +// uv_timer_stop(pool->timer); +// } - queue_init(pool->query_queue); + while (connection != pool->connection_queue) { + prev = connection->prev; - return pool; + if (connection->status == WAITFORDESTROY) { + if (connection->readyForFree) { + if (difftime(time(NULL), connection->downtimeStarting) > pool->lifetime) { + connection->status = DESTROYING; + } + } + } + + connection_process(connection); + connection = prev; + } } -void -pool_init(pool_t * pool, size_t max_size, const char * connection_info, - v8::Local error_callback) -{ - pool->connection_info = copy_string(connection_info); - pool->max_size = max_size; +void pool_init(pool_t * pool, size_t max_size, size_t lifetime, + const char * connection_info, v8::Local error_callback) { + pool->connection_info = copy_string(connection_info); + pool->max_size = max_size; + pool->lifetime = lifetime; + + uv_timer_init(uv_default_loop(), pool->timer); + pool->timer->data = pool; + uv_timer_start(pool->timer, (uv_timer_cb) cbk, 0, cbk_repeat); - pool->error_callback = v8::Persistent::New(error_callback); + pool->error_callback = v8::Persistent::New(error_callback); } -void -pool_exec(pool_t * pool, query_t * query) -{ - queue_push(pool->query_queue, query) - ; +void pool_exec(pool_t * pool, query_t * query) { + queue_push(pool->query_queue, query) + ; - pool_process(pool); + pool_process(pool); } -void -pool_handle_error(pool_t * pool, char * error) -{ - v8::HandleScope scope; - v8::Handle argv[1]; +void pool_handle_error(pool_t * pool, char * error) { + v8::HandleScope scope; + v8::Handle argv[1]; - argv[0] = v8::String::New(error); + argv[0] = v8::String::New(error); - pool->error_callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); + pool->error_callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); } -void -pool_process(pool_t * pool) -{ - size_t i = 0; +void pool_process(pool_t * pool) { + size_t i = 0; - connection_t * connection = pool->connection_queue->prev; - connection_t * prev = NULL; - while (connection != pool->connection_queue) - { + connection_t * connection = pool->connection_queue->prev; + connection_t * prev = NULL; + while (connection != pool->connection_queue) { - prev = connection->prev; - connection_process(connection); + prev = connection->prev; + connection_process(connection); - connection = prev; - i++; - } + connection = prev; + i++; + } - if (!queue_is_empty(pool->query_queue) && i < pool->max_size) - { - pool_spawn_connection(pool); - } + if (!queue_is_empty(pool->query_queue) && i < pool->max_size) { + pool_spawn_connection(pool); + } } -void -pool_destroy(pool_t * pool) -{ - connection_t * connection; - query_t * query; +void pool_destroy(pool_t * pool) { + connection_t * connection; + query_t * query; + + queue_flush(pool->connection_queue, connection, connection_destroy_req); + queue_flush(pool->query_queue, query, query_free); - queue_flush(pool->connection_queue, connection, connection_destroy); - queue_flush(pool->query_queue, query, query_free); + pool->max_size = 0; + pool->error_callback.Dispose(); - pool->max_size = 0; - pool->error_callback.Dispose(); + pool->data->SetPointerInInternalField(0, NULL); + pool->data.Dispose(); - pool->data->SetPointerInInternalField(0, NULL); - pool->data.Dispose(); + uv_timer_stop(pool->timer); + free(pool->timer); - free(pool->connection_info); - pool->connection_info = NULL; + free(pool->connection_info); + pool->connection_info = NULL; } -void -pool_free(pool_t * pool) -{ - pool_destroy(pool); +void pool_free(pool_t * pool) { + pool_destroy(pool); - free(pool->connection_queue); - free(pool->query_queue); + free(pool->connection_queue); + free(pool->query_queue); - free(pool); + free(pool); } diff --git a/src/pool.h b/src/pool.h index d1643dc..d49361f 100644 --- a/src/pool.h +++ b/src/pool.h @@ -8,33 +8,34 @@ #ifndef POOL_H_ #define POOL_H_ - #include #include "query.h" #include "connection.h" - typedef struct pool_ { char * connection_info; size_t max_size; + size_t lifetime; + query_t * query_queue; v8::Persistent data; - + struct connection_ * connection_queue; v8::Persistent error_callback; -} pool_t; + uv_timer_t * timer; +} pool_t; pool_t * pool_alloc(); -void pool_init(pool_t * pool, size_t max_size, const char * connection_info, - v8::Local error_callback); +void pool_init(pool_t * pool, size_t max_size, size_t lifetime, + const char * connection_info, v8::Local error_callback); void pool_exec(pool_t * pool, query_t * query); @@ -46,5 +47,4 @@ void pool_destroy(pool_t * pool); void pool_free(pool_t * pool); - #endif /* POOL_H_ */ diff --git a/src/query.cc b/src/query.cc index 545d628..76b3f5a 100644 --- a/src/query.cc +++ b/src/query.cc @@ -5,13 +5,11 @@ * Author: kononencheg */ - #include #include "query.h" #include "utils.h" - query_t * query_alloc(v8::Local callback, const char * request) { query_t * query = (query_t *) malloc(sizeof(query_t)); @@ -24,7 +22,6 @@ query_t * query_alloc(v8::Local callback, const char * request) { return query; } - void query_apply(query_t * query) { v8::HandleScope scope; @@ -46,7 +43,6 @@ void query_apply(query_t * query) { query->callback->Call(v8::Context::GetCurrent()->Global(), argc, argv); } - void query_free(query_t * query) { query->callback.Dispose(); diff --git a/src/query.h b/src/query.h index 59e97d2..b534b1d 100644 --- a/src/query.h +++ b/src/query.h @@ -11,7 +11,6 @@ #include #include - typedef struct query_ { v8::Persistent callback; @@ -25,7 +24,6 @@ typedef struct query_ { } query_t; - query_t * query_alloc(v8::Local callback, const char * request); void query_apply(query_t * query); From c9527840f81558b86d728e52a724b9b56a15dcd1 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Mon, 25 Aug 2014 16:28:46 +0400 Subject: [PATCH 09/13] Fix formatting --- src/connection.cc | 4 +--- src/pg.cc | 3 +-- src/pool.cc | 9 ++------- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index b67b2f5..0fa8d84 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -125,8 +125,7 @@ connection_alloc(char * connection_info, pool_t * pool) { connection->error = NULL; - queue_push(pool->connection_queue, connection) - ; + queue_push(pool->connection_queue, connection); return connection; } @@ -198,5 +197,4 @@ void connection_free(connection_t * connection) { free(connection->connection_info); free(connection); - } diff --git a/src/pg.cc b/src/pg.cc index 4e26bc9..b85e61f 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -75,8 +75,7 @@ v8::Handle pg_exec(const v8::Arguments& args) { if (args[0]->ToObject()->InternalFieldCount() < 1) return throw_type_error("Invalid handle!"); - pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( - 0); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); if (pool == NULL) return throw_type_error("Invalid handle!"); diff --git a/src/pool.cc b/src/pool.cc index 846c26b..4f77c0e 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -14,7 +14,7 @@ #include "queue.h" #include "utils.h" -size_t cbk_repeat = 2000; +size_t cbk_repeat = 1000; void pool_spawn_connection(pool_t * pool) { connection_t * connection = connection_alloc(pool->connection_info, pool); @@ -56,10 +56,6 @@ void cbk(uv_idle_t * handle, int status) { connection_t * connection = pool->connection_queue->prev; connection_t * prev = NULL; -// if (connection == pool->connection_queue) { -// uv_timer_stop(pool->timer); -// } - while (connection != pool->connection_queue) { prev = connection->prev; @@ -90,8 +86,7 @@ void pool_init(pool_t * pool, size_t max_size, size_t lifetime, } void pool_exec(pool_t * pool, query_t * query) { - queue_push(pool->query_queue, query) - ; + queue_push(pool->query_queue, query); pool_process(pool); } From d5b3161dc1bcd60be5fcd42d66857e6e25155791 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Mon, 25 Aug 2014 16:38:16 +0400 Subject: [PATCH 10/13] Remove excess connection status --- src/connection.cc | 7 +------ src/connection.h | 2 +- src/pool.cc | 8 +++----- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index 0fa8d84..6ed17bf 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -140,8 +140,6 @@ void connection_destroy_req(connection_t * connection) { connection->readyForFree = true; connection->downtimeStarting = time(NULL); } - - connection->status = WAITFORDESTROY; } void connection_process(connection_t * connection) { @@ -154,14 +152,11 @@ void connection_process(connection_t * connection) { case INITIALIZING: { connection->status = ACTIVE; connection_fetch_query(connection); - break; } - case ACTIVE: - case WAITFORDESTROY: { + case ACTIVE: { connection_fetch_query(connection); - break; } diff --git a/src/connection.h b/src/connection.h index 82e9ff0..45cb358 100644 --- a/src/connection.h +++ b/src/connection.h @@ -18,7 +18,7 @@ #include "pool.h" typedef enum { - NEW = 0, INITIALIZING, ACTIVE, WAITFORDESTROY, DESTROYING + NEW = 0, INITIALIZING, ACTIVE, DESTROYING } entity_status_t; typedef enum { diff --git a/src/pool.cc b/src/pool.cc index 4f77c0e..3b0e4e0 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -59,11 +59,9 @@ void cbk(uv_idle_t * handle, int status) { while (connection != pool->connection_queue) { prev = connection->prev; - if (connection->status == WAITFORDESTROY) { - if (connection->readyForFree) { - if (difftime(time(NULL), connection->downtimeStarting) > pool->lifetime) { - connection->status = DESTROYING; - } + if (connection->readyForFree) { + if (difftime(time(NULL), connection->downtimeStarting) > pool->lifetime) { + connection->status = DESTROYING; } } From 6354cd3f9785fa7d9d08dfa5852d8d42e66f2fb7 Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Mon, 25 Aug 2014 17:02:59 +0400 Subject: [PATCH 11/13] Optimize cbk function --- src/pool.cc | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/pool.cc b/src/pool.cc index 3b0e4e0..5dbe096 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -48,24 +48,20 @@ void cbk(uv_idle_t * handle, int status) { pool_t * pool = (pool_t *) handle->data; - if (!queue_is_empty(pool->query_queue)) { - pool_process(pool); - return; - } - connection_t * connection = pool->connection_queue->prev; connection_t * prev = NULL; while (connection != pool->connection_queue) { prev = connection->prev; + connection_process(connection); + if (connection->readyForFree) { if (difftime(time(NULL), connection->downtimeStarting) > pool->lifetime) { connection->status = DESTROYING; } } - connection_process(connection); connection = prev; } } From 3986337b5e4ea3d9a686a98a2630c87dc0804d9b Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Tue, 26 Aug 2014 12:09:25 +0400 Subject: [PATCH 12/13] Fixes after review --- benchmarks/pg.js | 4 ++-- lib/pg/pg.js | 4 ++-- src/connection.cc | 13 ++++--------- src/connection.h | 3 +-- src/pool.cc | 13 +++++-------- 5 files changed, 14 insertions(+), 23 deletions(-) diff --git a/benchmarks/pg.js b/benchmarks/pg.js index 108cc84..3835f0d 100644 --- a/benchmarks/pg.js +++ b/benchmarks/pg.js @@ -1,6 +1,6 @@ var pg = require('../bin'); -var first = pg.init(5, { +var first = pg.init(5, 5, { 'dbname': 'relive', 'user': 'test', 'password': 'lttest', @@ -9,7 +9,7 @@ var first = pg.init(5, { 'connect_timeout': '5' }); -var second = pg.init(5, { +var second = pg.init(5, 5, { 'dbname': 'relive', 'user': 'test', 'password': 'lttest', diff --git a/lib/pg/pg.js b/lib/pg/pg.js index 5ab86f8..ce01f79 100644 --- a/lib/pg/pg.js +++ b/lib/pg/pg.js @@ -144,8 +144,8 @@ pg.escapeArray = function(array) { *
console.error
will be used by default. * return handle */ -pg.init = function(size, connection_lifetime, options, opt_errorHandler) { - return __pg.init(size, connection_lifetime, querystring.unescape( +pg.init = function(size, connectionLifetime, options, opt_errorHandler) { + return __pg.init(size, connectionLifetime, querystring.unescape( querystring.stringify(options, ' ')), opt_errorHandler); }; diff --git a/src/connection.cc b/src/connection.cc index 6ed17bf..f4d0bd7 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -65,8 +65,7 @@ void connection_work_handler(uv_work_t * work) { pool_t * pool = connection->pool; if (connection->current_query != NULL) { - queue_unshift(pool->query_queue, connection->current_query) - ; + queue_unshift(pool->query_queue, connection->current_query); connection->current_query = NULL; } @@ -87,7 +86,7 @@ void connection_queue_work(connection_t * connection, uv_work_cb work) { work_item->data = connection; connection->activity_status = BUSY; - connection->readyForFree = false; + connection->downtime_start = 0; uv_queue_work(uv_default_loop(), work_item, work, (uv_after_work_cb) connection_work_handler); @@ -106,8 +105,7 @@ void connection_fetch_query(connection_t * connection) { } } -connection_t * -connection_alloc(char * connection_info, pool_t * pool) { +connection_t * connection_alloc(char * connection_info, pool_t * pool) { connection_t * connection = (connection_t *) malloc(sizeof(connection_t)); connection->status = NEW; connection->activity_status = FREE; @@ -136,10 +134,7 @@ void connection_init(connection_t * connection) { } void connection_destroy_req(connection_t * connection) { - if (!connection->readyForFree) { - connection->readyForFree = true; - connection->downtimeStarting = time(NULL); - } + connection->downtime_start = time(NULL); } void connection_process(connection_t * connection) { diff --git a/src/connection.h b/src/connection.h index 45cb358..2882543 100644 --- a/src/connection.h +++ b/src/connection.h @@ -40,8 +40,7 @@ typedef struct connection_ { entity_status_t status; activity_status_t activity_status; - time_t downtimeStarting; - bool readyForFree; + time_t downtime_start; char * error; diff --git a/src/pool.cc b/src/pool.cc index 5dbe096..cc18efc 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -14,7 +14,7 @@ #include "queue.h" #include "utils.h" -size_t cbk_repeat = 1000; +size_t pool_tick_repeat = 1000; void pool_spawn_connection(pool_t * pool) { connection_t * connection = connection_alloc(pool->connection_info, pool); @@ -44,7 +44,7 @@ pool_alloc() { return pool; } -void cbk(uv_idle_t * handle, int status) { +void pool_tick(uv_idle_t * handle, int status) { pool_t * pool = (pool_t *) handle->data; @@ -54,12 +54,9 @@ void cbk(uv_idle_t * handle, int status) { while (connection != pool->connection_queue) { prev = connection->prev; - connection_process(connection); - - if (connection->readyForFree) { - if (difftime(time(NULL), connection->downtimeStarting) > pool->lifetime) { + if ((time(NULL) - pool->lifetime ) < connection->downtime_start) { connection->status = DESTROYING; - } + connection_free(connection); } connection = prev; @@ -74,7 +71,7 @@ void pool_init(pool_t * pool, size_t max_size, size_t lifetime, uv_timer_init(uv_default_loop(), pool->timer); pool->timer->data = pool; - uv_timer_start(pool->timer, (uv_timer_cb) cbk, 0, cbk_repeat); + uv_timer_start(pool->timer, (uv_timer_cb) pool_tick, 0, pool_tick_repeat); pool->error_callback = v8::Persistent::New(error_callback); } From 53d180cd8cd3669d95e1ec6289013dd9d3051e6f Mon Sep 17 00:00:00 2001 From: Alexey Belyakov Date: Tue, 26 Aug 2014 12:16:37 +0400 Subject: [PATCH 13/13] Remove DESTROYING status --- src/connection.cc | 7 +------ src/connection.h | 2 +- src/pool.cc | 1 - 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/connection.cc b/src/connection.cc index f4d0bd7..9dd45fe 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -93,7 +93,7 @@ void connection_queue_work(connection_t * connection, uv_work_cb work) { } void connection_fetch_query(connection_t * connection) { - if (connection->current_query == NULL && connection->status != DESTROYING) { + if (connection->current_query == NULL ) { queue_shift(connection->pool->query_queue, connection->current_query); @@ -155,11 +155,6 @@ void connection_process(connection_t * connection) { break; } - case DESTROYING: { - connection_free(connection); - break; - } - case NEW: { break; } diff --git a/src/connection.h b/src/connection.h index 2882543..d5d9099 100644 --- a/src/connection.h +++ b/src/connection.h @@ -18,7 +18,7 @@ #include "pool.h" typedef enum { - NEW = 0, INITIALIZING, ACTIVE, DESTROYING + NEW = 0, INITIALIZING, ACTIVE } entity_status_t; typedef enum { diff --git a/src/pool.cc b/src/pool.cc index cc18efc..65b4179 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -55,7 +55,6 @@ void pool_tick(uv_idle_t * handle, int status) { prev = connection->prev; if ((time(NULL) - pool->lifetime ) < connection->downtime_start) { - connection->status = DESTROYING; connection_free(connection); }