[cookbook] Stream #4, byline module (Custom Stream Object 작성하기)

Stream #4 Introduction

이번 시간에는 Stream 객체를 상속하여 stream data를 컨트롤 할 수 있는 User Define Stream 객체를 만들어 보고, 라인별로 stream data를 뽑아주는 byline 모듈을 설명하겠습니다.

용도

pump

이 그림 너무 재밌네요.

사전지식

Custom Stream Boilerplate

Reference

boilerplate

// Stream 객체 import
var Stream = require('stream').Stream;
var util = require('util');

module.exports = StreamCustom;

function StreamCustom() {
  this.writable = true; // 기본
  this.readable = true; // 기본
}

// Stream 을 상속하여, 복잡한 구현을 하지 않아도 된다. (pipe 사용)
util.inherits(StreamCustom, Stream);

//
// WritableStream#write
// stream의 data가 전송될 시 data 이벤트를 발생시킨다.
//
StreamCustom.prototype.write = function(data) {
  this.emit('data', data);
};

//
// end
//
StreamCustom.prototype.end = function() {
  this.emit('end');
};

//
// destroy
//
StreamCustom.prototype.destroy = function() {
  this.emit('close');
};

// 위와 같은 형식으로 원하는 stream method를 추가합니다.

stream과 stream을 연결해보기

var StreamCustom = require('./StreamCustom'),
    fs = require('fs');

var source = fs.createReadStream('source.txt');
var dest   = fs.createWriteStream('dest.txt');

var through = new StreamCustom();

// ReadableStream 을 Custom Stream 으로 연결
source.pipe(through);

// Custom Stream을 WritableStream 으로 연결
through.pipe(dest);

// 완료 이벤트
dest.on('close', function() {
  console.log('done!');
});

Byline

소개

stream 데이타는 4096 chunk 단위로 구성됩니다. 프로그래밍하다보면 이를 라인별로 받고 싶을 경우가 있는데, 이때 필요한 모듈입니다.

참조: 4k chunk는 전통적인 기본구성이고, ReadStream 에서는 64k chunk으로 고정하여 사용중입니다. 이는 원하는대로 변경할 수 있습니다.

설치

$ npm install byline
npm http GET http://registry.npmjs.org/byline
npm http 200 http://registry.npmjs.org/byline
npm http GET http://registry.npmjs.org/byline/-/byline-2.0.3.tgz
npm http 200 http://registry.npmjs.org/byline/-/byline-2.0.3.tgz
byline@2.0.3 ../../node_modules/byline

기본사용방법

byline 사용

$ cat byline_example.js

var fs = require('fs'),
    byline = require('byline');

//
// byline 모듈을 사용하여 ReadStream을 받습니다.
//
var stream = byline(fs.createReadStream('sample.txt'));

//
// 보통 여기서 chunk size만큼 데이타가 보여지는데
// sample.txt 파일 내용을 라인별로 끊어서 받을 수 있습니다.
// 어떻게 이런일이 벌어질까요?
// 바로 custom stream 때문입니다.
//  
stream.on('data', function(line) {
  console.log('===>', line);
});


//
// 라인별로 떨어지는것을 확인할 수 있습니다.
//
$ node byline_example.js 
===> abcdefghijklmn
===> 01234567890

일반적인 사용

$ cat normal_example.js

var fs = require('fs');

//
// byline 모듈을 사용하여 ReadStream을 받습니다.
//
var stream = fs.createReadStream('sample.txt');

// 5 바이트씩 stream data를 발생시킵니다.
stream.bufferSize = 5;

//
// 보통 여기서 chunk size만큼 데이타가 보여지는데
// sample.txt 파일 내용을 라인별로 끊어서 받을 수 있습니다.
// 어떻게 이런일이 벌어질까요?
// 바로 custom stream 때문입니다.
//  
stream.on('data', function(line) {
  console.log('===>', line.toString());
});


//
// buffersize 5 byte 만큼 stream data가 전송됩니다.
//
//
$ node normal_example.js 
===> abcde
===> fghij
===> klmn

===> 01234
===> 56789
===> 0

pipe 사용방법

byline 의 리턴값은 stream object 이므로, pipe method 사용하여 backend stream 으로 넘길 수 있습니다.

var stream = fs.createReadStream('sample.txt');

// line별로 읽은 ReadStream
stream = byline.createStream(stream);

// WriteStream 으로 전송
stream.pipe(fs.createWriteStream('nolines.txt'));

어떻게 라인별로 구현?

byline 해당 관련 부분 참조. 아래 설명하겠습니다.

function LineStream() {
  //
  // custom stream 기본값. stream은 읽거나 쓸 수 있다.
  //
  this.writable = true;
  this.readable = true;

  …
  …

  //
  // ReadStream에서 data 이벤트가 발생하면
  // WriteStream에 write 이벤트가 발생합니다.
  //
  this.write = function(data, encoding) {
    if (Buffer.isBuffer(data)) {
      data = data.toString(encoding || 'utf8');
    }

    // 데이타를 라인별로 나눕니다.
    // 참조: 블로그 글 작성시 markdown이 깨져서 라인피드에 공백을 넣어놓았습니다.
    var parts = data.split(/\\ n|\\ r\\ n/g);

    // buffer 변수의 데이타가 있으면
    // parts변수가 세팅되고
    if (buffer.length > 0) {
      parts[0] = buffer + parts[0];
    }

    // 라인별로 구분된 parts변수가 있으면
    // data 이벤트를 발생시킵니다.
    for (var i = 0; i < parts.length -1; i++) {
      self.emit('data', parts[i]);
    }

    // 다시 buffer변수를 조절합니다.
    buffer = parts[parts.length - 1];
  };

  //
  // buffer 값이 남아 있으면 data 를 발생
  //
  this.end = function(){
    if (buffer.length > 0){
      self.emit('data', buffer);
    }
    self.emit('end');
  };
  …
  …   
}
util.inherits(LineStream, Stream);

위와 같이 ReadStream에서 넘어온 데이타를 WriteStream에서 write, end 메소드를 재구현하여 stream data를 조작할 수 있었습니다.

Conclusion

이번편에서는 Stream 을 재구성하여 stream data를 조작하는 방법을 알아보았습니다. stream data는 nodejs의 기본이며, 예컨데 tcp / udp 에서 발생하는 패킷 데이타 분석, 조작을 모두 custom하게 할 수 있습니다. 아이디어는 여러분의 것입니다. 더위 조심하세요. 감사합니다. KIN플 하세요. ~~~~

추천 관련글