RxJS as a Gulp replacement
When writing the tooling to generate this blog, I picked up Gulp as a task executor/file processor. However, as development went on, I ran into some issues, including an incompatibility with Deno, which I wanted to migrate to. Gulp streams reminded me of another library I had used in the past, RxJS. I decided to see how I could replace Gulp with it and a bit of custom code, and it turned out to be rather easy.
It was a nice practical exercise, and it taught me more about how RxJS works. I focus here on the file processing aspect, but everything shown is valid for RxJS in other workflows too. Conversely, I will cover the file processing/stream side of Gulp rather than its task runner side, as RxJS is not relevant for that part.
As the goal was to move to Deno, this article is entirely based on it. Deno is a new JavaScript runtime that bundles a lot of nice features, like built-in TypeScript support. While it is mostly compatible with the classical Node.js environment, some adjustments are needed to follow along there, including using a package.json instead of URLs in import statements, and mapping the @std libraries to their Node equivalents.
The basics
Gulp is a JavaScript tool that lets you write and execute tasks that transform files into others by applying a series of stream transformations. A task basically looks like this:
export const transform = () => src("**/*.js").pipe(someTransformer()).pipe(dest("dist"));
Gulp leverages Node.js object streams, passing instances of vinyl objects (virtual files) through the different transformers. Gulp has a wide array of “plugins” available usable as transformers. In case you need something custom, you can simply implement your own Transform stream.
RxJS is a library that supports a programming style called “Reactive programming”. Basically, it provides data streams and a wide array of transformer functions to work with these streams. The stream format and intermediate transforms seemed to be replicating Gulp streams well enough to be a suitable replacement.
Sourcing files
The first thing we need to replicate is the gulp.src function. It takes a glob pattern, matches files, and creates a stream of virtual files from them. For the files, it leverages vinyl to provide virtual file objects. These contain a path, the file contents, and a history of paths. Transformers work on these files, altering the contents and paths at will.
To handle the glob step, we can simply rely on the glob library. Its Glob class is directly pluggable into RxJS streams:
import {Glob} from "https://esm.sh/glob@^10.4.2";
import {from} from "https://esm.sh/rxjs@^7.8.1";
from(new Glob("**/*.ts", {})).subscribe((file) => console.log(file));
vinyl is perfectly usable outside the Gulp environment, but here I will favour the vfile library, notably because it uses Uint8Array instead of Buffer for file contents, which plays better with modern JavaScript and especially Deno.
So now that we have an observable of paths, we need to actually load those files into VFile objects. This can simply be done by inserting a map operation in our stream:
import {Glob} from "https://esm.sh/glob@^10.4.2";
import {from, map} from "https://esm.sh/rxjs@^7.8.1";
import {readSync} from "https://esm.sh/to-vfile@^8.0.0";
import {resolve} from "jsr:@std/path@^0.225.2";
from(new Glob("**/*.ts", {}))
.pipe(map((file) => readSync(resolve(file))))
.subscribe((file) => console.log(file));
I leveraged the to-vfile library, which provides helpers to bridge virtual files to the actual filesystem. The readSync function takes an absolute path (which we get through resolve) and returns a VFile.
Going async
Modern JavaScript leverages promises a lot, and RxJS is completely compatible with them. Actually, you can see a Promise as a stream with a single element, and that is exactly how RxJS handles them: helpers that accept observables can also be called transparently with promises.
In our previous example, I used readSync, which is a blocking operation, but we can make it fully async using the read function. However, map is not fit for this: using it with a function that returns a promise creates a stream of promises (Observable<Promise<VFile>>), while we want to keep working with virtual files directly. For this, there is the mergeMap operator, which takes a function returning a stream and merges the results into a single stream. Since promises are valid streams, we can use it directly.
import {Glob} from "https://esm.sh/glob@^10.4.2";
import {from, mergeMap} from "https://esm.sh/rxjs@^7.8.1";
import {read} from "https://esm.sh/to-vfile@^8.0.0";
import {resolve} from "jsr:@std/path@^0.225.2";
from(new Glob("**/*.ts", {}))
.pipe(mergeMap((file) => read(resolve(file))))
.subscribe((file) => console.log(file));
Not much is changed, but we are now using async code, which can provide a performance boost in some workflows.
map and mergeMap are going to be the primary operators for transformations, depending on whether your transformation is async or not.
We can go one step further and make our code a little more reusable. The combination of Glob and read can be abstracted away into a simple function. Observables are fully composable, and .pipe can be called several times if needed, yielding a new observable each time. So let us write a function creating our source observable, mimicking the original src from Gulp:
const src = (globs: string | string[]) =>
from(new Glob(globs, {})).pipe(
mergeMap((file) => read(resolve(file))),
);
Now that we have a src function, we can use it to simplify our pipeline:
src("**/*.ts").subscribe((file) => console.log(file));
Outputting files
We now need the counterpart to src, dest, in order to write our files. This time it requires a bit more code, but nothing too difficult: we simply need to handle writing each file at the right location.
const dest = (prefix: string) => async (file: VFile) => {
We declare a simple dest function that takes a prefix, under which all files will be output. It returns a new function with the right prototype to be fed into our pipeline through mergeMap.
const relativePath = relative(file.cwd, file.path);
const finalPath = resolve(prefix, relativePath);
We then resolve the final path where the file will be written. We take the file's absolute path, transform it into a relative one, then back into an absolute path with our prefix injected, obtaining the final filename. Note however that a VFile can also hold a relative path, but for this simple example this is enough.
await Deno.mkdir(dirname(finalPath), { recursive: true });
await Deno.writeFile(finalPath, file.value as Uint8Array);
The next part of the function simply ensures the parent directory exists, then uses writeFile to output the file data. Once again, I take a small shortcut for the demonstration with as Uint8Array, as file.value can also be a string, but I am ignoring this case for the moment.
console.log("Written", finalPath);
return file;
};
The end of the function simply logs that we wrote the file, then returns it in case we want to do more stuff down the pipeline.
We also need to update our src function a bit: by default, VFile will set its cwd to /, while we want it set to the actual current working directory. This can be done simply with a tap operation:
const src = (globs: string | string[]) =>
from(new Glob(globs, {})).pipe(
mergeMap((file) => read(resolve(file))),
tap((file) => file.cwd = Deno.cwd()),
);
You can then use mergeMap to inject this function into our pipeline. Here is the complete file:
import {VFile} from "https://esm.sh/vfile@^6.0.1";
import {Glob} from "https://esm.sh/glob@^10.4.2";
import {read} from "https://esm.sh/to-vfile@^8.0.0";
import {from, mergeMap, tap} from "https://esm.sh/rxjs@^7.8.1";
import {dirname, relative, resolve} from "jsr:@std/path@^0.225.2";
const src = (globs: string | string[]) =>
from(new Glob(globs, {})).pipe(
mergeMap((file) => read(resolve(file))),
tap((file) => file.cwd = Deno.cwd()),
);
const dest = (prefix: string) => async (file: VFile) => {
const relativePath = relative(file.cwd, file.path);
const finalPath = resolve(prefix, relativePath);
await Deno.mkdir(dirname(finalPath), { recursive: true });
await Deno.writeFile(finalPath, file.value as Uint8Array);
console.log("Written", finalPath);
return file;
};
src("**/*.ts")
.pipe(mergeMap(dest("../dist")))
.subscribe({});
Note that I use .subscribe({}) to actually run the pipeline, as it is lazy by default.
Make it a promise
If we want tasks that we can chain, we need to know when a task is finished. By default, subscribing to a stream just has it running in the background with no way of tracking it, so we are going to turn it into a promise.
RxJS provides a .toPromise() method on observables, but it is deprecated in favour of the lastValueFrom function. This gives you a promise that resolves to the last VFile processed.
I am going to use a slightly longer way that I find a bit cleaner, returning a proper Promise<void>. We are going to pair the Promise constructor with the subscribe method.
const runPipeline = (observable: Observable<VFile>): Promise<void> =>
new Promise((complete, error) => observable.subscribe({ complete, error }));
runPipeline(
src("**/*.ts").pipe(mergeMap(dest("../dist"))),
).then(() => console.log("Done."));
The runPipeline function we define is rather simple: it creates a promise and binds the resolve and reject functions passed by the promise constructor (here named complete and error to match RxJS naming) into a subscription to the stream, then returns the new promise.
Using runPipeline, you can turn any RxJS pipeline into an await-able task.
Inject files
For now, we have 1:1 transformations, but sometimes, we want to inject new files during the pipeline, for example a file name mapping manifest or assets included by a file.
We already have the tools for this. So far, we used mergeMap with a function returning a Promise<VFile>, but it handles a function returning an Observable<VFile> just as easily.
As a first example, we are going to emit a copy for every file we are passed:
const copyFile = (file: VFile) =>
new Observable<VFile>((subscriber) => {
subscriber.next(file);
const copy = new VFile(file);
copy.stem = copy.stem + "-copy";
subscriber.next(copy);
subscriber.complete();
});
This function creates a new observable to which it passes the given file unmodified, then creates a copy, changes its path, and passes it to the subscriber. It ends by completing the observable.
You can insert mergeMap(copyFile) in the pipe before dest to create these clones.
For a slightly more complicated example, we are going to emit a list of all processed files as a manifest file. For this, we need to write a custom RxJS operator, as mergeMap alone will not let us do this.
const manifest = (path: string) => (observable: Observable<VFile>) =>
new Observable<VFile>((subscriber) => {
const files = new Set<string>();
observable.subscribe({
next(file) {
files.add(file.path);
subscriber.next(file);
},
complete() {
subscriber.next(
new VFile({
path,
value: JSON.stringify(Array.from(files)),
}),
);
subscriber.complete();
},
error(err) {
subscriber.error(err);
},
});
});
RxJS operators like mergeAll are functions that take an Observable and return another Observable, which is exactly what the manifest function written here returns. We simply create a new observable, and use subscribe on the source observable to insert our functions. In next, which is called for each item, we add the file path to a local Set and forward the file. Then, complete is called on completion (after the last element), in which we inject a new VFile into the subscriber before completing it.
You do not need mergeMap to use this operator:
src("**/*.ts")
.pipe(
manifest("manifest.json"),
mergeMap(dest("../dist")),
);
If you run it with the previous code, it will not work, as we hit both limitations noted in our previous dest function. Here is an updated version:
const dest = (prefix: string) => async (file: VFile) => {
const relativePath = isAbsolute(file.path) ? relative(file.cwd, file.path) : file.path;
const finalPath = resolve(prefix, relativePath);
await Deno.mkdir(dirname(finalPath), { recursive: true });
if (typeof file.value === "string") {
const encoder = new TextEncoder();
await Deno.writeFile(finalPath, encoder.encode(file.value));
} else {
await Deno.writeFile(finalPath, file.value);
}
console.log("Written", finalPath);
return file;
};
It has been modified to support relative paths, as well as VFile objects backed by a string instead of a Uint8Array, using TextEncoder to convert the string.
And that’s it! This reduced set of tools should be enough to walk through files and apply all sorts of transformations. Prefer existing RxJS operators when you can, but if needed, custom operators are available for the most complex use cases.
Closing notes
Replacing Gulp’s streams with RxJS ones was not straightforward, which is expected: Gulp is specialised in the exact task of file transformation, while RxJS is more generic stream tooling. However, it is still rather easy to do. RxJS is incredibly powerful, and this article barely touches what it can actually do with streams, especially since we work within a very short execution time, while RxJS is also made for long-running applications, to handle, transform and re-dispatch events.