Pipes & Data Transformation
The Water Filtration Analogy
Think of a water filtration system:
- Raw water enters (potentially dirty)
- Filter removes impurities
- Mineralizer adds nutrients
- Logger records flow rate
- Clean water exits to your glass
Nexus pipes work the same way - events flow through operators before reaching listeners.
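To make the flow concrete, here is a minimal, self-contained sketch of how an event could be threaded through a chain of operators before reaching a handler. It does not use the Nexus package: the `Operator` type is a local stand-in modeled on the `(data, next)` signature used for custom operators in this guide, and `filterOp`, `mapOp`, `runPipe`, and `Water` are hypothetical names chosen for illustration. The real library internals may differ.

```typescript
// Hypothetical stand-ins for illustration only; the real Nexus types and
// internals may differ.
type Next<T> = (data: T) => void;
type Operator<T> = (data: T, next: Next<T>) => void;

// filterOp forwards the event only when the predicate holds.
function filterOp<T>(predicate: (data: T) => boolean): Operator<T> {
  return (data, next) => {
    if (predicate(data)) next(data);
  };
}

// mapOp forwards a transformed copy of the event.
function mapOp<T>(transform: (data: T) => T): Operator<T> {
  return (data, next) => next(transform(data));
}

// runPipe threads one event through the operators in order, then calls the handler.
function runPipe<T>(operators: Operator<T>[], handler: Next<T>, event: T): void {
  const step = (index: number, data: T): void => {
    if (index === operators.length) {
      handler(data);
      return;
    }
    operators[index](data, (nextData) => step(index + 1, nextData));
  };
  step(0, event);
}

interface Water {
  liters: number;
  clean: boolean;
  minerals?: string[];
}

const glass: Water[] = [];
const stages: Operator<Water>[] = [
  filterOp((w) => w.liters > 0),                   // block empty input
  mapOp((w) => ({ ...w, clean: true })),           // "filter removes impurities"
  mapOp((w) => ({ ...w, minerals: ['calcium'] })), // "mineralizer adds nutrients"
];

runPipe(stages, (w) => { glass.push(w); }, { liters: 1, clean: false });
runPipe(stages, (w) => { glass.push(w); }, { liters: 0, clean: false }); // dropped by filterOp
```

An operator that never calls `next` stops the event, which is how the filter stage blocks the second event in this sketch.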
The Problem: Pre-Processing Boilerplate
Without pipes, every listener duplicates logic:
// BAD: Duplicated validation
bus.on('user:input', (data) => {
if (!data.text || data.text.length < 3) return; // Duplicate!
handleInput(data);
});
bus.on('user:input', (data) => {
if (!data.text || data.text.length < 3) return; // Duplicate!
logInput(data);
});
bus.on('user:input', (data) => {
if (!data.text || data.text.length < 3) return; // Duplicate!
sendToAnalytics(data);
});
Pipes centralize pre-processing:
import { filter, map, logger } from '@caeligo/nexus-orchestrator';
bus.pipe(
'user:input',
filter(data => data.text && data.text.length >= 3), // Validate once
logger('input'), // Log once
map(data => ({ ...data, sanitized: sanitize(data.text) })), // Transform once
(data) => {
// Final handler receives clean, validated data
handleInput(data);
}
);
Built-In Operators
Nexus includes four operators:
1. filter() - Conditional Execution
Blocks events that don't match the predicate:
import { filter } from '@caeligo/nexus-orchestrator';
bus.pipe(
'api:response',
filter(data => data.status === 200), // Only pass success responses
(data) => {
updateUI(data.body);
}
);
// Failed responses (status !== 200) are blocked
2. map() - Data Transformation
Transforms data before it reaches the handler:
import { map } from '@caeligo/nexus-orchestrator';
bus.pipe(
'price:update',
map(price => ({ ...price, formatted: `$${price.value.toFixed(2)}` })),
(data) => {
console.log(data.formatted); // "$19.99"
}
);
3. debounce() - Rate Limiting
Delays execution until events stop arriving; each new event cancels the pending call:
import { debounce } from '@caeligo/nexus-orchestrator';
bus.pipe(
'search:input',
debounce(500), // Wait 500ms after last input
(query) => {
// Only called once after user stops typing
performSearch(query);
}
);
4. logger() - Debugging
Logs data passing through the pipe:
import { logger } from '@caeligo/nexus-orchestrator';
bus.pipe(
'complex:flow',
logger('step1'),
map(data => ({ ...data, processed: true })),
logger('step2'),
filter(data => data.processed),
logger('step3'),
(data) => finalHandler(data)
);
// Console output:
// [Pipe:step1] { value: 123 }
// [Pipe:step2] { value: 123, processed: true }
// [Pipe:step3] { value: 123, processed: true }
Basic Example: Form Validation
import { Nexus, filter, map, debounce } from '@caeligo/nexus-orchestrator';
const bus = new Nexus();
bus.pipe(
'form:email',
debounce(300), // Wait for user to stop typing
filter(data => data.email.length > 0), // Ignore empty
map(data => ({
...data,
valid: /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email)
})), // Add validation result
(data) => {
// Every non-empty email reaches here; data.valid says whether it matched the pattern
const indicator = document.getElementById('email-indicator');
indicator.className = data.valid ? 'valid' : 'invalid';
indicator.innerText = data.valid ? '✓ Valid email' : '✗ Invalid email';
}
);
// User types in email field
document.getElementById('email')?.addEventListener('input', (e) => {
bus.emit('form:email', { email: (e.target as HTMLInputElement).value });
});
Real-World Example: Search with Autocomplete
import { Nexus, filter, debounce, map, logger } from '@caeligo/nexus-orchestrator';
const bus = new Nexus({ debug: true });
bus.pipe(
'search:query',
debounce(300), // Don't spam API
filter(data => data.query.trim().length >= 3), // Minimum 3 characters
map(data => ({ ...data, query: data.query.trim().toLowerCase() })), // Normalize
logger('search'), // Debug logging
async (data) => {
// Fetch autocomplete suggestions
const response = await fetch(`/api/autocomplete?q=${encodeURIComponent(data.query)}`); // Encode user input for the URL
const suggestions = await response.json();
bus.emit('search:suggestions', {
query: data.query,
suggestions
});
}
);
// Display suggestions
bus.on('search:suggestions', (data) => {
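const list = document.getElementById('suggestions');
if (!list) return;
// Build <li> nodes with textContent so suggestion text is never parsed as HTML
list.replaceChildren(...data.suggestions.map((s) => {
const li = document.createElement('li');
li.textContent = s;
return li;
}));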
});
// Input handler
document.getElementById('search')?.addEventListener('input', (e) => {
bus.emit('search:query', { query: (e.target as HTMLInputElement).value });
});
Real-World Example: Analytics Pipeline
import { Nexus, filter, map } from '@caeligo/nexus-orchestrator';
const bus = new Nexus();
// Centralized analytics pipeline
bus.pipe(
'analytics:*', // Wildcard: all analytics events
filter(data => data.user && data.timestamp), // Validate required fields
map(data => ({
...data,
sessionId: getSessionId(),
userAgent: navigator.userAgent,
url: window.location.href
})), // Enrich with context
map(data => {
// Sanitize PII (remove email, credit card, etc.)
const sanitized = { ...data };
delete sanitized.email;
delete sanitized.creditCard;
return sanitized;
}),
(data) => {
// Send to analytics service
fetch('https://analytics.example.com/track', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data)
});
}
);
// Usage: all analytics events automatically enriched and sanitized
bus.emit('analytics:page-view', { user: 'alice', timestamp: Date.now() });
bus.emit('analytics:click', { user: 'bob', button: 'checkout', timestamp: Date.now() });
Custom Operators
Create your own operators:
import { Operator } from '@caeligo/nexus-orchestrator';
// Retry operator
function retry(attempts: number, delay: number): Operator {
return async (data, next) => {
let lastError;
for (let i = 0; i < attempts; i++) {
try {
await next(data);
return; // Success
} catch (err) {
lastError = err;
if (i < attempts - 1) {
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
console.error('Retry exhausted:', lastError);
};
}
// Usage
bus.pipe(
'api:call',
retry(3, 1000),
async (data) => {
const response = await fetch('/api/data');
if (!response.ok) throw new Error('API Error');
return response.json();
}
);
Custom Operator: Throttle
function throttle(ms: number): Operator {
let lastCall = 0;
return (data, next) => {
const now = Date.now();
if (now - lastCall >= ms) {
lastCall = now;
next(data);
}
// Otherwise, drop event (throttled)
};
}
// Usage: max 1 event per second
bus.pipe(
'scroll:position',
throttle(1000),
(data) => {
console.log('Scroll position:', data.y);
}
);
Custom Operator: Batch
function batch(size: number, timeout: number): Operator {
let buffer: any[] = [];
let timer: any = null;
return (data, next) => {
buffer.push(data);
// Flush if buffer full
if (buffer.length >= size) {
next(buffer);
buffer = [];
clearTimeout(timer);
timer = null; // Reset so the next event can schedule a new timeout flush
} else if (!timer) {
// Flush after timeout
timer = setTimeout(() => {
if (buffer.length > 0) {
next(buffer);
buffer = [];
}
timer = null;
}, timeout);
}
};
}
// Usage: send analytics in batches
bus.pipe(
'analytics:event',
batch(10, 5000), // 10 events or 5 seconds
(events) => {
// Send batch to server
fetch('/api/analytics/batch', {
method: 'POST',
body: JSON.stringify({ events })
});
}
);
Edge Case: Error Handling in Pipes
What if an operator throws?
bus.pipe(
'data:process',
map(data => {
if (!data.value) throw new Error('Missing value');
return data;
}),
(data) => {
console.log('This might not execute');
}
);
// Emit invalid data
bus.emit('data:process', {}); // Throws, handler never called
Solution: Wrap operators in try/catch:
function safeMap<T, R>(transform: (data: T) => R): Operator {
return (data, next) => {
try {
next(transform(data));
} catch (err) {
console.error('Map failed:', err);
// Optionally: emit error event
bus.emit('pipe:error', { error: err, data });
}
};
}
Pattern: Conditional Branching
Route events to different handlers based on conditions:
function branch<T>(
predicate: (data: T) => boolean,
onTrue: Operator<T>,
onFalse: Operator<T>
): Operator<T> {
return (data, next) => {
if (predicate(data)) {
onTrue(data, next);
} else {
onFalse(data, next);
}
};
}
// Usage
bus.pipe(
'user:action',
branch(
data => data.role === 'admin',
(data, next) => {
console.log('Admin action');
next(data);
},
(data, next) => {
console.log('Regular user action');
next(data);
}
),
(data) => handleAction(data)
);
Comparison with Middleware Systems
| System | Composition | Async Support | Type Safety |
|---|---|---|---|
| Express.js | Sequential | Yes | Poor |
| Redux | Sequential | No | Medium |
| Nexus Pipes | Sequential | Yes | Excellent |
Testing Pipes
import { describe, it, expect } from 'vitest';
import { Nexus, filter, map } from '@caeligo/nexus-orchestrator';
describe('Pipes', () => {
it('should filter and transform data', () => {
const bus = new Nexus();
let result: any = null;
bus.pipe(
'test:event',
filter(data => data.value > 10),
map(data => ({ ...data, doubled: data.value * 2 })),
(data) => {
result = data;
}
);
bus.emit('test:event', { value: 5 });
expect(result).toBeNull(); // Filtered out
bus.emit('test:event', { value: 15 });
expect(result).toEqual({ value: 15, doubled: 30 });
});
});
Performance Considerations
Overhead: Each operator adds ~1-2ms latency (negligible).
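Memory: The built-in filter, map, and logger operators are stateless; debounce keeps a pending timer, and custom operators such as the throttle and batch examples above keep their own state.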
Recommendation: Keep pipelines under 5 operators for optimal performance.
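Overhead figures like the one above depend on the runtime and on what each operator does, so it can be worth measuring on your own setup. The sketch below times a chain of do-nothing operators to estimate the pure dispatch cost per stage. It is a stand-alone approximation: `Operator`, `passThrough`, and `measureOverhead` are hypothetical names defined here, and the loop measures this sketch's dispatch, not the Nexus implementation itself.

```typescript
// Hypothetical stand-in for the operator signature; not the Nexus internals.
type Operator<T> = (data: T, next: (data: T) => void) => void;

// A do-nothing operator: it isolates the cost of the extra call per stage.
const passThrough: Operator<number> = (data, next) => next(data);

// Returns the average milliseconds spent per operator per event.
function measureOverhead(operatorCount: number, events: number): number {
  const operators: Operator<number>[] = Array.from(
    { length: operatorCount },
    () => passThrough
  );
  let delivered = 0;

  // Thread one event through every operator, counting deliveries at the end.
  const run = (index: number, data: number): void => {
    if (index === operators.length) {
      delivered += 1;
      return;
    }
    operators[index](data, (d) => run(index + 1, d));
  };

  const start = Date.now();
  for (let i = 0; i < events; i++) run(0, i);
  const elapsedMs = Date.now() - start;

  if (delivered !== events) throw new Error('pipeline dropped events');
  return elapsedMs / (events * operatorCount);
}

const perOperatorMs = measureOverhead(5, 200000);
console.log(`~${perOperatorMs.toFixed(6)} ms per operator per event`);
```

To measure a real pipeline, the same timing loop can wrap `bus.emit` calls on a pipe built with the actual operators; stateful operators such as debounce need a different approach because they defer delivery.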
Quick Reference
Import operators:
import { filter, map, debounce, logger } from '@caeligo/nexus-orchestrator';
Basic pipe:
bus.pipe(
'event:name',
operator1,
operator2,
finalHandler
);
Custom operator:
import { Operator } from '@caeligo/nexus-orchestrator';
const myOperator: Operator<MyType> = (data, next) => {
// Process data, then pass the result on (skip next() to drop the event)
const modifiedData = { ...data };
next(modifiedData);
};
Chaining:
bus.pipe(
'event',
filter(predicate),
map(transform),
debounce(500),
logger('debug'),
handler
);