MQTT Automation with Reactions
Beyond collecting and visualizing data, the CoCoCo platform lets you build automated workflows with a feature called "Reactions". Using our Lua scripting API, you define custom logic that runs automatically in response to incoming device data or other events, so the platform can respond to a condition without waiting for someone to notice it on a dashboard.
What are Reactions?
Reactions are event-driven automation scripts that execute automatically when specific conditions are met. They can:
- Monitor incoming MQTT data for threshold violations
- Respond to device status changes
- Send alerts and notifications
- Trigger actions in other systems
- Generate automatic reports
- Control other devices based on sensor readings
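As a rough illustration of the shape such a script takes, the sketch below checks one metric against a limit and sends a notification. The `metric`, `device`, and `notifiers` names follow the examples later in this guide; here they are replaced with local stand-ins (an assumption, so the snippet runs in a plain Lua interpreter), and the `sent` table is hypothetical and exists only to make the result inspectable.

```lua
-- Stand-ins for objects the platform is assumed to provide to a Reaction.
-- Their fields mirror the examples in this guide; real payloads may differ.
local metric = { name = "temperature", value = 31.5 }
local device = { id = "sensor-01" }

local sent = {}  -- hypothetical: records notifications instead of sending them
local notifiers = {
  recipient = function(address, subject, body)
    sent[#sent + 1] = { address = address, subject = subject, body = body }
  end,
}

-- The reaction itself: a condition followed by an action.
local TEMPERATURE_LIMIT = 30  -- degrees Celsius, illustrative value

if metric.name == "temperature" and metric.value > TEMPERATURE_LIMIT then
  notifiers.recipient(
    "operator@company.com",
    "High Temperature",
    "Device " .. device.id .. " reported " .. metric.value .. " C"
  )
end
```

Inside a deployed Reaction the stand-in definitions would be dropped, leaving only the condition and the action.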
Common Automation Scenarios
Predictive Maintenance
Monitor device metrics to predict failures before they occur:
-- React to high vibration readings
if data.vibration > 5.0 then
  notifiers.recipient("maintenance@company.com",
    "High Vibration Alert",
    "Device " .. device.id .. " showing vibration of " .. data.vibration .. "g")
  -- Schedule maintenance check
  database.query("maintenance_db",
    "INSERT INTO maintenance_requests (device_id, priority, reason) VALUES (?, ?, ?)",
    {device.id, "high", "Excessive vibration detected"})
end
Inventory Management
Automatically reorder supplies when levels get low:
-- React to low paper level in printer
if metric.name == "paperLevel" and metric.value < 100 then
  -- Check if we've already ordered recently
  local recent_orders = database.query("inventory_db",
    "SELECT COUNT(*) as count FROM orders WHERE device_id = ? AND created_at > ?",
    {device.id, time.subtract(time.now(), "24h")})
  if recent_orders[1].count == 0 then
    -- Trigger automatic reorder
    http.request({
      url = "https://api.supplier.com/orders",
      method = "POST",
      -- String keys in a Lua table constructor use the ["key"] = value form
      headers = {["Authorization"] = "Bearer " .. api_token},
      json = {
        item = "printer_paper_a4",
        quantity = 10,
        delivery_location = device.location
      }
    })
    notifiers.recipient("procurement@company.com",
      "Paper Reorder Triggered",
      "Automatic reorder placed for printer " .. device.id)
  end
end
Environmental Control
Maintain optimal conditions by controlling HVAC systems:
-- React to temperature readings
if metric.name == "temperature" then
  local temp = metric.value
  local target_temp = 22.0 -- 22°C target
  local tolerance = 2.0
  if temp > target_temp + tolerance then
    -- Too hot - increase cooling
    graphql.query([[
      mutation TriggerCooling($deviceId: String!) {
        sendDeviceCommand(deviceId: $deviceId, command: {
          type: "control",
          target: "hvac",
          state: "cool",
          parameters: { intensity: "high" }
        }) {
          success
        }
      }
    ]], { deviceId = hvac_device_id })
  elseif temp < target_temp - tolerance then
    -- Too cold - increase heating
    graphql.query([[
      mutation TriggerHeating($deviceId: String!) {
        sendDeviceCommand(deviceId: $deviceId, command: {
          type: "control",
          target: "hvac",
          state: "heat",
          parameters: { intensity: "medium" }
        }) {
          success
        }
      }
    ]], { deviceId = hvac_device_id })
  end
end
Production Line Optimization
Coordinate multiple devices for optimal throughput:
-- React to production rate changes
if metric.name == "production_rate" and metric.value < 50 then
  -- Production is slow, check upstream devices
  local upstream_status = graphql.query([[
    query GetUpstreamDevices($lineId: String!) {
      devices(filter: { productionLine: $lineId, position: { lt: ]] .. device.position .. [[ } }) {
        id
        currentStatus {
          mode
          status
        }
        metrics(names: ["queue_depth", "error_count"]) {
          name
          value
        }
      }
    }
  ]], { lineId = device.production_line })
  for _, upstream_device in ipairs(upstream_status.devices) do
    -- Check if upstream device has issues
    local error_metric = nil
    for _, m in ipairs(upstream_device.metrics) do
      if m.name == "error_count" then
        error_metric = m
        break
      end
    end
    if error_metric and error_metric.value > 5 then
      -- Send alert about upstream issues
      notifiers.recipient("production@company.com",
        "Production Bottleneck Detected",
        "Device " .. upstream_device.id .. " may be causing downstream slowdown")
      break
    end
  end
end
Quality Control
Automatically reject products that don't meet specifications:
-- React to quality measurement
if metric.name == "product_dimension" then
  local spec_min = 10.0
  local spec_max = 12.0
  if metric.value < spec_min or metric.value > spec_max then
    -- Product is out of spec
    local reject_command = {
      type = "control",
      target = "reject_gate",
      state = "activate",
      parameters = { duration = 2 }
    }
    -- Send reject command to sorting device
    graphql.query([[
      mutation RejectProduct($deviceId: String!, $command: JSON!) {
        sendDeviceCommand(deviceId: $deviceId, command: $command) {
          success
        }
      }
    ]], {
      deviceId = sorting_device_id,
      command = reject_command
    })
    -- Log quality issue
    database.query("quality_db",
      "INSERT INTO quality_issues (device_id, measurement, spec_min, spec_max, timestamp) VALUES (?, ?, ?, ?, ?)",
      {device.id, metric.value, spec_min, spec_max, metric.timestamp})
    -- Send alert if too many rejects
    local recent_rejects = database.query("quality_db",
      "SELECT COUNT(*) as count FROM quality_issues WHERE device_id = ? AND timestamp > ?",
      {device.id, time.subtract(time.now(), "1h")})
    if recent_rejects[1].count > 10 then
      notifiers.recipient("quality@company.com",
        "High Reject Rate Alert",
        "Device " .. device.id .. " has rejected " .. recent_rejects[1].count .. " products in the last hour")
    end
  end
end
Energy Management
Optimize energy usage based on demand and pricing:
-- React to power consumption readings
if metric.name == "power_consumption" then
  local consumption = metric.value
  local current_hour = tonumber(time.format(time.now(), "%H"))
  -- Check if we're in peak hours (9 AM - 5 PM)
  -- Hours 9 through 16 cover 09:00 up to, but not including, 17:00
  if current_hour >= 9 and current_hour < 17 then
    local peak_threshold = 5000 -- 5kW
    if consumption > peak_threshold then
      -- High consumption during peak hours
      -- Check for non-essential devices to shut down
      local non_essential = graphql.query([[
        query GetNonEssentialDevices($area: String!) {
          devices(filter: {
            area: $area,
            priority: "low",
            currentStatus: { status: "running" }
          }) {
            id
            name
          }
        }
      ]], { area = device.area })
      -- Shut down lowest priority devices
      for i, dev in ipairs(non_essential.devices) do
        if i <= 2 then -- Only shut down 2 devices max
          graphql.query([[
            mutation ShutdownDevice($deviceId: String!) {
              sendDeviceCommand(deviceId: $deviceId, command: {
                type: "command",
                action: "standby",
                parameters: { reason: "peak_load_management" }
              }) {
                success
              }
            }
          ]], { deviceId = dev.id })
        end
      end
      notifiers.recipient("facilities@company.com",
        "Peak Load Management",
        "Reduced power consumption by shutting down non-essential devices")
    end
  end
end
Security Monitoring
Detect and respond to security events:
-- React to door sensor events
if toggle.name == "door_open" and toggle.value == true then
  local current_time = time.now()
  local after_hours = time.parse("18:00:00")
  local before_hours = time.parse("06:00:00")
  -- Check if door opened after hours
  if time.compare(current_time, after_hours) > 0 or time.compare(current_time, before_hours) < 0 then
    -- After hours access detected
    notifiers.group("security_team",
      "After Hours Access Alert",
      "Door " .. device.location .. " opened at " .. time.format(current_time, "%H:%M:%S"))
    -- Activate security camera recording
    graphql.query([[
      mutation ActivateRecording($cameraId: String!) {
        sendDeviceCommand(deviceId: $cameraId, command: {
          type: "control",
          target: "recording",
          state: "start",
          parameters: { duration: 300 }
        }) {
          success
        }
      }
    ]], { cameraId = security_camera_id })
    -- Log security event
    database.query("security_db",
      "INSERT INTO access_events (device_id, location, timestamp, authorized) VALUES (?, ?, ?, ?)",
      {device.id, device.location, current_time, false})
  end
end
Reaction Triggers
Reactions can be triggered by various MQTT events:
Metric Thresholds
-- Trigger when temperature exceeds 30°C
if metric.name == "temperature" and metric.value > 30 then
  -- Handle overheating
end
Status Changes
-- Trigger when device goes offline
if status.online == false and previous_status.online == true then
  -- Handle device disconnection
end
Error Events
-- Trigger on critical errors
if error.severity >= 4 then
  -- Handle critical errors immediately
end
Time-based Conditions
-- Trigger during maintenance windows
local maintenance_day = time.format(time.now(), "%A")
local maintenance_hour = tonumber(time.format(time.now(), "%H"))
if maintenance_day == "Sunday" and maintenance_hour >= 2 and maintenance_hour <= 6 then
  -- Maintenance window logic
end
Data Patterns
-- Trigger on trend analysis
-- Key the history by device so readings from different devices are not mixed
local history_key = "recent_temperatures_" .. device.id
local recent_temps = cache.get(history_key) or {}
table.insert(recent_temps, metric.value)
-- Keep only last 10 readings
if #recent_temps > 10 then
  table.remove(recent_temps, 1)
end
cache.put(history_key, recent_temps)
-- Check for rising temperature trend
if #recent_temps >= 5 then
  local trend_increasing = true
  for i = 2, #recent_temps do
    if recent_temps[i] <= recent_temps[i-1] then
      trend_increasing = false
      break
    end
  end
  if trend_increasing then
    -- Temperature is consistently rising
    notifiers.recipient("maintenance@company.com",
      "Temperature Trend Alert",
      "Device " .. device.id .. " showing consistent temperature increase")
  end
end
Best Practices for MQTT Automation
1. Start Simple
Begin with basic threshold monitoring before implementing complex logic:
-- Simple threshold check
if metric.name == "pressure" and metric.value > 100 then
  notifiers.recipient("operator@company.com", "High Pressure", "Pressure: " .. metric.value)
end
2. Implement Rate Limiting
Prevent spam from frequent triggers:
local last_alert_key = "last_alert_" .. device.id .. "_" .. metric.name
local last_alert_time = cache.get(last_alert_key)
local current_time = time.now()
-- Only alert once per hour
if not last_alert_time or time.diff(current_time, time.parse(last_alert_time)) > 3600 then
  -- Send alert
  notifiers.recipient("ops@company.com", "Alert", "Condition detected")
  cache.put_ttl(last_alert_key, time.format(current_time), 3600)
end
3. Use Meaningful Context
Include relevant information in alerts and actions:
local context = {
  device_name = device.name,
  location = device.location,
  current_value = metric.value,
  threshold = 85.0,
  timestamp = time.format(time.now(), "%Y-%m-%d %H:%M:%S"),
  trend = cache.get("trend_" .. device.id) or "unknown"
}
local message = templates.render("alert_template", context)
notifiers.recipient("team@company.com", "Threshold Alert", message)
4. Handle Errors Gracefully
Always include error handling:
local success, result = pcall(function()
  return http.request({
    url = "https://api.external-system.com/alert",
    method = "POST",
    json = alert_data
  })
end)
if not success then
  print("Failed to send external alert: " .. tostring(result))
  -- Fallback to internal notification
  notifiers.recipient("admin@company.com", "Integration Error",
    "Failed to send alert to external system: " .. tostring(result))
end
5. Test Thoroughly
Create test scenarios for your reactions:
-- Test mode flag
local test_mode = cache.get("test_mode_" .. device.id) or false
if test_mode then
  print("TEST MODE: Would send alert for " .. metric.name .. " = " .. metric.value)
  return
end
-- Normal operation
local threshold = 85.0 -- Example value; set this to the limit for your metric
if metric.value > threshold then
  -- Send actual alert
end
Deployment and Management
Creating Reactions
- Navigate to the Reactions section in CoCoCo
- Create a new Reaction with appropriate triggers
- Write your Lua script using the available modules
- Test with sample data before deploying
- Monitor reaction execution logs
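One way to approach the "test with sample data" step is to wrap the reaction logic in a function and feed it hand-built inputs outside the platform. The sketch below does this for the pressure check from the best practices above. The `ctx` table, `make_context`, and the recording notifier are hypothetical test scaffolding, not part of the CoCoCo API; the platform's own test tooling, if it offers any, may work differently.

```lua
-- Reaction logic wrapped in a function so it can be fed sample contexts.
-- `ctx` bundles the objects the platform is assumed to provide.
local function pressure_reaction(ctx)
  if ctx.metric.name == "pressure" and ctx.metric.value > 100 then
    ctx.notifiers.recipient(
      "operator@company.com",
      "High Pressure",
      "Pressure: " .. ctx.metric.value
    )
  end
end

-- Hypothetical test helper: builds a context whose notifier records calls.
local function make_context(name, value)
  local calls = {}
  local ctx = {
    metric = { name = name, value = value },
    device = { id = "test-device" },
    notifiers = {
      recipient = function(address, subject, body)
        calls[#calls + 1] = { address = address, subject = subject, body = body }
      end,
    },
  }
  return ctx, calls
end

-- Sample data: one reading above the limit, one below, one unrelated metric.
local samples = {
  { name = "pressure",    value = 120, expected_alerts = 1 },
  { name = "pressure",    value = 80,  expected_alerts = 0 },
  { name = "temperature", value = 120, expected_alerts = 0 },
}

local failures = 0
for _, sample in ipairs(samples) do
  local ctx, calls = make_context(sample.name, sample.value)
  pressure_reaction(ctx)
  if #calls ~= sample.expected_alerts then
    failures = failures + 1
    print(("FAIL: %s=%s sent %d alert(s), expected %d"):format(
      sample.name, tostring(sample.value), #calls, sample.expected_alerts))
  end
end
print(failures == 0 and "all samples passed" or (failures .. " sample(s) failed"))
```

Keeping the condition and action inside a function means the same body can be exercised with edge cases (exactly at the limit, missing fields) before it is pasted into a Reaction.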
Monitoring Reactions
- Check reaction execution logs regularly
- Monitor reaction performance and execution time
- Set up alerts for reaction failures
- Track the effectiveness of automated responses
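If the platform's execution logs do not already record timing, a reaction can capture it itself. The following is a minimal sketch, assuming the standard Lua `os.clock` and `pcall` functions are available in the Reactions sandbox (the source does not confirm this); `run_monitored` and its record fields are hypothetical names chosen for illustration.

```lua
-- Hypothetical helper: runs a reaction body, capturing failures and CPU time.
local function run_monitored(name, body, slow_threshold_seconds)
  local started = os.clock()
  local ok, err = pcall(body)
  local elapsed = os.clock() - started

  local record = {
    name = name,
    ok = ok,
    elapsed = elapsed,
    slow = elapsed > slow_threshold_seconds,
  }
  if not ok then
    record.error = tostring(err)
  end

  -- One log line per run; print is used for logging elsewhere in this guide.
  print(("reaction=%s ok=%s elapsed=%.4fs slow=%s%s"):format(
    name, tostring(ok), elapsed, tostring(record.slow),
    record.error and (" error=" .. record.error) or ""))
  return record
end

-- A body that completes normally.
local healthy = run_monitored("threshold_check", function()
  local total = 0
  for i = 1, 1000 do
    total = total + i
  end
end, 0.5)

-- A body that raises an error; the wrapper reports it instead of crashing.
local broken = run_monitored("broken_reaction", function()
  error("sensor payload missing")
end, 0.5)
```

The returned record could then feed a failure alert, for example by calling `notifiers.recipient` when `record.ok` is false. Note that `os.clock` measures CPU time, so time spent waiting on network calls may not be reflected.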
Version Control
- Keep versions of your reaction scripts
- Test changes in a staging environment
- Document the logic and expected behavior
- Plan rollback procedures for critical reactions
Reactions transform your MQTT device network from passive data collection to intelligent, automated operations. They enable proactive management, reduce manual intervention, and help prevent issues before they impact your business.