diff --git a/src/http/DataQueryEndpoints.js b/src/http/DataQueryEndpoints.js index 0ecc9f98..bad8a7f7 100644 --- a/src/http/DataQueryEndpoints.js +++ b/src/http/DataQueryEndpoints.js @@ -23,28 +23,35 @@ const onRow = (res, unicastMessage, delimiter, format = 'object', version, metri } const streamData = (res, stream, format, version, metrics) => { - let delimiter = '' - stream.on('data', (row) => { - // first row - if (delimiter === '') { - onStarted(res) - } - onRow(res, row, delimiter, format, version, metrics) - delimiter = ',' - }) - stream.on('end', () => { - if (delimiter === '') { - onStarted(res) - } - res.write(']') - res.end() - }) - stream.on('error', (err) => { + try { + let delimiter = '' + stream.on('data', (row) => { + // first row + if (delimiter === '') { + onStarted(res) + } + onRow(res, row, delimiter, format, version, metrics) + delimiter = ',' + }) + stream.on('end', () => { + if (delimiter === '') { + onStarted(res) + } + res.write(']') + res.end() + }) + stream.on('error', (err) => { + logger.error(err) + res.status(500).send({ + error: 'Failed to fetch data!' + }) + }) + } catch (err) { logger.error(err) res.status(500).send({ error: 'Failed to fetch data!' }) - }) + } } function parseIntIfExists(x) { diff --git a/test/unit/http/DataQueryEndpoints.test.js b/test/unit/http/DataQueryEndpoints.test.js index e07a1ac6..cfcd755e 100644 --- a/test/unit/http/DataQueryEndpoints.test.js +++ b/test/unit/http/DataQueryEndpoints.test.js @@ -39,14 +39,12 @@ describe('DataQueryEndpoints', () => { app = express() networkNode = {} streamFetcher = { - authenticate(streamId, authKey) { - return new Promise(((resolve, reject) => { - if (authKey === 'authKey') { - resolve({}) - } else { - reject(new HttpError(403, 'GET', '')) - } - })) + async authenticate(_streamId, authKey) { + if (authKey === 'authKey') { + return {} + } else { + throw new HttpError(403, 'GET', '') + } }, } app.use('/api/v1', restEndpointRouter(networkNode, streamFetcher, new MetricsContext(null))) @@ -64,79 +62,73 @@ describe('DataQueryEndpoints', () => { world: 2, }), ] - networkNode.requestResendLast = jest.fn().mockReturnValue(intoStream.object( + networkNode.requestResendLast = jest.fn(() => intoStream.object( streamMessages.map((m) => createUnicastMessage(m)) )) }) describe('user errors', () => { - it('responds 400 and error message if param "partition" not a number', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/zero/last') + it('responds 400 and error message if param "partition" not a number', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/zero/last') .expect('Content-Type', /json/) .expect(400, { error: 'Path parameter "partition" not a number: zero', - }, done) + }) }) - it('responds 403 and error message if not authorized', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/last', 'wrongKey') + it('responds 403 and error message if not authorized', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/last', 'wrongKey') .expect('Content-Type', /json/) .expect(403, { error: 'Authentication failed.', - }, done) + }) }) - it('responds 400 and error message if optional param "count" not a number', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/last?count=sixsixsix') + it('responds 400 and error message if optional param "count" not a number', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/last?count=sixsixsix') .expect('Content-Type', /json/) .expect(400, { error: 'Query parameter "count" not a number: sixsixsix', - }, done) + }) }) }) describe('GET /api/v1/streams/streamId/data/partitions/0/last', () => { - it('responds 200 and Content-Type JSON', (done) => { - const res = testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') - res + it('responds 200 and Content-Type JSON', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') .expect('Content-Type', /json/) - .expect(200, done) + .expect(200) }) - it('responds with object representation of messages by default', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') - .expect(streamMessages.map((m) => m.toObject()), done) + it('responds with object representation of messages by default', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') + .expect(streamMessages.map((m) => m.toObject())) }) - it('responds with latest version protocol serialization of messages given format=protocol', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/last?format=protocol') - .expect(streamMessages.map((msg) => msg.serialize(StreamMessage.LATEST_VERSION)), done) + it('responds with latest version protocol serialization of messages given format=protocol', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/last?format=protocol') + .expect(streamMessages.map((msg) => msg.serialize(StreamMessage.LATEST_VERSION))) }) - it('responds with specific version protocol serialization of messages given format=protocol&version=30', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/last?format=protocol&version=30') - .expect(streamMessages.map((msg) => msg.serialize(30)), done) + it('responds with specific version protocol serialization of messages given format=protocol&version=30', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/last?format=protocol&version=30') + .expect(streamMessages.map((msg) => msg.serialize(30)),) }) - it('invokes networkNode#requestResendLast once with correct arguments', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') - .then(() => { - expect(networkNode.requestResendLast).toHaveBeenCalledTimes(1) - expect(networkNode.requestResendLast.mock.calls[0]) - .toEqual(['streamId', 0, expect.stringMatching(/\w+/), 1]) - done() - }) - .catch(done) + it('invokes networkNode#requestResendLast once with correct arguments', async () => { + await testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') + expect(networkNode.requestResendLast).toHaveBeenCalledTimes(1) + expect(networkNode.requestResendLast.mock.calls[0]).toEqual(['streamId', 0, expect.stringMatching(/\w+/), 1]) }) - it('responds 500 and error message if networkNode signals error', (done) => { - networkNode.requestResendLast = () => intoStream.object(Promise.reject(new Error('error'))) + it('responds 500 and error message if networkNode signals error', async () => { + networkNode.requestResendLast = () => intoStream.object(Promise.reject(new Error('expected error'))) - testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/last') .expect('Content-Type', /json/) .expect(500, { error: 'Failed to fetch data!', - }, done) + }) }) }) @@ -167,26 +159,27 @@ describe('DataQueryEndpoints', () => { z: 'z', }), ] - networkNode.requestResendFrom = () => intoStream.object( + networkNode.requestResendFrom = jest.fn(() => intoStream.object( streamMessages.map((m) => createUnicastMessage(m)) - ) + )) + networkNode.requestResendLast = jest.fn(() => intoStream.object( + streamMessages.map((m) => createUnicastMessage(m)) + )) }) describe('?fromTimestamp=1496408255672', () => { - it('responds 200 and Content-Type JSON', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/from?fromTimestamp=1496408255672') + it('responds 200 and Content-Type JSON', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/from?fromTimestamp=1496408255672') .expect('Content-Type', /json/) - .expect(200, done) + .expect(200) }) - it('responds with data points as body', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/from?fromTimestamp=1496408255672') - .expect(streamMessages.map((msg) => msg.toObject()), done) + it('responds with data points as body', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/from?fromTimestamp=1496408255672') + .expect(streamMessages.map((msg) => msg.toObject())) }) it('invokes networkNode#requestResendFrom once with correct arguments', async () => { - networkNode.requestResendFrom = jest.fn() - await testGetRequest('/api/v1/streams/streamId/data/partitions/0/from?fromTimestamp=1496408255672') expect(networkNode.requestResendFrom).toHaveBeenCalledTimes(1) @@ -201,34 +194,32 @@ describe('DataQueryEndpoints', () => { ) }) - it('responds 500 and error message if networkNode signals error', (done) => { - networkNode.requestResendFrom = () => intoStream.object(Promise.reject(new Error('error'))) + it('responds 500 and error message if networkNode signals error', async () => { + networkNode.requestResendFrom = jest.fn(() => intoStream.object(Promise.reject(new Error('expected error')))) - testGetRequest('/api/v1/streams/streamId/data/partitions/0/from?fromTimestamp=1496408255672') + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/from?fromTimestamp=1496408255672') .expect('Content-Type', /json/) .expect(500, { error: 'Failed to fetch data!', - }, done) + }) }) }) describe('?fromTimestamp=1496408255672&fromSequenceNumber=1&publisherId=publisherId', () => { const query = 'fromTimestamp=1496408255672&fromSequenceNumber=1&publisherId=publisherId' - it('responds 200 and Content-Type JSON', (done) => { - testGetRequest(`/api/v1/streams/streamId/data/partitions/0/from?${query}`) + it('responds 200 and Content-Type JSON', async () => { + return testGetRequest(`/api/v1/streams/streamId/data/partitions/0/from?${query}`) .expect('Content-Type', /json/) - .expect(200, done) + .expect(200) }) - it('responds with data points as body', (done) => { - testGetRequest(`/api/v1/streams/streamId/data/partitions/0/from?${query}`) - .expect(streamMessages.map((msg) => msg.toObject()), done) + it('responds with data points as body', async () => { + return testGetRequest(`/api/v1/streams/streamId/data/partitions/0/from?${query}`) + .expect(streamMessages.map((msg) => msg.toObject())) }) it('invokes networkNode#requestResendFrom once with correct arguments', async () => { - networkNode.requestResendFrom = jest.fn() - await testGetRequest(`/api/v1/streams/streamId/data/partitions/0/from?${query}`) expect(networkNode.requestResendFrom).toHaveBeenCalledTimes(1) @@ -243,63 +234,63 @@ describe('DataQueryEndpoints', () => { ) }) - it('responds 500 and error message if networkNode signals error', (done) => { - networkNode.requestResendFrom = () => intoStream.object(Promise.reject(new Error('error'))) + it('responds 500 and error message if networkNode signals error', async () => { + networkNode.requestResendFrom = () => intoStream.object(Promise.reject(new Error('expected error'))) - testGetRequest(`/api/v1/streams/streamId/data/partitions/0/from?${query}`) + return testGetRequest(`/api/v1/streams/streamId/data/partitions/0/from?${query}`) .expect('Content-Type', /json/) .expect(500, { error: 'Failed to fetch data!', - }, done) + }) }) }) }) describe('Range queries', () => { describe('user errors', () => { - it('responds 400 and error message if param "partition" not a number', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/zero/range') + it('responds 400 and error message if param "partition" not a number', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/zero/range') .expect('Content-Type', /json/) .expect(400, { error: 'Path parameter "partition" not a number: zero', - }, done) + }) }) - it('responds 403 and error message if not authorized', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range', 'wrongKey') + it('responds 403 and error message if not authorized', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range', 'wrongKey') .expect('Content-Type', /json/) .expect(403, { error: 'Authentication failed.', - }, done) + }) }) - it('responds 400 and error message if param "fromTimestamp" not given', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range') + it('responds 400 and error message if param "fromTimestamp" not given', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range') .expect('Content-Type', /json/) .expect(400, { error: 'Query parameter "fromTimestamp" required.', - }, done) + }) }) - it('responds 400 and error message if param "fromTimestamp" not a number', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=endOfTime') + it('responds 400 and error message if param "fromTimestamp" not a number', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=endOfTime') .expect('Content-Type', /json/) .expect(400, { error: 'Query parameter "fromTimestamp" not a number: endOfTime', - }, done) + }) }) - it('responds 400 and error message if param "toTimestamp" not given', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1') + it('responds 400 and error message if param "toTimestamp" not given', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1') .expect('Content-Type', /json/) .expect(400, { error: 'Query parameter "toTimestamp" required as well. ' + 'To request all messages since a timestamp,' + 'use the endpoint /streams/:id/data/partitions/:partition/from', - }, done) + }) }) - it('responds 400 and error message if optional param "toTimestamp" not a number', (done) => { - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1&toTimestamp=endOfLife') + it('responds 400 and error message if optional param "toTimestamp" not a number', async () => { + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1&toTimestamp=endOfLife') .expect('Content-Type', /json/) .expect(400, { error: 'Query parameter "toTimestamp" not a number: endOfLife', - }, done) + }) }) }) @@ -312,27 +303,25 @@ describe('DataQueryEndpoints', () => { '6': '6', }), ] - networkNode.requestResendRange = () => intoStream.object( + networkNode.requestResendRange = jest.fn(() => intoStream.object( streamMessages.map((m) => createUnicastMessage(m)) - ) + )) }) - it('responds 200 and Content-Type JSON', (done) => { + it('responds 200 and Content-Type JSON', async () => { // eslint-disable-next-line max-len - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1496408255672&toTimestamp=1496415670909') + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1496408255672&toTimestamp=1496415670909') .expect('Content-Type', /json/) - .expect(200, done) + .expect(200) }) - it('responds with data points as body', (done) => { + it('responds with data points as body', async () => { // eslint-disable-next-line max-len - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1496408255672&toTimestamp=1496415670909') - .expect(streamMessages.map((msg) => msg.toObject()), done) + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1496408255672&toTimestamp=1496415670909') + .expect(streamMessages.map((msg) => msg.toObject())) }) it('invokes networkNode#requestResendRange once with correct arguments', async () => { - networkNode.requestResendRange = jest.fn() - // eslint-disable-next-line max-len await testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1496408255672&toTimestamp=1496415670909') @@ -350,15 +339,15 @@ describe('DataQueryEndpoints', () => { ) }) - it('responds 500 and error message if networkNode signals error', (done) => { - networkNode.requestResendRange = () => intoStream.object(Promise.reject(new Error('error'))) + it('responds 500 and error message if networkNode signals error', async () => { + networkNode.requestResendRange = jest.fn(() => intoStream.object(Promise.reject(new Error('expected error')))) // eslint-disable-next-line max-len - testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1496408255672&toTimestamp=1496415670909') + return testGetRequest('/api/v1/streams/streamId/data/partitions/0/range?fromTimestamp=1496408255672&toTimestamp=1496415670909') .expect('Content-Type', /json/) .expect(500, { error: 'Failed to fetch data!', - }, done) + }) }) }) @@ -375,25 +364,23 @@ describe('DataQueryEndpoints', () => { '6': '6', }), ] - networkNode.requestResendRange = () => intoStream.object( + networkNode.requestResendRange = jest.fn(() => intoStream.object( streamMessages.map((m) => createUnicastMessage(m)) - ) + )) }) - it('responds 200 and Content-Type JSON', (done) => { - testGetRequest(`/api/v1/streams/streamId/data/partitions/0/range?${query}`) + it('responds 200 and Content-Type JSON', async () => { + return testGetRequest(`/api/v1/streams/streamId/data/partitions/0/range?${query}`) .expect('Content-Type', /json/) - .expect(200, done) + .expect(200) }) - it('responds with data points as body', (done) => { - testGetRequest(`/api/v1/streams/streamId/data/partitions/0/range?${query}`) - .expect(streamMessages.map((msg) => msg.toObject()), done) + it('responds with data points as body', async () => { + return testGetRequest(`/api/v1/streams/streamId/data/partitions/0/range?${query}`) + .expect(streamMessages.map((msg) => msg.toObject())) }) it('invokes networkNode#requestResendRange once with correct arguments', async () => { - networkNode.requestResendRange = jest.fn() - await testGetRequest(`/api/v1/streams/streamId/data/partitions/0/range?${query}`) expect(networkNode.requestResendRange).toHaveBeenCalledTimes(1) expect(networkNode.requestResendRange).toHaveBeenCalledWith( @@ -409,14 +396,14 @@ describe('DataQueryEndpoints', () => { ) }) - it('responds 500 and error message if networkNode signals error', (done) => { - networkNode.requestResendRange = () => intoStream.object(Promise.reject(new Error('error'))) + it('responds 500 and error message if networkNode signals error', async () => { + networkNode.requestResendRange = jest.fn(() => intoStream.object(Promise.reject(new Error('expected error')))) - testGetRequest(`/api/v1/streams/streamId/data/partitions/0/range?${query}`) + return testGetRequest(`/api/v1/streams/streamId/data/partitions/0/range?${query}`) .expect('Content-Type', /json/) .expect(500, { error: 'Failed to fetch data!', - }, done) + }) }) }) })