MQTT JavaScript Example

This example shows how to communicate with CoCoCo over MQTT using the MQTT.js library (the mqtt package) for Node.js. It covers connection management, data publishing, command handling, and error recovery.

Prerequisites

Install the MQTT.js library:

bash
npm install mqtt

Complete Implementation Example

javascript
// Import the MQTT library
const mqtt = require('mqtt');

// --- Configuration ---
// Replace with your actual device ID and token
const deviceId = 'your_device_id_here';
const deviceToken = 'your_device_token_here';

// Replace with the correct regional broker URL (using mqtts for TLS)
const brokerUrl = 'mqtts://mqtt.na.cococo.cloud'; // Example: North America

// Generate a unique client ID (can be static or dynamic)
const clientId = `my-device-client-${Math.random().toString(16).substring(2, 8)}`;

// --- Device State Management ---
let deviceState = {
  online: false,
  mode: 'unknown',
  status: 'stopped',
  lastHeartbeat: null,
  commandQueue: []
};

// --- Connection Options ---
const connectOptions = {
  clientId: clientId,
  username: `d:${deviceId}`,
  password: deviceToken,
  keepalive: 60,
  connectTimeout: 30 * 1000,
  reconnectPeriod: 5 * 1000,
  clean: true,
  rejectUnauthorized: true, // Ensures server certificate validation
  will: {
    topic: `d/${deviceId}/s`,
    payload: JSON.stringify({
      mode: 'unknown',
      status: 'stopped',
      online: false
    }),
    retain: true,
    qos: 1
  }
};

// --- Connect to the Broker ---
console.log(`Connecting to ${brokerUrl} as client ${clientId}...`);
const client = mqtt.connect(brokerUrl, connectOptions);

// --- Event Handlers ---

// Handle successful connection
client.on('connect', () => {
  console.log('Successfully connected to CoCoCo MQTT broker!');

  // Update device state
  deviceState.online = true;
  deviceState.mode = 'productive';
  deviceState.status = 'running';

  // Subscribe to inbound messages
  const inboundTopic = `d/${deviceId}/i`;
  client.subscribe(inboundTopic, { qos: 1 }, (err) => {
    if (!err) {
      console.log(`Subscribed to ${inboundTopic}`);
    } else {
      console.error(`Failed to subscribe to ${inboundTopic}: ${err}`);
    }
  });

  // Publish initial status
  publishStatus();

  // Process any queued commands
  processCommandQueue();

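  // Start periodic data publishing once; 'connect' fires again after every reconnect,
  // and each extra call would add another pair of timers
  if (!deviceState.reportingStarted) {
    deviceState.reportingStarted = true;
    startPeriodicReporting();
  }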
});

// Handle connection errors
client.on('error', (err) => {
  console.error('Connection error:', err);
  deviceState.online = false;
});

// Handle received messages (commands)
client.on('message', (topic, message) => {
  console.log(`Received message on topic ${topic}`);

  if (topic === `d/${deviceId}/i`) {
    handleInboundMessage(message);
  }
});

// Handle disconnection
client.on('close', () => {
  console.log('Disconnected from MQTT broker');
  deviceState.online = false;
});

// Handle offline status
client.on('offline', () => {
  console.log('Client is offline');
  deviceState.online = false;
});

// Handle reconnection attempts
client.on('reconnect', () => {
  console.log('Attempting to reconnect...');
});

// --- Publishing Functions ---

function publishStatus() {
  const statusPayload = {
    mode: deviceState.mode,
    status: deviceState.status,
    online: deviceState.online
  };

  const statusTopic = `d/${deviceId}/s`;
  client.publish(statusTopic, JSON.stringify(statusPayload),
    { retain: true, qos: 1 }, (err) => {
    if (err) {
      console.error(`Failed to publish status: ${err}`);
    } else {
      console.log(`Published status:`, statusPayload);
    }
  });
}

function publishError(message, severity = 3, metadata = {}) {
  const errorPayload = {
    message: message,
    severity: severity,
    metadata: {
      timestamp: new Date().toISOString(),
      client_id: clientId,
      ...metadata
    }
  };

  const errorTopic = `d/${deviceId}/e`;
  client.publish(errorTopic, JSON.stringify(errorPayload), { qos: 1 }, (err) => {
    if (err) {
      console.error(`Failed to publish error: ${err}`);
    } else {
      console.log(`Published error:`, errorPayload);
    }
  });
}

function publishMetric(name, value, unit, customUnit = null, metadata = {}) {
  const metricPayload = {
    value: value,
    unit: unit,
    timestamp: new Date().toISOString(),
    metadata: metadata
  };

  if (unit === 'custom' && customUnit) {
    metricPayload.custom_unit = customUnit;
  }

  const metricTopic = `d/${deviceId}/m/${name}`;
  client.publish(metricTopic, JSON.stringify(metricPayload), (err) => {
    if (err) {
      console.error(`Failed to publish metric (${name}): ${err}`);
    } else {
      console.log(`Published metric ${name}:`, metricPayload);
    }
  });
}

function publishMultipleMetrics(metrics) {
  const multiMetricPayload = metrics.map(metric => ({
    name: metric.name,
    value: metric.value,
    unit: metric.unit,
    custom_unit: metric.customUnit,
    timestamp: metric.timestamp || new Date().toISOString(),
    metadata: metric.metadata || {}
  }));

  const multiMetricTopic = `d/${deviceId}/m`;
  client.publish(multiMetricTopic, JSON.stringify(multiMetricPayload), (err) => {
    if (err) {
      console.error(`Failed to publish multiple metrics: ${err}`);
    } else {
      console.log(`Published multiple metrics:`, multiMetricPayload);
    }
  });
}

function publishToggle(name, state) {
  const toggleTopic = `d/${deviceId}/b/${name}`;
  client.publish(toggleTopic, JSON.stringify(state), (err) => {
    if (err) {
      console.error(`Failed to publish toggle (${name}): ${err}`);
    } else {
      console.log(`Published toggle ${name}:`, state);
    }
  });
}

function publishGauge(name, value) {
  const gaugeTopic = `d/${deviceId}/g/${name}`;
  client.publish(gaugeTopic, JSON.stringify(value), (err) => {
    if (err) {
      console.error(`Failed to publish gauge (${name}): ${err}`);
    } else {
      console.log(`Published gauge ${name}:`, value);
    }
  });
}

function publishCounter(name, increment) {
  const counterTopic = `d/${deviceId}/t/${name}`;
  client.publish(counterTopic, JSON.stringify(increment), (err) => {
    if (err) {
      console.error(`Failed to publish counter (${name}): ${err}`);
    } else {
      console.log(`Published counter ${name} increment:`, increment);
    }
  });
}

// --- Command Handling Functions ---

function handleInboundMessage(message) {
  try {
    const payload = JSON.parse(message.toString());
    console.log('Received command payload:', payload);

    // Validate command
    if (!validateCommand(payload)) {
      // payload can be null or a non-object here (for example the JSON literal null)
      publishCommandResponse((payload && payload.type) || 'unknown', 'rejected', {
        error: 'Invalid command format'
      });
      return;
    }

    // Process command based on type
    switch (payload.type) {
      case 'command':
        handleCommand(payload);
        break;
      case 'config':
        handleConfiguration(payload);
        break;
      case 'control':
        handleControl(payload);
        break;
      default:
        console.warn('Unknown message type:', payload.type);
        publishCommandResponse(payload.type, 'rejected', {
          error: `Unknown command type: ${payload.type}`
        });
    }

  } catch (error) {
    console.error('Failed to parse inbound message:', error);
    publishError(`Command parsing failed: ${error.message}`, 3, {
      raw_message: message.toString()
    });
  }
}

function validateCommand(payload) {
  if (!payload || typeof payload !== 'object') {
    return false;
  }

  if (!payload.type) {
    return false;
  }

  const allowedTypes = ['command', 'config', 'control'];
  if (!allowedTypes.includes(payload.type)) {
    return false;
  }

  return true;
}

function handleCommand(payload) {
  const { action, parameters = {} } = payload;

  publishCommandResponse('command', 'accepted', { action });

  switch (action) {
    case 'restart':
      handleRestartCommand(parameters);
      break;
    case 'shutdown':
      handleShutdownCommand(parameters);
      break;
    case 'diagnostics':
      handleDiagnosticsCommand(parameters);
      break;
    default:
      publishCommandResponse('command', 'rejected', {
        error: `Unknown action: ${action}`
      });
  }
}

function handleConfiguration(payload) {
  const { settings } = payload;

  if (!settings || typeof settings !== 'object') {
    publishCommandResponse('config', 'rejected', {
      error: 'Invalid settings object'
    });
    return;
  }

  console.log('Applying configuration updates:', settings);

  // Apply configuration changes
  let updated = [];
  Object.keys(settings).forEach(key => {
    // Add your configuration handling logic here
    console.log(`Updated ${key} to ${settings[key]}`);
    updated.push(key);
  });

  publishCommandResponse('config', 'completed', { updated_fields: updated });
}

function handleControl(payload) {
  const { target, state, parameters = {} } = payload;

  console.log(`Control command: ${target} -> ${state}`);

  switch (target) {
    case 'heater':
      controlHeater(state === 'on', parameters.duration);
      break;
    case 'pump':
      controlPump(state === 'on');
      break;
    case 'valve':
      controlValve(state === 'open');
      break;
    default:
      publishCommandResponse('control', 'rejected', {
        error: `Unknown target: ${target}`
      });
      return;
  }

  publishCommandResponse('control', 'completed', { target, state });
}

// --- Device Control Functions ---

function handleRestartCommand(parameters) {
  const delay = parameters.delay || 0;
  console.log(`Restart requested with ${delay}s delay`);

  publishCommandResponse('command', 'executing', { action: 'restart', delay });

  setTimeout(() => {
    console.log('Restarting device...');
    publishCommandResponse('command', 'completed', { action: 'restart' });

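    // Graceful shutdown: exit only after the client has finished disconnecting,
    // otherwise the broker may treat this as a dropped connection
    client.end(false, {}, () => process.exit(0));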
  }, delay * 1000);
}

function handleShutdownCommand(parameters) {
  console.log('Shutdown requested');

  publishCommandResponse('command', 'executing', { action: 'shutdown' });

  // Update status before shutdown
  deviceState.mode = 'maintenance';
  deviceState.status = 'stopped';
  publishStatus();

  setTimeout(() => {
    publishCommandResponse('command', 'completed', { action: 'shutdown' });
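    // Exit only after a clean disconnect so the retained status is not
    // overwritten by the Last Will message
    client.end(false, {}, () => process.exit(0));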
  }, 1000);
}

function handleDiagnosticsCommand(parameters) {
  console.log('Running diagnostics...');

  publishCommandResponse('command', 'executing', { action: 'diagnostics' });

  // Simulate diagnostics
  setTimeout(() => {
    const diagnosticsResult = {
      memory_usage: process.memoryUsage(),
      uptime: process.uptime(),
      platform: process.platform,
      node_version: process.version,
      connection_status: client.connected ? 'connected' : 'disconnected'
    };

    publishCommandResponse('command', 'completed', {
      action: 'diagnostics',
      results: diagnosticsResult
    });

    // Also publish as a metric for historical tracking
    publishMetric('diagnostics_memory', diagnosticsResult.memory_usage.heapUsed, 'custom', 'bytes');
    publishMetric('diagnostics_uptime', diagnosticsResult.uptime, 'custom', 'seconds');

  }, 2000);
}

function controlHeater(turnOn, duration = null) {
  console.log(`Heater ${turnOn ? 'ON' : 'OFF'}${duration ? ` for ${duration}s` : ''}`);

  // Update toggle state
  publishToggle('heaterOn', turnOn);

  if (turnOn && duration) {
    setTimeout(() => {
      console.log('Heater auto-shutoff');
      publishToggle('heaterOn', false);
    }, duration * 1000);
  }
}

function controlPump(turnOn) {
  console.log(`Pump ${turnOn ? 'ON' : 'OFF'}`);
  publishToggle('pumpRunning', turnOn);
}

function controlValve(open) {
  console.log(`Valve ${open ? 'OPEN' : 'CLOSED'}`);
  publishToggle('valveOpen', open);
}

// --- Utility Functions ---

function publishCommandResponse(commandType, status, data = {}) {
  const response = {
    command_type: commandType,
    status: status,
    timestamp: new Date().toISOString(),
    device_id: deviceId,
    ...data
  };

  // Failed or rejected commands are reported on the error topic for visibility;
  // every other status is only logged locally
  if (status === 'failed' || status === 'rejected') {
    publishError(`Command ${status}: ${JSON.stringify(response)}`, 2);
  } else {
    console.log('Command response:', response);
  }
}

function processCommandQueue() {
  while (deviceState.commandQueue.length > 0 && client.connected) {
    const queuedCommand = deviceState.commandQueue.shift();
    console.log('Processing queued command:', queuedCommand);
    handleInboundMessage(Buffer.from(JSON.stringify(queuedCommand)));
  }
}

// --- Periodic Reporting ---

function startPeriodicReporting() {
  // Simulate sensor readings every 30 seconds
  setInterval(() => {
    if (!client.connected) return;

    // Simulate temperature sensor
    const temperature = 20 + Math.random() * 10; // 20-30°C
    publishMetric('temperature', parseFloat(temperature.toFixed(1)), 'deg_c', null, {
      sensor_location: 'main_chamber'
    });

    // Simulate multiple metrics
    publishMultipleMetrics([
      {
        name: 'humidity',
        value: parseFloat((40 + Math.random() * 20).toFixed(1)),
        unit: 'custom',
        customUnit: '%RH'
      },
      {
        name: 'pressure',
        value: parseFloat((1000 + Math.random() * 50).toFixed(1)),
        unit: 'custom',
        customUnit: 'hPa'
      }
    ]);

    // Update gauge (simulated stock level)
    const stockLevel = Math.floor(Math.random() * 1000);
    publishGauge('stockLevel', stockLevel);

    // Increment production counter occasionally
    if (Math.random() < 0.3) {
      publishCounter('itemsProduced', 1);
    }

  }, 30000); // Every 30 seconds

  // Heartbeat every 5 minutes
  setInterval(() => {
    if (!client.connected) return;

    deviceState.lastHeartbeat = new Date().toISOString();
    publishStatus();

  }, 5 * 60 * 1000); // Every 5 minutes
}

// --- Graceful Shutdown ---

process.on('SIGINT', () => {
  console.log('Received SIGINT, shutting down gracefully...');

  // Update status to offline
  deviceState.online = false;
  deviceState.status = 'stopped';
  publishStatus();

  // Close MQTT connection
  setTimeout(() => {
    client.end();
    process.exit(0);
  }, 1000);
});

process.on('SIGTERM', () => {
  console.log('Received SIGTERM, shutting down...');
  client.end();
  process.exit(0);
});

// --- Example Usage Functions ---

function demonstratePublishing() {
  console.log('--- Demonstrating Data Publishing ---');

  // 1. Set Status
  publishStatus();

  // 2. Record an Error
  publishError('Sensor reading out of range', 3, { sensor: 'temp01' });

  // 3. Single Metric (Temperature)
  publishMetric('temperature', 22.5, 'deg_c');

  // 4. Multiple Metrics
  publishMultipleMetrics([
    { name: 'humidity', value: 45.2, unit: 'custom', customUnit: '%RH' },
    { name: 'doorOpen', value: 0, unit: 'custom', customUnit: 'state' }
  ]);

  // 5. Boolean State (Heater On)
  publishToggle('heaterOn', true);

  // 6. Gauge (Stock Level)
  publishGauge('stockLevel', 250);

  // 7. Counter Increment
  publishCounter('itemsProduced', 1);
}

// Run the demonstration once, after the first successful connection
// (client.on would repeat it after every reconnect)
client.once('connect', () => {
  // Wait a moment for subscriptions to be established
  setTimeout(demonstratePublishing, 2000);
});

console.log('MQTT client started. Press Ctrl+C to exit.');

Key Features of This Example

Connection Management

  • Automatic reconnection at a fixed interval (reconnectPeriod is 5 seconds in this example)
  • Last Will and Testament for offline detection
  • Proper TLS configuration with certificate validation
  • Connection state tracking
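
The connection options in the listing set a fixed reconnectPeriod of 5 seconds. If exponential backoff is wanted instead, one possible approach is to disable the built-in timer with reconnectPeriod: 0 and call client.reconnect() from your own timer. The following is a minimal sketch under that assumption, not part of the original example: backoffDelay and enableBackoffReconnect are hypothetical helper names, the base and cap values are arbitrary, and the behaviour should be checked against the MQTT.js version you use.

```javascript
// Hypothetical helper: delay before reconnect attempt `attempt` (0-based).
// The delay doubles from baseMs on each attempt and is capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 60000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical helper: works with any client exposing on() and reconnect(),
// such as an MQTT.js client created with `reconnectPeriod: 0` (assumption).
// Returns a stop function; call it before client.end() so that an intentional
// disconnect does not trigger another reconnect.
function enableBackoffReconnect(client, options = {}) {
  const { baseMs = 1000, maxMs = 60000, setTimer = setTimeout } = options;
  let attempt = 0;
  let stopped = false;

  // A successful connection resets the backoff sequence.
  client.on('connect', () => { attempt = 0; });

  // Each close schedules one reconnect attempt with a longer delay.
  client.on('close', () => {
    if (stopped) return;
    const delay = backoffDelay(attempt, baseMs, maxMs);
    attempt += 1;
    setTimer(() => { if (!stopped) client.reconnect(); }, delay);
  });

  return () => { stopped = true; };
}
```

With the defaults, successive attempts wait 1 s, 2 s, 4 s and so on, up to 60 s. Adding random jitter to each delay is a common refinement when many devices may reconnect at the same time.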

Data Publishing

  • All data types supported (status, errors, metrics, toggles, gauges, counters)
  • Proper error handling for publish operations
  • QoS and retain flags where appropriate
  • Batch publishing for efficiency
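
All publish and subscribe calls in the listing follow one topic pattern: d/<deviceId>/<channel>, optionally followed by /<name>. As a reading aid, the sketch below collects the channel letters into a single helper. The channel letters are taken from the example code; the helper name topicFor and the lookup tables are illustrative assumptions.

```javascript
// Channel suffixes as they appear in the example's publish/subscribe calls.
const CHANNELS = new Map([
  ['status', 's'], ['error', 'e'], ['metric', 'm'], ['toggle', 'b'],
  ['gauge', 'g'], ['counter', 't'], ['inbound', 'i']
]);

// Toggles, gauges and counters are always published per name in the example.
const NAME_REQUIRED = new Set(['toggle', 'gauge', 'counter']);
// Metrics use d/<id>/m/<name> for one value and d/<id>/m for a batch.
const NAME_OPTIONAL = new Set(['metric']);

// Hypothetical helper: builds the topic string for a channel.
function topicFor(deviceId, channel, name) {
  const suffix = CHANNELS.get(channel);
  if (!suffix) {
    throw new Error(`Unknown channel: ${channel}`);
  }
  if (NAME_REQUIRED.has(channel) && !name) {
    throw new Error(`Channel "${channel}" needs a name`);
  }
  const usesName = NAME_REQUIRED.has(channel) ||
    (NAME_OPTIONAL.has(channel) && Boolean(name));
  return usesName
    ? `d/${deviceId}/${suffix}/${name}`
    : `d/${deviceId}/${suffix}`;
}
```

For example, topicFor(deviceId, 'metric', 'temperature') gives the single-metric topic, while topicFor(deviceId, 'metric') gives the batch topic used by publishMultipleMetrics.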

Command Handling

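  • Basic validation of inbound messages (a JSON object with a known type) and dispatch by type
  • Support for restart, shutdown, and diagnostics commands
  • Device control functions (heater, pump, valve)
  • Command responses logged locally, with rejected or failed commands reported on the error topic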
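
The handlers in the listing imply the message shapes shown below for the inbound topic. These shapes are inferred from the example code (type, action, parameters, settings, target, state), not from a published CoCoCo schema, so treat them as assumptions. The sketch also shows a parse step that separates invalid JSON from valid JSON that is not a command object; parseInbound is a hypothetical name, and report_interval is a made-up setting.

```javascript
// Message shapes the example's handlers would accept (inferred, not a schema).
const EXAMPLE_COMMANDS = [
  { type: 'command', action: 'restart', parameters: { delay: 5 } },
  { type: 'config', settings: { report_interval: 60 } },
  { type: 'control', target: 'heater', state: 'on', parameters: { duration: 30 } }
];

const ALLOWED_TYPES = ['command', 'config', 'control'];

// Hypothetical helper: parse and validate in one step, returning a result
// object instead of throwing so the caller can decide how to respond.
function parseInbound(raw) {
  let payload;
  try {
    payload = JSON.parse(raw.toString());
  } catch (err) {
    return { ok: false, error: `Invalid JSON: ${err.message}` };
  }

  // JSON.parse accepts null, numbers, strings and arrays; none is a command.
  if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) {
    return { ok: false, error: 'Payload must be a JSON object' };
  }

  if (!ALLOWED_TYPES.includes(payload.type)) {
    return { ok: false, error: `Unknown message type: ${String(payload.type)}` };
  }

  return { ok: true, payload };
}
```

A message handler could call parseInbound(message) and hand result.payload to the dispatch switch only when result.ok is true.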

Error Handling

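  • Error logging for connection, subscribe, publish, and parse failures
  • Periodic reporting paused while the client is disconnected
  • A command queue drained on connect (the listing never adds to deviceState.commandQueue, so enqueuing is left to your application)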
  • Proper cleanup on shutdown
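
The listing declares deviceState.commandQueue and drains it on connect, but nothing in it adds entries. One way the queuing idea might be filled in, shown here for outbound messages, is a small bounded buffer that holds publishes while the client is offline and flushes them in order after reconnecting. OfflineQueue, its maxSize default, and the drop-oldest policy are assumptions of this sketch.

```javascript
// Hypothetical bounded buffer for messages produced while offline.
class OfflineQueue {
  // publishFn is injected, for example:
  //   (topic, payload, opts) => client.publish(topic, payload, opts)
  constructor(publishFn, maxSize = 100) {
    this.publishFn = publishFn;
    this.maxSize = maxSize;
    this.items = [];
  }

  // Publishes immediately when connected; otherwise buffers the message,
  // dropping the oldest entry once the buffer is full.
  // Returns true if the message was published right away.
  send(connected, topic, payload, opts = {}) {
    if (connected) {
      this.publishFn(topic, payload, opts);
      return true;
    }
    if (this.items.length >= this.maxSize) {
      this.items.shift();
    }
    this.items.push({ topic, payload, opts });
    return false;
  }

  // Publishes everything buffered so far, oldest first.
  // Returns the number of messages flushed.
  flush() {
    const pending = this.items;
    this.items = [];
    pending.forEach(({ topic, payload, opts }) => {
      this.publishFn(topic, payload, opts);
    });
    return pending.length;
  }
}
```

In the example this could be wired up by calling queue.send(client.connected, topic, payload, { qos: 1 }) from the publish helpers and queue.flush() from the connect handler. Dropping the oldest entry suits sensor readings, where fresh values matter most; counters or billing events may need a different policy.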

Production Features

  • Periodic sensor reporting simulation
  • Heartbeat mechanism
  • Memory and performance diagnostics
  • Graceful shutdown handling

Usage Instructions

  1. Install Dependencies:

    bash
    npm install mqtt
  2. Configure Device:

    • Replace your_device_id_here with your actual device ID
    • Replace your_device_token_here with your device token
    • Update the broker URL for your region
  3. Run the Application:

    bash
    node your-mqtt-app.js
  4. Test Commands: Use MQTT client tools to send commands to d/your_device_id/i
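
For step 4, any MQTT client that is allowed to publish to the device's inbound topic can be used. The sketch below wraps that publish in a promise. It takes an already connected client (for example one returned by mqtt.connect) so it stays independent of how that client authenticates. Whether a second client may publish to d/<deviceId>/i, and with which credentials, is not stated in this example and depends on your CoCoCo setup. sendTestCommand is a hypothetical helper name.

```javascript
// Hypothetical helper: publish one command to a device's inbound topic.
// `client` is any object with an MQTT.js-style
// publish(topic, message, options, callback) method.
function sendTestCommand(client, deviceId, command) {
  const topic = `d/${deviceId}/i`;
  const payload = JSON.stringify(command);

  return new Promise((resolve, reject) => {
    // QoS 1 matches the subscription made by the device example.
    client.publish(topic, payload, { qos: 1 }, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve({ topic, payload });
      }
    });
  });
}

// Usage (requires a connected client; the command shape follows the
// handlers in the example):
//
// sendTestCommand(client, 'your_device_id', { type: 'command', action: 'diagnostics' })
//   .then(({ topic }) => console.log(`Sent test command to ${topic}`))
//   .catch((err) => console.error('Send failed:', err));
```

If the device example is running, a diagnostics command sent this way should produce the "Running diagnostics..." log line on the device side.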

Customization Points

  • Sensor Simulation: Replace the simulated sensor readings with actual hardware interfaces
  • Command Actions: Add your specific device commands and control logic
  • Configuration: Extend the configuration handling for your device parameters
  • Error Handling: Customize error codes and recovery strategies for your use case
  • Reporting Intervals: Adjust the periodic reporting frequency based on your needs
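
As one way to approach the first and last points, the sketch below separates reading a sensor from scheduling the report, and keeps the timer handle so reporting can be stopped or restarted with a different interval. createReporter, readSensor, and report are hypothetical names; readSensor stands in for your hardware interface and may return a value or a promise.

```javascript
// Hypothetical helper: a stoppable periodic reporter.
function createReporter({ readSensor, report, intervalMs = 30000 }) {
  let timer = null;

  // One reporting cycle: read the sensor, then hand the value to report().
  // A failed read is logged and skipped rather than stopping the timer.
  async function tick() {
    try {
      const reading = await readSensor();
      report(reading);
    } catch (err) {
      console.error(`Sensor read failed: ${err.message}`);
    }
  }

  return {
    tick,
    // Starts the timer; returns false if it is already running, so a
    // repeated call (for example after a reconnect) adds no second timer.
    start() {
      if (timer !== null) return false;
      timer = setInterval(tick, intervalMs);
      return true;
    },
    // Stops the timer; returns false if it was not running.
    stop() {
      if (timer === null) return false;
      clearInterval(timer);
      timer = null;
      return true;
    },
    isRunning: () => timer !== null
  };
}
```

In the example, report could be a function such as (value) => publishMetric('temperature', value, 'deg_c'), and stop() could be called from the shutdown handlers before the client disconnects.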

This example provides a solid foundation for production MQTT implementations with CoCoCo.
