Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 5c6cb22f

History | View | Annotate | Download (18.687 KB)

1
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21

    
22
#include <urtware.h>
23

    
24
/******************************************************************************/
25
/* LOCAL DEFINITIONS                                                          */
26
/******************************************************************************/
27

    
28
/******************************************************************************/
29
/* EXPORTED VARIABLES                                                         */
30
/******************************************************************************/
31

    
32
/******************************************************************************/
33
/* LOCAL TYPES                                                                */
34
/******************************************************************************/
35

    
36
/******************************************************************************/
37
/* LOCAL VARIABLES                                                            */
38
/******************************************************************************/
39

    
40
/******************************************************************************/
41
/* LOCAL FUNCTIONS                                                            */
42
/******************************************************************************/
43

    
44
/******************************************************************************/
45
/* EXPORTED FUNCTIONS                                                         */
46
/******************************************************************************/
47

    
48
/**
49
 * @brief   Initialize the nrt Subscriber.
50
 *
51
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
52
 */
53
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
54
{
55
  subscriber->base.topic = NULL;
56
  urtEventListenerInit(subscriber->base.evtListener);
57
  subscriber->base.lastMessage = NULL;
58
  subscriber->base.lastMessageTime = 0;
59
  #if (URT_CFG_PUBSUB_PROFILING)
60
    subscriber->minLatency = URT_DELAY_INFINITE;
61
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
62
  #endif /* URT_CFG_PUBSUB_PROFILING */
63
  return;
64
}
65

    
66
/**
67
 * @brief  Subscribes the subscriber to a topic.
68
 *
69
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
70
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
71
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
72
 *                      Messages must not be associated to another topic.
73
 *                      Once a message has been contributed, it cannot be removed later.
74
 *                      May be NULL(no messages to contribute).
75
 *
76
 * @return  Returns URT_STATUS_OK on success.
77
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
78
 */
79
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages) {
80
  return URT_STATUS_OK;
81
}
82

    
83
/**
84
 * @brief  Fetches the next message.
85
 *
86
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
87
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
88
 * @param[in] bytes  Payload size in bytes.
89
 * @param[in] latency  The latency can be returned by reference. May be NULL.
90
 *
91
 * @return  Returns URT_STATUS_OK on success.
92
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
93
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
94
 */
95
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
96
  return URT_STATUS_OK;
97
}
98

    
99
/**
100
 * @brief Fetches the lates message.
101
 *
102
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
103
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
104
 * @param[in] bytes  Payload size in bytes.
105
 * @param[in] latency  The latency can be returned by reference. May be NULL.
106
 *
107
 * @return  Returns URT_STATUS_OK on success.
108
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
109
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
110
 */
111
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
112
  return URT_STATUS_OK;
113
}
114

    
115
/**
116
 * @brief  Unsubscribes from a subscriber.
117
 *
118
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
119
 *
120
 * @return  Returns URT_STATUS_OK on sucess.
121
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
122
 */
123
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) {
124
  return URT_STATUS_OK;
125
}
126

    
127

    
128
/**
129
 * @brief  Initialize the srt Subscriber.
130
 *
131
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
132
 */
133
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
134
{
135
  subscriber->base.topic = NULL;
136
  urtEventListenerInit(subscriber->base.evtListener);
137
  subscriber->base.lastMessage = NULL;
138
  subscriber->base.lastMessageTime = 0;
139
  #if (URT_CFG_PUBSUB_PROFILING)
140
    subscriber->base.sumLatencies = 0;
141
    subscriber->base.numMessagesReceived = 0;
142
    subscriber->usefulnesscb = NULL;
143
    subscriber->cbparams = NULL;
144
    subscriber->minLatency = URT_DELAY_INFINITE;
145
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
146
  #endif /* URT_CFG_PUBSUB_PROFILING */
147
  return;
148
}
149

    
150
/**
151
 * @brief  Subscribes the subscriber to a topic.
152
 *
153
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
154
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
155
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
156
 *                     Messages must not be associated to another topic.
157
 *                     Once a message has been contributed, it cannot be removed later.
158
 *                     May be NULL (no messages to contribute)
159
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
160
 * @param[in] cbparams  Optional parameters for the usefulness callback.
161
 *                      May be NULL if the callback expects no parameters.
162
 *
163
 * @return  Returns URT_STATUS_OK on success.
164
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
165
 */
166
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
167
                                       urt_message_t* message, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams) {return URT_STATUS_OK;}
168

    
169
/**
170
 * @brief  Fetches the next message.
171
 *
172
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
173
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
174
 * @param[in] bytes  Payload size in bytes.
175
 * @param[in] latency  The latency can be returned by reference. May be NULL.
176
 *
177
 * @return  Returns URT_STATUS_OK on success.
178
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
179
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
180
 */
181
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
182
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
183

    
184
/**
185
 * @brief  Fetches the latest message.
186
 *
187
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
188
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
189
 * @param[in] bytes  Payload size in bytes.
190
 * @param[in] latency  The latency can be returned by reference. May be NULL.
191
 *
192
 * @return  Returns URT_STATUS_OK on success.
193
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
194
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
195
 */
196
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
197
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
198

    
199
/**
200
 * @brief  Calculates the usefulness of the subscriber.
201
 *
202
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
203
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
204
 *
205
 * @return  Returns the usefulness as a value within [0,1].
206
 */
207
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency){return 0;}
208

    
209
/**
210
 * @brief  Unsubscribes from a subscriber.
211
 *
212
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
213
 *
214
 * @return  Returns URT_STATUS_OK on sucess.
215
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
216
 */
217
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber){return URT_STATUS_OK;}
218

    
219

    
220
/**
221
 * @brief  Initialize the FRT Subscriber.
222
 *
223
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
224
 */
225
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
226
{
227
  subscriber->base.topic = NULL;
228
  urtEventListenerInit(subscriber->base.evtListener);
229
  subscriber->base.lastMessage = NULL;
230
  subscriber->base.lastMessageTime = 0;
231

    
232
  #if (URT_CFG_PUBSUB_PROFILING)
233
    subscriber->base.sumLatencies = 0;
234
    subscriber->base.numMessagesReceived = 0;
235
  #endif /* URT_CFG_PUBSUB_PROFILING */
236

    
237
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
238
    subscriber->deadlineOffset = 0;
239
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
240

    
241
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
242
    subscriber->maxJitter = 0;
243
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
244

    
245
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
246
    subscriber->minLatency = URT_DELAY_INFINITE;
247
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
248
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
249
  return;
250
}
251

    
252

    
253
/**
254
 * @brief  Subscribes the subscriber to a topic.
255
 *
256
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
257
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
258
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
259
 *                      Messages must not be associated to another topic.
260
 *                      Once a message has been contributed, it cannot be removed later.
261
 *                      May be NULL(no messages to contribute).
262
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
263
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
264
 *                    A value of 0 indicates that jitter is of no concern.
265
 *
266
 * @return  Returns URT_STATUS_OK on success.
267
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
268
 */
269
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
270
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter){return URT_STATUS_OK;}
271

    
272
/**
273
 * @brief  Fetches the next message.
274
 *
275
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
276
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
277
 * @param[in] bytes  Payload size in bytes.
278
 * @param[in] latency  The latency can be returned by reference. May be NULL.
279
 *
280
 * @return  Returns URT_STATUS_OK on success.
281
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
282
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
283
 */
284
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
285
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
286

    
287
/**
288
 * @brief  Fetches the latest message.
289
 *
290
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
291
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
292
 * @param[in] bytes  Payload size in bytes.
293
 * @param[in] latency  The latency can be returned by reference. May be NULL.
294
 *
295
 * @return  Returns URT_STATUS_OK on success.
296
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
297
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
298
 */
299
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
300
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
301

    
302
/**
303
 * @brief  Calculates the validity from the subscriber.
304
 *
305
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
306
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
307
 *
308
 * @return  Returns a boolean indicator whether the latency is fine.
309
 */
310
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency){return true;}
311

    
312
/**
313
 * @brief  Unsubscribes from a subscriber.
314
 *
315
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
316
 *
317
 * @return  Returns URT_STATUS_OK on sucess.
318
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
319
 */
320
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;}
321

    
322

    
323
/**
324
 * @brief  Initialize the HRT Subscriber.
325
 *
326
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
327
 */
328
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
329
{
330
  subscriber->base.topic = NULL;
331
  urtEventListenerInit(subscriber->base.evtListener);
332
  subscriber->base.lastMessage = NULL;
333
  subscriber->base.lastMessageTime = 0;
334

    
335
  #if (URT_CFG_PUBSUB_PROFILING)
336
    subscriber->base.sumLatencies = 0;
337
    subscriber->base.numMessagesReceived = 0;
338
  #endif /* URT_CFG_PUBSUB_PROFILING */
339

    
340
  subscriber->next = NULL;
341

    
342
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
343
    subscriber->deadlineOffset = 0;
344
    urtTimerInit(subscriber->qodDeadlineTimer);
345
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
346

    
347
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
348
    subscriber->expectedRate = 0;
349
  #endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
350

    
351
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
352
    subscriber->maxJitter = 0;
353
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
354

    
355
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
356
    subscriber->minLatency = URT_DELAY_INFINITE;
357
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
358
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
359
  return;
360
}
361

    
362

    
363
/**
364
 * @brief  Subscribes the subscriber to a topic.
365
 *
366
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
367
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
368
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
369
 *                      Messages must not be associated to another topic.
370
 *                      Once a message has been contributed, it cannot be removed later.
371
 *                      May be NULL(no messages to contribute).
372
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
373
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
374
 *                  A value of 0 indicates, that rate is of no concern.
375
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
376
 *                    A value of 0 indicates that jitter is of no concern.
377
 *
378
 * @return  Returns URT_STATUS_OK on success.
379
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
380
 */
381
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
382
                                       urt_message_t* message, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter){return URT_STATUS_OK;}
383

    
384

    
385
/**
386
 * @brief  Fetches the next message.
387
 *
388
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
389
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
390
 * @param[in] bytes  Payload size in bytes.
391
 * @param[in] latency  The latency can be returned by reference. May be NULL.
392
 *
393
 * @return  Returns URT_STATUS_OK on success.
394
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
395
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
396
 */
397
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
398
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
399

    
400

    
401
/**
402
 * @brief  Fetches the latest message.
403
 *
404
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
405
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
406
 * @param[in] bytes  Payload size in bytes.
407
 * @param[in] latency  The latency can be returned by reference. May be NULL.
408
 *
409
 * @return  Returns URT_STATUS_OK on success.
410
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
411
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
412
 */
413
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
414
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
415

    
416
/**
417
 * @brief  Unsubscribes from a subscriber.
418
 *
419
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
420
 *
421
 * @return  Returns URT_STATUS_OK on sucess.
422
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
423
 */
424
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber){return URT_STATUS_OK;}