Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 48 additions & 36 deletions lib/server/push.api.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ Push.setBadge = function(/* id, count */) {

var isConfigured = false;

var sendWorker = function(task, interval) {
var scheduleTask = function(task, interval) {
if (typeof Push.Log === 'function') {
Push.Log('Push: Send worker started, using interval:', interval);
}
if (Push.debug) {
console.log('Push: Send worker started, using interval: ' + interval);
}

return Meteor.setInterval(function() {
return Meteor.setTimeout(function() {
// xxx: add exponential backoff on error
try {
task();
Expand Down Expand Up @@ -668,41 +668,46 @@ Push.Configure = function(options) {
} // Else could not reserve
}; // EO sendNotification

sendWorker(function() {
const processQueue = async function() {

if (isSendingNotification) {
return;
}
if (isSendingNotification) {
return;
}

try {
// Set send fence
isSendingNotification = true;

// Set send fence
isSendingNotification = true;
// var countSent = 0;
var batchSize = options.sendBatchSize || 1;

// var countSent = 0;
var batchSize = options.sendBatchSize || 1;
var now = +new Date();

var now = +new Date();
// Find notifications that are not being or already sent
var pendingNotifications = Push.notifications.find({ $and: [
// Message is not sent
{ sent : false },
// And not being sent by other instances
{ sending: { $lt: now } },
// And not queued for future
{ $or: [
{ delayUntil: { $exists: false } },
{ delayUntil: { $lte: new Date() } }
]
}
]}, {
// Sort by created date
sort: { createdAt: 1 },
limit: batchSize
}).fetch();

// Find notifications that are not being or already sent
var pendingNotifications = Push.notifications.find({ $and: [
// Message is not sent
{ sent : false },
// And not being sent by other instances
{ sending: { $lt: now } },
// And not queued for future
{ $or: [
{ delayUntil: { $exists: false } },
{ delayUntil: { $lte: new Date() } }
]
}
]}, {
// Sort by created date
sort: { createdAt: 1 },
limit: batchSize
});

pendingNotifications.forEach(function(notification) {
if(pendingNotifications.length === 0) {
return scheduleTask(processQueue, options.sendInterval || 15000);
}

await Promise.all(pendingNotifications.map(function(notification) {
return new Promise((resolve, reject) => {
Meteor.defer(() => {
try {
sendNotification(notification);
} catch(error) {
Expand All @@ -712,14 +717,21 @@ Push.Configure = function(options) {
if (Push.debug) {
console.log('Push: Could not send notification id: "' + notification._id + '", Error: ' + error.message);
}
} finally {
resolve();
}
}); // EO forEach
} finally {
});
})
})); // EO forEach

// Remove the send fence
isSendingNotification = false;
}
}, options.sendInterval || 15000); // Default every 15th sec
// Remove the send fence
isSendingNotification = false;

processQueue(); // continue asap

}

scheduleTask(processQueue, options.sendInterval || 15000);

} else {
if (Push.debug) {
Expand Down