Type-safe, lazy data processing for TypeScript and JavaScript. Inspired by Java Streams, C# LINQ, and Kotlin Sequences.
npm install @szilanor/stream
Stream API builds a pipeline of operations that data flows through. It is lazy: items are processed one at a time, and only when a later step actually needs them.
import { stream, filter, map, toArray } from "@szilanor/stream";

const result = stream([1, 2, 3, 4, 5])
  .pipe(
    filter((x) => x % 2 === 0),
    map((x) => x * 10),
  )
  .collect(toArray());

console.log(result); // [20, 40]
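To make "one by one, and only if needed" concrete, here is a minimal sketch built from plain generator functions rather than the library itself. The helpers `lazyFilter` and `lazyMap` are hypothetical stand-ins written for this illustration; they record each step so the processing order is visible.

```typescript
// Records every step so the order of work can be inspected afterwards.
const trace: string[] = [];

// Illustrative stand-in for a lazy filter (not the library's implementation).
function* lazyFilter<T>(source: Iterable<T>, predicate: (x: T) => boolean): Generator<T> {
  for (const item of source) {
    trace.push(`filter ${item}`);
    if (predicate(item)) yield item;
  }
}

// Illustrative stand-in for a lazy map (not the library's implementation).
function* lazyMap<T, R>(source: Iterable<T>, fn: (x: T) => R): Generator<R> {
  for (const item of source) {
    trace.push(`map ${item}`);
    yield fn(item);
  }
}

const pipeline = lazyMap(
  lazyFilter([1, 2, 3, 4], (x) => x % 2 === 0),
  (x) => x * 10,
);

// Composing the generators does no work; the trace is still empty here.
const traceBeforeIteration = trace.length;

// Iterating pulls items through both steps one at a time.
const traced = Array.from(pipeline);

console.log(traced); // [20, 40]
console.log(trace); // filter 1, filter 2, map 2, filter 3, filter 4, map 4
```

The trace shows that item 2 is filtered and mapped before item 3 is even looked at, which is the opposite of running each step over the whole array in turn.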
AsyncIterable and async transformations.
Under the hood, Stream API uses JavaScript Iterators and Generators.
When you call .pipe(), you aren't iterating the data yet. You are composing a chain of generator functions. Iteration only begins when you call .collect() (or iterate the stream manually). This is what allows for efficiency gains:
find or take stop the generator chain early.
sort).
Standard array methods like .map().filter() create a new array for every step. Stream API does not.
import { stream, map, filter, take, toArray } from "@szilanor/stream";

// hugeArray, transform, and predicate are assumed to be defined elsewhere

// Standard JS: iterates the entire array multiple times
const bad = hugeArray
  .map(transform) // Allocates a new array the same size as hugeArray
  .filter(predicate) // Allocates another new array
  .slice(0, 5);

// Stream API: iterates once, stops early
const good = stream(hugeArray)
  .pipe(
    map(transform),
    filter(predicate),
    take(5), // Stops processing after finding 5 items
  )
  .collect(toArray());
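The difference in work can be measured directly. The following is a rough sketch using plain generators, not the library: `mapGen`, `filterGen`, and `takeGen` are hypothetical helpers, and the 1000-element input and the counters are made up for the illustration.

```typescript
// Hypothetical input: the numbers 1..1000.
const numbers = Array.from({ length: 1000 }, (_, i) => i + 1);
let eagerCalls = 0;
let lazyCalls = 0;

// Eager: map runs on all 1000 items before slice keeps 5 of them.
const eager = numbers
  .map((x) => { eagerCalls++; return x * 2; })
  .filter((x) => x % 3 === 0)
  .slice(0, 5);

function* mapGen<T, R>(source: Iterable<T>, fn: (x: T) => R): Generator<R> {
  for (const item of source) yield fn(item);
}

function* filterGen<T>(source: Iterable<T>, predicate: (x: T) => boolean): Generator<T> {
  for (const item of source) if (predicate(item)) yield item;
}

function* takeGen<T>(source: Iterable<T>, count: number): Generator<T> {
  if (count <= 0) return;
  let taken = 0;
  for (const item of source) {
    yield item;
    if (++taken >= count) return; // stop pulling from upstream
  }
}

// Lazy: the map callback runs only until 5 matching items have been found.
const lazy = Array.from(
  takeGen(
    filterGen(
      mapGen(numbers, (x) => { lazyCalls++; return x * 2; }),
      (x) => x % 3 === 0,
    ),
    5,
  ),
);

console.log(eager, eagerCalls); // [6, 12, 18, 24, 30] 1000
console.log(lazy, lazyCalls); // [6, 12, 18, 24, 30] 15
```

Both versions return the same five values, but the lazy chain stops after the 15th input item because nothing downstream asks for more.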
Handling async data streams seamlessly.
import { stream, mapAsync, filterAsync, toArrayAsync } from "@szilanor/stream";

// userIds and fetchUser are assumed to be defined elsewhere
const activeUsers = await stream(userIds)
  .pipeAsync(
    mapAsync(async (id) => {
      const user = await fetchUser(id);
      return user;
    }),
    filterAsync((user) => user.isActive),
  )
  .collectAsync(toArrayAsync());
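The same pull-based idea carries over to async generators and `for await`. The sketch below does not use the library; `fetchUserStub`, `mapAsyncGen`, `filterAsyncGen`, and `collectActiveUsers` are hypothetical names, and the stub simply treats odd ids as active. It processes items sequentially, which may or may not match how the library schedules async work.

```typescript
interface User {
  id: number;
  isActive: boolean;
}

// Hypothetical stand-in for a real network call: odd ids are "active".
async function fetchUserStub(id: number): Promise<User> {
  return { id, isActive: id % 2 === 1 };
}

// Awaits the async transform for each item, one at a time.
async function* mapAsyncGen<T, R>(
  source: Iterable<T> | AsyncIterable<T>,
  fn: (x: T) => Promise<R>,
): AsyncGenerator<R> {
  for await (const item of source) yield await fn(item);
}

// Passes through only the items that satisfy the predicate.
async function* filterAsyncGen<T>(
  source: AsyncIterable<T>,
  predicate: (x: T) => boolean,
): AsyncGenerator<T> {
  for await (const item of source) if (predicate(item)) yield item;
}

// Drains the async pipeline into an array.
async function collectActiveUsers(ids: number[]): Promise<User[]> {
  const users = filterAsyncGen(mapAsyncGen(ids, fetchUserStub), (u) => u.isActive);
  const result: User[] = [];
  for await (const user of users) result.push(user);
  return result;
}

collectActiveUsers([1, 2, 3, 4]).then((users) => {
  console.log(users.map((u) => u.id)); // [1, 3]
});
```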
Extending the library is as simple as writing a generator function.
import { OperationFunction, stream, toArray } from "@szilanor/stream";

// Emit every Nth item, starting with the first
const everyNth = <T>(n: number): OperationFunction<T, T> => {
  return function* (iterable) {
    let i = 0;
    for (const item of iterable) {
      if (i++ % n === 0) yield item;
    }
  };
};

stream([1, 2, 3, 4, 5, 6]).pipe(everyNth(2)).collect(toArray()); // [1, 3, 5]
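A custom operation can also change the element type. The sketch below is self-contained and does not import the library: the local `Operation` alias is an assumption about the general shape of such a function (an iterable in, an iterable out), and `chunk` is a hypothetical operation written for this illustration.

```typescript
// Local alias assumed for this sketch: a function from one iterable to another.
// The library's exported OperationFunction type may differ in detail.
type Operation<T, R> = (iterable: Iterable<T>) => Iterable<R>;

// Group items into arrays of at most `size` elements.
const chunk = <T>(size: number): Operation<T, T[]> => {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  return function* (iterable) {
    let batch: T[] = [];
    for (const item of iterable) {
      batch.push(item);
      if (batch.length === size) {
        yield batch;
        batch = [];
      }
    }
    if (batch.length > 0) yield batch; // flush the final partial batch
  };
};

// Applied directly to an array here, since an operation is just a function.
const chunks = Array.from(chunk<number>(2)([1, 2, 3, 4, 5]));
console.log(chunks); // [[1, 2], [3, 4], [5]]
```

Because the operation only holds one batch at a time, it stays lazy: upstream items are pulled only as each batch is requested.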
| | Stream API | RxJS | Native Array Methods |
|---|---|---|---|
| Paradigm | Pull-based (Iterators) | Push-based (Observables) | Eager |
| Data Type | Data in motion / Collections | Events / Time streams | Static Arrays |
| Async | AsyncIterable (await/for-await) | Observables (subscribe) | Promise.all needed |
The main difference lies in who controls the flow:
[0]), but requiring all data to be loaded before processing begins.
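The question of who controls the flow can be illustrated without either library. In this simplified sketch, `pullSource` and `pushSource` are hypothetical: the first is a plain generator, the second a bare callback-based producer standing in for the push model. Real Observables also support unsubscription, which this sketch leaves out.

```typescript
// Records what each producer actually generated.
const produced: string[] = [];

// Pull: the producer only advances when the consumer asks for the next value.
function* pullSource(): Generator<number> {
  for (let i = 1; i <= 5; i++) {
    produced.push(`pull ${i}`);
    yield i;
  }
}

// Push: the producer decides when to emit and calls the consumer's handler.
function pushSource(onValue: (value: number) => void): void {
  for (let i = 1; i <= 5; i++) {
    produced.push(`push ${i}`);
    onValue(i);
  }
}

// The pull consumer stops asking after two values, so 3..5 are never produced.
const pulled: number[] = [];
for (const value of pullSource()) {
  pulled.push(value);
  if (pulled.length === 2) break;
}

// The push consumer can ignore values, but all five are still emitted.
const pushed: number[] = [];
pushSource((value) => {
  if (pushed.length < 2) pushed.push(value);
});

console.log(pulled, pushed); // [1, 2] [1, 2]
console.log(produced); // pull 1, pull 2, push 1, push 2, push 3, push 4, push 5
```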