MQTT Subscribing to Commands
While publishing data to CoCoCo is essential for monitoring, subscribing to topics allows your devices to receive commands, configuration updates, and other control messages from the platform. This enables bidirectional communication and remote device management.
Inbound Message Topic
Your device can receive messages from the CoCoCo platform by subscribing to a specific topic structure:
d/${deviceId}/i - Receive Inbound Messages
Subscribe to this topic to receive messages initiated from the CoCoCo cloud platform intended for your specific device.
Topic Pattern:
- d/ - Device namespace
- ${deviceId} - Your actual device ID
- /i - Inbound message indicator
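As an illustration of the pattern, the sketch below builds the inbound topic for a device and recovers the device ID from a received topic. The helper names `buildInboundTopic` and `parseInboundTopic` are hypothetical and not part of any CoCoCo SDK; only the `d/${deviceId}/i` layout comes from this page. Rejecting `/`, `+` and `#` in the ID is an assumption based on general MQTT topic rules.

```javascript
// Hypothetical helpers; only the d/${deviceId}/i layout is taken from this page.
function buildInboundTopic(deviceId) {
  // '/' separates topic levels and '+' / '#' are MQTT wildcards,
  // so none of them should appear inside a single device ID level.
  if (typeof deviceId !== 'string' || deviceId.length === 0 || /[\/+#]/.test(deviceId)) {
    throw new Error(`Invalid device ID: ${deviceId}`);
  }
  return `d/${deviceId}/i`;
}

// Returns the device ID if the topic is an inbound topic, otherwise null.
function parseInboundTopic(topic) {
  const match = /^d\/([^\/+#]+)\/i$/.exec(topic);
  return match ? match[1] : null;
}

console.log(buildInboundTopic('sensor-42'));       // d/sensor-42/i
console.log(parseInboundTopic('d/sensor-42/i'));   // sensor-42
console.log(parseInboundTopic('d/sensor-42/s'));   // null
```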
Example Subscription:
const inboundTopic = `d/${deviceId}/i`;
client.subscribe(inboundTopic, (err) => {
  if (!err) {
    console.log(`Successfully subscribed to ${inboundTopic}`);
  } else {
    console.error(`Failed to subscribe: ${err}`);
  }
});
Message Format
Messages received on the inbound topic will have JSON-encoded payloads. The specific structure and content depend on your application logic, as CoCoCo treats the payload as opaque data when relaying messages to devices.
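Because the payload is relayed as opaque data, the device is the only place where malformed input gets caught. The following is a minimal sketch of a defensive decoder; `decodePayload` and its `{ ok, value, error }` result shape are hypothetical names chosen for illustration, and the requirement that the payload be a JSON object (not an array or scalar) is an assumption based on the message examples on this page.

```javascript
// Hypothetical helper: decodes an MQTT message (Buffer or string) without throwing.
function decodePayload(message) {
  let value;
  try {
    value = JSON.parse(message.toString());
  } catch (error) {
    return { ok: false, error: `Invalid JSON: ${error.message}` };
  }
  // JSON.parse also accepts null, arrays and scalars; treat those as invalid here.
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    return { ok: false, error: 'Payload must be a JSON object' };
  }
  return { ok: true, value };
}

console.log(decodePayload(Buffer.from('{"type":"command","action":"restart"}')).ok); // true
console.log(decodePayload(Buffer.from('not json')).ok);                              // false
console.log(decodePayload('[1, 2, 3]').error);                                       // Payload must be a JSON object
```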
Common Message Types
While the platform doesn't enforce specific message formats, common patterns include:
Command Messages:
{
  "type": "command",
  "action": "restart",
  "parameters": {
    "delay": 30
  },
  "timestamp": "2025-01-15T10:30:00Z"
}
Configuration Updates:
{
  "type": "config",
  "settings": {
    "sampling_interval": 60,
    "temperature_threshold": 75.0,
    "enable_debug": false
  }
}
Remote Control:
{
  "type": "control",
  "target": "heater",
  "state": "on",
  "duration": 1800
}
Handling Received Messages
Implement proper message handling in your MQTT client to process incoming commands:
Basic Message Handler
client.on('message', (topic, message) => {
  // Check if this is an inbound message for our device
  if (topic === `d/${deviceId}/i`) {
    try {
      // Parse the JSON payload
      const payload = JSON.parse(message.toString());
      console.log('Received command:', payload);
      // Process the command
      handleInboundMessage(payload);
    } catch (error) {
      console.error('Failed to parse inbound message:', error);
      console.error('Raw message:', message.toString());
    }
  }
});

function handleInboundMessage(payload) {
  switch (payload.type) {
    case 'command':
      handleCommand(payload);
      break;
    case 'config':
      handleConfiguration(payload);
      break;
    case 'control':
      handleControl(payload);
      break;
    default:
      console.warn('Unknown message type:', payload.type);
  }
}
Command Processing Examples
Restart Command:
function handleCommand(payload) {
  if (payload.action === 'restart') {
    const delay = payload.parameters?.delay || 0;
    console.log(`Restart requested with ${delay}s delay`);
    // Acknowledge the command
    publishCommandResponse('restart', 'accepted', { delay });
    // Schedule restart
    setTimeout(() => {
      console.log('Restarting device...');
      process.exit(0); // or your restart logic
    }, delay * 1000);
  }
}
Configuration Update:
function handleConfiguration(payload) {
  if (payload.settings) {
    // Update local configuration, ignoring keys the device does not already know
    Object.keys(payload.settings).forEach(key => {
      if (Object.prototype.hasOwnProperty.call(config, key)) {
        config[key] = payload.settings[key];
        console.log(`Updated ${key} to ${payload.settings[key]}`);
      }
    });
    // Save configuration
    saveConfiguration(config);
    // Acknowledge update
    publishCommandResponse('config', 'applied');
  }
}
Remote Control:
function handleControl(payload) {
  const { target, state } = payload;
  switch (target) {
    case 'heater':
      if (state === 'on') {
        activateHeater(payload.duration);
        publishCommandResponse('control', 'heater_activated');
      } else {
        deactivateHeater();
        publishCommandResponse('control', 'heater_deactivated');
      }
      break;
    case 'pump':
      controlPump(state === 'on');
      publishCommandResponse('control', `pump_${state}`);
      break;
    default:
      publishCommandResponse('control', 'unknown_target', { error: `Unknown target: ${target}` });
  }
}
Command Response and Acknowledgment
It's good practice to acknowledge received commands and report their execution status:
Response Publishing
function publishCommandResponse(commandType, status, data = {}) {
  const response = {
    command_type: commandType,
    status: status,
    timestamp: new Date().toISOString(),
    device_id: deviceId,
    ...data
  };
  // You might publish responses to a specific topic or as device status
  const responseTopic = `d/${deviceId}/s`; // or a dedicated response topic
  client.publish(responseTopic, JSON.stringify(response));
}
Status Codes
Use consistent status codes for command responses:
- accepted - Command received and accepted for execution
- executing - Command is currently being executed
- completed - Command executed successfully
- failed - Command execution failed
- rejected - Command was rejected (invalid, not supported, etc.)
- timeout - Command execution timed out
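One way to keep these codes consistent is to define them once and reference the constants everywhere. The sketch below is an assumption about how that could look: `COMMAND_STATUS` and `isTerminalStatus` are hypothetical names, and treating `accepted` and `executing` as intermediate states (with the other four as final) is an interpretation of the descriptions above rather than a platform rule.

```javascript
// Status values are taken from the list above; the constant names are hypothetical.
const COMMAND_STATUS = Object.freeze({
  ACCEPTED: 'accepted',
  EXECUTING: 'executing',
  COMPLETED: 'completed',
  FAILED: 'failed',
  REJECTED: 'rejected',
  TIMEOUT: 'timeout',
});

// Assumption: these four states end a command's lifecycle.
const TERMINAL_STATUSES = new Set([
  COMMAND_STATUS.COMPLETED,
  COMMAND_STATUS.FAILED,
  COMMAND_STATUS.REJECTED,
  COMMAND_STATUS.TIMEOUT,
]);

// True when no further status update is expected for the command.
function isTerminalStatus(status) {
  return TERMINAL_STATUSES.has(status);
}

console.log(isTerminalStatus(COMMAND_STATUS.EXECUTING)); // false
console.log(isTerminalStatus(COMMAND_STATUS.COMPLETED)); // true
```

A device could use `isTerminalStatus` to decide when to stop tracking a command it has already reported on.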
Subscription Management
Quality of Service
Choose appropriate QoS levels for command subscriptions:
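// QoS 1 gives at-least-once delivery, so a command may arrive more than once
client.subscribe(`d/${deviceId}/i`, { qos: 1 }, (err) => {
  if (!err) {
    console.log('Subscribed to inbound messages with QoS 1');
  } else {
    console.error(`Failed to subscribe: ${err}`);
  }
});
Handling Reconnection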
Ensure subscriptions are restored after reconnection:
client.on('connect', () => {
  console.log('Connected to MQTT broker');
  // Resubscribe to inbound messages
  client.subscribe(`d/${deviceId}/i`, { qos: 1 });
  // Publish online status
  publishStatus('productive', 'running', true);
});

client.on('reconnect', () => {
  console.log('Reconnecting to MQTT broker...');
});
Security Considerations
Message Validation
Always validate incoming commands:
function validateCommand(payload) {
  // JSON.parse can return null or a scalar, so check the shape first
  if (payload === null || typeof payload !== 'object') {
    throw new Error('Command payload must be a JSON object');
  }
  // Check required fields
  if (!payload.type) {
    throw new Error('Missing command type');
  }
  // Validate command types
  const allowedTypes = ['command', 'config', 'control'];
  if (!allowedTypes.includes(payload.type)) {
    throw new Error(`Invalid command type: ${payload.type}`);
  }
  // Additional validation based on command type
  switch (payload.type) {
    case 'command':
      if (!payload.action) {
        throw new Error('Command missing action field');
      }
      break;
    case 'config':
      if (!payload.settings || typeof payload.settings !== 'object') {
        throw new Error('Configuration command missing settings object');
      }
      break;
  }
  return true;
}
Command Authorization
Implement command authorization if needed:
function isCommandAuthorized(payload) {
  // Check if command source is authorized
  // This could involve checking signatures, tokens, or other auth mechanisms
  // Example: Check for required authorization header/field
  if (payload.requires_auth && !payload.auth_token) {
    return false;
  }
  return true;
}
Error Handling
Graceful Error Handling
client.on('message', (topic, message) => {
  if (topic === `d/${deviceId}/i`) {
    let payload; // stays undefined if JSON parsing fails
    try {
      payload = JSON.parse(message.toString());
      // Validate message
      validateCommand(payload);
      // Check authorization
      if (!isCommandAuthorized(payload)) {
        publishCommandResponse(payload.type, 'rejected', {
          error: 'Not authorized'
        });
        return;
      }
      // Process command
      handleInboundMessage(payload);
    } catch (error) {
      console.error('Command processing error:', error);
      // Send an error response only if the message could be parsed
      if (payload) {
        publishCommandResponse(payload.type || 'unknown', 'failed', {
          error: error.message
        });
      } else {
        console.error('Could not parse message for error response');
      }
    }
  }
});
Best Practices
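MQTT QoS 1 is at-least-once delivery, so a device subscribed with QoS 1 can receive the same command twice, for example after a reconnect. A minimal sketch of duplicate suppression follows. It assumes each command carries a unique `id` field, which is a hypothetical addition: the message examples on this page do not include one, so your application would need to add it when sending commands. `createDeduplicator` is likewise an illustrative name.

```javascript
// Remembers the most recent command IDs and reports repeats.
// Assumption: commands carry a unique `id` field (not part of the examples on this page).
function createDeduplicator(maxEntries = 100) {
  // A Set iterates in insertion order, so its first value is the oldest ID.
  const seen = new Set();
  return function isDuplicate(id) {
    if (seen.has(id)) {
      return true;
    }
    seen.add(id);
    if (seen.size > maxEntries) {
      // Evict the oldest ID to keep memory bounded on small devices.
      seen.delete(seen.values().next().value);
    }
    return false;
  };
}

const isDuplicate = createDeduplicator();
console.log(isDuplicate('cmd-1')); // false
console.log(isDuplicate('cmd-1')); // true
console.log(isDuplicate('cmd-2')); // false
```

A message handler could call `isDuplicate(payload.id)` before dispatching and skip the command when it returns true.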
Command Queue Management
For devices that might be offline, implement command queuing:
const commandQueue = [];

// Store a command for later instead of handling it immediately
function queueCommand(payload) {
  commandQueue.push({ payload });
}

function processCommandQueue() {
  while (commandQueue.length > 0) {
    const command = commandQueue.shift();
    try {
      handleInboundMessage(command.payload);
    } catch (error) {
      console.error('Failed to process queued command:', error);
    }
  }
}

// Add to queue when offline, process when online
client.on('connect', () => {
  processCommandQueue();
});
Rate Limiting
Implement rate limiting for command processing:
let lastCommandTime = 0;
const MIN_COMMAND_INTERVAL = 1000; // 1 second

function rateLimitCommand(payload) {
  const now = Date.now();
  if (now - lastCommandTime < MIN_COMMAND_INTERVAL) {
    publishCommandResponse(payload.type, 'rejected', {
      error: 'Rate limit exceeded'
    });
    return false;
  }
  lastCommandTime = now;
  return true;
}
Testing Commands
Use MQTT client tools to test command reception:
Using mosquitto_pub
# Send a test command
mosquitto_pub -h mqtt.na.cococo.cloud -p 8883 \
  --cafile /etc/ssl/certs/ca-certificates.crt \
  -u "d:your_device_id" -P "your_device_token" \
  -t "d/your_device_id/i" \
  -m '{"type":"command","action":"restart","parameters":{"delay":10}}'
Testing with MQTT Explorer
- Connect to your CoCoCo MQTT broker using your device credentials
- Navigate to the d/your_device_id/i topic
- Publish test command messages
- Monitor your device's response and status updates
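Handler logic can also be exercised without a broker by passing messages directly to the handler function. The sketch below is one possible approach, not a CoCoCo tool: `createFakeClient` and `createInboundHandler` are hypothetical names, the fake client only records what would have been published, and the handler is a reduced stand-in for the fuller handlers shown earlier on this page.

```javascript
// Test double that records publishes instead of sending them to a broker.
function createFakeClient() {
  const published = [];
  return {
    published,
    publish(topic, message) {
      published.push({ topic, message: JSON.parse(message) });
    },
  };
}

// Reduced handler for illustration: acknowledges parseable commands,
// reports 'failed' for anything that cannot be parsed.
function createInboundHandler(client, deviceId) {
  return (topic, message) => {
    if (topic !== `d/${deviceId}/i`) {
      return; // not addressed to this device
    }
    let type = 'unknown';
    let status = 'accepted';
    try {
      const payload = JSON.parse(message.toString());
      type = payload.type || 'unknown';
    } catch (error) {
      status = 'failed';
    }
    client.publish(`d/${deviceId}/s`, JSON.stringify({ command_type: type, status }));
  };
}

const client = createFakeClient();
const onMessage = createInboundHandler(client, 'your_device_id');

onMessage('d/your_device_id/i', Buffer.from('{"type":"command","action":"restart"}'));
onMessage('d/your_device_id/i', Buffer.from('not json'));
onMessage('d/other_device/i', Buffer.from('{"type":"command"}')); // ignored

console.log(client.published.length);            // 2
console.log(client.published[0].message.status); // accepted
console.log(client.published[1].message.status); // failed
```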
Integration with CoCoCo Reactions
Commands can be automatically generated by CoCoCo Reactions based on device data or external triggers. For example:
- A Reaction could send a "reduce_speed" command when temperature exceeds a threshold
- Maintenance schedules could trigger "run_diagnostics" commands
- External systems could trigger configuration updates via the CoCoCo API
This creates powerful automation workflows where devices respond intelligently to changing conditions and external events.
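On the device side, commands sent by a Reaction arrive on the inbound topic like any other message. The sketch below shows one way to route the example actions named above with a lookup table. It assumes these commands use the same `type`/`action`/`parameters` shape as the command example earlier on this page. The `target_percent` parameter, the simulated `device` object, and the `dispatchAction` helper are all hypothetical.

```javascript
// Simulated device state; a real device would drive hardware here.
const device = { speedPercent: 100, lastDiagnosticsAt: null };

// Action names come from the examples above; parameter names are hypothetical.
const actionHandlers = new Map([
  ['reduce_speed', (parameters) => {
    const target = Number(parameters.target_percent);
    if (!Number.isFinite(target) || target < 0 || target > 100) {
      return { status: 'rejected', error: 'target_percent must be between 0 and 100' };
    }
    // Only ever lower the speed; a higher target leaves it unchanged.
    device.speedPercent = Math.min(device.speedPercent, target);
    return { status: 'completed', speed_percent: device.speedPercent };
  }],
  ['run_diagnostics', () => {
    device.lastDiagnosticsAt = new Date().toISOString();
    return { status: 'completed', diagnostics_at: device.lastDiagnosticsAt };
  }],
]);

// Looks up the handler for payload.action and rejects unsupported actions.
function dispatchAction(payload) {
  const handler = actionHandlers.get(payload.action);
  if (!handler) {
    return { status: 'rejected', error: `Unsupported action: ${payload.action}` };
  }
  return handler(payload.parameters || {});
}

console.log(dispatchAction({ type: 'command', action: 'reduce_speed', parameters: { target_percent: 60 } }));
console.log(dispatchAction({ type: 'command', action: 'self_destruct' }));
```

The returned object could be passed on to a response publisher so the platform sees whether the automated command took effect.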
Next Steps
After implementing command subscription:
- Set up automated responses using CoCoCo Reactions
- Implement comprehensive logging for command processing
- Create device-specific command handlers for your use case
- Test command scenarios thoroughly in your development environment