Pipes & Data Transformation

The Water Filtration Analogy

Think of a water filtration system:

  1. Raw water enters (potentially dirty)
  2. Filter removes impurities
  3. Mineralizer adds nutrients
  4. Logger records flow rate
  5. Clean water exits to your glass

Nexus pipes work the same way: events flow through a chain of operators before reaching the listener.
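To make the flow concrete, here is a minimal sketch of how a chain of operators could be wired. It assumes the `(data, next)` operator shape used by the custom operators later on this page. `runPipe`, `dropDirty`, and `addMinerals` are hypothetical names for illustration, and this is not the Nexus implementation.

```typescript
// Assumed operator shape, inferred from the custom-operator examples on this page.
type Next = (data: unknown) => void;
type Operator = (data: any, next: Next) => void;

// Hypothetical helper: chains operators so each one decides whether
// (and with what data) the next stage runs.
function runPipe(operators: Operator[], handler: (data: any) => void, input: unknown): void {
  const dispatch = (index: number, data: unknown): void => {
    if (index === operators.length) {
      handler(data); // End of the chain: deliver to the listener
      return;
    }
    operators[index](data, (nextData) => dispatch(index + 1, nextData));
  };
  dispatch(0, input);
}

// Two stages mirroring the analogy: a filter that drops, a mineralizer that enriches.
const dropDirty: Operator = (data, next) => { if (!data.dirty) next(data); };
const addMinerals: Operator = (data, next) => next({ ...data, minerals: true });

const glass: unknown[] = [];
runPipe([dropDirty, addMinerals], (water) => glass.push(water), { dirty: true });
runPipe([dropDirty, addMinerals], (water) => glass.push(water), { dirty: false });
```

Only the second input reaches `glass`, because the first stage never calls `next` for dirty water.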

The Problem: Pre-Processing Boilerplate

Without pipes, every listener duplicates logic:

typescript
// BAD: Duplicated validation
bus.on('user:input', (data) => {
  if (!data.text || data.text.length < 3) return; // Duplicate!
  handleInput(data);
});

bus.on('user:input', (data) => {
  if (!data.text || data.text.length < 3) return; // Duplicate!
  logInput(data);
});

bus.on('user:input', (data) => {
  if (!data.text || data.text.length < 3) return; // Duplicate!
  sendToAnalytics(data);
});

Pipes centralize pre-processing:

typescript
import { filter, map, logger } from '@caeligo/nexus-orchestrator';

bus.pipe(
  'user:input',
  filter(data => data.text && data.text.length >= 3), // Validate once
  logger('input'),                                      // Log once
  map(data => ({ ...data, sanitized: sanitize(data.text) })), // Transform once
  (data) => {
    // Final handler receives clean, validated data
    handleInput(data);
  }
);

Built-In Operators

Nexus includes four operators:

1. filter() - Conditional Execution

Blocks events that don't match the predicate:

typescript
import { filter } from '@caeligo/nexus-orchestrator';

bus.pipe(
  'api:response',
  filter(data => data.status === 200),  // Only pass success responses
  (data) => {
    updateUI(data.body);
  }
);

// Failed responses (status !== 200) are blocked

2. map() - Data Transformation

Transforms data before it reaches the handler:

typescript
import { map } from '@caeligo/nexus-orchestrator';

bus.pipe(
  'price:update',
  map(price => ({ ...price, formatted: `$${price.value.toFixed(2)}` })),
  (data) => {
    console.log(data.formatted); // "$19.99"
  }
);
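Both `filter` and `map` can be read in terms of the `(data, next)` operator shape used by the custom operators on this page. The sketch below is an assumption about how such operators could be written, not the library's source. `filterSketch` and `mapSketch` are hypothetical names.

```typescript
// Assumed operator shape, inferred from the custom-operator examples on this page.
type Operator<T = any> = (data: T, next: (data: any) => void) => void;

// Sketch of filter: forward the event only when the predicate holds.
function filterSketch<T>(predicate: (data: T) => boolean): Operator<T> {
  return (data, next) => {
    if (predicate(data)) next(data);
  };
}

// Sketch of map: forward the transformed value instead of the original.
function mapSketch<T, R>(transform: (data: T) => R): Operator<T> {
  return (data, next) => next(transform(data));
}

// Wire the two together by hand to see the data flow.
type PriceEvent = { status: number; value: number };
const seen: string[] = [];
const onlyOk = filterSketch<PriceEvent>(d => d.status === 200);
const format = mapSketch((d: PriceEvent) => `$${d.value.toFixed(2)}`);

for (const event of [{ status: 500, value: 1 }, { status: 200, value: 19.99 }]) {
  onlyOk(event, passed => format(passed, formatted => seen.push(formatted)));
}
```

The first event is dropped by the filter stage, so only the formatted second event is collected.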

3. debounce() - Rate Limiting

Delays execution until events stop arriving; each new event cancels the pending call:

typescript
import { debounce } from '@caeligo/nexus-orchestrator';

bus.pipe(
  'search:input',
  debounce(500), // Wait 500ms after last input
  (query) => {
    // Only called once after user stops typing
    performSearch(query);
  }
);
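The cancel-and-reschedule behavior can be sketched as follows. This is an illustrative version under the assumed `(data, next)` operator shape, not the built-in `debounce`. The `Timers` interface and the manual timer queue are hypothetical additions so the example runs without real delays.

```typescript
type Operator<T = any> = (data: T, next: (data: any) => void) => void;

// Hypothetical timer interface so the sketch can be driven without real delays.
interface Timers {
  set(fn: () => void, ms: number): unknown;
  clear(handle: unknown): void;
}

const realTimers: Timers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle as ReturnType<typeof setTimeout>),
};

function debounceSketch<T>(ms: number, timers: Timers = realTimers): Operator<T> {
  let pending: unknown = null;
  return (data, next) => {
    if (pending !== null) timers.clear(pending); // A newer event replaces the older one
    pending = timers.set(() => {
      pending = null;
      next(data); // Only the last event in a burst gets here
    }, ms);
  };
}

// Manual timers: scheduled callbacks run only when fire() is called.
let queued: Array<{ id: number; fn: () => void }> = [];
let nextId = 0;
const manualTimers: Timers = {
  set: (fn) => { const id = ++nextId; queued.push({ id, fn }); return id; },
  clear: (handle) => { queued = queued.filter(t => t.id !== handle); },
};
const fire = () => { const due = queued; queued = []; due.forEach(t => t.fn()); };

const searched: string[] = [];
const debounced = debounceSketch<string>(500, manualTimers);
for (const text of ['n', 'ne', 'nex']) debounced(text, q => searched.push(q));
fire();
```

Three rapid inputs produce a single downstream call carrying the last value.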

4. logger() - Debugging

Logs data passing through the pipe:

typescript
import { filter, logger, map } from '@caeligo/nexus-orchestrator';

bus.pipe(
  'complex:flow',
  logger('step1'),
  map(data => ({ ...data, processed: true })),
  logger('step2'),
  filter(data => data.processed),
  logger('step3'),
  (data) => finalHandler(data)
);

// Console output:
// [Pipe:step1] { value: 123 }
// [Pipe:step2] { value: 123, processed: true }
// [Pipe:step3] { value: 123, processed: true }
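A pass-through logging operator is straightforward to sketch under the assumed `(data, next)` shape. `loggerSketch` and its `sink` parameter are hypothetical, and the sketch serializes with `JSON.stringify`, so its output format differs slightly from the console output shown above.

```typescript
type Operator<T = any> = (data: T, next: (data: any) => void) => void;

// Sketch of a logging operator: print, then pass the data through unchanged.
// The `sink` parameter is an assumption added here so output can be captured.
function loggerSketch<T>(label: string, sink: (line: string) => void = console.log): Operator<T> {
  return (data, next) => {
    sink(`[Pipe:${label}] ${JSON.stringify(data)}`);
    next(data);
  };
}

const lines: string[] = [];
const step1 = loggerSketch<{ value: number }>('step1', line => lines.push(line));
let delivered: unknown;
step1({ value: 123 }, data => { delivered = data; });
```

Because the operator always calls `next` with the same object, it can be inserted between any two stages without changing what the handler receives.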

Basic Example: Form Validation

typescript
import { Nexus, filter, map, debounce } from '@caeligo/nexus-orchestrator';

const bus = new Nexus();

bus.pipe(
  'form:email',
  debounce(300),                                    // Wait for user to stop typing
  filter(data => data.email.length > 0),            // Ignore empty
  map(data => ({
    ...data,
    valid: /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email)
  })),                                              // Add validation result
  (data) => {
    // Every non-empty email reaches here; data.valid says whether it passed the check
    const indicator = document.getElementById('email-indicator');
    if (!indicator) return;
    indicator.className = data.valid ? 'valid' : 'invalid';
    indicator.innerText = data.valid ? '✓ Valid email' : '✗ Invalid email';
  }
);

// User types in email field
document.getElementById('email')?.addEventListener('input', (e) => {
  bus.emit('form:email', { email: (e.target as HTMLInputElement).value });
});

Real-World Example: Search with Autocomplete

typescript
import { Nexus, filter, debounce, map, logger } from '@caeligo/nexus-orchestrator';

const bus = new Nexus({ debug: true });

bus.pipe(
  'search:query',
  debounce(300),                                    // Don't spam API
  filter(data => data.query.trim().length >= 3),    // Minimum 3 characters
  map(data => ({ ...data, query: data.query.trim().toLowerCase() })), // Normalize
  logger('search'),                                 // Debug logging
  async (data) => {
    // Fetch autocomplete suggestions
    const response = await fetch(`/api/autocomplete?q=${encodeURIComponent(data.query)}`);
    const suggestions = await response.json();
    
    bus.emit('search:suggestions', { 
      query: data.query, 
      suggestions 
    });
  }
);

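// Display suggestions
bus.on('search:suggestions', (data) => {
  const list = document.getElementById('suggestions');
  if (!list) return;
  // Build items with textContent so suggestion text is never parsed as HTML
  list.replaceChildren(
    ...data.suggestions.map((s: string) => {
      const item = document.createElement('li');
      item.textContent = s;
      return item;
    })
  );
});

// Input handler
document.getElementById('search')?.addEventListener('input', (e) => {
  bus.emit('search:query', { query: (e.target as HTMLInputElement).value });
});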

Real-World Example: Analytics Pipeline

typescript
import { Nexus, filter, map } from '@caeligo/nexus-orchestrator';

const bus = new Nexus();

// Centralized analytics pipeline
bus.pipe(
  'analytics:*',                                    // Wildcard: all analytics events
  filter(data => data.user && data.timestamp),      // Validate required fields
  map(data => ({
    ...data,
    sessionId: getSessionId(),
    userAgent: navigator.userAgent,
    url: window.location.href
  })),                                              // Enrich with context
  map(data => {
    // Sanitize PII (remove email, credit card, etc.)
    const sanitized = { ...data };
    delete sanitized.email;
    delete sanitized.creditCard;
    return sanitized;
  }),
  (data) => {
    // Send to analytics service
    fetch('https://analytics.example.com/track', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(data)
    });
  }
);

// Usage: all analytics events automatically enriched and sanitized
bus.emit('analytics:page-view', { user: 'alice', timestamp: Date.now() });
bus.emit('analytics:click', { user: 'bob', button: 'checkout', timestamp: Date.now() });
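The sanitization step can be factored into a small reusable helper. The sketch below is one possible shape. `omitKeys` and `PII_KEYS` are hypothetical names, and the key list simply mirrors the two fields deleted in the example above.

```typescript
// Hypothetical helper: returns a shallow copy of `data` without the listed keys.
function omitKeys<T extends Record<string, unknown>>(data: T, keys: string[]): Partial<T> {
  const copy: Record<string, unknown> = { ...data };
  for (const key of keys) delete copy[key];
  return copy as Partial<T>;
}

// Assumed field names, taken from the sanitization step in the pipeline example.
const PII_KEYS = ['email', 'creditCard'];

const raw = { user: 'alice', email: 'alice@example.com', timestamp: 1 };
const clean = omitKeys(raw, PII_KEYS);
```

Inside a pipe this would be used as `map(data => omitKeys(data, PII_KEYS))`. The helper copies before deleting, so the original event object is left untouched for other listeners.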

Custom Operators

Create your own operators:

typescript
import { Operator } from '@caeligo/nexus-orchestrator';

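// Retry operator: re-runs the rest of the pipe when it rejects.
// Relies on next() returning the downstream handler's promise.
function retry(attempts: number, delay: number): Operator {
  return async (data, next) => {
    let lastError: unknown;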
    
    for (let i = 0; i < attempts; i++) {
      try {
        await next(data);
        return; // Success
      } catch (err) {
        lastError = err;
        if (i < attempts - 1) {
          await new Promise(resolve => setTimeout(resolve, delay));
        }
      }
    }
    
    console.error('Retry exhausted:', lastError);
  };
}

// Usage
bus.pipe(
  'api:call',
  retry(3, 1000),
  async (data) => {
    const response = await fetch('/api/data');
    if (!response.ok) throw new Error('API Error');
    return response.json();
  }
);

Custom Operator: Throttle

typescript
function throttle(ms: number): Operator {
  let lastCall = 0;
  
  return (data, next) => {
    const now = Date.now();
    
    if (now - lastCall >= ms) {
      lastCall = now;
      next(data);
    }
    // Otherwise, drop event (throttled)
  };
}

// Usage: max 1 event per second
bus.pipe(
  'scroll:position',
  throttle(1000),
  (data) => {
    console.log('Scroll position:', data.y);
  }
);

Custom Operator: Batch

typescript
function batch(size: number, timeout: number): Operator {
  let buffer: any[] = [];
  let timer: any = null;
  
  return (data, next) => {
    buffer.push(data);
    
    // Flush if buffer full
    if (buffer.length >= size) {
      next(buffer);
      buffer = [];
      clearTimeout(timer);
      timer = null; // Reset so the next partial batch schedules a new timeout
    } else if (!timer) {
      // Flush after timeout
      timer = setTimeout(() => {
        if (buffer.length > 0) {
          next(buffer);
          buffer = [];
        }
        timer = null;
      }, timeout);
    }
  };
}

// Usage: send analytics in batches
bus.pipe(
  'analytics:event',
  batch(10, 5000), // 10 events or 5 seconds
  (events) => {
    // Send batch to server
    fetch('/api/analytics/batch', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ events })
    });
  }
);

Edge Case: Error Handling in Pipes

What if an operator throws?

typescript
bus.pipe(
  'data:process',
  map(data => {
    if (!data.value) throw new Error('Missing value');
    return data;
  }),
  (data) => {
    console.log('This might not execute');
  }
);

// Emit invalid data
bus.emit('data:process', {}); // Throws, handler never called

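Solution: Wrap the operator's own logic in try/catch:

typescript
function safeMap<T, R>(transform: (data: T) => R): Operator {
  return (data, next) => {
    let result: R;
    try {
      result = transform(data);
    } catch (err) {
      console.error('Map failed:', err);
      // Optionally: emit error event (assumes `bus` is in scope)
      bus.emit('pipe:error', { error: err, data });
      return;
    }
    next(result); // Outside the try so downstream errors are not reported as map failures
  };
}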
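The same idea can be generalized to guard any operator rather than only a map. The following is a sketch under the assumed `(data, next)` operator shape. `safe` and `requireValue` are hypothetical names, and the wrapper deliberately rethrows errors that originate further down the pipe so they are not misreported as failures of the wrapped operator.

```typescript
type Next = (data: any) => void;
type Operator<T = any> = (data: T, next: Next) => void;

// Hypothetical wrapper: reports errors thrown by the wrapped operator itself.
function safe<T>(operator: Operator<T>, onError: (error: unknown, data: T) => void): Operator<T> {
  return (data, next) => {
    let fromDownstream = false;
    try {
      operator(data, (result) => {
        try {
          next(result);
        } catch (error) {
          fromDownstream = true; // Mark errors that came from later stages
          throw error;
        }
      });
    } catch (error) {
      if (fromDownstream) throw error; // Not ours to handle
      onError(error, data);
    }
  };
}

const requireValue: Operator<{ value?: number }> = (data, next) => {
  if (!data.value) throw new Error('Missing value');
  next(data);
};

const errors: unknown[] = [];
const handled: unknown[] = [];
const guarded = safe(requireValue, (error) => errors.push(error));
guarded({}, d => handled.push(d));           // Reported through onError
guarded({ value: 7 }, d => handled.push(d)); // Delivered normally
```

This sketch covers synchronous throws only. An operator that fails inside a timer callback or a rejected promise would need its own handling.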

Pattern: Conditional Branching

Route events to different handlers based on conditions:

typescript
function branch<T>(
  predicate: (data: T) => boolean,
  onTrue: Operator<T>,
  onFalse: Operator<T>
): Operator<T> {
  return (data, next) => {
    if (predicate(data)) {
      onTrue(data, next);
    } else {
      onFalse(data, next);
    }
  };
}

// Usage
bus.pipe(
  'user:action',
  branch(
    data => data.role === 'admin',
    (data, next) => {
      console.log('Admin action');
      next(data);
    },
    (data, next) => {
      console.log('Regular user action');
      next(data);
    }
  ),
  (data) => handleAction(data)
);

Comparison with Middleware Systems

| System | Composition | Async Support | Type Safety |
|---|---|---|---|
| Express.js | Sequential | Yes | Poor |
| Redux | Sequential | No | Medium |
| Nexus Pipes | Sequential | Yes | Excellent |

Testing Pipes

typescript
import { describe, it, expect } from 'vitest';
import { Nexus, filter, map } from '@caeligo/nexus-orchestrator';

describe('Pipes', () => {
  it('should filter and transform data', () => {
    const bus = new Nexus();
    let result: any = null;
    
    bus.pipe(
      'test:event',
      filter(data => data.value > 10),
      map(data => ({ ...data, doubled: data.value * 2 })),
      (data) => {
        result = data;
      }
    );
    
    bus.emit('test:event', { value: 5 });
    expect(result).toBeNull(); // Filtered out
    
    bus.emit('test:event', { value: 15 });
    expect(result).toEqual({ value: 15, doubled: 30 });
  });
});
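Time-based operators are harder to test because they depend on the clock. One option, sketched below, is to give the operator an injectable time source. `throttleWithClock` is a hypothetical variant of the throttle operator shown earlier, and the `now` parameter is an assumption added for testability.

```typescript
type Operator<T = any> = (data: T, next: (data: any) => void) => void;

// Variant of the throttle operator with an injectable clock.
function throttleWithClock<T>(ms: number, now: () => number = Date.now): Operator<T> {
  let lastCall = -Infinity; // So the first event always passes, even at time 0
  return (data, next) => {
    const current = now();
    if (current - lastCall >= ms) {
      lastCall = current;
      next(data);
    }
    // Otherwise the event is dropped
  };
}

// Drive the operator with a fake clock instead of waiting in real time.
let fakeTime = 0;
const passed: number[] = [];
const throttled = throttleWithClock<number>(1000, () => fakeTime);

for (const t of [0, 400, 999, 1000, 1500, 2000]) {
  fakeTime = t;
  throttled(t, value => passed.push(value));
}
```

With a 1000 ms window, only the events at 0, 1000, and 2000 pass. The same checks can be placed inside an `it(...)` block with `expect(passed).toEqual([0, 1000, 2000])`.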

Performance Considerations

Overhead: Each operator adds ~1-2ms latency (negligible).

Memory: Operators are stateless, except those that track time or buffer events (debounce, and custom operators such as the throttle and batch examples above).

Recommendation: Keep pipelines under 5 operators for optimal performance.
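Overhead depends on the runtime and on what each operator does, so it is worth measuring in your own environment. The harness below is a rough sketch. It times a hand-wired chain of pass-through operators under the assumed `(data, next)` shape, not Nexus itself, and `measureChain` is a hypothetical helper.

```typescript
type Operator<T = any> = (data: T, next: (data: any) => void) => void;

// Hypothetical harness: pushes `events` values through a hand-wired chain
// and returns the average time per event in milliseconds.
function measureChain(operators: Operator[], events: number): number {
  let delivered = 0;
  const run = (index: number, data: unknown): void => {
    if (index === operators.length) {
      delivered++;
      return;
    }
    operators[index](data, nextData => run(index + 1, nextData));
  };

  const start = performance.now();
  for (let i = 0; i < events; i++) run(0, { value: i });
  const elapsed = performance.now() - start;

  if (delivered !== events) throw new Error('chain dropped events');
  return elapsed / events;
}

const passThrough: Operator = (data, next) => next(data);
const chain = [passThrough, passThrough, passThrough, passThrough, passThrough];
const perEventMs = measureChain(chain, 10000);
console.log(`average per event: ${perEventMs.toFixed(4)} ms`);
```

Swapping `passThrough` for the operators used in a real pipeline gives a more representative number than any fixed per-operator estimate.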

Quick Reference

Import operators:

typescript
import { filter, map, debounce, logger } from '@caeligo/nexus-orchestrator';

Basic pipe:

typescript
bus.pipe(
  'event:name',
  operator1,
  operator2,
  finalHandler
);

Custom operator:

typescript
import { Operator } from '@caeligo/nexus-orchestrator';

const myOperator: Operator<MyType> = (data, next) => {
  // Process data, then pass the result on (skip next() to drop the event)
  const modifiedData = { ...data };
  next(modifiedData);
};

Chaining:

typescript
bus.pipe(
  'event',
  filter(predicate),
  map(transform),
  debounce(500),
  logger('debug'),
  handler
);

Released under the MIT License.