Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 77bd2c61

History | View | Annotate | Download (27.99 KB)

1 1fb06240 skenneweg
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21
22 7d9678db skenneweg
#include <urtware.h>
23
24 1fb06240 skenneweg
/******************************************************************************/
25
/* LOCAL DEFINITIONS                                                          */
26
/******************************************************************************/
27
28
/******************************************************************************/
29
/* EXPORTED VARIABLES                                                         */
30
/******************************************************************************/
31
32
/******************************************************************************/
33
/* LOCAL TYPES                                                                */
34
/******************************************************************************/
35
36
/******************************************************************************/
37
/* LOCAL VARIABLES                                                            */
38
/******************************************************************************/
39
40
/******************************************************************************/
41
/* LOCAL FUNCTIONS                                                            */
42
/******************************************************************************/
43
44
/******************************************************************************/
45
/* EXPORTED FUNCTIONS                                                         */
46
/******************************************************************************/
47
48 7d9678db skenneweg
/**
49
 * @brief   Initialize the nrt Subscriber.
50
 *
51 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
52 7d9678db skenneweg
 */
53 5c6cb22f skenneweg
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
54
{
55 a5e142de skenneweg
  urtDebugAssert(subscriber);
56
57 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
58
  urtEventListenerInit(subscriber->base.evtListener);
59
  subscriber->base.lastMessage = NULL;
60
  subscriber->base.lastMessageTime = 0;
61 a5e142de skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
62 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
63
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
64 a5e142de skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
65 5c6cb22f skenneweg
  return;
66
}
67 1fb06240 skenneweg
68 7d9678db skenneweg
/**
69 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
70 7d9678db skenneweg
 *
71 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
72
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
73
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
74
 *                      Messages must not be associated to another topic.
75
 *                      Once a message has been contributed, it cannot be removed later.
76
 *                      May be NULL(no messages to contribute).
77 7d9678db skenneweg
 *
78 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
79
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
80 7d9678db skenneweg
 */
81 a5e142de skenneweg
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
82
{
83
  urtDebugAssert(subscriber);
84
  urtDebugAssert(topic);
85
86
  if (!subscriber->base.topic)
87
      return URT_STATUS_SUBSCRIBE_TOPICSET;
88
89
  subscriber->base.topic = topic;
90 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
91 a5e142de skenneweg
92
  if (messages)
93
  {
94
    urt_message_t* lastMessageContribute = messages;
95
    while (lastMessageContribute->next)
96
    {
97
        lastMessageContribute = lastMessageContribute->next;
98
    }
99
    lastMessageContribute->next = topic->latestMessage->next;
100
    topic->latestMessage->next = messages;
101
  }
102
103
  subscriber->base.lastMessage = topic->latestMessage;
104
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
105
106 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
107 a5e142de skenneweg
108
#if (URT_CFG_PUBSUB_PROFILING == true)
109
    topic->numHrtSubscribers--;
110
#endif /* URT_CFG_PUBSUB_PROFILING */
111
112 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
113 7d9678db skenneweg
  return URT_STATUS_OK;
114 1fb06240 skenneweg
}
115
116 7d9678db skenneweg
/**
117 5198dfae skenneweg
 * @brief  Fetches the next message.
118 7d9678db skenneweg
 *
119 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
120
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
121
 * @param[in] bytes  Payload size in bytes.
122
 * @param[in] latency  The latency can be returned by reference. May be NULL.
123 7d9678db skenneweg
 *
124 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
125
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
126
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
127 7d9678db skenneweg
 */
128 a5e142de skenneweg
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
129
{
130
  urtDebugAssert(subscriber);
131 7d9678db skenneweg
  return URT_STATUS_OK;
132 1fb06240 skenneweg
}
133
134 7d9678db skenneweg
/**
135 a5e142de skenneweg
 * @brief Fetches the latest message.
136 7d9678db skenneweg
 *
137 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
138
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
139
 * @param[in] bytes  Payload size in bytes.
140
 * @param[in] latency  The latency can be returned by reference. May be NULL.
141 7d9678db skenneweg
 *
142 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
143
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
144
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
145 7d9678db skenneweg
 */
146 1fb06240 skenneweg
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
147 7d9678db skenneweg
  return URT_STATUS_OK;
148 1fb06240 skenneweg
}
149
150 7d9678db skenneweg
/**
151 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
152 7d9678db skenneweg
 *
153 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
154 7d9678db skenneweg
 *
155 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
156
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
157 7d9678db skenneweg
 */
158 5b7188aa skenneweg
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
159
{
160 a5e142de skenneweg
  if (subscriber->base.topic)
161
  {
162
# if(URT_CFG_PUBSUB_PROFILING == true)
163 37cd5dc2 Svenja
      urtMutexLock(topic->lock);
164 5b7188aa skenneweg
      subscriber->base.topic->numSubscribers--;
165 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
166
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
167
# if(URT_CFG_PUBSUB_PROFILING == true)
168 37cd5dc2 Svenja
      urtMutexUnlock(topic->lock);
169 a5e142de skenneweg
      subscriber->base.topic = NULL;
170
      subscriber->base.lastMessage = NULL;
171
      subscriber->base.lastMessageTime = 0;
172
#endif /* URT_CFG_PUBSUB_PROFILING */
173
    return URT_STATUS_OK;
174
  }
175 5b7188aa skenneweg
176
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
177 1fb06240 skenneweg
}
178
179 7d9678db skenneweg
180
/**
181 5198dfae skenneweg
 * @brief  Initialize the srt Subscriber.
182 7d9678db skenneweg
 *
183 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
184 7d9678db skenneweg
 */
185 5c6cb22f skenneweg
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
186
{
187 a5e142de skenneweg
  urtDebugAssert(subscriber);
188
189 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
190
  urtEventListenerInit(subscriber->base.evtListener);
191
  subscriber->base.lastMessage = NULL;
192
  subscriber->base.lastMessageTime = 0;
193
  #if (URT_CFG_PUBSUB_PROFILING)
194
    subscriber->base.sumLatencies = 0;
195
    subscriber->base.numMessagesReceived = 0;
196
    subscriber->usefulnesscb = NULL;
197
    subscriber->cbparams = NULL;
198
    subscriber->minLatency = URT_DELAY_INFINITE;
199
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
200
  #endif /* URT_CFG_PUBSUB_PROFILING */
201
  return;
202
}
203 7d9678db skenneweg
204
/**
205 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
206
 *
207
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
208
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
209
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
210
 *                     Messages must not be associated to another topic.
211
 *                     Once a message has been contributed, it cannot be removed later.
212
 *                     May be NULL (no messages to contribute)
213
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
214
 * @param[in] cbparams  Optional parameters for the usefulness callback.
215
 *                      May be NULL if the callback expects no parameters.
216
 *
217
 * @return  Returns URT_STATUS_OK on success.
218
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
219 7d9678db skenneweg
 */
220
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
221 a5e142de skenneweg
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
222
{
223
  urtDebugAssert(subscriber);
224
  urtDebugAssert(topic);
225
226
  if (subscriber->base.topic)
227
  {
228
    return URT_STATUS_SUBSCRIBE_TOPICSET;
229
  }
230 5b7188aa skenneweg
231
  subscriber->base.topic = topic;
232
  subscriber->usefulnesscb = usefulnesscb;
233
  subscriber->cbparams = cbparams;
234 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
235 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
236
  subscriber->base.numMessagesReceived = 0;
237
  subscriber->minLatency = URT_DELAY_INFINITE;
238
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
239 a5e142de skenneweg
# endif  /* URT_CFG_PUBSUB_PROFILING */
240
241 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
242 a5e142de skenneweg
  if (messages)
243
  {
244
    urt_message_t* lastMessageContribute = messages;
245
    while (lastMessageContribute->next)
246
    {
247
        lastMessageContribute = lastMessageContribute->next;
248
    }
249
    lastMessageContribute->next = topic->latestMessage->next;
250
    topic->latestMessage->next = messages;
251
  }
252
253
  subscriber->base.lastMessage = topic->latestMessage;
254
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
255
256 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
257 a5e142de skenneweg
258
# if (URT_CFG_PUBSUB_PROFILING == true)
259
    topic->numHrtSubscribers--;
260
# endif /* URT_CFG_PUBSUB_PROFILING */
261
262 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
263 a5e142de skenneweg
  return URT_STATUS_OK;
264
}
265 7d9678db skenneweg
266
/**
267 5198dfae skenneweg
 * @brief  Fetches the next message.
268 7d9678db skenneweg
 *
269 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
270
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
271
 * @param[in] bytes  Payload size in bytes.
272
 * @param[in] latency  The latency can be returned by reference. May be NULL.
273 7d9678db skenneweg
 *
274 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
275
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
276
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
277 7d9678db skenneweg
 */
278
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
279
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
280
281
/**
282 5198dfae skenneweg
 * @brief  Fetches the latest message.
283 7d9678db skenneweg
 *
284 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
285
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
286
 * @param[in] bytes  Payload size in bytes.
287
 * @param[in] latency  The latency can be returned by reference. May be NULL.
288 7d9678db skenneweg
 *
289 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
290
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
291
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
292 7d9678db skenneweg
 */
293
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
294
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
295
296
/**
297 5198dfae skenneweg
 * @brief  Calculates the usefulness of the subscriber.
298 7d9678db skenneweg
 *
299 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
300
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
301 7d9678db skenneweg
 *
302 5198dfae skenneweg
 * @return  Returns the usefulness as a value within [0,1].
303 7d9678db skenneweg
 */
304 a5e142de skenneweg
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
305
{
306
  urtDebugAssert(subscriber);
307
308
  return subscriber->usefulnesscb(latency);
309
}
310 7d9678db skenneweg
311
/**
312 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
313 7d9678db skenneweg
 *
314 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
315 7d9678db skenneweg
 *
316 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
317
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
318 7d9678db skenneweg
 */
319 a5e142de skenneweg
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
320
{
321
  urtDebugAssert(subscriber);
322
323
  if (subscriber->base.topic)
324
  {
325 5b7188aa skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
326 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
327 5b7188aa skenneweg
    subscriber->base.topic->numSubscribers--;
328
# endif /* URT_CFG_PUBSUB_PROFILING */
329 a5e142de skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
330
# if (URT_CFG_PUBSUB_PROFILING == true)
331 37cd5dc2 Svenja
      urtMutexUnlock(topic->lock);
332 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
333
    subscriber->base.topic = NULL;
334
    subscriber->base.lastMessage = NULL;
335
    subscriber->base.lastMessageTime = 0;
336
    return URT_STATUS_OK;
337
  }
338
339
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
340
}
341 7d9678db skenneweg
342
343
/**
344 5198dfae skenneweg
 * @brief  Initialize the FRT Subscriber.
345 7d9678db skenneweg
 *
346 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
347 7d9678db skenneweg
 */
348 5c6cb22f skenneweg
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
349
{
350 a5e142de skenneweg
  urtDebugAssert(subscriber);
351
352 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
353
  urtEventListenerInit(subscriber->base.evtListener);
354
  subscriber->base.lastMessage = NULL;
355
  subscriber->base.lastMessageTime = 0;
356
357
  #if (URT_CFG_PUBSUB_PROFILING)
358
    subscriber->base.sumLatencies = 0;
359
    subscriber->base.numMessagesReceived = 0;
360
  #endif /* URT_CFG_PUBSUB_PROFILING */
361
362
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
363
    subscriber->deadlineOffset = 0;
364
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
365
366
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
367
    subscriber->maxJitter = 0;
368
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
369
370
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
371
    subscriber->minLatency = URT_DELAY_INFINITE;
372
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
373
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
374
  return;
375
}
376
377 7d9678db skenneweg
378
/**
379 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
380
 *
381
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
382
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
383
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
384
 *                      Messages must not be associated to another topic.
385
 *                      Once a message has been contributed, it cannot be removed later.
386
 *                      May be NULL(no messages to contribute).
387
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
388
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
389
 *                    A value of 0 indicates that jitter is of no concern.
390
 *
391
 * @return  Returns URT_STATUS_OK on success.
392
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
393 7d9678db skenneweg
 */
394
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
395 a5e142de skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
396
{
397
  urtDebugAssert(subscriber);
398
  urtDebugAssert(topic);
399
400 5b7188aa skenneweg
  if (subscriber->base.topic)
401
  {
402
    return URT_STATUS_SUBSCRIBE_TOPICSET;
403
  }
404
405
  subscriber->base.topic = topic;
406 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
407 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
408
  subscriber->base.numMessagesReceived = 0;
409 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
410
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
411 5b7188aa skenneweg
  subscriber->deadlineOffset = deadline;
412 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
413
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
414 5b7188aa skenneweg
  subscriber->maxJitter =jitter;
415 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
416
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
417 5b7188aa skenneweg
  subscriber->minLatency = URT_DELAY_INFINITE;
418
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
419 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
420
421 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
422 5b7188aa skenneweg
  if (messages)
423
  {
424
    urt_message_t* lastMessageContribute = messages;
425
    while (lastMessageContribute->next)
426 a5e142de skenneweg
    {
427 5b7188aa skenneweg
      lastMessageContribute = lastMessageContribute->next;
428 a5e142de skenneweg
    }
429 5b7188aa skenneweg
    lastMessageContribute->next = topic->latestMessage->next;
430
    topic->latestMessage->next = messages;
431
  }
432 a5e142de skenneweg
433 5b7188aa skenneweg
  subscriber->base.lastMessage = topic->latestMessage;
434
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
435 a5e142de skenneweg
436 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
437 a5e142de skenneweg
438
# if (URT_CFG_PUBSUB_PROFILING == true)
439 5b7188aa skenneweg
  topic->numHrtSubscribers--;
440 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
441
442 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
443 5b7188aa skenneweg
  return URT_STATUS_OK;
444 a5e142de skenneweg
}
445 7d9678db skenneweg
446
/**
447 5198dfae skenneweg
 * @brief  Fetches the next message.
448 7d9678db skenneweg
 *
449 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
450
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
451
 * @param[in] bytes  Payload size in bytes.
452
 * @param[in] latency  The latency can be returned by reference. May be NULL.
453 7d9678db skenneweg
 *
454 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
455
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
456
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
457 7d9678db skenneweg
 */
458
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
459
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
460
461
/**
462 5198dfae skenneweg
 * @brief  Fetches the latest message.
463 7d9678db skenneweg
 *
464 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
465
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
466
 * @param[in] bytes  Payload size in bytes.
467
 * @param[in] latency  The latency can be returned by reference. May be NULL.
468 7d9678db skenneweg
 *
469 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
470
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
471
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
472 7d9678db skenneweg
 */
473
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
474
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
475
476
/**
477 5198dfae skenneweg
 * @brief  Calculates the validity from the subscriber.
478 7d9678db skenneweg
 *
479 5198dfae skenneweg
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
480
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
481 7d9678db skenneweg
 *
482 5198dfae skenneweg
 * @return  Returns a boolean indicator whether the latency is fine.
483 7d9678db skenneweg
 */
484 a5e142de skenneweg
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
485
{
486 37cd5dc2 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
487
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
488 5b7188aa skenneweg
    return true;
489 37cd5dc2 Svenja
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
490
491
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
492
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
493
      return true;
494
495
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
496
      return true;
497 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
498 5b7188aa skenneweg
499 a5e142de skenneweg
  return false;
500
}
501 7d9678db skenneweg
502
/**
503 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
504 7d9678db skenneweg
 *
505 5c6cb22f skenneweg
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
506 7d9678db skenneweg
 *
507 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
508
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
509 7d9678db skenneweg
 */
510 5b7188aa skenneweg
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
511
{
512
  urtDebugAssert(subscriber);
513
514
  if (subscriber->base.topic)
515
  {
516 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
517 5b7188aa skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
518
    //TODO: decrement topic's HRT counter
519
# if (URT_CFG_PUBSUB_PROFILING == true)
520
    subscriber->base.topic->numSubscribers--;
521
# endif /* URT_CFG_PUBSUB_PROFILING */
522
//Hier weiter
523
524
# if (URT_CFG_PUBSUB_PROFILING == true)
525 37cd5dc2 Svenja
    urtMutexUnlock(topic->lock);
526 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
527
    subscriber->base.topic = NULL;
528
    subscriber->base.lastMessage = NULL;
529
    subscriber->base.lastMessageTime = 0;
530
    return URT_STATUS_OK;
531
  }
532
533
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
534
}
535 7d9678db skenneweg
536
537
/**
538 5198dfae skenneweg
 * @brief  Initialize the HRT Subscriber.
539 7d9678db skenneweg
 *
540 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
541 7d9678db skenneweg
 */
542 5c6cb22f skenneweg
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
543
{
544 a5e142de skenneweg
  urtDebugAssert(subscriber);
545
546 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
547
  urtEventListenerInit(subscriber->base.evtListener);
548
  subscriber->base.lastMessage = NULL;
549
  subscriber->base.lastMessageTime = 0;
550
551 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING)
552 5c6cb22f skenneweg
    subscriber->base.sumLatencies = 0;
553
    subscriber->base.numMessagesReceived = 0;
554 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
555 5c6cb22f skenneweg
556
  subscriber->next = NULL;
557
558 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
559 5c6cb22f skenneweg
    subscriber->deadlineOffset = 0;
560
    urtTimerInit(subscriber->qodDeadlineTimer);
561 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
562 5c6cb22f skenneweg
563 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
564 5c6cb22f skenneweg
    subscriber->expectedRate = 0;
565 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
566 5c6cb22f skenneweg
567 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
568 5c6cb22f skenneweg
    subscriber->maxJitter = 0;
569 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
570 5c6cb22f skenneweg
571 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
572 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
573
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
574 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
575 5c6cb22f skenneweg
  return;
576
}
577
578 7d9678db skenneweg
579
/**
580 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
581
 *
582 5c6cb22f skenneweg
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
583 5198dfae skenneweg
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
584
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
585
 *                      Messages must not be associated to another topic.
586
 *                      Once a message has been contributed, it cannot be removed later.
587
 *                      May be NULL(no messages to contribute).
588
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
589
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
590
 *                  A value of 0 indicates, that rate is of no concern.
591
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
592
 *                    A value of 0 indicates that jitter is of no concern.
593
 *
594
 * @return  Returns URT_STATUS_OK on success.
595
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
596 7d9678db skenneweg
 */
597
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
598 5b7188aa skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
599
{
600
  urtDebugAssert(subscriber);
601
  urtDebugAssert(topic);
602
603
  if (subscriber->base.topic)
604
  {
605
    return URT_STATUS_SUBSCRIBE_TOPICSET;
606
  }
607
608
  subscriber->base.topic = topic;
609
# if (URT_CFG_PUBSUB_PROFILING == true)
610
  subscriber->base.sumLatencies = 0;
611
  subscriber->base.numMessagesReceived = 0;
612
# endif /* URT_CFG_PUBSUB_PROFILING */
613
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
614
  subscriber->deadlineOffset = deadline;
615
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
616
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
617
  subscriber->maxJitter =jitter;
618
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
619
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
620
  subscriber->minLatency = URT_DELAY_INFINITE;
621
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
622
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
623
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
624
  subscriber->expectedRate = rate;
625
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
626
627 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
628 5b7188aa skenneweg
  if (messages)
629
  {
630
    urt_message_t* lastMessageContribute = messages;
631
    while (lastMessageContribute->next)
632
    {
633
      lastMessageContribute = lastMessageContribute->next;
634
    }
635
    lastMessageContribute->next = topic->latestMessage->next;
636
    topic->latestMessage->next = messages;
637
  }
638
639
  subscriber->base.lastMessage = topic->latestMessage;
640
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
641
642 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
643 5b7188aa skenneweg
644
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
645 8378a78b Svenja
  //TODO: Implement
646 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
647
648
  topic->numHrtSubscribers--;
649
# if (URT_CFG_PUBSUB_PROFILING == true)
650
  topic->numSubscribers--;
651
# endif /* URT_CFG_PUBSUB_PROFILING */
652
653 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
654 5b7188aa skenneweg
  return URT_STATUS_OK;
655
}
656 7d9678db skenneweg
657
658
/**
659 5198dfae skenneweg
 * @brief  Fetches the next message.
660 7d9678db skenneweg
 *
661 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
662
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
663
 * @param[in] bytes  Payload size in bytes.
664
 * @param[in] latency  The latency can be returned by reference. May be NULL.
665 7d9678db skenneweg
 *
666 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
667
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
668
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
669 7d9678db skenneweg
 */
670
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
671
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
672
673
674
/**
675 5198dfae skenneweg
 * @brief  Fetches the latest message.
676 7d9678db skenneweg
 *
677 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
678
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
679
 * @param[in] bytes  Payload size in bytes.
680
 * @param[in] latency  The latency can be returned by reference. May be NULL.
681 7d9678db skenneweg
 *
682 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
683
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
684
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
685 7d9678db skenneweg
 */
686
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
687
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
688
689
/**
690 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
691 7d9678db skenneweg
 *
692 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
693 7d9678db skenneweg
 *
694 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
695
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
696 7d9678db skenneweg
 */
697 5b7188aa skenneweg
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
698
{
699
  urtDebugAssert(subscriber);
700
701
  if (subscriber->base.topic)
702
  {
703 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
704 5b7188aa skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
705
    subscriber->base.topic->numHrtSubscribers--;
706
# if (URT_CFG_PUBSUB_PROFILING == true)
707
    subscriber->base.topic->numSubscribers--;
708
# endif /* URT_CFG_PUBSUB_PROFILING */
709
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
710
    //TODO: remove self from topics lsit of HRT subscribers
711
    //TODO: ...
712
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
713
714
    urt_message_t* messageTemp = subscriber->base.lastMessage;
715 37cd5dc2 Svenja
    bool hrtZero = false;
716 5b7188aa skenneweg
    while (messageTemp->next->originTime < messageTemp->originTime)
717
    {
718
        messageTemp = messageTemp->next;
719
        messageTemp->numHrtConsumersLeft--;
720 37cd5dc2 Svenja
        if (messageTemp->numHrtConsumersLeft == 0)
721
        {
722
            hrtZero = true;
723
        }
724 5b7188aa skenneweg
# if(URT_CFG_PUBSUB_PROFILING == true)
725
        messageTemp->numConsumersLeft--;
726
# endif /* URT_CFG_PUBSUB_PROFILING */
727
    }
728 37cd5dc2 Svenja
    if (hrtZero)
729 5b7188aa skenneweg
    {
730 37cd5dc2 Svenja
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
731 5b7188aa skenneweg
    }
732
733 37cd5dc2 Svenja
    urtMutexUnlock(topic->lock);
734 5b7188aa skenneweg
    subscriber->base.topic = NULL;
735
    subscriber->base.lastMessage = NULL;
736
    subscriber->base.lastMessageTime = 0;
737
    return URT_STATUS_OK;
738
  }
739
740
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
741
}
742
743