Sparkplug B C++ Library 1.0.0
Modern C++23 implementation of the Eclipse Sparkplug B 2.2 specification
Loading...
Searching...
No Matches
edge_node.hpp
1#pragma once
2
3#include "detail/compat.hpp"
4#include "logging.hpp"
5#include "mqtt_handle.hpp"
6#include "payload_builder.hpp"
7#include "sparkplug_b.pb.h"
8#include "topic.hpp"
9
10#include <functional>
11#include <memory>
12#include <mutex>
13#include <optional>
14#include <span>
15#include <string>
16#include <unordered_map>
17#include <vector>
18
19#include <MQTTAsync.h>
20
21namespace sparkplug {
22
29using CommandCallback =
30 std::function<void(const Topic&, const org::eclipse::tahu::protobuf::Payload&)>;
31
96class EdgeNode {
97public:
101 struct TlsOptions {
102 std::string trust_store;
103 std::string key_store;
104 std::string private_key;
106 std::string
109 };
110
114 struct Config {
115 std::string broker_url;
117 std::string client_id;
118 std::string group_id;
119 std::string edge_node_id;
120 int data_qos = 0;
122 int death_qos = 1;
123 bool clean_session = true;
125 60;
126 std::optional<TlsOptions>
127 tls{};
128 std::optional<std::string>
130 std::optional<std::string>
132 std::optional<CommandCallback> command_callback{};
133 std::optional<std::string> primary_host_id{};
134 std::optional<LogCallback> log_callback{};
135 };
136
146
151
152 EdgeNode(const EdgeNode&) = delete;
153 EdgeNode& operator=(const EdgeNode&) = delete;
154 EdgeNode(EdgeNode&&) noexcept;
155 EdgeNode& operator=(EdgeNode&&) noexcept;
156
165 void set_credentials(std::optional<std::string> username,
166 std::optional<std::string> password);
167
176 void set_tls(std::optional<TlsOptions> tls);
177
178 void set_log_callback(std::optional<LogCallback> callback);
179
192 [[nodiscard]] stdx::expected<void, std::string> connect();
193
203 [[nodiscard]] stdx::expected<void, std::string> disconnect();
204
225 [[nodiscard]] stdx::expected<void, std::string> publish_birth(PayloadBuilder& payload);
226
248 [[nodiscard]] stdx::expected<void, std::string> publish_data(PayloadBuilder& payload);
249
260 [[nodiscard]] stdx::expected<void, std::string> publish_death();
261
277 [[nodiscard]] stdx::expected<void, std::string> rebirth();
278
286 [[nodiscard]] uint64_t get_seq() const {
287 std::scoped_lock lock(mutex_);
288 return seq_num_;
289 }
290
298 [[nodiscard]] uint64_t get_bd_seq() const {
299 std::scoped_lock lock(mutex_);
300 return bd_seq_num_;
301 }
302
312 [[nodiscard]] bool is_primary_host_online() const {
313 std::scoped_lock lock(mutex_);
314 return primary_host_online_;
315 }
316
335 [[nodiscard]] stdx::expected<void, std::string>
336 publish_device_birth(std::string_view device_id, PayloadBuilder& payload);
337
357 [[nodiscard]] stdx::expected<void, std::string>
358 publish_device_data(std::string_view device_id, PayloadBuilder& payload);
359
371 [[nodiscard]] stdx::expected<void, std::string>
372 publish_device_death(std::string_view device_id);
373
399 [[nodiscard]] stdx::expected<void, std::string>
400 publish_node_command(std::string_view target_edge_node_id, PayloadBuilder& payload);
401
420 [[nodiscard]] stdx::expected<void, std::string>
421 publish_device_command(std::string_view target_edge_node_id,
422 std::string_view target_device_id,
423 PayloadBuilder& payload);
424
425 void log(LogLevel level, std::string_view message) const noexcept;
426
427private:
431 struct DeviceState {
432 std::vector<uint8_t> last_birth_payload; // Last DBIRTH for rebirth
433 bool is_online{false}; // True if DBIRTH sent and device online
434 };
435
436 Config config_;
437 MQTTAsyncHandle client_;
438 uint64_t seq_num_{0}; // Node message sequence (0-255)
439 uint64_t bd_seq_num_{0}; // Birth/Death sequence
440
441 // Store the NDEATH payload for the MQTT Will
442 std::vector<uint8_t> death_payload_data_;
443 std::string death_topic_str_; // Topic string for MQTT Will (must outlive async connect)
444 MQTTAsync_willOptions will_opts_; // Will options struct (must outlive async connect)
445 MQTTAsync_SSLOptions ssl_opts_{};
446
447 // Store last NBIRTH for rebirth command
448 std::vector<uint8_t> last_birth_payload_;
449
450 // Hash and equality functors that support heterogeneous lookup (string_view)
451 struct StringHash {
452 using is_transparent = void;
453 [[nodiscard]] size_t operator()(std::string_view sv) const noexcept {
454 return std::hash<std::string_view>{}(sv);
455 }
456 };
457
458 struct StringEqual {
459 using is_transparent = void;
460 [[nodiscard]] bool operator()(std::string_view lhs,
461 std::string_view rhs) const noexcept {
462 return lhs == rhs;
463 }
464 };
465
466 // Track state of attached devices (device_id -> state, with heterogeneous lookup)
467 std::unordered_map<std::string, DeviceState, StringHash, StringEqual> device_states_;
468
469 bool is_connected_{false};
470 bool primary_host_online_{
471 false}; // True if primary host is online (or no primary host configured)
472
473 // Mutex for thread-safe access to all mutable state
474 mutable std::mutex mutex_;
475
476 [[nodiscard]] static stdx::expected<void, std::string>
477 publish_message(MQTTAsync client,
478 const std::string& topic_str,
479 std::span<const uint8_t> payload_data,
480 int qos,
481 bool retain);
482
483 // Static MQTT callback for message arrived (NCMD)
484 static int on_message_arrived(void* context,
485 char* topicName,
486 int topicLen,
487 MQTTAsync_message* message);
488
489 // Static MQTT callback for connection lost
490 static void on_connection_lost(void* context, char* cause);
491};
492
493} // namespace sparkplug
Sparkplug B Edge Node implementing the complete message lifecycle.
Definition edge_node.hpp:96
stdx::expected< void, std::string > connect()
Connects to the MQTT broker and establishes a Sparkplug B session.
~EdgeNode()
Destroys the EdgeNode and cleans up MQTT resources.
void set_credentials(std::optional< std::string > username, std::optional< std::string > password)
Sets MQTT username and password for authentication.
void set_tls(std::optional< TlsOptions > tls)
Configures TLS/SSL options for secure MQTT connections.
stdx::expected< void, std::string > publish_node_command(std::string_view target_edge_node_id, PayloadBuilder &payload)
Publishes an NCMD (Node Command) message to another edge node.
stdx::expected< void, std::string > publish_death()
Publishes an NDEATH (Node Death) message.
stdx::expected< void, std::string > publish_device_death(std::string_view device_id)
Publishes a DDEATH (Device Death) message.
bool is_primary_host_online() const
Checks if the primary host application is online.
stdx::expected< void, std::string > publish_data(PayloadBuilder &payload)
Publishes an NDATA (Node Data) message.
uint64_t get_seq() const
Gets the current message sequence number.
stdx::expected< void, std::string > publish_device_data(std::string_view device_id, PayloadBuilder &payload)
Publishes a DDATA (Device Data) message.
stdx::expected< void, std::string > disconnect()
Gracefully disconnects from the MQTT broker.
stdx::expected< void, std::string > rebirth()
Triggers a rebirth by publishing a new NBIRTH with incremented bdSeq.
stdx::expected< void, std::string > publish_device_command(std::string_view target_edge_node_id, std::string_view target_device_id, PayloadBuilder &payload)
Publishes a DCMD (Device Command) message to a device on another edge node.
uint64_t get_bd_seq() const
Gets the current birth/death sequence number.
stdx::expected< void, std::string > publish_birth(PayloadBuilder &payload)
Publishes an NBIRTH (Node Birth) message.
EdgeNode(Config config)
Constructs an EdgeNode with the given configuration.
stdx::expected< void, std::string > publish_device_birth(std::string_view device_id, PayloadBuilder &payload)
Publishes a DBIRTH (Device Birth) message.
Type-safe builder for Sparkplug B payloads with automatic type detection.
Configuration parameters for the Sparkplug B Edge Node.
std::string client_id
Unique MQTT client identifier.
int keep_alive_interval
MQTT keep-alive interval in seconds (Sparkplug recommends 60)
bool clean_session
MQTT clean session flag.
int death_qos
MQTT QoS for NDEATH Will Message. Sparkplug requires 1.
std::string group_id
Sparkplug group ID (topic namespace)
std::optional< TlsOptions > tls
TLS/SSL options (required if broker_url uses ssl://)
int data_qos
MQTT QoS for data messages (NBIRTH/NDATA/DBIRTH/DDATA).
std::optional< std::string > password
MQTT password for authentication (optional)
std::optional< std::string > username
MQTT username for authentication (optional)
std::string broker_url
MQTT broker URL (e.g., "tcp://localhost:1883" or "ssl://localhost:8883")
std::string edge_node_id
Edge node identifier within the group.
TLS/SSL configuration options for secure MQTT connections.
std::string private_key_password
Password for encrypted private key (optional)
std::string private_key
Path to client private key file (PEM format, optional)
std::string trust_store
Path to CA certificate file (PEM format)
std::string key_store
Path to client certificate file (PEM format, optional)
bool enable_server_cert_auth
Verify server certificate (default: true)
std::string enabled_cipher_suites
Colon-separated list of cipher suites (optional)