Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
TOOLS_PATH = /home/livetex/livetex-tools/0.3.4
TOOLS_PATH = /home/aleksey/work/Node-Pg/node_modules/livetex-tools/

include $(TOOLS_PATH)/rules/js.mk
include $(TOOLS_PATH)/rules/cpp.mk
16 changes: 13 additions & 3 deletions benchmarks/pg.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
var pg = require('../bin');

pg.init(5, {
var first = pg.init(5, 5, {
'dbname': 'relive',
'user': 'test',
'password': 'lttest',
'host': '192.168.48.14',
'port': '5432',
'connect_timeout': '5'
});

var second = pg.init(5, 5, {
'dbname': 'relive',
'user': 'test',
'password': 'lttest',
Expand All @@ -19,7 +28,7 @@ var t = Date.now();
var mem = 0;

function exec() {
pg.exec(query, complete, cancel);
pg.exec(first, query, complete(first.name), cancel);
}


Expand All @@ -30,11 +39,12 @@ function cancel() {
complete();
}

function complete() {
function complete(name) {
mem += process.memoryUsage().heapUsed/1024/1024;

if ((r += 1) === count) {
console.log('[LIVETEX-NODE-PG] | R:', r, ' | E:', e, ' | T:', Date.now() - t, ' | M:', (Math.round(mem/r*10)/10));

run();
}
}
Expand Down
29 changes: 29 additions & 0 deletions benchmarks/test_pg.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
var assert = require('assert');
var pg = require('../bin');

var init_info = {
'dbname': 'relive',
'user': 'test',
'password': 'lttest',
'host': '192.168.48.14',
'port': '5432',
'connect_timeout': '5'
}

var first, second;

assert.throws( function() { first = pg.init(); }, Error, "Bad init" );

assert.throws( function() { pg.exec(first, first, function(table) {}, console.error); }, Error, "Incorrect req with bad handle" );

assert.throws( function() { pg.exec(second, second, function(table) {}, console.error); }, Error, "Incorrect req with good handle");

assert.throws( function() { pg.destroy(); }, Error, "Destroy without handle" );

assert.throws( function() { pg.destroy(first); }, Error, "Destroy with incorrect handle" );

assert.doesNotThrow( function() { second = pg.init(5, init_info); }, Error, "Good init" );

assert.doesNotThrow( function() { pg.exec(second, "SELECT 1 AS value", function(table) {}, console.error); }, Error, "Good req with good handle" );

assert.doesNotThrow( function() { pg.destroy(second); }, Error, "Good destroy with correct handle" );
17 changes: 8 additions & 9 deletions lib/pg/pg.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ pg.escapeArray = function(array) {
* keys specified in <a href="http://goo.gl/eqPw4">documentation</a>.
* @param {!pg.ErrorHandler=} opt_errorHandler Connection error handler.
* <pre>console.error</pre> will be used by default.
* return handle
*/
pg.init = function(size, options, opt_errorHandler) {
__pg.init(size, querystring.unescape(
querystring.stringify(options, ' ')), opt_errorHandler || console.error);
pg.init = function(size, connectionLifetime, options, opt_errorHandler) {
return __pg.init(size, connectionLifetime, querystring.unescape(
querystring.stringify(options, ' ')), opt_errorHandler);
};


/**
* SQL-query executing.
*
Expand All @@ -158,8 +158,8 @@ pg.init = function(size, options, opt_errorHandler) {
* @param {!pg.ResultHandler} complete Success result handler.
* @param {!pg.ErrorHandler} cancel Execution error handler.
*/
pg.exec = function(query, complete, cancel) {
__pg.exec(query, function(error, result) {
pg.exec = function(handle, query, complete, cancel) {
__pg.exec(handle, query, function(error, result) {
if (error.length > 0) {
cancel(error);
} else {
Expand Down Expand Up @@ -221,11 +221,10 @@ pg.prepareQuery = function(query, params) {
return query.replace(pg.__PARAM_EXP, replacer);
};


/**
* Destroy connection pool.
*/
pg.destroy = function() {
__pg.destroy();
pg.destroy = function(handle) {
__pg.destroy(handle);
};

86 changes: 35 additions & 51 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "queue.h"
#include "utils.h"


void connection_connect_work(uv_work_t * work) {
connection_t * connection = (connection_t *) work->data;

Expand All @@ -26,7 +25,6 @@ void connection_connect_work(uv_work_t * work) {
}
}


void connection_exec_work(uv_work_t * work) {
connection_t * connection = (connection_t *) work->data;

Expand All @@ -40,27 +38,26 @@ void connection_exec_work(uv_work_t * work) {
PGresult * result = PQexec(connection->descriptor, query->request);

switch (PQresultStatus(result)) {
case PGRES_COMMAND_OK: {
PQclear(result);
break;
}

case PGRES_TUPLES_OK: {
query->result = result;
break;
}

default: {
query->error = copy_string(PQresultErrorMessage(result));
PQclear(result);
break;
}
case PGRES_COMMAND_OK: {
PQclear(result);
break;
}

case PGRES_TUPLES_OK: {
query->result = result;
break;
}

default: {
query->error = copy_string(PQresultErrorMessage(result));
PQclear(result);
break;
}
}
}
}
}


void connection_work_handler(uv_work_t * work) {
connection_t * connection = (connection_t *) work->data;

Expand All @@ -75,7 +72,7 @@ void connection_work_handler(uv_work_t * work) {
pool_handle_error(pool, connection->error);
pool_process(pool);

connection_destroy(connection);
connection_destroy_req(connection);
}

connection->activity_status = FREE;
Expand All @@ -84,26 +81,26 @@ void connection_work_handler(uv_work_t * work) {
free(work);
}


void connection_queue_work(connection_t * connection, uv_work_cb work) {
uv_work_t * work_item = (uv_work_t *) malloc(sizeof(uv_work_t));
work_item->data = connection;

connection->activity_status = BUSY;
connection->downtime_start = 0;

uv_queue_work(uv_default_loop(), work_item, work,
(uv_after_work_cb) connection_work_handler);
}


void connection_fetch_query(connection_t * connection) {
if (connection->current_query == NULL && connection->status != DESTROYING) {
if (connection->current_query == NULL ) {

queue_shift(connection->pool->query_queue, connection->current_query);

if (connection->current_query != NULL) {
connection_queue_work(connection, connection_exec_work);
} else {
connection_destroy(connection);
connection_destroy_req(connection);
}
}
}
Expand Down Expand Up @@ -131,49 +128,37 @@ connection_t * connection_alloc(char * connection_info, pool_t * pool) {
return connection;
}


void connection_init(connection_t * connection) {
connection->status = INITIALIZING;
connection_queue_work(connection, connection_connect_work);
}


void connection_destroy(connection_t * connection) {
connection->status = DESTROYING;
connection_process(connection);
void connection_destroy_req(connection_t * connection) {
connection->downtime_start = time(NULL);
}


void connection_process(connection_t * connection) {

if (connection->activity_status == FREE) {
query_t * query = connection->current_query;
connection->current_query = NULL;

switch (connection->status) {
case INITIALIZING: {
connection->status = ACTIVE;
connection_fetch_query(connection);

break;
}

case ACTIVE: {
connection_fetch_query(connection);

break;
}

case DESTROYING: {
connection_free(connection);

break;
}
case INITIALIZING: {
connection->status = ACTIVE;
connection_fetch_query(connection);
break;
}

case NEW: {
break;
}
case ACTIVE: {
connection_fetch_query(connection);
break;
}

case NEW: {
break;
}
}

if (query != NULL) {
query_apply(query);
Expand All @@ -182,7 +167,6 @@ void connection_process(connection_t * connection) {
}
}


void connection_free(connection_t * connection) {
if (connection->prev != NULL) {
queue_remove(connection);
Expand Down
30 changes: 15 additions & 15 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#ifndef CONNECTION_H_
#define CONNECTION_H_

#include <time.h>

#include <libpq-fe.h>

Expand All @@ -16,20 +17,14 @@
#include "query.h"
#include "pool.h"


typedef enum {
NEW = 0,
INITIALIZING,
ACTIVE,
DESTROYING
NEW = 0, INITIALIZING, ACTIVE
} entity_status_t;

typedef enum {
BUSY = 0,
FREE
BUSY = 0, FREE
} activity_status_t;


typedef struct connection_ {
char * connection_info;

Expand All @@ -45,20 +40,25 @@ typedef struct connection_ {
entity_status_t status;
activity_status_t activity_status;

time_t downtime_start;

char * error;

} connection_t;

connection_t *
connection_alloc(char * connection_info, struct pool_ * pool);

connection_t * connection_alloc(char * connection_info, struct pool_ * pool);

void connection_init(connection_t * connection);

void connection_process(connection_t * connection);
void
connection_init(connection_t * connection);

void connection_destroy(connection_t * connection);
void
connection_process(connection_t * connection);

void connection_free(connection_t * connection);
void
connection_destroy_req(connection_t * connection);

void
connection_free(connection_t * connection);

#endif /* CONNECTION_H_ */
Loading