Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,18 @@ Disque node.

#### `QLEN <queue-name>`

Return the length of the queue.
Return the length of the queue in the local node.

#### `GLOBALQLEN <queue-name>`

Return the length of the queue across the cluster.

This is calculated by asking every node for its local queue length and
adding all the values together. The result is cached for one second.

The first client to run this command for a given queue on a node is
blocked while waiting for the answers. If a node is busy and takes more
than 0.5s to reply, its items may not be counted.

#### `QSTAT <queue-name>`

Expand Down
7 changes: 7 additions & 0 deletions disque.conf
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,10 @@ hz 10
# in order to commit the file to the disk more incrementally and avoid
# big latency spikes.
aof-rewrite-incremental-fsync yes

# For cases where the queuing of items is more centralized than the
# dequeuing, the servers can be configured to always queue each job
# in a random node among those it was delivered to, instead of the local
# one, to better distribute the work load.
queue-randomly no

4 changes: 4 additions & 0 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ void unblockClient(client *c) {
unblockClientWaitingJobRepl(c);
} else if (c->btype == BLOCKED_GETJOB) {
unblockClientBlockedForJobs(c);
} else if (c->btype == BLOCKED_GLOBAL_QLEN) {
unblockClientBlockedForQLen(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
Expand All @@ -164,6 +166,8 @@ void replyToBlockedClientTimedOut(client *c) {
return;
} else if (c->btype == BLOCKED_GETJOB) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_GLOBAL_QLEN) {
/* Do nothing - unblock client will send the best value we got */
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down
75 changes: 75 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "server.h"
#include "cluster.h"
#include "endianconv.h"
#include "queue.h"
#include "ack.h"

#include <sys/types.h>
Expand Down Expand Up @@ -69,6 +70,8 @@ void clusterSendGotJob(clusterNode *node, job *j);
void clusterSendGotAck(clusterNode *node, char *jobid, int known);
void clusterBroadcastQueued(job *j, unsigned char flags);
void clusterBroadcastDelJob(job *j);
void clusterSendGetQLen(robj *qname, dict *nodes);
void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen);
sds representClusterNodeFlags(sds ci, uint16_t flags);

/* -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1493,6 +1496,29 @@ int clusterProcessPacket(clusterLink *link) {
else
receivePauseQueue(qname,count);
decrRefCount(qname);
} else if (type == CLUSTERMSG_TYPE_GETQLEN) {
    /* A peer asks for the length of one of our queues: answer it with
     * a MYQLEN message carrying queueNameLength() for that name.
     * Requests from unknown senders are ignored. */
    if (!sender) return 1;
    uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen);
    robj *qname = createStringObject(hdr->data.queueop.about.qname,
                                     qnamelen);

    serverLog(LL_VERBOSE,"RECEIVED GETQLEN FOR QUEUE %s",
        (char*)qname->ptr);

    clusterSendMyQLen(sender, qname, queueNameLength(qname));

    /* The name object was created only to serve this packet: drop our
     * reference, as the other queue-op branches of this function do,
     * otherwise every GETQLEN received leaks one string object. */
    decrRefCount(qname);

} else if (type == CLUSTERMSG_TYPE_MYQLEN) {
    /* A peer reports its length for a queue: add it to the running
     * total kept for that queue. Reports from unknown senders are
     * ignored. */
    if (!sender) return 1;
    uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen);
    robj *qname = createStringObject(hdr->data.queueop.about.qname,
                                     qnamelen);
    uint32_t myqlen = ntohl(hdr->data.queueop.about.aux);

    /* NOTE(review): node names are assumed to be fixed-width 40 byte
     * IDs that are not guaranteed to be NUL terminated (confirm against
     * CLUSTER_NAMELEN), hence the bounded %.40s. The length is unsigned,
     * so it is printed with %u. */
    serverLog(LL_VERBOSE,"RECEIVED MYQLEN FOR QUEUE %s FROM %.40s: %u",
        (char*)qname->ptr, sender->name, (unsigned int)myqlen);

    myQLenForQueueName(qname, myqlen);

    /* Same ownership rule as above: release the temporary name. */
    decrRefCount(qname);

} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
Expand Down Expand Up @@ -2092,6 +2118,55 @@ void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count) {
if (payload != buf) zfree(payload);
}

/* Broadcast a GETQLEN request for the queue named 'qname' to the nodes
 * in 'nodes'. Used by GLOBALQLEN.
 *
 * Nothing is returned here: each node is expected to answer later with
 * a MYQLEN message, which is handled when that packet is processed. */
void clusterSendGetQLen(robj *qname, dict *nodes) {
    uint32_t qnamelen = sdslen(qname->ptr);
    uint32_t totlen;
    uint32_t alloclen;
    clusterMsg *hdr;

    serverLog(LL_VERBOSE, "Sending GETQLEN for %s, %d nodes",
        (char *)qname->ptr, (int)dictSize(nodes));

    /* Bytes on the wire: the fixed header, plus the queue-op payload
     * where the 8 byte 'qname' placeholder is replaced by the real
     * queue name. */
    totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData)
           + sizeof(clusterMsgDataQueueOp) - 8 + qnamelen;

    /* The allocation is never smaller than a whole clusterMsg, even if
     * fewer bytes are actually transmitted. */
    alloclen = (totlen < sizeof(clusterMsg)) ? sizeof(clusterMsg) : totlen;
    hdr = zmalloc(alloclen);

    clusterBuildMessageHdr(hdr, CLUSTERMSG_TYPE_GETQLEN);
    hdr->totlen = htonl(totlen);
    hdr->data.queueop.about.qnamelen = htonl(qnamelen);
    memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen);

    clusterBroadcastMessage(nodes, hdr, totlen);
    zfree(hdr);
}

/* Send a MYQLEN message to 'node': the reply to a GETQLEN request.
 *
 * 'qname' is the queue the request was about and 'qlen' the length to
 * report, transmitted in the 'aux' field in network byte order.
 * Nothing is sent if the node currently has no link.
 *
 * The message buffer is heap allocated, like in clusterSendGetQLen():
 * its size depends on the queue name length, which originates from a
 * packet sent by a peer, so a variable length array on the stack would
 * be unbounded (and an unsigned char array gives no alignment guarantee
 * for a clusterMsg). */
void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen) {
    uint32_t totlen, qnamelen = sdslen(qname->ptr);
    uint32_t alloclen;
    clusterMsg *hdr;

    if (!node->link) return;

    /* NOTE(review): node names are assumed to be fixed-width 40 byte
     * IDs that may lack a NUL terminator (confirm against
     * CLUSTER_NAMELEN), so the print is bounded. */
    serverLog(LL_VERBOSE, "Sending QLEN for %s of %d items to %.40s",
        (char *)qname->ptr, (int)qlen, (char *)node->name);

    /* Fixed header plus the queue-op payload, with the 8 byte 'qname'
     * placeholder replaced by the actual name. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataQueueOp) - 8 + qnamelen;
    alloclen = totlen;
    if (alloclen < (int)sizeof(clusterMsg)) alloclen = sizeof(clusterMsg);
    hdr = zmalloc(alloclen);

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MYQLEN);
    hdr->data.queueop.about.aux = htonl(qlen);
    hdr->data.queueop.about.qnamelen = htonl(qnamelen);
    memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen);
    hdr->totlen = htonl(totlen);

    /* The previous implementation passed a buffer that died on return,
     * so clusterSendMessage() cannot be retaining the pointer: freeing
     * right after the call is equally safe. */
    clusterSendMessage(node->link,(unsigned char*)hdr,totlen);
    zfree(hdr);
}

/* -----------------------------------------------------------------------------
* CLUSTER cron job
* -------------------------------------------------------------------------- */
Expand Down
8 changes: 7 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ typedef struct clusterState {
#define CLUSTERMSG_TYPE_YOURJOBS 13 /* NEEDJOBS reply with jobs. */
#define CLUSTERMSG_TYPE_WORKING 14 /* Postpone re-queueing & dequeue */
#define CLUSTERMSG_TYPE_PAUSE 15 /* Change queue paused state. */
#define CLUSTERMSG_TYPE_GETQLEN 16 /* request the node qlen */
#define CLUSTERMSG_TYPE_MYQLEN 17 /* reply to node qlen request */

/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
Expand Down Expand Up @@ -161,7 +163,10 @@ typedef struct {
* the PAUSE command to specify the queue to change the paused state. */
typedef struct {
uint32_t aux; /* For NEEDJOB, how many jobs we request.
* For PAUSE, the pause flags to set on the queue.
* For MYQLEN, the number of items in the queue.
* GETQLEN does not set this field explicitly.
*/

uint32_t qnamelen; /* Queue name total length. */
char qname[8]; /* Defined as 8 bytes just for alignment: senders size
* the message as sizeof(struct) - 8 + qnamelen and copy
* the whole name starting here. */
} clusterMsgDataQueueOp;
Expand Down Expand Up @@ -241,5 +246,6 @@ void clusterSendNeedJobs(robj *qname, int numjobs, dict *nodes);
void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count);
void clusterBroadcastJobIDMessage(dict *nodes, char *id, int type, uint32_t aux, unsigned char flags);
void clusterBroadcastPause(robj *qname, uint32_t flags);
void clusterSendGetQLen(robj *qname, dict *nodes);

#endif /* __CLUSTER_H */
5 changes: 5 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ void loadServerConfigFromString(char *config) {
server.client_obuf_limits[class].hard_limit_bytes = hard;
server.client_obuf_limits[class].soft_limit_bytes = soft;
server.client_obuf_limits[class].soft_limit_seconds = soft_seconds;
} else if (!strcasecmp(argv[0],"queue-randomly") && argc == 2) {
if ((server.queue_randomly = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}

} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}
Expand Down
1 change: 1 addition & 0 deletions src/help.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct commandHelp {
{ "WORKING", "<jobid>", "Attempt to postpone next delivery of the specified job", 4, "1.0.0" },
{ "NACK", "<jobid> [<jobid> <jobid> ...]", "Negative acknowledge: increment the NACK counter for the job and ask for a next delivery ASAP", 4, "1.0.0" },
{ "QLEN", "<queue>", "Return number of queued jobs in the specified queue in the local node", 3, "1.0.0" },
{ "GLOBALQLEN", "<queue>", "Return number of queued jobs in the specified queue across the cluster - it's cached for a second", 3, "1.0.0" },
{ "QSTAT", "<queue>", "Return local node queue statistics", 3, "1.0.0" },
{ "QPEEK", "<queue> <count>", "Inspect jobs into a queue without actually fetching them", 3, "1.0.0" },
{ "ENQUEUE", "<jobid> [<jobid> <jobid> ...]", "Force local node to put the specified jobs back into the queue", 3, "1.0.0" },
Expand Down
15 changes: 14 additions & 1 deletion src/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -1133,14 +1133,16 @@ int clientsCronHandleDelayedJobReplication(client *c) {
* selected replication level is achieved or before to returning to
* the caller for asynchronous jobs. */
void addjobCommand(client *c) {
long long replicate = server.cluster->size > 3 ? 3 : server.cluster->size;
long long cluster_size = server.cluster->size;
long long replicate = cluster_size > 3 ? 3 : cluster_size;
long long ttl = 3600*24;
long long retry = -1;
long long delay = 0;
long long maxlen = 0; /* Max queue length for job to be accepted. */
mstime_t timeout;
int j, retval;
int async = 0; /* Asynchronous request? */
int queue_randomly = cluster_size > 1 ? server.queue_randomly : 0;
int extrepl = getMemoryWarningLevel() > 0; /* Replicate externally? */
int leaving = myselfLeaving();
static uint64_t prev_ctime = 0;
Expand All @@ -1150,6 +1152,7 @@ void addjobCommand(client *c) {
* new messages here. */
if (leaving) extrepl = 1;


/* Parse args. */
for (j = 4; j < c->argc; j++) {
char *opt = c->argv[j]->ptr;
Expand Down Expand Up @@ -1191,12 +1194,22 @@ void addjobCommand(client *c) {
j++;
} else if (!strcasecmp(opt,"async")) {
async = 1;
} else if (!strcasecmp(opt,"queue-randomly")) {
if (cluster_size == 1) {
addReplyError(c,"no QUEUE-RANDOMLY in single node");
return;
}
queue_randomly = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}

/* if we want to queue randomly, it's probably ok to
skip the current node altogether */
if (queue_randomly) extrepl = 1;

/* Parse the timeout argument. */
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_MILLISECONDS)
!= C_OK) return;
Expand Down
113 changes: 113 additions & 0 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ queue *createQueue(robj *name) {
q->needjobs_adhoc_attempt = 0;
q->needjobs_responders = NULL; /* Created on demand to save memory. */
q->clients = NULL; /* Created on demand to save memory. */
q->qlenclients = NULL; /* Created on demand to save memory. */

q->globalqlen = 0;
q->last_globalqlen_time = 0;
q->globalqlen_nodes = 0;

q->current_import_jobs_time = server.mstime;
q->current_import_jobs_count = 0;
Expand Down Expand Up @@ -251,6 +256,7 @@ int GCQueue(queue *q, time_t max_idle_time) {
time_t idle = server.unixtime - q->atime;
if (idle < max_idle_time) return C_ERR;
if (q->clients && listLength(q->clients) != 0) return C_ERR;
if (q->qlenclients && listLength(q->qlenclients) != 0) return C_ERR;
if (skiplistLength(q->sl)) return C_ERR;
if (q->flags & QUEUE_FLAG_PAUSED_ALL) return C_ERR;
destroyQueue(q->name);
Expand Down Expand Up @@ -341,6 +347,50 @@ void unblockClientBlockedForJobs(client *c) {
dictEmpty(c->bpop.queues,NULL);
}

/* -------------------------- Blocking on global qlen ---------------------------- */

/* Block client 'c' until the global length of queue 'q' is available,
 * for when GLOBALQLEN has no fresh value to return right away.
 *
 * Bookkeeping performed:
 *
 * 1) The client is appended to q->qlenclients, the list of clients
 *    waiting for this queue length (created here on first use).
 * 2) The queue name is added to c->bpop.queues, so that the queue can
 *    be resolved again starting from the client.
 * 3) The client is blocked with type BLOCKED_GLOBAL_QLEN; 'timeout' is
 *    stored in c->bpop.timeout. */
void blockForQLen(client *c, queue *q, mstime_t timeout) {
    /* Queue side: lazily create the waiters list and enlist the client. */
    if (!q->qlenclients) q->qlenclients = listCreate();
    listAddNodeTail(q->qlenclients, c);

    /* Client side: remember the queue name, taking an extra reference
     * on the name object for the dictionary key. */
    incrRefCount(q->name);
    dictAdd(c->bpop.queues, q->name, NULL);

    c->bpop.flags = 0;
    c->bpop.timeout = timeout;
    blockClient(c, BLOCKED_GLOBAL_QLEN);
}

/* Undo blockForQLen() for client 'c': for each queue listed in
 * c->bpop.queues, reply with the global length accumulated so far and
 * remove the client from the queue waiters list. When the last waiter
 * goes away the list is released and the queue is offered to GCQueue().
 *
 * Never call this directly, call unblockClient() instead. */
void unblockClientBlockedForQLen(client *c) {
    dictIterator *iter = dictGetIterator(c->bpop.queues);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        robj *name = dictGetKey(entry);
        queue *q = lookupQueue(name);
        serverAssert(q != NULL);

        /* Whatever was summed so far is the answer, complete or not. */
        addReplyLongLong(c, q->globalqlen);

        listNode *waiter = listSearchKey(q->qlenclients, c);
        listDelNode(q->qlenclients, waiter);
        if (listLength(q->qlenclients) != 0) continue;

        /* No waiters left: drop the list, then let the queue be
         * collected. 'q' must not be used after GCQueue(). */
        listRelease(q->qlenclients);
        q->qlenclients = NULL;
        GCQueue(q, QUEUE_MAX_IDLE_TIME);
    }
    dictReleaseIterator(iter);
    dictEmpty(c->bpop.queues, NULL);
}

/* Add the specified queue to server.ready_queues if there is at least
* one client blocked for this queue. Otherwise no operation is performed. */
void signalQueueAsReady(queue *q) {
Expand Down Expand Up @@ -1229,3 +1279,66 @@ void pauseCommand(client *c) {
reply = sdscatlen(reply,"\r\n",2);
addReplySds(c,reply);
}

/* GLOBALQLEN <queuename>
 *
 * Returns the size of the queue across the full cluster, by broadcasting
 * a request for the queue size and summing the replies.
 *
 * The sum is kept in the queue structure and reused for up to
 * GLOBALQLEN_MAX_AGE. If the current sum does not yet include every
 * node, the client is blocked for at most GLOBALQLEN_CLIENT_TIMEOUT and
 * then gets whatever was collected.
 *
 * Both constants are compared against mstime() values. */
#define GLOBALQLEN_MAX_AGE 1000
#define GLOBALQLEN_CLIENT_TIMEOUT 500
void globalqlenCommand(client *c) {
    queue *q = lookupQueue(c->argv[1]);

    /* Create the queue if it does not exist. We need the queue structure
     * to store meta-data needed to broadcast the QLEN request and keep
     * the replies */
    if (!q) q = createQueue(c->argv[1]);

    mstime_t msnow = mstime();

    /* Sample too old: start a new round, seeded with the local length
     * and counting this node as the first responder. */
    if (q->last_globalqlen_time < (msnow-GLOBALQLEN_MAX_AGE)) {
        q->last_globalqlen_time = msnow;
        q->globalqlen = queueLength(q);
        q->globalqlen_nodes = 1;
        if (server.cluster->size > 1)
            clusterSendGetQLen(q->name, server.cluster->nodes);
    }

    /* From here on the sample is at most GLOBALQLEN_MAX_AGE old (either
     * it was just restarted or the check above found it recent enough),
     * so completeness is the only thing left to test. Testing age again
     * with a strict '>' used to leave a one-value gap where a complete
     * sample was neither refreshed nor served. */
    if ((int)q->globalqlen_nodes == server.cluster->size) {
        addReplyLongLong(c,q->globalqlen);
        return;
    }

    /* Wait for the missing replies, but no longer than the client
     * timeout: it is shorter than the cache lifetime. */
    blockForQLen(c, q, msnow + GLOBALQLEN_CLIENT_TIMEOUT);
}

/* Account for a MYQLEN reply: add 'qlen' to the running global length
 * of queue 'q' and count one more responder. When the number of
 * responders reaches the cluster size, every client blocked on this
 * queue length is unblocked (which also sends it the total).
 *
 * A NULL queue is ignored. */
void myQLenForQueue(queue *q, uint32_t qlen) {
    if (!q) return;

    /* A round starts with one responder (the local node) and expects one
     * reply per remaining node. Anything beyond that is a late or
     * duplicated reply: counting it would inflate the sum and push the
     * responders count past the cluster size, so that the equality tests
     * on completeness would fail until the next round. */
    if ((int)q->globalqlen_nodes >= server.cluster->size) return;

    q->globalqlen = q->globalqlen + qlen;
    q->globalqlen_nodes++;

    if (!q->qlenclients)
        return;

    if ((int)q->globalqlen_nodes == server.cluster->size) {
        /* Iterate by count, not by list state: unblocking the last
         * client releases q->qlenclients and may destroy the queue, so
         * 'q' must not be inspected once the counter reaches zero. */
        int numclients = listLength(q->qlenclients);
        while(numclients--) {
            listNode *ln = listFirst(q->qlenclients);
            client *c = ln->value;
            /* This will remove it from q->qlenclients.
             * and send the qlen to the client */
            unblockClient(c);
        }
    }
}

/* Same as myQLenForQueue(), but the queue is given by name. If no queue
 * with that name exists locally, the reported length is dropped. */
void myQLenForQueueName(robj *qname, uint32_t qlen) {
    queue *target = lookupQueue(qname);

    if (target != NULL) myQLenForQueue(target, qlen);
}
Loading