Programmatic API
RecordStream isn't just a command-line tool — it's also a TypeScript library with a fluent, chainable API. Build data pipelines in code with the same power as the CLI.
Quick Start
import { RecordStream } from "recs";
const results = await RecordStream.fromJsonLines(`
{"name":"Alice","age":30,"dept":"Engineering"}
{"name":"Bob","age":25,"dept":"Marketing"}
{"name":"Charlie","age":35,"dept":"Engineering"}
`)
.grep('{{dept}} === "Engineering"')
.sort("age=n")
.toJsonArray();
// [{ name: "Alice", age: 30, ... }, { name: "Charlie", age: 35, ... }]
Creating Streams
From JSON Lines
The most common format — one JSON object per line:
// From a string
const stream = RecordStream.fromJsonLines(jsonlString);
// From a ReadableStream (e.g., fetch response body)
const response = await fetch("https://api.example.com/data.jsonl");
const remoteStream = RecordStream.fromJsonLines(response.body);
From JSON Arrays
// From a string
const stream = RecordStream.fromJsonArray('[{"a":1},{"a":2}]');
// From an array of objects
const objectStream = RecordStream.fromJsonArray([
{ name: "Alice", age: 30 },
{ name: "Bob", age: 25 },
]);From CSV
const stream = RecordStream.fromCsv(`name,age,dept
Alice,30,Engineering
Bob,25,Marketing`);
From Records
import { Record } from "recs";
const records = [
new Record({ name: "Alice", age: 30 }),
new Record({ name: "Bob", age: 25 }),
];
const stream = RecordStream.fromRecords(records);
From Async Iterables
async function* generateRecords() {
for (let i = 0; i < 1000; i++) {
yield new Record({ index: i, value: Math.random() });
}
}
const stream = RecordStream.fromAsyncIterable(generateRecords());
Transform Methods
All transform methods return a new RecordStream, so you can chain them:
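A small toy model can show why returning a new stream makes chaining work. The sketch below is not the recs implementation: MiniStream, Row, and the helper generators are hypothetical names, and the model is synchronous for brevity whereas RecordStream is asynchronous. It only illustrates the pattern in which each transform wraps the previous source and hands back a fresh stream object.

```typescript
// Toy model only: MiniStream is a hypothetical stand-in, not the recs API.
type Row = { [key: string]: unknown };

function* filterRows(source: Iterable<Row>, keep: (row: Row) => boolean): IterableIterator<Row> {
  for (const row of source) {
    if (keep(row)) yield row;
  }
}

function* mapRows(source: Iterable<Row>, fn: (row: Row) => Row): IterableIterator<Row> {
  for (const row of source) {
    yield fn(row);
  }
}

function* takeRows(source: Iterable<Row>, limit: number): IterableIterator<Row> {
  if (limit <= 0) return;
  let taken = 0;
  for (const row of source) {
    yield row;
    taken += 1;
    if (taken >= limit) return;
  }
}

class MiniStream {
  private readonly source: Iterable<Row>;

  constructor(source: Iterable<Row>) {
    this.source = source;
  }

  // Each transform returns a new MiniStream wrapping the previous source,
  // which is what lets calls be chained.
  filter(keep: (row: Row) => boolean): MiniStream {
    return new MiniStream(filterRows(this.source, keep));
  }

  map(fn: (row: Row) => Row): MiniStream {
    return new MiniStream(mapRows(this.source, fn));
  }

  head(limit: number): MiniStream {
    return new MiniStream(takeRows(this.source, limit));
  }

  // Terminal operation: pulls rows through the whole chain.
  toArray(): Row[] {
    return Array.from(this.source);
  }
}

const firstAdult = new MiniStream([
  { name: "Alice", age: 30 },
  { name: "Bob", age: 17 },
  { name: "Charlie", age: 35 },
])
  .filter(row => (row.age as number) > 21)
  .map(row => ({ ...row, adult: true }))
  .head(1)
  .toArray();

console.log(firstAdult); // [ { name: "Alice", age: 30, adult: true } ]
```

In this toy model no rows are read until the terminal toArray() call, because each transform only wraps its source. Whether recs defers work in the same way is an assumption here, not something this page states.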
grep(predicate)
Filter records. Accepts a snippet string or a function:
// Snippet (uses {{keyspec}} syntax)
stream.grep('{{age}} > 21')
// Function
stream.grep(r => r.get("age") > 21)
eval(snippet)
Modify records in place with a snippet:
stream.eval('{{full_name}} = {{first}} + " " + {{last}}')
xform(snippetOrFn)
Transform records, optionally producing multiple outputs:
// Snippet
stream.xform('{{age}} = {{age}} + 1')
// Function returning array of records
stream.xform(r => {
const tags = r.get("tags") as string[];
return tags.map(tag => {
const clone = r.clone();
clone.set("tag", tag);
return clone;
});
})
sort(...keys)
Sort by key specs. Append =n to a key for numeric comparison, and put - before the sort type for descending order (for example, age=-n):
stream.sort("name") // ascending lexical
stream.sort("age=n") // ascending numeric
stream.sort("age=-n") // descending numeric
stream.sort("dept", "name") // multi-key sort
head(n) / tail(n)
Take or skip records:
stream.head(10) // first 10 records
stream.tail(5) // skip first 5 records
uniq(...keys)
Deduplicate (records must be pre-sorted by the given keys):
stream.sort("email").uniq("email")
collate(options)
Group and aggregate:
import { aggregatorRegistry } from "recs";
stream.collate({
keys: ["department"],
aggregators: new Map([
["count", aggregatorRegistry.make("count", [])],
["avg_salary", aggregatorRegistry.make("average", ["salary"])],
]),
})
decollate(field)
Expand an array field into separate records:
// Input: { name: "Alice", tags: ["admin", "user"] }
// Output: { name: "Alice", tags: "admin" }
// { name: "Alice", tags: "user" }
stream.decollate("tags")
map(fn) / reverse() / concat(other)
stream.map(r => { r.set("processed", true); return r; })
stream.reverse()
stream.concat(otherStream)
Terminal Operations
These consume the stream and return a result:
const records = await stream.toArray(); // Record[]
const objects = await stream.toJsonArray(); // JsonObject[]
const jsonl = await stream.toJsonLines(); // string (JSONL)
const csv = await stream.toCsv(); // string (CSV)
Async Iteration
RecordStream implements AsyncIterable, so you can use for await:
for await (const record of stream) {
console.log(record.get("name"));
}
The Record Class
The Record class wraps a JSON object with convenient accessors:
import { Record } from "recs";
const r = new Record({ name: "Alice", age: 30 });
r.get("name"); // "Alice"
r.set("age", 31); // modifies in place
r.keys(); // ["name", "age"]
r.toJSON(); // { name: "Alice", age: 31 }
r.toString(); // '{"name":"Alice","age":31}'
r.clone(); // deep copy
// Key spec access for nested data
r.set("address/city", "Seattle"); // creates nested structure
r.get("address/city"); // "Seattle"
Putting It All Together
import { RecordStream } from "recs";
// Analyze API response data
const response = await fetch("https://api.example.com/users");
const users = await response.json();
const report = await RecordStream.fromJsonArray(users)
.grep('{{active}} === true')
// 31536000000 = milliseconds in a 365-day year (leap days ignored)
.eval('{{tenure_years}} = (Date.now() - new Date({{created_at}}).getTime()) / 31536000000')
.sort("tenure_years=-n")
.head(20)
.toJsonArray();
console.log("Top 20 longest-tenured active users:", report);
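One way to sanity-check a pipeline like this is to run the same steps with plain array methods on a few hand-made rows. The sketch below does not use recs at all. The User interface, the topTenured helper, and the sample data are hypothetical, and a fixed "now" timestamp replaces Date.now() so the result is reproducible.

```typescript
// Plain-array equivalent of the filter / compute / sort / limit steps.
// Field names mirror the example above; everything else is made up.
const MS_PER_YEAR = 365 * 24 * 60 * 60 * 1000; // 365-day year, leap days ignored

interface User {
  name: string;
  active: boolean;
  created_at: string;
}

function topTenured(users: User[], now: number, limit: number) {
  return users
    .filter(u => u.active === true)
    .map(u => ({
      ...u,
      tenure_years: (now - new Date(u.created_at).getTime()) / MS_PER_YEAR,
    }))
    .sort((a, b) => b.tenure_years - a.tenure_years) // descending numeric
    .slice(0, limit);
}

const sampleUsers: User[] = [
  { name: "Alice", active: true, created_at: "2020-01-01T00:00:00Z" },
  { name: "Bob", active: false, created_at: "2015-01-01T00:00:00Z" },
  { name: "Charlie", active: true, created_at: "2018-01-01T00:00:00Z" },
];

const fixedNow = Date.parse("2024-01-01T00:00:00Z");
const sampleReport = topTenured(sampleUsers, fixedNow, 20);

console.log(sampleReport.map(u => u.name)); // [ "Charlie", "Alice" ]
```

Bob is dropped by the active filter, and Charlie sorts ahead of Alice because the earlier created_at gives a larger tenure_years.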