Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ services:
- "7020:7020"
- "7357:7357"

event-processor:
<<: *node
working_dir: /parcel-plug/services/event-processor
entrypoint: ["/parcel-plug/node_modules/.bin/gulp"]
environment:
<<: *env
ACCOUNT_KEY: devel
MONGO_DSN: mongodb://mongodb:27017/parcel-plug
depends_on:
- mongodb

volumes:
mongodb: {}
yarn-cache: {}
1 change: 1 addition & 0 deletions scripts/deploy-rancher.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ deploy-to-rancher "nginx:alpine" manage-nginx
deploy-to-rancher "basecms/parcel-plug:${TRAVIS_TAG}" manage
deploy-to-rancher "basecms/parcel-plug:${TRAVIS_TAG}" graphql
deploy-to-rancher "basecms/parcel-plug:${TRAVIS_TAG}" delivery
deploy-to-rancher "basecms/parcel-plug:${TRAVIS_TAG}" event-processor

./scripts/deploy-notify.sh
10 changes: 10 additions & 0 deletions services/event-processor/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const { join } = require('path');

module.exports = {
rules: {
'import/no-extraneous-dependencies': [
'error',
{ packageDir: [join(__dirname, 'package.json'), join(__dirname, '../../package.json')] },
],
},
};
36 changes: 36 additions & 0 deletions services/event-processor/gulpfile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const {
task,
watch,
src,
parallel,
} = require('gulp');
const eslint = require('gulp-eslint');
const cache = require('gulp-cached');
const { spawn } = require('child_process');

const { log } = console;

let node;

const serve = async () => {
if (node) node.kill();
node = await spawn('node', ['src/index.js'], { stdio: 'inherit' });
node.on('close', (code) => {
if (code === 8) {
log('Error detected, waiting for changes...');
}
});
};

const lint = () => src(['src/**/*.js'])
.pipe(cache('lint'))
.pipe(eslint())
.pipe(eslint.format());

task('default', () => {
watch(
['src/**/*.js'],
{ queue: false, ignoreInitial: false },
parallel([serve, lint]),
);
});
58 changes: 58 additions & 0 deletions services/event-processor/newrelic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* New Relic agent configuration.
*
* See lib/config/default.js in the agent distribution for a more complete
* description of configuration variables and their potential values.
*/
exports.config = {
/**
* Array of application names.
*/
app_name: ['email-x/event-processor-service'],
/**
* Your New Relic license key.
*/
license_key: process.env.NEW_RELIC_LICENSE_KEY,
logging: {
/**
* Level at which to log. 'trace' is most useful to New Relic when diagnosing
* issues with the agent, 'info' and higher will impose the least overhead on
* production applications.
*/
level: 'info',
},
/**
* Alias that should be ignored by New Relic.
*/
rules: {
ignore: [/^\/_health$/],
},
/**
* When true, all request headers except for those listed in attributes.exclude
* will be captured for all traces, unless otherwise specified in a destination's
* attributes include/exclude lists.
*/
allow_all_headers: true,
attributes: {
/**
* Prefix of attributes to exclude from all destinations. Allows * as wildcard
* at end.
*
* NOTE: If excluding headers, they must be in camelCase form to be filtered.
*
* @env NEW_RELIC_ATTRIBUTES_EXCLUDE
*/
exclude: [
'request.headers.cookie',
'request.headers.authorization',
'request.headers.proxyAuthorization',
'request.headers.setCookie*',
'request.headers.x*',
'response.headers.cookie',
'response.headers.authorization',
'response.headers.proxyAuthorization',
'response.headers.setCookie*',
'response.headers.x*',
],
},
};
17 changes: 17 additions & 0 deletions services/event-processor/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "@base-cms/parcel-plug-event-processor",
"version": "1.0.5",
"description": "Processes and flags fraudulent events",
"main": "src/index.js",
"author": "Josh Worden <josh@limit0.io>",
"repository": "https://github.com/base-cms/parcel-plug/tree/master/services/event-processor",
"license": "MIT",
"private": true,
"dependencies": {
"@newrelic/native-metrics": "^4.1.0",
"envalid": "^4.1.4",
"forever": "^1.0.0",
"mongoose": "^5.4.1",
"newrelic": "^5.9.1"
}
}
18 changes: 18 additions & 0 deletions services/event-processor/src/db.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
const mongoose = require('mongoose');
const env = require('./env');
const { name, version } = require('../../../package.json');

const { MONGO_DSN, ACCOUNT_KEY } = env;

const instanceDSN = MONGO_DSN.replace('/parcel-plug', `/parcel-plug-${ACCOUNT_KEY}`);

const connection = mongoose.createConnection(instanceDSN, {
autoIndex: false,
appname: `${name} v${version}`,
bufferMaxEntries: 0, // Default -1
ignoreUndefined: true,
useNewUrlParser: true,
useFindAndModify: false,
useCreateIndex: true,
});
module.exports = connection;
25 changes: 25 additions & 0 deletions services/event-processor/src/env.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
const {
cleanEnv,
makeValidator,
bool,
num,
} = require('envalid');

const nonemptystr = makeValidator((v) => {
const err = new Error('Expected a non-empty string');
if (v === undefined || v === null || v === '') {
throw err;
}
const trimmed = String(v).trim();
if (!trimmed) throw err;
return trimmed;
});

module.exports = cleanEnv(process.env, {
ACCOUNT_KEY: nonemptystr({ desc: 'The account/tenant key. Is used for querying the account information and settings from the core database connection.' }),
MONGO_DSN: nonemptystr({ desc: 'The MongoDB DSN to connect to.' }),
NEW_RELIC_ENABLED: bool({ desc: 'Whether New Relic is enabled.', default: true, devDefault: false }),
NEW_RELIC_LICENSE_KEY: nonemptystr({ desc: 'The license key for New Relic.', devDefault: '(unset)' }),
CLICK_FREQUENCY_NUMBER: num({ desc: 'The click event threshold (x events in n time)', default: 10 }),
CLICK_FREQUENCY_INTERVAL: num({ desc: 'The click event threshold (n events in x time) (milliseconds)', default: 10000 }),
});
42 changes: 42 additions & 0 deletions services/event-processor/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
require('./newrelic');
const pkg = require('../package.json');
const db = require('./db');
const clickFrequency = require('./rules/click-frequency');

const { log } = console;

const stop = () => db.close();

const run = async () => {
const r = await db;
const { url } = r.client.s;
log(`> db connected (${url})`);

log('Processing rules');
await Promise.all([
clickFrequency(),
]);
log('Done processing.');

return stop();
};

// Simulate future NodeJS behavior by throwing unhandled Promise rejections.
process.on('unhandledRejection', (e) => {
log('> Unhandled promise rejection. Throwing error...');
throw e;
});

process.on('SIGINT', async () => {
log('> SIGINT recieved, shutting down gracefully.');
await stop();
});

process.on('SIGTERM', () => {
log('> SIGTERM recieved, shutting down!');
stop();
process.exit();
});

log(`> Booting ${pkg.name} v${pkg.version}...`);
run().catch(e => setImmediate(() => { throw e; }));
5 changes: 5 additions & 0 deletions services/event-processor/src/newrelic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const { NEW_RELIC_ENABLED } = require('./env');

process.env.NEW_RELIC_ENABLED = NEW_RELIC_ENABLED;

module.exports = require('newrelic');
84 changes: 84 additions & 0 deletions services/event-processor/src/rules/click-frequency.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
const db = require('../db');
const { CLICK_FREQUENCY_NUMBER, CLICK_FREQUENCY_INTERVAL } = require('../env');

const { log } = console;

const filterByThreshold = (items) => {
const values = items.reduce((arr, { _id, date }) => {
const v = date.valueOf();
return { ...arr, [v]: _id };
}, {});
const dates = Object.keys(values).map(n => parseInt(n, 10));
dates.sort((a, b) => a - b);
const badDates = [];
let bad = [];
let now = dates.shift();
while (dates.length) {
const n = dates.shift();
if (n < now + CLICK_FREQUENCY_INTERVAL) {
bad.push(n);
} else {
// Reset our frequency start to the current date
now = n;
// Push bad dates if we've met the frequency cap
if (bad.length >= CLICK_FREQUENCY_NUMBER) bad.forEach(d => badDates.push(d));
// Reset the bad date queue
bad = [];
}
}
if (bad.length >= CLICK_FREQUENCY_NUMBER) bad.forEach(d => badDates.push(d));

return [...new Set([...badDates])].map(v => values[v]);
};

/**
* This rule marks clicks as fraudlent if a significant number of clicks are recorded
* from the same IP address, within a set time period.
*
* It will query from the oldest record to the newest, and will only process records once.
* If a record is considered fraudulent, all clicks (events?) from that IP will be invalidated.
*/
module.exports = async () => {
log('click-frequency: Starting');
const coll = db.collection('events');

const pipeline = [
{ $match: { type: 'click', ip: { $exists: true } } },
{ $sort: { date: -1 } },
{
$group: {
_id: '$ip',
clicks: { $addToSet: { _id: '$_id', date: '$date' } },
count: { $sum: 1 },
},
},
{ $match: { count: { $gt: 1 } } },
{ $sort: { count: 1 } },
];

const events = await coll.aggregate(pipeline);

// eslint-disable-next-line no-await-in-loop
while (await events.hasNext()) {
// eslint-disable-next-line no-await-in-loop
const event = await events.next();
const { _id: ip, clicks } = event;

log(`click-frequency: Checking ip ${ip}...`);
const flagged = filterByThreshold(clicks);
log(`click-frequency: Found ${flagged.length} to flag for ${ip}`);

if (flagged.length) {
const bulkOps = flagged.map(_id => ({
updateOne: {
filter: { _id },
update: { $set: { flagged: true, flagReason: 'click-frequency' } },
},
}));
// eslint-disable-next-line no-await-in-loop
const { matchedCount } = await coll.bulkWrite(bulkOps);
log(`click-frequency: Updated ${matchedCount} flagged events.`);
}
}
log('click-frequency: Done');
};
Loading