diff --git a/lib/server/push.api.js b/lib/server/push.api.js index 12064fe..c067eea 100644 --- a/lib/server/push.api.js +++ b/lib/server/push.api.js @@ -14,7 +14,7 @@ Push.setBadge = function(/* id, count */) { var isConfigured = false; -var sendWorker = function(task, interval) { +var scheduleTask = function(task, interval) { if (typeof Push.Log === 'function') { Push.Log('Push: Send worker started, using interval:', interval); } @@ -22,7 +22,7 @@ var sendWorker = function(task, interval) { console.log('Push: Send worker started, using interval: ' + interval); } - return Meteor.setInterval(function() { + return Meteor.setTimeout(function() { // xxx: add exponential backoff on error try { task(); @@ -668,41 +668,46 @@ Push.Configure = function(options) { } // Else could not reserve }; // EO sendNotification - sendWorker(function() { + const processQueue = async function() { - if (isSendingNotification) { - return; - } + if (isSendingNotification) { + return; + } - try { + // Set send fence + isSendingNotification = true; - // Set send fence - isSendingNotification = true; + // var countSent = 0; + var batchSize = options.sendBatchSize || 1; - // var countSent = 0; - var batchSize = options.sendBatchSize || 1; + var now = +new Date(); - var now = +new Date(); + // Find notifications that are not being or already sent + var pendingNotifications = Push.notifications.find({ $and: [ + // Message is not sent + { sent : false }, + // And not being sent by other instances + { sending: { $lt: now } }, + // And not queued for future + { $or: [ + { delayUntil: { $exists: false } }, + { delayUntil: { $lte: new Date() } } + ] + } + ]}, { + // Sort by created date + sort: { createdAt: 1 }, + limit: batchSize + }).fetch(); - // Find notifications that are not being or already sent - var pendingNotifications = Push.notifications.find({ $and: [ - // Message is not sent - { sent : false }, - // And not being sent by other instances - { sending: { $lt: now } }, - // And not queued for future - { $or: [ - { delayUntil: { $exists: false } }, - { delayUntil: { $lte: new Date() } } - ] - } - ]}, { - // Sort by created date - sort: { createdAt: 1 }, - limit: batchSize - }); - pendingNotifications.forEach(function(notification) { + if(pendingNotifications.length === 0) { + return scheduleTask(processQueue, options.sendInterval || 15000); + } + + await Promise.all(pendingNotifications.map(function(notification) { + return new Promise((resolve, reject) => { + Meteor.defer(() => { try { sendNotification(notification); } catch(error) { @@ -712,14 +717,21 @@ Push.Configure = function(options) { if (Push.debug) { console.log('Push: Could not send notification id: "' + notification._id + '", Error: ' + error.message); } + } finally { + resolve(); } - }); // EO forEach - } finally { + }); + }) + })); // EO forEach - // Remove the send fence - isSendingNotification = false; - } - }, options.sendInterval || 15000); // Default every 15th sec + // Remove the send fence + isSendingNotification = false; + + processQueue(); // continue asap + + } + + scheduleTask(processQueue, options.sendInterval || 15000); } else { if (Push.debug) {