MQTT JavaScript Example
This comprehensive example demonstrates how to implement MQTT communication with CoCoCo using the popular mqtt library for Node.js. The example covers connection management, data publishing, command handling, and error recovery.
Prerequisites
Install the MQTT.js library:
bash
npm install mqtt
Complete Implementation Example
javascript
// Import the MQTT library
const mqtt = require('mqtt');
// --- Configuration ---
// Device credentials: substitute your actual device ID and token.
const deviceId = 'your_device_id_here';
const deviceToken = 'your_device_token_here';

// Regional broker endpoint; the mqtts:// scheme selects TLS.
const brokerUrl = 'mqtts://mqtt.na.cococo.cloud'; // Example: North America

// Client ID with a short random hex suffix (may also be a static value).
const clientId = `my-device-client-${Math.random().toString(16).slice(2, 8)}`;

// --- Device State Management ---
// Mutable snapshot of the device; the handlers and publishers below read and update it.
let deviceState = {
  online: false,
  mode: 'unknown',
  status: 'stopped',
  lastHeartbeat: null,
  commandQueue: [],
};

// --- Connection Options ---
const connectOptions = {
  clientId,
  username: `d:${deviceId}`,
  password: deviceToken,
  keepalive: 60,
  connectTimeout: 30000,
  reconnectPeriod: 5000,
  clean: true,
  // Ensures server certificate validation
  rejectUnauthorized: true,
  // Last Will: a retained "offline" status on the same topic publishStatus() uses.
  will: {
    topic: `d/${deviceId}/s`,
    payload: JSON.stringify({ mode: 'unknown', status: 'stopped', online: false }),
    retain: true,
    qos: 1,
  },
};

// --- Connect to the Broker ---
console.log(`Connecting to ${brokerUrl} as client ${clientId}...`);
const client = mqtt.connect(brokerUrl, connectOptions);
// --- Event Handlers ---

// 'connect': mark the device as running, subscribe to the inbound topic, then
// publish status, drain queued commands, and start periodic reporting.
client.on('connect', () => {
  console.log('Successfully connected to CoCoCo MQTT broker!');

  Object.assign(deviceState, { online: true, mode: 'productive', status: 'running' });

  const inboundTopic = `d/${deviceId}/i`;
  client.subscribe(inboundTopic, { qos: 1 }, (subscribeErr) => {
    if (subscribeErr) {
      console.error(`Failed to subscribe to ${inboundTopic}: ${subscribeErr}`);
      return;
    }
    console.log(`Subscribed to ${inboundTopic}`);
  });

  publishStatus();
  processCommandQueue();
  startPeriodicReporting();
});

// 'error': log the failure and flag the device as offline.
client.on('error', (connectionErr) => {
  console.error('Connection error:', connectionErr);
  deviceState.online = false;
});

// 'message': only payloads arriving on the device's inbound topic are handled.
client.on('message', (topic, message) => {
  console.log(`Received message on topic ${topic}:`);
  if (topic !== `d/${deviceId}/i`) {
    return;
  }
  handleInboundMessage(message);
});

// 'close': the connection has been closed.
client.on('close', () => {
  console.log('Disconnected from MQTT broker');
  deviceState.online = false;
});

// 'offline': the client reports that it is offline.
client.on('offline', () => {
  console.log('Client is offline');
  deviceState.online = false;
});

// 'reconnect': a reconnection attempt is starting.
client.on('reconnect', () => {
  console.log('Attempting to reconnect...');
});
// --- Publishing Functions ---

/**
 * Publishes the current mode, status, and online flag taken from deviceState
 * to the device status topic (d/<deviceId>/s) as a retained QoS 1 message.
 * The outcome is only logged; nothing is returned.
 */
function publishStatus() {
  const { mode, status, online } = deviceState;
  const statusPayload = { mode, status, online };
  const publishOptions = { retain: true, qos: 1 };

  const onPublished = (publishErr) => {
    if (!publishErr) {
      console.log('Published status:', statusPayload);
      return;
    }
    console.error(`Failed to publish status: ${publishErr}`);
  };

  client.publish(`d/${deviceId}/s`, JSON.stringify(statusPayload), publishOptions, onPublished);
}
/**
 * Publishes an error report to the device error topic (d/<deviceId>/e) at QoS 1.
 *
 * @param {string} message - Human-readable error description.
 * @param {number} [severity=3] - Severity value forwarded unchanged in the payload.
 * @param {object} [metadata={}] - Extra fields merged into the payload metadata;
 *   they override the generated `timestamp` and `client_id` on key collision.
 */
function publishError(message, severity = 3, metadata = {}) {
  const enrichedMetadata = {
    timestamp: new Date().toISOString(),
    client_id: clientId,
    ...metadata,
  };
  const errorPayload = { message, severity, metadata: enrichedMetadata };

  client.publish(`d/${deviceId}/e`, JSON.stringify(errorPayload), { qos: 1 }, (publishErr) => {
    if (!publishErr) {
      console.log('Published error:', errorPayload);
      return;
    }
    console.error(`Failed to publish error: ${publishErr}`);
  });
}
/**
 * Publishes a single metric sample to d/<deviceId>/m/<name>.
 *
 * @param {string} name - Metric name; becomes the last topic segment.
 * @param {*} value - Metric value, serialized as-is.
 * @param {string} unit - Unit identifier; 'custom' enables `customUnit`.
 * @param {?string} [customUnit=null] - Sent as `custom_unit` only when `unit`
 *   is 'custom' and this value is truthy.
 * @param {object} [metadata={}] - Metadata attached to the sample.
 */
function publishMetric(name, value, unit, customUnit = null, metadata = {}) {
  const hasCustomUnit = unit === 'custom' && Boolean(customUnit);
  const metricPayload = {
    value,
    unit,
    timestamp: new Date().toISOString(),
    metadata,
    ...(hasCustomUnit ? { custom_unit: customUnit } : {}),
  };

  client.publish(`d/${deviceId}/m/${name}`, JSON.stringify(metricPayload), (publishErr) => {
    if (!publishErr) {
      console.log(`Published metric ${name}:`, metricPayload);
      return;
    }
    console.error(`Failed to publish metric (${name}): ${publishErr}`);
  });
}
/**
 * Publishes a batch of metric samples as one JSON array to d/<deviceId>/m.
 *
 * @param {Array<object>} metrics - Entries with `name`, `value`, `unit`, and
 *   optional `customUnit`, `timestamp`, and `metadata`. A falsy `timestamp` is
 *   replaced with the current time and a falsy `metadata` with an empty object.
 */
function publishMultipleMetrics(metrics) {
  const multiMetricPayload = [];
  for (const metric of metrics) {
    multiMetricPayload.push({
      name: metric.name,
      value: metric.value,
      unit: metric.unit,
      custom_unit: metric.customUnit,
      timestamp: metric.timestamp || new Date().toISOString(),
      metadata: metric.metadata || {},
    });
  }

  const serialized = JSON.stringify(multiMetricPayload);
  client.publish(`d/${deviceId}/m`, serialized, (publishErr) => {
    if (!publishErr) {
      console.log('Published multiple metrics:', multiMetricPayload);
      return;
    }
    console.error(`Failed to publish multiple metrics: ${publishErr}`);
  });
}
/**
 * Publishes a toggle state, JSON-encoded, to d/<deviceId>/b/<name>.
 *
 * @param {string} name - Toggle name; becomes the last topic segment.
 * @param {*} state - State value to serialize (the callers here pass booleans).
 */
function publishToggle(name, state) {
  const serializedState = JSON.stringify(state);
  const reportOutcome = (publishErr) => {
    if (!publishErr) {
      console.log(`Published toggle ${name}:`, state);
      return;
    }
    console.error(`Failed to publish toggle (${name}): ${publishErr}`);
  };
  client.publish(`d/${deviceId}/b/${name}`, serializedState, reportOutcome);
}
/**
 * Publishes a gauge reading, JSON-encoded, to d/<deviceId>/g/<name>.
 *
 * @param {string} name - Gauge name; becomes the last topic segment.
 * @param {*} value - Reading to serialize.
 */
function publishGauge(name, value) {
  const serializedValue = JSON.stringify(value);
  const reportOutcome = (publishErr) => {
    if (!publishErr) {
      console.log(`Published gauge ${name}:`, value);
      return;
    }
    console.error(`Failed to publish gauge (${name}): ${publishErr}`);
  };
  client.publish(`d/${deviceId}/g/${name}`, serializedValue, reportOutcome);
}
/**
 * Publishes a counter increment, JSON-encoded, to d/<deviceId>/t/<name>.
 *
 * @param {string} name - Counter name; becomes the last topic segment.
 * @param {*} increment - Amount to serialize as the message payload.
 */
function publishCounter(name, increment) {
  const serializedIncrement = JSON.stringify(increment);
  const reportOutcome = (publishErr) => {
    if (!publishErr) {
      console.log(`Published counter ${name} increment:`, increment);
      return;
    }
    console.error(`Failed to publish counter (${name}): ${publishErr}`);
  };
  client.publish(`d/${deviceId}/t/${name}`, serializedIncrement, reportOutcome);
}
// --- Command Handling Functions ---

/**
 * Parses an inbound MQTT message, validates it, and dispatches it by `type`.
 *
 * Parsing and processing failures are reported separately so that an exception
 * thrown by a handler is not mislabelled as a JSON parsing problem. Payloads
 * that parse to a non-object (for example the JSON literal `null`) are rejected
 * as invalid commands instead of triggering a TypeError.
 *
 * @param {Buffer|string} message - Raw message; converted with `toString()`.
 */
function handleInboundMessage(message) {
  const rawMessage = message.toString();

  let payload;
  try {
    payload = JSON.parse(rawMessage);
  } catch (error) {
    console.error('Failed to parse inbound message:', error);
    publishError(`Command parsing failed: ${error.message}`, 3, {
      raw_message: rawMessage
    });
    return;
  }

  try {
    console.log('Received command payload:', payload);

    // Validate command
    if (!validateCommand(payload)) {
      // payload may be null or a primitive here, so read `type` defensively.
      const claimedType =
        (payload !== null && typeof payload === 'object' && payload.type) || 'unknown';
      publishCommandResponse(claimedType, 'rejected', {
        error: 'Invalid command format'
      });
      return;
    }

    // Process command based on type
    switch (payload.type) {
      case 'command':
        handleCommand(payload);
        break;
      case 'config':
        handleConfiguration(payload);
        break;
      case 'control':
        handleControl(payload);
        break;
      default:
        // Defensive: only reachable if validateCommand accepts a type not handled above.
        console.warn('Unknown message type:', payload.type);
        publishCommandResponse(payload.type, 'rejected', {
          error: `Unknown command type: ${payload.type}`
        });
    }
  } catch (error) {
    console.error('Failed to process inbound message:', error);
    publishError(`Command processing failed: ${error.message}`, 3, {
      raw_message: rawMessage
    });
  }
}
/**
 * Checks that a parsed inbound payload is an object whose `type` is one of
 * the supported message types ('command', 'config', 'control').
 *
 * @param {*} payload - Parsed message payload.
 * @returns {boolean} true when the payload may be dispatched, false otherwise.
 */
function validateCommand(payload) {
  const isObject = payload !== null && typeof payload === 'object';
  if (!isObject) {
    return false;
  }
  // A missing or empty type is never in the list, so one membership test suffices.
  return ['command', 'config', 'control'].includes(payload.type);
}
/**
 * Dispatches a 'command' message to the handler for its `action`.
 *
 * The action is resolved before anything is acknowledged, so an unknown action
 * produces a single 'rejected' response rather than 'accepted' followed by
 * 'rejected'. A missing or null `parameters` field is passed on as `{}`.
 *
 * @param {object} payload - Validated command payload with `action` and
 *   optional `parameters`.
 */
function handleCommand(payload) {
  const { action } = payload;
  // The handlers read properties off parameters, so never hand them null/undefined.
  const parameters = payload.parameters == null ? {} : payload.parameters;

  let runAction;
  switch (action) {
    case 'restart':
      runAction = handleRestartCommand;
      break;
    case 'shutdown':
      runAction = handleShutdownCommand;
      break;
    case 'diagnostics':
      runAction = handleDiagnosticsCommand;
      break;
    default:
      publishCommandResponse('command', 'rejected', {
        error: `Unknown action: ${action}`
      });
      return;
  }

  publishCommandResponse('command', 'accepted', { action });
  runAction(parameters);
}
/**
 * Applies a 'config' message: logs every key of `payload.settings` and reports
 * the list of touched keys. A missing or non-object `settings` is rejected.
 *
 * @param {object} payload - Validated config payload carrying `settings`.
 */
function handleConfiguration(payload) {
  const settings = payload.settings;
  const hasSettingsObject = Boolean(settings) && typeof settings === 'object';

  if (!hasSettingsObject) {
    publishCommandResponse('config', 'rejected', {
      error: 'Invalid settings object'
    });
    return;
  }

  console.log('Applying configuration updates:', settings);

  // Apply configuration changes
  const updated = [];
  for (const key of Object.keys(settings)) {
    // Add your configuration handling logic here
    console.log(`Updated ${key} to ${settings[key]}`);
    updated.push(key);
  }

  publishCommandResponse('config', 'completed', { updated_fields: updated });
}
/**
 * Routes a 'control' message to the actuator named by `target`.
 *
 * The heater and pump are switched on only when `state` is exactly 'on', the
 * valve is opened only when `state` is exactly 'open'; any other state is
 * forwarded as false. Unknown targets are rejected; otherwise a 'completed'
 * response is reported after the control function returns.
 *
 * @param {object} payload - Validated control payload with `target`, `state`,
 *   and optional `parameters` (the heater reads `parameters.duration`).
 */
function handleControl(payload) {
  const { target, state, parameters = {} } = payload;
  console.log(`Control command: ${target} -> ${state}`);

  if (target === 'heater') {
    controlHeater(state === 'on', parameters.duration);
  } else if (target === 'pump') {
    controlPump(state === 'on');
  } else if (target === 'valve') {
    controlValve(state === 'open');
  } else {
    publishCommandResponse('control', 'rejected', {
      error: `Unknown target: ${target}`
    });
    return;
  }

  publishCommandResponse('control', 'completed', { target, state });
}
// --- Device Control Functions ---

/**
 * Handles the 'restart' action: reports 'executing', waits for the requested
 * delay, reports 'completed', then closes the MQTT connection and exits.
 *
 * The process exits from the `client.end` callback rather than immediately
 * after calling `client.end()`, so the disconnect is not cut short by
 * `process.exit`.
 *
 * @param {object} parameters - Command parameters. `delay` (seconds) is
 *   optional; anything that is not a finite positive number is treated as 0.
 */
function handleRestartCommand(parameters) {
  // Normalize the delay so a string such as 'abc' or a negative value cannot
  // produce a NaN/negative timer duration.
  const requestedDelay = Number(parameters.delay);
  const delay = Number.isFinite(requestedDelay) && requestedDelay > 0 ? requestedDelay : 0;
  console.log(`Restart requested with ${delay}s delay`);

  publishCommandResponse('command', 'executing', { action: 'restart', delay });

  setTimeout(() => {
    console.log('Restarting device...');
    publishCommandResponse('command', 'completed', { action: 'restart' });

    // Graceful shutdown: exit only once the client reports that it has closed.
    client.end(false, {}, () => {
      process.exit(0);
    });
  }, delay * 1000);
}
/**
 * Handles the 'shutdown' action: reports 'executing', publishes a
 * maintenance/stopped status, and one second later reports 'completed',
 * closes the MQTT connection, and exits.
 *
 * The process exits from the `client.end` callback so that `process.exit` does
 * not cut the disconnect (and the status publish issued just before) short.
 *
 * @param {object} parameters - Command parameters (currently unused).
 */
function handleShutdownCommand(parameters) {
  console.log('Shutdown requested');
  publishCommandResponse('command', 'executing', { action: 'shutdown' });

  // Update status before shutdown
  deviceState.mode = 'maintenance';
  deviceState.status = 'stopped';
  publishStatus();

  setTimeout(() => {
    publishCommandResponse('command', 'completed', { action: 'shutdown' });
    // Exit only once the client reports that it has closed.
    client.end(false, {}, () => {
      process.exit(0);
    });
  }, 1000);
}
/**
 * Handles the 'diagnostics' action: reports 'executing', then after two
 * seconds collects process/connection information, reports it in a
 * 'completed' response, and publishes heap usage and uptime as metrics.
 *
 * @param {object} parameters - Command parameters (not read by this handler).
 */
function handleDiagnosticsCommand(parameters) {
  console.log('Running diagnostics...');
  publishCommandResponse('command', 'executing', { action: 'diagnostics' });

  const reportDiagnostics = () => {
    const memoryUsage = process.memoryUsage();
    const uptime = process.uptime();
    const connectionStatus = client.connected ? 'connected' : 'disconnected';

    const diagnosticsResult = {
      memory_usage: memoryUsage,
      uptime,
      platform: process.platform,
      node_version: process.version,
      connection_status: connectionStatus,
    };

    publishCommandResponse('command', 'completed', {
      action: 'diagnostics',
      results: diagnosticsResult,
    });

    // Also publish as a metric for historical tracking
    publishMetric('diagnostics_memory', memoryUsage.heapUsed, 'custom', 'bytes');
    publishMetric('diagnostics_uptime', uptime, 'custom', 'seconds');
  };

  // Simulate diagnostics
  setTimeout(reportDiagnostics, 2000);
}
/**
 * Switches the heater toggle and, when turned on with a truthy duration,
 * schedules an automatic switch-off.
 *
 * @param {boolean} turnOn - Desired heater state.
 * @param {?number} [duration=null] - Seconds until auto-shutoff; ignored when
 *   falsy or when the heater is being turned off.
 */
function controlHeater(turnOn, duration = null) {
  const stateLabel = turnOn ? 'ON' : 'OFF';
  const durationLabel = duration ? ` for ${duration}s` : '';
  console.log(`Heater ${stateLabel}${durationLabel}`);

  // Update toggle state
  publishToggle('heaterOn', turnOn);

  const needsAutoShutoff = Boolean(turnOn && duration);
  if (!needsAutoShutoff) {
    return;
  }

  setTimeout(() => {
    console.log('Heater auto-shutoff');
    publishToggle('heaterOn', false);
  }, duration * 1000);
}
/**
 * Logs the requested pump state and publishes it on the 'pumpRunning' toggle.
 *
 * @param {boolean} turnOn - Desired pump state.
 */
function controlPump(turnOn) {
  const stateLabel = turnOn ? 'ON' : 'OFF';
  console.log(`Pump ${stateLabel}`);
  publishToggle('pumpRunning', turnOn);
}
/**
 * Logs the requested valve position and publishes it on the 'valveOpen' toggle.
 *
 * @param {boolean} open - true to open the valve, false to close it.
 */
function controlValve(open) {
  const positionLabel = open ? 'OPEN' : 'CLOSED';
  console.log(`Valve ${positionLabel}`);
  publishToggle('valveOpen', open);
}
// --- Utility Functions ---

/**
 * Reports the outcome of a command. 'failed' and 'rejected' outcomes are
 * published through publishError (severity 2) for visibility; every other
 * status is only logged locally.
 *
 * @param {string} commandType - Type of the command being answered.
 * @param {string} status - Outcome label, e.g. 'accepted', 'completed', 'rejected'.
 * @param {object} [data={}] - Extra fields merged into the response; they
 *   override the generated fields on key collision.
 */
function publishCommandResponse(commandType, status, data = {}) {
  const response = Object.assign(
    {
      command_type: commandType,
      status,
      timestamp: new Date().toISOString(),
      device_id: deviceId,
    },
    data,
  );

  const isFailure = ['failed', 'rejected'].includes(status);
  if (!isFailure) {
    console.log('Command response:', response);
    return;
  }

  // Publish response as an error (for visibility)
  publishError(`Command ${status}: ${JSON.stringify(response)}`, 2);
}
/**
 * Drains deviceState.commandQueue in FIFO order while the client stays
 * connected, replaying each entry through handleInboundMessage as a
 * JSON-encoded Buffer. Stops early if the connection drops.
 */
function processCommandQueue() {
  for (;;) {
    const queueIsEmpty = deviceState.commandQueue.length === 0;
    if (queueIsEmpty || !client.connected) {
      break;
    }

    const queuedCommand = deviceState.commandQueue.shift();
    console.log('Processing queued command:', queuedCommand);

    const replayedMessage = Buffer.from(JSON.stringify(queuedCommand));
    handleInboundMessage(replayedMessage);
  }
}
// --- Periodic Reporting ---

// Handles of the active reporting intervals; null until reporting has started.
let periodicReportingTimers = null;

/**
 * Starts the simulated sensor reporting (every 30 seconds) and the status
 * heartbeat (every 5 minutes).
 *
 * Idempotent: the 'connect' event fires again after every reconnect, so
 * repeated calls must not stack additional intervals. Both callbacks already
 * skip their work while the client is disconnected, so the same intervals
 * simply keep running across reconnects.
 */
function startPeriodicReporting() {
  if (periodicReportingTimers !== null) {
    return;
  }

  // Simulate sensor readings every 30 seconds
  const sensorTimer = setInterval(() => {
    if (!client.connected) return;

    // Simulate temperature sensor
    const temperature = 20 + Math.random() * 10; // 20-30°C
    publishMetric('temperature', parseFloat(temperature.toFixed(1)), 'deg_c', null, {
      sensor_location: 'main_chamber'
    });

    // Simulate multiple metrics
    publishMultipleMetrics([
      {
        name: 'humidity',
        value: parseFloat((40 + Math.random() * 20).toFixed(1)),
        unit: 'custom',
        customUnit: '%RH'
      },
      {
        name: 'pressure',
        value: parseFloat((1000 + Math.random() * 50).toFixed(1)),
        unit: 'custom',
        customUnit: 'hPa'
      }
    ]);

    // Update gauge (simulated stock level)
    const stockLevel = Math.floor(Math.random() * 1000);
    publishGauge('stockLevel', stockLevel);

    // Increment production counter occasionally
    if (Math.random() < 0.3) {
      publishCounter('itemsProduced', 1);
    }
  }, 30000); // Every 30 seconds

  // Heartbeat every 5 minutes
  const heartbeatTimer = setInterval(() => {
    if (!client.connected) return;

    deviceState.lastHeartbeat = new Date().toISOString();
    publishStatus();
  }, 5 * 60 * 1000); // Every 5 minutes

  periodicReportingTimers = [sensorTimer, heartbeatTimer];
}
// --- Graceful Shutdown ---

// Upper bound on how long shutdown waits for the MQTT client to close.
const SHUTDOWN_TIMEOUT_MS = 5000;

// Set once a termination signal has been handled so repeated signals are ignored.
let shutdownInProgress = false;

/**
 * Publishes an offline/stopped status, closes the MQTT connection, and exits.
 *
 * The process exits from the `client.end` callback instead of immediately
 * after calling `client.end()`, so `process.exit` does not cut the disconnect
 * short. A fallback timer forces the exit if the client never reports that it
 * has closed.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown.
 */
function shutdownGracefully(signal) {
  if (shutdownInProgress) {
    return;
  }
  shutdownInProgress = true;
  console.log(`Received ${signal}, shutting down gracefully...`);

  // Update status to offline
  deviceState.online = false;
  deviceState.status = 'stopped';
  publishStatus();

  // Close MQTT connection, then exit from its completion callback.
  client.end(false, {}, () => {
    process.exit(0);
  });

  // Fallback: do not hang forever if the close callback never fires.
  const forceExitTimer = setTimeout(() => {
    process.exit(0);
  }, SHUTDOWN_TIMEOUT_MS);
  forceExitTimer.unref();
}

process.on('SIGINT', () => {
  shutdownGracefully('SIGINT');
});

process.on('SIGTERM', () => {
  shutdownGracefully('SIGTERM');
});
// --- Example Usage Functions ---

/**
 * Exercises every publishing helper once, in a fixed order: status, error,
 * single metric, metric batch, toggle, gauge, and counter.
 */
function demonstratePublishing() {
  console.log('--- Demonstrating Data Publishing ---');

  // Sample batch used for the multi-metric step below.
  const sampleBatch = [
    { name: 'humidity', value: 45.2, unit: 'custom', customUnit: '%RH' },
    { name: 'doorOpen', value: 0, unit: 'custom', customUnit: 'state' },
  ];

  publishStatus(); // 1. Set Status
  publishError('Sensor reading out of range', 3, { sensor: 'temp01' }); // 2. Record an Error
  publishMetric('temperature', 22.5, 'deg_c'); // 3. Single Metric (Temperature)
  publishMultipleMetrics(sampleBatch); // 4. Multiple Metrics
  publishToggle('heaterOn', true); // 5. Boolean State (Heater On)
  publishGauge('stockLevel', 250); // 6. Gauge (Stock Level)
  publishCounter('itemsProduced', 1); // 7. Counter Increment
}
// Run the demonstration once, after the first successful connection.
// `once` is used instead of `on` because 'connect' fires again after every
// reconnect, which would re-publish the sample error and counter increment.
client.once('connect', () => {
  // Wait a moment for subscriptions to be established
  setTimeout(demonstratePublishing, 2000);
});
console.log('MQTT client started. Press Ctrl+C to exit.');
Key Features of This Example
Connection Management
- Automatic reconnection at a fixed 5-second interval (`reconnectPeriod`)
- Last Will and Testament for offline detection
- Proper TLS configuration with certificate validation
- Connection state tracking
Data Publishing
- All data types supported (status, errors, metrics, toggles, gauges, counters)
- Proper error handling for publish operations
- QoS and retain flags where appropriate
- Batch publishing for efficiency
Command Handling
- Complete command validation and processing
- Support for restart, shutdown, and diagnostics commands
- Device control functions (heater, pump, valve)
- Command response and acknowledgment system
Error Handling
- Comprehensive error logging
- Graceful degradation on connection loss
- Command queuing for offline scenarios
- Proper cleanup on shutdown
Production Features
- Periodic sensor reporting simulation
- Heartbeat mechanism
- Memory and performance diagnostics
- Graceful shutdown handling
Usage Instructions
1. Install Dependencies:
   npm install mqtt
2. Configure Device:
   - Replace your_device_id_here with your actual device ID
   - Replace your_device_token_here with your device token
   - Update the broker URL for your region
3. Run the Application:
   node your-mqtt-app.js
4. Test Commands: Use MQTT client tools to send commands to d/your_device_id/i
Customization Points
- Sensor Simulation: Replace the simulated sensor readings with actual hardware interfaces
- Command Actions: Add your specific device commands and control logic
- Configuration: Extend the configuration handling for your device parameters
- Error Handling: Customize error codes and recovery strategies for your use case
- Reporting Intervals: Adjust the periodic reporting frequency based on your needs
This example provides a solid foundation for production MQTT implementations with CoCoCo.