Sparkplug B C++ Library 1.0.0
Modern C++23 implementation of the Eclipse Sparkplug B 2.2 specification
Loading...
Searching...
No Matches
host_application.hpp
1#pragma once
2
3#include "detail/compat.hpp"
4#include "logging.hpp"
5#include "mqtt_handle.hpp"
6#include "payload_builder.hpp"
7#include "sparkplug_b.pb.h"
8#include "topic.hpp"
9
10#include <functional>
11#include <memory>
12#include <mutex>
13#include <optional>
14#include <span>
15#include <string>
16#include <unordered_map>
17
18#include <MQTTAsync.h>
19
20namespace sparkplug {
21
/// Callback type for received Sparkplug messages. The callable takes the
/// message Topic and the protobuf Payload by const reference and returns nothing.
29using MessageCallback =
30 std::function<void(const Topic&, const org::eclipse::tahu::protobuf::Payload&)>;
31
96public:
/// TLS/SSL configuration options for secure MQTT connections.
100 struct TlsOptions {
/// Path to CA certificate file (PEM format).
101 std::string trust_store;
/// Path to client certificate file (PEM format, optional).
102 std::string key_store;
/// Path to client private key file (PEM format, optional).
103 std::string private_key;
// NOTE(review): the declarator on the next line is cut off in this listing (the
// member name is missing). The reference text also lists private_key_password,
// enabled_cipher_suites and enable_server_cert_auth (default: true), none of
// which are visible here -- confirm names and order against the real header.
105 std::string
108 };
109
/// Tracks the state of a device attached to an edge node.
113 struct DeviceState {
/// True if DBIRTH received and device is online.
114 bool is_online{false};
/// Last received device sequence number (initialised to 255).
115 uint64_t last_seq{255};
/// True if DBIRTH has been received.
116 bool birth_received{false};
/// Timestamp when device went offline (from DDEATH).
117 uint64_t offline_timestamp{0};
/// True if metrics marked stale after DDEATH.
118 bool metrics_stale{false};
// NOTE(review): the member name is cut off in this listing; the reference text
// describes an `alias_map` of this type that maps metric alias to name (from
// DBIRTH) -- presumably this declaration; confirm against the real header.
119 std::unordered_map<uint64_t, std::string>
121 };
122
// Transparent hash for string keys to enable heterogeneous lookup.
// NOTE(review): the opening `struct ... {` line of this type is not present in
// this listing; NodeState below refers to it as TransparentStringHash.
127 using is_transparent = void;
// Both overloads hash through std::hash<std::string_view>.
128 using hash_type = std::hash<std::string_view>;
129 [[nodiscard]] size_t operator()(std::string_view str) const noexcept {
130 return hash_type{}(str);
131 }
// A std::string argument is converted to std::string_view before hashing.
132 [[nodiscard]] size_t operator()(const std::string& str) const noexcept {
133 return hash_type{}(str);
134 }
135 };
136
/// Tracks the state of an individual edge node.
140 struct NodeState {
/// True if NBIRTH received and node is online.
141 bool is_online{false};
/// Last received node sequence number (starts at 255).
142 uint64_t last_seq{255};
/// Current birth/death sequence number.
143 uint64_t bd_seq{0};
/// Timestamp of last NBIRTH.
144 uint64_t birth_timestamp{0};
/// True if NBIRTH has been received.
145 bool birth_received{false};
// NOTE(review): the member names of the next two declarations are cut off in
// this listing. The reference text describes `devices` (attached devices,
// device_id -> state) and `alias_map` (metric alias -> name, from NBIRTH) with
// these types -- presumably these two lines; confirm against the real header.
146 std::unordered_map<std::string, DeviceState, TransparentStringHash, std::equal_to<>>
148 std::unordered_map<uint64_t, std::string>
150 };
151
/// Configuration parameters for the Sparkplug B Host Application.
155 struct Config {
/// MQTT broker URL (e.g., "tcp://localhost:1883" or "ssl://localhost:8883").
156 std::string broker_url;
/// Unique MQTT client identifier.
158 std::string client_id;
/// Host Application identifier (for STATE messages).
159 std::string host_id;
/// MQTT QoS for STATE messages and commands (default: 1).
160 int qos = 1;
// NOTE(review): only the initializer of the next declaration survives in this
// listing. The reference text lists bool members clean_session and
// validate_sequence and an int keep_alive_interval (default: 60) that are not
// visible here -- confirm which member this `true` belongs to.
162 true;
/// Maximum number of QoS 1/2 messages allowed in-flight (default: 100, paho default: 10).
164 int max_inflight = 100;
// NOTE(review): truncated declaration, see the note above `162 true;`.
167 true;
/// TLS/SSL options (required if broker_url uses ssl://).
168 std::optional<TlsOptions>
169 tls{};
// NOTE(review): the member names of the next two declarations are cut off. The
// reference text lists optional `username` and `password` (MQTT authentication)
// with this type -- presumably these two lines; confirm against the real header.
170 std::optional<std::string>
172 std::optional<std::string>
/// Callback for received Sparkplug messages.
174 MessageCallback message_callback{};
/// Optional callback for library log messages.
175 LogCallback log_callback{};
176 };
177
187
192
// Copying is disabled: copy construction and copy assignment are deleted.
193 HostApplication(const HostApplication&) = delete;
194 HostApplication& operator=(const HostApplication&) = delete;
// Move assignment is declared noexcept; its definition is not in this listing.
// NOTE(review): listing line 195 is absent here, so a move constructor
// declaration cannot be confirmed from this view.
196 HostApplication& operator=(HostApplication&&) noexcept;
197
/// Sets MQTT username and password for authentication.
/// @param username Optional user name, taken by value.
/// @param password Optional password, taken by value.
206 void set_credentials(std::optional<std::string> username,
207 std::optional<std::string> password);
208
/// Configures TLS/SSL options for secure MQTT connections.
/// @param tls Optional TlsOptions, taken by value.
217 void set_tls(std::optional<TlsOptions> tls);
218
/// Sets the message callback for receiving Sparkplug messages.
/// @param callback Callable matching MessageCallback, taken by value.
226 void set_message_callback(MessageCallback callback);
227
/// Sets the log callback for receiving library diagnostic messages.
/// @param callback Callable of type LogCallback, taken by value.
235 void set_log_callback(LogCallback callback);
236
/// Connects to the MQTT broker.
/// @return An expected with no value; the error alternative is a std::string.
///         The result is [[nodiscard]] and should be checked by the caller.
248 [[nodiscard]] stdx::expected<void, std::string> connect();
249
/// Gracefully disconnects from the MQTT broker.
/// @return An expected with no value; the error alternative is a std::string.
258 [[nodiscard]] stdx::expected<void, std::string> disconnect();
259
/// Subscribes to all Sparkplug B messages across all groups.
/// @return An expected with no value; the error alternative is a std::string.
273 [[nodiscard]] stdx::expected<void, std::string> subscribe_all_groups();
274
/// Subscribes to messages from a specific group.
/// @param group_id Identifier of the group to subscribe to.
/// @return An expected with no value; the error alternative is a std::string.
286 [[nodiscard]] stdx::expected<void, std::string>
287 subscribe_group(std::string_view group_id);
288
/// Subscribes to messages from a specific edge node in a group.
/// @param group_id Identifier of the group containing the edge node.
/// @param edge_node_id Identifier of the edge node.
/// @return An expected with no value; the error alternative is a std::string.
301 [[nodiscard]] stdx::expected<void, std::string>
302 subscribe_node(std::string_view group_id, std::string_view edge_node_id);
303
/// Subscribes to STATE messages from another primary application.
/// @param host_id Identifier of the other host application.
/// @return An expected with no value; the error alternative is a std::string.
317 [[nodiscard]] stdx::expected<void, std::string>
318 subscribe_state(std::string_view host_id);
319
/// Gets the current state of a specific edge node.
/// @param group_id Identifier of the group containing the edge node.
/// @param edge_node_id Identifier of the edge node.
/// @return An optional holding a const reference wrapper to a NodeState;
///         presumably empty when the node is unknown -- confirm in the
///         implementation.
/// NOTE(review): the result refers to a NodeState the caller does not own;
/// confirm how long that reference stays valid relative to later updates.
330 [[nodiscard]] std::optional<std::reference_wrapper<const NodeState>>
331 get_node_state(std::string_view group_id, std::string_view edge_node_id) const;
332
/// Resolves a metric alias to its name for a specific node or device.
/// @param group_id Identifier of the group containing the edge node.
/// @param edge_node_id Identifier of the edge node.
/// @param device_id Identifier of the device; presumably empty selects the
///        node-level alias map -- confirm in the implementation.
/// @param alias Metric alias to resolve.
/// @return An optional std::string_view; being a view, it does not own the
///         characters it refers to.
349 [[nodiscard]] std::optional<std::string_view>
350 get_metric_name(std::string_view group_id,
351 std::string_view edge_node_id,
352 std::string_view device_id,
353 uint64_t alias) const;
354
/// Publishes a STATE birth message to indicate Host Application is online.
/// @param timestamp Timestamp value for the message (units not shown here).
/// @return An expected with no value; the error alternative is a std::string.
381 [[nodiscard]] stdx::expected<void, std::string> publish_state_birth(uint64_t timestamp);
382
/// Publishes a STATE death message to indicate Host Application is offline.
/// @param timestamp Timestamp value for the message (units not shown here).
/// @return An expected with no value; the error alternative is a std::string.
410 [[nodiscard]] stdx::expected<void, std::string> publish_state_death(uint64_t timestamp);
411
/// Publishes an NCMD (Node Command) message to an Edge Node.
/// @param group_id Identifier of the group containing the target edge node.
/// @param target_edge_node_id Identifier of the edge node to command.
/// @param payload Builder holding the command payload; taken by non-const
///        reference, so the call may modify it.
/// @return An expected with no value; the error alternative is a std::string.
438 [[nodiscard]] stdx::expected<void, std::string>
439 publish_node_command(std::string_view group_id,
440 std::string_view target_edge_node_id,
441 PayloadBuilder& payload);
442
/// Publishes a DCMD (Device Command) message to a device on an Edge Node.
/// @param group_id Identifier of the group containing the target edge node.
/// @param target_edge_node_id Identifier of the edge node hosting the device.
/// @param target_device_id Identifier of the device to command.
/// @param payload Builder holding the command payload; taken by non-const
///        reference, so the call may modify it.
/// @return An expected with no value; the error alternative is a std::string.
463 [[nodiscard]] stdx::expected<void, std::string>
464 publish_device_command(std::string_view group_id,
465 std::string_view target_edge_node_id,
466 std::string_view target_device_id,
467 PayloadBuilder& payload);
468
/// Internal logging method accessible from C bindings.
/// @param level Severity of the message.
/// @param message Text to log.
/// Declared const and noexcept.
475 void log(LogLevel level, std::string_view message) const noexcept;
476
477private:
// Configuration value held by this instance.
478 Config config_;
// MQTT client handle; MQTTAsyncHandle is declared outside this listing
// (presumably in mqtt_handle.hpp -- confirm).
479 MQTTAsyncHandle client_;
// Connection flag, initialised to false.
480 bool is_connected_{false};
481
482 // MQTT connection options that must outlive async operations
483 MQTTAsync_SSLOptions ssl_opts_{};
484
485 // Node state tracking
// Composite key identifying an edge node by its group and edge-node identifiers.
struct NodeKey {
  std::string group_id;
  std::string edge_node_id;

  // Two keys are equal when both identifiers match.
  [[nodiscard]] bool operator==(const NodeKey& other) const noexcept {
    return group_id == other.group_id && edge_node_id == other.edge_node_id;
  }
};

// Transparent hash for NodeKey.
//
// Accepts either an owning NodeKey or a non-owning (group_id, edge_node_id)
// pair of string views, so a lookup does not have to build temporary strings.
// Both overloads go through the same helper, which guarantees that a NodeKey
// and the equivalent view pair always produce the same hash value.
struct NodeKeyHash {
  using is_transparent = void;

  // Hashes an owning key.
  [[nodiscard]] size_t operator()(const NodeKey& key) const noexcept {
    return combine(key.group_id, key.edge_node_id);
  }

  // Hashes a non-owning (group_id, edge_node_id) pair.
  [[nodiscard]] size_t
  operator()(std::pair<std::string_view, std::string_view> key) const noexcept {
    return combine(key.first, key.second);
  }

private:
  // Mixes the two component hashes with the widely used golden-ratio
  // "hash_combine" step. Unlike `h1 ^ (h2 << 1)`, this keeps every bit of the
  // second hash and spreads it across the word.
  [[nodiscard]] static size_t combine(std::string_view group_id,
                                      std::string_view edge_node_id) noexcept {
    const std::hash<std::string_view> hasher{};
    size_t seed = hasher(group_id);
    seed ^= hasher(edge_node_id) + static_cast<size_t>(0x9e3779b97f4a7c15ULL) +
            (seed << 6) + (seed >> 2);
    return seed;
  }
};

// Transparent equality for NodeKey, matching the overload set of NodeKeyHash:
// compares key/key, key/view-pair and view-pair/key.
struct NodeKeyEqual {
  using is_transparent = void;

  [[nodiscard]] bool operator()(const NodeKey& lhs, const NodeKey& rhs) const noexcept {
    return lhs == rhs;
  }

  [[nodiscard]] bool
  operator()(const NodeKey& lhs,
             std::pair<std::string_view, std::string_view> rhs) const noexcept {
    return lhs.group_id == rhs.first && lhs.edge_node_id == rhs.second;
  }

  [[nodiscard]] bool operator()(std::pair<std::string_view, std::string_view> lhs,
                                const NodeKey& rhs) const noexcept {
    return lhs.first == rhs.group_id && lhs.second == rhs.edge_node_id;
  }
};
525
// Per-node state, keyed by NodeKey (group_id + edge_node_id). The hash and
// equality functors both declare is_transparent.
526 std::unordered_map<NodeKey, NodeState, NodeKeyHash, NodeKeyEqual> node_states_;
527
528 // Mutex for thread-safe access to all mutable state
// Declared mutable so that it can also be locked from const member functions.
529 mutable std::mutex mutex_;
530
// Publishes an already-serialized payload to a topic (implementation not in
// this listing).
// @param topic Destination topic string.
// @param payload_data Read-only view of the payload bytes.
// @param qos QoS value for this publish (presumably MQTT QoS -- confirm).
// @param retain Retain flag for this publish (presumably the MQTT retained
//        flag -- confirm).
// @return An expected with no value; the error alternative is a std::string.
531 [[nodiscard]] stdx::expected<void, std::string>
532 publish_raw_message(std::string_view topic,
533 std::span<const uint8_t> payload_data,
534 int qos,
535 bool retain);
536
// Publishes an already-serialized command payload to a topic (implementation
// not in this listing; presumably shared by the NCMD/DCMD publishers -- confirm).
// @param topic Destination topic string.
// @param payload_data Read-only view of the payload bytes.
// @return An expected with no value; the error alternative is a std::string.
537 [[nodiscard]] stdx::expected<void, std::string>
538 publish_command_message(std::string_view topic, std::span<const uint8_t> payload_data);
539
// Validates a received message given its topic and payload (implementation not
// in this listing). Non-const, so it may update tracked state -- presumably the
// node/device sequence tracking; confirm in the implementation.
// @return bool result; presumably true when the message is accepted -- confirm.
540 bool validate_message(const Topic& topic,
541 const org::eclipse::tahu::protobuf::Payload& payload);
542
543 // Static MQTT callback for message arrived
// Static handler with a C-style signature taking an opaque context pointer,
// the topic name and its length, and the MQTTAsync message.
// NOTE(review): presumably registered with the Paho MQTTAsync client as its
// message-arrived callback, with `context` carrying the HostApplication
// instance -- confirm in the implementation. The meaning of the int return
// value is defined by that callback contract.
544 static int on_message_arrived(void* context,
545 char* topicName,
546 int topicLen,
547 MQTTAsync_message* message);
548
// Static handler taking an opaque context pointer and a cause string.
// NOTE(review): presumably registered with the Paho MQTTAsync client as its
// connection-lost callback -- confirm in the implementation.
549 static void on_connection_lost(void* context, char* cause);
550};
551
552} // namespace sparkplug
Sparkplug B Host Application for SCADA/Primary Applications.
stdx::expected< void, std::string > subscribe_state(std::string_view host_id)
Subscribes to STATE messages from another primary application.
stdx::expected< void, std::string > publish_state_death(uint64_t timestamp)
Publishes a STATE death message to indicate Host Application is offline.
void log(LogLevel level, std::string_view message) const noexcept
Internal logging method accessible from C bindings.
void set_tls(std::optional< TlsOptions > tls)
Configures TLS/SSL options for secure MQTT connections.
void set_log_callback(LogCallback callback)
Sets the log callback for receiving library diagnostic messages.
stdx::expected< void, std::string > publish_device_command(std::string_view group_id, std::string_view target_edge_node_id, std::string_view target_device_id, PayloadBuilder &payload)
Publishes a DCMD (Device Command) message to a device on an Edge Node.
stdx::expected< void, std::string > subscribe_all_groups()
Subscribes to all Sparkplug B messages across all groups.
stdx::expected< void, std::string > publish_node_command(std::string_view group_id, std::string_view target_edge_node_id, PayloadBuilder &payload)
Publishes an NCMD (Node Command) message to an Edge Node.
stdx::expected< void, std::string > disconnect()
Gracefully disconnects from the MQTT broker.
HostApplication(Config config)
Constructs a HostApplication with the given configuration.
stdx::expected< void, std::string > subscribe_group(std::string_view group_id)
Subscribes to messages from a specific group.
void set_message_callback(MessageCallback callback)
Sets the message callback for receiving Sparkplug messages.
stdx::expected< void, std::string > publish_state_birth(uint64_t timestamp)
Publishes a STATE birth message to indicate Host Application is online.
stdx::expected< void, std::string > connect()
Connects to the MQTT broker.
std::optional< std::reference_wrapper< const NodeState > > get_node_state(std::string_view group_id, std::string_view edge_node_id) const
Gets the current state of a specific edge node.
void set_credentials(std::optional< std::string > username, std::optional< std::string > password)
Sets MQTT username and password for authentication.
~HostApplication()
Destroys the HostApplication and cleans up MQTT resources.
stdx::expected< void, std::string > subscribe_node(std::string_view group_id, std::string_view edge_node_id)
Subscribes to messages from a specific edge node in a group.
std::optional< std::string_view > get_metric_name(std::string_view group_id, std::string_view edge_node_id, std::string_view device_id, uint64_t alias) const
Resolves a metric alias to its name for a specific node or device.
Type-safe builder for Sparkplug B payloads with automatic type detection.
Configuration parameters for the Sparkplug B Host Application.
std::optional< TlsOptions > tls
TLS/SSL options (required if broker_url uses ssl://)
std::optional< std::string > username
MQTT username for authentication (optional)
std::string host_id
Host Application identifier (for STATE messages)
int max_inflight
Maximum number of QoS 1/2 messages allowed in-flight (default: 100, paho default: 10)
bool clean_session
MQTT clean session flag (should be true per Sparkplug spec)
LogCallback log_callback
Optional callback for library log messages.
std::optional< std::string > password
MQTT password for authentication (optional)
int qos
MQTT QoS for STATE messages and commands (default: 1)
MessageCallback message_callback
Callback for received Sparkplug messages.
std::string client_id
Unique MQTT client identifier.
int keep_alive_interval
MQTT keep-alive interval in seconds (default: 60)
bool validate_sequence
Enable sequence number validation (detects packet loss)
std::string broker_url
MQTT broker URL (e.g., "tcp://localhost:1883" or "ssl://localhost:8883")
Tracks the state of a device attached to an edge node.
bool is_online
True if DBIRTH received and device is online.
std::unordered_map< uint64_t, std::string > alias_map
Maps metric alias to name (from DBIRTH)
uint64_t offline_timestamp
Timestamp when device went offline (from DDEATH)
bool metrics_stale
True if metrics marked stale after DDEATH.
bool birth_received
True if DBIRTH has been received.
uint64_t last_seq
Last received device sequence number.
Tracks the state of an individual edge node.
std::unordered_map< std::string, DeviceState, TransparentStringHash, std::equal_to<> > devices
Attached devices (device_id -> state)
std::unordered_map< uint64_t, std::string > alias_map
Maps metric alias to name (from NBIRTH)
uint64_t last_seq
Last received node sequence number (starts at 255)
uint64_t bd_seq
Current birth/death sequence number.
bool is_online
True if NBIRTH received and node is online.
bool birth_received
True if NBIRTH has been received.
uint64_t birth_timestamp
Timestamp of last NBIRTH.
TLS/SSL configuration options for secure MQTT connections.
bool enable_server_cert_auth
Verify server certificate (default: true)
std::string private_key
Path to client private key file (PEM format, optional)
std::string key_store
Path to client certificate file (PEM format, optional)
std::string enabled_cipher_suites
Colon-separated list of cipher suites (optional)
std::string trust_store
Path to CA certificate file (PEM format)
std::string private_key_password
Password for encrypted private key (optional)
Transparent hash for string keys to enable heterogeneous lookup.