RPC Pattern (Request/Reply)
In-Memory First
Nexus Orchestrator is an in-memory event bus for architectural decoupling.
The RPC pattern enables Request/Reply communication between components in the same runtime without tight coupling. This is not a networking or HTTP tool — it solves in-process communication.
The Music Player Analogy 🎵
Imagine a music player application with two components:
- Music Player UI Component — Displays track info, controls
- Audio Engine Service — Loads files, manages playback, applies effects
The Problem:
// ❌ Tight coupling: UI directly imports and calls the engine
import { audioEngine } from './AudioEngine';

function onPlayClick(trackId: string) {
  const track = audioEngine.loadTrack(trackId); // Direct dependency!
  displayTrack(track);
}
Why this fails:
- The UI knows too much about the Audio Engine
- Hard to test the UI in isolation
- Hard to swap Audio Engine implementations
- Circular dependencies are easy
- No async handling out of the box
The Nexus Solution:
// ✅ Architectural decoupling: UI requests via the bus
async function onPlayClick(trackId: string) {
  const track = await bus.request('audio:loadTrack', { id: trackId });
  displayTrack(track);
}

// Somewhere else in the codebase (AudioEngineService.ts)
bus.on('audio:loadTrack', async (payload) => {
  const track = await audioEngine.load(payload.id);
  bus.reply(payload, track);
});
Key points:
- ✅ Same application, same runtime, same memory space
- ✅ No HTTP, no network, no serialization
- ✅ UI doesn't know who handles the request
- ✅ UI doesn't know where the handler lives
- ✅ Location agnostic — handler could be in another module, file, or even process (with Teleportation)
The Problem: Traditional Event Buses Are Weak
Traditional event emitters are fire-and-forget. They don't support request/reply natively.
// ❌ Traditional EventEmitter (no return values)
emitter.emit('user:fetch', { id: 123 });
// ... now what? No way to get the result!
To fake request/reply, you'd have to:
- Generate unique correlation IDs manually
- Create temporary listeners for replies
- Handle timeouts yourself
- Clean up listeners afterward
- Propagate errors manually
This leads to callback hell:
// ❌ Manual correlation (18 lines of boilerplate!)
function fetchUser(id: number): Promise<User> {
  return new Promise((resolve, reject) => {
    const requestId = `req_${Date.now()}_${Math.random()}`;
    let timeout: any;
    const cleanup = bus.once(`response:${requestId}`, (data) => {
      clearTimeout(timeout);
      resolve(data);
    });
    timeout = setTimeout(() => {
      cleanup();
      reject(new Error('Request timeout'));
    }, 5000);
    bus.emit('user:fetch', { id, requestId });
  });
}
Nexus eliminates all this boilerplate:
// ✅ Clean, simple, built-in (1 line!)
const user = await bus.request('user:fetch', { id: 123 });
The Solution: Built-In RPC Pattern
Nexus provides request() and reply() as first-class primitives for in-memory communication.
Core API
// Requester (any component)
const result = await bus.request(
  'event:name',        // Event name
  { data: 'payload' }, // Payload
  5000                 // Timeout in ms (optional, default: 30s)
);

// Handler (any service/module)
bus.on('event:name', async (payload) => {
  const result = await doWork(payload);
  bus.reply(payload, result); // 👈 Automatic reply routing
});
What Nexus handles automatically:
- ✅ Correlation ID generation
- ✅ Temporary listener creation
- ✅ Timeout management
- ✅ Listener cleanup
- ✅ Error propagation
- ✅ Type safety (TypeScript)
How It Works (In-Memory)
Critical insight: The _replyTo field is hidden from your code. You never see it. Nexus manages it internally.
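To make the hidden correlation concrete, here is a rough, hand-rolled sketch of the machinery a request/reply bus manages for you, built on Node's plain EventEmitter. It is illustrative only: the reply-channel format and helper names are invented here, not Nexus internals, though it uses the `_replyTo` field name the docs mention.

```typescript
import { EventEmitter } from 'node:events';

// A bare EventEmitter stands in for the bus; everything below is what
// request()/reply() would otherwise force you to write by hand.
const emitter = new EventEmitter();

function request<T>(event: string, payload: object, timeoutMs = 30_000): Promise<T> {
  return new Promise((resolve, reject) => {
    // 1. Generate a private correlation channel for this one request
    const replyTo = `reply:${Date.now()}:${Math.random().toString(36).slice(2)}`;
    // 2. Arm a timeout that also cleans up the temporary listener
    const timer = setTimeout(() => {
      emitter.removeAllListeners(replyTo);
      reject(new Error(`Request timeout: ${event}`));
    }, timeoutMs);
    // 3. First reply wins; .once() unsubscribes automatically
    emitter.once(replyTo, (result: T) => {
      clearTimeout(timer);
      resolve(result);
    });
    // 4. The correlation field rides along with the payload,
    //    invisible to application code on both sides
    emitter.emit(event, { ...payload, _replyTo: replyTo });
  });
}

function reply(payload: { _replyTo?: string }, result: unknown): void {
  if (payload._replyTo) emitter.emit(payload._replyTo, result);
}

// Handler: never constructs or inspects a correlation ID itself
emitter.on('math:double', (payload: { n: number; _replyTo?: string }) => {
  reply(payload, payload.n * 2);
});

// Requester: never sees _replyTo at all
const doubled = await request<number>('math:double', { n: 21 }); // 42
```

The sketch is the same correlation-ID pattern as the manual boilerplate shown earlier, just factored into two reusable functions so callers only ever see a Promise.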
Example 1: Music Player (In-Memory)
Scenario: A music player UI needs to load a track from the audio engine.
// ========================================
// 📁 File: MusicPlayerUI.tsx
// ========================================
import { bus } from './eventBus';

async function onPlayButtonClick(trackId: string) {
  try {
    // Request track from audio engine
    const track = await bus.request('audio:loadTrack',
      { id: trackId },
      3000 // 3 second timeout
    );
    // Update UI
    setCurrentTrack(track);
    setPlaying(true);
  } catch (error) {
    console.error('Failed to load track:', error);
    showErrorNotification('Track unavailable');
  }
}

// ========================================
// 📁 File: AudioEngineService.ts
// ========================================
import { bus } from './eventBus';
import { audioEngine } from './nativeAudioEngine';

// Handler runs in the same process
bus.on('audio:loadTrack', async (payload) => {
  console.log('Loading track:', payload.id);
  // Load from disk (heavy I/O operation)
  const trackData = await audioEngine.load(payload.id);
  // Apply effects
  const processedTrack = audioEngine.applyEqualizer(trackData);
  // Reply to requester
  bus.reply(payload, {
    id: payload.id,
    duration: processedTrack.duration,
    waveform: processedTrack.waveform,
    metadata: processedTrack.metadata
  });
});
What's happening here:
- ✅ In-memory communication — Same JavaScript runtime, same process
- ✅ No network — No HTTP, no fetch, no serialization
- ✅ Architectural decoupling — UI doesn't import AudioEngine
- ✅ Location agnostic — Handler could move to another file without breaking UI
- ✅ Same API — If AudioEngine moves to another process later, UI code doesn't change
Example 2: Dashboard Analytics Widget (In-Memory)
Scenario: A dashboard widget needs analytics data from a computation service. The computation is expensive, so we run it in a background priority lane to keep the UI responsive.
// ========================================
// 📁 File: DashboardWidget.tsx
// ========================================
async function loadAnalytics() {
  setLoading(true);
  try {
    // Request analytics computation
    const stats = await bus.request('analytics:compute', {
      dateRange: 'last-30-days',
      metrics: ['revenue', 'users', 'conversions']
    });
    setData(stats);
    renderCharts(stats);
  } catch (error) {
    console.error('Analytics failed:', error);
    showFallbackData();
  } finally {
    setLoading(false);
  }
}

// ========================================
// 📁 File: AnalyticsService.ts
// ========================================
bus.on('analytics:compute', async (payload) => {
  // Heavy computation — run in background priority
  const result = await heavyComputation(payload);
  bus.reply(payload, result);
}, {
  priority: 'background' // 👈 Non-blocking execution
});

async function heavyComputation(params) {
  // Expensive calculation
  const rawData = await database.query(params.dateRange);
  const processed = processMetrics(rawData, params.metrics);
  return {
    revenue: processed.revenue,
    users: processed.users,
    conversions: processed.conversions,
    computedAt: Date.now()
  };
}
What's happening here:
- ✅ In-memory first — Same application, no network
- ✅ Priority lanes — Background execution keeps UI smooth
- ✅ Decoupling — Widget doesn't know how analytics are computed
- ✅ Testability — Easy to mock analytics:compute in tests
Example 3: Form Validation (In-Memory)
Scenario: A form component validates user input by requesting validation from a dedicated service module.
// ========================================
// 📁 File: SignupForm.tsx
// ========================================
async function onSubmit(formData) {
  setErrors({});
  try {
    // Request validation from service
    const validation = await bus.request('validation:signup', formData, 2000);
    if (validation.valid) {
      // Proceed with signup
      submitSignup(formData);
    } else {
      // Show errors
      setErrors(validation.errors);
    }
  } catch (error) {
    setErrors({ _global: 'Validation service unavailable' });
  }
}

// ========================================
// 📁 File: ValidationService.ts
// ========================================
bus.on('validation:signup', async (payload) => {
  const errors: Record<string, string> = {};
  // Email validation
  if (!payload.email.includes('@')) {
    errors.email = 'Invalid email format';
  }
  // Check if email already exists (in-memory database query)
  const existingUser = await userRepository.findByEmail(payload.email);
  if (existingUser) {
    errors.email = 'Email already registered';
  }
  // Password strength
  if (payload.password.length < 8) {
    errors.password = 'Password must be at least 8 characters';
  }
  // Reply with validation result
  bus.reply(payload, {
    valid: Object.keys(errors).length === 0,
    errors
  });
});
What's happening here:
- ✅ Architectural decoupling — Form doesn't know validation rules
- ✅ In-memory — Validation runs in the same process
- ✅ Testability — Easy to test form and validation independently
- ✅ Reusability — Validation service can be used by multiple forms
Edge Cases (Still In-Memory)
Timeout Handling
What happens when a handler never replies?
// Handler that forgot to reply
bus.on('slow:operation', async (payload) => {
  await doSomeWork(payload);
  // ❌ Oops! Forgot to call bus.reply()
});

// Requester with timeout protection
try {
  const result = await bus.request('slow:operation', { data: '...' }, 3000);
  console.log(result);
} catch (error) {
  // ✅ After 3 seconds, the Promise rejects
  console.error('Timeout:', error.message);
  // Error: [Nexus] Request timeout: slow:operation
  showFallbackUI();
}
Best practice: Always set reasonable timeouts based on expected latency.
Multiple Handlers
What if multiple modules handle the same request event?
// Handler 1
bus.on('data:fetch', (payload) => {
  bus.reply(payload, { source: 'cache', data: cachedData });
});

// Handler 2 (also listening!)
bus.on('data:fetch', (payload) => {
  bus.reply(payload, { source: 'database', data: dbData });
});

// Requester
const result = await bus.request('data:fetch', { id: 123 });
// Result: { source: 'cache', data: cachedData } — first reply wins!
How it works: request() uses .once() internally, so it only accepts the first reply and then unsubscribes.
Best practice: Ensure only one handler replies to a given request event, or use this behavior intentionally for redundancy.
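The first-reply-wins behavior falls directly out of `.once()` semantics, which you can verify with a plain EventEmitter (the channel name here is illustrative):

```typescript
import { EventEmitter } from 'node:events';

const emitter = new EventEmitter();

// .once() removes the listener after the first matching event,
// so later emits on the same channel are silently dropped.
const firstReply = new Promise<string>((resolve) => {
  emitter.once('reply:abc123', resolve);
});

emitter.emit('reply:abc123', 'cache');    // accepted: resolves the promise
emitter.emit('reply:abc123', 'database'); // ignored: listener already removed

const winner = await firstReply;
console.log(winner); // 'cache'
```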
Nested Requests
Can you make requests inside request handlers? Yes!
// Handler makes nested requests (still in-memory)
bus.on('user:profile', async (payload) => {
  // Request 1: Fetch user
  const user = await bus.request('db:getUser', { id: payload.userId });
  // Request 2: Fetch user's posts
  const posts = await bus.request('db:getPosts', { userId: payload.userId });
  // Request 3: Fetch user's followers
  const followers = await bus.request('db:getFollowers', { userId: payload.userId });
  // Combine and reply
  bus.reply(payload, {
    ...user,
    posts,
    followers
  });
});

// Requester
const profile = await bus.request('user:profile', { userId: 123 });
// Result: complete user profile with nested data
Note: All communication is still in-memory. No network involved.
Retry Logic with Circuit Breaker
Combine RPC with resilience features:
// Handler with automatic retries (in-memory operations)
bus.on('data:process', async (payload) => {
  // This might fail occasionally (e.g., database lock)
  const result = await heavyDatabaseOperation(payload);
  bus.reply(payload, result);
}, {
  attempts: 3,   // Retry 3 times on failure
  backoff: 1000, // Wait 1 second between retries
  fallback: (error) => {
    // All retries exhausted
    console.error('Operation failed after 3 attempts:', error);
  }
});

// Requester doesn't need to know about retries
const data = await bus.request('data:process', { input: '...' });
Testing RPC Handlers (In-Memory)
import { describe, it, expect } from 'vitest';
import { Nexus } from '@caeligo/nexus-orchestrator';

describe('Music Player RPC', () => {
  it('should load track successfully', async () => {
    const bus = new Nexus();
    // Setup handler (mock audio engine)
    bus.on('audio:loadTrack', (payload) => {
      bus.reply(payload, {
        id: payload.id,
        title: 'Test Track',
        duration: 180
      });
    });
    // Test requester
    const track = await bus.request('audio:loadTrack', { id: 'track-42' });
    expect(track.title).toBe('Test Track');
    expect(track.duration).toBe(180);
  });

  it('should timeout if handler does not reply', async () => {
    const bus = new Nexus();
    // No handler registered
    await expect(
      bus.request('missing:handler', {}, 100)
    ).rejects.toThrow('Request timeout');
  });

  it('should handle nested requests', async () => {
    const bus = new Nexus();
    // Mock database handlers
    bus.on('db:getUser', (p) => bus.reply(p, { name: 'Alice' }));
    bus.on('db:getPosts', (p) => bus.reply(p, [{ id: 1 }]));
    // Composite handler
    bus.on('user:profile', async (payload) => {
      const user = await bus.request('db:getUser', { id: payload.userId });
      const posts = await bus.request('db:getPosts', { userId: payload.userId });
      bus.reply(payload, { ...user, posts });
    });
    // Test
    const profile = await bus.request('user:profile', { userId: 123 });
    expect(profile.name).toBe('Alice');
    expect(profile.posts).toHaveLength(1);
  });
});
Performance Characteristics (In-Memory)
Overhead per request:
- 1 temporary listener (automatically cleaned up)
- 1 timeout timer (cleared on reply or timeout)
- Negligible memory footprint
Throughput:
- 10,000+ requests/second on modern hardware (2023 MacBook Pro)
- No blocking — async execution
- Scales with CPU, not network
Memory:
- Temporary listeners are garbage collected immediately after reply
- No memory leaks from abandoned requests
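As a rough illustration of why in-memory round-trips are cheap, here is a micro-benchmark of request/reply-style round-trips over a bare EventEmitter (not Nexus itself; absolute numbers depend entirely on your hardware and runtime, so treat the throughput figure above as indicative, not guaranteed):

```typescript
import { EventEmitter } from 'node:events';

const emitter = new EventEmitter();

// Echo handler: replies on whatever channel the requester names
emitter.on('ping', (replyTo: string) => emitter.emit(replyTo, 'pong'));

function roundTrip(i: number): Promise<void> {
  return new Promise((resolve) => {
    emitter.once(`reply:${i}`, () => resolve()); // temporary listener, auto-removed
    emitter.emit('ping', `reply:${i}`);
  });
}

const iterations = 10_000;
const start = performance.now();
for (let i = 0; i < iterations; i++) {
  await roundTrip(i);
}
const elapsedMs = performance.now() - start;
console.log(`${iterations} round-trips in ${elapsedMs.toFixed(1)} ms`);
```

Each reply fires synchronously within the same tick here; a real bus adds correlation and timeout bookkeeping on top, which is the per-request overhead described above.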
Teleportation: Optional Network Extension
Everything so far has been in-memory. Nexus Orchestrator is not a networking tool by default.
However, if you need to extend the same mental model across process or network boundaries, use Teleportation.
Same API, Different Transport
With Teleportation, the requester and handler code does NOT change. Only the transport changes.
// ========================================
// 📁 File: electronMain.ts (Main Process)
// ========================================
import { Nexus } from '@caeligo/nexus-orchestrator';
import { ipcMain } from 'electron';
// (rendererWindow and fs are assumed in scope: the BrowserWindow created
// at startup, and node:fs/promises)

const bus = new Nexus({
  // Bridge events to renderer process via IPC
  teleport: (event, payload) => {
    rendererWindow.webContents.send('nexus-event', { event, payload });
  }
});

// Handler in main process
bus.on('fs:readFile', async (payload) => {
  const content = await fs.readFile(payload.path, 'utf8');
  bus.reply(payload, content);
});

// Receive events from renderer
ipcMain.on('nexus-event', (_, { event, payload }) => {
  bus.feed(event, payload);
});

// ========================================
// 📁 File: electronRenderer.ts (Renderer Process)
// ========================================
import { Nexus } from '@caeligo/nexus-orchestrator';
import { ipcRenderer } from 'electron';

const bus = new Nexus({
  // Bridge events to main process via IPC
  teleport: (event, payload) => {
    ipcRenderer.send('nexus-event', { event, payload });
  }
});

// Receive events from main
ipcRenderer.on('nexus-event', (_, { event, payload }) => {
  bus.feed(event, payload);
});

// Requester in renderer process
async function loadFile(path: string) {
  // THIS CODE DOESN'T CHANGE even though the handler is in another process!
  const content = await bus.request('fs:readFile', { path });
  displayContent(content);
}
Key insight:
- ✅ The requester code in the renderer process is identical to the in-memory version
- ✅ The handler code in the main process is identical to the in-memory version
- ✅ Only the transport layer (IPC) changed
- ✅ Network is an implementation detail
WebSocket Example
// ========================================
// Server
// ========================================
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 3000 });
const serverBus = new Nexus({
  teleport: (event, payload) => {
    wss.clients.forEach(client => {
      client.send(JSON.stringify({ event, payload }));
    });
  }
});

wss.on('connection', (socket) => {
  socket.on('message', (data) => {
    const { event, payload } = JSON.parse(data.toString());
    serverBus.feed(event, payload);
  });
});

// Handler on server
serverBus.on('db:query', async (payload) => {
  const result = await database.query(payload.sql);
  serverBus.reply(payload, result);
});

// ========================================
// Client (Browser)
// ========================================
const socket = new WebSocket('ws://localhost:3000');
const clientBus = new Nexus({
  teleport: (event, payload) => {
    socket.send(JSON.stringify({ event, payload }));
  }
});

socket.onmessage = (msg) => {
  const { event, payload } = JSON.parse(msg.data);
  clientBus.feed(event, payload);
};

// Requester code doesn't change!
async function queryDatabase(sql: string) {
  const result = await clientBus.request('db:query', { sql });
  return result;
}
Critical understanding:
- The mental model doesn't change
- The API doesn't change
- The requester doesn't know if the handler is local or remote
- Location agnostic — true architectural decoupling
For more details on Teleportation, see Teleportation & Networking.
Comparison with Other Patterns
| Pattern | Use Case | Coupling | Type Safety | Error Handling |
|---|---|---|---|---|
| Direct Import | Simple apps | High | Excellent | Try/Catch |
| Traditional EventEmitter | Fire-and-forget | Low | Poor | Manual |
| Callbacks | Legacy code | Medium | Poor | Error-first |
| Promises | Async operations | Medium | Good | Try/Catch |
| Nexus RPC | Architectural decoupling | Low | Excellent | Built-in |
When to Use RPC Pattern
✅ Use RPC when:
- You need a response from another module
- You want architectural decoupling without tight imports
- You need timeout protection
- You want easy testability (mock handlers)
- You might move components to different processes later
❌ Don't use RPC when:
- Fire-and-forget notifications are sufficient (use emit())
- You're doing truly synchronous operations (use direct calls)
- You don't need the response
Key Takeaways
- In-Memory First — Nexus RPC is for in-process communication by default
- Location Agnostic — Requester doesn't know where the handler lives
- Architectural Decoupling — No direct imports between modules
- Not HTTP — This is not a networking or API tool
- Same API, Different Transport — Teleportation extends to the network if needed
- Type Safe — Full TypeScript support with generics
- Built-in Timeouts — No manual correlation IDs or cleanup
Next Steps
- Priority Lanes — Control execution timing for RPC handlers
- Resilience — Add retries and circuit breakers to handlers
- Cursors & State — Alternative pattern for synchronous state access
- Teleportation — Extend RPC across network boundaries
Quick Reference
Request:
const result = await bus.request<ResponseType>(
  'event:name',
  payload,
  timeout // optional, in ms (default: 30s)
);
Reply:
bus.on('event:name', (payload) => {
  bus.reply(payload, responseData);
});
TypeScript Types:
interface Events {
  'user:fetch': { id: number };
  // Reply type is inferred or specified at the call site
}

const user = await bus.request<User>('user:fetch', { id: 123 });