Introduction
Re-thinking the JavaScript utility belt, Highland manages synchronous and asynchronous code easily, using nothing more than standard JavaScript and Node-like streams. You may be familiar with Promises, EventEmitters and callbacks, but moving between them is far from seamless. Thankfully, there exists a deeper abstraction which can free our code. By updating the tools we use on Arrays, and applying them to values distributed in time instead of space, we can discard plumbing and focus on the important things. With Highland, you can switch between synchronous and asynchronous data sources at will, without having to re-write your code. Time to dive in!
Made by @caolan, with help and patience from friends.
Usage in the browser
Highland can be used both in Node.js and in the browser. When you install the highland package, you will find a dist/highland.js file in the package hierarchy. This file has been prepared with browserify in order to bring a browser-ready version of highland. Simply load this script with a <script> tag, and a highland variable will be made available in the global JavaScript scope.
If you prefer using highland under the name _, as is done in the examples below, you can then simply use:
var _ = highland
You can also integrate highland in your own browserify bundle using a classical browserify workflow, but this is beyond the scope of this documentation.
Examples
Converting to/from Highland Streams
_([1,2,3,4]).toArray(function (xs) {
// xs is [1,2,3,4]
});
Mapping over a Stream
var doubled = _([1,2,3,4]).map(function (x) {
return x * 2;
});
Reading files in parallel (4 at once)
var data = _(filenames).map(readFile).parallel(4);
Handling errors
data.errors(function (err, rethrow) {
// handle or rethrow error
});
Piping to a Node Stream
data.pipe(output);
Piping in data from Node Streams
var output = fs.createWriteStream('output');
var docs = db.createReadStream();
// wrap a node stream and pipe to file
_(docs).filter(isBlogpost).pipe(output);
// or, pipe in a node stream directly:
var through = _.pipeline(_.filter(isBlogpost));
docs.pipe(through).pipe(output);
Handling events
var clicks = _('click', btn).map(1);
var counter = clicks.scan(0, _.add);
counter.each(function (n) {
$('#count').text(n);
});
Arrays
To work with data in Arrays, just wrap it in _(). The Highland methods are then available on it:
var shouty = _(['foo', 'bar', 'baz']).map(toUpperCase);
These methods return Stream objects, not Arrays, so you can chain together method calls:
_(['foo', 'bar', 'baz']).map(toUpperCase).map(function (x) {
return {name: x};
});
When using the Highland APIs there is little reason to turn this back into an Array, but if you're calling an outside library you may need to convert it back:
_(['foo', 'bar', 'baz']).map(toUpperCase).toArray(function (xs) {
// xs will now be ['FOO', 'BAR', 'BAZ']
});
Passing a function to the toArray call may seem a little unfamiliar, but this enables an important trick in Highland. Now, without changing any of your existing code, you could swap out ['foo', 'bar', 'baz'] for an asynchronous data source, and it would just work!
You can also pass Arrays into the top-level functions instead of using methods on the Stream object:
_.map(doubled, [1, 2, 3, 4]) // => 2 4 6 8
Note, this still returns a Stream.
Async
Now, let's see how we might swap out an Array source for an asynchronous one. By passing a function to the Stream constructor we can manually push values onto the Stream:
function getData(filename) {
// create a new Stream
return _(function (push, next) {
// do something async when we read from the Stream
fs.readFile(filename, function (err, data) {
push(err, data);
push(null, _.nil);
});
});
};
First, we return a new Stream which when read from will read a file (this is called lazy evaluation). When fs.readFile calls its callback, we push the error and data values onto the Stream. Finally, we push _.nil onto the Stream. This is the "end of stream" marker and will tell any consumers of this stream to stop reading.
Since wrapping a callback is a fairly common thing to do, there is a convenience function:
var getData = _.wrapCallback(fs.readFile);
Now that we have a new asynchronous source, we can run the exact same code from the Array examples on it:
getData('myfile').map(toUpperCase).map(function (x) {
return {name: x};
});
With Highland, we really can have one language to work with both synchronous and asynchronous data, whether it's from a Node Stream, an EventEmitter, a callback or an Array. You can even wrap ES6 or jQuery promises:
var foo = _($.getJSON('/api/foo'));
Laziness
When you call map in Highland, it doesn't go off and immediately map over all your data. Rather it defines your intention, and the hard work occurs as you pull data from the Stream. This is lazy evaluation and it's what enables Highland to manage back-pressure and also the sequencing of asynchronous actions, such as reading from a file.
var calls = 0;
var nums = _(['1', '2', '3']).map(function (x) {
calls++;
return Number(x);
});
// calls === 0
To get the map iterator to be called, we must consume the stream. A number of Highland methods will do so (e.g., each, done, apply, toArray, pipe, resume). A stream may only be consumed once and consuming an already-consumed stream will result in undefined behavior.
nums.each(function (n) { console.log(n); });
// calls === 3
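The same idea can be sketched without Highland. The following is only an analogy built on a plain ES6 generator, not how Highland is implemented; lazyMap is a hypothetical helper introduced for illustration. It shows that the mapping function does not run until something pulls values through:

```javascript
// Hypothetical helper: a lazy map over any iterable, using a generator.
// This is an analogy for Highland's laziness, not Highland's own code.
function* lazyMap(fn, iterable) {
    for (var x of iterable) {
        yield fn(x);
    }
}

var calls = 0;
var nums = lazyMap(function (x) {
    calls++;
    return Number(x);
}, ['1', '2', '3']);

// Nothing has been pulled yet, so the mapper has not run.
var callsBeforeConsuming = calls;

// Consuming the generator is what triggers the mapping function.
var values = Array.from(nums);
var callsAfterConsuming = calls;

// The generator is now exhausted, so a second pass yields nothing.
var secondPass = Array.from(nums);
```

A generator is single-use in much the same way as a Stream, although an exhausted generator simply yields nothing, whereas Highland makes no guarantees about a Stream that is consumed twice.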
Equally, when we tell Highland to map a Stream of filenames to the readFile function, it doesn't actually go and read all the files at once. It lets us decide how we want to read them:
filenames.map(readFile).series();
filenames.map(readFile).parallel(10);
Back-pressure
Since Highland is designed to play nicely with Node Streams, it also supports back-pressure. This means that a fast source will not overwhelm a slow consumer.
fastSource.map(slowThing)
In the above example, fastSource will be paused while slowThing does its processing.
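To make the term concrete, here is a minimal sketch of the back-pressure signal in Node's core stream module, which is the mechanism Highland is designed to cooperate with. It uses only Node's stream.Writable and no Highland code; the slow variable, the highWaterMark of 2 and the 10ms delay are arbitrary choices for illustration:

```javascript
var stream = require('stream');

// A deliberately slow consumer: each write takes 10ms to complete.
var slow = new stream.Writable({
    objectMode: true,
    highWaterMark: 2, // treat the buffer as full at 2 pending objects
    write: function (chunk, encoding, callback) {
        setTimeout(callback, 10);
    }
});

// write() returns false once the pending count reaches highWaterMark.
// That return value is the back-pressure signal: a well-behaved
// source stops writing until the 'drain' event fires.
var firstOk = slow.write('a');  // true: 1 pending, below the limit
var secondOk = slow.write('b'); // false: 2 pending, limit reached

slow.once('drain', function () {
    // The consumer has caught up, so it is safe to write again.
    slow.end();
});
```

With raw Node streams you have to check that return value and wait for 'drain' yourself. In a Highland pipeline, the pausing and resuming of the source is meant to happen for you.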
Some streams (such as those based on events) cannot be paused. In these cases data is buffered until the consumer is ready to handle it. If you expect a non-pausable source to be consumed by a slow consumer, then you should use methods such as throttle or latest to selectively drop data and regulate the flow.
Occasionally, you'll need to split Streams in your program. At this point, Highland will force you to choose between sharing back-pressure with the new consumer, or letting the existing consumer regulate back-pressure and having the new consumer simply observe values as they arrive. Attempting to add two consumers to a Stream without calling fork or observe will throw an error.
// shared back-pressure
source.map(output1);
source.fork().map(output2);
// let the first handle back-pressure and the second simply observe
source.map(output);
source.observe().map(_.log);
Currying
As well as calling functions as methods on the Stream object, Highland also exports them at the top-level.
mystream.map(doubled)
// is equivalent to
_.map(doubled, mystream)
By convention, all top-level functions are "curryable", meaning you can partially apply their arguments. In the above example, this could be called as:
_.map(doubled)(mystream);
In real-world use, this means you can define the behaviour you'd like before knowing what stream you'd like to perform it on:
// partially apply the filter() function to create a new function
var getBlogposts = _.filter(function (doc) {
return doc.type === 'blogpost';
});
// now we can use the new function by completing its arguments
getBlogposts(data); // => new Stream of blogposts
You can curry your own functions too:
var myCurryableFn = _.curry(fn);
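If currying is unfamiliar, the sketch below shows the general technique in plain JavaScript. It is an illustration only and not Highland's _.curry implementation; curry and add3 are hypothetical names, and this version assumes fn.length reports how many arguments to wait for:

```javascript
// Hypothetical stand-in for a curry helper (not Highland's _.curry).
// Collects arguments across calls until fn has received all of them.
function curry(fn) {
    var arity = fn.length;
    return function curried() {
        var args = Array.prototype.slice.call(arguments);
        if (args.length >= arity) {
            // enough arguments collected: call the original function
            return fn.apply(this, args);
        }
        // otherwise return a function that waits for the rest
        return function () {
            var more = Array.prototype.slice.call(arguments);
            return curried.apply(this, args.concat(more));
        };
    };
}

function add3(a, b, c) {
    return a + b + c;
}

var curriedAdd = curry(add3);
var allAtOnce = curriedAdd(1, 2, 3);  // 6
var oneAtATime = curriedAdd(1)(2)(3); // 6
var mixed = curriedAdd(1, 2)(3);      // 6
```

This is the same shape as the getBlogposts example above: supply the first argument now, and supply the Stream later.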
Events
Streams can be used to handle events as well as data, control-flow and error propagation. This is often a convenient way to filter and combine events into groups, a common goal on dynamically updated sites.
var inbox = _('message', client).where({recipient: 'me'});
If you expect to receive a lot of events, and perform an async process on each of them, then you should sample the events instead of buffering all of them.
// get a frequent event source
var text = _('keyup', $('#searchbox'));
// Regulate event stream:
// - wait until no keyup events for 1s
// - when read from, only return the latest value
var searches = text.debounce(1000).latest();
// map the search events to an AJAX request
var results = searches.map(searchRequest);
// for each response, display it
results.each(function (result) {
// display result
});
{{#each sections}}