diff --git a/Makefile b/Makefile index 9915c2a..f818166 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -TOOLS_PATH = /home/livetex/livetex-tools/0.3.4 +TOOLS_PATH = /home/aleksey/work/Node-Pg/node_modules/livetex-tools/ include $(TOOLS_PATH)/rules/js.mk include $(TOOLS_PATH)/rules/cpp.mk \ No newline at end of file diff --git a/benchmarks/pg.js b/benchmarks/pg.js index 4e7fe33..3835f0d 100644 --- a/benchmarks/pg.js +++ b/benchmarks/pg.js @@ -1,6 +1,15 @@ var pg = require('../bin'); -pg.init(5, { +var first = pg.init(5, 5, { + 'dbname': 'relive', + 'user': 'test', + 'password': 'lttest', + 'host': '192.168.48.14', + 'port': '5432', + 'connect_timeout': '5' +}); + +var second = pg.init(5, 5, { 'dbname': 'relive', 'user': 'test', 'password': 'lttest', @@ -19,7 +28,7 @@ var t = Date.now(); var mem = 0; function exec() { - pg.exec(query, complete, cancel); + pg.exec(first, query, complete(first.name), cancel); } @@ -30,11 +39,12 @@ function cancel() { complete(); } -function complete() { +function complete(name) { mem += process.memoryUsage().heapUsed/1024/1024; if ((r += 1) === count) { console.log('[LIVETEX-NODE-PG] | R:', r, ' | E:', e, ' | T:', Date.now() - t, ' | M:', (Math.round(mem/r*10)/10)); + run(); } } diff --git a/benchmarks/test_pg.js b/benchmarks/test_pg.js new file mode 100644 index 0000000..c0c3891 --- /dev/null +++ b/benchmarks/test_pg.js @@ -0,0 +1,29 @@ +var assert = require('assert'); +var pg = require('../bin'); + +var init_info = { + 'dbname': 'relive', + 'user': 'test', + 'password': 'lttest', + 'host': '192.168.48.14', + 'port': '5432', + 'connect_timeout': '5' +} + +var first, second; + +assert.throws( function() { first = pg.init(); }, Error, "Bad init" ); + +assert.throws( function() { pg.exec(first, first, function(table) {}, console.error); }, Error, "Incorrect req with bad handle" ); + +assert.throws( function() { pg.exec(second, second, function(table) {}, console.error); }, Error, "Incorrect req with good handle"); + +assert.throws( function() { pg.destroy(); }, Error, "Destroy without handle" ); + +assert.throws( function() { pg.destroy(first); }, Error, "Destroy with incorrect handle" ); + +assert.doesNotThrow( function() { second = pg.init(5, init_info); }, Error, "Good init" ); + +assert.doesNotThrow( function() { pg.exec(second, "SELECT 1 AS value", function(table) {}, console.error); }, Error, "Good req with good handle" ); + +assert.doesNotThrow( function() { pg.destroy(second); }, Error, "Good destroy with correct handle" ); diff --git a/lib/pg/pg.js b/lib/pg/pg.js index 7f01c18..ce01f79 100644 --- a/lib/pg/pg.js +++ b/lib/pg/pg.js @@ -142,13 +142,13 @@ pg.escapeArray = function(array) { * keys specified in documentation. * @param {!pg.ErrorHandler=} opt_errorHandler Connection error handler. *
console.error
will be used by default. + * return handle */ -pg.init = function(size, options, opt_errorHandler) { - __pg.init(size, querystring.unescape( - querystring.stringify(options, ' ')), opt_errorHandler || console.error); +pg.init = function(size, connectionLifetime, options, opt_errorHandler) { + return __pg.init(size, connectionLifetime, querystring.unescape( + querystring.stringify(options, ' ')), opt_errorHandler); }; - /** * SQL-query executing. * @@ -158,8 +158,8 @@ pg.init = function(size, options, opt_errorHandler) { * @param {!pg.ResultHandler} complete Success result handler. * @param {!pg.ErrorHandler} cancel Execution error handler. */ -pg.exec = function(query, complete, cancel) { - __pg.exec(query, function(error, result) { +pg.exec = function(handle, query, complete, cancel) { + __pg.exec(handle, query, function(error, result) { if (error.length > 0) { cancel(error); } else { @@ -221,11 +221,10 @@ pg.prepareQuery = function(query, params) { return query.replace(pg.__PARAM_EXP, replacer); }; - /** * Destroy connection pool. */ -pg.destroy = function() { - __pg.destroy(); +pg.destroy = function(handle) { + __pg.destroy(handle); }; diff --git a/src/connection.cc b/src/connection.cc index 94d7842..9dd45fe 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -14,7 +14,6 @@ #include "queue.h" #include "utils.h" - void connection_connect_work(uv_work_t * work) { connection_t * connection = (connection_t *) work->data; @@ -26,7 +25,6 @@ void connection_connect_work(uv_work_t * work) { } } - void connection_exec_work(uv_work_t * work) { connection_t * connection = (connection_t *) work->data; @@ -40,27 +38,26 @@ void connection_exec_work(uv_work_t * work) { PGresult * result = PQexec(connection->descriptor, query->request); switch (PQresultStatus(result)) { - case PGRES_COMMAND_OK: { - PQclear(result); - break; - } - - case PGRES_TUPLES_OK: { - query->result = result; - break; - } - - default: { - query->error = copy_string(PQresultErrorMessage(result)); - PQclear(result); - break; - } + case PGRES_COMMAND_OK: { + PQclear(result); + break; + } + + case PGRES_TUPLES_OK: { + query->result = result; + break; + } + + default: { + query->error = copy_string(PQresultErrorMessage(result)); + PQclear(result); + break; + } } } } } - void connection_work_handler(uv_work_t * work) { connection_t * connection = (connection_t *) work->data; @@ -75,7 +72,7 @@ void connection_work_handler(uv_work_t * work) { pool_handle_error(pool, connection->error); pool_process(pool); - connection_destroy(connection); + connection_destroy_req(connection); } connection->activity_status = FREE; @@ -84,26 +81,26 @@ void connection_work_handler(uv_work_t * work) { free(work); } - void connection_queue_work(connection_t * connection, uv_work_cb work) { uv_work_t * work_item = (uv_work_t *) malloc(sizeof(uv_work_t)); work_item->data = connection; connection->activity_status = BUSY; + connection->downtime_start = 0; uv_queue_work(uv_default_loop(), work_item, work, (uv_after_work_cb) connection_work_handler); } - void connection_fetch_query(connection_t * connection) { - if (connection->current_query == NULL && connection->status != DESTROYING) { + if (connection->current_query == NULL ) { + queue_shift(connection->pool->query_queue, connection->current_query); if (connection->current_query != NULL) { connection_queue_work(connection, connection_exec_work); } else { - connection_destroy(connection); + connection_destroy_req(connection); } } } @@ -131,49 +128,37 @@ connection_t * connection_alloc(char * connection_info, pool_t * pool) { return connection; } - void connection_init(connection_t * connection) { connection->status = INITIALIZING; connection_queue_work(connection, connection_connect_work); } - -void connection_destroy(connection_t * connection) { - connection->status = DESTROYING; - connection_process(connection); +void connection_destroy_req(connection_t * connection) { + connection->downtime_start = time(NULL); } - void connection_process(connection_t * connection) { + if (connection->activity_status == FREE) { query_t * query = connection->current_query; connection->current_query = NULL; switch (connection->status) { - case INITIALIZING: { - connection->status = ACTIVE; - connection_fetch_query(connection); - - break; - } - - case ACTIVE: { - connection_fetch_query(connection); - - break; - } - - case DESTROYING: { - connection_free(connection); - - break; - } + case INITIALIZING: { + connection->status = ACTIVE; + connection_fetch_query(connection); + break; + } - case NEW: { - break; - } + case ACTIVE: { + connection_fetch_query(connection); + break; } + case NEW: { + break; + } + } if (query != NULL) { query_apply(query); @@ -182,7 +167,6 @@ void connection_process(connection_t * connection) { } } - void connection_free(connection_t * connection) { if (connection->prev != NULL) { queue_remove(connection); diff --git a/src/connection.h b/src/connection.h index 7883fe0..d5d9099 100644 --- a/src/connection.h +++ b/src/connection.h @@ -8,6 +8,7 @@ #ifndef CONNECTION_H_ #define CONNECTION_H_ +#include #include @@ -16,20 +17,14 @@ #include "query.h" #include "pool.h" - typedef enum { - NEW = 0, - INITIALIZING, - ACTIVE, - DESTROYING + NEW = 0, INITIALIZING, ACTIVE } entity_status_t; typedef enum { - BUSY = 0, - FREE + BUSY = 0, FREE } activity_status_t; - typedef struct connection_ { char * connection_info; @@ -45,20 +40,25 @@ typedef struct connection_ { entity_status_t status; activity_status_t activity_status; + time_t downtime_start; + char * error; } connection_t; +connection_t * +connection_alloc(char * connection_info, struct pool_ * pool); -connection_t * connection_alloc(char * connection_info, struct pool_ * pool); - -void connection_init(connection_t * connection); - -void connection_process(connection_t * connection); +void +connection_init(connection_t * connection); -void connection_destroy(connection_t * connection); +void +connection_process(connection_t * connection); -void connection_free(connection_t * connection); +void +connection_destroy_req(connection_t * connection); +void +connection_free(connection_t * connection); #endif /* CONNECTION_H_ */ diff --git a/src/pg.cc b/src/pg.cc index df8e75f..b85e61f 100644 --- a/src/pg.cc +++ b/src/pg.cc @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -9,7 +10,6 @@ #include #include - #include "pool.h" #include "query.h" #include "connection.h" @@ -17,77 +17,111 @@ #include +v8::Persistent handle_tmpl = v8::Persistent< + v8::ObjectTemplate>::New(v8::ObjectTemplate::New()); -pool_t * pool; +v8::Handle pg_init(const v8::Arguments &args) { + v8::HandleScope scope; -v8::Handle pg_init(const v8::Arguments &args) { - v8::HandleScope scope; + handle_tmpl->SetInternalFieldCount(1); - if (args.Length() < 1) { + if (args.Length() < 1 || (!args[0]->IsNumber())) { return throw_type_error("First argument must be max pool size!"); } - if (args.Length() < 2) { - return throw_type_error("Second argument must be connection string!"); + if (args.Length() < 2 || (!args[1]->IsNumber())) { + return throw_type_error("Second argument must be connection lifetime!"); } - if (args.Length() < 3 && !args[2]->IsFunction()) { - return throw_type_error("Third argument must be error callback!"); + if (args.Length() < 3 || (!args[2]->IsString())) { + return throw_type_error("Third argument must be connection string!"); } - v8::String::Utf8Value str(args[1]->ToString()); + if (args.Length() < 4 && !args[3]->IsFunction()) { + return throw_type_error("Fourth argument must be error callback!"); + } - pool_init(pool, args[0]->ToInteger()->Int32Value(), *str, - v8::Local::Cast(args[2])); + v8::String::Utf8Value str(args[2]->ToString()); - return scope.Close(v8::Undefined()); + pool_t * pool = pool_alloc(); + + pool_init(pool, args[0]->ToInteger()->Int32Value(), + args[1]->ToInteger()->Int32Value(), *str, + v8::Local::Cast(args[3])); + + pool->data = v8::Persistent::New(handle_tmpl->NewInstance()); + + pool->data->SetPointerInInternalField(0, pool); + + return scope.Close(pool->data); } -v8::Handle pg_exec(const v8::Arguments &args) { - v8::HandleScope scope; +v8::Handle pg_exec(const v8::Arguments& args) { + v8::HandleScope scope; + + if ((args.Length() < 1) || (!args[0]->IsObject())) { + return throw_type_error("First argument must be pool handle!"); + } - if (args.Length() < 1) { - return throw_type_error("First argument must be query request!"); + if (args.Length() < 2 || (!args[1]->IsString())) { + return throw_type_error("Second argument must be query request!"); } - if (args.Length() < 2 && !args[1]->IsFunction()) { - return throw_type_error("Second argument must be query callback!"); + if (args.Length() < 3 && !args[2]->IsFunction()) { + return throw_type_error("Third argument must be query callback!"); } - v8::String::Utf8Value str(args[0]->ToString()); + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); + + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField(0); + + if (pool == NULL) + return throw_type_error("Invalid handle!"); - query_t * query = query_alloc(v8::Local::Cast(args[1]), *str); + v8::String::Utf8Value str(args[1]->ToString()); + + query_t * query = query_alloc(v8::Local::Cast(args[2]), *str); - pool_exec(pool, query); + pool_exec(pool, query); - return scope.Close(v8::Undefined()); + return scope.Close(v8::Undefined()); } +v8::Handle pg_destroy(const v8::Arguments& args) { + v8::HandleScope scope; + + if ((args.Length() < 1) || (!args[0]->IsObject())) { + return throw_type_error("First argument must be pool handle!"); + } -v8::Handle pg_destroy(const v8::Arguments &args) { - v8::HandleScope scope; + if (args[0]->ToObject()->InternalFieldCount() < 1) + return throw_type_error("Invalid handle!"); - pool_destroy(pool); + pool_t * pool = (pool_t *) args[0]->ToObject()->GetPointerFromInternalField( + 0); - return scope.Close(v8::Undefined()); -} + if (pool == NULL) + return throw_type_error("Invalid handle!"); + pool_destroy(pool); -void init (v8::Handle target) { - pool = pool_alloc(); + return scope.Close(v8::Undefined()); +} + +void init(v8::Handle target) { v8::HandleScope scope; - target->Set(v8::String::New("init"), - v8::FunctionTemplate::New(pg_init)->GetFunction()); + target->Set(v8::String::New("init"), + v8::FunctionTemplate::New(pg_init)->GetFunction()); - target->Set(v8::String::New("exec"), - v8::FunctionTemplate::New(pg_exec)->GetFunction()); + target->Set(v8::String::New("exec"), + v8::FunctionTemplate::New(pg_exec)->GetFunction()); - target->Set(v8::String::New("destroy"), - v8::FunctionTemplate::New(pg_destroy)->GetFunction()); + target->Set(v8::String::New("destroy"), + v8::FunctionTemplate::New(pg_destroy)->GetFunction()); } - NODE_MODULE(pg, init) diff --git a/src/pool.cc b/src/pool.cc index 7e3227e..65b4179 100644 --- a/src/pool.cc +++ b/src/pool.cc @@ -6,12 +6,15 @@ */ #include +#include "stddef.h" +#include #include "pool.h" #include "connection.h" #include "queue.h" #include "utils.h" +size_t pool_tick_repeat = 1000; void pool_spawn_connection(pool_t * pool) { connection_t * connection = connection_alloc(pool->connection_info, pool); @@ -19,41 +22,65 @@ void pool_spawn_connection(pool_t * pool) { connection_init(connection); } - -pool_t * pool_alloc() { +pool_t * +pool_alloc() { pool_t * pool = (pool_t *) malloc(sizeof(pool_t)); pool->connection_info = NULL; pool->max_size = 0; + pool->lifetime = 0; + pool->connection_queue = (connection_t *) malloc(sizeof(connection_t)); pool->query_queue = (query_t *) malloc(sizeof(query_t)); queue_init(pool->connection_queue); + + pool->timer = (uv_timer_t *) malloc(sizeof(uv_timer_t)); + queue_init(pool->query_queue); return pool; } +void pool_tick(uv_idle_t * handle, int status) { + + pool_t * pool = (pool_t *) handle->data; + + connection_t * connection = pool->connection_queue->prev; + connection_t * prev = NULL; + + while (connection != pool->connection_queue) { + prev = connection->prev; -void pool_init(pool_t * pool, size_t max_size, const char * connection_info, - v8::Local error_callback) { + if ((time(NULL) - pool->lifetime ) < connection->downtime_start) { + connection_free(connection); + } + connection = prev; + } +} + +void pool_init(pool_t * pool, size_t max_size, size_t lifetime, + const char * connection_info, v8::Local error_callback) { pool->connection_info = copy_string(connection_info); pool->max_size = max_size; + pool->lifetime = lifetime; + + uv_timer_init(uv_default_loop(), pool->timer); + pool->timer->data = pool; + uv_timer_start(pool->timer, (uv_timer_cb) pool_tick, 0, pool_tick_repeat); pool->error_callback = v8::Persistent::New(error_callback); } - void pool_exec(pool_t * pool, query_t * query) { queue_push(pool->query_queue, query); pool_process(pool); } - void pool_handle_error(pool_t * pool, char * error) { v8::HandleScope scope; v8::Handle argv[1]; @@ -63,13 +90,13 @@ void pool_handle_error(pool_t * pool, char * error) { pool->error_callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); } - void pool_process(pool_t * pool) { size_t i = 0; connection_t * connection = pool->connection_queue->prev; connection_t * prev = NULL; while (connection != pool->connection_queue) { + prev = connection->prev; connection_process(connection); @@ -86,12 +113,18 @@ void pool_destroy(pool_t * pool) { connection_t * connection; query_t * query; - queue_flush(pool->connection_queue, connection, connection_destroy); + queue_flush(pool->connection_queue, connection, connection_destroy_req); queue_flush(pool->query_queue, query, query_free); pool->max_size = 0; pool->error_callback.Dispose(); + pool->data->SetPointerInInternalField(0, NULL); + pool->data.Dispose(); + + uv_timer_stop(pool->timer); + free(pool->timer); + free(pool->connection_info); pool->connection_info = NULL; } @@ -105,4 +138,3 @@ void pool_free(pool_t * pool) { free(pool); } - diff --git a/src/pool.h b/src/pool.h index 92d6684..d49361f 100644 --- a/src/pool.h +++ b/src/pool.h @@ -8,30 +8,34 @@ #ifndef POOL_H_ #define POOL_H_ - #include #include "query.h" #include "connection.h" - typedef struct pool_ { char * connection_info; size_t max_size; + size_t lifetime; + query_t * query_queue; + v8::Persistent data; + struct connection_ * connection_queue; v8::Persistent error_callback; -} pool_t; + uv_timer_t * timer; + +} pool_t; pool_t * pool_alloc(); -void pool_init(pool_t * pool, size_t max_size, const char * connection_info, - v8::Local error_callback); +void pool_init(pool_t * pool, size_t max_size, size_t lifetime, + const char * connection_info, v8::Local error_callback); void pool_exec(pool_t * pool, query_t * query); @@ -43,5 +47,4 @@ void pool_destroy(pool_t * pool); void pool_free(pool_t * pool); - #endif /* POOL_H_ */ diff --git a/src/query.cc b/src/query.cc index 545d628..76b3f5a 100644 --- a/src/query.cc +++ b/src/query.cc @@ -5,13 +5,11 @@ * Author: kononencheg */ - #include #include "query.h" #include "utils.h" - query_t * query_alloc(v8::Local callback, const char * request) { query_t * query = (query_t *) malloc(sizeof(query_t)); @@ -24,7 +22,6 @@ query_t * query_alloc(v8::Local callback, const char * request) { return query; } - void query_apply(query_t * query) { v8::HandleScope scope; @@ -46,7 +43,6 @@ void query_apply(query_t * query) { query->callback->Call(v8::Context::GetCurrent()->Global(), argc, argv); } - void query_free(query_t * query) { query->callback.Dispose(); diff --git a/src/query.h b/src/query.h index 59e97d2..b534b1d 100644 --- a/src/query.h +++ b/src/query.h @@ -11,7 +11,6 @@ #include #include - typedef struct query_ { v8::Persistent callback; @@ -25,7 +24,6 @@ typedef struct query_ { } query_t; - query_t * query_alloc(v8::Local callback, const char * request); void query_apply(query_t * query);