diff --git a/README.md b/README.md index 95f24fe..0605e30 100644 --- a/README.md +++ b/README.md @@ -463,7 +463,18 @@ Disque node. #### `QLEN ` -Return the length of the queue. +Return the length of the queue in the local node. + +#### `GLOBALQLEN ` + +Return the length of the queue across the cluster. + +This is calculated by asking all the nodes their qlen and adding +all the values together. The values is cached for a second. + +The first client to run this command in a node/queue will be blocked +waiting for the answer. If some node is busy and takes more than .5s +to reply its items may not be counted. #### `QSTAT ` diff --git a/disque.conf b/disque.conf index a816fa5..bc4edd2 100644 --- a/disque.conf +++ b/disque.conf @@ -432,3 +432,10 @@ hz 10 # in order to commit the file to the disk more incrementally and avoid # big latency spikes. aof-rewrite-incremental-fsync yes + +# for cases where the queuing of items is more centralized than the +# dequeuing, we can configure the servers to always queue the jobs +# in a random node of those it was delivered to, instead of the local, +# to better distribute the work load. +queue-randomly no + diff --git a/src/blocked.c b/src/blocked.c index 01811fd..856292a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -138,6 +138,8 @@ void unblockClient(client *c) { unblockClientWaitingJobRepl(c); } else if (c->btype == BLOCKED_GETJOB) { unblockClientBlockedForJobs(c); + } else if (c->btype == BLOCKED_GLOBAL_QLEN) { + unblockClientBlockedForQLen(c); } else { serverPanic("Unknown btype in unblockClient()."); } @@ -164,6 +166,8 @@ void replyToBlockedClientTimedOut(client *c) { return; } else if (c->btype == BLOCKED_GETJOB) { addReply(c,shared.nullmultibulk); + } else if (c->btype == BLOCKED_GLOBAL_QLEN) { + /* Do nothing - unblock client will send the best value we got */ } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } diff --git a/src/cluster.c b/src/cluster.c index ad7e0a0..6b86ee8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -31,6 +31,7 @@ #include "server.h" #include "cluster.h" #include "endianconv.h" +#include "queue.h" #include "ack.h" #include @@ -69,6 +70,8 @@ void clusterSendGotJob(clusterNode *node, job *j); void clusterSendGotAck(clusterNode *node, char *jobid, int known); void clusterBroadcastQueued(job *j, unsigned char flags); void clusterBroadcastDelJob(job *j); +void clusterSendGetQLen(robj *qname, dict *nodes); +void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen); sds representClusterNodeFlags(sds ci, uint16_t flags); /* ----------------------------------------------------------------------------- @@ -1493,6 +1496,29 @@ int clusterProcessPacket(clusterLink *link) { else receivePauseQueue(qname,count); decrRefCount(qname); + } else if (type == CLUSTERMSG_TYPE_GETQLEN) { + if (!sender) return 1; + uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen); + robj *qname = createStringObject(hdr->data.queueop.about.qname, + qnamelen); + + serverLog(LL_VERBOSE,"RECEIVED GETQLEN FOR QUEUE %s", + (char*)qname->ptr); + + clusterSendMyQLen(sender, qname, queueNameLength(qname)); + + } else if (type == CLUSTERMSG_TYPE_MYQLEN) { + if (!sender) return 1; + uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen); + robj *qname = createStringObject(hdr->data.queueop.about.qname, + qnamelen); + uint32_t myqlen = ntohl(hdr->data.queueop.about.aux); + + serverLog(LL_VERBOSE,"RECEIVED MYQLEN FOR QUEUE %s FROM %s: %d", + (char*)qname->ptr, sender->name, myqlen); + + myQLenForQueueName(qname, myqlen); + } else { serverLog(LL_WARNING,"Received unknown packet type: %d", type); } @@ -2092,6 +2118,55 @@ void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count) { if (payload != buf) zfree(payload); } +/* broadcasts a request for the qlen to the whole cluster. + * Used by GLOBALQLEN. It replies with the local queue size. */ +void clusterSendGetQLen(robj *qname, dict *nodes) { + uint32_t totlen, qnamelen = sdslen(qname->ptr); + uint32_t alloclen; + clusterMsg *hdr; + + serverLog(LL_VERBOSE, "Sending GETQLEN for %s, %d nodes", + (char *)qname->ptr, (int)dictSize(nodes)); + + totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + totlen += sizeof(clusterMsgDataQueueOp) - 8 + qnamelen; + alloclen = totlen; + if (alloclen < (int)sizeof(clusterMsg)) alloclen = sizeof(clusterMsg); + hdr = zmalloc(alloclen); + + clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_GETQLEN); + hdr->data.queueop.about.qnamelen = htonl(qnamelen); + memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen); + hdr->totlen = htonl(totlen); + clusterBroadcastMessage(nodes,hdr,totlen); + zfree(hdr); +} + +/* send a reply to a GETQLEN request */ +void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen) { + uint32_t totlen, qnamelen = sdslen(qname->ptr); + uint32_t alloclen; + + if (!node->link) return; + + serverLog(LL_VERBOSE, "Sending QLEN for %s of %d items to %s", + (char *)qname->ptr, (int)qlen, (char *)node->name); + + totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + totlen += sizeof(clusterMsgDataQueueOp) - 8 + qnamelen; + alloclen = totlen; + if (alloclen < (int)sizeof(clusterMsg)) alloclen = sizeof(clusterMsg); + unsigned char buf[alloclen]; + clusterMsg *hdr = (clusterMsg*) buf; + + clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MYQLEN); + hdr->data.queueop.about.aux = htonl(qlen); + hdr->data.queueop.about.qnamelen = htonl(qnamelen); + memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen); + hdr->totlen = htonl(totlen); + clusterSendMessage(node->link,buf,totlen); +} + /* ----------------------------------------------------------------------------- * CLUSTER cron job * -------------------------------------------------------------------------- */ diff --git a/src/cluster.h b/src/cluster.h index fc0c9f3..9b02a9c 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -113,6 +113,8 @@ typedef struct clusterState { #define CLUSTERMSG_TYPE_YOURJOBS 13 /* NEEDJOBS reply with jobs. */ #define CLUSTERMSG_TYPE_WORKING 14 /* Postpone re-queueing & dequeue */ #define CLUSTERMSG_TYPE_PAUSE 15 /* Change queue paused state. */ +#define CLUSTERMSG_TYPE_GETQLEN 16 /* request the node qlen */ +#define CLUSTERMSG_TYPE_MYQLEN 17 /* reply to node qlen request */ /* Initially we don't know our "name", but we'll find it once we connect * to the first node, using the getsockname() function. Then we'll use this @@ -161,7 +163,10 @@ typedef struct { * the PAUSE command to specify the queue to change the paused state. */ typedef struct { uint32_t aux; /* For NEEDJOB, how many jobs we request. - * FOR PAUSE, the pause flags to set on the queue. */ + * FOR PAUSE, the pause flags to set on the queue. + * For MYQLEN, the numer of items in the queue + */ + uint32_t qnamelen; /* Queue name total length. */ char qname[8]; /* Defined as 8 bytes just for alignment. */ } clusterMsgDataQueueOp; @@ -241,5 +246,6 @@ void clusterSendNeedJobs(robj *qname, int numjobs, dict *nodes); void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count); void clusterBroadcastJobIDMessage(dict *nodes, char *id, int type, uint32_t aux, unsigned char flags); void clusterBroadcastPause(robj *qname, uint32_t flags); +void clusterSendGetQLen(robj *qname, dict *nodes); #endif /* __CLUSTER_H */ diff --git a/src/config.c b/src/config.c index 4873737..65cfd03 100644 --- a/src/config.c +++ b/src/config.c @@ -410,6 +410,11 @@ void loadServerConfigFromString(char *config) { server.client_obuf_limits[class].hard_limit_bytes = hard; server.client_obuf_limits[class].soft_limit_bytes = soft; server.client_obuf_limits[class].soft_limit_seconds = soft_seconds; + } else if (!strcasecmp(argv[0],"queue-randomly") && argc == 2) { + if ((server.queue_randomly = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } + } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/help.h b/src/help.h index 58b32c4..660f2d7 100644 --- a/src/help.h +++ b/src/help.h @@ -46,6 +46,7 @@ struct commandHelp { { "WORKING", "", "Attempt to postpone next delivery of the specified job", 4, "1.0.0" }, { "NACK", " [ ...]", "Negative acknowledge: increment the NACK counter for the job and ask for a next delivery ASAP", 4, "1.0.0" }, { "QLEN", "", "Return number of queued jobs in the specified queue in the local node", 3, "1.0.0" }, + { "GLOBALQLEN", "", "Return number of queued jobs in the specified queue across the cluster - it's cached for a second", 3, "1.0.0" }, { "QSTAT", "", "Return local node queue statistics", 3, "1.0.0" }, { "QPEEK", " ", "Inspect jobs into a queue without actually fetching them", 3, "1.0.0" }, { "ENQUEUE", " [ ...]", "Force local node to put the specified jobs back into the queue", 3, "1.0.0" }, diff --git a/src/job.c b/src/job.c index 6ce564e..62e8c0e 100644 --- a/src/job.c +++ b/src/job.c @@ -1133,7 +1133,8 @@ int clientsCronHandleDelayedJobReplication(client *c) { * selected replication level is achieved or before to returning to * the caller for asynchronous jobs. */ void addjobCommand(client *c) { - long long replicate = server.cluster->size > 3 ? 3 : server.cluster->size; + long long cluster_size = server.cluster->size; + long long replicate = cluster_size > 3 ? 3 : cluster_size; long long ttl = 3600*24; long long retry = -1; long long delay = 0; @@ -1141,6 +1142,7 @@ void addjobCommand(client *c) { mstime_t timeout; int j, retval; int async = 0; /* Asynchronous request? */ + int queue_randomly = cluster_size > 1 ? server.queue_randomly : 0; int extrepl = getMemoryWarningLevel() > 0; /* Replicate externally? */ int leaving = myselfLeaving(); static uint64_t prev_ctime = 0; @@ -1150,6 +1152,7 @@ void addjobCommand(client *c) { * new messages here. */ if (leaving) extrepl = 1; + /* Parse args. */ for (j = 4; j < c->argc; j++) { char *opt = c->argv[j]->ptr; @@ -1191,12 +1194,22 @@ void addjobCommand(client *c) { j++; } else if (!strcasecmp(opt,"async")) { async = 1; + } else if (!strcasecmp(opt,"queue-randomly")) { + if (cluster_size == 1) { + addReplyError(c,"no QUEUE-RANDOMLY in single node"); + return; + } + queue_randomly = 1; } else { addReply(c,shared.syntaxerr); return; } } + /* if we want to queue randomly, it's probably ok to + skip the current node altogether */ + if (queue_randomly) extrepl = 1; + /* Parse the timeout argument. */ if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_MILLISECONDS) != C_OK) return; diff --git a/src/queue.c b/src/queue.c index 81c9f4c..def1ba8 100644 --- a/src/queue.c +++ b/src/queue.c @@ -74,6 +74,11 @@ queue *createQueue(robj *name) { q->needjobs_adhoc_attempt = 0; q->needjobs_responders = NULL; /* Created on demand to save memory. */ q->clients = NULL; /* Created on demand to save memory. */ + q->qlenclients = NULL; /* Created on demand to save memory. */ + + q->globalqlen = 0; + q->last_globalqlen_time = 0; + q->globalqlen_nodes = 0; q->current_import_jobs_time = server.mstime; q->current_import_jobs_count = 0; @@ -251,6 +256,7 @@ int GCQueue(queue *q, time_t max_idle_time) { time_t idle = server.unixtime - q->atime; if (idle < max_idle_time) return C_ERR; if (q->clients && listLength(q->clients) != 0) return C_ERR; + if (q->qlenclients && listLength(q->qlenclients) != 0) return C_ERR; if (skiplistLength(q->sl)) return C_ERR; if (q->flags & QUEUE_FLAG_PAUSED_ALL) return C_ERR; destroyQueue(q->name); @@ -341,6 +347,50 @@ void unblockClientBlockedForJobs(client *c) { dictEmpty(c->bpop.queues,NULL); } +/* -------------------------- Blocking on global qlen ---------------------------- */ + +/* Handle blocking if GLOBALQLEN didn't have fresh information + * + * 1) We set q->qlenclients to the list of clients blocking for this queue. + * 2) We set client->bpop.queues as well, as a dictionary of queues a client + * is blocked for. So we can resolve queues from clients. + * 3) When we get myqlen from all the nodes, we unblock the clients + * waiting for the length. */ +void blockForQLen(client *c, queue *q, mstime_t timeout) { + c->bpop.timeout = timeout; + c->bpop.flags = 0; + dictAdd(c->bpop.queues, q->name, NULL); + incrRefCount( q->name ); + if (q->qlenclients == NULL) q->qlenclients = listCreate(); + listAddNodeTail(q->qlenclients, c); + blockClient(c, BLOCKED_GLOBAL_QLEN); +} + +/* Unblock client waiting for globalqlen in queue. + * Never call this directly, call unblockClient() instead. */ +void unblockClientBlockedForQLen(client *c) { + dictEntry *de; + dictIterator *di; + + di = dictGetIterator(c->bpop.queues); + while((de = dictNext(di)) != NULL) { + robj *qname = dictGetKey(de); + queue *q = lookupQueue(qname); + serverAssert(q != NULL); + + addReplyLongLong(c,q->globalqlen); + + listDelNode(q->qlenclients,listSearchKey(q->qlenclients,c)); + if (listLength(q->qlenclients) == 0) { + listRelease(q->qlenclients); + q->qlenclients = NULL; + GCQueue(q,QUEUE_MAX_IDLE_TIME); + } + } + dictReleaseIterator(di); + dictEmpty(c->bpop.queues,NULL); +} + /* Add the specified queue to server.ready_queues if there is at least * one client blocked for this queue. Otherwise no operation is performed. */ void signalQueueAsReady(queue *q) { @@ -1229,3 +1279,66 @@ void pauseCommand(client *c) { reply = sdscatlen(reply,"\r\n",2); addReplySds(c,reply); } + +/* GLOBALQLEN + * + * Returns the size of the queue across the full cluster, by broadcasting + * a request for the queue size, and waiting for the reply + * + * */ +#define GLOBALQLEN_MAX_AGE 1000 +#define GLOBALQLEN_CLIENT_TIMEOUT 500 +void globalqlenCommand(client *c) { + queue *q = lookupQueue(c->argv[1]); + + /* Create the queue if it does not exist. We need the queue structure + * to store meta-data needed to broadcast the QLEN request and keep + * the replies */ + if (!q) q = createQueue(c->argv[1]); + + mstime_t msnow = mstime(); + + if (q->last_globalqlen_time<(msnow-GLOBALQLEN_MAX_AGE)) { + q->last_globalqlen_time = msnow; + q->globalqlen = queueLength(q); + q->globalqlen_nodes = 1; + if (server.cluster->size > 1) + clusterSendGetQLen(q->name, server.cluster->nodes); + } + + if (q->last_globalqlen_time>(msnow-GLOBALQLEN_MAX_AGE) + && (int)q->globalqlen_nodes == server.cluster->size ) { + addReplyLongLong(c,q->globalqlen); + return; + } + + blockForQLen(c, q, msnow + GLOBALQLEN_MAX_AGE); +} + +void myQLenForQueue(queue *q, uint32_t qlen) { + if (!q) return; + + q->globalqlen = q->globalqlen + qlen; + q->globalqlen_nodes++; + + if (!q->qlenclients) + return; + + if ((int)q->globalqlen_nodes == server.cluster->size) { + int numclients = listLength(q->qlenclients); + while(numclients--) { + listNode *ln = listFirst(q->qlenclients); + client *c = ln->value; + /* This will remove it from q->qlenclients. + * and send the qlen to the client */ + unblockClient(c); + } + } +} + +void myQLenForQueueName(robj *qname, uint32_t qlen) { + queue *q = lookupQueue(qname); + if (!q) return; /* no queue, no need to keep this */ + + myQLenForQueue(q, qlen); +} diff --git a/src/queue.h b/src/queue.h index aa22d42..ec1a891 100644 --- a/src/queue.h +++ b/src/queue.h @@ -45,6 +45,7 @@ typedef struct queue { queued or when a new client fetches elements or blocks for elements to arrive. */ list *clients; /* Clients blocked here. */ + list *qlenclients;/* Clients waiting for globalqlen. */ /* === Federation related fields === */ mstime_t needjobs_bcast_time; /* Last NEEDJOBS cluster broadcast. */ @@ -76,6 +77,10 @@ typedef struct queue { uint32_t needjobs_bcast_attempt; /* Num of tries without new nodes. */ uint32_t needjobs_adhoc_attempt; /* Num of tries without new jobs. */ uint64_t jobs_in, jobs_out; /* Num of jobs enqueued and dequeued. */ + mstime_t last_globalqlen_time; /* time of last globalqlen request. */ + uint32_t globalqlen; /* best guess of the globalqlen */ + uint32_t globalqlen_nodes; /* nodes who reported myqlen */ + uint32_t flags; /* Queue flags. QUEUE_FLAG_* macros. */ uint32_t padding; /* Not used. Makes alignment obvious. */ } queue; @@ -104,5 +109,6 @@ void receiveYourJobs(struct clusterNode *node, uint32_t numjobs, unsigned char * void receiveNeedJobs(struct clusterNode *node, robj *qname, uint32_t count); void queueChangePausedState(queue *q, int flag, int set); void receivePauseQueue(robj *qname, uint32_t flags); +void myQLenForQueueName(robj *qname, uint32_t qlen); #endif diff --git a/src/server.c b/src/server.c index fbdf563..f22de05 100644 --- a/src/server.c +++ b/src/server.c @@ -137,6 +137,7 @@ struct serverCommand serverCommandTable[] = { /* Queues */ {"qlen",qlenCommand,2,"rF",0,NULL,0,0,0,0,0}, + {"globalqlen",globalqlenCommand,2,"rF",0,NULL,0,0,0,0,0}, {"qpeek",qpeekCommand,3,"r",0,NULL,0,0,0,0,0}, {"qstat",qstatCommand,2,"rF",0,NULL,0,0,0,0,0}, {"qscan",qscanCommand,-1,"r",0,NULL,0,0,0,0,0}, @@ -1064,6 +1065,7 @@ void initServerConfig(void) { server.assert_line = 0; server.bug_report_start = 0; server.watchdog_period = 0; + server.queue_randomly = 0; } extern char **environ; diff --git a/src/server.h b/src/server.h index 0a292c2..249ccc7 100644 --- a/src/server.h +++ b/src/server.h @@ -201,6 +201,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */ #define BLOCKED_JOB_REPL 1 /* Wait job synchronous replication. */ #define BLOCKED_GETJOB 2 /* Wait for new jobs in a set of queues. */ +#define BLOCKED_GLOBAL_QLEN 3 /* Wait for all the nodes to send MYQLEN */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -553,6 +554,7 @@ struct disqueServer { int assert_line; int bug_report_start; /* True if bug report header was already logged. */ int watchdog_period; /* Software watchdog period in ms. 0 = off */ + int queue_randomly; /* (by default) queue locally or in random node */ }; typedef void serverCommandProc(client *c); @@ -854,6 +856,7 @@ void commandCommand(client *c); void latencyCommand(client *c); void addjobCommand(client *c); void qlenCommand(client *c); +void globalqlenCommand(client *c); void getjobCommand(client *c); void showCommand(client *c); void ackjobCommand(client *c); diff --git a/tests/cluster/tests/13-globallqlen.tcl b/tests/cluster/tests/13-globallqlen.tcl new file mode 100644 index 0000000..87d140b --- /dev/null +++ b/tests/cluster/tests/13-globallqlen.tcl @@ -0,0 +1,43 @@ +source "../tests/includes/init-tests.tcl" +source "../tests/includes/job-utils.tcl" + +test "Globalqlen before anything else" { + set gqlen [D 2 globalqlen qlenqueue] + assert { $gqlen == 0 } +} + +test "Queue jobs into random nodes" { + for {set j 1} {$j <= 10} {incr j} { + set target_id [randomInt $::instances_count] + set body "somejob$j" + set id [D $target_id addjob qlenqueue $body 5000 replicate 3 retry 60] + assert {$id ne {}} + } +} + +test "check globalqlen on target node" { + set gqlen [D 0 globalqlen qlenqueue] + assert {$gqlen == 10} +} + +test "Get jobs from queue" { + for {set j 1} {$j <= 10} {incr j} { + set target_id [randomInt $::instances_count] + set myjob [lindex [D $target_id getjob from qlenqueue] 0] + assert {[lindex $myjob 0] eq "qlenqueue"} + assert {[lindex $myjob 1] ne {}} + assert {[lindex $myjob 2] ne {}} + + set res [D $target_id ackjob [lindex $myjob 1]] + } +} + +test "check globalqlen is now empty" { + wait_for_condition { + [D 1 globalqlen qlenqueue] == 0 + } else { + fail "globalqlen doesn't get empty" + } +} + + diff --git a/tests/cluster/tests/14-queue-randomly.tcl b/tests/cluster/tests/14-queue-randomly.tcl new file mode 100644 index 0000000..547c455 --- /dev/null +++ b/tests/cluster/tests/14-queue-randomly.tcl @@ -0,0 +1,27 @@ +source "../tests/includes/init-tests.tcl" +source "../tests/includes/job-utils.tcl" + +set extra_instances [ expr {$::instances_count - 1} ] +test "we have extra instances" { + assert { $extra_instances > 0 } +} + +test "check qlen first" { + set qlen [D 0 qlen randomly_queue] + assert { $qlen == 0 } +} + +test "Add job with queue-randomly" { + set id [D 0 addjob randomly_queue "somejob" 5000 replicate $extra_instances queue-randomly] + assert { $id ne {} } +} + +test "qlen is still 0" { + set qlen [D 0 qlen randomly_queue] + assert { $qlen == 0 } +} + +test "the job is some instance" { + set job [D 1 show $id] + assert { $job ne {} } +}