Statistics
| Branch: | Revision:

urtware / doc / classdiagrams / pubsub.uml @ e48e1ccf

History | View | Annotate | Download (12.762 KB)

1
/'
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

    
6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

    
8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

    
13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

    
18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
'/
21

    
22
/'### INTRO ##################################################################'/
23

    
24
@startuml
25

    
26
title **µRtWare**\nPublish-Subscribe System
27

    
28
!include ../functions.iuml
29

    
30
/'### ENTITIES ###############################################################'/
31

    
32
!startsub ENTITIES
33

    
34
/' Message type. '/
35
$structure("urt_message_t") {
36
    'Pointer to the next message in a list.
37
  + {field} next : urt_message_t*
38
    'Pointer to some arbitrary (reusable) payload object.
39
  + {field} payload : void*
40
    'Origin time of the message.
41
  + {field} originTime : urt_osTime_t
42
    'Counter of HRT subscribers that did not consume the message yet.
43
  + {field} numHrtConsumersLeft : unsigned int
44
  .. URT_CFG_PUBSUB_PROFILING == true ..
45
    'Counter of overall subscribers that did not consume the message yet.
46
  + {field} numConsumersLeft : unsigned int
47
  __
48
    'Initializes a urt_message_t object.
49
  + {method} urtMessageInit (message : urt_message_t*, payload : void*) : urt_status_t
50
}
51

    
52
$group("subscriber") {
53
  /' Base subscriber type. '/
54
  $structure("urt_basesubscriber_t") {
55
      'Pointer to the topic, this subscriber subscribed to.
56
    + {field} topic : urt_topic_t*
57
      'Event listener to notify the node about new messages.
58
    + {field} evtListener : urt_osEventListener_t
59
      'Pointer to the message consumed most recently.
60
    + {field} lastMessage : urt_message_t*
61
      'Copy of the origin time of the message consumed most recently.
62
    + {field} lastMessageTime : urt_osTime_t
63
    .. URT_CFG_PUBSUB_PROFILING == true ..
64
      'Sum of all latencies.
65
    + {field} sumLatencies : uint64_t
66
      'Number of messages received.
67
    + {field} numMessagesReceived : uint64_t
68
  }
69

    
70
  /' Function type to be called when calculating the usefulness of a message. '/
71
  $function("urt_srtusefulnessfunc_t") {
72
    'Takes a delay and optionally parameters as arguments and returns a float in [0, 1].
73
    urt_srtusefulnessfunc_t (dt : urt_delay_t, params : void*) : float
74
  }
75

    
76
  /' NRT subscriber type. '/
77
  $structure("urt_nrtsubscriber_t") {
78
    .. URT_CFG_PUBSUB_PROFILING == true ..
79
      'Minimum latency ever detected.
80
    + {field} minLatency : urt_delay_t
81
      'Maximum latency ever detected.
82
    + {field} maxLatency : urt_delay_t
83
    __
84
      'Initializes a urt_nrtsubscriber_t object.
85
    + {method} urtNrtSubscriberInit (subscriber : urt_nrtsubscriber_t*) : urt_status_t
86
      'Tries to subscribe to a topic and optionally contributes a list of messages to the topic.
87
    + {method} urtNrtSubscriberSubscribe (subscriber : urt_nrtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*) : urt_status_t
88
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
89
    + {method} urtNrtSubscriberFetchNextMessage (subscriber : urt_nrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
90
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
91
    + {method} urtNrtSubscriberFetchLatestMessage (subscriber : urt_nrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
92
      'Unsubscribes from a topic.
93
    + {method} urtNrtSubscriberUnsubscribe (subscriber : urt_nrtsubscriber_t*) : urt_status_t
94
  }
95

    
96
  /' SRT subscriber type. '/
97
  $structure("urt_srtsubscriber_t") {
98
      'Callback to calculate usefulness of a message
99
    + {field} usefulnesscb : urt_srtusefulnessfunc_t*
100
      'Optional parameters for the usefulness callback function.
101
    + {field} cbparams : void*
102
    .. URT_CFG_PUBSUB_PROFILING == true ..
103
      'Minimum latency ever detected.
104
    + {field} minLatency : urt_delay_t
105
      'Maximum latency ever detected.
106
    + {field} maxLatency : urt_delay_t
107
    __
108
      'Initializes a urt_srtsubscriber_t object.
109
    + {method} urtSrtSubscriberInit (subscriber : urt_srtsubscriber_t*) : urt_status_t
110
      'Tries to subscribe to a topic, sets all parameters and optionally contributes a list of messages to the topic.
111
    + {method} urtSrtSubscriberSubscribe (subscriber : urt_srtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*, usefulnesscb : urt_srtusefulnessfunc_t*, cbparams : void*) : urt_status_t
112
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
113
    + {method} urtSrtSubscriberFetchNextMessage (subscriber : urt_srtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
114
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
115
    + {method} urtSrtSubscriberFetchLatestMessage (subscriber : urt_srtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
116
      'Calculates the usefulness of a message after the specified delay.
117
    + {method} urtSrtSubscriberCalculateUsefulness (subscriber : urt_srtsubscriber_t*, latency : urt_delay_t) : float
118
      'Unsubscribes from a topic.
119
    + {method} urtSrtSubscriberUnsubscribe (subscriber : urt_srtsubscriber_t*) : urt_status_t
120
  }
121

    
122
  /' FRT subscriber type. '/
123
  $structure("urt_frtsubscriber_t") {
124
    .. URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true ..
125
      'Maximum temporal offset between creation and consumption of messages.
126
    + {field} deadlineOffset : urt_delay_t
127
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true ..
128
      'Maximum expected jitter.
129
    + {field} maxJitter : urt_delay_t
130
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true ||  URT_CFG_PUBSUB_PROFILING == true ..
131
      'Minimum latency ever detected (to calculate jitter).
132
    + {field} minLatency : urt_delay_t
133
      'Maximum latency ever detected (to calculate jitter).
134
    + {field} maxLatency : urt_delay_t
135
    __
136
      'Initializes a urt_frtsubscriber_t object.
137
    + {method} urtFrtSubscriberInit (subscriber : urt_frtsubscriber_t*) : urt_status_t
138
      'Tries to subscribe to a topic, sets all parameters and optionally contributes a list of messages to the topic.
139
    + {method} urtFrtSubscriberSubscribe (subscriber : urt_frtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*, deadline : urt_delay_t, jitter : urt_delay_t) : urt_status_t
140
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
141
    + {method} urtFrtSubscriberFetchNextMessage (subscriber : urt_frtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
142
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
143
    + {method} urtFrtSubscriberFetchLatestMessage (subscriber : urt_frtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
144
      'Checks whether a message is valid after the specified delay.
145
    + {method} urtFrtSubscriberCalculateValidity (subscriber : urt_frtsubscriber_t*, latency : urt_delay_t) : bool
146
      'Unsubscribes from a topic.
147
    + {method} urtFrtSubscriberUnsubscribe (subscriber : urt_frtsubscriber_t*) : urt_status_t
148
  }
149

    
150
  /' HRT subscriber type. '/
151
  $structure("urt_hrtsubscriber_t") {
152
      'Pointer to the next HRT subscriber in a list.
153
    + {field} next : urt_hrtsubscriber_t*
154
    .. URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true ..
155
      'Maximum temporal offset between creation and consumption of messages.
156
    + {field} deadlineOffset : urt_delay_t
157
      'QoS Timer to detect missed deadlines.
158
    + {field} qosDeadlineTimer : urt_osTimer_t
159
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true ..
160
      'Maximum expected jitter.
161
    + {field} maxJitter : urt_delay_t
162
    .. URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true..
163
      'Minimum latency ever detected (to calculate jitter).
164
    + {field} minLatency : urt_delay_t
165
      'Maximum latency ever detected (to calculate jitter).
166
    + {field} maxLatency : urt_delay_t
167
    .. URT_CFG_PUBSUB_QOS_RATECHECKS == true ..
168
      'Expected rate at which data is published.
169
    + {field} expectedRate : urt_delay_t
170
    __
171
      'Initializes a urt_hrtsubscriber_t object.
172
    + {method} urtHrtSubscriberInit (subscriber : urt_hrtsubscriber_t*) : urt_status_t
173
      'Tries to subscribe to a topic, sets all parameters and optionally contributes a list of messages to the topic.
174
    + {method} urtHrtSubscriberSubscribe (subscriber : urt_hrtsubscriber_t*, topic : urt_topic_t*, messages : urt_message_t*, deadline : urt_delay_t, rate : urt_delay_t, jitter : urt_delay_t) : urt_status_t
175
      'Fetches the next message in the buffer, optionally copies the payload and optionally returns the latency.
176
    + {method} urtHrtSubscriberFetchNextMessage (subscriber : urt_hrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
177
      'Fetches the latest message, optionally copies the payload and optionally returns the latency.
178
    + {method} urtHrtSubscriberFetchLatestMessage (subscriber : urt_hrtsubscriber_t*, payload : void*, bytes : size_t, latency : urt_delay_t*) : urt_status_t
179
      'Unsubscribes from a topic.
180
    + {method} urtHrtSubscriberUnsubscribe (subscriber : urt_hrtsubscriber_t*) : urt_status_t
181
  }
182
} /'subscriber'/
183

    
184
/' Publisher type. '/
185
$structure("urt_publisher_t") {
186
    'Pointer to the topic for publishing.
187
  + {field} topic : urt_topic_t*
188
  .. URT_CFG_PUBSUB_PROFILING == true ..
189
    'Counter of attempts to publish a message.
190
  + {field} publishAttempts : uint64_t
191
    'Counter of failed attempts to publish a message.
192
  + {field} publishFails : uint64_t
193
  __
194
    'Initializes a urt_publisher_t object and contributes an optional list of messages.
195
  + {method} urtPublisherInit (publisher : urt_publisher_t*, topic : urt_topic_t*, messages : urt_message_t*) : urt_status_t
196
    'Publishes a message via the associated topic.
197
  + {method} urtPublisherPublish (publisher : urt_publisher_t*, payload : void*, n : size_t, t : urt_osTime_t, timeout : urt_delay_t) : urt_status_t
198
}
199

    
200
/' Topic type. '/
201
$structure("urt_topic_t") {
202
    'Pointer to the next topic in a list.
203
  + {field} next : urt_topic_t*
204
    'Identifier of the topic.
205
  + {field} id : urt_topicid_t
206
    'Mutex lock for exclusive access.
207
  + {field} lock : urt_osMutex_t
208
    'Event source to inform all subscribers when a new message is published.
209
  + {field} evtSource : urt_osEventSource_t
210
    'Number of HRT subscribers.
211
  + {field} numHrtSubscribers : unsigned int
212
    'List of HRT subscribers, orderd by their expected rate (most critical first).
213
  + {field} hrtSubscribers : urt_hrtsubscriber_t*
214
    'Condition variable to inform waiting publishers when a blocked message becomes available.
215
  + {field} hrtReleased : urt_osCondvar_t
216
    'Mandatory message, each Topic holds.
217
  + {field} mandatoryMessage : urt_message_t
218
    'Pointer to the latest message.
219
  + {field} latestMessage : urt_message_t*
220
  .. URT_CFG_PUBSUB_QOS_RATECHECKS == true ..
221
    'Timer to check for missed rates.
222
  + {field} qosRateTimer : urt_osTimer_t
223
  .. URT_CFG_PUBSUB_PROFILING == true ..
224
    'Variable to count how many (non-hrt) subscribers did not fetch a message before it was reused.
225
  + {field} numDiscardedMessages : uint64_t
226
    'Number of overall subscribers.
227
  + {field} numSubscribers : unsigned int
228
  __
229
    'Initializes an urt_topic_t object.
230
  + {method} urtTopicInit (topic : urt_topic_t*, id : urt_topicid_t) : urt_status_t
231
}
232

    
233
!endsub
234

    
235
/'### DEPENDENCIES & LAYOUT ##################################################'/
236

    
237
!startsub DEPENDENCIES
238

    
239
urt_message_t "1" o-- "0..1" urt_message_t
240

    
241
urt_nrtsubscriber_t --|> urt_basesubscriber_t
242
urt_nrtsubscriber_t ..> urt_message_t
243

    
244
urt_srtsubscriber_t --|> urt_basesubscriber_t
245
urt_srtsubscriber_t "1" o-- "0..1" urt_srtusefulnessfunc_t
246
urt_srtsubscriber_t ..> urt_message_t
247

    
248
urt_frtsubscriber_t --|> urt_basesubscriber_t
249
urt_frtsubscriber_t ..> urt_message_t
250

    
251
urt_hrtsubscriber_t --|> urt_basesubscriber_t
252
urt_hrtsubscriber_t "1" o-- "0..1" urt_hrtsubscriber_t
253
urt_hrtsubscriber_t ..> urt_message_t
254

    
255
urt_publisher_t "1" o- "1" urt_topic_t
256
urt_publisher_t ..> urt_message_t
257

    
258
urt_topic_t "1" o-- "0..1" urt_topic_t
259
urt_topic_t "1" o- "0..1" urt_hrtsubscriber_t
260
urt_topic_t "1" *-- "1" urt_message_t
261

    
262
!endsub
263

    
264
/'### OUTRO ##################################################################'/
265

    
266
@enduml
267