Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ jobs:
build:
working_directory: ~/Clever/understream
docker:
- image: circleci/node:8.11.3-stretch
- image: circleci/node:10.15.1-stretch
- image: circleci/mongo:3.2.20-jessie-ram
environment:
CIRCLE_ARTIFACTS: /tmp/circleci-artifacts
CIRCLE_TEST_REPORTS: /tmp/circleci-test-results
Expand All @@ -19,3 +20,6 @@ jobs:
- run:
command: npm install
name: npm install
- run:
command: make test
name: make test
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
# `make test` runs all the tests
# `make test/each.coffee` runs just that test
.PHONY: test test-cov
TESTS=$(shell cd test && ls *.coffee | sed s/\.coffee$$//)
TESTS=$(shell ls test/*.coffee | sed s/\.coffee$$//)
LIBS=$(shell find . -regex "^./lib\/.*\.coffee\$$" | sed s/\.coffee$$/\.js/ | sed s/lib/lib-js/)
MONGO_URL ?= localhost # needed for tests
MONGO_URL ?= mongodb://127.0.0.1:27017 # needed for tests

build: $(LIBS)

Expand All @@ -16,7 +16,7 @@ lib-js/%.js : lib/%.coffee
test: $(TESTS)

$(TESTS): build
MONGO_URL=$(MONGO_URL) DEBUG=* NODE_ENV=test node_modules/mocha/bin/mocha --timeout 60000 --bail --compilers coffee:coffee-script test/$@.coffee
MONGO_URL=$(MONGO_URL) DEBUG=* NODE_ENV=test node_modules/mocha/bin/mocha --timeout 60000 --bail --compilers coffee:coffee-script $@.coffee

test-cov: build
# jscoverage only accepts directory arguments so have to rebuild everything
Expand Down
2 changes: 2 additions & 0 deletions lib/helpers.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ module.exports =
instance instanceof EventEmitter and
instance.pipe? and
(instance._read? or instance.read? or instance.readable)

DEFAULT_MAX_LISTENERS: 10
2 changes: 1 addition & 1 deletion lib/transforms/join.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class SortedMergeJoin extends Transform
# Hack to find out if a stream has no more data. This is totally not kosher
# since it relies on undocumented internals.
ended = (stream) ->
stream._readableState.ended and _.isEmpty stream._readableState.buffer
stream._readableState.ended and stream._readableState.buffer.length == 0

constructor: (stream_opts, {@right, @key}) ->
super stream_opts
Expand Down
12 changes: 9 additions & 3 deletions lib/understream.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ _.mixin require 'underscore.deep'
debug = require('debug') 'us:'
domain = require 'domain'
{EventEmitter} = require 'events'
{is_readable} = require './helpers'
{is_readable, DEFAULT_MAX_LISTENERS} = require './helpers'

# Adds a listener to an EventEmitter but first bumps the max listeners limit
# for that emitter. The limit is meant to prevent memory leaks, so this should
Expand All @@ -14,6 +14,8 @@ domain = require 'domain'
add_listener_unsafe = (emitter, event, listener) ->
if emitter._maxListeners?
emitter.setMaxListeners emitter._maxListeners + 1
else
emitter._maxListeners = DEFAULT_MAX_LISTENERS + 1
emitter.addListener event, listener

# Wraps a stream's _transform method in a domain, catching any thrown errors
Expand Down Expand Up @@ -92,8 +94,12 @@ class ArrayStream extends Readable
constructor: (@options, @arr, @index=0) ->
super _(@options).extend objectMode: true
_read: (size) =>
debug "_read #{size} #{JSON.stringify @arr[@index]}"
@push @arr[@index] # Note: push(undefined) signals the end of the stream, so this just works^tm
if @index > (@arr.length - 1)
@push null
return
data = @arr[@index]
debug "_read #{size} #{JSON.stringify data}"
@push data
@index += 1

class DevNull extends Writable
Expand Down
12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"name": "understream",
"version": "0.10.14",
"version": "0.11.0",
"description": "stream helpers",
"engines": {
"node": ">=0.10.x"
"node": ">=10.x"
},
"main": "index.js",
"scripts": {
Expand All @@ -23,12 +23,12 @@
"underscore.deep": "^0.5.1"
},
"devDependencies": {
"JSONStream": "~0.7.1",
"coffee-script": "~1.6.2",
"sinon": "~1.5.2",
"mocha": "~1.9.0",
"through": "~2.3.4",
"mongoose": "~3.6.19",
"JSONStream": "~0.7.1"
"mongoose": "^5.7.4",
"sinon": "~1.5.2",
"through": "~2.3.4"
},
"publishConfig": {
"registry": "https://registry.npmjs.org"
Expand Down
2 changes: 1 addition & 1 deletion test/batch.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Understream = require "#{__dirname}/../index"

charRange = (start, stop) ->
if not stop? then stop = start; start = 0
_(_.range(start, stop))
_.range(start, stop)
.map (i) -> 'abcdefghijklmnopqrstuvwxyz'[i]

describe '_.batch(n)', ->
Expand Down
4 changes: 4 additions & 0 deletions test/combine.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ lazy_stream_from_array = (arr) ->
i = 0
rs._read = ->
setImmediate =>
data = arr[i]
if data is undefined
@push null
return
@push arr[i]
i += 1
rs
Expand Down
3 changes: 2 additions & 1 deletion test/errors.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ async = require 'async'
_ = require 'underscore'
Understream = require "#{__dirname}/../index"
{Readable} = require 'stream'
{DEFAULT_MAX_LISTENERS} = require '../lib/helpers'

# domain_thrown (0,8) vs domainThrown (0.10)
was_thrown = (domain_err) ->
Expand Down Expand Up @@ -119,7 +120,7 @@ describe '_.stream error handling', ->

stream = new Understream([]).stream()
_.each ['once', 'again'], (time) -> it time, (done) ->
starting_max_listeners = stream._maxListeners
starting_max_listeners = stream._maxListeners || DEFAULT_MAX_LISTENERS
starting_listeners = num_listeners stream
new Understream(stream).each(-> ).run (err) ->
assert.ifError err
Expand Down
6 changes: 5 additions & 1 deletion test/join.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,11 @@ describe '_.join', ->
# Simulate some data being loaded after a delay
else
setTimeout ->
right.push right_input[i]
data = right_input[i]
if data?
right.push right_input[i]
else
right.push null
i += 1
, 100

Expand Down
2 changes: 1 addition & 1 deletion test/split.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describe '_.split', ->
]
async.forEachSeries test_inputs, (test_input, cb_fe) ->
new Understream(test_input).split("test").run (err) ->
assert err.message.match /non-string\/buffer chunk/
assert err.message.match /argument must be one of type string or Buffer/
cb_fe()
, done

Expand Down
2 changes: 1 addition & 1 deletion test/wrap.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe '_.stream', ->
done()

it 'wraps a mongoose stream', (done) ->
mongoose.connect "#{process.env.MONGO_URL}/test-understream"
mongoose.connect "#{process.env.MONGO_URL}/test-understream", {useNewUrlParser: true}
Doc = mongoose.model "Doc", new mongoose.Schema { foo: String }
input = ['a', 'b', 'c']
async.waterfall [
Expand Down