The Pipeline Operator Deep Dive
Every programming language has a way of composing function calls. Most use nesting: f(g(h(x))). Some use method chaining: x.h().g().f(). Lateralus uses pipelines: x |> h |> g |> f. This post explains why, and what it changes about the way you think.
The Problem with Nesting
Consider a common data processing task: take a list of transactions, filter out small ones, group by category, sum each group, and sort by total. In a traditional language, you'd write something like:
// Traditional — read inside-out
sort(sum_groups(group_by(filter(transactions, |t| t.amount > 20), |t| t.category)), descending)
To understand this code, you start from the innermost call and read outward. The data flow is right-to-left, inside-out. The first operation applied (filter) is buried in the middle. This isn't how humans think about steps.
The Pipeline Solution
In Lateralus, the same operation reads like a recipe:
// Lateralus — read left-to-right
let report = transactions
|> filter(|t| t.amount > 20)
|> group_by(|t| t.category)
|> sum_groups()
|> sort(descending)
Each line is one step. The data flows top-to-bottom, left-to-right. Adding, removing, or reordering steps is trivial — just add or move a line.
How It Works
The pipeline operator |> takes the expression on its left and passes it as the first argument to the function on its right. So:
// These are equivalent:
x |> f(a, b) // pipeline form
f(x, a, b) // traditional form
// Chaining multiple:
x |> f() |> g() |> h() // pipeline
h(g(f(x))) // nested equivalent
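Python has no pipeline operator, but the threading rule above can be sketched with a small helper (the pipe function here is hypothetical, not a standard-library API):

```python
from functools import reduce

def pipe(value, *stages):
    """Thread value through each stage left-to-right:
    pipe(x, f, g, h) == h(g(f(x)))."""
    return reduce(lambda acc, stage: stage(acc), stages, value)

# x |> sort |> double |> sum, read in application order:
result = pipe(
    [3, 1, 2],
    sorted,                          # [1, 2, 3]
    lambda xs: [x * 2 for x in xs],  # [2, 4, 6]
    sum,                             # 12
)
```

For the `x |> f(a, b)` form, a Python stage would close over the extra arguments, e.g. `lambda x: f(x, a, b)`.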
Pipeline + Pattern Matching
Pipelines combine naturally with Lateralus's other features. Pattern matching in a pipeline lets you branch on the shape of data at any stage:
let result = input
|> parse()
|> match {
Ok(data) => data |> validate() |> transform(),
Err(e) => fallback(e),
}
|> serialize()
Pipeline + Async
Async operations flow just as naturally. No callback pyramids, no promise chains — just pipes:
let page = url
|> await fetch()
|> await read_body()
|> parse_html()
|> extract_links()
|> filter(|link| link.domain == "target.com")
Real-World Example: Log Analysis
Here's a complete example — parsing and analyzing server logs:
fn analyze_logs(path: String) -> Report {
let entries = path
|> read_file()
|> lines()
|> map(parse_log_entry)
|> filter(|e| e.status >= 400)
let total = entries |> len()
entries
|> group_by(|e| e.endpoint)
|> map_entries(|endpoint, errors| {
let count = errors |> len()
let rate = (count |> as_float()) / (total |> as_float())
EndpointReport { endpoint, count, rate }
})
|> sort_by(|r| r.count, descending)
|> Report::new()
}
Every step is one clear operation. One temporary for the error total, no nesting — the pipeline is the algorithm.
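For comparison, here is roughly the same analysis in Python using collections.Counter; parse_log_entry is a toy parser for a made-up "METHOD /path STATUS" line format:

```python
from collections import Counter

def parse_log_entry(line):
    # Toy parser for lines like "GET /api 500"
    method, endpoint, status = line.split()
    return {"endpoint": endpoint, "status": int(status)}

def analyze_logs(lines):
    entries = [parse_log_entry(l) for l in lines]
    errors = [e for e in entries if e["status"] >= 400]
    total = len(errors)
    counts = Counter(e["endpoint"] for e in errors)
    return [
        {"endpoint": ep, "count": n, "rate": n / total}
        for ep, n in counts.most_common()  # sorted by count, descending
    ]

report = analyze_logs([
    "GET /api 500",
    "GET /api 200",
    "GET /home 404",
    "GET /api 502",
])
# report[0] == {'endpoint': '/api', 'count': 2, 'rate': 2/3}
```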
Design Principles
Three rules guided the pipeline design:
- Left-to-right always. Data flows the way you read.
- Each step is independent. You can add, remove, or reorder lines without restructuring.
- Composable. Pipelines work with every feature — pattern matching, async, error handling, lambdas.
Code is read far more often than it's written. The pipeline operator optimizes for the reader.
Pipeline Fusion Under the Hood
The most important optimization in the Lateralus compiler is pipeline fusion. When consecutive pipeline stages operate on the same collection, they're merged into a single pass:
// Source code: three pipeline stages
let result = data
|> filter(|x| x.active)
|> map(|x| x.score * 2)
|> sum()
// Without fusion: 3 allocations, 3 passes
let temp1 = data.filter(is_active) // Allocates Vec
let temp2 = temp1.map(double_score) // Allocates Vec
let result = temp2.sum() // Iterates
// With fusion: 0 allocations, 1 pass
let result = 0
for x in data {
if x.active {
result += x.score * 2
}
}
The compiler performs this automatically. The programmer writes clean, composable pipeline stages; the compiler generates a tight loop. This is why Lateralus pipelines are competitive with hand-written C for throughput-critical code.
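The effect of fusion can be sketched in Python: the unfused version builds intermediate lists, while the hand-fused loop is the shape the compiler is described as generating (the data and field names here are illustrative):

```python
data = [{"active": True, "score": 3},
        {"active": False, "score": 5},
        {"active": True, "score": 7}]

# Unfused: two intermediate lists, three passes
temp1 = [x for x in data if x["active"]]
temp2 = [x["score"] * 2 for x in temp1]
unfused = sum(temp2)

# Fused by hand: one pass, no intermediate collections
fused = 0
for x in data:
    if x["active"]:
        fused += x["score"] * 2

# unfused == fused == 20
```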
Branching and Merging Pipelines
Real workflows aren't always linear. Lateralus supports splitting a pipeline into branches (here with partition, which routes each item into one of two streams) and merging the results back with merge:
// Branch: process data two ways simultaneously
let (errors, successes) = results
|> partition(|r| r.is_err())
// Process each branch independently
let error_report = errors
|> map(|e| e.unwrap_err())
|> group_by(|e| e.category)
|> to_report("errors.pdf")
let stats = successes
|> map(|s| s.unwrap())
|> aggregate(count, mean, median, std_dev)
// Merge results
let summary = merge(error_report, stats)
|> format_dashboard()
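A Python sketch of the same branch-and-merge shape, with a hand-rolled partition helper (hypothetical, not stdlib) and a plain dict standing in for merge:

```python
from collections import Counter

def partition(items, pred):
    # Split one stream into (matching, non-matching)
    yes, no = [], []
    for item in items:
        (yes if pred(item) else no).append(item)
    return yes, no

results = [("err", "timeout"), ("ok", 3), ("err", "timeout"), ("ok", 7)]
errors, successes = partition(results, lambda r: r[0] == "err")

error_report = Counter(e[1] for e in errors)  # counts per error category
values = [s[1] for s in successes]
stats = {"count": len(values), "mean": sum(values) / len(values)}

summary = {"errors": dict(error_report), "stats": stats}
# summary == {'errors': {'timeout': 2}, 'stats': {'count': 2, 'mean': 5.0}}
```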
Lazy vs. Eager Evaluation
By default, pipelines are eager — each stage fully evaluates before the next begins. For streaming data or large datasets, you can switch to lazy evaluation:
// Eager (default): loads everything into memory
let results = read_file("big.csv")
|> parse_csv()
|> filter(|row| row.amount > 1000)
|> take(10) // Still processes the entire file first
// Lazy: processes only what's needed
let results = read_file("big.csv")
|> lazy() // Switch to lazy evaluation
|> parse_csv()
|> filter(|row| row.amount > 1000)
|> take(10) // Stops after finding 10 matches
|> collect() // Materialize the results
The lazy() modifier converts the pipeline to a pull-based iterator chain. Downstream stages pull values from upstream on demand. Combined with pipeline fusion, this processes arbitrarily large datasets with constant memory.
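Python generators have the same pull-based behavior, which makes the early-exit property easy to demonstrate (the infinite rows generator below is a stand-in for a lazily-read file):

```python
from itertools import islice

def rows():
    # Stand-in for lazily reading a huge file: yields rows forever
    n = 0
    while True:
        n += 1
        yield {"amount": n}

# Nothing runs until values are pulled; islice stops after 10 matches
lazy = (row for row in rows() if row["amount"] > 1000)
first_ten = list(islice(lazy, 10))
# first_ten[0] == {'amount': 1001}; only 1010 source rows were consumed
```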