Statistics
| Branch: | Revision:

urtware / doc / classdiagrams / pubsub.uml @ 155b0443

History | View | Annotate | Download (12.817 KB)

1 4d55cea4 Thomas Schöpping
/'
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5
6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7
8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12
13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17
18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
'/
21
22
/'### INTRO ##################################################################'/
23
24
@startuml
25
26
title **µRtWare**\nPublish-Subscribe System
27
28 dd31cb03 Thomas Schöpping
!include ./functions.iuml
29 4d55cea4 Thomas Schöpping
30
/'### ENTITIES ###############################################################'/
31
32
!startsub ENTITIES
33
34
/' Message type. '/
35
$structure("urt_message_t") {
36
    'Pointer to the next message in a list.
37
  + {field} next : urt_message_t*
38
    'Pointer to some arbitrary (reusable) payload object.
39
  + {field} payload : void*
40
    'Origin time of the message.
41
  + {field} originTime : urt_osTime_t
42
    'Counter of HRT subscribers that did not consume the message yet.
43
  + {field} numHrtConsumersLeft : unsigned int
44
  .. URT_CFG_PUBSUB_PROFILING == true ..
45
    'Counter of overall subscribers that did not consume the message yet.
46
  + {field} numConsumersLeft : unsigned int
47
  __
48
    'Initializes a urt_message_t object.
49 dd31cb03 Thomas Schöpping
  + {method} urtMessageInit (message : urt_message_t*, payload : void*) : void
50 4d55cea4 Thomas Schöpping
}
51
52
$group("subscriber") {
53 ee83a495 Thomas Schöpping
  /' Base subscriber type. '/
54
  $structure("urt_basesubscriber_t") {
55 4d55cea4 Thomas Schöpping
      'Pointer to the topic, this subscriber subscribed to.
56
    + {field} topic : urt_topic_t*
57
      'Event listener to notify the node about new messages.
58
    + {field} evtListener : urt_osEventListener_t
59
      'Pointer to the message consumed most recently.
60
    + {field} lastMessage : urt_message_t*
61
      'Copy of the origin time of the message consumed most recently.
62
    + {field} lastMessageTime : urt_osTime_t
63
    .. URT_CFG_PUBSUB_PROFILING == true ..
64
      'Sum of all latencies.
65
    + {field} sumLatencies : uint64_t
66
      'Number of messages received.
67
    + {field} numMessagesReceived : uint64_t
68 ee83a495 Thomas Schöpping
  }
69
70
  /' Function type to be called when calculating the usefulness of a message. '/
71
  $function("urt_srtusefulnessfunc_t") {
72
    'Takes a delay and optionally parameters as arguments and returns a float in [0, 1].
73
    urt_srtusefulnessfunc_t (dt : urt_delay_t, params : void*) : float
74
  }
75
76
  /' NRT subscriber type. '/
77
  $structure("urt_nrtsubscriber_t") {
78
    .. URT_CFG_PUBSUB_PROFILING == true ..
79
      'Minimum latency ever detected.
80
    + {field} minLatency : urt_delay_t
81
      'Maximum latency ever detected.
82
    + {field} maxLatency : urt_delay_t
83
    __
84
      'Initializes a urt_nrtsubscriber_t object.
85 dd31cb03 Thomas Schöpping
    + {method} urtNrtSubscriberInit (subscriber : urt_nrtsubscriber_t*) : void
86 ee83a495 Thomas Schöpping
      'Tries to subscribe to a topic and optionally contributes a list of messages to the topic.
87
    + {method} urtNrtSubscriberSubscribe (subscriber : urt_nrtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*) : urt_status_t
88
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
89
    + {method} urtNrtSubscriberFetchNextMessage (subscriber : urt_nrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
90
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
91
    + {method} urtNrtSubscriberFetchLatestMessage (subscriber : urt_nrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
92
      'Unsubscribes from a topic.
93
    + {method} urtNrtSubscriberUnsubscribe (subscriber : urt_nrtsubscriber_t*) : urt_status_t
94
  }
95
96
  /' SRT subscriber type. '/
97
  $structure("urt_srtsubscriber_t") {
98
      'Callback to calculate usefulness of a message
99
    + {field} usefulnesscb : urt_srtusefulnessfunc_t*
100
      'Optional parameters for the usefulness callback function.
101
    + {field} cbparams : void*
102 056e40d2 Thomas Schöpping
    .. URT_CFG_PUBSUB_PROFILING == true ..
103 ee83a495 Thomas Schöpping
      'Minimum latency ever detected.
104
    + {field} minLatency : urt_delay_t
105
      'Maximum latency ever detected.
106
    + {field} maxLatency : urt_delay_t
107
    __
108
      'Initializes a urt_srtsubscriber_t object.
109 dd31cb03 Thomas Schöpping
    + {method} urtSrtSubscriberInit (subscriber : urt_srtsubscriber_t*) : void
110 ee83a495 Thomas Schöpping
      'Tries to subscribe to a topic, sets all parameters and optionally contributes a list of messages to the topic.
111
    + {method} urtSrtSubscriberSubscribe (subscriber : urt_srtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*, usefulnesscb : urt_srtusefulnessfunc_t*, cbparams : void*) : urt_status_t
112
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
113
    + {method} urtSrtSubscriberFetchNextMessage (subscriber : urt_srtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
114
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
115
    + {method} urtSrtSubscriberFetchLatestMessage (subscriber : urt_srtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
116
      'Calculates the usefulness of a message after the specified delay.
117
    + {method} urtSrtSubscriberCalculateUsefulness (subscriber : urt_srtsubscriber_t*, latency : urt_delay_t) : float
118
      'Unsubscribes from a topic.
119
    + {method} urtSrtSubscriberUnsubscribe (subscriber : urt_srtsubscriber_t*) : urt_status_t
120
  }
121
122
  /' FRT subscriber type. '/
123
  $structure("urt_frtsubscriber_t") {
124
    .. URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true ..
125
      'Maximum temporal offset between creation and consumption of messages.
126
    + {field} deadlineOffset : urt_delay_t
127
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true ..
128
      'Maximum expected jitter.
129
    + {field} maxJitter : urt_delay_t
130
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true ||  URT_CFG_PUBSUB_PROFILING == true ..
131 4d55cea4 Thomas Schöpping
      'Minimum latency ever detected (to calculate jitter).
132
    + {field} minLatency : urt_delay_t
133
      'Maximum latency ever detected (to calculate jitter).
134
    + {field} maxLatency : urt_delay_t
135
    __
136 ee83a495 Thomas Schöpping
      'Initializes a urt_frtsubscriber_t object.
137 dd31cb03 Thomas Schöpping
    + {method} urtFrtSubscriberInit (subscriber : urt_frtsubscriber_t*) : void
138 ee83a495 Thomas Schöpping
      'Tries to subscribe to a topic, sets all parameters and optionally contributes a list of messages to the topic.
139
    + {method} urtFrtSubscriberSubscribe (subscriber : urt_frtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*, deadline : urt_delay_t, jitter : urt_delay_t) : urt_status_t
140
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
141
    + {method} urtFrtSubscriberFetchNextMessage (subscriber : urt_frtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
142
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
143
    + {method} urtFrtSubscriberFetchLatestMessage (subscriber : urt_frtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
144
      'Checks whether a message is valid after the specified delay.
145
    + {method} urtFrtSubscriberCalculateValidity (subscriber : urt_frtsubscriber_t*, latency : urt_delay_t) : bool
146 4d55cea4 Thomas Schöpping
      'Unsubscribes from a topic.
147 ee83a495 Thomas Schöpping
    + {method} urtFrtSubscriberUnsubscribe (subscriber : urt_frtsubscriber_t*) : urt_status_t
148
  }
149
150
  /' HRT subscriber type. '/
151
  $structure("urt_hrtsubscriber_t") {
152
      'Pointer to the next HRT subscriber in a list.
153
    + {field} next : urt_hrtsubscriber_t*
154
    .. URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true ..
155
      'Maximum temporal offset between creation and consumption of messages.
156
    + {field} deadlineOffset : urt_delay_t
157
      'QoS Timer to detect missed deadlines.
158
    + {field} qosDeadlineTimer : urt_osTimer_t
159
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true ..
160
      'Maximum expected jitter.
161
    + {field} maxJitter : urt_delay_t
162
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true..
163
      'Minimum latency ever detected (to calculate jitter).
164
    + {field} minLatency : urt_delay_t
165
      'Maximum latency ever detected (to calculate jitter).
166
    + {field} maxLatency : urt_delay_t
167
    .. URT_CFG_PUBSUB_QOS_RATECHECKS == true ..
168
      'Expected rate at which data is published.
169
    + {field} expectedRate : urt_delay_t
170
    __
171
      'Initializes a urt_hrtsubscriber_t object.
172 dd31cb03 Thomas Schöpping
    + {method} urtHrtSubscriberInit (subscriber : urt_hrtsubscriber_t*) : void
173 ee83a495 Thomas Schöpping
      'Tries to subscribe to a topic, sets all parameters and optionally contributes a list of messages to the topic.
174
    + {method} urtHrtSubscriberSubscribe (subscriber : urt_hrtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*, deadline : urt_delay_t, rate : urt_delay_t, jitter : urt_delay_t) : urt_status_t
175 4d55cea4 Thomas Schöpping
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
176 ee83a495 Thomas Schöpping
    + {method} urtHrtSubscriberFetchNextMessage (subscriber : urt_hrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
177 4d55cea4 Thomas Schöpping
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
178 ee83a495 Thomas Schöpping
    + {method} urtHrtSubscriberFetchLatestMessage (subscriber : urt_hrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
179
      'Unsubscribes from a topic.
180
    + {method} urtHrtSubscriberUnsubscribe (subscriber : urt_hrtsubscriber_t*) : urt_status_t
181 4d55cea4 Thomas Schöpping
  }
182
} /'subscriber'/
183
184
/' Publisher type. '/
185
$structure("urt_publisher_t") {
186
    'Pointer to the topic for publishing.
187
  + {field} topic : urt_topic_t*
188 ee83a495 Thomas Schöpping
  .. URT_CFG_PUBSUB_PROFILING == true ..
189 4d55cea4 Thomas Schöpping
    'Counter of attempts to publish a message.
190
  + {field} publishAttempts : uint64_t
191
    'Counter of failed attempts to publish a message.
192
  + {field} publishFails : uint64_t
193
  __
194
    'Initializes a urt_publisher_t object and contributes an optional list of messages.
195 dd31cb03 Thomas Schöpping
  + {method} urtPublisherInit (publisher : urt_publisher_t*, topic : urt_topic_t*, messages : urt_message_t*) : void
196 4d55cea4 Thomas Schöpping
    'Publishes a message via the associated topic.
197 dd31cb03 Thomas Schöpping
  + {method} urtPublisherPublish (publisher : urt_publisher_t*, payload : void*, bytes : size_t, t : urt_osTime_t, timeout : urt_delay_t) : urt_status_t
198 4d55cea4 Thomas Schöpping
}
199
200
/' Topic type. '/
201
$structure("urt_topic_t") {
202
    'Pointer to the next topic in a list.
203
  + {field} next : urt_topic_t*
204
    'Identifier of the topic.
205
  + {field} id : urt_topicid_t
206
    'Mutex lock for exclusive access.
207
  + {field} lock : urt_osMutex_t
208
    'Event source to inform all subscribers when a new message is published.
209
  + {field} evtSource : urt_osEventSource_t
210
    'Number of HRT subscribers.
211
  + {field} numHrtSubscribers : unsigned int
212 056e40d2 Thomas Schöpping
    'Condition variable to inform waiting publishers when a blocked message becomes available.
213
  + {field} hrtReleased : urt_osCondvar_t
214 4d55cea4 Thomas Schöpping
    'Mandatory message, each Topic holds.
215
  + {field} mandatoryMessage : urt_message_t
216
    'Pointer to the latest message.
217
  + {field} latestMessage : urt_message_t*
218
  .. URT_CFG_PUBSUB_QOS_RATECHECKS == true ..
219 dd31cb03 Thomas Schöpping
    'List of HRT subscribers, orderd by their expected rate (most critical first).
220
  + {field} hrtSubscribers : urt_hrtsubscriber_t*
221 4d55cea4 Thomas Schöpping
    'Timer to check for missed rates.
222
  + {field} qosRateTimer : urt_osTimer_t
223
  .. URT_CFG_PUBSUB_PROFILING == true ..
224 dd31cb03 Thomas Schöpping
    'Number of overall published messages on this topic.
225
  + {field} numMessagesPublished : uint64_t
226
    'Variable to count how often (non-hrt) subscribers did not fetch a message before it was reused.
227
  + {field} numMessagesDiscarded : uint64_t
228 4d55cea4 Thomas Schöpping
    'Number of overall subscribers.
229
  + {field} numSubscribers : unsigned int
230
  __
231
    'Initializes an urt_topic_t object.
232
  + {method} urtTopicInit (topic : urt_topic_t*, id : urt_topicid_t) : urt_status_t
233
}
234
235
!endsub
236
237
/'### DEPENDENCIES & LAYOUT ##################################################'/
238
239
!startsub DEPENDENCIES
240
241 ee83a495 Thomas Schöpping
urt_message_t "1" o-- "0..1" urt_message_t
242 4d55cea4 Thomas Schöpping
243 ee83a495 Thomas Schöpping
urt_nrtsubscriber_t --|> urt_basesubscriber_t
244
urt_nrtsubscriber_t ..> urt_message_t
245 4d55cea4 Thomas Schöpping
246 ee83a495 Thomas Schöpping
urt_srtsubscriber_t --|> urt_basesubscriber_t
247
urt_srtsubscriber_t "1" o-- "0..1" urt_srtusefulnessfunc_t
248
urt_srtsubscriber_t ..> urt_message_t
249 4d55cea4 Thomas Schöpping
250 ee83a495 Thomas Schöpping
urt_frtsubscriber_t --|> urt_basesubscriber_t
251
urt_frtsubscriber_t ..> urt_message_t
252 4d55cea4 Thomas Schöpping
253 ee83a495 Thomas Schöpping
urt_hrtsubscriber_t --|> urt_basesubscriber_t
254
urt_hrtsubscriber_t "1" o-- "0..1" urt_hrtsubscriber_t
255
urt_hrtsubscriber_t ..> urt_message_t
256 4d55cea4 Thomas Schöpping
257
urt_publisher_t "1" o- "1" urt_topic_t
258
urt_publisher_t ..> urt_message_t
259
260
urt_topic_t "1" o-- "0..1" urt_topic_t
261 ee83a495 Thomas Schöpping
urt_topic_t "1" o- "0..1" urt_hrtsubscriber_t
262 4d55cea4 Thomas Schöpping
urt_topic_t "1" *-- "1" urt_message_t
263
264
!endsub
265
266
/'### OUTRO ##################################################################'/
267
268
@enduml