
MQTT Subscribing to Commands

While publishing data to CoCoCo is essential for monitoring, subscribing to topics allows your devices to receive commands, configuration updates, and other control messages from the platform. This enables bidirectional communication and remote device management.

Inbound Message Topic

Your device receives messages from the CoCoCo platform by subscribing to a single device-specific topic:

d/${deviceId}/i - Receive Inbound Messages

Subscribe to this topic to receive messages initiated from the CoCoCo cloud platform intended for your specific device.

Topic Pattern:

  • d/ - Device namespace
  • ${deviceId} - Your actual device ID
  • /i - Inbound message indicator
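The pattern above can be captured in two small helpers. This is a minimal sketch, not part of any CoCoCo SDK: the names `inboundTopicFor` and `deviceIdFromInboundTopic` are hypothetical, and rejecting `/`, `+`, and `#` in the device ID is an assumption based on those characters being the MQTT level separator and wildcards.

```javascript
// Hypothetical helpers; the names are illustrative only.

// Build the inbound topic for a device ID.
function inboundTopicFor(deviceId) {
  // '/', '+' and '#' have special meaning in MQTT topics, so refuse them here (assumption).
  if (typeof deviceId !== 'string' || deviceId.length === 0 || /[\/+#]/.test(deviceId)) {
    throw new Error(`Invalid device ID: ${String(deviceId)}`);
  }
  return `d/${deviceId}/i`;
}

// Extract the device ID from an inbound topic, or return null if the topic does not match.
function deviceIdFromInboundTopic(topic) {
  const match = /^d\/([^\/+#]+)\/i$/.exec(topic);
  return match ? match[1] : null;
}
```

Building the topic in one place avoids repeating the template string in every subscribe call and message handler.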

Example Subscription:

javascript
const inboundTopic = `d/${deviceId}/i`;
client.subscribe(inboundTopic, (err) => {
  if (!err) {
    console.log(`Successfully subscribed to ${inboundTopic}`);
  } else {
    console.error(`Failed to subscribe: ${err}`);
  }
});

Message Format

Messages received on the inbound topic carry JSON-encoded payloads. CoCoCo relays the payload to the device as opaque data, so its structure and content are defined entirely by your application.
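Because the platform does not inspect the payload, the device is the first place where a malformed message can be detected. The following is a minimal sketch of a decoding step, assuming your application expects a JSON object at the top level. The function name `decodeInboundPayload` and its return shape are hypothetical.

```javascript
// Hypothetical helper: decode an MQTT payload (Buffer or string) into a plain object.
// Returns { ok: true, payload } or { ok: false, error } instead of throwing.
function decodeInboundPayload(message) {
  let value;
  try {
    value = JSON.parse(message.toString());
  } catch (error) {
    return { ok: false, error: `Invalid JSON: ${error.message}` };
  }
  // JSON.parse also accepts null, numbers, strings and arrays; reject those here.
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    return { ok: false, error: 'Payload must be a JSON object' };
  }
  return { ok: true, payload: value };
}
```

Returning a result object rather than throwing keeps the message handler free of nested try/catch blocks.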

Common Message Types

While the platform doesn't enforce specific message formats, common patterns include:

Command Messages:

json
{
  "type": "command",
  "action": "restart",
  "parameters": {
    "delay": 30
  },
  "timestamp": "2025-01-15T10:30:00Z"
}

Configuration Updates:

json
{
  "type": "config",
  "settings": {
    "sampling_interval": 60,
    "temperature_threshold": 75.0,
    "enable_debug": false
  }
}

Remote Control:

json
{
  "type": "control",
  "target": "heater",
  "state": "on",
  "duration": 1800
}

Handling Received Messages

Implement proper message handling in your MQTT client to process incoming commands:

Basic Message Handler

javascript
client.on('message', (topic, message) => {
  // Check if this is an inbound message for our device
  if (topic === `d/${deviceId}/i`) {
    try {
      // Parse the JSON payload
      const payload = JSON.parse(message.toString());
      console.log('Received command:', payload);

      // Process the command
      handleInboundMessage(payload);

    } catch (error) {
      console.error('Failed to parse inbound message:', error);
      console.error('Raw message:', message.toString());
    }
  }
});

function handleInboundMessage(payload) {
  switch (payload.type) {
    case 'command':
      handleCommand(payload);
      break;
    case 'config':
      handleConfiguration(payload);
      break;
    case 'control':
      handleControl(payload);
      break;
    default:
      console.warn('Unknown message type:', payload.type);
  }
}

Command Processing Examples

Restart Command:

javascript
function handleCommand(payload) {
  if (payload.action === 'restart') {
    // Coerce to a number; fall back to 0 for missing or non-numeric values
    const delay = Number(payload.parameters?.delay) || 0;
    console.log(`Restart requested with ${delay}s delay`);

    // Acknowledge the command
    publishCommandResponse('restart', 'accepted', { delay });

    // Schedule restart
    setTimeout(() => {
      console.log('Restarting device...');
      process.exit(0); // or your restart logic
    }, delay * 1000);
  }
}

Configuration Update:

javascript
function handleConfiguration(payload) {
  if (payload.settings) {
    // Update local configuration
    Object.keys(payload.settings).forEach(key => {
      // Only update keys that already exist in the local config
      if (Object.prototype.hasOwnProperty.call(config, key)) {
        config[key] = payload.settings[key];
        console.log(`Updated ${key} to ${payload.settings[key]}`);
      }
    });

    // Save configuration
    saveConfiguration(config);

    // Acknowledge update
    publishCommandResponse('config', 'applied');
  }
}

Remote Control:

javascript
function handleControl(payload) {
  const { target, state } = payload;

  switch (target) {
    case 'heater':
      if (state === 'on') {
        activateHeater(payload.duration);
        publishCommandResponse('control', 'heater_activated');
      } else {
        deactivateHeater();
        publishCommandResponse('control', 'heater_deactivated');
      }
      break;

    case 'pump':
      controlPump(state === 'on');
      publishCommandResponse('control', `pump_${state}`);
      break;

    default:
      publishCommandResponse('control', 'unknown_target', { error: `Unknown target: ${target}` });
  }
}

Command Response and Acknowledgment

It's good practice to acknowledge received commands and report their execution status:

Response Publishing

javascript
function publishCommandResponse(commandType, status, data = {}) {
  const response = {
    command_type: commandType,
    status: status,
    timestamp: new Date().toISOString(),
    device_id: deviceId,
    ...data
  };

  // You might publish responses to a specific topic or as device status
  const responseTopic = `d/${deviceId}/s`; // or a dedicated response topic
  client.publish(responseTopic, JSON.stringify(response));
}

Status Codes

Use consistent status codes for command responses:

  • accepted - Command received and accepted for execution
  • executing - Command is currently being executed
  • completed - Command executed successfully
  • failed - Command execution failed
  • rejected - Command was rejected (invalid, not supported, etc.)
  • timeout - Command execution timed out
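One way to keep these values consistent is to define them once and check every response against them. The sketch below assumes the six statuses listed above are the complete set. `COMMAND_STATUSES` and `buildCommandResponse` are hypothetical names, and the response fields mirror the ones used in `publishCommandResponse`.

```javascript
// Status values taken from the list above.
const COMMAND_STATUSES = Object.freeze([
  'accepted', 'executing', 'completed', 'failed', 'rejected', 'timeout'
]);

// Hypothetical helper: build a response object, refusing unknown status values.
function buildCommandResponse(deviceId, commandType, status, data = {}) {
  if (!COMMAND_STATUSES.includes(status)) {
    throw new Error(`Unknown status: ${status}`);
  }
  return {
    // Extra data is spread first so it cannot overwrite the standard fields below.
    ...data,
    command_type: commandType,
    status,
    timestamp: new Date().toISOString(),
    device_id: deviceId
  };
}
```

Device-specific detail such as which actuator changed state can then travel in `data` while `status` stays within the shared vocabulary.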

Subscription Management

Quality of Service

Choose appropriate QoS levels for command subscriptions:

javascript
// QoS 1 gives at-least-once delivery, so a command may arrive more than once
client.subscribe(`d/${deviceId}/i`, { qos: 1 }, (err) => {
  if (!err) {
    console.log('Subscribed to inbound messages with QoS 1');
  }
});
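QoS 1 is at-least-once delivery, so the same command can be delivered twice, for example after a reconnect. A minimal sketch of a duplicate filter follows. It assumes each command carries a unique `id` field, which the message examples on this page do not include, so treat both the field and the `createDuplicateFilter` name as hypothetical.

```javascript
// Hypothetical duplicate filter; assumes every command has a unique `id` field.
function createDuplicateFilter(maxRemembered = 100) {
  // A Set iterates in insertion order, so its first entry is always the oldest.
  const seen = new Set();

  // Returns true the first time an ID is seen, false for repeats.
  return function isFirstDelivery(id) {
    if (seen.has(id)) {
      return false;
    }
    seen.add(id);
    if (seen.size > maxRemembered) {
      // Forget the oldest ID to keep memory bounded.
      seen.delete(seen.values().next().value);
    }
    return true;
  };
}
```

A handler would call `isFirstDelivery(payload.id)` before acting and skip the command when it returns `false`. This matters most for commands that are not safe to repeat, such as a restart.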

Handling Reconnection

Ensure subscriptions are restored after reconnection:

javascript
client.on('connect', () => {
  console.log('Connected to MQTT broker');

  // Resubscribe to inbound messages
  client.subscribe(`d/${deviceId}/i`, { qos: 1 });

  // Publish online status
  publishStatus('productive', 'running', true);
});

client.on('reconnect', () => {
  console.log('Reconnecting to MQTT broker...');
});

Security Considerations

Message Validation

Always validate incoming commands:

javascript
function validateCommand(payload) {
  // Check required fields
  if (!payload.type) {
    throw new Error('Missing command type');
  }

  // Validate command types
  const allowedTypes = ['command', 'config', 'control'];
  if (!allowedTypes.includes(payload.type)) {
    throw new Error(`Invalid command type: ${payload.type}`);
  }

  // Additional validation based on command type
  switch (payload.type) {
    case 'command':
      if (!payload.action) {
        throw new Error('Command missing action field');
      }
      break;
    case 'config':
      if (!payload.settings || typeof payload.settings !== 'object') {
        throw new Error('Configuration command missing settings object');
      }
      break;
  }

  return true;
}
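The validator above checks `command` and `config` messages but has no branch for `control`. A sketch of a companion check is shown below. The allowed targets come from the examples on this page, while the `off` state, the rule for `duration`, and the `validateControl` name are assumptions to adapt to your device.

```javascript
// Assumed allowed values, based on the examples on this page; adjust to your device.
const CONTROL_TARGETS = ['heater', 'pump'];
const CONTROL_STATES = ['on', 'off'];

// Hypothetical companion to validateCommand for messages with type "control".
function validateControl(payload) {
  if (!CONTROL_TARGETS.includes(payload.target)) {
    throw new Error(`Unknown control target: ${payload.target}`);
  }
  if (!CONTROL_STATES.includes(payload.state)) {
    throw new Error(`Invalid control state: ${payload.state}`);
  }
  // duration is optional; when present it must be a positive, finite number.
  if (payload.duration !== undefined &&
      !(Number.isFinite(payload.duration) && payload.duration > 0)) {
    throw new Error(`Invalid duration: ${payload.duration}`);
  }
  return true;
}
```

It could be called from a `case 'control':` branch in `validateCommand`, so that unknown targets are rejected before any actuator code runs.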

Command Authorization

Implement command authorization if needed:

javascript
function isCommandAuthorized(payload) {
  // Check if command source is authorized
  // This could involve checking signatures, tokens, or other auth mechanisms

  // Placeholder check only: a field inside the payload proves nothing about the
  // sender, so replace this with real verification (for example a signature)
  if (payload.requires_auth && !payload.auth_token) {
    return false;
  }

  return true;
}
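As one illustration of the signature option mentioned in the comments above, the sketch below verifies an HMAC-SHA256 signature with Node's built-in `crypto` module. Everything about the message shape is an assumption: it supposes the sender wraps each command in a hypothetical envelope `{ body, signature }`, where `body` is the command as a JSON string and `signature` is a hex digest computed with a secret shared between sender and device. CoCoCo is not known to produce this format.

```javascript
const nodeCrypto = require('crypto');

// Compute the hex HMAC-SHA256 of a message body with a shared secret.
function signBody(body, secret) {
  return nodeCrypto.createHmac('sha256', secret).update(body, 'utf8').digest('hex');
}

// Hypothetical check for an envelope of the form { body: string, signature: hex string }.
function verifyEnvelope(envelope, secret) {
  if (typeof envelope?.body !== 'string' || typeof envelope?.signature !== 'string') {
    return false;
  }
  const expected = Buffer.from(signBody(envelope.body, secret), 'hex');
  const received = Buffer.from(envelope.signature, 'hex');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  if (received.length !== expected.length) {
    return false;
  }
  return nodeCrypto.timingSafeEqual(received, expected);
}
```

Only after `verifyEnvelope` returns `true` would the device parse `envelope.body` and pass the result on to validation and handling.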

Error Handling

Graceful Error Handling

javascript
client.on('message', (topic, message) => {
  if (topic !== `d/${deviceId}/i`) {
    return;
  }

  // Declared outside try so the catch block can see a successfully parsed payload
  let payload;
  try {
    payload = JSON.parse(message.toString());

    // Validate message
    validateCommand(payload);

    // Check authorization
    if (!isCommandAuthorized(payload)) {
      publishCommandResponse(payload.type, 'rejected', {
        error: 'Not authorized'
      });
      return;
    }

    // Process command
    handleInboundMessage(payload);

  } catch (error) {
    console.error('Command processing error:', error);

    // Send an error response only if the message parsed to an object
    if (payload && typeof payload === 'object') {
      publishCommandResponse(payload.type || 'unknown', 'failed', {
        error: error.message
      });
    } else {
      console.error('Could not parse message for error response');
    }
  }
});

Best Practices

Command Queue Management

For devices that might be offline, implement command queuing:

javascript
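const commandQueue = [];

// Call this instead of handleInboundMessage when a command cannot be handled yet
function queueCommand(payload) {
  commandQueue.push({ payload, receivedAt: Date.now() });
}

function processCommandQueue() {
  while (commandQueue.length > 0) {
    const command = commandQueue.shift();
    try {
      handleInboundMessage(command.payload);
    } catch (error) {
      console.error('Failed to process queued command:', error);
    }
  }
}

// Process anything that was queued once the connection is (re)established
client.on('connect', () => {
  processCommandQueue();
});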

Rate Limiting

Implement rate limiting for command processing:

javascript
let lastCommandTime = 0;
const MIN_COMMAND_INTERVAL = 1000; // 1 second

function rateLimitCommand(payload) {
  const now = Date.now();
  if (now - lastCommandTime < MIN_COMMAND_INTERVAL) {
    publishCommandResponse(payload.type, 'rejected', {
      error: 'Rate limit exceeded'
    });
    return false;
  }
  lastCommandTime = now;
  return true;
}
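The limiter above applies one global interval to all commands, so a burst of `config` messages would also block an urgent `control` message. A variant that tracks each key separately is sketched below. The `createRateLimiter` name and the injectable `now` clock (useful for testing) are hypothetical additions.

```javascript
// Hypothetical factory: each limiter remembers the last accepted time per key.
function createRateLimiter(minIntervalMs, now = Date.now) {
  const lastAccepted = new Map();

  // Returns true if a command for this key may run now, false if it arrived too soon.
  return function allow(key) {
    const current = now();
    const previous = lastAccepted.get(key);
    if (previous !== undefined && current - previous < minIntervalMs) {
      return false;
    }
    lastAccepted.set(key, current);
    return true;
  };
}
```

Calling `allow(payload.type)` limits each message type independently. Passing `payload.action` or a control target as the key would give finer-grained limits.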

Testing Commands

Use MQTT client tools to test command reception:

Using mosquitto_pub

bash
# Send a test command
mosquitto_pub -h mqtt.na.cococo.cloud -p 8883 \
  --cafile /etc/ssl/certs/ca-certificates.crt \
  -u "d:your_device_id" -P "your_device_token" \
  -t "d/your_device_id/i" \
  -m '{"type":"command","action":"restart","parameters":{"delay":10}}'

Testing with MQTT Explorer

  1. Connect to your CoCoCo MQTT broker using your device credentials
  2. Navigate to the d/your_device_id/i topic
  3. Publish test command messages
  4. Monitor your device's response and status updates

Integration with CoCoCo Reactions

Commands can be automatically generated by CoCoCo Reactions based on device data or external triggers. For example:

  • A Reaction could send a "reduce_speed" command when temperature exceeds a threshold
  • Maintenance schedules could trigger "run_diagnostics" commands
  • External systems could trigger configuration updates via the CoCoCo API

This creates powerful automation workflows where devices respond intelligently to changing conditions and external events.
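On the device side, commands like these can be routed through a small lookup table rather than a growing `if`/`switch` chain. The sketch below assumes Reactions deliver messages in the same `{ type: 'command', action, parameters }` shape used elsewhere on this page, which is not confirmed here. `createActionRegistry` and the parameter name `target_speed` in the usage example are hypothetical.

```javascript
// Hypothetical registry mapping action names to handler functions.
function createActionRegistry() {
  const handlers = new Map();
  return {
    // Associate an action name with the function that carries it out.
    register(action, handler) {
      handlers.set(action, handler);
    },
    // Run the handler for payload.action and return one of the status codes
    // from the Status Codes list: 'rejected', 'completed', or 'failed'.
    dispatch(payload) {
      const handler = handlers.get(payload.action);
      if (!handler) {
        return 'rejected';
      }
      try {
        handler(payload.parameters || {});
        return 'completed';
      } catch (error) {
        return 'failed';
      }
    }
  };
}

// Usage: register handlers once at startup, then dispatch each incoming command.
const registry = createActionRegistry();
registry.register('reduce_speed', (params) => {
  console.log(`Reducing speed to ${params.target_speed}`);
});
const status = registry.dispatch({
  type: 'command',
  action: 'reduce_speed',
  parameters: { target_speed: 50 }
});
```

The returned status can be passed straight to `publishCommandResponse`, so every automated command gets an acknowledgment.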

Next Steps

After implementing command subscription:

  1. Set up automated responses using CoCoCo Reactions
  2. Implement comprehensive logging for command processing
  3. Create device-specific command handlers for your use case
  4. Test command scenarios thoroughly in your development environment
