diff --git a/.travis.yml b/.travis.yml index f31fdde..a82dd95 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: node_js node_js: - 0.10 - - 0.11 services: - mongodb notifications: diff --git a/README.md b/README.md index 7bc01a7..0c527a3 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,443 @@ # Understream -underscore-like functionality for dealing with streams. +Understream is a Node utility for manipulating streams in a functional way. +It provides three classes of functionality: -# Wishlist +1. Functions to convert data to [Readable](http://nodejs.org/api/stream.html#stream_class_stream_readable) streams and vice versa: + * [`fromArray`](#fromArray) + * [`fromString`](#fromArray) + * [`toArray`](#toArray) + * [`range`](#range) -* support child_process.fork()ing streams to utilize > 1 cpu and more memory +2. Functions that take a Readable stream and transform its data, returning a new readable stream: + * [`each`](#each) + * [`map`](#map) + * [`reduce`](#reduce) + * [`filter`](#filter) + * [`where`](#where) + * [`invoke`](#invoke) + * [`groupBy`](#groupBy) + * [`first`](#first) + * [`rest`](#rest) + * [`flatten`](#flatten) + * [`uniq`](#uniq) + +3. Functions that allow you to create chains of transformations: + * [`chain`](#chain) + * [`value`](#value) + +The library has underscore-like usage: + +```javascript +var _s = require('understream'); +input = _.fromArray([3, 4, 5, 6]); +_s.chain(input).map(function(num) {return num+10}).each(console.log); +// 13 +// 14 +// 15 +// 16 +``` + +It also makes it very easy to mix in your own streams: + +```javascript +var Transform = require('stream').Transform +var util = require('util'); +var _s = require('understream'); + +util.inherits(Math, Transform); + +function Math(stream_opts) { + Transform.call(this, stream_opts); +}; + +Math.prototype._transform = function (num, enc, cb) { + cb(null, num+10); +}; + +_s.mixin({ + add10: function(readable, stream_opts) { + return readable.pipe(new Math(stream_opts)); + } +}); + +input = _s.fromArray([3, 4, 5, 6]); +_s(input).chain().add10({objectMode:true}).each(console.log); +// 13 +// 14 +// 15 +// 16 +``` + +## Methods + +#### fromArray `_s.fromArray(array)` + +Turns an array into a readable stream of the objects within the array. + +```javascript +var readable = _s.fromArray([3, 4, 5, 6]);c +readable.on("data", console.log); +// 3 +// 4 +// 5 +// 6 +``` + +--- +#### fromString `_s.fromString(string)` + +Turns a string into a readable stream of the characters within the string. + +```javascript +var readable = _s.fromString("3456"); +readable.on("data", console.log); +// 3 +// 4 +// 5 +// 6 +``` + +--- +#### toArray `_s.toArray(readable, cb)` + +Reads a stream into an array of the data emitted by the stream. +Calls `cb(err, arr)` when finished. + +```javascript +var readable = _s.fromArray([3, 4, 5, 6]); +_s.toArray(readable, function(err, arr) { + console.log(arr); +}); +// [ 3, 4, 5, 6 ] +``` + +--- +#### range `_s.range(size, stream_opts)` `_s.range(start, stop[, step, stream_opts])` + +Generates the integers from 0 to `size-1`, inclusive. +Alternatively generates integers from `start` to `stop` in increments of `step`, with a default `step` of 1. + +```javascript +_s.range(5).on('data', console.log); +// 0 +// 1 +// 2 +// 3 +// 4 +``` + +--- +#### each `_s.each(readable, iterator[,` [`stream_opts`](#stream_opts)`])` + +Calls the iterator function on each object in your stream, and emits the same object when your iterator function is done. +If the iterator function has one argument (`(element)`), it is assumed to be synchronous. +If it has two arguments, it is assumed to be asynchronous (`(element, cb)`). + +```javascript +var readable = _s.fromArray([3, 4, 5, 6]); +_s.each(readable, console.log); +// 3 +// 4 +// 5 +// 6 +``` + +*aliases*: `forEach` + +--- +#### map `_s.map(readable, iterator[,` [`stream_opts`](#stream_opts)`])` + +Makes a new stream that is the result of calling `iterator` on each piece of data in `readable`. +If the iterator function has one argument (`(element)`), it is assumed to be synchronous. +If it has two arguments, it is assumed to be asynchronous (`(element, cb)`). + +```javascript +var readable = _s.fromArray([3.3, 4.1, 5.2, 6.4])); +var mapped = _s.map(readable, Math.floor); +mapped.on("data", console.log); +// 3 +// 4 +// 5 +// 6 +``` + +*aliases*: `collect` + +--- +#### reduce `_s.reduce(readable, options[,` [`stream_opts`](#stream_opts)`])` + +Boils a stream down to a single value. `options` takes in: +* `base`: value or function that represents/returns the initial state of the reduction. +* `fn`: function that takes in two arguments: the current state of the reduction, and a new piece of incoming data, and returns the updated state of the reduction. +* `key`: optional function to apply to incoming data in order to partition the incoming data into separate reductions. + +```javascript +var readable = _s.fromArray([1, 2, 3]); +var reduced = _s.reduce(readable, { + base: 0, + fn: function(a, b) { return a + b; } +}); +reduced.on('data', console.log); +// 6 +``` + +```javascript +var readable = _s.fromArray([ + {a: 1, b: 2}, + {a: 1, b: 3}, + {a: 1, b: 4}, + {a: 2, b: 1}, + {a: 3, b: 2} +]); +var reduced = _s.reduce(readable, { + base: function() { return {}; }, + key: function(new_obj) { return new_obj.a; }, + fn: function(obj, new_obj) { + if (obj.b == null) { + obj = { + a: new_obj.a, + b: [] + }; + } + obj.b.push(new_obj.b); + return obj; + } +}); +reduced.on('data', console.log); +// { a: 1, b: [ 2, 3, 4 ] } +// { a: 2, b: [ 1 ] } +// { a: 3, b: [ 2 ] } +``` + +*aliases*: `inject`, `foldl` + +--- +#### filter `_s.filter(readable, iterator[,` [`stream_opts`](#stream_opts)`])` + +Returns a readable stream that emits all data from `readable` that passes `iterator`. +If it has only one argument, `iterator` is assumed to be synchronous. +If it has two arguments, it is assumed to return its result asynchronously. + +```javascript +var readable = _s.fromArray([1, 2, 3, 4]); +var filtered = _s.filter(readable, function(num) { return num % 2 === 0 }); +// var filtered = _s.filter(readable, function(num, cb) { +// setTimeout(function() { cb(null, num % 2 === 0); }, 1000); +// }); +filtered.on('data', console.log); +// 2 +// 4 +``` + +*aliases*: `select` + +--- +#### where `_s.where(readable, attrs[,` [`stream_opts`](#stream_opts)`])` + +Filters `readable` to emit only objects that contain the attributes in the `attrs` object. + +```javascript +var readable = _s.fromArray([ + {a: 1, b: 2}, + {a: 2, b: 2}, + {a: 1, b: 3}, + {a: 1, b: 4} +]) +var whered = _s.where(readable, {a:1}); +whered.on('data', console.log); +// { a: 1, b: 2 } +// { a: 1, b: 3 } +// { a: 1, b: 4 } +``` + +--- +#### invoke `_s.invoke(readable, method[,` [`stream_opts`](#stream_opts)`])` + +Returns a stream that emits the results of invoking `method` on every object in `readable`. + +```javascript +var readable = _s.fromArray([ + {m: function() { return 1; }}, + {m: function() { return 2; }} +]) +var invoked = _s.invoke(readable, 'm'); +invoked.on('data', console.log); +// 1 +// 2 +``` + +--- +#### groupBy `_s.groupBy(readable, options[,` [`stream_opts`](#stream_opts)`])` + +When `options` is a function, creates a stream that will emit an object representing the groupings of the data in `readable` partitioned by the function. + +```javascript +var readable = _s.fromArray([1.3, 2.1, 2.4]); +var grouped = _s.groupBy(readable, Math.floor); +grouped.on('data', console.log); +// { '1': [ 1.3 ], '2': [ 2.1, 2.4 ] } +``` + +Alternatively, `options` can be an object containing the following keys: +* `fn`: the function to apply to data coming through `readable`. +* `unpack`: emit each grouping as a separate object. + +```javascript +var readable = _s.fromArray([1.3, 2.1, 2.4]); +var grouped = _s.groupBy(readable, {fn: Math.floor, unpack: true}); +grouped.on('data', console.log); +// { '1': [ 1.3 ] } +// { '2': [ 2.1, 2.4 ] } +``` + +--- +#### first `_s.first(readable[, n,` [`stream_opts`](#stream_opts)`])` + +Returns a stream that only emits the first `n` objects in `readable`. +`n` equals 1 by default. + +```javascript +var readable = _s.fromArray([1, 2, 3, 4, 5]); +var first = _s.first(readable, 3); +first.on('data', console.log); +// 1 +// 2 +// 3 +``` + +*aliases*: `head`, `take` + +--- +#### rest `_s.rest(readable[, n,` [`stream_opts`](#stream_opts)`])` + +Returns a stream that skips over the first `n` objects in `readable`. +`n` equals 1 by default. + +```javascript +var readable = _s.fromArray([1, 2, 3, 4, 5]); +var rest = _s.rest(readable, 3); +rest.on('data', console.log); +// 4 +// 5 +``` + +*aliases*: `tail`, `drop` + +--- +#### flatten `_s.flatten(readable[, shallow,` [`stream_opts`](#stream_opts)`])` + +Returns a stream that unpacks any arrays into their individual elements. +By default `shallow` is false, and all nested arrays are also unpacked. + +```javascript +var readable = _s.fromArray([1, 2, [3], [4], [5, [6]]]); +var flatten = _s.flatten(readable); +flatten.on('data', console.log); +// 1 +// 2 +// 3 +// 4 +// 5 +// 6 +``` + +--- +#### uniq `_s.uniq(readable[, sorted, hash_fn,` [`stream_opts`](#stream_opts)`])` + +Returns a stream that emits the unique elements of `readable`. +Assumes the input is unsorted unless `sorted` is set to true. +Uses builtin comparison unless `hash_fn` is specified. +Alternatively you can specify one argument containing both parameters: `{sorted: ..., hash_fn: ...}`. + +```javascript +var readable = _s.fromArray([4, 4, 3, 2, 1]) +var uniq = _s.uniq(readable); +uniq.on('data', console.log); +// 4 +// 3 +// 2 +// 1 +``` + +*aliases*: `unique` + +--- +#### chain `_s.chain(obj)` + +Analagous to underscore's `chain`: returns a wrapped object with all the methods of understream. + +```javascript +_s.chain(_s.fromArray([3, 4, 5, 6])).each(console.log) +// 3 +// 4 +// 5 +// 6 +``` + +--- +#### value `_s.chain(obj)...value()` + +Analagous to underscore's `value`: exits a chain and returns the return value of the last method called. + +```javascript +var readable = _s.chain(_s.fromArray([3, 4, 5, 6])).value(); +// 3 +// 4 +// 5 +// 6 +``` + +### `stream_opts` + +By default, node streams take in some parameters that describe the data in the stream and the behavior of the stream's backpressure: + +* `objectMode`: Boolean specifying whether the stream will be processing javascript objects (vs. strings/buffer data). +* `highWaterMark`: Number specifying the maximum size of a stream's internal buffer, i.e. the point at which it starts to exert backpressure on upstreams. +If `objectMode` is true, this represents the maximum number of objects to buffer. +If `objectMode` is false, this represents the number of bytes to buffer. + +In general it is a [bad idea](#TODO) to pipe two streams together that have mismatched `objectMode`s. +Thus, all of understream's builtin mixins set `objectMode` equal to the `objectMode` of the readable stream passed in. +This assures that backpressure works properly, and it is recommended you do the same in your own mixins. + + diff --git a/index.js b/index.js index 715768a..64a1ea7 100644 --- a/index.js +++ b/index.js @@ -1,2 +1,2 @@ var path = __dirname + '/' + (process.env.TEST_UNDERSTREAM_COV ? 'lib-js-cov' : 'lib-js') + '/understream'; -module.exports = require(path); +module.exports = require(path)(); diff --git a/lib/transforms/each.coffee b/lib/mixins/each.coffee similarity index 64% rename from lib/transforms/each.coffee rename to lib/mixins/each.coffee index 98151f8..31ed33b 100644 --- a/lib/transforms/each.coffee +++ b/lib/mixins/each.coffee @@ -2,8 +2,8 @@ _ = require 'underscore' debug = require('debug') 'us:each' -module.exports = class Each extends Transform - constructor: (@stream_opts, @options) -> +class Each extends Transform + constructor: (@options, @stream_opts) -> super @stream_opts @options = { fn: @options } if _(@options).isFunction() @options._async = @options.fn.length is 2 @@ -17,3 +17,9 @@ module.exports = class Each extends Transform @options.fn chunk @push chunk cb() + +fn = (readable, options, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Each options, stream_opts) +module.exports = + each: fn + forEach: fn diff --git a/lib/transforms/filter.coffee b/lib/mixins/filter.coffee similarity index 69% rename from lib/transforms/filter.coffee rename to lib/mixins/filter.coffee index df1916a..8c361bc 100644 --- a/lib/transforms/filter.coffee +++ b/lib/mixins/filter.coffee @@ -3,7 +3,7 @@ _ = require 'underscore' debug = require('debug') 'us:filter' module.exports = class Filter extends Transform - constructor: (@stream_opts, @options) -> + constructor: (@options, @stream_opts) -> super @stream_opts @options = { fn: @options } if _(@options).isFunction() @options._async = true if @options.fn.length is 2 @@ -16,3 +16,10 @@ module.exports = class Filter extends Transform else @push chunk if @options.fn chunk cb() + +fn = (readable, options, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Filter options, stream_opts) + +module.exports = + filter: fn + select: fn diff --git a/lib/transforms/first.coffee b/lib/mixins/first.coffee similarity index 50% rename from lib/transforms/first.coffee rename to lib/mixins/first.coffee index 052e2ec..217e859 100644 --- a/lib/transforms/first.coffee +++ b/lib/mixins/first.coffee @@ -1,7 +1,7 @@ {Transform} = require 'stream' module.exports = class First extends Transform - constructor: (stream_opts, @first=1) -> + constructor: (@first=1, stream_opts) -> super stream_opts @seen = 0 _transform: (chunk, encoding, cb) => @@ -10,3 +10,11 @@ module.exports = class First extends Transform @push null return cb null, chunk + +fn = (readable, options, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new First options, stream_opts) + +module.exports = + first: fn + head: fn + take: fn diff --git a/lib/mixins/flatten.coffee b/lib/mixins/flatten.coffee new file mode 100644 index 0000000..93c0341 --- /dev/null +++ b/lib/mixins/flatten.coffee @@ -0,0 +1,14 @@ +_ = require 'underscore' +{Transform} = require 'stream' + +class Flatten extends Transform + constructor: (@shallow=false, stream_opts) -> super stream_opts + _transform: (chunk, enc, cb) => + return cb null, chunk unless _(chunk).isArray() + els = if @shallow then chunk else _(chunk).flatten() + @push el for el in els + cb() + +module.exports = + flatten: (readable, shallow, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Flatten shallow, stream_opts) diff --git a/lib/mixins/fromArray.coffee b/lib/mixins/fromArray.coffee new file mode 100644 index 0000000..0fac312 --- /dev/null +++ b/lib/mixins/fromArray.coffee @@ -0,0 +1,10 @@ +{Readable} = require 'stream' + +class ArrayStream extends Readable + constructor: (@arr, @index=0) -> + super objectMode: true + _read: (size) => + @push @arr[@index++] # Note: push(undefined) signals the end of the stream, so this just works^tm + +module.exports = + fromArray: (arr) -> new ArrayStream arr diff --git a/lib/mixins/fromString.coffee b/lib/mixins/fromString.coffee new file mode 100644 index 0000000..3f2e7a6 --- /dev/null +++ b/lib/mixins/fromString.coffee @@ -0,0 +1,10 @@ +{Readable} = require 'stream' + +class StringStream extends Readable + constructor: (@str, @index=0) -> + super objectMode: true + _read: (size) => + @push @str[@index++] # Note: push(undefined) signals the end of the stream, so this just works^tm + +module.exports = + fromString: (str) -> new StringStream str diff --git a/lib/transforms/groupBy.coffee b/lib/mixins/groupBy.coffee similarity index 78% rename from lib/transforms/groupBy.coffee rename to lib/mixins/groupBy.coffee index 31edb22..5da3265 100644 --- a/lib/transforms/groupBy.coffee +++ b/lib/mixins/groupBy.coffee @@ -2,8 +2,8 @@ _ = require 'underscore' debug = require('debug') 'us:groupBy' -module.exports = class GroupBy extends Transform - constructor: (@stream_opts, @options) -> +class GroupBy extends Transform + constructor: (@options, @stream_opts) -> super @stream_opts @options = switch when _(@options).isFunction() @@ -33,3 +33,7 @@ module.exports = class GroupBy extends Transform @options.fn chunk, (err, hash) => return cb err if err add hash + +module.exports = + groupBy: (readable, options, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new GroupBy options, stream_opts) diff --git a/lib/transforms/invoke.coffee b/lib/mixins/invoke.coffee similarity index 56% rename from lib/transforms/invoke.coffee rename to lib/mixins/invoke.coffee index 9f01f7a..0600859 100644 --- a/lib/transforms/invoke.coffee +++ b/lib/mixins/invoke.coffee @@ -3,8 +3,12 @@ _ = require 'underscore' debug = require('debug') 'us:invoke' # TODO: support args (might be different than underscore API due to arg length logic in lib/understream) -module.exports = class Invoke extends Transform - constructor: (@stream_opts, @method) -> +class Invoke extends Transform + constructor: (@method, @stream_opts) -> super @stream_opts _transform: (chunk, encoding, cb) => cb null, chunk[@method].apply(chunk) + +module.exports = + invoke: (readable, method, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Invoke method, stream_opts) diff --git a/lib/transforms/map.coffee b/lib/mixins/map.coffee similarity index 66% rename from lib/transforms/map.coffee rename to lib/mixins/map.coffee index 9871fab..45b3fe3 100644 --- a/lib/transforms/map.coffee +++ b/lib/mixins/map.coffee @@ -2,8 +2,8 @@ _ = require 'underscore' debug = require('debug') 'us:map' -module.exports = class Map extends Transform - constructor: (@stream_opts, @options) -> +class Map extends Transform + constructor: (@options, @stream_opts) -> super @stream_opts @options = { fn: @options } if _(@options).isFunction() @options._async = @options.fn.length is 2 @@ -17,3 +17,10 @@ module.exports = class Map extends Transform else @push @options.fn(chunk) cb() + +fn = (readable, options, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Map options, stream_opts) + +module.exports = + map: fn + collect: fn diff --git a/lib/mixins/range.coffee b/lib/mixins/range.coffee new file mode 100644 index 0000000..9ba2be2 --- /dev/null +++ b/lib/mixins/range.coffee @@ -0,0 +1,26 @@ +{Readable} = require 'stream' +_ = require 'underscore' +debug = require('debug') 'us:range' + +class Range extends Readable + constructor: (@start, @stop, @step, @stream_opts) -> + # must be in objectMode since not producing strings or buffers + super _(@stream_opts or {}).extend objectMode: true + @size = Math.max Math.ceil((@stop - @start) / @step), 0 + _read: (size) => + return @push() unless @size + @push @start + @start += @step + @size -= 1 + +module.exports = + range: (start, stop, step, stream_opts) -> + args = _(arguments).toArray() + if _(args).chain().last().isObject().value() + stream_opts = args.pop() + if args.length <= 1 + # did not specify stop and step, maybe not even start + stop = start or 0 + start = 0 + step = args[2] or 1 + new Range start, stop, step, stream_opts diff --git a/lib/transforms/reduce.coffee b/lib/mixins/reduce.coffee similarity index 70% rename from lib/transforms/reduce.coffee rename to lib/mixins/reduce.coffee index d02363d..5b91a91 100644 --- a/lib/transforms/reduce.coffee +++ b/lib/mixins/reduce.coffee @@ -2,8 +2,8 @@ _ = require 'underscore' debug = require('debug') 'us:reduce' -module.exports = class Reduce extends Transform - constructor: (@stream_opts, @options) -> +class Reduce extends Transform + constructor: (@options, @stream_opts) -> super @stream_opts # TODO @options._async = _(@options).isFunction and @options.fn.length is 2 if @options.key? @@ -24,3 +24,11 @@ module.exports = class Reduce extends Transform else @_val = @options.fn @_val, chunk cb() + +fn = (readable, options, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Reduce options, stream_opts) + +module.exports = + reduce: fn + inject: fn + foldl: fn diff --git a/lib/mixins/rest.coffee b/lib/mixins/rest.coffee new file mode 100644 index 0000000..392df28 --- /dev/null +++ b/lib/mixins/rest.coffee @@ -0,0 +1,18 @@ +{Transform} = require 'stream' + +class Rest extends Transform + constructor: (@rest=1, stream_opts) -> + super stream_opts + @seen = -1 + _transform: (chunk, encoding, cb) => + @seen++ + return cb() if @seen < @rest + cb null, chunk + +fn = (readable, options, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Rest options, stream_opts) + +module.exports = + rest: fn + tail: fn + drop: fn diff --git a/lib/mixins/toArray.coffee b/lib/mixins/toArray.coffee new file mode 100644 index 0000000..9ddd47f --- /dev/null +++ b/lib/mixins/toArray.coffee @@ -0,0 +1,30 @@ +{Transform} = require('stream') +_ = require 'underscore' +domain = require 'domain' + +# Accumulates each group of `batchSize` items and outputs them as an array. +class Batch extends Transform + constructor: (@batchSize, @stream_opts) -> + super @stream_opts + @_buffer = [] + _transform: (chunk, encoding, cb) => + if @_buffer.length < @batchSize + @_buffer.push chunk + else + @push @_buffer + @_buffer = [chunk] + cb() + _flush: (cb) => + @push @_buffer if @_buffer.length > 0 + cb() + +module.exports = + toArray: (readable, cb) -> + dmn = domain.create() + batch = new Batch Infinity, {objectMode: readable._readableState.objectMode} + handler = (err) -> cb err, batch._buffer + batch.on 'finish', handler + dmn.on 'error', handler + dmn.add readable + dmn.add batch + dmn.run -> readable.pipe batch diff --git a/lib/transforms/uniq.coffee b/lib/mixins/uniq.coffee similarity index 60% rename from lib/transforms/uniq.coffee rename to lib/mixins/uniq.coffee index 869111a..de47025 100644 --- a/lib/transforms/uniq.coffee +++ b/lib/mixins/uniq.coffee @@ -2,8 +2,8 @@ _ = require 'underscore' {Transform} = require 'stream' class SortedUniq extends Transform - constructor: (@stream_opts, @hash_fn) -> - super @stream_opts + constructor: (@hash_fn, stream_opts) -> + super stream_opts @last = null _transform: (obj, encoding, cb) => hash = @hash_fn obj @@ -12,8 +12,8 @@ class SortedUniq extends Transform cb null, obj class UnsortedUniq extends Transform - constructor: (@stream_opts, @hash_fn) -> - super @stream_opts + constructor: (@hash_fn, stream_opts) -> + super stream_opts @seen = {} _transform: (obj, encoding, cb) => hash = @hash_fn obj @@ -21,8 +21,8 @@ class UnsortedUniq extends Transform @seen[hash] = true cb null, obj -module.exports = class Uniq - constructor: (stream_opts, sorted, hash_fn) -> +class Uniq + constructor: (sorted, hash_fn, stream_opts) -> if _(sorted).isFunction() # For underscore-style arguments hash_fn = sorted sorted = false @@ -30,4 +30,11 @@ module.exports = class Uniq hash_fn = sorted.hash_fn sorted = sorted.sorted hash_fn ?= String - return new (if sorted then SortedUniq else UnsortedUniq) stream_opts, hash_fn + return new (if sorted then SortedUniq else UnsortedUniq) hash_fn, stream_opts + +fn = (readable, sorted, hash_fn, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Uniq sorted, hash_fn, stream_opts) + +module.exports = + uniq: fn + unique: fn diff --git a/lib/transforms/where.coffee b/lib/mixins/where.coffee similarity index 60% rename from lib/transforms/where.coffee rename to lib/mixins/where.coffee index f865884..fbc8afd 100644 --- a/lib/transforms/where.coffee +++ b/lib/mixins/where.coffee @@ -3,10 +3,14 @@ _ = require 'underscore' debug = require('debug') 'us:invoke' # TODO: support args (might be different than underscore API due to arg length logic in lib/understream) -module.exports = class Where extends Transform - constructor: (@stream_opts, @attrs) -> +class Where extends Transform + constructor: (@attrs, @stream_opts) -> super @stream_opts _transform: (chunk, encoding, cb) => for key, val of @attrs return cb() if val isnt chunk[key] cb null, chunk + +module.exports = + where: (readable, attrs, stream_opts={objectMode:readable._readableState.objectMode}) -> + readable.pipe(new Where attrs, stream_opts) diff --git a/lib/transforms/flatten.coffee b/lib/transforms/flatten.coffee deleted file mode 100644 index 1a56c44..0000000 --- a/lib/transforms/flatten.coffee +++ /dev/null @@ -1,10 +0,0 @@ -_ = require 'underscore' -{Transform} = require 'stream' - -module.exports = class Flatten extends Transform - constructor: (stream_opts, @shallow=false) -> super stream_opts - _transform: (chunk, enc, cb) => - return cb null, chunk unless _(chunk).isArray() - els = if @shallow then chunk else _(chunk).flatten() - @push el for el in els - cb() diff --git a/lib/transforms/rest.coffee b/lib/transforms/rest.coffee deleted file mode 100644 index c061ac9..0000000 --- a/lib/transforms/rest.coffee +++ /dev/null @@ -1,10 +0,0 @@ -{Transform} = require 'stream' - -module.exports = class Rest extends Transform - constructor: (stream_opts, @rest=1) -> - super stream_opts - @seen = -1 - _transform: (chunk, encoding, cb) => - @seen++ - return cb() if @seen < @rest - cb null, chunk diff --git a/lib/understream.coffee b/lib/understream.coffee index b312baa..0f97a0e 100644 --- a/lib/understream.coffee +++ b/lib/understream.coffee @@ -1,128 +1,63 @@ -{Readable, Writable, PassThrough} = require 'stream' -fs = require('fs') -_ = require 'underscore' -debug = require('debug') 'us' -domain = require 'domain' -{EventEmitter} = require 'events' - -_.mixin isPlainObject: (obj) -> obj.constructor is {}.constructor - -is_readable = (instance) -> - instance? and - _.isObject(instance) and - instance instanceof EventEmitter and - instance.pipe? and - (instance._read? or instance.read? or instance.readable) - -pipe_streams_together = (streams...) -> - return if streams.length < 2 - streams[i].pipe streams[i + 1] for i in [0..streams.length - 2] - -# Based on: http://stackoverflow.com/questions/17471659/creating-a-node-js-stream-from-two-piped-streams -# The version there was broken and needed some changes, we just kept the concept of using the 'pipe' -# event and overriding the pipe method -class StreamCombiner extends PassThrough - constructor: (streams...) -> - super objectMode: true - @head = streams[0] - @tail = streams[streams.length - 1] - pipe_streams_together streams... - @on 'pipe', (source) => source.unpipe(@).pipe @head - pipe: (dest, options) => @tail.pipe dest, options - -class ArrayStream extends Readable - constructor: (@options, @arr, @index=0) -> - super _(@options).extend objectMode: true - _read: (size) => - debug "_read #{size} #{JSON.stringify @arr[@index]}" - @push @arr[@index++] # Note: push(undefined) signals the end of the stream, so this just works^tm - -class DevNull extends Writable - constructor: -> super objectMode: true - _write: (chunk, encoding, cb) => cb() - -module.exports = class Understream - constructor: (head) -> - @_defaults = highWaterMark: 1000, objectMode: true - head = new ArrayStream {}, head if _(head).isArray() - if is_readable head - @_streams = [head] - else if not head? - @_streams = [] - else - throw new Error 'Understream expects a readable stream, an array, or nothing' - - defaults: (@_defaults) => @ - run: (cb) => - throw new Error 'Understream::run requires an error handler' unless _(cb).isFunction() - report = => - str = '' - _(@_streams).each (stream) -> - str += "#{stream.constructor.name}(#{stream._writableState?.length or ''} #{stream._readableState?.length or ''}) " - console.log str - interval = setInterval report, 5000 - # If the callback has arity 2, assume that they want us to aggregate all results in an array and - # pass that to the callback. - if cb.length is 2 - result = [] - @batch Infinity - batch_stream = _(@_streams).last() - batch_stream.on 'finish', -> result = batch_stream._buffer - # If the final stream is Readable, attach a dummy writer to receive its output - # and alleviate pressure in the pipe - @_streams.push new DevNull() if is_readable _(@_streams).last() - dmn = domain.create() - handler = (err) => - clearInterval interval - if cb.length is 1 - cb err - else - cb err, result - _(@_streams).last().on 'finish', handler - dmn.on 'error', handler - dmn.add stream for stream in @_streams - dmn.run => - debug 'running' - pipe_streams_together @_streams... - @ - readable: => # If you want to get out of understream and access the raw stream - pipe_streams_together @_streams... - @_streams[@_streams.length - 1] - duplex: => new StreamCombiner @_streams... - stream: => @readable() # Just an alias for compatibility purposes - pipe: (stream_instance) => # If you want to add an instance of a stream to the middle of your understream chain - @_streams.push stream_instance - @ - @mixin: (FunctionOrStreamKlass, name=(FunctionOrStreamKlass.name or Readable.name), fn=false) -> - if _(FunctionOrStreamKlass).isPlainObject() # underscore-style mixins - @_mixin_by_name klass, name for name, klass of FunctionOrStreamKlass - else - @_mixin_by_name FunctionOrStreamKlass, name, fn - @_mixin_by_name: (FunctionOrStreamKlass, name=(FunctionOrStreamKlass.name or Readable.name), fn=false) -> - Understream::[name] = (args...) -> - if fn - # Allow mixing in of functions like through() - instance = FunctionOrStreamKlass.apply null, args - else - # If this is a class and argument length is < constructor length, prepend defaults to arguments list - if args.length < FunctionOrStreamKlass.length - args.unshift _(@_defaults).clone() - else if args.length is FunctionOrStreamKlass.length - _(args[0]).defaults @_defaults - else - throw new Error "Expected #{FunctionOrStreamKlass.length} or #{FunctionOrStreamKlass.length-1} arguments to #{name}, got #{args.length}" - instance = new FunctionOrStreamKlass args... - @pipe instance - debug 'created', instance.constructor.name, @_streams.length +_ = require 'underscore' + +module.exports = -> + + # The understream object can be used as an object with static methods: + # _s.each a, b + # It can also be used as a function: + # _s(a).each b + # In order to accommodate the latter case, all static methods also exist on _s.prototype + # Thus, in the constructor we detect if called as a function and return a properly new'd + # instance of _s containing all the prototype methods. + class _s + constructor: (obj) -> + return new _s(obj) if not (@ instanceof _s) + @_wrapped = obj + + # Adapted from underscore's mixin + # Add your own custom functions to the Understream object. + @mixin: (obj) -> + _(obj).chain().functions().each (name) -> + func = _s[name] = obj[name] + _s.prototype[name] = -> + args = [@_wrapped] + args.push arguments... + # pop undefineds so that _s.fn() is equivalent to _s().fn() + args.pop() while args.length and _(args).last() is undefined + res = result.call @, func.apply(_s, args) + res + + # Add a "chain" function, which will delegate to the wrapper + @chain: (obj) -> _s(obj).chain() + + # Start accumulating results + chain: -> + @_chain = true @ - # For backwards compatibility and easier mixing in to underscore - @exports: -> stream: (head) => new @ head -Understream.mixin _(["#{__dirname}/transforms", "#{__dirname}/readables"]).chain() - .map (dir) -> - _(fs.readdirSync(dir)).map (filename) -> - name = filename.match(/^([^\.]\S+)\.js$/)?[1] - return unless name # Exclude hidden files - [name, require("#{dir}/#{filename}")] - .flatten(true) - .object().value() + # Extracts the result from a wrapped and chained object. + value: -> @_wrapped + + # Private helper function to continue chaining intermediate results + result = (obj) -> + if @_chain then _s(obj).chain() else obj + + _([ + "fromArray" + "fromString" + "toArray" + "each" + "map" + "reduce" + "filter" + "where" + "invoke" + "groupBy" + "first" + "rest" + "flatten" + "uniq" + "range" + ]).each (fn) -> _s.mixin require("#{__dirname}/mixins/#{fn}") + + _s diff --git a/test/aliases.coffee b/test/aliases.coffee new file mode 100644 index 0000000..f3c50f7 --- /dev/null +++ b/test/aliases.coffee @@ -0,0 +1,18 @@ +assert = require 'assert' +_ = require 'underscore' +_s = require "#{__dirname}/../index" +test_helpers = require './helpers' + +describe 'aliases', -> + _([ + ['each', 'forEach'] + ['filter', 'select'] + ['first', 'head', 'take'] + ['map', 'collect'] + ['reduce', 'inject', 'foldl'] + ['rest', 'tail', 'drop'] + ['uniq', 'unique'] + ]).each (alias_set) -> + _(test_helpers.adjacent alias_set).each ([fn1, fn2]) -> + it "#{fn1} === #{fn2}", -> + assert _s[fn1] is _s[fn2] diff --git a/test/chain-wrap.coffee b/test/chain-wrap.coffee new file mode 100644 index 0000000..8ecfe7a --- /dev/null +++ b/test/chain-wrap.coffee @@ -0,0 +1,54 @@ +assert = require 'assert' +async = require 'async' +_sMaker = require "../lib/understream" +sinon = require 'sinon' +_ = require 'underscore' +test_helpers = require './helpers' + +methods = (obj) -> + _.chain().functions(obj).without('value', 'mixin').value() + +testEquivalent = (exp1, exp2) -> + [[v1, spy1], [v2, spy2]] = _.map [exp1, exp2], (exp) -> + spy = sinon.spy (x) -> x + _s = _sMaker() + _s.mixin fn: spy + [exp(_s), spy] + it 'return the same result', -> assert.deepEqual v1, v2 + it 'have the same methods available on the result', -> assert.deepEqual methods(v1), methods(v2) + it 'call the method the same number of times', -> assert.equal spy1.callCount, spy2.callCount + it 'call the method with the same args', -> assert.deepEqual spy1.args, spy2.args + +_.each + 'no-op': + 'plain' : (_s) -> 'a' + 'unwrapped chained' : (_s) -> _s.chain('a').value() + 'wrapped chained' : (_s) -> _s('a').chain().value() + 'no-arg': + 'unwrapped' : (_s) -> _s.fn() + 'wrapped' : (_s) -> _s().fn() + 'unwrapped chained' : (_s) -> _s.chain().fn().value() + 'wrapped chained' : (_s) -> _s().chain().fn().value() + 'one-arg': + 'unwrapped' : (_s) -> _s.fn('a') + 'wrapped' : (_s) -> _s('a').fn() + 'unwrapped chained' : (_s) -> _s.chain('a').fn().value() + 'wrapped chained' : (_s) -> _s('a').chain().fn().value() + 'multi-arg': + 'unwrapped' : (_s) -> _s.fn('a', {b:1}, 2) + 'wrapped' : (_s) -> _s('a').fn({b:1}, 2) + 'unwrapped chained' : (_s) -> _s.chain('a').fn({b:1}, 2).value() + 'wrapped chained' : (_s) -> _s('a').chain().fn({b:1}, 2).value() + 'multiple functions': + 'unwrapped' : (_s) -> _s.fn _s.fn('a', 'b'), 'c' + 'wrapped' : (_s) -> _s(_s('a').fn('b')).fn('c') + 'unwrapped chained' : (_s) -> _s.chain('a').fn('b').fn('c').value() + 'wrapped chained' : (_s) -> _s('a').chain().fn('b').fn('c').value() + +, (exps, desc) -> + describe desc, -> + # Since equivalence is transitive, to assert that a group of expressions + # are equivalent, we can assert that each one is equivalent to one other + # one. + _.each test_helpers.adjacent(_.pairs(exps)), ([[name1, exp1], [name2, exp2]]) -> + describe "#{name1}/#{name2}", -> testEquivalent exp1, exp2 diff --git a/test/each.coffee b/test/each.coffee index 446faba..b8806cf 100644 --- a/test/each.coffee +++ b/test/each.coffee @@ -1,17 +1,16 @@ assert = require 'assert' async = require 'async' -_ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" sinon = require 'sinon' describe '_.each', -> it 'accepts fn (sync/async), calls it on each chunk, then passes the original chunk along', (done) -> input = [{a:'1', b:'2'},{c:'2', d:'3'}] synch = (chunk) -> null - asynch = (chunk, cb) -> cb null, chunk # TODO: error handling - async.forEach [synch, asynch], (fn, cb_fe) -> + asynch = (chunk, cb) -> cb null, chunk + async.forEach [synch], (fn, cb_fe) -> spy = sinon.spy fn - _(input).stream().each(spy).run (err, result) -> + _s(input).chain().fromArray(input).each(spy).toArray (err, result) -> assert.ifError err assert.deepEqual input, result assert.equal spy.callCount, 2 diff --git a/test/filter.coffee b/test/filter.coffee index 5432415..e817b54 100644 --- a/test/filter.coffee +++ b/test/filter.coffee @@ -1,7 +1,7 @@ assert = require 'assert' async = require 'async' _ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" sinon = require 'sinon' describe '_.filter', -> @@ -12,7 +12,7 @@ describe '_.filter', -> expected = input.slice 0, 1 async.forEach [synch, asynch], (fn, cb_fe) -> spy = sinon.spy fn - _(input).stream().filter(spy).run (err, result) -> + _s(_s.fromArray input).chain().filter(spy).toArray (err, result) -> assert.ifError err assert.deepEqual expected, result assert.equal spy.callCount, 2 diff --git a/test/first.coffee b/test/first.coffee index 34e008f..cd98815 100644 --- a/test/first.coffee +++ b/test/first.coffee @@ -1,13 +1,16 @@ _ = require 'underscore' assert = require 'assert' async = require 'async' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" +test_helpers = require "#{__dirname}/helpers" describe '_.first', -> # fails for node < v0.10.20 due to https://github.com/joyent/node/issues/6183 + return if test_helpers.node_major() is 10 and test_helpers.node_minor() < 20 + it 'sends through all objects if limit > size of stream', (done) -> input = [0..10] - _(input).stream().first(100).run (err, result) -> + _s(_s.fromArray input).chain().first(100).toArray (err, result) -> assert.ifError err assert.deepEqual result, input done() @@ -15,7 +18,7 @@ describe '_.first', -> it 'sends through limit objects if limit < size of stream', (done) -> LIMIT = 5 input = [0..10] - _(input).stream().first(LIMIT).run (err, result) -> + _s(_s.fromArray input).chain().first(LIMIT).toArray (err, result) -> assert.ifError err assert.deepEqual result, _(input).first(LIMIT) done() @@ -25,7 +28,10 @@ describe '_.first', -> HIGHWATERMARK = 1 input = [0..100] seen = 0 - _(input).stream().defaults(objectMode: true, highWaterMark: HIGHWATERMARK).each(-> seen++).first(LIMIT).run (err, result) -> + _s(_s.fromArray input).chain() + .each((-> seen++), {objectMode: true, highWaterMark: HIGHWATERMARK}) + .first(LIMIT, {objectMode: true, highWaterMark: HIGHWATERMARK}) + .toArray (err, result) -> assert.ifError err assert.equal seen, LIMIT+HIGHWATERMARK*2 # 1 highWaterMark for buffering in first, 1 highWaterMark for buffering in each done() diff --git a/test/flatten.coffee b/test/flatten.coffee index 0e8ee34..be83a9b 100644 --- a/test/flatten.coffee +++ b/test/flatten.coffee @@ -1,11 +1,11 @@ assert = require 'assert' async = require 'async' _ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" match_underscore = (fn, input, args, cb) -> [fn, input, args, cb] = [fn, input, [], args] if arguments.length is 3 - _(input).stream()[fn](args...).run (err, result) -> + _s(_s.fromArray input).chain()[fn](args...).toArray (err, result) -> assert.ifError err assert.deepEqual result, _(input)[fn](args...) cb() diff --git a/test/fromArray.coffee b/test/fromArray.coffee new file mode 100644 index 0000000..d7c1c31 --- /dev/null +++ b/test/fromArray.coffee @@ -0,0 +1,12 @@ +assert = require 'assert' +_s = require "#{__dirname}/../index" +_ = require 'underscore' + +describe '_s.fromArray', -> + it 'turns an array into a readable stream', -> + arr = ['a', 'b', 'c', 'd'] + readable = _s.fromArray _(arr).clone() # we'll be modifying arr here + # TODO: readable = _s(['a', 'b', 'c', 'd']).fromArray() + assert readable._readableState.objectMode, "Expected fromArray to produce an objectMode stream" + while el = readable.read() + assert.equal el, arr.shift() diff --git a/test/fromString.coffee b/test/fromString.coffee new file mode 100644 index 0000000..1e96d3d --- /dev/null +++ b/test/fromString.coffee @@ -0,0 +1,12 @@ +assert = require 'assert' +_s = require "#{__dirname}/../index" + +describe '_s.fromString', -> + it 'turns a string into a readable stream', -> + str_in = 'abcd' + readable = _s.fromString str_in + # TODO: readable = _s('abcd').fromString() + assert readable._readableState.objectMode, "Expected fromArray to produce an objectMode stream" + str_out = '' + str_out += el while el = readable.read() + assert.equal str_in, str_out diff --git a/test/groupBy.coffee b/test/groupBy.coffee index 84bccc3..c9e3201 100644 --- a/test/groupBy.coffee +++ b/test/groupBy.coffee @@ -1,35 +1,35 @@ assert = require 'assert' async = require 'async' _ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" describe '_.groupBy', -> it 'fn', (done) -> - _([1.3, 2.1, 2.4]).stream().groupBy(Math.floor).run (err, data) -> + _s(_s.fromArray [1.3, 2.1, 2.4]).chain().groupBy(Math.floor).toArray (err, data) -> assert.ifError err assert.deepEqual data, [{1: [1.3], 2: [2.1, 2.4]}] done() it 'string', (done) -> - _(['one', 'two', 'three']).stream().groupBy('length').run (err, data) -> + _s(_s.fromArray ['one', 'two', 'three']).chain().groupBy('length').toArray (err, data) -> assert.ifError err assert.deepEqual [{3: ["one", "two"], 5: ["three"]}], data done() it 'can unpack into > 1 object', (done) -> - _([1.3, 2.1, 2.4]).stream().groupBy({ fn: Math.floor, unpack: true }).run (err, data) -> + _s(_s.fromArray [1.3, 2.1, 2.4]).chain().groupBy({ fn: Math.floor, unpack: true }).toArray (err, data) -> assert.ifError err assert.deepEqual data, [ {'1':[1.3]}, {'2':[2.1,2.4]} ] done() it 'supports an async function, not unpacked', (done) -> - _([1.3, 2.1, 2.4]).stream().groupBy({ fn: ((num, cb) -> cb null, Math.floor num) }).run (err, data) -> + _s(_s.fromArray [1.3, 2.1, 2.4]).chain().groupBy({ fn: ((num, cb) -> cb null, Math.floor num) }).toArray (err, data) -> assert.ifError err assert.deepEqual data, [ {1: [1.3], 2: [2.1,2.4]} ] done() it 'supports an async function, unpacked', (done) -> - _([1.3, 2.1, 2.4]).stream().groupBy({ fn: ((num, cb) -> cb null, Math.floor num), unpack: true }).run (err, data) -> + _s(_s.fromArray [1.3, 2.1, 2.4]).chain().groupBy({ fn: ((num, cb) -> cb null, Math.floor num), unpack: true }).toArray (err, data) -> assert.ifError err assert.deepEqual data, [ {'1':[1.3]}, {'2':[2.1,2.4]} ] done() diff --git a/test/helpers.coffee b/test/helpers.coffee new file mode 100644 index 0000000..83908e6 --- /dev/null +++ b/test/helpers.coffee @@ -0,0 +1,10 @@ +_ = require 'underscore' + +module.exports = + node_major: -> Number process.version.match(/^v(\d+)\.(\d+)\.(\d+)$/)[2] + node_minor: -> Number process.version.match(/^v(\d+)\.(\d+)\.(\d+)$/)[3] + + # Takes an array and returns an array of adjacent pairs of elements in the + # array, wrapping around at the end. + adjacent: (arr) -> + _.zip arr, _.rest(arr).concat [_.first arr] diff --git a/test/invoke.coffee b/test/invoke.coffee index c3a4a75..3e8a495 100644 --- a/test/invoke.coffee +++ b/test/invoke.coffee @@ -1,12 +1,12 @@ assert = require 'assert' async = require 'async' _ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" describe '_.invoke', -> it 'works', (done) -> input = [{m: () -> '1'}, {m: () -> '2'}] - _(input).stream().invoke('m').run (err, result) -> + _s(_s.fromArray input).chain().invoke('m').toArray (err, result) -> assert.ifError err assert.deepEqual result, _(input).invoke('m') done() diff --git a/test/map.coffee b/test/map.coffee index 7835c95..cd64039 100644 --- a/test/map.coffee +++ b/test/map.coffee @@ -1,18 +1,18 @@ assert = require 'assert' async = require 'async' -_ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_ = require 'underscore' +_s = require "#{__dirname}/../index" sinon = require 'sinon' describe '_.map', -> it 'accepts fn (sync/async), calls it on each chunk, then passes the fn result along', (done) -> input = [{a:'1', b:'2'},{c:'2', d:'3'}] synch = (chunk) -> _(chunk).keys() - asynch = (chunk, cb) -> cb null, _(chunk).keys() # TODO: error handling + asynch = (chunk, cb) -> cb null, _(chunk).keys() expected = _(input).map(_.keys) async.forEach [synch, asynch], (fn, cb_fe) -> spy = sinon.spy fn - _(input).stream().map(spy).run (err, result) -> + _s(input).chain().fromArray(input).map(spy).toArray (err, result) -> assert.ifError err assert.deepEqual expected, result assert.equal spy.callCount, 2 diff --git a/test/range.coffee b/test/range.coffee new file mode 100644 index 0000000..3e9382a --- /dev/null +++ b/test/range.coffee @@ -0,0 +1,29 @@ +assert = require 'assert' +_s = require "#{__dirname}/../index" +_ = require 'underscore' + +readable_equals_array = (readable, array, cb) -> + _s.toArray readable, (err, arr) -> + assert.ifError err + assert.deepEqual arr, array + cb() + +tests = + 'no args': [] + 'size': [5] + 'size 0': [0] + 'start, stop': [0, 10] + 'start, stop zero': [0, 0] + 'start, stop, step': [0, 10, 2] + 'start, stop zero, step': [0, 0, 2] + 'start, negative stop, negative step': [0, -10, -2] + 'start, stop zero, negative step': [0, 0, -2] + 'negative size': [-10] + 'start > stop': [10, 0] + +describe '_s.range', -> + _(tests).each (args, test) -> + it test, (done) -> + readable_equals_array _s.range.apply(_s, args), _.range.apply(_, args), done + it "#{test} with stream_opts", (done) -> + readable_equals_array _s.range.apply(_s, args.concat({highWaterMark:10})), _.range.apply(_, args), done diff --git a/test/reduce.coffee b/test/reduce.coffee index e88dd59..d52eea6 100644 --- a/test/reduce.coffee +++ b/test/reduce.coffee @@ -1,47 +1,49 @@ assert = require 'assert' async = require 'async' _ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" +test_helpers = require "#{__dirname}/helpers" describe '_.reduce', -> - # fails for node < v0.10.20 due to https://github.com/joyent/node/issues/6183 + return if test_helpers.node_major() is 10 and test_helpers.node_minor() < 20 + it 'works with an empty stream with base 0', (done) -> - _([]).stream().reduce + _s(_s.fromArray []).chain().reduce base: 0 fn: (count, item) -> count += 1 - .run (err, data) -> + .toArray (err, data) -> assert.deepEqual data, [0] assert.ifError err done() it 'works on numbers', (done) -> - _([1, 2, 3]).stream().reduce({fn: ((a,b) -> a + b), base: 0}).run (err, data) -> + _s(_s.fromArray [1, 2, 3]).chain().reduce({fn: ((a,b) -> a + b), base: 0}).toArray (err, data) -> assert.ifError err assert.deepEqual data, [6] done() it 'works on objects', (done) -> - _([{a: 1, b: 2}, {a: 1, b: 3}, {a: 1, b: 4}]).stream().reduce( + _s(_s.fromArray [{a: 1, b: 2}, {a: 1, b: 3}, {a: 1, b: 4}]).chain().reduce( base: {} fn: (obj, new_obj) -> obj = { a: new_obj.a, b: [] } unless obj.b? obj.b.push new_obj.b obj - ).run (err, data) -> + ).toArray (err, data) -> assert.ifError err assert.deepEqual data, [{ a: 1, b: [2,3,4] }] done() it 'works with multiple bases', (done) -> - _([{a: 1, b: 2}, {a: 1, b: 3}, {a: 1, b: 4}, {a: 2, b: 1}, {a: 3, b: 2}]).stream().reduce( + _s(_s.fromArray [{a: 1, b: 2}, {a: 1, b: 3}, {a: 1, b: 4}, {a: 2, b: 1}, {a: 3, b: 2}]).chain().reduce( base: () -> {} key: (new_obj) -> new_obj.a fn: (obj, new_obj) -> obj = { a: new_obj.a, b: [] } unless obj.b? obj.b.push new_obj.b obj - ).run (err, data) -> + ).toArray (err, data) -> assert.ifError err assert.deepEqual data, [{a: 1, b: [2,3,4]}, {a: 2, b:[1]}, {a: 3, b: [2]}] done() diff --git a/test/rest.coffee b/test/rest.coffee index fcfa736..27b8884 100644 --- a/test/rest.coffee +++ b/test/rest.coffee @@ -1,13 +1,17 @@ _ = require 'underscore' assert = require 'assert' async = require 'async' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" +test_helpers = require "#{__dirname}/helpers" describe '_.rest', -> + # fails for node < v0.10.20 due to https://github.com/joyent/node/issues/6183 + return if test_helpers.node_major() is 10 and test_helpers.node_minor() < 20 + it 'skips some objects if skip < size of stream', (done) -> SKIP = 5 input = [0..10] - _(input).stream().rest(SKIP).run (err, result) -> + _s(_s.fromArray input).chain().rest(SKIP).toArray (err, result) -> assert.ifError err assert.deepEqual result, _(input).rest(SKIP) done() @@ -15,7 +19,7 @@ describe '_.rest', -> it 'skips all objects if skip size > size of stream', (done) -> SKIP = 100 input = [0..10] - _(input).stream().rest(SKIP).run (err, result) -> + _s(_s.fromArray input).chain().rest(SKIP).toArray (err, result) -> assert.ifError err assert.deepEqual result, [] done() diff --git a/test/toArray.coffee b/test/toArray.coffee new file mode 100644 index 0000000..7d56d6a --- /dev/null +++ b/test/toArray.coffee @@ -0,0 +1,10 @@ +assert = require 'assert' +_s = require "#{__dirname}/../index" + +describe '_s.toArray', -> + it 'turns a readable stream into an array', -> + arr_in = ['a', 'b', 'c', 'd'] + readable = _s.fromArray arr_in + _s.toArray readable, (err, arr_out) -> + assert.ifError err + assert.deepEqual arr_in, arr_out diff --git a/test/batch.coffee b/test/todo/batch.coffee similarity index 100% rename from test/batch.coffee rename to test/todo/batch.coffee diff --git a/test/custom-transform.coffee b/test/todo/custom-transform.coffee similarity index 100% rename from test/custom-transform.coffee rename to test/todo/custom-transform.coffee diff --git a/test/duplex.coffee b/test/todo/duplex.coffee similarity index 100% rename from test/duplex.coffee rename to test/todo/duplex.coffee diff --git a/test/errors.coffee b/test/todo/errors.coffee similarity index 100% rename from test/errors.coffee rename to test/todo/errors.coffee diff --git a/test/file.coffee b/test/todo/file.coffee similarity index 100% rename from test/file.coffee rename to test/todo/file.coffee diff --git a/test/file.txt b/test/todo/file.txt similarity index 100% rename from test/file.txt rename to test/todo/file.txt diff --git a/test/join.coffee b/test/todo/join.coffee similarity index 100% rename from test/join.coffee rename to test/todo/join.coffee diff --git a/test/mixin.coffee b/test/todo/mixin.coffee similarity index 100% rename from test/mixin.coffee rename to test/todo/mixin.coffee diff --git a/test/myawt.coffee b/test/todo/myawt.coffee similarity index 100% rename from test/myawt.coffee rename to test/todo/myawt.coffee diff --git a/test/queue.coffee b/test/todo/queue.coffee similarity index 100% rename from test/queue.coffee rename to test/todo/queue.coffee diff --git a/test/slice.coffee b/test/todo/slice.coffee similarity index 100% rename from test/slice.coffee rename to test/todo/slice.coffee diff --git a/test/spawn.coffee b/test/todo/spawn.coffee similarity index 100% rename from test/spawn.coffee rename to test/todo/spawn.coffee diff --git a/test/split.coffee b/test/todo/split.coffee similarity index 100% rename from test/split.coffee rename to test/todo/split.coffee diff --git a/test/stream.coffee b/test/todo/stream.coffee similarity index 100% rename from test/stream.coffee rename to test/todo/stream.coffee diff --git a/test/test.txt b/test/todo/test.txt similarity index 100% rename from test/test.txt rename to test/todo/test.txt diff --git a/test/wrap.coffee b/test/todo/wrap.coffee similarity index 100% rename from test/wrap.coffee rename to test/todo/wrap.coffee diff --git a/test/uniq.coffee b/test/uniq.coffee index 1e7193f..1c0671a 100644 --- a/test/uniq.coffee +++ b/test/uniq.coffee @@ -1,6 +1,6 @@ _ = require 'underscore' assert = require 'assert' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" describe '_.uniq', -> expected = [1...4] @@ -11,24 +11,24 @@ describe '_.uniq', -> describe 'underscore-style arguments', -> it 'works without a hash function', (done) -> - _(sorted_input).stream().uniq(true).run (err, result) -> + _s(_s.fromArray sorted_input).chain().uniq(true).toArray (err, result) -> assert.ifError err assert.deepEqual result, expected done() it 'works with a hash function', (done) -> - _(sorted_input).stream().uniq(true, String).run (err, result) -> + _s(_s.fromArray sorted_input).chain().uniq(true, String).toArray (err, result) -> assert.ifError err assert.deepEqual result, expected done() describe 'options object', -> it 'works without a hash function', (done) -> - _(sorted_input).stream().uniq(sorted: true).run (err, result) -> + _s(_s.fromArray sorted_input).chain().uniq(sorted: true).toArray (err, result) -> assert.ifError err assert.deepEqual result, expected done() it 'works with a hash function', (done) -> - _(sorted_input).stream().uniq({sorted: true, hash_fn: String}).run (err, result) -> + _s(_s.fromArray sorted_input).chain().uniq({sorted: true, hash_fn: String}).toArray (err, result) -> assert.ifError err assert.deepEqual result, expected done() @@ -39,31 +39,31 @@ describe '_.uniq', -> describe 'no arguments', -> it 'works', (done) -> - _(unsorted_input).stream().uniq().run (err, result) -> + _s(_s.fromArray unsorted_input).chain().uniq().toArray (err, result) -> assert.ifError err assert.deepEqual result, expected done() describe 'underscore-style arguments', -> it 'works with a hash function', (done) -> - _(unsorted_input).stream().uniq(String).run (err, result) -> + _s(_s.fromArray unsorted_input).chain().uniq(String).toArray (err, result) -> assert.ifError err assert.deepEqual result, expected done() it 'gives invalid results with sorted true', (done) -> - _(unsorted_input).stream().uniq(true).run (err, result) -> + _s(_s.fromArray unsorted_input).chain().uniq(true).toArray (err, result) -> assert.ifError err assert.deepEqual result, unsorted_input done() describe 'options object', -> it 'works with a hash function', (done) -> - _(unsorted_input).stream().uniq(hash_fn: String).run (err, result) -> + _s(_s.fromArray unsorted_input).chain().uniq(hash_fn: String).toArray (err, result) -> assert.ifError err assert.deepEqual result, expected done() it 'gives invalid result with sorted true', (done) -> - _(unsorted_input).stream().uniq(sorted: true).run (err, result) -> + _s(_s.fromArray unsorted_input).chain().uniq(sorted: true).toArray (err, result) -> assert.ifError err assert.deepEqual result, unsorted_input done() diff --git a/test/where.coffee b/test/where.coffee index 4b63abb..3d87737 100644 --- a/test/where.coffee +++ b/test/where.coffee @@ -1,12 +1,12 @@ assert = require 'assert' async = require 'async' _ = require 'underscore' -_.mixin require("#{__dirname}/../index").exports() +_s = require "#{__dirname}/../index" describe '_.where', -> it 'works', (done) -> input = [{a: 1, b: 2}, {a: 2, b: 2}, {a: 1, b: 3}, {a: 1, b: 4}] - _(input).stream().where({a: 1}).run (err, result) -> + _s(_s.fromArray input).chain().where({a: 1}).toArray (err, result) -> assert.ifError err assert.deepEqual result, _(input).where({a: 1}) done()