
Streams in JavaScript: a versatile tool

23.5.2014 | 9 minutes of reading time

Streams in Node.js are an important and versatile tool. Unix pipes served as their model. In the beginning they were just representations of byte sequences or strings, built to make the internals of Node.js, especially the parsers, more efficient. But streams have always been sequences of messages or events, and that is what makes them so versatile.

What is a Stream?

A stream is an abstraction of a sequence of data that is distributed in time. Usually we know sequences as arrays and lists: data distributed in space. There, we first load the whole sequence into memory and then process it. Streams let us process data chunk by chunk as it arrives, pass it on and then forget it immediately. That way we need very little memory, ideally just enough to hold a single chunk and the result of its transformation.

Streams have a few properties we have to consider when working with them. They can be push- or pull-based, which is a question of how we consume data from a stream. What happens if I am too slow, or too fast? Do I have to buffer data myself?
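For a quick contrast, here is a minimal push-based sketch; the file name is hypothetical and doSomething stands in for the processing logic, just like in the pull-based file example further down:

var fs = require('fs')

// Push-based ("flowing") consumption: the stream emits 'data' events on its own
// and we have to keep up (or call .pause() to stop the flow for a while).
var pushed = fs.createReadStream('input.txt') // hypothetical file name

pushed.on('data', function (chunk) {
  doSomething(chunk) // placeholder for the actual processing
})

pushed.on('end', function () {
  console.log('No more data.')
})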

A stream in Node.js can be readable, writable or both, which is called a Duplex stream. Transforms are a special kind of Duplex stream whose output depends directly on their input. They can be used for higher purposes, as we'll see later.

Streams are versatile

So how can we use streams? An almost classical use case is reducing memory consumption. A typical example is reading and processing a file:

var fs = require('fs')

var fileStream = fs.createReadStream(filename)

fileStream.on('readable', function () {
  var chunk

  while ((chunk = fileStream.read()) !== null)
    doSomething(chunk)
})

fileStream.on('end', function () {
  console.log('Filestream ended!')
})

In this code, a file is read with a Readable stream and processed chunk by chunk. The reading here is pull-based: the fileStream signals via the 'readable' event when data is available, and the user code then reads until the stream's internal buffer is empty, which .read() signals by returning null. Then we wait for the next event. Often it is simpler just to .pipe() the readable into a writable stream instead of consuming it step by step:

fileStream.pipe(response)

Here we can see an important strength of streams: they are highly composable. Because all streams implement the same communication protocol, we can stick them together simply by calling the .pipe() method. Streams used as the public interface of a module allow us to achieve the loosest form of coupling, so-called message coupling.
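As a sketch of this composability, assuming the filename and response placeholders from the surrounding examples: a file can be compressed and sent to an HTTP response simply by chaining .pipe() calls (zlib is a Node.js core module):

var fs = require('fs')
var zlib = require('zlib')

// Every stage speaks the same stream protocol, so the stages only have to
// agree on the data that flows between them.
fs.createReadStream(filename)
  .pipe(zlib.createGzip())
  .pipe(response)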

Because streams really are sequences, they can be processed much like arrays and lists. A Transform is an abstract, generic map function; based on it we can implement further functions such as filter, flatten, reduce and so on (a filter sketched this way follows after the example below).

var stream = require('stream')

var devocalize = new stream.Transform()

// Simplified: assumes the incoming chunks are strings (Buffers have no .replace())
devocalize._transform = function (chunk, encoding, callback) {
  var devocalized = chunk.replace(/(a|e|i|o|u)/gi, '')
  this.push(devocalized)
  callback()
}

// Devocalize -> Dvclz
request.pipe(devocalize).pipe(response)
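A filter, as mentioned above, can be sketched the same way: a Transform that only pushes the chunks matching a predicate. isRelevant is a hypothetical predicate here:

var stream = require('stream')

var filter = new stream.Transform()

filter._transform = function (chunk, encoding, callback) {
  // Pass on only the chunks that satisfy the predicate, drop the rest
  if (isRelevant(chunk))
    this.push(chunk)
  callback()
}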

So you can export a stream, or a factory function which returns a stream, to make your module cooperate seamlessly with others. Your module doesn't need to know anything about the others; the only detail that matters is the structure of the data coming in and out of the stream. I write “structure” intentionally, because the word “type” is not that expressive in JavaScript. All that matters in the devocalize example above is that the incoming chunk has a .replace() method.

Let's take this further: Transforms produce output from their input, possibly without side effects. Sounds like “pure functions”, right? And chains of transforms are combined, higher-order transforms. We have heard about that somewhere before!

Streams and Functional Programming

In functional programming, software is mostly seen as a transformation of input to output. In Haskell:

Int -> Int -> Int

describes a function that receives two Ints and returns a new one. The arrows describe a transformation. Mathematically it is a function that maps a set of Int tuples to another set of Ints. As said before: transform streams are map functions. Do you see the similarity?

The Highland.js library follows this approach consistently: lazy evaluation, partial application, higher-order streams and so on. It is well suited to the challenges on higher levels of abstraction: implementing the architecture, integration and abstracting away the outer boundaries of an application. The application becomes a higher-order stream itself, combined from smaller streams.
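As a tiny sketch of this style, here Highland wraps a plain array (the values are purely illustrative); nothing is computed until .toArray() actually consumes the stream:

var hl = require('highland')

// Lazy evaluation: the filter and map steps only run once a consumer pulls values.
var evensDoubled = hl([1, 2, 3, 4, 5, 6])
  .filter(function (n) { return n % 2 === 0 })
  .map(function (n) { return n * 2 })

evensDoubled.toArray(function (xs) {
  console.log(xs) // [ 4, 8, 12 ]
})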

But wait! Doesn't functional mean free of side effects? Right. But every meaningful piece of software has to have side effects. The world is one single mutable state, at least in our perception, and we need side effects to influence it. Functional languages are no different. But they try hard to push mutable state to the boundaries of the application and encapsulate it there. In Haskell the developer is advised to strictly separate state and logic; in JavaScript it is a good habit, too. Streams are a great tool for that.

Because streams allow us to apply most functional concepts, we can model and implement the processing of input to output as a chain of steps. Combined with a modular approach, this style of code can look very declarative. Below is a simple file server that handles HTTP requests in two branches: the first takes POSTs and saves the file to disk, the second handles GETs by sending the saved file back:

var fs = require('fs')
var hl = require('highland')

// server is a stream of req-res tuples
var server = hl('request', httpServer, ['req', 'res'])

// this branch processes POST
server.fork().filter(function (reqRes) { return reqRes.req.method === 'POST' })
  .map(function (reqRes) {
    hl(reqRes.req).pipe(fs.createWriteStream(toFilename(reqRes.req.url)))
    return reqRes.res
  }).each(function (res) {
    res.writeHead(201)
    res.end()
  })

// this one handles GET
server.fork().filter(function (reqRes) { return reqRes.req.method === 'GET' })
  .each(function (reqRes) {
    fs.createReadStream(toFilename(reqRes.req.url)).pipe(reqRes.res)
  })

or even more modular:

var hl = require('highland')

var server = hl('request', httpServer, ['req', 'res'])

server.fork()
  .filter(onlyPOST)
  .map(writeToFile)
  .each(respondOK)

server.fork()
  .filter(onlyGET)
  .each(respondWithFile)

The important thing in this example is that the application is just a chain of functional transformations; functional here in the sense of a business logic function.
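The helper functions themselves are not shown here; one possible sketch, simply mirroring the first version of the server above, could look like this:

var fs = require('fs')
var hl = require('highland')

// Hypothetical implementations of the helpers used above
function onlyPOST(reqRes) { return reqRes.req.method === 'POST' }

function onlyGET(reqRes) { return reqRes.req.method === 'GET' }

function writeToFile(reqRes) {
  hl(reqRes.req).pipe(fs.createWriteStream(toFilename(reqRes.req.url)))
  return reqRes.res
}

function respondOK(res) {
  res.writeHead(201)
  res.end()
}

function respondWithFile(reqRes) {
  fs.createReadStream(toFilename(reqRes.req.url)).pipe(reqRes.res)
}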

Streams and Tooling

Because streams carry and transform messages on the fly, because they can encapsulate mutable state, and because they can be piped into each other, they are versatile. Here is a summary of what they can be used for:

  • Memory usage control
  • On-the-fly transformation
  • Unified interfaces
  • Encapsulation of state and decoupling of modules
  • Flow control

It's clear that not every library or framework fits all of these purposes equally well. So I want to give you an overview of three libraries I have found useful.

Node.js Core Streams

The Node.js stream module is one of the main components responsible for Node's famous performance and small memory footprint. Core streams are also the model and the source of many specialized streams and stream-based APIs on npm. The module focuses on performance for low-level operations, e.g. HTTP parsing, so the Node.js developers spent a lot of effort on making streams work with binary data as efficiently as possible. The second priority was to let other developers use streams and implement custom ones easily; Unix pipes were the inspiration behind the .pipe() method. Core streams implement many of the technical details, such as buffering and back-pressure handling, so we can concentrate on our own logic. For that we have four abstract constructors: Readable, Writable, Duplex and Transform. An example of a Duplex is below; _read and _write are the hooks for our logic:

var util = require('util')
var Duplex = require('stream').Duplex

function Dup() {
  Duplex.call(this)
}
util.inherits(Dup, Duplex)

Dup.prototype._write = function (chunk, enc, cb) {
  consume(chunk, enc) // consume is a placeholder for the writable-side logic
  cb()
}

Dup.prototype._read = function (size) {
  this.push(createChunk(size)) // createChunk is a placeholder for the readable-side logic
}

Core streams work with Buffer and String instances by default; to make them work with other objects, we have to activate objectMode via a constructor parameter. If you want to use this API in the browser, you can install the readable-stream module, which mirrors the core API, and bundle it with Browserify or similar tools.
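A minimal sketch of what activating objectMode looks like; the tagger stream and its behaviour are purely illustrative:

var stream = require('stream')

// With objectMode every chunk may be an arbitrary JavaScript value
// instead of a Buffer or String.
var tagger = new stream.Transform({ objectMode: true })

tagger._transform = function (obj, encoding, callback) {
  obj.seen = true // mark the incoming object before passing it on
  this.push(obj)
  callback()
}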

Core streams in Node.js support both push- and pull-based reading on the current stable branch (0.10). However, switching between these modes is unreliable; this will be fixed in the upcoming stable version (0.12), where it will be a matter of .pause() and .resume(). Until then you have to choose a mode before you start consuming the data.

The example above shows a prototype-based implementation; almost nothing here looks functional. This is due to considerations regarding V8's optimizing compiler. The next library helps with a simpler and more elegant way of creating streams.

EventStream

The EventStream module by Dominic Tarr is basically a collection of different stream implementations from npm, most notably through, all bound together by the same idea:

EventStream is like functional programming meets IO.

The strength of these modules is simplicity and ease of use. EventStream provides a few generic streams that can easily be extended or configured. through itself, the core of the library, is one of the simplest ways to create a stream from a function. through streams are push-based; to control the flow of data you can use the pause stream or the .pause() method. merge and some terminating streams are also included. Streams are like arrays, and we should be able to work with them just as easily. There are also tons of custom streams on npm that are not part of EventStream but depend on through.

It is worth mentioning that through, which is a transform, is the core of EventStream, whereas in Node.js core a Transform is a special kind of Duplex: just the other way around. Here is an example of a simple through stream:

var es = require('event-stream')

es.through(function transform(data) {
    this.emit('data', data)
    //this.pause()
  },
  function end() { // optional
    this.emit('end')
  })
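Highland.js

Highland.js, already mentioned above, wraps Node.js streams, arrays and EventEmitter events in a lazy, functional interface. The example below turns an HTTP server into a stream of request/response pairs and forks it into two branches, one piping the request bodies to stdout, one answering every request: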
var hl = require('highland')

var s = hl('request', httpServer, ['req', 'res'])

s.fork().pluck('req').map(hl).sequence().pipe(process.stdout)
s.fork().pluck('res').each(function (res) {
  res.writeHead(200)
  res.end('Success!')
})

In my opinion this is the strength of Highland.js: it allows us to think about and model our applications differently, without requiring much effort to switch over. Here, for completeness, a simple generator stream with Highland:

var hl = require('highland')

var count = 0

var streamIntsTill100 = hl(function (push, next) {
  count += 1
  if (count <= 100) {
    push(null, count)
    next()
  } else {
    push(new Error('Overflow!'))
    push(null, hl.nil) // end the stream
  }
})

What should I use?

This question is really about priorities and trade-offs. If performance is key, core streams are probably the better choice, even though your code base may become larger than with the other options. EventStream and through-based streams come with many functions already implemented and can reduce development time greatly. Highland.js makes it possible to think differently about the application and to derive new functions from existing ones with ease.

Personally I’m a fan of Highland. But those who are not that into functional concepts might be alienated by it. EventStream is a good entry point to play with streams. And, of course, it is always worthwhile to know how Node.js core streams work.

Conclusion

You could call streams an “eierlegende Wollmilchsau” (literally an “egg-laying, wool-bearing, milk-giving sow”), the German version of a Swiss Army knife. They are not quite that, but close. They are a versatile tool that can do more than may be immediately apparent. Used on different levels of abstraction they can help with performance, composability, decoupling and flow control. With streams we can embrace functional concepts and make our programs more declarative. It is always worthwhile to learn about streams and to improve parts of our applications with their help. You don't have to go the whole nine yards; you can easily start by sprinkling streams in here and there. If you develop in Node.js, chances are you are using them already. Streams do not stand in your way.

Do you have experience with streams? Let's hear about it in the comments, I'm very curious!
