This laboratory explores Zephyr’s inter-thread communication mechanisms through progressively more complex exercises. You’ll implement common communication patterns, measure their performance characteristics, and design robust multi-threaded systems using message queues, FIFOs, mailboxes, Zbus, and pipes.
Objective: Implement a sensor data collection system using message queues for structured inter-thread communication.
Create a new Zephyr project with the following structure:
sensor_system/
├── CMakeLists.txt
├── prj.conf
├── src/
│   └── main.c
└── boards/
    └── native_posix.conf
Step 1: Configure the project
Create prj.conf:
CONFIG_MULTITHREADING=y
CONFIG_LOG=y
CONFIG_LOG_DEFAULT_LEVEL=3
CONFIG_PRINTK=y
CONFIG_ASSERT=y
CONFIG_THREAD_STACK_INFO=y
CONFIG_THREAD_NAME=y
Create CMakeLists.txt:
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(sensor_system)
target_sources(app PRIVATE src/main.c)
Step 2: Implement the sensor data system
Create src/main.c:
#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
#include <zephyr/random/random.h>
#include <zephyr/sys/printk.h>

LOG_MODULE_REGISTER(sensor_system, LOG_LEVEL_INF);

/* Sensor data structure */
struct sensor_reading {
    uint32_t timestamp;
    uint8_t sensor_id;
    int16_t temperature;   /* tenths of a degree Celsius */
    uint16_t humidity;     /* tenths of a percent */
    uint8_t status;        /* 0 = OK, 1 = error */
};

/* Define message queue for sensor data: 20 messages, 4-byte aligned */
K_MSGQ_DEFINE(sensor_queue, sizeof(struct sensor_reading), 20, 4);

/* Thread stacks */
K_THREAD_STACK_DEFINE(sensor_thread_stack, 1024);
K_THREAD_STACK_DEFINE(processor_thread_stack, 1024);
K_THREAD_STACK_DEFINE(monitor_thread_stack, 512);

/* Thread control blocks */
struct k_thread sensor_thread_data;
struct k_thread processor_thread_data;
struct k_thread monitor_thread_data;

/* Statistics */
static uint32_t readings_generated = 0;
static uint32_t readings_processed = 0;
static uint32_t queue_overruns = 0;

/* Sensor thread - data producer */
void sensor_thread_entry(void *arg1, void *arg2, void *arg3)
{
    struct sensor_reading reading;
    uint8_t sensor_count = 3;

    LOG_INF("Sensor thread started");

    while (1) {
        for (uint8_t i = 0; i < sensor_count; i++) {
            /* Generate sensor reading */
            reading.timestamp = k_uptime_get_32();
            reading.sensor_id = i;
            /* 20.0°C to 69.9°C */
            reading.temperature = (int16_t)(sys_rand32_get() % 500) + 200;
            /* 0.0% to 99.9% */
            reading.humidity = (uint16_t)(sys_rand32_get() % 1000);
            /* Roughly 95% good readings */
            reading.status = (sys_rand32_get() % 100) < 95 ? 0 : 1;

            /* Send to queue with timeout */
            int ret = k_msgq_put(&sensor_queue, &reading, K_MSEC(100));

            if (ret == 0) {
                readings_generated++;
                LOG_DBG("Sensor %d: T=%d.%d°C, H=%d.%d%%, Status=%s",
                        reading.sensor_id,
                        reading.temperature / 10, reading.temperature % 10,
                        reading.humidity / 10, reading.humidity % 10,
                        reading.status == 0 ? "OK" : "ERROR");
            } else {
                queue_overruns++;
                LOG_WRN("Queue full, reading dropped (sensor %d)", i);
            }
        }

        /* Sensor reading interval */
        k_msleep(1000);
    }
}

/* Data processor thread - consumer */
void processor_thread_entry(void *arg1, void *arg2, void *arg3)
{
    struct sensor_reading reading;

    LOG_INF("Processor thread started");

    while (1) {
        int ret = k_msgq_get(&sensor_queue, &reading, K_FOREVER);

        if (ret == 0) {
            readings_processed++;
            LOG_INF("Processing Sensor %d: T=%d.%d°C, H=%d.%d%%",
                    reading.sensor_id,
                    reading.temperature / 10, reading.temperature % 10,
                    reading.humidity / 10, reading.humidity % 10);

            /* Simulate processing time */
            k_msleep(50);
        }
    }
}

/* Monitoring thread */
void monitor_thread_entry(void *arg1, void *arg2, void *arg3)
{
    LOG_INF("Monitor thread started");

    while (1) {
        /* k_sleep() takes a k_timeout_t; k_msleep() expects plain milliseconds */
        k_sleep(K_SECONDS(10));
        LOG_INF("STATS: Generated=%u, Processed=%u, Overruns=%u, Queue space=%u/%u",
                readings_generated, readings_processed, queue_overruns,
                k_msgq_num_free_get(&sensor_queue), k_msgq_max_msgs_get(&sensor_queue));
    }
}

int main(void)
{
    LOG_INF("=== Inter-Thread Communication Lab: Message Queues ===");

    k_thread_create(&sensor_thread_data, sensor_thread_stack,
                    K_THREAD_STACK_SIZEOF(sensor_thread_stack),
                    sensor_thread_entry, NULL, NULL, NULL,
                    5, 0, K_NO_WAIT);
    k_thread_name_set(&sensor_thread_data, "sensor_thread");

    k_thread_create(&processor_thread_data, processor_thread_stack,
                    K_THREAD_STACK_SIZEOF(processor_thread_stack),
                    processor_thread_entry, NULL, NULL, NULL,
                    6, 0, K_NO_WAIT);
    k_thread_name_set(&processor_thread_data, "processor_thread");

    k_thread_create(&monitor_thread_data, monitor_thread_stack,
                    K_THREAD_STACK_SIZEOF(monitor_thread_stack),
                    monitor_thread_entry, NULL, NULL, NULL,
                    7, 0, K_NO_WAIT);
    k_thread_name_set(&monitor_thread_data, "monitor_thread");

    return 0;
}
Build and Run:
west build -p -b native_posix
west build -t run
Expected Output:
Exercise Questions:
Queue Sizing: Modify the queue size to 5 messages. What happens to the overrun rate?
Processing Delays: Add a k_msleep(2000) in the processor thread. How does this affect system performance?
Priority Impact: Change the processor thread priority to 4 (higher than sensor). Does this improve efficiency?
Timeout Analysis: Change the sensor thread timeout from K_MSEC(100) to K_NO_WAIT. What’s the trade-off?
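Before changing the firmware, it can help to reason about these questions with a small host-side model. The following is a minimal sketch in plain C, not Zephyr code. It assumes the producer behaves as if it used K_NO_WAIT (a full queue drops the reading immediately), that the consumer spends a fixed service time on each reading, and that time advances in 1 ms steps. The names simulate_queue and struct queue_model_result are hypothetical and exist only for this illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical result type for the host-side model (not a Zephyr type). */
struct queue_model_result {
    uint32_t produced;  /* readings accepted by the queue */
    uint32_t consumed;  /* readings fully processed */
    uint32_t dropped;   /* readings rejected because the queue was full */
};

/*
 * Step a bounded queue in 1 ms ticks.
 * capacity    - queue depth in messages
 * burst       - readings generated per producer period
 * period_ms   - producer period
 * service_ms  - consumer processing time per reading
 * duration_ms - total simulated time
 */
struct queue_model_result simulate_queue(uint32_t capacity, uint32_t burst,
                                         uint32_t period_ms, uint32_t service_ms,
                                         uint32_t duration_ms)
{
    struct queue_model_result r = {0, 0, 0};
    uint32_t queued = 0;     /* messages waiting in the queue */
    uint32_t busy_left = 0;  /* ms left on the reading being processed */

    assert(capacity > 0 && period_ms > 0 && service_ms > 0);

    for (uint32_t t = 0; t < duration_ms; t++) {
        /* Producer: emit a burst at the start of every period. */
        if (t % period_ms == 0) {
            for (uint32_t i = 0; i < burst; i++) {
                if (queued < capacity) {
                    queued++;
                    r.produced++;
                } else {
                    r.dropped++;  /* models a failed non-blocking put */
                }
            }
        }

        /* Consumer: when idle, take the next reading from the queue. */
        if (busy_left == 0 && queued > 0) {
            queued--;
            busy_left = service_ms;
        }

        /* Consumer: spend one tick of work on the current reading. */
        if (busy_left > 0) {
            busy_left--;
            if (busy_left == 0) {
                r.consumed++;
            }
        }
    }
    return r;
}
```

With the lab's defaults, simulate_queue(20, 3, 1000, 50, 10000) reports 30 readings produced and none dropped. With a 5-message queue and a 2000 ms processing delay, simulate_queue(5, 3, 1000, 2000, 10000) accepts only 10 readings and drops 20, because the consumer can drain one reading every two seconds while three arrive every second. The real system differs in detail (the 100 ms put timeout lets the producer wait briefly), so treat the model as a first estimate to compare against the monitor thread's statistics.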
Objective: Implement a data processing pipeline using FIFOs for zero-copy data transfer.
Create a new project directory pipeline_system/ and implement a multi-stage processing pipeline with FIFOs connecting each stage.
Key Features:
Pipeline Efficiency: Analyze the throughput of each stage. Which stage becomes the bottleneck?
Memory Management: Monitor heap and slab utilization. How does packet size affect memory usage?
Deadline Monitoring: Implement deadline tracking. What happens when processing load increases?
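The zero-copy idea behind this pipeline is that stages exchange pointers to packets rather than copies of them. Zephyr's k_fifo does this by reserving the first word of each queued item for the kernel's own linkage, with k_fifo_put() and k_fifo_get() passing the item's address. The sketch below is a single-threaded, host-side model of that convention in plain C. It is not the kernel implementation, and struct packet, struct ptr_fifo, and the ptr_fifo_* functions are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical packet type: the first word is reserved for the queue link. */
struct packet {
    void *reserved;       /* used by the FIFO while the packet is queued */
    uint32_t seq;         /* sequence number assigned by the first stage */
    uint8_t payload[32];  /* data that is never copied between stages */
};

/* Hypothetical intrusive FIFO: stores only pointers to caller-owned packets. */
struct ptr_fifo {
    struct packet *head;
    struct packet *tail;
};

void ptr_fifo_init(struct ptr_fifo *f)
{
    f->head = NULL;
    f->tail = NULL;
}

/* Append a packet by linking it in place; no payload bytes are copied. */
void ptr_fifo_put(struct ptr_fifo *f, struct packet *p)
{
    assert(f != NULL && p != NULL);
    p->reserved = NULL;
    if (f->tail != NULL) {
        f->tail->reserved = p;
    } else {
        f->head = p;
    }
    f->tail = p;
}

/* Remove and return the oldest packet, or NULL when the FIFO is empty. */
struct packet *ptr_fifo_get(struct ptr_fifo *f)
{
    struct packet *p = f->head;

    if (p != NULL) {
        f->head = p->reserved;
        if (f->head == NULL) {
            f->tail = NULL;
        }
        p->reserved = NULL;
    }
    return p;
}
```

Because the consumer receives the same address the producer queued, ownership of the packet moves with the pointer: the producing stage must not touch a packet after putting it, and the final stage is responsible for returning it to whatever pool it came from.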
Objective: Design a comprehensive IoT sensor system using Zbus for publish-subscribe communication.
Create an IoT monitoring system with:
Dynamic Observer Registration: Implement runtime addition and removal of observers based on system conditions.
Message Validation: Create comprehensive validators that check message integrity and business logic.
Performance Monitoring: Add Zbus channel statistics tracking and performance analysis.
Fault Tolerance: Implement error handling and recovery mechanisms for channel communication failures.
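The interaction between validators and runtime observers can be prototyped on a host before wiring it into Zbus. The following is a minimal plain-C sketch of a single channel that rejects implausible messages and notifies whichever observers are currently registered. It only models the concepts: on target, Zbus provides its own channel, validator, and observer mechanisms, and every name below (struct env_msg, struct channel, channel_publish, and so on) is hypothetical, as are the plausibility limits.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_OBSERVERS 4

/* Hypothetical message type for an environmental sensor channel. */
struct env_msg {
    int temperature_dc;  /* tenths of a degree Celsius */
    int humidity_pm;     /* tenths of a percent */
};

typedef void (*observer_cb)(const struct env_msg *msg);
typedef bool (*validator_cb)(const struct env_msg *msg);

/* Hypothetical channel: last accepted message, validator, observer slots. */
struct channel {
    struct env_msg last;
    validator_cb validator;                /* NULL accepts everything */
    observer_cb observers[MAX_OBSERVERS];  /* NULL marks a free slot */
    unsigned int rejected;                 /* publishes refused by the validator */
};

/* Register an observer at runtime; fails when all slots are in use. */
bool channel_add_observer(struct channel *ch, observer_cb cb)
{
    for (size_t i = 0; i < MAX_OBSERVERS; i++) {
        if (ch->observers[i] == NULL) {
            ch->observers[i] = cb;
            return true;
        }
    }
    return false;
}

/* Remove a previously registered observer; fails if it is not registered. */
bool channel_remove_observer(struct channel *ch, observer_cb cb)
{
    for (size_t i = 0; i < MAX_OBSERVERS; i++) {
        if (ch->observers[i] == cb) {
            ch->observers[i] = NULL;
            return true;
        }
    }
    return false;
}

/* Validate, store, then notify. Invalid messages never reach observers. */
bool channel_publish(struct channel *ch, const struct env_msg *msg)
{
    assert(ch != NULL && msg != NULL);
    if (ch->validator != NULL && !ch->validator(msg)) {
        ch->rejected++;
        return false;
    }
    ch->last = *msg;
    for (size_t i = 0; i < MAX_OBSERVERS; i++) {
        if (ch->observers[i] != NULL) {
            ch->observers[i](&ch->last);
        }
    }
    return true;
}

/* Example validator: the range limits are illustrative assumptions. */
bool env_msg_is_plausible(const struct env_msg *msg)
{
    return msg->temperature_dc >= -400 && msg->temperature_dc <= 850 &&
           msg->humidity_pm >= 0 && msg->humidity_pm <= 1000;
}

/* Example observer: counts readings above an assumed 60.0 degree threshold. */
unsigned int alert_count;

void alert_observer(const struct env_msg *msg)
{
    if (msg->temperature_dc > 600) {
        alert_count++;
    }
}
```

Counting rejected publishes in the channel gives a simple starting point for the statistics and fault-tolerance challenges: a rising rejection count points at a misbehaving publisher without any observer having to handle bad data.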
Objective: Analyze and optimize inter-thread communication performance across different mechanisms.
Create a performance testing framework that compares:
Performance Comparison: Which mechanism provides the lowest latency? Why?
Memory Efficiency: Compare static vs dynamic memory usage patterns.
Scalability: How does performance change with different message sizes and queue depths?
Real-time Characteristics: Which mechanisms provide the most predictable timing?
After completing these laboratories, you should understand:
Message Queue Design: Implementing producer-consumer patterns with proper queue sizing and timeout handling
Zero-Copy Communication: Using FIFOs and LIFOs for efficient data transfer without memory copying
Event-Driven Architecture: Building scalable systems using Zbus publish-subscribe patterns
Performance Optimization: Analyzing and optimizing communication performance for real-time requirements
System Integration: Combining multiple communication mechanisms in complex embedded systems
Hybrid Communication: Design a system that uses multiple communication mechanisms together
Fault Tolerance: Implement error detection and recovery for communication failures
Load Balancing: Create systems that dynamically balance processing load across multiple threads
Protocol Implementation: Use communication primitives to implement custom communication protocols
Real-time Analysis: Perform detailed timing analysis and worst-case execution time calculations
Together, these labs give you hands-on practice implementing inter-thread communication in embedded systems with Zephyr RTOS.