June 27, 2024

RxJS as a Gulp replacement

When writing the tooling to generate this blog, I picked up Gulp as a task executor/file processor. As development went on, however, I ran into some issues, including incompatibility with Deno, which I wanted to move to. Gulp streams reminded me of another library I had used in the past, RxJS. I decided to see whether I could use it to replace Gulp with a bit of custom code, and it turned out to be rather easy.

It was a nice practical exercise, and it taught me more about how RxJS works. I focus here on the file processing/stream aspect of Gulp rather than its task runner aspect, as RxJS is not relevant for that part. The techniques shown here are nonetheless valid for RxJS in other workflows too.

As this was made to move to Deno, this article is entirely based on Deno. Deno is a recent JavaScript runtime that bundles a lot of nice features, such as built-in TypeScript support. While the code should be mostly compatible with the classical NodeJS environment, some adjustments will be needed there, including using a package.json instead of URLs in import statements, and mapping some @std libraries to their Node equivalents.

The basics

Gulp is a JavaScript tool for writing and executing tasks that transform files into others by applying a series of stream transformations. Tasks basically look like this:

export const transform = () => src("**/*.js").pipe(someTransformer()).pipe(dest("dist"));

Gulp leverages NodeJS object streams, passing instances of vinyl objects (virtual files) through different transformers. Gulp has a wide array of “plugins” available that can be used as transformers. If you need something custom, you can simply write your own with the Transform class from NodeJS streams.
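As a rough sketch of what such a custom transformer can look like, the following uses the Transform class from node:stream in object mode. The FakeFile shape and the upperCaseContents name are made up for illustration; real Gulp plugins receive vinyl objects instead.

```typescript
import { Readable, Transform } from "node:stream";

// Hypothetical stand-in for a vinyl file; real plugins receive vinyl instances.
interface FakeFile {
    path: string;
    contents: string;
}

// A custom transformer: an object-mode Transform that upper-cases the contents.
const upperCaseContents = () =>
    new Transform({
        objectMode: true,
        transform(file: FakeFile, _encoding, callback) {
            callback(null, { ...file, contents: file.contents.toUpperCase() });
        },
    });

// Feed one fake file through the transformer and collect what comes out.
const collected: FakeFile[] = [];
await new Promise<void>((resolve, reject) => {
    Readable.from([{ path: "a.txt", contents: "hello" }])
        .pipe(upperCaseContents())
        .on("data", (file: FakeFile) => collected.push(file))
        .on("end", resolve)
        .on("error", reject);
});

console.log(collected);
```

Every step of a Gulp pipeline has this shape: an object goes in, and the callback hands a (possibly modified) object to the next step.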

RxJS is a library that supports a programming style called “reactive programming”. Basically, it provides data streams and a wide array of operators to work with these streams. The stream format and intermediate transforms seemed to replicate Gulp streams well enough to be a suitable replacement.

Sourcing files

The first thing we will need to replicate is the gulp.src function. It takes a glob pattern, matches files against it, and creates a stream of virtual files from them. For the files, it relies on vinyl, whose objects contain a path, the file contents, and a history of paths. Transformers work on these files, altering the contents and paths at will.

To do the glob step, we can simply rely on the glob library. Its Glob class is directly pluggable into RxJS streams:

import { Glob } from "https://esm.sh/glob@^10.4.2";
import { from } from "https://esm.sh/rxjs@^7.8.1";

from(new Glob("**/*.ts", {})).subscribe((file) => console.log(file));
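This presumably works because from accepts any iterable or async iterable, and Glob exposes both iteration protocols. Here is a minimal sketch of the same mechanism, with a hypothetical fakeGlob async generator standing in for Glob so that it does not depend on the filesystem. toArray and lastValueFrom are only used here to collect the emitted values.

```typescript
import { from, lastValueFrom, toArray } from "https://esm.sh/rxjs@^7.8.1";

// Hypothetical stand-in for Glob: any async iterable of paths works with from().
async function* fakeGlob(): AsyncGenerator<string> {
    yield "main.ts";
    yield "src/util.ts";
}

// from() turns the async iterable into an Observable<string>.
const paths = await lastValueFrom(from(fakeGlob()).pipe(toArray()));
console.log(paths);
```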

vinyl is perfectly usable outside of the Gulp environment, but here I will favour the vfile library, notably because it uses Uint8Array instead of Buffer for file contents, which plays better with modern JavaScript and especially Deno.

Now that we have an observable of paths, we need to actually load those files into VFile objects. This can simply be done by inserting a map operation in our stream:

import { Glob } from "https://esm.sh/glob@^10.4.2";
import { from, map } from "https://esm.sh/rxjs@^7.8.1";
import { readSync } from "https://esm.sh/to-vfile@^8.0.0";
import { resolve } from "jsr:@std/path@^0.225.2";

from(new Glob("**/*.ts", {}))
    .pipe(map((file) => readSync(resolve(file))))
    .subscribe((file) => console.log(file));

I leveraged the to-vfile library, which provides helpers to bridge virtual files to the actual filesystem. The readSync function takes an absolute path (which we get through resolve) and returns a VFile.

Going async

Modern JavaScript leverages promises a lot, and RxJS is completely compatible with them. Actually, you can see Promises as streams with a single element, and that is exactly how RxJS handles them. So RxJS helpers that handle observables can also be called transparently with promises.

In our previous example, I used readSync, which performs a blocking operation, but we can make it async by using the read function instead. However, map is not fit for this: using it with a function that returns a promise creates a stream of promises (Observable<Promise<VFile>>), while we want to keep working with virtual files directly. For this, there is the mergeMap operator, which takes a function returning streams and merges those streams into a single one. Since promises are valid streams, we can use it directly.

import { Glob } from "https://esm.sh/glob@^10.4.2";
import { from, mergeMap } from "https://esm.sh/rxjs@^7.8.1";
import { read } from "https://esm.sh/to-vfile@^8.0.0";
import { resolve } from "jsr:@std/path@^0.225.2";

from(new Glob("**/*.ts", {}))
    .pipe(mergeMap((file) => read(resolve(file))))
    .subscribe((file) => console.log(file));

Not much is changed, but we are now using async code, which can provide a performance boost in some workflows.

map and mergeMap are going to be the primary operators for transformations, depending on whether your transformation is synchronous or async.
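To make the difference between the two concrete, here is a small sketch that is independent from the file pipeline and works on plain numbers. The doubleLater function is a hypothetical async transformation standing in for something like read; toArray and lastValueFrom are only there to collect the results.

```typescript
import { from, lastValueFrom, map, mergeMap, toArray } from "https://esm.sh/rxjs@^7.8.1";

// Hypothetical async transformation, standing in for something like read().
const doubleLater = (n: number): Promise<number> => Promise.resolve(n * 2);

// map with an async function yields a stream of promises.
const mapped = await lastValueFrom(
    from([1, 2, 3]).pipe(map(doubleLater), toArray()),
);
console.log(mapped.every((item) => item instanceof Promise)); // true

// mergeMap subscribes to each promise and emits the resolved values.
const merged = await lastValueFrom(
    from([1, 2, 3]).pipe(mergeMap(doubleLater), toArray()),
);
console.log(merged);
```

Note that mergeMap emits values as the inner promises resolve, so with real asynchronous work the output order is not guaranteed to match the input order.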

We can go one step further and make our code a little more reusable. The combination of Glob and read can be abstracted away in a simple function. Observables are fully composable, and .pipe can be called several times if needed, yielding a new observable each time. So let us make a function to create our source observable, mimicking the original src from Gulp:

const src = (globs: string | string[]) =>
    from(new Glob(globs, {})).pipe(
        mergeMap((file) => read(resolve(file))),
    );

Now that we have a src function, we can use it to simplify our pipeline:

src("**/*.ts").subscribe((file) => console.log(file));

Outputting files

We now need to have the counterpart to src, dest, in order to write our files. This time it requires a bit more code, but nothing too difficult. We simply need to handle file writing at the right location.

const dest = (prefix: string) => async (file: VFile) => {

We declare a simple dest function that takes a prefix, which is the directory where all files will be output. It returns a new function that has the right signature to be fed into our pipeline through mergeMap.

    const relativePath = relative(file.cwd, file.path);
    const finalPath = resolve(prefix, relativePath);

We then resolve the final path where the file will be written. We have an absolute path that we transform into a path relative to the file's cwd, then back into an absolute path by injecting our prefix, which gives the final filename. Note however that a VFile can also have a relative path; this case is ignored for this simple example.

    await Deno.mkdir(dirname(finalPath), { recursive: true });
    await Deno.writeFile(finalPath, file.value as Uint8Array);

The next part of the function simply ensures the parent directory is created, then uses writeFile to output the file data. Once again, I take a small shortcut for demonstration by using as Uint8Array: file.value can also be a string, but I am ignoring this case for the moment.

    console.log("Written", finalPath);
    return file;
}

The end of the function simply logs that we wrote the file, then returns it in case we want to do more stuff down the pipeline.
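The path computation in the middle of the function can be checked in isolation. This sketch uses the posix helpers from node:path rather than @std/path, only so that it gives the same result on every platform; the directory names are made up for illustration.

```typescript
import { posix } from "node:path";

// Hypothetical values: a file read from the project, and an output prefix.
const cwd = "/home/user/project";
const filePath = "/home/user/project/src/lib/util.ts";
const prefix = "/home/user/dist";

// Strip the working directory, then graft the remainder onto the prefix.
const relativePath = posix.relative(cwd, filePath);
const finalPath = posix.resolve(prefix, relativePath);

console.log(relativePath); // src/lib/util.ts
console.log(finalPath); // /home/user/dist/src/lib/util.ts
```

The directory structure below the working directory is preserved under the prefix, which is why the parent directory has to be created before writing.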

We also need to update our src function a bit: by default, VFile will set its cwd to /, while we want to have it set to the actual current working directory. This can be done simply with a tap operation:

const src = (globs: string | string[]) =>
    from(new Glob(globs, {})).pipe(
        mergeMap((file) => read(resolve(file))),
        tap((file) => file.cwd = Deno.cwd()),
    );

You can then use mergeMap to inject this function into our pipeline. Here is the complete file:

import { VFile } from "https://esm.sh/vfile@^6.0.1";
import { Glob } from "https://esm.sh/glob@^10.4.2";
import { read } from "https://esm.sh/to-vfile@^8.0.0";
import { from, mergeMap, tap } from "https://esm.sh/rxjs@^7.8.1";
import { dirname, relative, resolve } from "jsr:@std/path@^0.225.2";

const src = (globs: string | string[]) =>
    from(new Glob(globs, {})).pipe(
        mergeMap((file) => read(resolve(file))),
        tap((file) => file.cwd = Deno.cwd()),
    );

const dest = (prefix: string) => async (file: VFile) => {
    const relativePath = relative(file.cwd, file.path);
    const finalPath = resolve(prefix, relativePath);

    await Deno.mkdir(dirname(finalPath), { recursive: true });
    await Deno.writeFile(finalPath, file.value as Uint8Array);

    console.log("Written", finalPath);
    return file;
};

src("**/*.ts")
    .pipe(mergeMap(dest("../dist")))
    .subscribe({});

Note that I use .subscribe({}) to start the pipeline: observables are lazy by default, so nothing is processed until something subscribes.
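This behaviour can be observed with a small sketch unrelated to files. The log array is only there to record the order in which things happen.

```typescript
import { Observable } from "https://esm.sh/rxjs@^7.8.1";

const log: string[] = [];

// The producer function only runs when something subscribes.
const lazy = new Observable<number>((subscriber) => {
    log.push("producer started");
    subscriber.next(1);
    subscriber.complete();
});

log.push("observable created");
lazy.subscribe((value) => log.push(`received ${value}`));

console.log(log);
// [ "observable created", "producer started", "received 1" ]
```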

Make it a promise

If we want to have tasks that we can chain, we need to know when a task is finished. By default, subscribing to a stream just leaves it running in the background, without any way of tracking it. So we are going to turn it into a promise.

RxJS provides a .toPromise() method on the pipeline, but it is deprecated in favour of the lastValueFrom function. This gives you a promise that resolves to the last VFile processed.
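A minimal sketch of lastValueFrom on plain strings, with made-up file names. One detail worth knowing: on a stream that completes without emitting anything (for example a glob that matches no file), the promise rejects with an EmptyError unless a defaultValue is given.

```typescript
import { EMPTY, from, lastValueFrom, map } from "https://esm.sh/rxjs@^7.8.1";

// Resolves to the last value emitted before completion.
const last = await lastValueFrom(
    from(["a.ts", "b.ts", "c.ts"]).pipe(map((path) => path.toUpperCase())),
);
console.log(last); // C.TS

// An empty stream needs a default value to avoid a rejection.
const fallback = await lastValueFrom(EMPTY, { defaultValue: "nothing matched" });
console.log(fallback); // nothing matched
```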

I am going to use a slightly longer way that I find a bit cleaner, and that returns a proper Promise<void>. We are going to pair the Promise constructor with the subscribe method.

const runPipeline = (observable: Observable<VFile>): Promise<void> =>
    new Promise((complete, error) => observable.subscribe({ complete, error }));

runPipeline(
    src("**/*.ts").pipe(mergeMap(dest("../dist"))),
).then(() => console.log("Done."));

The runPipeline function we define is rather simple: it creates a promise and binds the resolve and reject functions passed by the promise constructor (here named complete and error to match RxJS naming) to a subscription to the stream, then returns the new promise.

Using runPipeline you can turn any RxJS pipeline into an await-able task.
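As a sketch of how this chains, the example below runs two hypothetical tasks in sequence and shows that a stream error rejects the promise. The helper is the same as above, only loosened to accept any observable so that plain strings can stand in for files.

```typescript
import { Observable, of, throwError } from "https://esm.sh/rxjs@^7.8.1";

// Same helper as above, loosened to accept any observable for this sketch.
const runPipeline = (observable: Observable<unknown>): Promise<void> =>
    new Promise((complete, error) => observable.subscribe({ complete, error }));

const steps: string[] = [];

// Hypothetical tasks: the second only starts once the first has completed.
await runPipeline(of("a.ts", "b.ts"));
steps.push("first task done");
await runPipeline(of("c.ts"));
steps.push("second task done");

// A stream error rejects the promise, so it can be handled like any other.
const outcome = await runPipeline(throwError(() => new Error("disk full")))
    .then(() => "resolved")
    .catch((err: Error) => `rejected: ${err.message}`);

console.log(steps, outcome);
```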

Inject files

For now, we have 1:1 transformations, but sometimes, we want to inject new files during the pipeline, for example a file name mapping manifest or assets included by a file.

We already have the tools for this here. For now, we used mergeMap to return a Promise<VFile>, but it handles returning an Observable<VFile> just as easily.

As a first example, we are going to emit a copy for every file we are passed:

const copyFile = (file: VFile) =>
    new Observable<VFile>((subscriber) => {
        subscriber.next(file);

        const copy = new VFile(file);
        copy.stem = copy.stem + "-copy";

        subscriber.next(copy);
        subscriber.complete();
    });

This function creates a new observable to which it passes the given file unmodified, then creates a copy, changes its path, and passes it to the subscriber. It ends by completing the observable.

You can insert mergeMap(copyFile) in the pipe before dest to create these clones.

For a slightly more complicated example, we are going to emit a list of all processed files in a manifest file. For this, we need to write a custom operator for RxJS, as just using mergeMap will not allow us to do this.

const manifest = (path: string) => (observable: Observable<VFile>) =>
    new Observable<VFile>((subscriber) => {
        const files = new Set<string>();

        // Returning the inner subscription lets RxJS tear it down on unsubscribe.
        return observable.subscribe({
            next(file) {
                files.add(file.path);
                subscriber.next(file);
            },
            complete() {
                subscriber.next(
                    new VFile({
                        path,
                        value: JSON.stringify(Array.from(files)),
                    }),
                );
                subscriber.complete();
            },
            error(err) {
                subscriber.error(err);
            },
        });
    });

RxJS operators, such as the one returned by mergeMap(...), are functions that take an Observable and return another Observable, which is what the manifest function written here returns. We simply create a new observable, and use subscribe on the source observable to insert our functions. In next, which is called for each item, we add the file path to a local Set and pass the file along. Then complete is called on completion (after the last element), where we push a new VFile to the subscriber before completing it.

You do not need mergeMap to use this operator:

src("**/*.ts")
    .pipe(
        manifest("manifest.json"),
        mergeMap(dest("../dist")),
    )
    .subscribe({});

If you run it with the previous code, it will not work, as we hit both of the limitations of our dest function noted earlier: the manifest has a relative path, and its value is a string. Here is an updated version:

const dest = (prefix: string) => async (file: VFile) => {
    const relativePath = isAbsolute(file.path) ? relative(file.cwd, file.path) : file.path;
    const finalPath = resolve(prefix, relativePath);

    await Deno.mkdir(dirname(finalPath), { recursive: true });
    if (typeof file.value === "string") {
        const encoder = new TextEncoder();
        await Deno.writeFile(finalPath, encoder.encode(file.value));
    } else {
        await Deno.writeFile(finalPath, file.value);
    }

    console.log("Written", finalPath);
    return file;
};

It has been modified to support relative paths, as well as VFile objects that are backed by a string instead of a Uint8Array, using TextEncoder to convert the string. Note that isAbsolute has to be imported from @std/path alongside the other path helpers.
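The string handling can be isolated in a small helper. This is only a sketch, and toBytes is a hypothetical name that is not part of vfile.

```typescript
// Hypothetical helper: normalise a string or binary value into bytes.
const toBytes = (value: string | Uint8Array): Uint8Array =>
    typeof value === "string" ? new TextEncoder().encode(value) : value;

const fromString = toBytes("héllo");
const fromBytes = toBytes(new Uint8Array([1, 2, 3]));

console.log(fromString.length); // 6, as "é" takes two bytes in UTF-8
console.log(fromBytes.length); // 3, returned unchanged
```

TextEncoder always encodes to UTF-8, so the byte length can differ from the string length as soon as the content is not plain ASCII.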

And that’s it! This reduced set of tools should be enough to go through files and apply all sorts of transformations. Prefer existing RxJS operators when you can, but custom operators are available for the most complex use cases.
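As an illustration of leaning on existing operators, the "emit one extra item at the end" idea can also be sketched with tap, defer and concatWith instead of a hand-written Observable. This is a simplified variant working on plain strings rather than VFile objects, and appendSummary is a hypothetical name.

```typescript
import {
    concatWith,
    defer,
    from,
    lastValueFrom,
    Observable,
    of,
    tap,
    toArray,
} from "https://esm.sh/rxjs@^7.8.1";

// Hypothetical operator: passes items through, then appends a JSON summary.
const appendSummary = () => (source: Observable<string>): Observable<string> =>
    // defer gives each subscription its own list of seen items.
    defer(() => {
        const seen: string[] = [];
        return source.pipe(
            tap((item) => seen.push(item)),
            // The inner defer delays building the summary until the source completes.
            concatWith(defer(() => of(JSON.stringify(seen)))),
        );
    });

const output = await lastValueFrom(
    from(["a.ts", "b.ts"]).pipe(appendSummary(), toArray()),
);
console.log(output);
```

The trade-off is mostly one of taste: this version gets unsubscription and error forwarding from the built-in operators, while the hand-written one makes each step explicit.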

Closing notes

Replacing Gulp’s streams with RxJS ones was not entirely straightforward, which is to be expected: Gulp is specialised in the exact task of file transformation, while RxJS is a more generic stream toolkit. However, it is still rather easy to do. RxJS is incredibly versatile, and this article barely touches on what it can actually do with streams, especially since we work here with a very short execution time, while RxJS is also made for long-running applications that handle, transform and re-dispatch events.