Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 982056f7

History | View | Annotate | Download (42.379 KB)

1 1fb06240 skenneweg
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21
22 7d9678db skenneweg
#include <urtware.h>
23 982056f7 Svenja
#include <stdio.h>
24 7d9678db skenneweg
25 1fb06240 skenneweg
/******************************************************************************/
26
/* LOCAL DEFINITIONS                                                          */
27
/******************************************************************************/
28
29
/******************************************************************************/
30
/* EXPORTED VARIABLES                                                         */
31
/******************************************************************************/
32
33
/******************************************************************************/
34
/* LOCAL TYPES                                                                */
35
/******************************************************************************/
36
37
/******************************************************************************/
38
/* LOCAL VARIABLES                                                            */
39
/******************************************************************************/
40
41
/******************************************************************************/
42
/* LOCAL FUNCTIONS                                                            */
43
/******************************************************************************/
44
45 982056f7 Svenja
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
46 65dc89cb skenneweg
{
47 982056f7 Svenja
  subscriber->base.lastMessage = message;
48
  *subscriber->base.lastMessageTime = message->originTime;
49
  memcpy(message->payload, payload, bytes);
50 65dc89cb skenneweg
}
51
52
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
53
{
54
  while (oldestMessage->next->originTime < oldestMessage->originTime)
55
  {
56
    oldestMessage = oldestMessage->next;
57
  }
58
  return oldestMessage;
59
}
60
61
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
62
{
63
  urt_message_t* lastMessage = subscriber->base.lastMessage;
64
  while (lastMessage->next->originTime < lastMessage->originTime)
65
  {
66
    lastMessage = lastMessage->next;
67
#if (URT_CFG_PUBSUB_PROFILING == true)
68
    subscriber->base.lastMessage->numConsumersLeft--;
69
    subscriber->base->numMessagesReceived++;
70
#endif /* URT_CFG_PUBSUB_PROFILING */
71
  }
72
}
73
74 982056f7 Svenja
void urtContributeMessages(urt_message_t* messages)
75
{
76
  urt_message_t* lastMessageContribute = messages;
77
  while (lastMessageContribute->next)
78
  {
79
    lastMessageContribute = lastMessageContribute->next;
80
  }
81
  lastMessageContribute->next = topic->latestMessage->next;
82
  topic->latestMessage->next = messages;
83
}
84
85 1fb06240 skenneweg
/******************************************************************************/
86
/* EXPORTED FUNCTIONS                                                         */
87
/******************************************************************************/
88
89 7d9678db skenneweg
/**
90
 * @brief   Initialize the nrt Subscriber.
91
 *
92 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
93 7d9678db skenneweg
 */
94 5c6cb22f skenneweg
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
95
{
96 a5e142de skenneweg
  urtDebugAssert(subscriber);
97
98 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
99
  urtEventListenerInit(subscriber->base.evtListener);
100
  subscriber->base.lastMessage = NULL;
101
  subscriber->base.lastMessageTime = 0;
102 a5e142de skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
103 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
104
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
105 a5e142de skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
106 5c6cb22f skenneweg
  return;
107
}
108 1fb06240 skenneweg
109 7d9678db skenneweg
/**
110 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
111 7d9678db skenneweg
 *
112 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
113
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
114
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
115
 *                      Messages must not be associated to another topic.
116
 *                      Once a message has been contributed, it cannot be removed later.
117
 *                      May be NULL(no messages to contribute).
118 7d9678db skenneweg
 *
119 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
120
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
121 7d9678db skenneweg
 */
122 a5e142de skenneweg
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
123
{
124
  urtDebugAssert(subscriber);
125
  urtDebugAssert(topic);
126
127 982056f7 Svenja
  if (!subscriber->base.topic) {
128
    return URT_STATUS_SUBSCRIBE_TOPICSET;
129
  }
130 a5e142de skenneweg
131
  subscriber->base.topic = topic;
132 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
133 a5e142de skenneweg
134 982056f7 Svenja
  if (messages) {
135
    urtContributeMessages(messages);
136 a5e142de skenneweg
  }
137
138
  subscriber->base.lastMessage = topic->latestMessage;
139
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
140
141 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
142 a5e142de skenneweg
143
#if (URT_CFG_PUBSUB_PROFILING == true)
144
    topic->numHrtSubscribers--;
145
#endif /* URT_CFG_PUBSUB_PROFILING */
146
147 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
148 7d9678db skenneweg
  return URT_STATUS_OK;
149 1fb06240 skenneweg
}
150
151 7d9678db skenneweg
/**
152 5198dfae skenneweg
 * @brief  Fetches the next message.
153 7d9678db skenneweg
 *
154 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
155
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
156
 * @param[in] bytes  Payload size in bytes.
157
 * @param[in] latency  The latency can be returned by reference. May be NULL.
158 7d9678db skenneweg
 *
159 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
160
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
161
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
162 7d9678db skenneweg
 */
163 982056f7 Svenja
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
164 65dc89cb skenneweg
{   
165 a5e142de skenneweg
  urtDebugAssert(subscriber);
166 65dc89cb skenneweg
167 982056f7 Svenja
  if (!subscriber->base.topic) {
168
    return URT_STATUS_FETCH_NOTOPIC;
169
  }
170 65dc89cb skenneweg
171
  urtMutexLock(subscriber->base.topic->lock);
172
173 982056f7 Svenja
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
174
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
175
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
176 65dc89cb skenneweg
      urtMutexUnlock(subscriber->base.topic->lock);
177
      return URT_STATUS_FETCH_NOMESSAGE;
178
    }
179 982056f7 Svenja
    oldestMessage = oldestMessage->next;
180 65dc89cb skenneweg
  }
181 982056f7 Svenja
  else {
182
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
183 65dc89cb skenneweg
  }
184
185 982056f7 Svenja
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
186
187
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
188
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
189 65dc89cb skenneweg
190
#if(URT_CFG_PUBSUB_PROFILING == true)
191
  subscriber->base.sumLatencies += calculatedLatency;
192
193 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
194 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
195
  }
196 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
197 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
198
  }
199
#endif /* URT_CFG_PUBSUB_PROFILING */
200 982056f7 Svenja
201
    if (latency) {
202
      latency = calculatedLatency;
203
    }
204 65dc89cb skenneweg
  }
205
206
#if (URT_CFG_PUBSUB_PROFILING == true)
207
  subscriber->base.lastMessage->numConsumersLeft--;
208
  subscriber->base->numMessagesReceived++;
209
#endif /* URT_CFG_PUBSUB_PROFILING */
210
211
  urtMutexUnlock(subscriber->base.topic->lock);
212 7d9678db skenneweg
  return URT_STATUS_OK;
213 1fb06240 skenneweg
}
214
215 7d9678db skenneweg
/**
216 a5e142de skenneweg
 * @brief Fetches the latest message.
217 7d9678db skenneweg
 *
218 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
219
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
220
 * @param[in] bytes  Payload size in bytes.
221
 * @param[in] latency  The latency can be returned by reference. May be NULL.
222 7d9678db skenneweg
 *
223 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
224
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
225
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
226 7d9678db skenneweg
 */
227 982056f7 Svenja
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
228 65dc89cb skenneweg
  urtDebugAssert(subscriber);
229
230
  if (!subscriber->base.topic)
231
      return URT_STATUS_FETCH_NOTOPIC;
232
233
  urtMutexLock(subscriber->base.topic->lock);
234
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
235
236
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
237
  {
238
    urtMutexUnlock(subscriber->base.topic->lock);
239
    return URT_STATUS_FETCH_NOMESSAGE;
240
  }
241
242 982056f7 Svenja
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
243
244
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
245
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
246 65dc89cb skenneweg
247
#if(URT_CFG_PUBSUB_PROFILING == true)
248
  subscriber->base.sumLatencies += calculatedLatency;
249
250 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
251 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
252
  }
253 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
254 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
255
  }
256
#endif /* URT_CFG_PUBSUB_PROFILING */
257 982056f7 Svenja
258
    if (latency) {
259
      latency = calculatedLatency;
260
    }
261 65dc89cb skenneweg
  }
262
263
  urtMutexUnlock(subscriber->base.topic->lock);
264 7d9678db skenneweg
  return URT_STATUS_OK;
265 1fb06240 skenneweg
}
266
267 7d9678db skenneweg
/**
268 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
269 7d9678db skenneweg
 *
270 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
271 7d9678db skenneweg
 *
272 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
273
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
274 7d9678db skenneweg
 */
275 5b7188aa skenneweg
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
276
{
277 a5e142de skenneweg
  if (subscriber->base.topic)
278
  {
279
# if(URT_CFG_PUBSUB_PROFILING == true)
280 37cd5dc2 Svenja
      urtMutexLock(topic->lock);
281 5b7188aa skenneweg
      subscriber->base.topic->numSubscribers--;
282 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
283
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
284
# if(URT_CFG_PUBSUB_PROFILING == true)
285 37cd5dc2 Svenja
      urtMutexUnlock(topic->lock);
286 a5e142de skenneweg
      subscriber->base.topic = NULL;
287
      subscriber->base.lastMessage = NULL;
288
      subscriber->base.lastMessageTime = 0;
289
#endif /* URT_CFG_PUBSUB_PROFILING */
290
    return URT_STATUS_OK;
291
  }
292 5b7188aa skenneweg
293
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
294 1fb06240 skenneweg
}
295
296 7d9678db skenneweg
297
/**
298 5198dfae skenneweg
 * @brief  Initialize the srt Subscriber.
299 7d9678db skenneweg
 *
300 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
301 7d9678db skenneweg
 */
302 5c6cb22f skenneweg
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
303
{
304 a5e142de skenneweg
  urtDebugAssert(subscriber);
305
306 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
307
  urtEventListenerInit(subscriber->base.evtListener);
308
  subscriber->base.lastMessage = NULL;
309
  subscriber->base.lastMessageTime = 0;
310
  #if (URT_CFG_PUBSUB_PROFILING)
311
    subscriber->base.sumLatencies = 0;
312
    subscriber->base.numMessagesReceived = 0;
313
    subscriber->usefulnesscb = NULL;
314
    subscriber->cbparams = NULL;
315
    subscriber->minLatency = URT_DELAY_INFINITE;
316
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
317
  #endif /* URT_CFG_PUBSUB_PROFILING */
318
  return;
319
}
320 7d9678db skenneweg
321
/**
322 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
323
 *
324
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
325
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
326
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
327
 *                     Messages must not be associated to another topic.
328
 *                     Once a message has been contributed, it cannot be removed later.
329
 *                     May be NULL (no messages to contribute)
330
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
331
 * @param[in] cbparams  Optional parameters for the usefulness callback.
332
 *                      May be NULL if the callback expects no parameters.
333
 *
334
 * @return  Returns URT_STATUS_OK on success.
335
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
336 7d9678db skenneweg
 */
337
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
338 a5e142de skenneweg
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
339
{
340
  urtDebugAssert(subscriber);
341
  urtDebugAssert(topic);
342
343
  if (subscriber->base.topic)
344
  {
345
    return URT_STATUS_SUBSCRIBE_TOPICSET;
346
  }
347 5b7188aa skenneweg
348
  subscriber->base.topic = topic;
349
  subscriber->usefulnesscb = usefulnesscb;
350
  subscriber->cbparams = cbparams;
351 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
352 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
353
  subscriber->base.numMessagesReceived = 0;
354
  subscriber->minLatency = URT_DELAY_INFINITE;
355
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
356 a5e142de skenneweg
# endif  /* URT_CFG_PUBSUB_PROFILING */
357
358 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
359 a5e142de skenneweg
  if (messages)
360
  {
361 982056f7 Svenja
    urtContributeMessages(messages);
362 a5e142de skenneweg
  }
363
364
  subscriber->base.lastMessage = topic->latestMessage;
365
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
366
367 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
368 a5e142de skenneweg
369
# if (URT_CFG_PUBSUB_PROFILING == true)
370
    topic->numHrtSubscribers--;
371
# endif /* URT_CFG_PUBSUB_PROFILING */
372
373 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
374 a5e142de skenneweg
  return URT_STATUS_OK;
375
}
376 7d9678db skenneweg
377
/**
378 5198dfae skenneweg
 * @brief  Fetches the next message.
379 7d9678db skenneweg
 *
380 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
381
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
382
 * @param[in] bytes  Payload size in bytes.
383
 * @param[in] latency  The latency can be returned by reference. May be NULL.
384 7d9678db skenneweg
 *
385 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
386
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
387
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
388 7d9678db skenneweg
 */
389 982056f7 Svenja
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload,
390 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
391
{
392
  urtDebugAssert(subscriber);
393
394
  if (!subscriber->base.topic)
395
      return URT_STATUS_FETCH_NOTOPIC;
396
397
  urtMutexLock(subscriber->base.topic->lock);
398
399 982056f7 Svenja
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
400
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
401 65dc89cb skenneweg
  {
402
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
403
    {
404
      urtMutexUnlock(subscriber->base.topic->lock);
405
      return URT_STATUS_FETCH_NOMESSAGE;
406
    }
407 982056f7 Svenja
    oldestMessage = oldestMessage->next;
408 65dc89cb skenneweg
  }
409
  else
410
  {
411 982056f7 Svenja
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
412 65dc89cb skenneweg
  }
413
414 982056f7 Svenja
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
415
416
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
417
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
418 65dc89cb skenneweg
419
#if(URT_CFG_PUBSUB_PROFILING == true)
420
  subscriber->base.sumLatencies += calculatedLatency;
421
422 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
423 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
424
  }
425 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
426 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
427
  }
428
#endif /* URT_CFG_PUBSUB_PROFILING */
429 982056f7 Svenja
430
    if (latency) {
431
      latency = calculatedLatency;
432
    }
433 65dc89cb skenneweg
  }
434
435
#if (URT_CFG_PUBSUB_PROFILING == true)
436
  subscriber->base.lastMessage->numConsumersLeft--;
437
  subscriber->base->numMessagesReceived++;
438
#endif /* URT_CFG_PUBSUB_PROFILING */
439
440
  urtMutexUnlock(subscriber->base.topic->lock);
441
  return URT_STATUS_OK;
442
}
443 7d9678db skenneweg
444
/**
445 5198dfae skenneweg
 * @brief  Fetches the latest message.
446 7d9678db skenneweg
 *
447 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
448
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
449
 * @param[in] bytes  Payload size in bytes.
450
 * @param[in] latency  The latency can be returned by reference. May be NULL.
451 7d9678db skenneweg
 *
452 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
453
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
454
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
455 7d9678db skenneweg
 */
456 982056f7 Svenja
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload,
457 65dc89cb skenneweg
                                                size_t bytes, urt_delay_t* latency)
458
{
459
  urtDebugAssert(subscriber);
460
461 982056f7 Svenja
  if (!subscriber->base.topic) {
462
    return URT_STATUS_FETCH_NOTOPIC;
463
  }
464 65dc89cb skenneweg
465
  urtMutexLock(subscriber->base.topic->lock);
466
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
467
468
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
469
  {
470
    urtMutexUnlock(subscriber->base.topic->lock);
471
    return URT_STATUS_FETCH_NOMESSAGE;
472
  }
473
474 982056f7 Svenja
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
475
476
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
477
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
478 65dc89cb skenneweg
479
#if(URT_CFG_PUBSUB_PROFILING == true)
480
  subscriber->base.sumLatencies += calculatedLatency;
481
482 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
483 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
484
  }
485 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
486 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
487
  }
488
#endif /* URT_CFG_PUBSUB_PROFILING */
489 982056f7 Svenja
490
    if (latency) {
491
      latency = calculatedLatency;
492
    }
493 65dc89cb skenneweg
  }
494
495
  urtMutexUnlock(subscriber->base.topic->lock);
496
  return URT_STATUS_OK;
497
}
498 7d9678db skenneweg
499
/**
500 5198dfae skenneweg
 * @brief  Calculates the usefulness of the subscriber.
501 7d9678db skenneweg
 *
502 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
503
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
504 7d9678db skenneweg
 *
505 5198dfae skenneweg
 * @return  Returns the usefulness as a value within [0,1].
506 7d9678db skenneweg
 */
507 a5e142de skenneweg
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
508
{
509
  urtDebugAssert(subscriber);
510
511
  return subscriber->usefulnesscb(latency);
512
}
513 7d9678db skenneweg
514
/**
515 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
516 7d9678db skenneweg
 *
517 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
518 7d9678db skenneweg
 *
519 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
520
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
521 7d9678db skenneweg
 */
522 a5e142de skenneweg
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
523
{
524
  urtDebugAssert(subscriber);
525
526
  if (subscriber->base.topic)
527
  {
528 5b7188aa skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
529 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
530 5b7188aa skenneweg
    subscriber->base.topic->numSubscribers--;
531
# endif /* URT_CFG_PUBSUB_PROFILING */
532 a5e142de skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
533
# if (URT_CFG_PUBSUB_PROFILING == true)
534 37cd5dc2 Svenja
      urtMutexUnlock(topic->lock);
535 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
536
    subscriber->base.topic = NULL;
537
    subscriber->base.lastMessage = NULL;
538
    subscriber->base.lastMessageTime = 0;
539
    return URT_STATUS_OK;
540
  }
541
542
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
543
}
544 7d9678db skenneweg
545
546
/**
547 5198dfae skenneweg
 * @brief  Initialize the FRT Subscriber.
548 7d9678db skenneweg
 *
549 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
550 7d9678db skenneweg
 */
551 5c6cb22f skenneweg
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
552
{
553 a5e142de skenneweg
  urtDebugAssert(subscriber);
554
555 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
556
  urtEventListenerInit(subscriber->base.evtListener);
557
  subscriber->base.lastMessage = NULL;
558
  subscriber->base.lastMessageTime = 0;
559
560
  #if (URT_CFG_PUBSUB_PROFILING)
561
    subscriber->base.sumLatencies = 0;
562
    subscriber->base.numMessagesReceived = 0;
563
  #endif /* URT_CFG_PUBSUB_PROFILING */
564
565
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
566
    subscriber->deadlineOffset = 0;
567
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
568
569
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
570
    subscriber->maxJitter = 0;
571
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
572
573
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
574
    subscriber->minLatency = URT_DELAY_INFINITE;
575
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
576
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
577
  return;
578
}
579
580 7d9678db skenneweg
581
/**
582 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
583
 *
584
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
585
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
586
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
587
 *                      Messages must not be associated to another topic.
588
 *                      Once a message has been contributed, it cannot be removed later.
589
 *                      May be NULL(no messages to contribute).
590
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
591
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
592
 *                    A value of 0 indicates that jitter is of no concern.
593
 *
594
 * @return  Returns URT_STATUS_OK on success.
595
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
596 7d9678db skenneweg
 */
597
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
598 a5e142de skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
599
{
600
  urtDebugAssert(subscriber);
601
  urtDebugAssert(topic);
602
603 5b7188aa skenneweg
  if (subscriber->base.topic)
604
  {
605
    return URT_STATUS_SUBSCRIBE_TOPICSET;
606
  }
607
608
  subscriber->base.topic = topic;
609 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
610 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
611
  subscriber->base.numMessagesReceived = 0;
612 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
613
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
614 5b7188aa skenneweg
  subscriber->deadlineOffset = deadline;
615 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
616
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
617 5b7188aa skenneweg
  subscriber->maxJitter =jitter;
618 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
619
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
620 5b7188aa skenneweg
  subscriber->minLatency = URT_DELAY_INFINITE;
621
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
622 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
623
624 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
625 5b7188aa skenneweg
  if (messages)
626
  {
627 982056f7 Svenja
    urtContributeMessages(messages);
628 5b7188aa skenneweg
  }
629 a5e142de skenneweg
630 5b7188aa skenneweg
  subscriber->base.lastMessage = topic->latestMessage;
631
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
632 a5e142de skenneweg
633 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
634 a5e142de skenneweg
635
# if (URT_CFG_PUBSUB_PROFILING == true)
636 5b7188aa skenneweg
  topic->numHrtSubscribers--;
637 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
638
639 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
640 5b7188aa skenneweg
  return URT_STATUS_OK;
641 a5e142de skenneweg
}
642 7d9678db skenneweg
643
/**
644 5198dfae skenneweg
 * @brief  Fetches the next message.
645 7d9678db skenneweg
 *
646 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
647
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
648
 * @param[in] bytes  Payload size in bytes.
649
 * @param[in] latency  The latency can be returned by reference. May be NULL.
650 7d9678db skenneweg
 *
651 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
652
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
653
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
654 7d9678db skenneweg
 */
655 982056f7 Svenja
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload,
656 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
657
{
658
  urtDebugAssert(subscriber);
659
660
  if (!subscriber->base.topic)
661
      return URT_STATUS_FETCH_NOTOPIC;
662
663
  urtMutexLock(subscriber->base.topic->lock);
664
665 982056f7 Svenja
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
666
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
667 65dc89cb skenneweg
  {
668
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
669
    {
670
      urtMutexUnlock(subscriber->base.topic->lock);
671
      return URT_STATUS_FETCH_NOMESSAGE;
672
    }
673 982056f7 Svenja
    oldestMessage = oldestMessage->next;
674 65dc89cb skenneweg
  }
675
  else
676
  {
677 982056f7 Svenja
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
678 65dc89cb skenneweg
  }
679
680 982056f7 Svenja
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
681 65dc89cb skenneweg
682 982056f7 Svenja
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
683
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
684
685
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
686 65dc89cb skenneweg
  subscriber->base.sumLatencies += calculatedLatency;
687 982056f7 Svenja
688
  if (calculatedLatency < subscriber->minLatency) {
689
    subscriber->minLatency = calculatedLatency;
690
  }
691
  else if (calculatedLatency > subscriber->maxLatency) {
692
    subscriber->maxLatency = calculatedLatency;
693
  }
694 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
695
696 982056f7 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
697
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
698 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
699
  }
700 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
701 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
702
  }
703 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
704
    urtMutexUnlock(subscriber->base.topic);
705
    return URT_STATUS_JITTERVIOLATION;
706
  }
707
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
708 65dc89cb skenneweg
709 982056f7 Svenja
    if (latency) {
710
      latency = calculatedLatency;
711
    }
712 65dc89cb skenneweg
  }
713
714
#if (URT_CFG_PUBSUB_PROFILING == true)
715
  subscriber->base.lastMessage->numConsumersLeft--;
716
  subscriber->base->numMessagesReceived++;
717
#endif /* URT_CFG_PUBSUB_PROFILING */
718
719
  urtMutexUnlock(subscriber->base.topic->lock);
720 982056f7 Svenja
  return URT_STATUS_OK;
721 65dc89cb skenneweg
}
722 7d9678db skenneweg
723
/**
724 5198dfae skenneweg
 * @brief  Fetches the latest message.
725 7d9678db skenneweg
 *
726 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
727
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
728
 * @param[in] bytes  Payload size in bytes.
729
 * @param[in] latency  The latency can be returned by reference. May be NULL.
730 7d9678db skenneweg
 *
731 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
732
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
733
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
734 7d9678db skenneweg
 */
735 982056f7 Svenja
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload,
736 65dc89cb skenneweg
                                                size_t bytes, urt_delay_t* latency)
737
{
738
  urtDebugAssert(subscriber);
739
740
  if (!subscriber->base.topic)
741
      return URT_STATUS_FETCH_NOTOPIC;
742
743
  urtMutexLock(subscriber->base.topic->lock);
744
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
745
746
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
747
  {
748
    urtMutexUnlock(subscriber->base.topic->lock);
749
    return URT_STATUS_FETCH_NOMESSAGE;
750
  }
751
752 982056f7 Svenja
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
753 65dc89cb skenneweg
754 982056f7 Svenja
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
755
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
756
757
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
758 65dc89cb skenneweg
  subscriber->base.sumLatencies += calculatedLatency;
759 982056f7 Svenja
760
  if (calculatedLatency < subscriber->minLatency) {
761
    subscriber->minLatency = calculatedLatency;
762
  }
763
  else if (calculatedLatency > subscriber->maxLatency) {
764
    subscriber->maxLatency = calculatedLatency;
765
  }
766 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
767
768 982056f7 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
769
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
770 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
771
  }
772 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
773 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
774
  }
775 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
776
    urtMutexUnlock(subscriber->base.topic);
777
    return URT_STATUS_JITTERVIOLATION;
778
  }
779
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
780 65dc89cb skenneweg
781 982056f7 Svenja
    if (latency) {
782
      latency = calculatedLatency;
783
    }
784 65dc89cb skenneweg
  }
785
786
  urtMutexUnlock(subscriber->base.topic->lock);
787
  return URT_STATUS_OK;
788
}
789 7d9678db skenneweg
790
/**
791 5198dfae skenneweg
 * @brief  Calculates the validity from the subscriber.
792 7d9678db skenneweg
 *
793 5198dfae skenneweg
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
794
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
795 7d9678db skenneweg
 *
796 5198dfae skenneweg
 * @return  Returns a boolean indicator whether the latency is fine.
797 7d9678db skenneweg
 */
798 a5e142de skenneweg
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
799
{
800 37cd5dc2 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
801
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
802 5b7188aa skenneweg
    return true;
803 37cd5dc2 Svenja
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
804
805
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
806
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
807
      return true;
808
809
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
810
      return true;
811 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
812 5b7188aa skenneweg
813 a5e142de skenneweg
  return false;
814
}
815 7d9678db skenneweg
816
/**
817 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
818 7d9678db skenneweg
 *
819 5c6cb22f skenneweg
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
820 7d9678db skenneweg
 *
821 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
822
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
823 7d9678db skenneweg
 */
824 5b7188aa skenneweg
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
825
{
826
  urtDebugAssert(subscriber);
827
828 982056f7 Svenja
  if (!subscriber->base.topic) {
829
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
830
  }
831
832
#if (URT_CFG_PUBSUB_PROFILING == true)
833
  urtMutexLock(topic->lock);
834
#endif /* URT_CFG_PUBSUB_PROFILING */
835
836
  urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
837
838 5b7188aa skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
839 982056f7 Svenja
  subscriber->base.topic->numSubscribers--;
840 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
841
842
# if (URT_CFG_PUBSUB_PROFILING == true)
843 982056f7 Svenja
  urtMutexUnlock(topic->lock);
844 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
845 982056f7 Svenja
  subscriber->base.topic = NULL;
846
  subscriber->base.lastMessage = NULL;
847
  subscriber->base.lastMessageTime = 0;
848
  return URT_STATUS_OK;
849 5b7188aa skenneweg
}
850 7d9678db skenneweg
851
852
/**
853 5198dfae skenneweg
 * @brief  Initialize the HRT Subscriber.
854 7d9678db skenneweg
 *
855 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
856 7d9678db skenneweg
 */
857 5c6cb22f skenneweg
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
858
{
859 a5e142de skenneweg
  urtDebugAssert(subscriber);
860
861 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
862
  urtEventListenerInit(subscriber->base.evtListener);
863
  subscriber->base.lastMessage = NULL;
864
  subscriber->base.lastMessageTime = 0;
865
866 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING)
867 5c6cb22f skenneweg
    subscriber->base.sumLatencies = 0;
868
    subscriber->base.numMessagesReceived = 0;
869 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
870 5c6cb22f skenneweg
871
  subscriber->next = NULL;
872
873 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
874 5c6cb22f skenneweg
    subscriber->deadlineOffset = 0;
875
    urtTimerInit(subscriber->qodDeadlineTimer);
876 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
877 5c6cb22f skenneweg
878 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
879 5c6cb22f skenneweg
    subscriber->expectedRate = 0;
880 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
881 5c6cb22f skenneweg
882 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
883 5c6cb22f skenneweg
    subscriber->maxJitter = 0;
884 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
885 5c6cb22f skenneweg
886 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
887 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
888
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
889 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
890 5c6cb22f skenneweg
  return;
891
}
892
893 7d9678db skenneweg
894
/**
895 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
896
 *
897 5c6cb22f skenneweg
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
898 5198dfae skenneweg
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
899
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
900
 *                      Messages must not be associated to another topic.
901
 *                      Once a message has been contributed, it cannot be removed later.
902
 *                      May be NULL(no messages to contribute).
903
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
904
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
905
 *                  A value of 0 indicates, that rate is of no concern.
906
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
907
 *                    A value of 0 indicates that jitter is of no concern.
908
 *
909
 * @return  Returns URT_STATUS_OK on success.
910
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
911 7d9678db skenneweg
 */
912
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
913 5b7188aa skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
914
{
915
  urtDebugAssert(subscriber);
916
  urtDebugAssert(topic);
917
918
  if (subscriber->base.topic)
919
  {
920
    return URT_STATUS_SUBSCRIBE_TOPICSET;
921
  }
922
923
  subscriber->base.topic = topic;
924
# if (URT_CFG_PUBSUB_PROFILING == true)
925
  subscriber->base.sumLatencies = 0;
926
  subscriber->base.numMessagesReceived = 0;
927
# endif /* URT_CFG_PUBSUB_PROFILING */
928
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
929
  subscriber->deadlineOffset = deadline;
930
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
931
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
932
  subscriber->maxJitter =jitter;
933
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
934
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
935
  subscriber->minLatency = URT_DELAY_INFINITE;
936
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
937
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
938
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
939
  subscriber->expectedRate = rate;
940
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
941
942 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
943 5b7188aa skenneweg
  if (messages)
944
  {
945 982056f7 Svenja
    urtContributeMessages(messages);
946 5b7188aa skenneweg
  }
947
948
  subscriber->base.lastMessage = topic->latestMessage;
949
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
950
951 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
952 5b7188aa skenneweg
953
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
954 8378a78b Svenja
  //TODO: Implement
955 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
956
957
  topic->numHrtSubscribers--;
958
# if (URT_CFG_PUBSUB_PROFILING == true)
959
  topic->numSubscribers--;
960
# endif /* URT_CFG_PUBSUB_PROFILING */
961
962 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
963 5b7188aa skenneweg
  return URT_STATUS_OK;
964
}
965 7d9678db skenneweg
966
967
/**
968 5198dfae skenneweg
 * @brief  Fetches the next message.
969 7d9678db skenneweg
 *
970 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
971
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
972
 * @param[in] bytes  Payload size in bytes.
973
 * @param[in] latency  The latency can be returned by reference. May be NULL.
974 7d9678db skenneweg
 *
975 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
976
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
977
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
978 7d9678db skenneweg
 */
979 982056f7 Svenja
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
980 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
981
{
982
  urtDebugAssert(subscriber);
983
984 982056f7 Svenja
  if (!subscriber->base.topic) {
985
    return URT_STATUS_FETCH_NOTOPIC;
986
  }
987 65dc89cb skenneweg
988
  urtMutexLock(subscriber->base.topic->lock);
989
  urt_message_t* messageTemp = subscriber->base.lastMessage;
990
  if (messageTemp->next->originTime > messageTemp.originTime)
991
  {
992
    urtMutexUnlock(subscriber->base.topic->lock);
993
    return URT_STATUS_FETCH_NOMESSAGE;
994
  }
995
  messageTemp = messageTemp->next;
996
997 982056f7 Svenja
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
998
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
999
1000
#if(URT_CFG_PUBSUB_PROFILING == true)
1001 65dc89cb skenneweg
  subscriber->base.sumLatencies += calculatedLatency;
1002 982056f7 Svenja
1003
  if (calculatedLatency < subscriber->minLatency) {
1004
    subscriber->minLatency = calculatedLatency;
1005
  }
1006
  else if (calculatedLatency > subscriber->maxLatency) {
1007
    subscriber->maxLatency = calculatedLatency;
1008
  }
1009 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
1010 982056f7 Svenja
1011
    if (latency) {
1012
      latency = calculatedLatency;
1013
    }
1014 65dc89cb skenneweg
  }
1015
1016
  subscriber->base.lastMessage->numHrtConsumersLeft--;
1017
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1018
  {
1019
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1020
  }
1021
1022
#if (URT_CFG_PUBSUB_PROFILING == true)
1023
  subscriber->base.lastMessage->numConsumersLeft--;
1024
  subscriber->base->numMessagesReceived++;
1025
#endif /* URT_CFG_PUBSUB_PROFILING */
1026
1027
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1028 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1029
    subscriber->minLatency = calculatedLatency;
1030
  }
1031
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1032
    subscriber->maxLatency = calculatedLatency;
1033
  }
1034
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1035
    urtMutexUnlock(subscriber->base.topic);
1036 65dc89cb skenneweg
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1037
    return URT_STATUS_JITTERVIOLATION;
1038 982056f7 Svenja
  }
1039 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1040 982056f7 Svenja
1041
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1042
  if (calculatedLatency < subscriber->minLatency) {
1043
    subscriber->minLatency = calculatedLatency;
1044 65dc89cb skenneweg
  }
1045 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
1046
    subscriber->maxLatency = calculatedLatency;
1047
  }
1048
#endif /* URT_CFG_PUBSUB_PROFILING */
1049 65dc89cb skenneweg
1050 982056f7 Svenja
  urtFetchMessage(messageTemp, subscriber, payload, bytes);
1051 65dc89cb skenneweg
1052
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1053
  if (messageTemp->next->originTime < messageTemp->originTime)
1054
  {
1055 982056f7 Svenja
    //TODO: first reset?! (when ... set)
1056
    urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
1057 65dc89cb skenneweg
  }
1058
  else
1059
  {
1060 982056f7 Svenja
    urtTimerReset(subscriber->qosDeadlineTimer);
1061 65dc89cb skenneweg
  }
1062
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1063
1064
  urtMutexUnlock(subscriber->base.topic->lock);
1065
  return URT_STATUS_OK;
1066
}
1067 7d9678db skenneweg
1068
1069
/**
1070 5198dfae skenneweg
 * @brief  Fetches the latest message.
1071 7d9678db skenneweg
 *
1072 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
1073
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
1074
 * @param[in] bytes  Payload size in bytes.
1075
 * @param[in] latency  The latency can be returned by reference. May be NULL.
1076 7d9678db skenneweg
 *
1077 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
1078
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
1079
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
1080 7d9678db skenneweg
 */
1081 982056f7 Svenja
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
1082
                                                size_t bytes, urt_delay_t* latency)
1083
{
1084
  if (!subscriber->base.topic) {
1085
    return URT_STATUS_FETCH_NOTOPIC;
1086
  }
1087
1088
  urtMutexLock(subscriber->base.topic->lock);
1089
  urt_message_t* lastMessage = subscriber->base.lastMessage;
1090
  bool hrtZero = false;
1091
  while (lastMessage->next->originTime < lastMessage->originTime) {
1092
    lastMessage = lastMessage->next;
1093
    lastMessage->numHrtConsumersLeft--;
1094
    if (lastMessage->numHrtConsumersLeft == 0) {
1095
        hertZero = true;
1096
    }
1097
#if (URT_CFG_PUBSUB_PROFILING == true)
1098
    lastMessage->numConsumersLeft--;
1099
    subscriber->base.numMessagesReceived++;
1100
#endif /* URT_CFG_PUBSUB_PROFILING */
1101
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1102
    urtTimerReset(subscriber->qosDeadlineTimer);
1103
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1104
  }
1105
1106
  if (hrtZero) {
1107
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1108
  }
1109
1110
  if (lastMessage->originTime == subscriber->base.lastMessageTime) {
1111
    urtMutexUnlock(subscriber->base.topic);
1112
    return URT_STATUS_FETCH_NOMESSAGE;
1113
  }
1114
1115
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
1116
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
1117
1118
#if(URT_CFG_PUBSUB_PROFILING == true)
1119
  subscriber->base.sumLatencies += calculatedLatency;
1120
1121
  if (calculatedLatency < subscriber->minLatency) {
1122
    subscriber->minLatency = calculatedLatency;
1123
  }
1124
  else if (calculatedLatency > subscriber->maxLatency) {
1125
    subscriber->maxLatency = calculatedLatency;
1126
  }
1127
#endif /* URT_CFG_PUBSUB_PROFILING */
1128
1129
    if (latency) {
1130
      latency = calculatedLatency;
1131
    }
1132
  }
1133
1134
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1135
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1136
    subscriber->minLatency = calculatedLatency;
1137
  }
1138
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1139
    subscriber->maxLatency = calculatedLatency;
1140
  }
1141
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1142
    urtMutexUnlock(subscriber->base.topic);
1143
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1144
    return URT_STATUS_JITTERVIOLATION;
1145
  }
1146
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1147
1148
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1149
  if (calculatedLatency < subscriber->minLatency) {
1150
    subscriber->minLatency = calculatedLatency;
1151
  }
1152
  else if (calculatedLatency > subscriber->maxLatency) {
1153
    subscriber->maxLatency = calculatedLatency;
1154
  }
1155
#endif /* URT_CFG_PUBSUB_PROFILING */
1156
1157
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
1158
1159
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1160
  urtTimerReset(subscriber->qosDeadlineTimer);
1161
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1162
}
1163 7d9678db skenneweg
1164
/**
1165 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
1166 7d9678db skenneweg
 *
1167 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
1168 7d9678db skenneweg
 *
1169 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
1170
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
1171 7d9678db skenneweg
 */
1172 5b7188aa skenneweg
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
1173
{
1174
  urtDebugAssert(subscriber);
1175
1176
  if (subscriber->base.topic)
1177
  {
1178 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
1179 5b7188aa skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
1180
    subscriber->base.topic->numHrtSubscribers--;
1181
# if (URT_CFG_PUBSUB_PROFILING == true)
1182
    subscriber->base.topic->numSubscribers--;
1183
# endif /* URT_CFG_PUBSUB_PROFILING */
1184
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1185
    //TODO: remove self from topics lsit of HRT subscribers
1186
    //TODO: ...
1187
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1188
1189
    urt_message_t* messageTemp = subscriber->base.lastMessage;
1190 37cd5dc2 Svenja
    bool hrtZero = false;
1191 5b7188aa skenneweg
    while (messageTemp->next->originTime < messageTemp->originTime)
1192
    {
1193
        messageTemp = messageTemp->next;
1194
        messageTemp->numHrtConsumersLeft--;
1195 37cd5dc2 Svenja
        if (messageTemp->numHrtConsumersLeft == 0)
1196
        {
1197
            hrtZero = true;
1198
        }
1199 5b7188aa skenneweg
# if(URT_CFG_PUBSUB_PROFILING == true)
1200
        messageTemp->numConsumersLeft--;
1201
# endif /* URT_CFG_PUBSUB_PROFILING */
1202
    }
1203 37cd5dc2 Svenja
    if (hrtZero)
1204 5b7188aa skenneweg
    {
1205 37cd5dc2 Svenja
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
1206 5b7188aa skenneweg
    }
1207
1208 37cd5dc2 Svenja
    urtMutexUnlock(topic->lock);
1209 5b7188aa skenneweg
    subscriber->base.topic = NULL;
1210
    subscriber->base.lastMessage = NULL;
1211
    subscriber->base.lastMessageTime = 0;
1212
    return URT_STATUS_OK;
1213
  }
1214
1215
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1216
}
1217
1218