AI Streaming Integration Guide

This guide explains how to set up and use the AI Streaming components integrated from Drupal Native AI into the llm-mcp npm project.

Overview

The AI Streaming system provides a robust framework for streaming AI responses with support for:

  • Stream response handling
  • State management
  • Metrics collection and thresholds
  • Error handling
  • Provider integration
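
Throughout this guide, responses are produced by passing text chunks and a metrics object to a chunk handler. The shapes below are a minimal sketch inferred from those usage examples, not the actual type definitions exported by the service:

// Hypothetical shapes inferred from the usage examples in this guide; the real
// interfaces live in '../services/ai-streaming' and may differ.
type ChunkMetrics = {
  tokensUsed?: number;
  cost?: number;
  latency?: number;
  [key: string]: unknown;
};

type ChunkHandler = (chunk: string, metrics?: ChunkMetrics) => void;

type StreamingOptions = {
  headers?: Record<string, string>;
};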

Installation and Setup

Prerequisites

  • Node.js 20.x or later
  • TypeScript 5.x or later
  • Access to bfapi and bfllm services
  • JWT authentication tokens

Setting Up for Development

  1. Clone the repository and install dependencies:
git clone https://gitlab.bluefly.io/llm/llm-mcp.git
cd llm-mcp
npm install
  2. Configure the environment:
cp .env.example .env
# Edit .env with your configuration
  3. Build the project:
npm run build
  4. Start the development server:
npm run dev

Testing the AI Streaming Service

The streaming service can be tested with the provided test suite:

npm run test:unit -- --testPathPattern=ai-streaming
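
If you want to add coverage of your own, a minimal Jest-style unit test might look like the sketch below. It constructs the service with the same dependencies used in the basic example later in this guide and only asserts that the streaming entry point exists; anything beyond that depends on the service's actual API.

// ai-streaming.spec.ts — a minimal sketch, assuming a Jest test runner.
import { AiStreamingServiceImpl } from '../services/ai-streaming';
import { getLogger } from '../utils/logger';
import { StateManager } from '../services/state-manager';
import { ConfigManager } from '../services/config-manager';

describe('AiStreamingServiceImpl', () => {
  it('constructs and exposes createStreamingResponse', () => {
    const service = new AiStreamingServiceImpl(
      getLogger('ai-streaming-test'),
      new StateManager(),
      new ConfigManager()
    );

    expect(typeof service.createStreamingResponse).toBe('function');
  });
});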

Using the AI Streaming Service

Basic Implementation Example

import { AiStreamingServiceImpl } from '../services/ai-streaming';
import { getLogger } from '../utils/logger';
import { StateManager } from '../services/state-manager';
import { ConfigManager } from '../services/config-manager';

// Initialize dependencies
const logger = getLogger('ai-streaming');
const stateManager = new StateManager();
const configManager = new ConfigManager();

// Create streaming service
const streamingService = new AiStreamingServiceImpl(
  logger,
  stateManager,
  configManager
);

// Use in an Express route
app.get('/stream-demo', (req, res) => {
  return streamingService.createStreamingResponse((chunkHandler) => {
    // Send chunks with artificial delay
    chunkHandler('Hello', { tokensUsed: 1 });
    setTimeout(() => chunkHandler(' world', { tokensUsed: 1 }), 500);
    setTimeout(() => chunkHandler('!', { tokensUsed: 0 }), 1000);
  }, {
    headers: {
      'X-Custom-Header': 'custom-value',
    },
  });
});
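
To see the route in action, you can read the streamed body from a client. The sketch below uses the standard fetch/ReadableStream API available in Node.js 20; the port and the plain-text framing of the response are assumptions.

// Minimal client sketch: read the streamed response chunk by chunk.
// Assumes the server above is listening on localhost:3000 and streams plain text.
const response = await fetch('http://localhost:3000/stream-demo');
const reader = response.body!.getReader();
const decoder = new TextDecoder();

for (;;) {
  const { done, value } = await reader.read();
  if (done) break;
  process.stdout.write(decoder.decode(value, { stream: true }));
}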

Creating a Custom AI Provider

import { AbstractAiProvider } from '../providers/abstract-ai-provider';
import { ConfigManager } from '../services/config-manager';
import { Logger } from 'winston';
import { StateManager } from '../services/state-manager';
import { AiStreamingService } from '../services/ai-streaming';

export class CustomProvider extends AbstractAiProvider {
  constructor(
    configManager: ConfigManager,
    logger: Logger,
    stateManager: StateManager,
    streamingService: AiStreamingService
  ) {
    super('custom-provider', configManager, logger, stateManager, streamingService);
  }

  public getCapabilities(): string[] {
    return [
      'text_generation',
      'streaming',
      'chat',
    ];
  }

  // Implement abstract methods...
}
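
Wiring the provider up mirrors the constructor signature above, using the same dependencies created in the basic example. How the provider is then registered depends on your application, so the registry call below is only a hypothetical placeholder:

// Instantiate the provider with the dependencies created earlier in this guide.
const customProvider = new CustomProvider(
  configManager,
  logger,
  stateManager,
  streamingService
);

// Registration is application-specific; a call like this is a hypothetical
// placeholder, not part of the documented API.
// providerRegistry.register(customProvider);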

Integrating with the Vector Bridge

The AI Streaming service can be integrated with the Vector Bridge for enhanced functionality:

import { VectorBridge } from '../vector-bridge';
import { AiStreamingServiceImpl } from '../services/ai-streaming';

// Create vector bridge
const vectorBridge = new VectorBridge({
  database: 'qdrant',
  embeddings: 'openai',
  config: {
    // Configuration options
  }
});

// Create streaming service
const streamingService = new AiStreamingServiceImpl(
  logger,
  stateManager,
  configManager
);

// Use together in RAG workflow
app.post('/rag-query', async (req, res) => {
  const { query, collection } = req.body;

  // Create streaming response
  return streamingService.createStreamingResponse(async (chunkHandler) => {
    try {
      // Search for similar vectors
      const searchResults = await vectorBridge.searchSimilar(
        query,
        collection,
        { limit: 5 }
      );

      // Get context from search results
      const context = searchResults.results.map(r => r.metadata.text).join('\n\n');

      // Generate response with context
      const response = await someAiService.generateWithContext(query, context);

      // Stream the response
      chunkHandler(response, {
        tokensUsed: response.length / 4,
        sources: searchResults.results.map(r => r.metadata.source)
      });

    } catch (error) {
      logger.error('Error in RAG query:', error);
      throw error;
    }
  });
});

Advanced Configuration

Metrics Thresholds

Configure thresholds for token usage and cost in your application configuration:

// In your configuration setup
configManager.set('ai-streaming.metrics-thresholds', {
  maxTokens: 2000000, // 2 million tokens
  maxCost: 500, // $500
});
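
How these thresholds are enforced is handled by the streaming service itself, but if you need to inspect them in your own code, reading them back might look like the following. Note that a configManager.get accessor is assumed here; only set() is shown in this guide.

// Hypothetical read-back of the thresholds configured above.
const thresholds = configManager.get('ai-streaming.metrics-thresholds') as {
  maxTokens: number;
  maxCost: number;
};

function exceedsThresholds(metrics: { tokensUsed?: number; cost?: number }): boolean {
  return (metrics.tokensUsed ?? 0) > thresholds.maxTokens
    || (metrics.cost ?? 0) > thresholds.maxCost;
}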

Monitoring Integration

The streaming metrics can be integrated with your monitoring system:

import { MetricsExporter } from '../monitoring/metrics-exporter';

// Setup metrics integration
const metricsExporter = new MetricsExporter({
  endpoint: 'http://prometheus:9090',
  interval: 60000,
});

// Extend the streaming service
class MonitoredStreamingService extends AiStreamingServiceImpl {
  public logMetrics(metrics: Record<string, any>): void {
    // Call parent implementation
    super.logMetrics(metrics);

    // Export to monitoring system
    metricsExporter.exportMetrics({
      name: 'ai_streaming_metrics',
      type: 'gauge',
      values: {
        tokens_used: metrics.tokensUsed || 0,
        latency_ms: metrics.latency || 0,
        cost: metrics.cost || 0,
      },
      labels: {
        provider: metrics.provider || 'unknown',
        operation: metrics.operation || 'unknown',
      },
    });
  }
}

Integration with Drupal

Setting Up the Bridge

Configure the Drupal bridge in your MCP server:

import { createDrupalContextBridge } from '../services/drupal-context-bridge';

// Create and configure the bridge
const drupalBridge = createDrupalContextBridge({
  apiUrl: 'https://example.com/api',
  apiKey: 'your-api-key',
  timeout: 30000,
});

// Register the bridge with the tool registry
toolRegistry.registerBridge('drupal', drupalBridge);

Using AI Streaming with Drupal

In your Node.js application:

// Execute an AI streaming operation in Drupal context
app.post('/drupal-ai-stream', async (req, res) => {
  const { prompt, context } = req.body;

  // Create a streaming response that uses Drupal context
  return streamingService.createStreamingResponse(async (chunkHandler) => {
    try {
      // Get Drupal context
      const drupalContext = await drupalBridge.getContext(context.id);

      // Generate content with Drupal context
      const result = await aiService.generateWithContext(prompt, drupalContext);

      // Stream the result
      chunkHandler(result, {
        tokensUsed: result.length / 4,
        contextId: context.id
      });
    } catch (error) {
      logger.error('Error in Drupal AI stream:', error);
      throw error;
    }
  });
});

Error Handling and Resilience

The AI Streaming service includes built-in error handling:

try {
  // Your streaming code
} catch (error) {
  // The streaming service will automatically:
  // 1. Update the state with error information
  // 2. Send an error response to the client
  // 3. Log the error with context
}

For additional resilience, you can implement custom error handling:

app.post('/resilient-stream', async (req, res) => {
  const { prompt } = req.body;

  return streamingService.createStreamingResponse(async (chunkHandler) => {
    try {
      // Try primary service
      const result = await primaryService.generate(prompt);
      chunkHandler(result, { service: 'primary' });
    } catch (error) {
      logger.warn('Primary service failed, using fallback', { error: error.message });

      try {
        // Try fallback service
        const fallbackResult = await fallbackService.generate(prompt);
        chunkHandler(fallbackResult, { service: 'fallback' });
      } catch (fallbackError) {
        logger.error('Fallback service also failed', { error: fallbackError.message });
        throw fallbackError;
      }
    }
  });
});

Additional Resources