Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ fb72e91b

History | View | Annotate | Download (43.174 KB)

1
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21

    
22
#include <urtware.h>
23
#include <stdio.h>
24

    
25
/******************************************************************************/
26
/* LOCAL DEFINITIONS                                                          */
27
/******************************************************************************/
28

    
29
/******************************************************************************/
30
/* EXPORTED VARIABLES                                                         */
31
/******************************************************************************/
32

    
33
/******************************************************************************/
34
/* LOCAL TYPES                                                                */
35
/******************************************************************************/
36

    
37
/******************************************************************************/
38
/* LOCAL VARIABLES                                                            */
39
/******************************************************************************/
40

    
41
/******************************************************************************/
42
/* LOCAL FUNCTIONS                                                            */
43
/******************************************************************************/
44

    
45
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
46
{
47
  while (oldestMessage->next->originTime < oldestMessage->originTime) {
48
    oldestMessage = oldestMessage->next;
49
  }
50
  return oldestMessage;
51
}
52

    
53
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
54
{
55
  urt_message_t* lastMessage = latestMessage;
56
  while (lastMessage->next->originTime < lastMessage->originTime) {
57
    lastMessage = lastMessage->next;
58
#if (URT_CFG_PUBSUB_PROFILING == true)
59
    subscriber->base.lastMessage->numConsumersLeft--;
60
    subscriber->base->numMessagesReceived++;
61
#endif /* URT_CFG_PUBSUB_PROFILING */
62
  }
63
  return latestMessage;
64
}
65

    
66
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
67
{
68
  urt_message_t* lastMessageContribute = messages;
69
  while (lastMessageContribute->next) {
70
    lastMessageContribute = lastMessageContribute->next;
71
  }
72
  lastMessageContribute->next = topic->latestMessage->next;
73
  topic->latestMessage->next = messages;
74
}
75

    
76
/******************************************************************************/
77
/* EXPORTED FUNCTIONS                                                         */
78
/******************************************************************************/
79

    
80
/**
81
 * @brief   Initialize the nrt Subscriber.
82
 *
83
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
84
 */
85
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
86
{
87
  urtDebugAssert(subscriber);
88

    
89
  subscriber->base.topic = NULL;
90
  urtEventListenerInit(subscriber->base.evtListener);
91
  subscriber->base.lastMessage = NULL;
92
  subscriber->base.lastMessageTime = 0;
93
#if (URT_CFG_PUBSUB_PROFILING == true)
94
    subscriber->minLatency = URT_DELAY_INFINITE;
95
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
96
#endif /* URT_CFG_PUBSUB_PROFILING */
97
  return;
98
}
99

    
100
/**
101
 * @brief  Subscribes the subscriber to a topic.
102
 *
103
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
104
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
105
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
106
 *                      Messages must not be associated to another topic.
107
 *                      Once a message has been contributed, it cannot be removed later.
108
 *                      May be NULL(no messages to contribute).
109
 *
110
 * @return  Returns URT_STATUS_OK on success.
111
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
112
 */
113
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
114
{
115
  urtDebugAssert(subscriber);
116
  urtDebugAssert(topic);
117

    
118
  if (!subscriber->base.topic) {
119
    return URT_STATUS_SUBSCRIBE_TOPICSET;
120
  }
121

    
122
  subscriber->base.topic = topic;
123
  urtMutexLock(&topic->lock);
124

    
125
  if (messages) {
126
    urtContributeMessages(messages, topic);
127
  }
128

    
129
  subscriber->base.lastMessage = topic->latestMessage;
130
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
131

    
132
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
133

    
134
#if (URT_CFG_PUBSUB_PROFILING == true)
135
    topic->numHrtSubscribers--;
136
#endif /* URT_CFG_PUBSUB_PROFILING */
137

    
138
  urtMutexUnlock(&topic->lock);
139
  return URT_STATUS_OK;
140
}
141

    
142
/**
143
 * @brief  Fetches the next message.
144
 *
145
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
146
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
147
 * @param[in] bytes  Payload size in bytes.
148
 * @param[in] latency  The latency can be returned by reference. May be NULL.
149
 *
150
 * @return  Returns URT_STATUS_OK on success.
151
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
152
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
153
 */
154
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
155
{   
156
  urtDebugAssert(subscriber);
157

    
158
  if (!subscriber->base.topic) {
159
    return URT_STATUS_FETCH_NOTOPIC;
160
  }
161

    
162
  urtMutexLock(&subscriber->base.topic->lock);
163

    
164
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
165
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
166
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
167
      urtMutexUnlock(&subscriber->base.topic->lock);
168
      return URT_STATUS_FETCH_NOMESSAGE;
169
    }
170
    oldestMessage = oldestMessage->next;
171
  }
172
  else {
173
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
174
  }
175

    
176
  subscriber->base.lastMessage = oldestMessage;
177
  subscriber->base.lastMessageTime = oldestMessage->originTime;
178
  memcpy(oldestMessage->payload, payload, bytes);
179

    
180
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
181
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
182

    
183
#if(URT_CFG_PUBSUB_PROFILING == true)
184
  subscriber->base.sumLatencies += calculatedLatency;
185

    
186
  if (calculatedLatency < subscriber->minLatency) {
187
    subscriber->minLatency = calculatedLatency;
188
  }
189
  else if (calculatedLatency > subscriber->maxLatency) {
190
    subscriber->maxLatency = calculatedLatency;
191
  }
192
#endif /* URT_CFG_PUBSUB_PROFILING */
193

    
194
    if (latency) {
195
      *latency = calculatedLatency;
196
    }
197
  }
198

    
199
#if (URT_CFG_PUBSUB_PROFILING == true)
200
  subscriber->base.lastMessage->numConsumersLeft--;
201
  subscriber->base->numMessagesReceived++;
202
#endif /* URT_CFG_PUBSUB_PROFILING */
203

    
204
  urtMutexUnlock(&subscriber->base.topic->lock);
205
  return URT_STATUS_OK;
206
}
207

    
208
/**
209
 * @brief Fetches the latest message.
210
 *
211
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
212
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
213
 * @param[in] bytes  Payload size in bytes.
214
 * @param[in] latency  The latency can be returned by reference. May be NULL.
215
 *
216
 * @return  Returns URT_STATUS_OK on success.
217
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
218
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
219
 */
220
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
221
  urtDebugAssert(subscriber);
222

    
223
  if (!subscriber->base.topic)
224
      return URT_STATUS_FETCH_NOTOPIC;
225

    
226
  urtMutexLock(&subscriber->base.topic->lock);
227
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
228

    
229
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
230
  {
231
    urtMutexUnlock(&subscriber->base.topic->lock);
232
    return URT_STATUS_FETCH_NOMESSAGE;
233
  }
234

    
235
  subscriber->base.lastMessage = lastMessage;
236
  subscriber->base.lastMessageTime = lastMessage->originTime;
237
  memcpy(lastMessage->payload, payload, bytes);
238

    
239
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
240
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
241

    
242
#if(URT_CFG_PUBSUB_PROFILING == true)
243
  subscriber->base.sumLatencies += calculatedLatency;
244

    
245
  if (calculatedLatency < subscriber->minLatency) {
246
    subscriber->minLatency = calculatedLatency;
247
  }
248
  else if (calculatedLatency > subscriber->maxLatency) {
249
    subscriber->maxLatency = calculatedLatency;
250
  }
251
#endif /* URT_CFG_PUBSUB_PROFILING */
252

    
253
    if (latency) {
254
      *latency = calculatedLatency;
255
    }
256
  }
257

    
258
  urtMutexUnlock(&subscriber->base.topic->lock);
259
  return URT_STATUS_OK;
260
}
261

    
262
/**
263
 * @brief  Unsubscribes from a subscriber.
264
 *
265
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
266
 *
267
 * @return  Returns URT_STATUS_OK on sucess.
268
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
269
 */
270
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
271
{
272
  if (subscriber->base.topic)
273
  {
274
# if(URT_CFG_PUBSUB_PROFILING == true)
275
      urtMutexLock(&topic->lock);
276
      subscriber->base.topic->numSubscribers--;
277
# endif /* URT_CFG_PUBSUB_PROFILING */
278
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
279
# if(URT_CFG_PUBSUB_PROFILING == true)
280
      urtMutexUnlock(&topic->lock);
281
      subscriber->base.topic = NULL;
282
      subscriber->base.lastMessage = NULL;
283
      subscriber->base.lastMessageTime = 0;
284
#endif /* URT_CFG_PUBSUB_PROFILING */
285
    return URT_STATUS_OK;
286
  }
287

    
288
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
289
}
290

    
291

    
292
/**
293
 * @brief  Initialize the srt Subscriber.
294
 *
295
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
296
 */
297
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
298
{
299
  urtDebugAssert(subscriber);
300

    
301
  subscriber->base.topic = NULL;
302
  urtEventListenerInit(subscriber->base.evtListener);
303
  subscriber->base.lastMessage = NULL;
304
  subscriber->base.lastMessageTime = 0;
305
#if (URT_CFG_PUBSUB_PROFILING)
306
  subscriber->base.sumLatencies = 0;
307
  subscriber->base.numMessagesReceived = 0;
308
  subscriber->usefulnesscb = NULL;
309
  subscriber->cbparams = NULL;
310
  subscriber->minLatency = URT_DELAY_INFINITE;
311
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
312
#endif /* URT_CFG_PUBSUB_PROFILING */
313
  return;
314
}
315

    
316
/**
317
 * @brief  Subscribes the subscriber to a topic.
318
 *
319
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
320
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
321
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
322
 *                     Messages must not be associated to another topic.
323
 *                     Once a message has been contributed, it cannot be removed later.
324
 *                     May be NULL (no messages to contribute)
325
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
326
 * @param[in] cbparams  Optional parameters for the usefulness callback.
327
 *                      May be NULL if the callback expects no parameters.
328
 *
329
 * @return  Returns URT_STATUS_OK on success.
330
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
331
 */
332
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
333
                                       urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
334
{
335
  urtDebugAssert(subscriber);
336
  urtDebugAssert(topic);
337

    
338
  if (subscriber->base.topic) {
339
    return URT_STATUS_SUBSCRIBE_TOPICSET;
340
  }
341

    
342
  subscriber->base.topic = topic;
343
  subscriber->usefulnesscb = usefulnesscb;
344
  subscriber->cbparams = cbparams;
345
#if (URT_CFG_PUBSUB_PROFILING == true)
346
  subscriber->base.sumLatencies = 0;
347
  subscriber->base.numMessagesReceived = 0;
348
  subscriber->minLatency = URT_DELAY_INFINITE;
349
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
350
#endif  /* URT_CFG_PUBSUB_PROFILING */
351

    
352
  urtMutexLock(&topic->lock);
353
  if (messages) {
354
    urtContributeMessages(messages, topic);
355
  }
356

    
357
  subscriber->base.lastMessage = topic->latestMessage;
358
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
359

    
360
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
361

    
362
# if (URT_CFG_PUBSUB_PROFILING == true)
363
    topic->numHrtSubscribers--;
364
# endif /* URT_CFG_PUBSUB_PROFILING */
365

    
366
  urtMutexUnlock(&topic->lock);
367
  return URT_STATUS_OK;
368
}
369

    
370
/**
371
 * @brief  Fetches the next message.
372
 *
373
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
374
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
375
 * @param[in] bytes  Payload size in bytes.
376
 * @param[in] latency  The latency can be returned by reference. May be NULL.
377
 *
378
 * @return  Returns URT_STATUS_OK on success.
379
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
380
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
381
 */
382
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload,
383
                                              size_t bytes, urt_delay_t* latency)
384
{
385
  urtDebugAssert(subscriber);
386

    
387
  if (!subscriber->base.topic){
388
    return URT_STATUS_FETCH_NOTOPIC;
389
  }
390

    
391
  urtMutexLock(&subscriber->base.topic->lock);
392

    
393
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
394
  if(oldestMessage->originTime == subscriber->base.lastMessageTime){
395
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime){
396
      urtMutexUnlock(&subscriber->base.topic->lock);
397
      return URT_STATUS_FETCH_NOMESSAGE;
398
    }
399
    oldestMessage = oldestMessage->next;
400
  }
401
  else{
402
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
403
  }
404

    
405
  subscriber->base.lastMessage = oldestMessage;
406
  subscriber->base.lastMessageTime = oldestMessage->originTime;
407
  memcpy(oldestMessage->payload, payload, bytes);
408

    
409
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
410
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
411

    
412
#if(URT_CFG_PUBSUB_PROFILING == true)
413
  subscriber->base.sumLatencies += calculatedLatency;
414

    
415
  if (calculatedLatency < subscriber->minLatency) {
416
    subscriber->minLatency = calculatedLatency;
417
  }
418
  else if (calculatedLatency > subscriber->maxLatency) {
419
    subscriber->maxLatency = calculatedLatency;
420
  }
421
#endif /* URT_CFG_PUBSUB_PROFILING */
422

    
423
    if (latency) {
424
      *latency = calculatedLatency;
425
    }
426
  }
427

    
428
#if (URT_CFG_PUBSUB_PROFILING == true)
429
  subscriber->base.lastMessage->numConsumersLeft--;
430
  subscriber->base->numMessagesReceived++;
431
#endif /* URT_CFG_PUBSUB_PROFILING */
432

    
433
  urtMutexUnlock(&subscriber->base.topic->lock);
434
  return URT_STATUS_OK;
435
}
436

    
437
/**
438
 * @brief  Fetches the latest message.
439
 *
440
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
441
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
442
 * @param[in] bytes  Payload size in bytes.
443
 * @param[in] latency  The latency can be returned by reference. May be NULL.
444
 *
445
 * @return  Returns URT_STATUS_OK on success.
446
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
447
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
448
 */
449
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload,
450
                                                size_t bytes, urt_delay_t* latency)
451
{
452
  urtDebugAssert(subscriber);
453

    
454
  if (!subscriber->base.topic) {
455
    return URT_STATUS_FETCH_NOTOPIC;
456
  }
457

    
458
  urtMutexLock(&subscriber->base.topic->lock);
459
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
460

    
461
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
462
  {
463
    urtMutexUnlock(&subscriber->base.topic->lock);
464
    return URT_STATUS_FETCH_NOMESSAGE;
465
  }
466

    
467
  subscriber->base.lastMessage = lastMessage;
468
  subscriber->base.lastMessageTime = lastMessage->originTime;
469
  memcpy(lastMessage->payload, payload, bytes);
470

    
471
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
472
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
473

    
474
#if(URT_CFG_PUBSUB_PROFILING == true)
475
  subscriber->base.sumLatencies += calculatedLatency;
476

    
477
  if (calculatedLatency < subscriber->minLatency) {
478
    subscriber->minLatency = calculatedLatency;
479
  }
480
  else if (calculatedLatency > subscriber->maxLatency) {
481
    subscriber->maxLatency = calculatedLatency;
482
  }
483
#endif /* URT_CFG_PUBSUB_PROFILING */
484

    
485
    if (latency != NULL) {
486
      *latency = calculatedLatency;
487
    }
488
  }
489

    
490
  urtMutexUnlock(&subscriber->base.topic->lock);
491
  return URT_STATUS_OK;
492
}
493

    
494
/**
495
 * @brief  Calculates the usefulness of the subscriber.
496
 *
497
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
498
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
499
 *
500
 * @return  Returns the usefulness as a value within [0,1].
501
 */
502
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
503
{
504
  urtDebugAssert(subscriber);
505

    
506
  return (*subscriber->usefulnesscb)(latency, subscriber->cbparams);
507
}
508

    
509
/**
510
 * @brief  Unsubscribes from a subscriber.
511
 *
512
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
513
 *
514
 * @return  Returns URT_STATUS_OK on sucess.
515
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
516
 */
517
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
518
{
519
  urtDebugAssert(subscriber);
520

    
521
  if (subscriber->base.topic)
522
  {
523
# if (URT_CFG_PUBSUB_PROFILING == true)
524
    urtMutexLock(&topic->lock);
525
    subscriber->base.topic->numSubscribers--;
526
# endif /* URT_CFG_PUBSUB_PROFILING */
527
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
528
# if (URT_CFG_PUBSUB_PROFILING == true)
529
      urtMutexUnlock(&topic->lock);
530
# endif /* URT_CFG_PUBSUB_PROFILING */
531
    subscriber->base.topic = NULL;
532
    subscriber->base.lastMessage = NULL;
533
    subscriber->base.lastMessageTime = 0;
534
    return URT_STATUS_OK;
535
  }
536

    
537
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
538
}
539

    
540

    
541
/**
542
 * @brief  Initialize the FRT Subscriber.
543
 *
544
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
545
 */
546
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
547
{
548
  urtDebugAssert(subscriber);
549

    
550
  subscriber->base.topic = NULL;
551
  urtEventListenerInit(subscriber->base.evtListener);
552
  subscriber->base.lastMessage = NULL;
553
  subscriber->base.lastMessageTime = 0;
554

    
555
  #if (URT_CFG_PUBSUB_PROFILING)
556
    subscriber->base.sumLatencies = 0;
557
    subscriber->base.numMessagesReceived = 0;
558
  #endif /* URT_CFG_PUBSUB_PROFILING */
559

    
560
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
561
    subscriber->deadlineOffset = 0;
562
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
563

    
564
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
565
    subscriber->maxJitter = 0;
566
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
567

    
568
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
569
    subscriber->minLatency = URT_DELAY_INFINITE;
570
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
571
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
572
  return;
573
}
574

    
575

    
576
/**
577
 * @brief  Subscribes the subscriber to a topic.
578
 *
579
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
580
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
581
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
582
 *                      Messages must not be associated to another topic.
583
 *                      Once a message has been contributed, it cannot be removed later.
584
 *                      May be NULL(no messages to contribute).
585
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
586
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
587
 *                    A value of 0 indicates that jitter is of no concern.
588
 *
589
 * @return  Returns URT_STATUS_OK on success.
590
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
591
 */
592
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
593
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
594
{
595
  urtDebugAssert(subscriber);
596
  urtDebugAssert(topic);
597

    
598
  if (subscriber->base.topic)
599
  {
600
    return URT_STATUS_SUBSCRIBE_TOPICSET;
601
  }
602

    
603
  subscriber->base.topic = topic;
604
# if (URT_CFG_PUBSUB_PROFILING == true)
605
  subscriber->base.sumLatencies = 0;
606
  subscriber->base.numMessagesReceived = 0;
607
# endif /* URT_CFG_PUBSUB_PROFILING */
608
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
609
  subscriber->deadlineOffset = deadline;
610
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
611
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
612
  subscriber->maxJitter =jitter;
613
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
614
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
615
  subscriber->minLatency = URT_DELAY_INFINITE;
616
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
617
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
618

    
619
  urtMutexLock(&topic->lock);
620
  if (messages)
621
  {
622
    urtContributeMessages(messages, topic);
623
  }
624

    
625
  subscriber->base.lastMessage = topic->latestMessage;
626
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
627

    
628
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
629

    
630
# if (URT_CFG_PUBSUB_PROFILING == true)
631
  topic->numHrtSubscribers--;
632
# endif /* URT_CFG_PUBSUB_PROFILING */
633

    
634
  urtMutexUnlock(&topic->lock);
635
  return URT_STATUS_OK;
636
}
637

    
638
/**
639
 * @brief  Fetches the next message.
640
 *
641
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
642
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
643
 * @param[in] bytes  Payload size in bytes.
644
 * @param[in] latency  The latency can be returned by reference. May be NULL.
645
 *
646
 * @return  Returns URT_STATUS_OK on success.
647
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
648
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
649
 */
650
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload,
651
                                              size_t bytes, urt_delay_t* latency)
652
{
653
  urtDebugAssert(subscriber);
654

    
655
  if (!subscriber->base.topic)
656
      return URT_STATUS_FETCH_NOTOPIC;
657

    
658
  urtMutexLock(&subscriber->base.topic->lock);
659

    
660
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
661
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
662
  {
663
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
664
    {
665
      urtMutexUnlock(&subscriber->base.topic->lock);
666
      return URT_STATUS_FETCH_NOMESSAGE;
667
    }
668
    oldestMessage = oldestMessage->next;
669
  }
670
  else
671
  {
672
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
673
  }
674

    
675
  subscriber->base.lastMessage = oldestMessage;
676
  subscriber->base.lastMessageTime = oldestMessage->originTime;
677
  memcpy(oldestMessage->payload, payload, bytes);
678

    
679
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
680
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
681

    
682
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
683
  subscriber->base.sumLatencies += calculatedLatency;
684

    
685
  if (calculatedLatency < subscriber->minLatency) {
686
    subscriber->minLatency = calculatedLatency;
687
  }
688
  else if (calculatedLatency > subscriber->maxLatency) {
689
    subscriber->maxLatency = calculatedLatency;
690
  }
691
#endif /* URT_CFG_PUBSUB_PROFILING */
692

    
693
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
694
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
695
    subscriber->minLatency = calculatedLatency;
696
  }
697
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
698
    subscriber->maxLatency = calculatedLatency;
699
  }
700
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
701
    urtMutexUnlock(&subscriber->base.topic->lock);
702
    return URT_STATUS_JITTERVIOLATION;
703
  }
704
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
705

    
706
    if (latency) {
707
      *latency = calculatedLatency;
708
    }
709
  }
710

    
711
#if (URT_CFG_PUBSUB_PROFILING == true)
712
  subscriber->base.lastMessage->numConsumersLeft--;
713
  subscriber->base->numMessagesReceived++;
714
#endif /* URT_CFG_PUBSUB_PROFILING */
715

    
716
  urtMutexUnlock(&subscriber->base.topic->lock);
717
  return URT_STATUS_OK;
718
}
719

    
720
/**
721
 * @brief  Fetches the latest message.
722
 *
723
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
724
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
725
 * @param[in] bytes  Payload size in bytes.
726
 * @param[in] latency  The latency can be returned by reference. May be NULL.
727
 *
728
 * @return  Returns URT_STATUS_OK on success.
729
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
730
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
731
 */
732
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload,
733
                                                size_t bytes, urt_delay_t* latency)
734
{
735
  urtDebugAssert(subscriber);
736

    
737
  if (!subscriber->base.topic)
738
      return URT_STATUS_FETCH_NOTOPIC;
739

    
740
  urtMutexLock(&subscriber->base.topic->lock);
741
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
742

    
743
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
744
  {
745
    urtMutexUnlock(&subscriber->base.topic->lock);
746
    return URT_STATUS_FETCH_NOMESSAGE;
747
  }
748

    
749
    subscriber->base.lastMessage = lastMessage;
750
    subscriber->base.lastMessageTime = lastMessage->originTime;
751
    memcpy(lastMessage->payload, payload, bytes);
752

    
753
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
754
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
755

    
756
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
757
  subscriber->base.sumLatencies += calculatedLatency;
758

    
759
  if (calculatedLatency < subscriber->minLatency) {
760
    subscriber->minLatency = calculatedLatency;
761
  }
762
  else if (calculatedLatency > subscriber->maxLatency) {
763
    subscriber->maxLatency = calculatedLatency;
764
  }
765
#endif /* URT_CFG_PUBSUB_PROFILING */
766

    
767
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
768
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
769
    subscriber->minLatency = calculatedLatency;
770
  }
771
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
772
    subscriber->maxLatency = calculatedLatency;
773
  }
774
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
775
    urtMutexUnlock(&subscriber->base.topic->lock);
776
    return URT_STATUS_JITTERVIOLATION;
777
  }
778
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
779

    
780
    if (latency) {
781
      *latency = calculatedLatency;
782
    }
783
  }
784

    
785
  urtMutexUnlock(&subscriber->base.topic->lock);
786
  return URT_STATUS_OK;
787
}
788

    
789
/**
790
 * @brief  Calculates the validity from the subscriber.
791
 *
792
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
793
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
794
 *
795
 * @return  Returns a boolean indicator whether the latency is fine.
796
 */
797
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
798
{
799
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
800
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
801
    return true;
802
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
803

    
804
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
805
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
806
      return true;
807

    
808
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
809
      return true;
810
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
811

    
812
  return false;
813
}
814

    
815
/**
816
 * @brief  Unsubscribes from a subscriber.
817
 *
818
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
819
 *
820
 * @return  Returns URT_STATUS_OK on sucess.
821
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
822
 */
823
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
824
{
825
  urtDebugAssert(subscriber);
826

    
827
  if (!subscriber->base.topic) {
828
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
829
  }
830

    
831
#if (URT_CFG_PUBSUB_PROFILING == true)
832
  urtMutexLock(&topic->lock);
833
#endif /* URT_CFG_PUBSUB_PROFILING */
834

    
835
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
836

    
837
# if (URT_CFG_PUBSUB_PROFILING == true)
838
  subscriber->base.topic->numSubscribers--;
839
# endif /* URT_CFG_PUBSUB_PROFILING */
840

    
841
# if (URT_CFG_PUBSUB_PROFILING == true)
842
  urtMutexUnlock(&topic->lock);
843
# endif /* URT_CFG_PUBSUB_PROFILING */
844
  subscriber->base.topic = NULL;
845
  subscriber->base.lastMessage = NULL;
846
  subscriber->base.lastMessageTime = 0;
847
  return URT_STATUS_OK;
848
}
849

    
850

    
851
/**
852
 * @brief  Initialize the HRT Subscriber.
853
 *
854
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
855
 */
856
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
857
{
858
  urtDebugAssert(subscriber);
859

    
860
  subscriber->base.topic = NULL;
861
  urtEventListenerInit(subscriber->base.evtListener);
862
  subscriber->base.lastMessage = NULL;
863
  subscriber->base.lastMessageTime = 0;
864

    
865
# if (URT_CFG_PUBSUB_PROFILING)
866
    subscriber->base.sumLatencies = 0;
867
    subscriber->base.numMessagesReceived = 0;
868
# endif /* URT_CFG_PUBSUB_PROFILING */
869

    
870
  subscriber->next = NULL;
871

    
872
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
873
    subscriber->deadlineOffset = 0;
874
    urtTimerInit(subscriber->qodDeadlineTimer);
875
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
876

    
877
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
878
    subscriber->expectedRate = 0;
879
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
880

    
881
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
882
    subscriber->maxJitter = 0;
883
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
884

    
885
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
886
    subscriber->minLatency = URT_DELAY_INFINITE;
887
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
888
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
889
  return;
890
}
891

    
892

    
893
/**
894
 * @brief  Subscribes the subscriber to a topic.
895
 *
896
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
897
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
898
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
899
 *                      Messages must not be associated to another topic.
900
 *                      Once a message has been contributed, it cannot be removed later.
901
 *                      May be NULL(no messages to contribute).
902
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
903
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
904
 *                  A value of 0 indicates, that rate is of no concern.
905
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
906
 *                    A value of 0 indicates that jitter is of no concern.
907
 *
908
 * @return  Returns URT_STATUS_OK on success.
909
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
910
 */
911
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
912
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
913
{
914
  urtDebugAssert(subscriber);
915
  urtDebugAssert(topic);
916

    
917
  if (subscriber->base.topic)
918
  {
919
    return URT_STATUS_SUBSCRIBE_TOPICSET;
920
  }
921

    
922
  subscriber->base.topic = topic;
923
# if (URT_CFG_PUBSUB_PROFILING == true)
924
  subscriber->base.sumLatencies = 0;
925
  subscriber->base.numMessagesReceived = 0;
926
# endif /* URT_CFG_PUBSUB_PROFILING */
927
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
928
  subscriber->deadlineOffset = deadline;
929
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
930
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
931
  subscriber->maxJitter =jitter;
932
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
933
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
934
  subscriber->minLatency = URT_DELAY_INFINITE;
935
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
936
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
937
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
938
  subscriber->expectedRate = rate;
939
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
940

    
941
  urtMutexLock(&topic->lock);
942
  if (messages)
943
  {
944
    urtContributeMessages(messages, topic);
945
  }
946

    
947
  subscriber->base.lastMessage = topic->latestMessage;
948
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
949

    
950
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
951

    
952
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
953
  //TODO: Implement
954
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
955

    
956
  topic->numHrtSubscribers--;
957
# if (URT_CFG_PUBSUB_PROFILING == true)
958
  topic->numSubscribers--;
959
# endif /* URT_CFG_PUBSUB_PROFILING */
960

    
961
  urtMutexUnlock(&topic->lock);
962
  return URT_STATUS_OK;
963
}
964

    
965

    
966
/**
967
 * @brief  Fetches the next message.
968
 *
969
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
970
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
971
 * @param[in] bytes  Payload size in bytes.
972
 * @param[in] latency  The latency can be returned by reference. May be NULL.
973
 *
974
 * @return  Returns URT_STATUS_OK on success.
975
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
976
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
977
 */
978
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
979
                                              size_t bytes, urt_delay_t* latency)
980
{
981
  urtDebugAssert(subscriber);
982

    
983
  if (!subscriber->base.topic) {
984
    return URT_STATUS_FETCH_NOTOPIC;
985
  }
986

    
987
  urtMutexLock(&subscriber->base.topic->lock);
988
  urt_message_t* lastMessage = subscriber->base.lastMessage;
989
  if (lastMessage->next->originTime > lastMessage->originTime)
990
  {
991
    urtMutexUnlock(&subscriber->base.topic->lock);
992
    return URT_STATUS_FETCH_NOMESSAGE;
993
  }
994
  lastMessage = lastMessage->next;
995

    
996
  uint64_t calculatedLatency;
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
998
    calculatedLatency = urtTimeNow() - lastMessage->originTime;
999
  }
1000

    
1001
#if(URT_CFG_PUBSUB_PROFILING == true)
1002
  subscriber->base.sumLatencies += calculatedLatency;
1003

    
1004
  if (calculatedLatency < subscriber->minLatency) {
1005
    subscriber->minLatency = calculatedLatency;
1006
  }
1007
  else if (calculatedLatency > subscriber->maxLatency) {
1008
    subscriber->maxLatency = calculatedLatency;
1009
  }
1010
#endif /* URT_CFG_PUBSUB_PROFILING */
1011

    
1012
  if (latency) {
1013
    *latency = calculatedLatency;
1014
  }
1015

    
1016
  subscriber->base.lastMessage->numHrtConsumersLeft--;
1017
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1018
  {
1019
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1020
  }
1021

    
1022
#if (URT_CFG_PUBSUB_PROFILING == true)
1023
  subscriber->base.lastMessage->numConsumersLeft--;
1024
  subscriber->base->numMessagesReceived++;
1025
#endif /* URT_CFG_PUBSUB_PROFILING */
1026

    
1027
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1028
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1029
    subscriber->minLatency = calculatedLatency;
1030
  }
1031
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1032
    subscriber->maxLatency = calculatedLatency;
1033
  }
1034
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1035
    urtMutexUnlock(&subscriber->base.topic->lock);
1036
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1037
    return URT_STATUS_JITTERVIOLATION;
1038
  }
1039
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1040

    
1041
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1042
  if (calculatedLatency < subscriber->minLatency) {
1043
    subscriber->minLatency = calculatedLatency;
1044
  }
1045
  else if (calculatedLatency > subscriber->maxLatency) {
1046
    subscriber->maxLatency = calculatedLatency;
1047
  }
1048
#endif /* URT_CFG_PUBSUB_PROFILING */
1049

    
1050
    subscriber->base.lastMessage = lastMessage;
1051
    subscriber->base.lastMessageTime = lastMessage->originTime;
1052
    memcpy(lastMessage->payload, payload, bytes);
1053

    
1054
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1055
  if (lastMessage->next->originTime < lastMessage->originTime)
1056
  {
1057
    //TODO: first reset?! (when ... set)
1058
    urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
1059
  }
1060
  else
1061
  {
1062
    urtTimerReset(subscriber->qosDeadlineTimer);
1063
  }
1064
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1065

    
1066
  urtMutexUnlock(&subscriber->base.topic->lock);
1067
  return URT_STATUS_OK;
1068
}
1069

    
1070

    
1071
/**
1072
 * @brief  Fetches the latest message.
1073
 *
1074
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
1075
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
1076
 * @param[in] bytes  Payload size in bytes.
1077
 * @param[in] latency  The latency can be returned by reference. May be NULL.
1078
 *
1079
 * @return  Returns URT_STATUS_OK on success.
1080
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
1081
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
1082
 */
1083
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
1084
                                                size_t bytes, urt_delay_t* latency)
1085
{
1086
  if (!subscriber->base.topic) {
1087
    return URT_STATUS_FETCH_NOTOPIC;
1088
  }
1089

    
1090
  urtMutexLock(&subscriber->base.topic->lock);
1091
  urt_message_t* lastMessage = subscriber->base.lastMessage;
1092
  bool hrtZero = false;
1093
  while (lastMessage->next->originTime < lastMessage->originTime) {
1094
    lastMessage = lastMessage->next;
1095
    lastMessage->numHrtConsumersLeft--;
1096
    if (lastMessage->numHrtConsumersLeft == 0) {
1097
        hrtZero = true;
1098
    }
1099
#if (URT_CFG_PUBSUB_PROFILING == true)
1100
    lastMessage->numConsumersLeft--;
1101
    subscriber->base.numMessagesReceived++;
1102
#endif /* URT_CFG_PUBSUB_PROFILING */
1103
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1104
    urtTimerReset(subscriber->qosDeadlineTimer);
1105
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1106
  }
1107

    
1108
  if (hrtZero) {
1109
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1110
  }
1111

    
1112
  if (lastMessage->originTime == subscriber->base.lastMessageTime) {
1113
    urtMutexUnlock(&subscriber->base.topic->lock);
1114
    return URT_STATUS_FETCH_NOMESSAGE;
1115
  }
1116

    
1117
  uint64_t calculatedLatency;
1118
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
1119
     calculatedLatency = urtTimeNow() - lastMessage->originTime;
1120
  }
1121

    
1122
#if(URT_CFG_PUBSUB_PROFILING == true)
1123
  subscriber->base.sumLatencies += calculatedLatency;
1124

    
1125
  if (calculatedLatency < subscriber->minLatency) {
1126
    subscriber->minLatency = calculatedLatency;
1127
  }
1128
  else if (calculatedLatency > subscriber->maxLatency) {
1129
    subscriber->maxLatency = calculatedLatency;
1130
  }
1131
#endif /* URT_CFG_PUBSUB_PROFILING */
1132

    
1133
  if (latency) {
1134
    *latency = calculatedLatency;
1135
  }
1136

    
1137
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1138
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1139
    subscriber->minLatency = calculatedLatency;
1140
  }
1141
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1142
    subscriber->maxLatency = calculatedLatency;
1143
  }
1144
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1145
    urtMutexUnlock(&subscriber->base.topic->lock);
1146
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1147
    return URT_STATUS_JITTERVIOLATION;
1148
  }
1149
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1150

    
1151
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1152
  if (calculatedLatency < subscriber->minLatency) {
1153
    subscriber->minLatency = calculatedLatency;
1154
  }
1155
  else if (calculatedLatency > subscriber->maxLatency) {
1156
    subscriber->maxLatency = calculatedLatency;
1157
  }
1158
#endif /* URT_CFG_PUBSUB_PROFILING */
1159

    
1160
    subscriber->base.lastMessage = lastMessage;
1161
    subscriber->base.lastMessageTime = lastMessage->originTime;
1162
    memcpy(lastMessage->payload, payload, bytes);
1163

    
1164
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1165
  urtTimerReset(subscriber->qosDeadlineTimer);
1166
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1167

    
1168
  urtMutexUnlock(&subscriber->base.topic->lock);
1169
  return URT_STATUS_OK;
1170
}
1171

    
1172
/**
1173
 * @brief  Unsubscribes from a subscriber.
1174
 *
1175
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
1176
 *
1177
 * @return  Returns URT_STATUS_OK on sucess.
1178
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
1179
 */
1180
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
1181
{
1182
  urtDebugAssert(subscriber);
1183

    
1184
  if(!subscriber->base.topic) {
1185
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1186
  }
1187

    
1188
  urtMutexLock(&subscriber->base.topic->lock);
1189
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
1190
  subscriber->base.topic->numHrtSubscribers--;
1191
# if (URT_CFG_PUBSUB_PROFILING == true)
1192
  subscriber->base.topic->numSubscribers--;
1193
# endif /* URT_CFG_PUBSUB_PROFILING */
1194
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1195
  //TODO: remove self from topics list of HRT subscribers, ...
1196
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1197

    
1198
  urt_message_t* messageTemp = subscriber->base.lastMessage;
1199
  bool hrtZero = false;
1200
  while (messageTemp->next->originTime < messageTemp->originTime){
1201
    messageTemp = messageTemp->next;
1202
    messageTemp->numHrtConsumersLeft--;
1203
    if (messageTemp->numHrtConsumersLeft == 0) {
1204
      hrtZero = true;
1205
    }
1206
# if(URT_CFG_PUBSUB_PROFILING == true)
1207
    messageTemp->numConsumersLeft--;
1208
# endif /* URT_CFG_PUBSUB_PROFILING */
1209
  }
1210
  if (hrtZero){
1211
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1212
  }
1213

    
1214
  urtMutexUnlock(&subscriber->base.topic->lock);
1215
  subscriber->base.topic = NULL;
1216
  subscriber->base.lastMessage = NULL;
1217
  subscriber->base.lastMessageTime = 0;
1218
  return URT_STATUS_OK;
1219
}
1220

    
1221

    
1222