
Streams in JavaScript: A Versatile Tool


Streams in Node.js are an important and versatile tool. Unix pipes served as their model. In the beginning, streams were just representations of byte sequences or strings, built to make the internals of Node.js, especially the parsers, more efficient. But streams have always been sequences of messages or events, and that is what makes them so versatile.

What is a Stream?

A stream is an abstraction of a sequence of data that is distributed in time. We usually know sequences as arrays and lists, that is, as data distributed in space: first we load the whole sequence into memory, then we process it. Streams let us process data chunk by chunk as it arrives in memory, pass each chunk on, and then forget it immediately. This way we don't need much memory, ideally just enough to hold a single chunk and the result of its transformation.

Streams have a few properties we have to consider when working with them. They can be push- or pull-based, which determines how we consume data from them: does the stream hand us data as soon as it has some (push), or do we ask for it when we are ready (pull)? What happens if I am too slow, or too fast? Do I have to buffer data myself?

A stream in Node.js can be readable, writable, or both; the latter is called a Duplex stream. Transforms are a special kind of Duplex stream whose output is computed directly from their input. They can serve higher purposes, as we'll see later.
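To make the difference between push and pull concrete, here is a minimal sketch using core streams. It assumes a modern Node.js (12 or later), where `Readable.from()` exists. `source`, `consumePush` and `consumePull` are hypothetical helper names. The push consumer receives 'data' events and throttles the stream with `.pause()`/`.resume()`. The pull consumer waits for 'readable' and fetches chunks with `.read()` itself.

```javascript
const { Readable } = require('stream')

// A small object-mode source; Readable.from() needs Node.js >= 12
function source() {
  return Readable.from(['a', 'b', 'c'])
}

// Push-based: the stream hands us each chunk via the 'data' event.
// If we are too slow, we pause the stream and resume it when we are ready.
function consumePush(stream, done) {
  const seen = []
  stream.on('data', function (chunk) {
    seen.push(chunk)
    stream.pause()                                  // stop the flow...
    setImmediate(function () { stream.resume() })   // ...and continue later
  })
  stream.on('end', function () { done(seen) })
}

// Pull-based: the stream only announces that data is available ('readable'),
// and we fetch it ourselves with read() until it returns null.
function consumePull(stream, done) {
  const seen = []
  stream.on('readable', function () {
    let chunk
    while ((chunk = stream.read()) !== null) seen.push(chunk)
  })
  stream.on('end', function () { done(seen) })
}

consumePush(source(), function (seen) { console.log('push:', seen) })
consumePull(source(), function (seen) { console.log('pull:', seen) })
```

Both consumers see the same chunks. What differs is who decides when the next chunk is delivered.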

Streams are versatile

Well, how can we use streams? An almost classic use case is reducing memory usage. A typical example is reading and processing a file:

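var fs = require('fs')

// filename and doSomething() are placeholders for your own values
var fileStream = fs.createReadStream(filename)

fileStream.on('readable', function () {
  var chunk
  // pull chunks until the internal buffer is empty (read() returns null)
  while ((chunk = fileStream.read()) !== null)
    doSomething(chunk)
})

fileStream.on('end', function () {
  console.log('Filestream ended!')
})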

In this code, a file is read with a Readable stream and processed chunk by chunk. Reading here is pull-based. The fileStream notifies the user code with the 'readable' event when data is available. The code then calls .read() until the stream's internal buffer is empty, which .read() signals by returning null. After that, we wait for the next event. Often it is simpler to just .pipe() the readable into a writable stream instead of consuming it step by step:

fileStream.pipe(response)

Now we can see an important strength of streams: they are highly composable. Because all streams implement the same communication protocol, we can connect them simply by calling the .pipe() method. Streams used as the public interface of a module allow the loosest form of coupling, so-called message coupling.

Because streams really are sequences, they can be processed much like arrays and lists. A Transform is an abstract, generic map function. On top of it we can implement further functions such as filter, flatten, reduce, etc.
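As a sketch of that claim, here are hypothetical `map`, `filter`, `flatten` and `reduce` helpers, each built on a core Transform in objectMode. This is one possible way to write them, not a library API. The example assumes Node.js 12 or later for `Readable.from()`.

```javascript
const { Readable, Transform } = require('stream')

// map: apply fn to every chunk (a null/undefined result is dropped by Transform)
function map(fn) {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) { callback(null, fn(chunk)) }
  })
}

// filter: forward only the chunks for which predicate returns true
function filter(predicate) {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      if (predicate(chunk)) this.push(chunk)
      callback()
    }
  })
}

// flatten: turn each array chunk into its individual elements
function flatten() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      for (const item of chunk) this.push(item)
      callback()
    }
  })
}

// reduce: accumulate all chunks and emit a single value when the input ends
function reduce(fn, initial) {
  let acc = initial
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      acc = fn(acc, chunk)
      callback()
    },
    flush(callback) { callback(null, acc) }
  })
}

// Sum of the squares of the odd numbers in [[1, 2], [3, 4], [5]]: 1 + 9 + 25
Readable.from([[1, 2], [3, 4], [5]])
  .pipe(flatten())
  .pipe(filter(function (n) { return n % 2 === 1 }))
  .pipe(map(function (n) { return n * n }))
  .pipe(reduce(function (a, b) { return a + b }, 0))
  .on('data', function (sum) { console.log(sum) })   // prints 35
```

`reduce` uses the `flush` hook because it can only emit its result once the input has ended.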

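var stream = require('stream')

// decodeStrings: false keeps incoming strings as strings,
// so that chunk.replace() is available in _transform
var devocalize = new stream.Transform({ decodeStrings: false })

devocalize._transform = function (chunk, encoding, callback) {
  // remove all vowels, case-insensitively
  this.push(chunk.replace(/[aeiou]/gi, ''))
  callback()
}

// Devocalize -> Dvclz
request.setEncoding('utf8') // make the request emit strings instead of Buffers
request.pipe(devocalize).pipe(response)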

So you can export a stream, or a factory function that returns a stream, to make your module cooperate seamlessly with others. Your module doesn't need to know anything about the others. The only detail that matters is the structure of the data flowing in and out of the stream. I write “structure” intentionally, because the word “type” is not that meaningful in JavaScript. All that matters in the example above is that each chunk has a .replace() method.

Let’s take this further. Transforms produce output from input, possibly without side effects. Sounds like “pure functions”, right? And chains of transforms combine into transforms of higher order. We’ve heard about that somewhere!

Streams and Functional Programming

In functional programming, software is mostly seen as a transformation of input to output. In Haskell:

Int -> Int -> Int

describes a function that receives two Ints and returns a new one. Strictly speaking, it is curried: it takes one Int and returns a function of type Int -> Int, which then takes the second. The arrows denote transformations. Mathematically, this is equivalent to a function that maps pairs of integers to integers. As said before, transform streams are map functions. Do you see the similarities?
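To make the analogy tangible, here is a small sketch that mirrors `Int -> Int -> Int` as a curried JavaScript function. The same partially applied function then drives both an array map and a Transform stream. `add`, `addTen` and `mapStream` are hypothetical names, and `Readable.from()` assumes Node.js 12 or later.

```javascript
const { Readable, Transform } = require('stream')

// Int -> Int -> Int as a curried JavaScript function:
// add takes one number and returns a function that takes the second one
const add = function (a) { return function (b) { return a + b } }

// Partial application: add(10) is a function of "type" Int -> Int
const addTen = add(10)

// The same Int -> Int function can drive an array map...
console.log([1, 2, 3].map(addTen))   // [ 11, 12, 13 ]

// ...or a Transform stream, which is a map over a sequence in time
function mapStream(fn) {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) { callback(null, fn(chunk)) }
  })
}

Readable.from([1, 2, 3])
  .pipe(mapStream(addTen))
  .on('data', function (n) { console.log(n) })   // 11, then 12, then 13
```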

The Highland.js library follows this approach consistently: lazy evaluation, partial application, higher-order streams, etc. It is well suited to solving challenges at higher levels of abstraction: implementing the architecture, integration, and abstracting the outer boundaries of an application. The application itself becomes a higher-order stream, composed of smaller streams.

But wait! Doesn’t functional mean free of side effects? Right. But every meaningful piece of software has to have side effects. The world is one single mutable state, at least in our perception, and we need side effects to influence it. Functional languages are no different. But they try hard to push mutable state as far as possible to the boundaries of the application and encapsulate it there. In Haskell, developers are advised to strictly separate state and logic. In JavaScript it is a good habit, too, and streams are a great tool for it.
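A minimal sketch of that separation with core streams follows. The logic is a pure function wrapped in a Transform, and the only side effect sits in the final Writable at the end of the chain. `shout`, `shouting` and `collectInto` are hypothetical names. The sink appends to an array so the sketch stays self-contained. In a real application it would be the place that writes to disk or the network.

```javascript
const { Readable, Transform, Writable } = require('stream')

// Pure logic: no I/O, no shared state, trivial to test on its own
function shout(text) {
  return text.toUpperCase() + '!'
}

// The Transform only wraps the pure function; it has no side effects itself
function shouting() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) { callback(null, shout(chunk)) }
  })
}

// The only side effect lives in the final Writable, at the edge of the chain
function collectInto(target) {
  return new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      target.push(chunk)   // the mutation happens here and nowhere else
      callback()
    }
  })
}

const results = []
Readable.from(['hello', 'streams'])
  .pipe(shouting())
  .pipe(collectInto(results))
  .on('finish', function () { console.log(results) })   // [ 'HELLO!', 'STREAMS!' ]
```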

Because streams let us apply most functional concepts, we can use them to model and implement the processing of input into output as a chain of steps. Following this modular approach, the code can look very declarative. Below is a simple file server that handles HTTP requests in two branches. The first takes POST requests and saves the request body to a file on disk. The second handles GET requests by sending the saved file back:

var hl = require('highland')
var fs = require('fs')

// assumptions (not shown): httpServer is an http.Server, and
// toFilename(url) maps a request URL to a file path
// server is a stream of { req, res } objects
var server = hl('request', httpServer, ['req', 'res'])

// this branch processes POST: save the body, then answer 201 Created
server.fork().filter(function (reqRes) { return reqRes.req.method === 'POST' })
  .map(function (reqRes) {
    hl(reqRes.req).pipe(fs.createWriteStream(toFilename(reqRes.req.url)))
    return reqRes.res
  }).each(function (res) {
    res.writeHead(201)
    res.end()
  })

// this one processes GET: send the saved file back
server.fork().filter(function (reqRes) { return reqRes.req.method === 'GET' })
  .each(function (reqRes) {
    fs.createReadStream(toFilename(reqRes.req.url)).pipe(reqRes.res)
  })

or even more modular:

var hl = require('highland')
 
var server = hl('request', httpServer, ['req', 'res'])
 
server.fork()
  .filter(onlyPOST)
  .map(writeToFile)
  .each(respondOK)
 
server.fork()
  .filter(onlyGET)
  .each(respondWithFile)
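The modular version assumes that the named steps already exist. Here is a minimal sketch of what they could look like, extracted from the first Highland example. They are plain functions over `{ req, res }` objects, so they need no Highland import. `toFilename()` is a hypothetical helper that stores files in the system temp directory. Unlike the inline version, `writeToFile()` pipes `req` directly, because an `http.IncomingMessage` is already a core Readable.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical helper: map a request URL to a file in the system temp directory.
// path.basename() drops directory parts, so "/../../etc/passwd" stays inside it.
function toFilename(url) {
  return path.join(os.tmpdir(), path.basename(url))
}

// Predicates for .filter(); each receives a { req, res } object
function onlyPOST(reqRes) { return reqRes.req.method === 'POST' }
function onlyGET(reqRes) { return reqRes.req.method === 'GET' }

// Step for .map(): stream the request body to disk, pass the response on
function writeToFile(reqRes) {
  reqRes.req.pipe(fs.createWriteStream(toFilename(reqRes.req.url)))
  return reqRes.res
}

// Terminal steps for .each()
function respondOK(res) {
  res.writeHead(201)
  res.end()
}

function respondWithFile(reqRes) {
  fs.createReadStream(toFilename(reqRes.req.url)).pipe(reqRes.res)
}
```

Each function can be tested in isolation with fake `req`/`res` objects, which is a practical benefit of pulling the steps out of the chain.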

The important point in this example is that the application is just a chain of functional transformations, “functional” here in the sense of functions of the business logic.

Streams and Tooling

Because streams carry and transform messages on the fly, because they can encapsulate mutable state, and because they can be piped into each other, they are versatile. Here is a summary of what they can be used for:

  • Memory usage control
  • Transformation on the fly
  • Unified interfaces
  • Encapsulation of state and decoupling of modules
  • Flow control
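Flow control in particular is easy to overlook, because .pipe() handles it for us. Here is a minimal sketch with core streams of roughly what happens underneath. A producer writes into a deliberately slow Writable, stops when `write()` returns false, and continues on the 'drain' event. `slowSink` and `writeAll` are hypothetical names. The tiny `highWaterMark` of 2 objects is chosen only to make the back pressure visible.

```javascript
const { Writable } = require('stream')

// A deliberately slow sink with a tiny buffer (highWaterMark: 2 objects)
function slowSink(received) {
  return new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(chunk, encoding, callback) {
      received.push(chunk)
      setTimeout(callback, 5)   // simulate slow I/O
    }
  })
}

// Producer that respects back pressure: stop when write() returns false,
// continue on 'drain'. Calls done(pauses) once everything has been written.
function writeAll(sink, items, done) {
  let i = 0
  let pauses = 0
  function writeMore() {
    while (i < items.length) {
      const ok = sink.write(items[i++])
      if (!ok) {
        pauses += 1
        sink.once('drain', writeMore)   // buffer full: wait until it empties
        return
      }
    }
    sink.end(function () { done(pauses) })
  }
  writeMore()
}

const received = []
writeAll(slowSink(received), [1, 2, 3, 4, 5, 6], function (pauses) {
  console.log(received.length, 'items written,', pauses, 'pauses')
})
```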

Clearly, not every library or framework serves all of these purposes equally well. So I want to give you an overview of three libraries I have found useful.

Node.js Core Streams

The Node.js stream module is one of the main components responsible for Node’s famous performance and small memory footprint. Core streams are also the model for, and the basis of, many specialized streams and stream-based APIs on npm. The module focuses on performance for low-level operations, e.g. HTTP parsing. So the Node.js developers put a lot of effort into making streams handle binary data as efficiently as possible. The second priority was to make it easy for other developers to use streams and implement custom ones. Unix pipes were the inspiration for the .pipe() method. Core streams take care of many technical details, such as buffering and back-pressure handling, so we can concentrate on our logic. For that there are four abstract constructors: Readable, Writable, Duplex and Transform. Below is an example of a Duplex; _read and _write are the hooks for our logic:

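var Duplex = require('stream').Duplex
var util = require('util')

function Dup(options) {
  Duplex.call(this, options)
}
util.inherits(Dup, Duplex)

// writable side: consume() is a placeholder for your own logic
Dup.prototype._write = function (chunk, enc, cb) {
  consume(chunk, enc)
  cb() // signal that this chunk has been handled
}

// readable side: createChunk() is a placeholder; pushing null ends the stream
Dup.prototype._read = function (size) {
  this.push(createChunk(size))
}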
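Newer Node.js versions also accept `read` and `write` directly as constructor options, which avoids the prototype boilerplate. Here is a sketch with concrete stand-ins for the `consume()` and `createChunk()` placeholders. The writable side stores whatever is written, and the readable side independently produces the numbers 1 to `limit`. `createCounterDuplex` is a hypothetical name, chosen only for illustration.

```javascript
const { Duplex } = require('stream')

// The two sides of a Duplex are independent: writes are stored in `received`,
// reads produce the numbers 1..limit and then end the readable side
function createCounterDuplex(limit) {
  const received = []
  let next = 1
  const dup = new Duplex({
    objectMode: true,
    write(chunk, encoding, callback) {   // same role as Dup.prototype._write
      received.push(chunk)
      callback()
    },
    read(size) {                         // same role as Dup.prototype._read
      if (next <= limit) this.push(next++)
      else this.push(null)               // end of the readable side
    }
  })
  dup.received = received
  return dup
}

const dup = createCounterDuplex(3)
dup.on('data', function (n) { console.log('read', n) })   // read 1, read 2, read 3
dup.write('x')
dup.end()
```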

Core streams work with Buffer and String instances by default. To make them work with other objects, we have to activate "objectMode" via a constructor option. If you want to use this API in the browser, you can install the readable-stream module, which mirrors the core API, and use it with Browserify or similar tools.
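objectMode can also be enabled on just one side of a Transform. Here is a sketch using the `writableObjectMode` option. The Transform accepts plain objects and emits newline-delimited JSON as ordinary byte chunks, so it can be piped straight into `process.stdout`. `toNdjson` is a hypothetical name, and `Readable.from()` assumes Node.js 12 or later.

```javascript
const { Readable, Transform } = require('stream')

// objectMode only on the writable side: the Transform accepts plain objects
// and emits newline-delimited JSON as ordinary (Buffer) chunks
function toNdjson() {
  return new Transform({
    writableObjectMode: true,
    transform(obj, encoding, callback) {
      callback(null, JSON.stringify(obj) + '\n')
    }
  })
}

// Readable.from() produces an object-mode stream (Node.js >= 12)
Readable.from([{ id: 1 }, { id: 2 }])
  .pipe(toNdjson())
  .pipe(process.stdout)
```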

Core streams in Node.js support both push- and pull-based reading on the current stable branch (0.10). However, switching between these modes is unreliable. This will be fixed in the upcoming stable version (0.12), where it will be a matter of calling .pause() and .resume(). Until then, you have to choose a mode before you start consuming the data.

The example above shows a prototype-based implementation; almost nothing about it looks functional. This is due to considerations regarding V8’s optimizing compiler. The next library offers a simpler and more elegant way to create streams.

EventStream

The EventStream module by Dominic Tarr is basically a collection of different stream implementations from npm, most notably through. They are all bound by the same idea:

EventStream is like functional programming meets IO.

The strength of these modules is their simplicity and ease of use. EventStream provides a few generic streams that can be easily extended or configured. through itself, the core of the library, is one of the simplest ways to create a stream from a function. through streams are push-based. To control the flow of data, you can use the pause stream or the .pause() method. merge and some terminating streams are included as well. The idea is that streams are like arrays, and we should be able to work with them just as easily. There are also tons of custom streams on npm that are not part of EventStream but build on through.

It is worth mentioning that through, which is a transform, is the core building block of EventStream. In Node.js core it is the other way around: a Transform is a special kind of Duplex. Here is an example of a simple through stream:

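var es = require('event-stream')

es.through(function transform(data) {
    this.emit('data', data)   // forward the chunk unchanged
    //this.pause()            // uncomment to apply back pressure
  },
  function end () { //optional
    this.emit('end')
  })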
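For comparison, here is a rough core-streams counterpart to the through example. It is not part of EventStream. `transform()` plays the role of through's write function and `flush()` the role of its optional end function. `passAlong` is a hypothetical name. Because a core Transform is itself a Duplex, this sketch also shows the inversion described above.

```javascript
const { Readable, Transform } = require('stream')

// Core-streams counterpart to the through example above:
// transform() plays the role of the write function, flush() of the end function
function passAlong(onEnd) {
  return new Transform({
    objectMode: true,
    transform(data, encoding, callback) {
      callback(null, data)        // like this.emit('data', data)
    },
    flush(callback) {
      if (onEnd) onEnd()          // optional hook, like through's end function
      callback()                  // the Transform then emits 'end' by itself
    }
  })
}

Readable.from(['a', 'b'])
  .pipe(passAlong(function () { console.log('input ended') }))
  .on('data', function (d) { console.log(d) })
```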

Highland.js

Highland.js by Caolan McMahon follows a similar idea, but even more consistently, and unites functional programming and streams. It provides many ways to manipulate and transform streams and to work with higher-order streams: stream combinations, currying and lazy evaluation.

With Highland we can wrap many things in a stream: EventEmitters, arrays, functions, promises and other streams. Highland streams try to stay compatible with the Node.js stream API. The library tries to solve typical flow-control problems in asynchronous environments using streams, and to rethink sequence processing in general. In the end, the distinction between synchronous and asynchronous code blurs and becomes an irrelevant detail. The first Highland example above and the one below have much in common functionality-wise: both take the HTTP request body and write its content to a file or to standard output. But the processing chain is different. The first example performs the write, a side effect, inside the map function, violating the single-responsibility principle. The following example flattens the stream of request bodies with .sequence(). This makes sense, because the side effect is the main purpose of this application, and the mutation of state is pushed as far away as possible, to the end of the chain: .pipe(process.stdout)

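var hl = require('highland')

// assumption: httpServer is an http.Server, as in the first example
var s = hl('request', httpServer, ['req', 'res'])

// wrap each request body in a Highland stream, then read the bodies
// one after another and write them to standard output
s.fork().pluck('req').map(hl).sequence().pipe(process.stdout)
s.fork().pluck('res').each(function (res) {
  res.writeHead(200)
  res.end('Success!')
})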

In my opinion, this is the strength of Highland.js: it lets us think about and model our applications differently without much effort to change them. Finally, for comparison with the other libraries, here is a simple stream created with Highland:

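var hl = require('highland')

var count = 0

var streamIntsTill100 = hl(function (push, next) {
  count += 1
  if (count <= 100) {
    push(null, count)   // emit the next integer
    next()              // ask Highland to call the generator again
  } else {
    push(null, hl.nil)  // end the stream; do not call next() again
  }
})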
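For comparison, here is the same stream of integers written with a plain core Readable, without any library. `read()` is called whenever the consumer wants more data, and pushing `null` ends the stream. `intsTill` is a hypothetical name, chosen only for illustration.

```javascript
const { Readable } = require('stream')

// Core-streams counterpart to the Highland generator above: read() is called
// whenever the consumer wants more data, and push(null) ends the stream
function intsTill(limit) {
  let count = 0
  return new Readable({
    objectMode: true,
    read() {
      count += 1
      this.push(count <= limit ? count : null)
    }
  })
}

let total = 0
intsTill(100)
  .on('data', function (n) { total += n })
  .on('end', function () { console.log(total) })   // prints 5050
```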

What should I use?

This question is really about priorities and trade-offs. If performance is key, core streams are probably the better choice, even though your code base might grow larger than with the other options. EventStream and through-based streams come with many functions already implemented and can greatly reduce development time. Highland.js lets you think differently about your application and easily derive new functions from existing ones.

Personally, I’m a fan of Highland. But those who are not that into functional concepts might find it alienating. EventStream is a good entry point for playing with streams. And, of course, it is always worthwhile to know how Node.js core streams work.

Conclusion

You could call streams an “Eierlegende Wollmilchsau” (lit. “egg-laying wool-milk-sow”), the German version of a Swiss Army knife. They are not quite that, but close. They are a versatile tool that can do more than may be immediately apparent. Used at different levels of abstraction, they can help with performance, composability, decoupling and flow control. With streams we can embrace functional concepts and make our programs more declarative. It is always worthwhile to learn about and work with streams, and to improve parts of our applications with their help. You don’t have to go the whole nine yards; you can easily start by sprinkling in streams here and there. If you develop in Node.js, chances are you are already using them. Streams do not stand in your way.

Do you have experience with streams? Let’s hear about it in the comments, I’m very curious!
