RPC Pattern (Request/Reply)

In-Memory First

Nexus Orchestrator is an in-memory event bus for architectural decoupling.

The RPC pattern enables Request/Reply communication between components in the same runtime without tight coupling. This is not a networking or HTTP tool — it solves in-process communication.

The Music Player Analogy 🎵

Imagine a music player application with two components:

  • Music Player UI Component — Displays track info, controls
  • Audio Engine Service — Loads files, manages playback, applies effects

The Problem:

typescript
// ❌ Tight Coupling: UI directly imports and calls the engine
import { audioEngine } from './AudioEngine';

function onPlayClick(trackId: string) {
  const track = audioEngine.loadTrack(trackId); // Direct dependency!
  displayTrack(track);
}

Why this fails:

  • The UI knows too much about the Audio Engine
  • Hard to test the UI in isolation
  • Hard to swap Audio Engine implementations
  • Circular dependencies are easy
  • No async handling out of the box

The Nexus Solution:

typescript
// ✅ Architectural Decoupling: UI requests via the bus
async function onPlayClick(trackId: string) {
  const track = await bus.request('audio:loadTrack', { id: trackId });
  displayTrack(track);
}

// Somewhere else in the codebase (AudioEngineService.ts)
bus.on('audio:loadTrack', async (payload) => {
  const track = await audioEngine.load(payload.id);
  bus.reply(payload, track);
});

Key points:

  • ✅ Same application, same runtime, same memory space
  • ✅ No HTTP, no network, no serialization
  • ✅ UI doesn't know who handles the request
  • ✅ UI doesn't know where the handler lives
  • ✅ Location agnostic — handler could be in another module, file, or even process (with Teleportation)

The Problem: Traditional Event Buses Are Weak

Traditional event emitters are fire-and-forget. They don't support request/reply natively.

typescript
// ❌ Traditional EventEmitter (no return values)
emitter.emit('user:fetch', { id: 123 });
// ... now what? No way to get the result!

To fake request/reply, you'd have to:

  1. Generate unique correlation IDs manually
  2. Create temporary listeners for replies
  3. Handle timeouts yourself
  4. Clean up listeners afterward
  5. Propagate errors manually

This leads to brittle correlation boilerplate:

typescript
// ❌ Manual correlation (18 lines of boilerplate!)
function fetchUser(id: number): Promise<User> {
  return new Promise((resolve, reject) => {
    const requestId = `req_${Date.now()}_${Math.random()}`;
    let timeout: any;
    
    const cleanup = bus.once(`response:${requestId}`, (data) => {
      clearTimeout(timeout);
      resolve(data);
    });
    
    timeout = setTimeout(() => {
      cleanup();
      reject(new Error('Request timeout'));
    }, 5000);
    
    bus.emit('user:fetch', { id, requestId });
  });
}

Nexus eliminates all this boilerplate:

typescript
// ✅ Clean, simple, built-in (1 line!)
const user = await bus.request('user:fetch', { id: 123 });

The Solution: Built-In RPC Pattern

Nexus provides request() and reply() as first-class primitives for in-memory communication.

Core API

typescript
// Requester (any component)
const result = await bus.request(
  'event:name',        // Event name
  { data: 'payload' }, // Payload
  5000                 // Timeout in ms (optional, default: 5000)
);

// Handler (any service/module)
bus.on('event:name', async (payload) => {
  const result = await doWork(payload);
  bus.reply(payload, result); // 👈 Automatic reply routing
});

What Nexus handles automatically:

  • ✅ Correlation ID generation
  • ✅ Temporary listener creation
  • ✅ Timeout management
  • ✅ Listener cleanup
  • ✅ Error propagation
  • ✅ Type safety (TypeScript)

How It Works (In-Memory)

Critical insight: The _replyTo field is hidden from your code. You never see it. Nexus manages it internally.
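That flow can be sketched with a plain Node EventEmitter — roughly what a request/reply bus has to do under the hood. This is an illustrative sketch only: the actual Nexus internals are not shown in this document, and the `_replyTo` handling, helper names, and event names below are assumptions.

```typescript
import { EventEmitter } from 'node:events';
import { randomUUID } from 'node:crypto';

// Minimal sketch of request/reply over a plain emitter.
// The real Nexus internals may differ; this only illustrates the mechanics.
const emitter = new EventEmitter();

function on(event: string, handler: (payload: any) => void) {
  emitter.on(event, handler);
}

function reply(payload: any, result: unknown) {
  // Route the result back on the hidden correlation channel
  emitter.emit(payload._replyTo, result);
}

function request<T>(event: string, payload: object, timeoutMs = 5000): Promise<T> {
  return new Promise((resolve, reject) => {
    const replyTo = `reply:${randomUUID()}`;   // correlation ID
    const timer = setTimeout(() => {
      emitter.removeAllListeners(replyTo);     // cleanup on timeout
      reject(new Error(`Request timeout: ${event}`));
    }, timeoutMs);
    emitter.once(replyTo, (result: T) => {     // temporary listener
      clearTimeout(timer);
      resolve(result);
    });
    emitter.emit(event, { ...payload, _replyTo: replyTo });
  });
}

// Usage: the caller never sees _replyTo
on('math:double', (payload) => reply(payload, payload.value * 2));
const answer = await request<number>('math:double', { value: 21 });
console.log(answer); // 42
```

The requester only sees a Promise; the correlation ID, temporary listener, and timer live entirely inside `request()`.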

Example 1: Music Player (In-Memory)

Scenario: A music player UI needs to load a track from the audio engine.

typescript
// ========================================
// 📄 File: MusicPlayerUI.tsx
// ========================================
import { bus } from './eventBus';

async function onPlayButtonClick(trackId: string) {
  try {
    // Request track from audio engine
    const track = await bus.request('audio:loadTrack', 
      { id: trackId }, 
      3000 // 3 second timeout
    );
    
    // Update UI
    setCurrentTrack(track);
    setPlaying(true);
    
  } catch (error) {
    console.error('Failed to load track:', error);
    showErrorNotification('Track unavailable');
  }
}

// ========================================
// 📄 File: AudioEngineService.ts
// ========================================
import { bus } from './eventBus';
import { audioEngine } from './nativeAudioEngine';

// Handler runs in the same process
bus.on('audio:loadTrack', async (payload) => {
  console.log('Loading track:', payload.id);
  
  // Load from disk (heavy I/O operation)
  const trackData = await audioEngine.load(payload.id);
  
  // Apply effects
  const processedTrack = audioEngine.applyEqualizer(trackData);
  
  // Reply to requester
  bus.reply(payload, {
    id: payload.id,
    duration: processedTrack.duration,
    waveform: processedTrack.waveform,
    metadata: processedTrack.metadata
  });
});

What's happening here:

  • βœ… In-memory communication β€” Same JavaScript runtime, same process
  • βœ… No network β€” No HTTP, no fetch, no serialization
  • βœ… Architectural decoupling β€” UI doesn't import AudioEngine
  • βœ… Location agnostic β€” Handler could move to another file without breaking UI
  • βœ… Same API β€” If AudioEngine moves to another process later, UI code doesn't change

Example 2: Dashboard Analytics Widget (In-Memory)

Scenario: A dashboard widget needs analytics data from a computation service. The computation is expensive, so we run it in a background priority lane to keep the UI responsive.

typescript
// ========================================
// 📄 File: DashboardWidget.tsx
// ========================================
async function loadAnalytics() {
  setLoading(true);
  
  try {
    // Request analytics computation
    const stats = await bus.request('analytics:compute', {
      dateRange: 'last-30-days',
      metrics: ['revenue', 'users', 'conversions']
    });
    
    setData(stats);
    renderCharts(stats);
    
  } catch (error) {
    console.error('Analytics failed:', error);
    showFallbackData();
  } finally {
    setLoading(false);
  }
}

// ========================================
// 📄 File: AnalyticsService.ts
// ========================================
bus.on('analytics:compute', async (payload) => {
  // Heavy computation — run in background priority
  const result = await heavyComputation(payload);
  
  bus.reply(payload, result);
}, { 
  priority: 'background' // 👈 Non-blocking execution
});

async function heavyComputation(params: { dateRange: string; metrics: string[] }) {
  // Expensive calculation
  const rawData = await database.query(params.dateRange);
  const processed = processMetrics(rawData, params.metrics);
  
  return {
    revenue: processed.revenue,
    users: processed.users,
    conversions: processed.conversions,
    computedAt: Date.now()
  };
}

What's happening here:

  • ✅ In-memory first — Same application, no network
  • ✅ Priority lanes — Background execution keeps UI smooth
  • ✅ Decoupling — Widget doesn't know how analytics are computed
  • ✅ Testability — Easy to mock analytics:compute in tests
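The background-lane idea can be approximated with plain macrotask deferral. This is a sketch of the scheduling concept only — `runInBackground` is a hypothetical helper, not the Nexus API, and the library's actual priority implementation may differ:

```typescript
// Sketch: a "background lane" defers handler work to a macrotask so
// work already queued on the current tick runs first. Illustrative only.
function runInBackground<T>(work: () => T): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(work()), 0));
}

const order: string[] = [];
const done = runInBackground(() => { order.push('background'); return 42; });
order.push('urgent'); // current-tick work runs before the deferred work
const value = await done;
console.log(order, value); // [ 'urgent', 'background' ] 42
```

The point is the same as the `priority: 'background'` option above: expensive handlers yield to the event loop instead of blocking interactive work.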

Example 3: Form Validation (In-Memory)

Scenario: A form component validates user input by requesting validation from a dedicated service module.

typescript
// ========================================
// 📄 File: SignupForm.tsx
// ========================================
async function onSubmit(formData) {
  setErrors({});
  
  try {
    // Request validation from service
    const validation = await bus.request('validation:signup', formData, 2000);
    
    if (validation.valid) {
      // Proceed with signup
      submitSignup(formData);
    } else {
      // Show errors
      setErrors(validation.errors);
    }
    
  } catch (error) {
    setErrors({ _global: 'Validation service unavailable' });
  }
}

// ========================================
// 📄 File: ValidationService.ts
// ========================================
bus.on('validation:signup', async (payload) => {
  const errors: Record<string, string> = {};
  
  // Email validation
  if (!payload.email.includes('@')) {
    errors.email = 'Invalid email format';
  }
  
  // Check if email already exists (in-memory database query)
  const existingUser = await userRepository.findByEmail(payload.email);
  if (existingUser) {
    errors.email = 'Email already registered';
  }
  
  // Password strength
  if (payload.password.length < 8) {
    errors.password = 'Password must be at least 8 characters';
  }
  
  // Reply with validation result
  bus.reply(payload, {
    valid: Object.keys(errors).length === 0,
    errors
  });
});

What's happening here:

  • ✅ Architectural decoupling — Form doesn't know validation rules
  • ✅ In-memory — Validation runs in the same process
  • ✅ Testability — Easy to test form and validation independently
  • ✅ Reusability — Validation service can be used by multiple forms

Edge Cases (Still In-Memory)

Timeout Handling

What happens when a handler never replies?

typescript
// Handler that forgot to reply
bus.on('slow:operation', async (payload) => {
  await doSomeWork(payload);
  // ❌ Oops! Forgot to call bus.reply()
});

// Requester with timeout protection
try {
  const result = await bus.request('slow:operation', { data: '...' }, 3000);
  console.log(result);
} catch (error) {
  // ✅ After 3 seconds, Promise rejects
  console.error('Timeout:', error.message);
  // Error: [Nexus] Request timeout: slow:operation
  
  showFallbackUI();
}

Best practice: Always set reasonable timeouts based on expected latency.
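Building on that, a small wrapper can turn a timeout into graceful degradation instead of an unhandled error. `withFallback` below is a generic utility sketch, not part of the Nexus API:

```typescript
// Generic helper (not part of the Nexus API): resolve to a fallback
// value when the wrapped promise rejects, e.g. on a request timeout.
async function withFallback<T>(pending: Promise<T>, fallback: T): Promise<T> {
  try {
    return await pending;
  } catch (error) {
    console.warn('Falling back after failure:', error);
    return fallback;
  }
}

// Usage with any promise-returning call, e.g. a bus request:
// const stats = await withFallback(
//   bus.request('analytics:compute', { range: '30d' }, 3000),
//   { revenue: 0, users: 0, conversions: 0 } // safe defaults
// );
```

Callers then always receive a usable value, and the timeout path stays visible in logs.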

Multiple Handlers

What if multiple modules handle the same request event?

typescript
// Handler 1
bus.on('data:fetch', (payload) => {
  bus.reply(payload, { source: 'cache', data: cachedData });
});

// Handler 2 (also listening!)
bus.on('data:fetch', (payload) => {
  bus.reply(payload, { source: 'database', data: dbData });
});

// Requester
const result = await bus.request('data:fetch', { id: 123 });
// Result: { source: 'cache', data: cachedData } — First reply wins!

How it works: request() uses .once() internally, so it only accepts the first reply and then unsubscribes.

Best practice: Ensure only one handler replies to a given request event, or use this behavior intentionally for redundancy.
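The first-reply-wins semantics can be reproduced with a promise that settles once and ignores later resolutions — a sketch of the observable behavior, not the actual Nexus implementation:

```typescript
// Sketch of first-reply-wins: the promise settles on the first reply
// and silently drops any later ones (mirroring .once() semantics).
function firstReply<T>(): { result: Promise<T>; reply: (value: T) => void } {
  let settled = false;
  let resolveResult!: (value: T) => void;
  const result = new Promise<T>((resolve) => { resolveResult = resolve; });
  return {
    result,
    reply(value: T) {
      if (settled) return; // later replies are dropped
      settled = true;
      resolveResult(value);
    },
  };
}

// Two "handlers" reply; only the first is observed.
const pending = firstReply<string>();
pending.reply('cache');    // wins
pending.reply('database'); // ignored
const winner = await pending.result;
console.log(winner); // "cache"
```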

Nested Requests

Can you make requests inside request handlers? Yes!

typescript
// Handler makes nested requests (still in-memory)
bus.on('user:profile', async (payload) => {
  // Request 1: Fetch user
  const user = await bus.request('db:getUser', { id: payload.userId });
  
  // Request 2: Fetch user's posts
  const posts = await bus.request('db:getPosts', { userId: payload.userId });
  
  // Request 3: Fetch user's followers
  const followers = await bus.request('db:getFollowers', { userId: payload.userId });
  
  // Combine and reply
  bus.reply(payload, {
    ...user,
    posts,
    followers
  });
});

// Requester
const profile = await bus.request('user:profile', { userId: 123 });
// Result: Complete user profile with nested data

Note: All communication is still in-memory. No network involved.
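Since each request returns a plain Promise, independent lookups like the three above don't have to run one after another — Promise.all issues them concurrently, so total latency is the slowest call rather than the sum. Sketched here with stand-in async functions in place of bus.request (the data shapes are illustrative):

```typescript
// Stand-ins for independent bus.request() calls (illustrative only).
const getUser = async (id: number) => ({ id, name: 'Alice' });
const getPosts = async (userId: number) => [{ id: 1, userId }];
const getFollowers = async (userId: number) => [{ id: 7, userId }];

// Sequential awaits: latency = sum of the three calls.
// Promise.all: latency = the slowest of the three.
async function loadProfile(userId: number) {
  const [user, posts, followers] = await Promise.all([
    getUser(userId),
    getPosts(userId),
    getFollowers(userId),
  ]);
  return { ...user, posts, followers };
}

const profile = await loadProfile(123);
console.log(profile.name, profile.posts.length); // Alice 1
```

Use sequential awaits only when a later request actually depends on an earlier result.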

Retry Logic with Circuit Breaker

Combine RPC with resilience features:

typescript
// Handler with automatic retries (in-memory operations)
bus.on('data:process', async (payload) => {
  // This might fail occasionally (e.g., database lock)
  const result = await heavyDatabaseOperation(payload);
  bus.reply(payload, result);
}, {
  attempts: 3,        // Retry 3 times on failure
  backoff: 1000,      // Wait 1 second between retries
  fallback: (error) => {
    // All retries exhausted
    console.error('Operation failed after 3 attempts:', error);
  }
});

// Requester doesn't need to know about retries
const data = await bus.request('data:process', { input: '...' });

Testing RPC Handlers (In-Memory)

typescript
import { describe, it, expect } from 'vitest';
import { Nexus } from '@caeligo/nexus-orchestrator';

describe('Music Player RPC', () => {
  it('should load track successfully', async () => {
    const bus = new Nexus();
    
    // Setup handler (mock audio engine)
    bus.on('audio:loadTrack', (payload) => {
      bus.reply(payload, {
        id: payload.id,
        title: 'Test Track',
        duration: 180
      });
    });
    
    // Test requester
    const track = await bus.request('audio:loadTrack', { id: 'track-42' });
    
    expect(track.title).toBe('Test Track');
    expect(track.duration).toBe(180);
  });
  
  it('should timeout if handler does not reply', async () => {
    const bus = new Nexus();
    // No handler registered
    
    await expect(
      bus.request('missing:handler', {}, 100)
    ).rejects.toThrow('Request timeout');
  });
  
  it('should handle nested requests', async () => {
    const bus = new Nexus();
    
    // Mock database handlers
    bus.on('db:getUser', (p) => bus.reply(p, { name: 'Alice' }));
    bus.on('db:getPosts', (p) => bus.reply(p, [{ id: 1 }]));
    
    // Composite handler
    bus.on('user:profile', async (payload) => {
      const user = await bus.request('db:getUser', { id: payload.userId });
      const posts = await bus.request('db:getPosts', { userId: payload.userId });
      bus.reply(payload, { ...user, posts });
    });
    
    // Test
    const profile = await bus.request('user:profile', { userId: 123 });
    expect(profile.name).toBe('Alice');
    expect(profile.posts).toHaveLength(1);
  });
});

Performance Characteristics (In-Memory)

Overhead per request:

  • 1 temporary listener (automatically cleaned up)
  • 1 timeout timer (cleared on reply or timeout)
  • Negligible memory footprint

Throughput:

  • 10,000+ requests/second on modern hardware (2023 MacBook Pro)
  • No blocking — async execution
  • Scales with CPU, not network

Memory:

  • Temporary listeners are garbage collected immediately after reply
  • No memory leaks from abandoned requests

Teleportation: Optional Network Extension

Everything so far has been in-memory. Nexus Orchestrator is not a networking tool by default.

However, if you need to extend the same mental model across process or network boundaries, use Teleportation.

Same API, Different Transport

With Teleportation, the requester and handler code does NOT change. Only the transport changes.

typescript
// ========================================
// 📄 File: electronMain.ts (Main Process)
// ========================================
import { Nexus } from '@caeligo/nexus-orchestrator';
import { ipcMain } from 'electron';

// `rendererWindow` is the BrowserWindow created elsewhere during startup
const bus = new Nexus({
  // Bridge events to renderer process via IPC
  teleport: (event, payload) => {
    rendererWindow.webContents.send('nexus-event', { event, payload });
  }
});

// Handler in main process
bus.on('fs:readFile', async (payload) => {
  const content = await fs.readFile(payload.path, 'utf8');
  bus.reply(payload, content);
});

// Receive events from renderer
ipcMain.on('nexus-event', (_, { event, payload }) => {
  bus.feed(event, payload);
});

// ========================================
// 📄 File: electronRenderer.ts (Renderer Process)
// ========================================
import { Nexus } from '@caeligo/nexus-orchestrator';
import { ipcRenderer } from 'electron';

const bus = new Nexus({
  // Bridge events to main process via IPC
  teleport: (event, payload) => {
    ipcRenderer.send('nexus-event', { event, payload });
  }
});

// Receive events from main
ipcRenderer.on('nexus-event', (_, { event, payload }) => {
  bus.feed(event, payload);
});

// Requester in renderer process
async function loadFile(path: string) {
  // THIS CODE DOESN'T CHANGE even though handler is in another process!
  const content = await bus.request('fs:readFile', { path });
  displayContent(content);
}

Key insight:

  • ✅ The requester code in the renderer process is identical to the in-memory version
  • ✅ The handler code in the main process is identical to the in-memory version
  • ✅ Only the transport layer (IPC) changed
  • ✅ Network is an implementation detail

WebSocket Example

typescript
// ========================================
// Server
// ========================================
import { WebSocketServer } from 'ws';

const ws = new WebSocketServer({ port: 3000 });

const serverBus = new Nexus({
  teleport: (event, payload) => {
    ws.clients.forEach(client => {
      client.send(JSON.stringify({ event, payload }));
    });
  }
});

ws.on('connection', (socket) => {
  socket.on('message', (data) => {
    const { event, payload } = JSON.parse(data.toString());
    serverBus.feed(event, payload);
  });
});

// Handler on server
serverBus.on('db:query', async (payload) => {
  const result = await database.query(payload.sql);
  serverBus.reply(payload, result);
});

// ========================================
// Client (Browser)
// ========================================
const socket = new WebSocket('ws://localhost:3000');

const clientBus = new Nexus({
  teleport: (event, payload) => {
    socket.send(JSON.stringify({ event, payload }));
  }
});

socket.onmessage = (msg) => {
  const { event, payload } = JSON.parse(msg.data);
  clientBus.feed(event, payload);
};

// Requester code doesn't change!
async function queryDatabase(sql: string) {
  const result = await clientBus.request('db:query', { sql });
  return result;
}

Critical understanding:

  • The mental model doesn't change
  • The API doesn't change
  • The requester doesn't know if the handler is local or remote
  • Location agnostic — true architectural decoupling

For more details on Teleportation, see Teleportation & Networking.

Comparison with Other Patterns

| Pattern                  | Use Case                 | Coupling | Type Safety | Error Handling |
| ------------------------ | ------------------------ | -------- | ----------- | -------------- |
| Direct Import            | Simple apps              | High     | Excellent   | Try/Catch      |
| Traditional EventEmitter | Fire-and-forget          | Low      | Poor        | Manual         |
| Callbacks                | Legacy code              | Medium   | Poor        | Error-first    |
| Promises                 | Async operations         | Medium   | Good        | Try/Catch      |
| Nexus RPC                | Architectural decoupling | Low      | Excellent   | Built-in       |

When to Use RPC Pattern

✅ Use RPC when:

  • You need a response from another module
  • You want architectural decoupling without tight imports
  • You need timeout protection
  • You want easy testability (mock handlers)
  • You might move components to different processes later

❌ Don't use RPC when:

  • Fire-and-forget notifications are sufficient (use emit())
  • You're doing truly synchronous operations (use direct calls)
  • You don't need the response

Key Takeaways

  1. In-Memory First — Nexus RPC is for in-process communication by default
  2. Location Agnostic — Requester doesn't know where the handler lives
  3. Architectural Decoupling — No direct imports between modules
  4. Not HTTP — This is not a networking or API tool
  5. Same API, Different Transport — Teleportation extends to the network if needed
  6. Type Safe — Full TypeScript support with generics
  7. Built-in Timeouts — No manual correlation IDs or cleanup

Quick Reference

Request:

typescript
const result = await bus.request<ResponseType>(
  'event:name',
  payload,
  timeout // optional, default: 5000ms
);

Reply:

typescript
bus.on('event:name', (payload) => {
  bus.reply(payload, responseData);
});

TypeScript Types:

typescript
interface Events {
  'user:fetch': { id: number };
  // Reply type is inferred or specified at call site
}

const user = await bus.request<User>('user:fetch', { id: 123 });

Released under the MIT License.