gists/cpp/freertos-active-object.md

5.8 KiB

Active Object Design Pattern

"The active object design pattern decouples method execution from method invocation for objects that each reside in their own thread of control." (Wikipedia). With this pattern, we can make objects thread safe.

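Take for example this class. It is not thread-safe: if increment() and decrement() are called from different threads at the same time, the unsynchronized accesses to mNum race with each other.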

/// Plain counter without any synchronization: mNum is modified directly by
/// whichever thread calls the member functions.
class SomeClass {
public:
    /// Add one to the counter.
    void increment() { mNum += 1; }

    /// Subtract one from the counter.
    void decrement() { mNum -= 1; }

private:
    int mNum{0};
};

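We can make it thread-safe by decoupling the function calls from their execution with a (thread-safe) queue: callers only enqueue the work, and a single worker thread executes it, so mNum is only ever touched by that one thread.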

///
/// @brief Counter whose mutations are queued and executed on one worker thread
/// @details increment() and decrement() only enqueue a task; the worker thread
///          started by the constructor is the only code that touches mNum.
///          The destructor enqueues a final "stop" task and joins the worker,
///          so every task queued before destruction is still executed.
///
class SomeClass {
public:
    SomeClass() {
        /// The worker is kept in a member: a local std::thread that is still
        /// joinable when it goes out of scope calls std::terminate().
        mThread = std::thread([this]() {
            while (!mStopRequested) {
                if (mQueue.empty()) {
                    /// Nothing to do: give up the time slice instead of
                    /// spinning at full speed.
                    std::this_thread::yield();
                    continue;
                }

                /// Take the task out of the queue before running it so that
                /// nothing refers to queue storage while the task executes.
                std::function<void()> task = mQueue.front();
                mQueue.pop();
                task();
            }
        });
    }

    ~SomeClass() {
        /// The stop request travels through the queue like any other task, so
        /// mStopRequested is only ever read and written on the worker thread.
        /// NOTE(review): assumes push() always enqueues (unbounded queue).
        mQueue.push([this]() { mStopRequested = true; });
        mThread.join();
    }

    /// The worker thread captures `this`, so the object must stay where it is.
    SomeClass(const SomeClass&) = delete;
    SomeClass& operator=(const SomeClass&) = delete;

    /// Queue an increment of the counter; returns without waiting for it.
    void increment() {
        mQueue.push([this]() { ++mNum; });
    }

    /// Queue a decrement of the counter; returns without waiting for it.
    void decrement() {
        mQueue.push([this]() { --mNum; });
    }

private:
    int mNum = 0;
    bool mStopRequested = false;
    thread_safe_queue<std::function<void()>> mQueue;
    /// Declared last so the queue and the counter exist before the worker
    /// starts using them.
    std::thread mThread;
};

Implementation for FreeRTOS

Thread safe queue implementation:

///
/// @brief Wrapper around a FreeRTOS queue that provides thread-safe access
/// @details The wrapper owns the FreeRTOS queue handle: it is created in the
///          constructor and deleted in the destructor, which is why the class
///          is not copyable. Items are handed to FreeRTOS as sizeof(T) raw
///          bytes, hence the trivially-copyable requirement.
///
/// @tparam T Item type; must be trivially copyable
///
template <typename T>
class Queue {
    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially "
                                                   "copyable");

    constexpr static auto defaultWaitTime = std::chrono::microseconds{0};

public:
    enum class Error { QueueFull, CalledFromISR };

    ///
    /// @brief Create a queue with room for maxNumItems items
    ///
    /// @param maxNumItems Capacity of the underlying FreeRTOS queue
    ///
    explicit Queue(std::size_t maxNumItems) {
        mQueueHandle = xQueueCreate(maxNumItems, sizeof(T));
        if (mQueueHandle == nullptr) {
            LogError("Failed to create queue");
            panic(); // TODO: What should be done here is system dependent
        }
    }

    ///
    /// @brief Release the underlying FreeRTOS queue
    ///
    ~Queue() {
        /// The handle is null if creation failed and panic() returned
        if (mQueueHandle != nullptr) vQueueDelete(mQueueHandle);
    }

    /// A copy would share (and later double-delete) the same handle
    Queue(const Queue&) = delete;
    Queue& operator=(const Queue&) = delete;

    ///
    /// @brief Push an item to the back of the queue
    ///
    /// @param item Item to copy into the queue
    /// @param waitTime Time to wait for space to become available in the queue
    ///
    /// @return Nothing on success, Error::CalledFromISR when called from an
    ///         ISR, Error::QueueFull when the item could not be queued
    ///
    [[nodiscard]] std::expected<void, Error>
    push(const T& item, std::chrono::microseconds waitTime = defaultWaitTime) {
        if (rtos::this_cpu::is_in_isr()) {
            LogError("Cannot call push() from an ISR");
            return std::unexpected{Error::CalledFromISR};
        }

        const auto res =
            xQueueSendToBack(mQueueHandle, &item, rtos::asTicks(waitTime));

        if (res != pdPASS) return std::unexpected{Error::QueueFull};
        return {};
    }

    ///
    /// @brief Get the front item of the queue without removing it
    ///
    /// @param waitTime Time to wait for an item to become available in the
    /// queue
    ///
    /// @return A copy of the front item, or std::nullopt when no item became
    ///         available or when called from an ISR
    ///
    [[nodiscard]] std::optional<T>
    front(std::chrono::microseconds waitTime = defaultWaitTime) const {
        if (rtos::this_cpu::is_in_isr()) {
            LogError("Cannot call front() from an ISR");
            /// The return type cannot carry an Error, so the failure is
            /// logged and reported as "no item"
            return std::nullopt;
        }

        T item;

        const auto res =
            xQueuePeek(mQueueHandle, &item, rtos::asTicks(waitTime));
        if (res != pdPASS) return std::nullopt;

        return item;
    }

    ///
    /// @brief Remove the front item of the queue
    ///
    /// @param waitTime Time to wait for an item to become available in the
    /// queue
    ///
    /// @return Nothing on success (also when the queue was empty),
    ///         Error::CalledFromISR when called from an ISR
    ///
    std::expected<void, Error>
    pop(std::chrono::microseconds waitTime = defaultWaitTime) {
        if (rtos::this_cpu::is_in_isr()) {
            LogError("Cannot call pop() from an ISR");
            return std::unexpected{Error::CalledFromISR};
        }

        T discarded;

        /// Errors are ignored. pop() should fail quietly if there are no
        /// elements in the queue
        static_cast<void>(
            xQueueReceive(mQueueHandle, &discarded, rtos::asTicks(waitTime)));

        return {};
    }

    ///
    /// @brief Get the number of items in the queue
    ///
    std::size_t size() const {
        return static_cast<std::size_t>(uxQueueMessagesWaiting(mQueueHandle));
    }

private:
    QueueHandle_t mQueueHandle = nullptr;
};

Base class implementation:

///
/// @brief Active object base class
/// @details Use static polymorphism to avoid virtual function overhead.
///          Actions are stored in a queue by queue_action() and later handed
///          to Derived::handle_action() by process_action(), on whichever
///          thread calls process_action().
///
/// @tparam Derived Derived class (CRTP for static polymorphism); must provide
///         a handle_action() overload for every action type
/// @tparam Actions Parameter pack of all action types
///
template <typename Derived, typename... Actions>
class ActiveObjectBase {
    using ActionQueue = Queue<std::variant<Actions...>>;

public:
    enum class Error { QueueFull, CalledFromISR };

    ///
    /// @param maxNumActions Maximum number of actions that can be pending
    ///
    explicit ActiveObjectBase(std::size_t maxNumActions = 32)
        : mQueue{maxNumActions} {
    }

    ///
    /// @brief Execute the oldest pending action, if there is one
    ///
    /// @return true if an action was handled, false if nothing was pending,
    ///         or an Error if the action could not be removed from the queue
    ///
    std::expected<bool, Error> process_action() {
        if (mQueue.size() == 0) return false;

        const auto action = mQueue.front();
        /// This should not happen as we check the size of the queue
        /// beforehand, but let's handle all errors nonetheless
        if (!action) return false;

        /// Remove the action before handling it so it is never run twice
        if (!mQueue.pop()) return std::unexpected{Error::CalledFromISR};

        std::visit(
            [this](const auto& arg) {
                static_cast<Derived*>(this)->handle_action(arg);
            },
            *action);

        return true;
    }

    ///
    /// @brief Queue an action for later execution by process_action()
    ///
    /// @param action Any of the Actions types
    ///
    /// @return Nothing on success, otherwise the reason the queue rejected
    ///         the action
    ///
    std::expected<void, Error> queue_action(const auto& action) {
        const auto pushed = mQueue.push(action);
        if (!pushed) return std::unexpected{toError(pushed.error())};

        return {};
    }

private:
    /// Translate the queue's error type to this class' error type so that the
    /// real cause (not always QueueFull) reaches the caller
    static constexpr Error toError(typename ActionQueue::Error error) {
        return error == ActionQueue::Error::CalledFromISR
                   ? Error::CalledFromISR
                   : Error::QueueFull;
    }

    ActionQueue mQueue;
};

Derived class implementation:


/// Action without payload.
struct Clear {
};

/// Action carrying a fixed-size, zero-initialised character buffer.
struct ShowText {
    std::array<char, 32> text{};
};

///
/// @brief Example active object that handles Clear and ShowText actions
///
class DisplayController :
    public ActiveObjectBase<DisplayController, Clear, ShowText> {
    /// The CRTP base calls the private handle_action() overloads through a
    /// DisplayController*, which is only allowed if it is a friend
    friend class ActiveObjectBase<DisplayController, Clear, ShowText>;

private:
    /// Handler invoked by the base class for a queued Clear action
    void handle_action([[maybe_unused]] const Clear& action) {
        // Send appropriate command over SPI
    }

    /// Handler invoked by the base class for a queued ShowText action
    void handle_action([[maybe_unused]] const ShowText& action) {
        // Send appropriate command over SPI
    }

};


int main() {
    DisplayController controller;

    // Call these from different threads
    controller.queue_action(Clear{});
    controller.queue_action(ShowText{{"asdf"}});

    while (true) {
        controller.process_action();
    }
}