NodeJS stream typed piping

Kasama Chenkaow
2 min readSep 1, 2023

--

I’m just wanna take a note on the challenge I encountered today where I wanna use NodeJS official stream APIs to help piping my data processing but it seems that official types of the APIs does not come with fully type support on .pipe() that much so I ended up create one for myself

import stream, { Transform, TransformCallback } from 'node:stream';

/**
* Extends the Readable class with a generic chunk type.
*/
export class GenericReadable<T> extends stream.Readable {
// Need to add this because of how Typescript works regarding class structure comparison.
// In short, methods and constructor are not compared, only properties.
// Read more on https://www.typescriptlang.org/docs/handbook/type-compatibility.html#classes
_typeGuard: T;
constructor(options?: stream.ReadableOptions) {
super(options);
}

// Need a generic pipe to allow piping only to the same generic stream chunk type
pipeT<U extends GenericWritable<T>>(destination: U, options?: { end?: boolean }): U {
return super.pipe(destination, options);
}
}

/**
* Extends the Transform class with a generic chunk type.
*/
interface GenericTransformOptions<T> extends stream.TransformOptions {
transform?: (this: Transform, chunk: T, encoding: BufferEncoding, callback: TransformCallback) => void;
}

export class GenericTransform<T> extends stream.Transform {
_typeGuard: T; // See why we need this in the GenericReadable<T> class.
constructor(options?: GenericTransformOptions<T>) {
super(options);
}

pipeT(destination: GenericWritable<T>, options?: { end?: boolean }): GenericWritable<T> {
return super.pipe(destination, options);
}
}

/**
* Extends the Writable class with a generic chunk type.
*/
interface GenericWritableOptions<T> extends stream.WritableOptions {
write?: (this: Transform, chunk: T, encoding: BufferEncoding, callback: TransformCallback) => void;
}

export class GenericWritable<T> extends stream.Writable {
_typeGuard: T; // See why we need this in the GenericReadable<T> class.
constructor(options?: GenericWritableOptions<T>) {
super(options);
}
}

Just thought it could help someone who’s stuck with the same problem!

--

--