newosp GitHub: DeguiLiu/newosp

本文基于 newosp v0.2.0,1114 test cases (26085 assertions),ASan/TSan/UBSan 全部通过。

相关文章:

CSDN 原文: newosp 深度解析: C++17 事件驱动架构

1. 问题: 传感器数据处理的三重困境

工业传感器系统 (激光雷达、深度相机、工业视觉) 面临三个相互矛盾的工程需求:

确定性: 1kHz+ 采样率要求热路径微秒级延迟,一次 mallocmutex slow path 就可能导致帧丢失。

安全性: 传感器有复杂的硬件状态 (初始化、采集中、校准、异常恢复)。if-else 嵌套到第三层就无法维护,需要可建模、可测试的状态管理。

可组合性: 数据从 DMA 采集到最终输出要经过 5-6 个处理阶段,每个阶段可能由不同开发者负责。阶段之间需要解耦,同时不能引入共享状态和锁竞争。

传统方案的选择:

方案缺陷
shared_ptr<Frame> + mutex + deque堆分配 control block + futex slow path,热路径不确定
回调链 + std::functionSBO 溢出时堆分配,回调嵌套难以测试
手写 enum + switch 状态机状态数增长时 O(n^2) 转换,无法继承默认行为

newosp 是面向 ARM-Linux 嵌入式平台的 C++17 header-only 基础设施库 (GitHub),41 个模块覆盖从消息总线到状态机的全栈需求。本文以激光雷达点云处理流水线为主线,展示这些模块如何协同工作。

2. 流水线全景: 从 DMA 到输出

2.1 架构

DMA/ISR ──> Acquisition ──> Preprocess ──> Compute ──> Filter ──> Package
            (采集)         (预处理)      (距离计算)  (滤波)    (封装)
            HSM 管理       去串扰/鬼影    深度转换    噪声消除   帧聚合

每个阶段是一个 StaticNode (编译期绑定 Handler),阶段间通过共享的 AsyncBus 通信。数据以 std::variant 值语义在 Bus 的环形缓冲中传递,全程零堆分配

2.2 消息类型

所有消息必须 trivially_copyable (SPSC/ShmRingBuffer memcpy 安全):

struct RawFrame {
    uint32_t seq;
    uint32_t timestamp_us;
    uint16_t adc_values[256];
    uint16_t point_count;
    uint8_t  channel_id;
    uint8_t  padding[1];
};

struct ComputedFrame {
    uint32_t seq;
    uint32_t timestamp_us;
    float    distances[256];
    float    intensities[256];
    uint16_t valid_count;
    uint8_t  channel_id;
    uint8_t  padding[1];
};

// 中间阶段: PreprocessedFrame, FilteredFrame, PackagedResult (结构类似)
// 控制消息
struct PipelineControl {
    enum class Action : uint8_t { kStart = 0, kStop, kReset, kCalibrate };
    Action action;
    uint8_t channel_id;
};

// 流水线 Payload
using SensorPayload = std::variant<
    RawFrame, PreprocessedFrame, ComputedFrame,
    FilteredFrame, PackagedResult, PipelineControl>;

// 编译期保证
static_assert(std::is_trivially_copyable_v<RawFrame>);
static_assert(std::is_trivially_copyable_v<ComputedFrame>);

为什么用 variant 值语义而不是 shared_ptr: shared_ptr<Frame> 的 control block 需要堆分配,引用计数的原子操作在 ARM 上也有可观开销。variant 将帧数据 (固定大小 POD) 直接嵌入 Bus 的 Envelope 中,一次 memcpy 完成发布,sizeof 编译期确定。

2.3 流水线装配

auto& bus = osp::AsyncBus<SensorPayload>::Instance();

// 每个阶段 = StaticNode<Payload, Handler>
osp::StaticNode<SensorPayload, AcquisitionHandler>  acq_node("acquisition", 1, acq_handler);
osp::StaticNode<SensorPayload, PreprocessHandler>   prep_node("preprocess", 2, prep_handler);
osp::StaticNode<SensorPayload, ComputeHandler>      comp_node("compute", 3, comp_handler);
osp::StaticNode<SensorPayload, FilterHandler>       filt_node("filter", 4, filt_handler);
osp::StaticNode<SensorPayload, PackageHandler>      pack_node("package", 5, pack_handler);

// 单线程顺序调度 (延迟最确定)
while (!shutdown.IsShutdown()) {
    acq_node.SpinOnce();    // DMA/ISR → RawFrame
    prep_node.SpinOnce();   // RawFrame → PreprocessedFrame
    comp_node.SpinOnce();   // PreprocessedFrame → ComputedFrame
    filt_node.SpinOnce();   // ComputedFrame → FilteredFrame
    pack_node.SpinOnce();   // FilteredFrame → PackagedResult
}

也可以按计算密度分配到不同 CPU 核心:

// I/O 密集阶段 → CPU 0,计算密集阶段 → CPU 1
std::thread io_thread([&]() {
    while (!shutdown.IsShutdown()) { acq_node.SpinOnce(); prep_node.SpinOnce(); }
});
std::thread compute_thread([&]() {
    while (!shutdown.IsShutdown()) {
        comp_node.SpinOnce(); filt_node.SpinOnce(); pack_node.SpinOnce();
    }
});

2.4 Handler 示例: 预处理阶段

每个 Handler 是一个实现了 operator() 重载的 struct,编译器通过 std::visit 生成直接跳转表 (零间接调用):

struct PreprocessHandler {
    osp::AsyncBus<SensorPayload>* bus;

    void operator()(const RawFrame& raw, const osp::MessageHeader&) {
        PreprocessedFrame pf{};
        pf.seq = raw.seq;
        pf.timestamp_us = raw.timestamp_us;

        // 去串扰 + 去鬼影 (具体算法省略)
        pf.valid_count = remove_crosstalk_and_ghost(
            raw.adc_values, raw.point_count, pf.cleaned_values);

        bus->Publish(SensorPayload(pf), /*sender_id=*/2);
    }

    template <typename T>
    void operator()(const T&, const osp::MessageHeader&) {}  // 其他类型忽略
};

2.5 端到端延迟追踪

每帧携带 seq + timestamp_us,在最终阶段计算流水线延迟:

void operator()(const FilteredFrame& ff, const osp::MessageHeader&) {
    uint32_t latency_us = osp::SteadyNowUs() - ff.timestamp_us;
    if (latency_us > 1000) {  // 超过 1ms 告警
        OSP_LOG_WARN("Pipeline", "high latency: seq=%u %u us", ff.seq, latency_us);
    }
    // ... 正常封装处理
}

3. AsyncBus: 支撑流水线的无锁消息总线

流水线中每个 StaticNode 调用 bus->Publish() 发布消息,调用 SpinOnce() 消费消息。这两个操作由 AsyncBus 的 CAS 无锁 MPSC 环形缓冲支撑。

3.1 核心数据结构

template <typename PayloadVariant,
          uint32_t QueueDepth = 4096,    // 必须是 2 的幂
          uint32_t BatchSize = 256>
class AsyncBus;

消息以信封 (header + variant payload) 存储在缓存行对齐的环形缓冲中:

struct MessageHeader {
    uint64_t msg_id;           // 全局递增 ID
    uint64_t timestamp_us;     // 微秒时间戳
    uint32_t sender_id;        // 发送者节点 ID
    uint32_t topic_hash;       // FNV-1a 32-bit hash
    MessagePriority priority;  // kLow / kMedium / kHigh
};

struct alignas(64) RingBufferNode {
    std::atomic<uint32_t> sequence;  // 序号控制 (CAS 的核心)
    MessageEnvelope<PayloadVariant> envelope;
};

3.2 CAS 发布: 多生产者无锁竞争

多个 StaticNode (可能在不同线程) 同时发布消息,通过 CAS 循环竞争环形缓冲的写入位置:

Producer 0 ─┐
Producer 1 ──┼── CAS Publish ──> Ring Buffer ──> ProcessBatch() ──> 类型分发
Producer 2 ─┘   (无锁竞争)      (sequence-based)   (批量消费)      (variant visit)

关键内存序: 生产者用 acq_rel CAS 抢占位置,用 release store 发布数据; 消费者用 acquire load 读取数据。完整实现见 bus.hpp

3.3 优先级准入控制

传感器流水线中,控制命令 (启停、校准) 的优先级高于数据帧。当 Bus 队列压力增大时,AsyncBus 按优先级逐级丢弃:

队列深度
│  ████████████████████  100%
│  ██████████████████    99%  ← kHigh 阈值 (控制命令)
│  ██████████████        80%  ← kMedium 阈值 (常规数据)
│  ██████████            60%  ← kLow 阈值 (遥测/统计)

生产者先检查本地缓存的消费位置 (relaxed 读,无 cache line 竞争),只有接近阈值时才重新读取真正的消费位置 (acquire 读)。低负载时完全避免对消费者原子变量的争用。

3.4 编译期分发: StaticNode vs Node

newosp 提供两种消费模式:

模式机制ns/msg (P50)
StaticNode (编译期)std::visit 跳转表~2
Node (运行时)FixedFunction 回调表~30

StaticNode 的 Handler 在编译期确定,编译器为每个 variant alternative 生成直接调用,无 FixedFunction 间接调用、无 SharedSpinLock、无回调表遍历。流水线的 5 个处理阶段全部使用 StaticNode。

FixedFunction: 需要运行时动态订阅的场景使用 Node 模式,回调存储在 FixedFunction<void(const Envelope&), 32> 中 – 32 字节 SBO 缓冲,编译期 static_assert 拒绝超限 lambda,杜绝 std::function 的隐式堆分配

4. HSM: 采集阶段的状态管理

流水线的 Acquisition 阶段需要管理传感器硬件状态。平面 FSM 在这里不够用: 设备的 Idle/Acquiring/Validating 三个子状态都需要响应 “Deactivate” 事件 (关闭设备),平面 FSM 需要为每个子状态都写一条相同的转换。

4.1 层次状态机: 继承与覆盖

newosp 的 StateMachine<Context, MaxStates> 通过状态嵌套解决这一问题:

Acquisition HSM (6 个状态)
├── Inactive         ← 设备未启动
├── Active (父状态)  ← 处理 Deactivate (所有子状态继承)
│   ├── Idle         ← 等待 DMA 完成信号
│   ├── Acquiring    ← 正在接收 DMA 数据
│   └── Validating   ← 校验帧完整性
└── Error            ← 硬件异常 (DMA 超时/CRC 错误)

子状态未处理的事件自动冒泡到父状态。Active 父状态只需定义一次 Deactivate 处理:

// Active 父状态: 一次定义,三个子状态 (Idle/Acquiring/Validating) 共享
inline osp::TransitionResult HandleActive(AcqContext& ctx, osp::Event& e) {
    if (e.id == kEvtDeactivate) {
        return ctx.sm->RequestTransition(ctx.idx_inactive);
    }
    return osp::TransitionResult::kUnhandled;  // 继续冒泡
}

4.2 LCA 转换算法

状态转换的核心是最近公共祖先 (LCA) 计算: 从源状态到 LCA 依次执行 Exit 动作,再从 LCA 到目标状态依次执行 Entry 动作。newosp 使用深度归一化实现:

  1. 计算源和目标的深度
  2. 将较深的状态上移到同一层
  3. 同步上移直到找到公共祖先
  4. 执行 Exit 路径 (source → LCA) 和 Entry 路径 (LCA → target)

Exit/Entry 路径使用栈上固定数组 (int32_t path[32]),整个转换过程零堆分配。完整实现见 hsm.hpp

4.3 Guard 条件

newosp 在状态配置中内置 Guard 函数指针。事件派发时,先检查 Guard 条件,返回 false 则跳过该状态直接冒泡:

struct StateConfig {
    const char* name;
    int32_t parent_index;                         // -1 = 根
    TransitionResult (*handler)(Context&, Event&);
    void (*on_entry)(Context&);
    void (*on_exit)(Context&);
    bool (*guard)(const Context&, Event&);        // Guard 条件
};

这使得"仅在特定条件下才处理事件"可以声明式表达,而非在 handler 内部硬编码 if 判断。

4.4 采集 HSM 与 Handler 的集成

AcquisitionHandler 将 Bus 消息转化为 HSM 事件:

struct AcquisitionHandler {
    AcqContext* ctx;

    void operator()(const PipelineControl& ctrl, const osp::MessageHeader&) {
        osp::Event e{ctrl.action == PipelineControl::Action::kStart
                         ? kEvtActivate : kEvtDeactivate};
        ctx->sm->Dispatch(e);
    }

    template <typename T>
    void operator()(const T&, const osp::MessageHeader&) {}
};

HSM 内部的状态处理函数在适当时机通过 bus->Publish() 发布 RawFrame,驱动下游流水线。消息总线和状态机各司其职: Bus 负责阶段间解耦通信,HSM 负责阶段内状态管理。

4.5 零堆分配保证

组件存储方式
状态配置std::array<StateConfig, MaxStates>
转换路径栈上 int32_t path[32]
事件值语义 Event (栈分配)
Handler函数指针 (非 std::function)
内存开销~500B (16 状态)

5. SPSC 环形缓冲: 模块间的零拷贝通道

newosp 的 SpscRingbuffer 是一个通用的、类型化的 wait-free SPSC 组件,在多个模块中复用:

集成点元素类型容量
WorkerPool 工作线程MessageEnvelope<PayloadVariant>1024
串口字节缓冲uint8_t4096
网络帧缓冲RecvFrameSlot32
统计通道ShmStats (48B)16

5.1 编译期双路径

template <typename T, size_t BufferSize, bool FakeTSO = false>
class SpscRingbuffer {
    bool PushBatch(const T* buf, size_t count) {
        if constexpr (std::is_trivially_copyable_v<T>) {
            // POD: memcpy 批量拷贝 (处理 wrap-around 分两段)
            std::memcpy(&data_buff_[head_offset], buf, first_part * sizeof(T));
        } else {
            // 非 POD: 逐元素 move
            for (size_t i = 0; i < count; ++i) {
                data_buff_[(head + i) & kMask] = std::move(buf[i]);
            }
        }
    }
};

if constexpr 在编译期选择 memcpy (POD) 或逐元素 move (非 POD) 路径。传感器流水线中的所有消息类型都是 trivially_copyable,走 memcpy 快路径。

5.2 FakeTSO: 单核 MCU 优化

static constexpr std::memory_order AcquireOrder() {
    return FakeTSO ? std::memory_order_relaxed : std::memory_order_acquire;
}
FakeTSOAcquire/Release适用场景
falseacquire / release多核 ARM-Linux
truerelaxed / relaxed单核 MCU (RT-Thread)

单核 MCU 上,ISR 和线程之间的内存可见性由 ISB 隐式保证,将 acquire/release 降级为 relaxed 节省 ARM DMB 指令开销。

5.3 缓存行对齐

struct alignas(64) PaddedIndex {
    std::atomic<size_t> value{0};
};
PaddedIndex head_;  // 生产者独占
// --- 64 字节边界 ---
PaddedIndex tail_;  // 消费者独占

生产者和消费者的索引分布在不同缓存行,消除 false sharing。

5.4 延迟计算

template <typename Callable>
bool PushFromCallback(Callable&& callback) {
    if (IsFull()) return false;  // 队列满则跳过
    data_buff_[head & kMask] = callback();  // 延迟计算
    head_.value.store(head + 1, ReleaseOrder());
    return true;
}

传感器采集场景: 如果下游消费者跟不上,PushFromCallback 直接跳过 ADC 读取,避免无意义的硬件操作。

完整 SPSC 实现分析见 SPSC 无锁环形缓冲区设计剖析

6. 生命周期与调度

6.1 LifecycleNode: 16 状态 HSM 驱动

newosp 的 LifecycleNode 内置了一个 16 状态的层次状态机,将节点生命周期本身建模为 HSM:

Alive (根状态)
├── Unconfigured (Initializing / WaitingConfig)
├── Configured
│   ├── Inactive (Standby / Paused)
│   └── Active (Starting / Running / Degraded)
├── Error (Recoverable / Fatal)
└── Finalized (终态)

LifecycleNode 继承自 Node,支持粗粒度 (4 状态) 和细粒度 (16 状态) 双层视图,内置 FaultReporter 注入点。HSM 实例使用 placement new 在预分配的对齐存储中构造,零堆分配。

6.2 Executor 调度家族

流水线的调度策略由 Executor 决定。newosp 提供四种执行模型:

Executor特点适用场景
SingleThread阻塞调用线程调试、单核
Static后台线程 + 休眠策略通用场景
PinnedCPU 绑核确定性调度
RealtimeSCHED_FIFO + mlockall + CPU affinity工业实时

RealtimeExecutor 初始化序列: mlockall() → CPU affinity → SCHED_FIFO → 自定义栈 → 进入 ProcessBatch 循环。高精度休眠使用 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME) 实现亚毫秒级精度。

详细的线程模型、WorkerPool 二级排队、IoPoller I/O 集成见 newosp 嵌入式并发与 I/O 架构

7. 跨模块集成

7.1 数据流全景

传感器线程 ─┐
控制线程  ──┼── AsyncBus::Publish() ─── [CAS MPSC Ring Buffer] ───┐
网络线程  ─┘   (无锁竞争)              (4096 slots, 缓存行对齐)      │
                                                                     │
                              ┌──── ProcessBatch() 单消费者 ─────────┘
                              │
                              ▼
                    Node 类型路由 (FNV-1a topic hash)
                              │
                    ┌─────────┼─────────┐
                    │         │         │
                    ▼         ▼         ▼
              StaticNode   Node     WorkerPool
              (零开销)   (动态订阅)  (SPSC 分发)
                    │         │         │
                    ▼         ▼         ▼
                HSM/BT    回调处理    并行计算

7.2 可靠性闭环

newosp 的 Watchdog 和 FaultCollector 职责正交:

  • Watchdog: 检测问题 (线程心跳超时)
  • FaultCollector: 处理问题 (分级上报 + hook 决策: kHandled / kEscalate / kDefer / kShutdown)

两者通过应用层 wiring 组合,没有内部依赖。流水线的 Acquisition HSM 通过 FaultReporter 向 FaultCollector 上报硬件异常 (DMA 超时、CRC 错误)。

7.3 模块选择指南

场景推荐方案
简单传感器数据流Node + AsyncBus + SingleThreadExecutor
高吞吐多生产者StaticNode + AsyncBus<PV, 4096, 256> + PinnedExecutor
多级流水线处理多个 StaticNode + 共享 Bus + 顺序 SpinOnce
复杂状态管理StateMachine (手动) 或 LifecycleNode (标准生命周期)
实时控制RealtimeExecutor (SCHED_FIFO) + PreciseSleepStrategy
跨进程 IPCShmChannel + ShmRingBuffer (零拷贝共享内存)

7.4 迁移路径: Linux → RT-Thread

newosp 核心模块保持 POSIX API 边界清晰:

模块Linux 依赖RT-Thread 适配
SPSC无 (纯 C++ atomic)FakeTSO = true
HSM无 (纯 C++)直接使用
AsyncBus无 (CAS 原子操作)直接使用
Executorpthread, SCHED_FIFO映射到 RT-Thread 线程 API
ShmTransportshm_open, mmap不适用 (单进程)

8. 设计原则总结

原则实现手段
零全局状态Bus 依赖注入,非全局单例
栈优先,零堆分配FixedFunction SBO / FixedVector / FixedString / variant 值语义
无锁/最小锁CAS MPSC + wait-free SPSC + SharedSpinLock
编译期分发模板参数化 + if constexpr + std::visit 跳转表
类型安全std::variant + expected<V,E> + static_assert trivially_copyable
-fno-exceptions -fno-rtti全代码库兼容,Validate() 返回 bool 而非抛异常

为什么不用对象池。避免热路径 malloc 的另一种常见方案是对象池 (mutex + queue<shared_ptr>)。对象池确实比裸分配快约 60%,但在 newosp 的场景中仍有三个代价: mutex 在每次 acquire/release 上引入 futex 开销 (~20-40ns),shared_ptr 原子引用计数在单向数据流中完全多余,std::queue 动态增长破坏编译期内存预算。newosp 用预分配环形缓冲 (slot 覆写,零 malloc) + variant 值语义 (无引用计数) + CAS/wait-free (无 mutex) 彻底消除了这三项开销。对象池适合生命周期不确定的共享对象 (连接池、线程池),不适合单向流动的消息通道。详见 SPSC 无锁环形缓冲区设计剖析对象池在嵌入式热路径上的三个隐性成本

这些原则贯穿整条传感器流水线: variant 保证消息类型安全,CAS 保证并发发布安全,HSM 保证状态转换安全,static_assert 保证跨平台 sizeof 一致。每个"安全"都是编译期或硬件级保证,不是运行时约定。

参考资料

  1. newosp GitHub 仓库 – C++17 header-only 嵌入式基础设施库
  2. ROS2 Lifecycle Node Design
  3. 事件驱动架构的嵌入式激光雷达点云数据处理
  4. Miro Samek, Practical UML Statecharts in C/C++, 2nd Edition