Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 982056f7

History | View | Annotate | Download (42.379 KB)

1
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21

    
22
#include <urtware.h>
23
#include <stdio.h>
24

    
25
/******************************************************************************/
26
/* LOCAL DEFINITIONS                                                          */
27
/******************************************************************************/
28

    
29
/******************************************************************************/
30
/* EXPORTED VARIABLES                                                         */
31
/******************************************************************************/
32

    
33
/******************************************************************************/
34
/* LOCAL TYPES                                                                */
35
/******************************************************************************/
36

    
37
/******************************************************************************/
38
/* LOCAL VARIABLES                                                            */
39
/******************************************************************************/
40

    
41
/******************************************************************************/
42
/* LOCAL FUNCTIONS                                                            */
43
/******************************************************************************/
44

    
45
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
46
{
47
  subscriber->base.lastMessage = message;
48
  *subscriber->base.lastMessageTime = message->originTime;
49
  memcpy(message->payload, payload, bytes);
50
}
51

    
52
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
53
{
54
  while (oldestMessage->next->originTime < oldestMessage->originTime)
55
  {
56
    oldestMessage = oldestMessage->next;
57
  }
58
  return oldestMessage;
59
}
60

    
61
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
62
{
63
  urt_message_t* lastMessage = subscriber->base.lastMessage;
64
  while (lastMessage->next->originTime < lastMessage->originTime)
65
  {
66
    lastMessage = lastMessage->next;
67
#if (URT_CFG_PUBSUB_PROFILING == true)
68
    subscriber->base.lastMessage->numConsumersLeft--;
69
    subscriber->base->numMessagesReceived++;
70
#endif /* URT_CFG_PUBSUB_PROFILING */
71
  }
72
}
73

    
74
void urtContributeMessages(urt_message_t* messages)
75
{
76
  urt_message_t* lastMessageContribute = messages;
77
  while (lastMessageContribute->next)
78
  {
79
    lastMessageContribute = lastMessageContribute->next;
80
  }
81
  lastMessageContribute->next = topic->latestMessage->next;
82
  topic->latestMessage->next = messages;
83
}
84

    
85
/******************************************************************************/
86
/* EXPORTED FUNCTIONS                                                         */
87
/******************************************************************************/
88

    
89
/**
90
 * @brief   Initialize the nrt Subscriber.
91
 *
92
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
93
 */
94
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
95
{
96
  urtDebugAssert(subscriber);
97

    
98
  subscriber->base.topic = NULL;
99
  urtEventListenerInit(subscriber->base.evtListener);
100
  subscriber->base.lastMessage = NULL;
101
  subscriber->base.lastMessageTime = 0;
102
#if (URT_CFG_PUBSUB_PROFILING == true)
103
    subscriber->minLatency = URT_DELAY_INFINITE;
104
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
105
#endif /* URT_CFG_PUBSUB_PROFILING */
106
  return;
107
}
108

    
109
/**
110
 * @brief  Subscribes the subscriber to a topic.
111
 *
112
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
113
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
114
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
115
 *                      Messages must not be associated to another topic.
116
 *                      Once a message has been contributed, it cannot be removed later.
117
 *                      May be NULL(no messages to contribute).
118
 *
119
 * @return  Returns URT_STATUS_OK on success.
120
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
121
 */
122
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
123
{
124
  urtDebugAssert(subscriber);
125
  urtDebugAssert(topic);
126

    
127
  if (!subscriber->base.topic) {
128
    return URT_STATUS_SUBSCRIBE_TOPICSET;
129
  }
130

    
131
  subscriber->base.topic = topic;
132
  urtMutexLock(topic->lock);
133

    
134
  if (messages) {
135
    urtContributeMessages(messages);
136
  }
137

    
138
  subscriber->base.lastMessage = topic->latestMessage;
139
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
140

    
141
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
142

    
143
#if (URT_CFG_PUBSUB_PROFILING == true)
144
    topic->numHrtSubscribers--;
145
#endif /* URT_CFG_PUBSUB_PROFILING */
146

    
147
  urtMutexUnlock(topic->lock);
148
  return URT_STATUS_OK;
149
}
150

    
151
/**
152
 * @brief  Fetches the next message.
153
 *
154
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
155
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
156
 * @param[in] bytes  Payload size in bytes.
157
 * @param[in] latency  The latency can be returned by reference. May be NULL.
158
 *
159
 * @return  Returns URT_STATUS_OK on success.
160
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
161
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
162
 */
163
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
164
{   
165
  urtDebugAssert(subscriber);
166

    
167
  if (!subscriber->base.topic) {
168
    return URT_STATUS_FETCH_NOTOPIC;
169
  }
170

    
171
  urtMutexLock(subscriber->base.topic->lock);
172

    
173
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
174
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
175
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
176
      urtMutexUnlock(subscriber->base.topic->lock);
177
      return URT_STATUS_FETCH_NOMESSAGE;
178
    }
179
    oldestMessage = oldestMessage->next;
180
  }
181
  else {
182
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
183
  }
184

    
185
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
186

    
187
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
188
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
189

    
190
#if(URT_CFG_PUBSUB_PROFILING == true)
191
  subscriber->base.sumLatencies += calculatedLatency;
192

    
193
  if (calculatedLatency < subscriber->minLatency) {
194
    subscriber->minLatency = calculatedLatency;
195
  }
196
  else if (calculatedLatency > subscriber->maxLatency) {
197
    subscriber->maxLatency = calculatedLatency;
198
  }
199
#endif /* URT_CFG_PUBSUB_PROFILING */
200

    
201
    if (latency) {
202
      latency = calculatedLatency;
203
    }
204
  }
205

    
206
#if (URT_CFG_PUBSUB_PROFILING == true)
207
  subscriber->base.lastMessage->numConsumersLeft--;
208
  subscriber->base->numMessagesReceived++;
209
#endif /* URT_CFG_PUBSUB_PROFILING */
210

    
211
  urtMutexUnlock(subscriber->base.topic->lock);
212
  return URT_STATUS_OK;
213
}
214

    
215
/**
216
 * @brief Fetches the latest message.
217
 *
218
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
219
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
220
 * @param[in] bytes  Payload size in bytes.
221
 * @param[in] latency  The latency can be returned by reference. May be NULL.
222
 *
223
 * @return  Returns URT_STATUS_OK on success.
224
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
225
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
226
 */
227
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
228
  urtDebugAssert(subscriber);
229

    
230
  if (!subscriber->base.topic)
231
      return URT_STATUS_FETCH_NOTOPIC;
232

    
233
  urtMutexLock(subscriber->base.topic->lock);
234
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
235

    
236
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
237
  {
238
    urtMutexUnlock(subscriber->base.topic->lock);
239
    return URT_STATUS_FETCH_NOMESSAGE;
240
  }
241

    
242
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
243

    
244
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
245
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
246

    
247
#if(URT_CFG_PUBSUB_PROFILING == true)
248
  subscriber->base.sumLatencies += calculatedLatency;
249

    
250
  if (calculatedLatency < subscriber->minLatency) {
251
    subscriber->minLatency = calculatedLatency;
252
  }
253
  else if (calculatedLatency > subscriber->maxLatency) {
254
    subscriber->maxLatency = calculatedLatency;
255
  }
256
#endif /* URT_CFG_PUBSUB_PROFILING */
257

    
258
    if (latency) {
259
      latency = calculatedLatency;
260
    }
261
  }
262

    
263
  urtMutexUnlock(subscriber->base.topic->lock);
264
  return URT_STATUS_OK;
265
}
266

    
267
/**
268
 * @brief  Unsubscribes from a subscriber.
269
 *
270
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
271
 *
272
 * @return  Returns URT_STATUS_OK on sucess.
273
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
274
 */
275
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
276
{
277
  if (subscriber->base.topic)
278
  {
279
# if(URT_CFG_PUBSUB_PROFILING == true)
280
      urtMutexLock(topic->lock);
281
      subscriber->base.topic->numSubscribers--;
282
# endif /* URT_CFG_PUBSUB_PROFILING */
283
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
284
# if(URT_CFG_PUBSUB_PROFILING == true)
285
      urtMutexUnlock(topic->lock);
286
      subscriber->base.topic = NULL;
287
      subscriber->base.lastMessage = NULL;
288
      subscriber->base.lastMessageTime = 0;
289
#endif /* URT_CFG_PUBSUB_PROFILING */
290
    return URT_STATUS_OK;
291
  }
292

    
293
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
294
}
295

    
296

    
297
/**
298
 * @brief  Initialize the srt Subscriber.
299
 *
300
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
301
 */
302
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
303
{
304
  urtDebugAssert(subscriber);
305

    
306
  subscriber->base.topic = NULL;
307
  urtEventListenerInit(subscriber->base.evtListener);
308
  subscriber->base.lastMessage = NULL;
309
  subscriber->base.lastMessageTime = 0;
310
  #if (URT_CFG_PUBSUB_PROFILING)
311
    subscriber->base.sumLatencies = 0;
312
    subscriber->base.numMessagesReceived = 0;
313
    subscriber->usefulnesscb = NULL;
314
    subscriber->cbparams = NULL;
315
    subscriber->minLatency = URT_DELAY_INFINITE;
316
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
317
  #endif /* URT_CFG_PUBSUB_PROFILING */
318
  return;
319
}
320

    
321
/**
322
 * @brief  Subscribes the subscriber to a topic.
323
 *
324
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
325
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
326
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
327
 *                     Messages must not be associated to another topic.
328
 *                     Once a message has been contributed, it cannot be removed later.
329
 *                     May be NULL (no messages to contribute)
330
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
331
 * @param[in] cbparams  Optional parameters for the usefulness callback.
332
 *                      May be NULL if the callback expects no parameters.
333
 *
334
 * @return  Returns URT_STATUS_OK on success.
335
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
336
 */
337
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
338
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
339
{
340
  urtDebugAssert(subscriber);
341
  urtDebugAssert(topic);
342

    
343
  if (subscriber->base.topic)
344
  {
345
    return URT_STATUS_SUBSCRIBE_TOPICSET;
346
  }
347

    
348
  subscriber->base.topic = topic;
349
  subscriber->usefulnesscb = usefulnesscb;
350
  subscriber->cbparams = cbparams;
351
# if (URT_CFG_PUBSUB_PROFILING == true)
352
  subscriber->base.sumLatencies = 0;
353
  subscriber->base.numMessagesReceived = 0;
354
  subscriber->minLatency = URT_DELAY_INFINITE;
355
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
356
# endif  /* URT_CFG_PUBSUB_PROFILING */
357

    
358
  urtMutexLock(topic->lock);
359
  if (messages)
360
  {
361
    urtContributeMessages(messages);
362
  }
363

    
364
  subscriber->base.lastMessage = topic->latestMessage;
365
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
366

    
367
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
368

    
369
# if (URT_CFG_PUBSUB_PROFILING == true)
370
    topic->numHrtSubscribers--;
371
# endif /* URT_CFG_PUBSUB_PROFILING */
372

    
373
  urtMutexUnlock(topic->lock);
374
  return URT_STATUS_OK;
375
}
376

    
377
/**
378
 * @brief  Fetches the next message.
379
 *
380
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
381
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
382
 * @param[in] bytes  Payload size in bytes.
383
 * @param[in] latency  The latency can be returned by reference. May be NULL.
384
 *
385
 * @return  Returns URT_STATUS_OK on success.
386
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
387
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
388
 */
389
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload,
390
                                              size_t bytes, urt_delay_t* latency)
391
{
392
  urtDebugAssert(subscriber);
393

    
394
  if (!subscriber->base.topic)
395
      return URT_STATUS_FETCH_NOTOPIC;
396

    
397
  urtMutexLock(subscriber->base.topic->lock);
398

    
399
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
400
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
401
  {
402
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
403
    {
404
      urtMutexUnlock(subscriber->base.topic->lock);
405
      return URT_STATUS_FETCH_NOMESSAGE;
406
    }
407
    oldestMessage = oldestMessage->next;
408
  }
409
  else
410
  {
411
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
412
  }
413

    
414
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
415

    
416
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
417
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
418

    
419
#if(URT_CFG_PUBSUB_PROFILING == true)
420
  subscriber->base.sumLatencies += calculatedLatency;
421

    
422
  if (calculatedLatency < subscriber->minLatency) {
423
    subscriber->minLatency = calculatedLatency;
424
  }
425
  else if (calculatedLatency > subscriber->maxLatency) {
426
    subscriber->maxLatency = calculatedLatency;
427
  }
428
#endif /* URT_CFG_PUBSUB_PROFILING */
429

    
430
    if (latency) {
431
      latency = calculatedLatency;
432
    }
433
  }
434

    
435
#if (URT_CFG_PUBSUB_PROFILING == true)
436
  subscriber->base.lastMessage->numConsumersLeft--;
437
  subscriber->base->numMessagesReceived++;
438
#endif /* URT_CFG_PUBSUB_PROFILING */
439

    
440
  urtMutexUnlock(subscriber->base.topic->lock);
441
  return URT_STATUS_OK;
442
}
443

    
444
/**
445
 * @brief  Fetches the latest message.
446
 *
447
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
448
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
449
 * @param[in] bytes  Payload size in bytes.
450
 * @param[in] latency  The latency can be returned by reference. May be NULL.
451
 *
452
 * @return  Returns URT_STATUS_OK on success.
453
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
454
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
455
 */
456
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload,
457
                                                size_t bytes, urt_delay_t* latency)
458
{
459
  urtDebugAssert(subscriber);
460

    
461
  if (!subscriber->base.topic) {
462
    return URT_STATUS_FETCH_NOTOPIC;
463
  }
464

    
465
  urtMutexLock(subscriber->base.topic->lock);
466
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
467

    
468
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
469
  {
470
    urtMutexUnlock(subscriber->base.topic->lock);
471
    return URT_STATUS_FETCH_NOMESSAGE;
472
  }
473

    
474
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
475

    
476
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
477
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
478

    
479
#if(URT_CFG_PUBSUB_PROFILING == true)
480
  subscriber->base.sumLatencies += calculatedLatency;
481

    
482
  if (calculatedLatency < subscriber->minLatency) {
483
    subscriber->minLatency = calculatedLatency;
484
  }
485
  else if (calculatedLatency > subscriber->maxLatency) {
486
    subscriber->maxLatency = calculatedLatency;
487
  }
488
#endif /* URT_CFG_PUBSUB_PROFILING */
489

    
490
    if (latency) {
491
      latency = calculatedLatency;
492
    }
493
  }
494

    
495
  urtMutexUnlock(subscriber->base.topic->lock);
496
  return URT_STATUS_OK;
497
}
498

    
499
/**
500
 * @brief  Calculates the usefulness of the subscriber.
501
 *
502
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
503
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
504
 *
505
 * @return  Returns the usefulness as a value within [0,1].
506
 */
507
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
508
{
509
  urtDebugAssert(subscriber);
510

    
511
  return subscriber->usefulnesscb(latency);
512
}
513

    
514
/**
515
 * @brief  Unsubscribes from a subscriber.
516
 *
517
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
518
 *
519
 * @return  Returns URT_STATUS_OK on sucess.
520
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
521
 */
522
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
523
{
524
  urtDebugAssert(subscriber);
525

    
526
  if (subscriber->base.topic)
527
  {
528
# if (URT_CFG_PUBSUB_PROFILING == true)
529
    urtMutexLock(topic->lock);
530
    subscriber->base.topic->numSubscribers--;
531
# endif /* URT_CFG_PUBSUB_PROFILING */
532
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
533
# if (URT_CFG_PUBSUB_PROFILING == true)
534
      urtMutexUnlock(topic->lock);
535
# endif /* URT_CFG_PUBSUB_PROFILING */
536
    subscriber->base.topic = NULL;
537
    subscriber->base.lastMessage = NULL;
538
    subscriber->base.lastMessageTime = 0;
539
    return URT_STATUS_OK;
540
  }
541

    
542
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
543
}
544

    
545

    
546
/**
547
 * @brief  Initialize the FRT Subscriber.
548
 *
549
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
550
 */
551
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
552
{
553
  urtDebugAssert(subscriber);
554

    
555
  subscriber->base.topic = NULL;
556
  urtEventListenerInit(subscriber->base.evtListener);
557
  subscriber->base.lastMessage = NULL;
558
  subscriber->base.lastMessageTime = 0;
559

    
560
  #if (URT_CFG_PUBSUB_PROFILING)
561
    subscriber->base.sumLatencies = 0;
562
    subscriber->base.numMessagesReceived = 0;
563
  #endif /* URT_CFG_PUBSUB_PROFILING */
564

    
565
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
566
    subscriber->deadlineOffset = 0;
567
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
568

    
569
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
570
    subscriber->maxJitter = 0;
571
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
572

    
573
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
574
    subscriber->minLatency = URT_DELAY_INFINITE;
575
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
576
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
577
  return;
578
}
579

    
580

    
581
/**
582
 * @brief  Subscribes the subscriber to a topic.
583
 *
584
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
585
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
586
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
587
 *                      Messages must not be associated to another topic.
588
 *                      Once a message has been contributed, it cannot be removed later.
589
 *                      May be NULL(no messages to contribute).
590
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
591
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
592
 *                    A value of 0 indicates that jitter is of no concern.
593
 *
594
 * @return  Returns URT_STATUS_OK on success.
595
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
596
 */
597
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
598
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
599
{
600
  urtDebugAssert(subscriber);
601
  urtDebugAssert(topic);
602

    
603
  if (subscriber->base.topic)
604
  {
605
    return URT_STATUS_SUBSCRIBE_TOPICSET;
606
  }
607

    
608
  subscriber->base.topic = topic;
609
# if (URT_CFG_PUBSUB_PROFILING == true)
610
  subscriber->base.sumLatencies = 0;
611
  subscriber->base.numMessagesReceived = 0;
612
# endif /* URT_CFG_PUBSUB_PROFILING */
613
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
614
  subscriber->deadlineOffset = deadline;
615
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
616
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
617
  subscriber->maxJitter =jitter;
618
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
619
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
620
  subscriber->minLatency = URT_DELAY_INFINITE;
621
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
622
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
623

    
624
  urtMutexLock(topic->lock);
625
  if (messages)
626
  {
627
    urtContributeMessages(messages);
628
  }
629

    
630
  subscriber->base.lastMessage = topic->latestMessage;
631
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
632

    
633
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
634

    
635
# if (URT_CFG_PUBSUB_PROFILING == true)
636
  topic->numHrtSubscribers--;
637
# endif /* URT_CFG_PUBSUB_PROFILING */
638

    
639
  urtMutexUnlock(topic->lock);
640
  return URT_STATUS_OK;
641
}
642

    
643
/**
644
 * @brief  Fetches the next message.
645
 *
646
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
647
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
648
 * @param[in] bytes  Payload size in bytes.
649
 * @param[in] latency  The latency can be returned by reference. May be NULL.
650
 *
651
 * @return  Returns URT_STATUS_OK on success.
652
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
653
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
654
 */
655
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload,
656
                                              size_t bytes, urt_delay_t* latency)
657
{
658
  urtDebugAssert(subscriber);
659

    
660
  if (!subscriber->base.topic)
661
      return URT_STATUS_FETCH_NOTOPIC;
662

    
663
  urtMutexLock(subscriber->base.topic->lock);
664

    
665
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
666
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
667
  {
668
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
669
    {
670
      urtMutexUnlock(subscriber->base.topic->lock);
671
      return URT_STATUS_FETCH_NOMESSAGE;
672
    }
673
    oldestMessage = oldestMessage->next;
674
  }
675
  else
676
  {
677
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
678
  }
679

    
680
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
681

    
682
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
683
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
684

    
685
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
686
  subscriber->base.sumLatencies += calculatedLatency;
687

    
688
  if (calculatedLatency < subscriber->minLatency) {
689
    subscriber->minLatency = calculatedLatency;
690
  }
691
  else if (calculatedLatency > subscriber->maxLatency) {
692
    subscriber->maxLatency = calculatedLatency;
693
  }
694
#endif /* URT_CFG_PUBSUB_PROFILING */
695

    
696
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
697
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
698
    subscriber->minLatency = calculatedLatency;
699
  }
700
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
701
    subscriber->maxLatency = calculatedLatency;
702
  }
703
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
704
    urtMutexUnlock(subscriber->base.topic);
705
    return URT_STATUS_JITTERVIOLATION;
706
  }
707
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
708

    
709
    if (latency) {
710
      latency = calculatedLatency;
711
    }
712
  }
713

    
714
#if (URT_CFG_PUBSUB_PROFILING == true)
715
  subscriber->base.lastMessage->numConsumersLeft--;
716
  subscriber->base->numMessagesReceived++;
717
#endif /* URT_CFG_PUBSUB_PROFILING */
718

    
719
  urtMutexUnlock(subscriber->base.topic->lock);
720
  return URT_STATUS_OK;
721
}
722

    
723
/**
724
 * @brief  Fetches the latest message.
725
 *
726
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
727
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
728
 * @param[in] bytes  Payload size in bytes.
729
 * @param[in] latency  The latency can be returned by reference. May be NULL.
730
 *
731
 * @return  Returns URT_STATUS_OK on success.
732
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
733
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
734
 */
735
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload,
736
                                                size_t bytes, urt_delay_t* latency)
737
{
738
  urtDebugAssert(subscriber);
739

    
740
  if (!subscriber->base.topic)
741
      return URT_STATUS_FETCH_NOTOPIC;
742

    
743
  urtMutexLock(subscriber->base.topic->lock);
744
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
745

    
746
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
747
  {
748
    urtMutexUnlock(subscriber->base.topic->lock);
749
    return URT_STATUS_FETCH_NOMESSAGE;
750
  }
751

    
752
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
753

    
754
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
755
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
756

    
757
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
758
  subscriber->base.sumLatencies += calculatedLatency;
759

    
760
  if (calculatedLatency < subscriber->minLatency) {
761
    subscriber->minLatency = calculatedLatency;
762
  }
763
  else if (calculatedLatency > subscriber->maxLatency) {
764
    subscriber->maxLatency = calculatedLatency;
765
  }
766
#endif /* URT_CFG_PUBSUB_PROFILING */
767

    
768
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
769
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
770
    subscriber->minLatency = calculatedLatency;
771
  }
772
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
773
    subscriber->maxLatency = calculatedLatency;
774
  }
775
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
776
    urtMutexUnlock(subscriber->base.topic);
777
    return URT_STATUS_JITTERVIOLATION;
778
  }
779
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
780

    
781
    if (latency) {
782
      latency = calculatedLatency;
783
    }
784
  }
785

    
786
  urtMutexUnlock(subscriber->base.topic->lock);
787
  return URT_STATUS_OK;
788
}
789

    
790
/**
791
 * @brief  Calculates the validity from the subscriber.
792
 *
793
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
794
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
795
 *
796
 * @return  Returns a boolean indicator whether the latency is fine.
797
 */
798
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
799
{
800
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
801
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
802
    return true;
803
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
804

    
805
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
806
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
807
      return true;
808

    
809
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
810
      return true;
811
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
812

    
813
  return false;
814
}
815

    
816
/**
817
 * @brief  Unsubscribes from a subscriber.
818
 *
819
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
820
 *
821
 * @return  Returns URT_STATUS_OK on sucess.
822
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
823
 */
824
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
825
{
826
  urtDebugAssert(subscriber);
827

    
828
  if (!subscriber->base.topic) {
829
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
830
  }
831

    
832
#if (URT_CFG_PUBSUB_PROFILING == true)
833
  urtMutexLock(topic->lock);
834
#endif /* URT_CFG_PUBSUB_PROFILING */
835

    
836
  urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
837

    
838
# if (URT_CFG_PUBSUB_PROFILING == true)
839
  subscriber->base.topic->numSubscribers--;
840
# endif /* URT_CFG_PUBSUB_PROFILING */
841

    
842
# if (URT_CFG_PUBSUB_PROFILING == true)
843
  urtMutexUnlock(topic->lock);
844
# endif /* URT_CFG_PUBSUB_PROFILING */
845
  subscriber->base.topic = NULL;
846
  subscriber->base.lastMessage = NULL;
847
  subscriber->base.lastMessageTime = 0;
848
  return URT_STATUS_OK;
849
}
850

    
851

    
852
/**
853
 * @brief  Initialize the HRT Subscriber.
854
 *
855
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
856
 */
857
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
858
{
859
  urtDebugAssert(subscriber);
860

    
861
  subscriber->base.topic = NULL;
862
  urtEventListenerInit(subscriber->base.evtListener);
863
  subscriber->base.lastMessage = NULL;
864
  subscriber->base.lastMessageTime = 0;
865

    
866
# if (URT_CFG_PUBSUB_PROFILING)
867
    subscriber->base.sumLatencies = 0;
868
    subscriber->base.numMessagesReceived = 0;
869
# endif /* URT_CFG_PUBSUB_PROFILING */
870

    
871
  subscriber->next = NULL;
872

    
873
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
874
    subscriber->deadlineOffset = 0;
875
    urtTimerInit(subscriber->qodDeadlineTimer);
876
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
877

    
878
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
879
    subscriber->expectedRate = 0;
880
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
881

    
882
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
883
    subscriber->maxJitter = 0;
884
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
885

    
886
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
887
    subscriber->minLatency = URT_DELAY_INFINITE;
888
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
889
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
890
  return;
891
}
892

    
893

    
894
/**
895
 * @brief  Subscribes the subscriber to a topic.
896
 *
897
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
898
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
899
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
900
 *                      Messages must not be associated to another topic.
901
 *                      Once a message has been contributed, it cannot be removed later.
902
 *                      May be NULL(no messages to contribute).
903
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
904
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
905
 *                  A value of 0 indicates, that rate is of no concern.
906
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
907
 *                    A value of 0 indicates that jitter is of no concern.
908
 *
909
 * @return  Returns URT_STATUS_OK on success.
910
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
911
 */
912
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
913
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
914
{
915
  urtDebugAssert(subscriber);
916
  urtDebugAssert(topic);
917

    
918
  if (subscriber->base.topic)
919
  {
920
    return URT_STATUS_SUBSCRIBE_TOPICSET;
921
  }
922

    
923
  subscriber->base.topic = topic;
924
# if (URT_CFG_PUBSUB_PROFILING == true)
925
  subscriber->base.sumLatencies = 0;
926
  subscriber->base.numMessagesReceived = 0;
927
# endif /* URT_CFG_PUBSUB_PROFILING */
928
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
929
  subscriber->deadlineOffset = deadline;
930
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
931
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
932
  subscriber->maxJitter =jitter;
933
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
934
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
935
  subscriber->minLatency = URT_DELAY_INFINITE;
936
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
937
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
938
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
939
  subscriber->expectedRate = rate;
940
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
941

    
942
  urtMutexLock(topic->lock);
943
  if (messages)
944
  {
945
    urtContributeMessages(messages);
946
  }
947

    
948
  subscriber->base.lastMessage = topic->latestMessage;
949
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
950

    
951
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
952

    
953
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
954
  //TODO: Implement
955
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
956

    
957
  topic->numHrtSubscribers--;
958
# if (URT_CFG_PUBSUB_PROFILING == true)
959
  topic->numSubscribers--;
960
# endif /* URT_CFG_PUBSUB_PROFILING */
961

    
962
  urtMutexUnlock(topic->lock);
963
  return URT_STATUS_OK;
964
}
965

    
966

    
967
/**
968
 * @brief  Fetches the next message.
969
 *
970
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
971
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
972
 * @param[in] bytes  Payload size in bytes.
973
 * @param[in] latency  The latency can be returned by reference. May be NULL.
974
 *
975
 * @return  Returns URT_STATUS_OK on success.
976
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
977
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
978
 */
979
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
980
                                              size_t bytes, urt_delay_t* latency)
981
{
982
  urtDebugAssert(subscriber);
983

    
984
  if (!subscriber->base.topic) {
985
    return URT_STATUS_FETCH_NOTOPIC;
986
  }
987

    
988
  urtMutexLock(subscriber->base.topic->lock);
989
  urt_message_t* messageTemp = subscriber->base.lastMessage;
990
  if (messageTemp->next->originTime > messageTemp.originTime)
991
  {
992
    urtMutexUnlock(subscriber->base.topic->lock);
993
    return URT_STATUS_FETCH_NOMESSAGE;
994
  }
995
  messageTemp = messageTemp->next;
996

    
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
998
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
999

    
1000
#if(URT_CFG_PUBSUB_PROFILING == true)
1001
  subscriber->base.sumLatencies += calculatedLatency;
1002

    
1003
  if (calculatedLatency < subscriber->minLatency) {
1004
    subscriber->minLatency = calculatedLatency;
1005
  }
1006
  else if (calculatedLatency > subscriber->maxLatency) {
1007
    subscriber->maxLatency = calculatedLatency;
1008
  }
1009
#endif /* URT_CFG_PUBSUB_PROFILING */
1010

    
1011
    if (latency) {
1012
      latency = calculatedLatency;
1013
    }
1014
  }
1015

    
1016
  subscriber->base.lastMessage->numHrtConsumersLeft--;
1017
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1018
  {
1019
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1020
  }
1021

    
1022
#if (URT_CFG_PUBSUB_PROFILING == true)
1023
  subscriber->base.lastMessage->numConsumersLeft--;
1024
  subscriber->base->numMessagesReceived++;
1025
#endif /* URT_CFG_PUBSUB_PROFILING */
1026

    
1027
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1028
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1029
    subscriber->minLatency = calculatedLatency;
1030
  }
1031
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1032
    subscriber->maxLatency = calculatedLatency;
1033
  }
1034
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1035
    urtMutexUnlock(subscriber->base.topic);
1036
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1037
    return URT_STATUS_JITTERVIOLATION;
1038
  }
1039
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1040

    
1041
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1042
  if (calculatedLatency < subscriber->minLatency) {
1043
    subscriber->minLatency = calculatedLatency;
1044
  }
1045
  else if (calculatedLatency > subscriber->maxLatency) {
1046
    subscriber->maxLatency = calculatedLatency;
1047
  }
1048
#endif /* URT_CFG_PUBSUB_PROFILING */
1049

    
1050
  urtFetchMessage(messageTemp, subscriber, payload, bytes);
1051

    
1052
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1053
  if (messageTemp->next->originTime < messageTemp->originTime)
1054
  {
1055
    //TODO: first reset?! (when ... set)
1056
    urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
1057
  }
1058
  else
1059
  {
1060
    urtTimerReset(subscriber->qosDeadlineTimer);
1061
  }
1062
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1063

    
1064
  urtMutexUnlock(subscriber->base.topic->lock);
1065
  return URT_STATUS_OK;
1066
}
1067

    
1068

    
1069
/**
1070
 * @brief  Fetches the latest message.
1071
 *
1072
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
1073
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
1074
 * @param[in] bytes  Payload size in bytes.
1075
 * @param[in] latency  The latency can be returned by reference. May be NULL.
1076
 *
1077
 * @return  Returns URT_STATUS_OK on success.
1078
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
1079
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
1080
 */
1081
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
1082
                                                size_t bytes, urt_delay_t* latency)
1083
{
1084
  if (!subscriber->base.topic) {
1085
    return URT_STATUS_FETCH_NOTOPIC;
1086
  }
1087

    
1088
  urtMutexLock(subscriber->base.topic->lock);
1089
  urt_message_t* lastMessage = subscriber->base.lastMessage;
1090
  bool hrtZero = false;
1091
  while (lastMessage->next->originTime < lastMessage->originTime) {
1092
    lastMessage = lastMessage->next;
1093
    lastMessage->numHrtConsumersLeft--;
1094
    if (lastMessage->numHrtConsumersLeft == 0) {
1095
        hertZero = true;
1096
    }
1097
#if (URT_CFG_PUBSUB_PROFILING == true)
1098
    lastMessage->numConsumersLeft--;
1099
    subscriber->base.numMessagesReceived++;
1100
#endif /* URT_CFG_PUBSUB_PROFILING */
1101
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1102
    urtTimerReset(subscriber->qosDeadlineTimer);
1103
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1104
  }
1105

    
1106
  if (hrtZero) {
1107
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1108
  }
1109

    
1110
  if (lastMessage->originTime == subscriber->base.lastMessageTime) {
1111
    urtMutexUnlock(subscriber->base.topic);
1112
    return URT_STATUS_FETCH_NOMESSAGE;
1113
  }
1114

    
1115
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
1116
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
1117

    
1118
#if(URT_CFG_PUBSUB_PROFILING == true)
1119
  subscriber->base.sumLatencies += calculatedLatency;
1120

    
1121
  if (calculatedLatency < subscriber->minLatency) {
1122
    subscriber->minLatency = calculatedLatency;
1123
  }
1124
  else if (calculatedLatency > subscriber->maxLatency) {
1125
    subscriber->maxLatency = calculatedLatency;
1126
  }
1127
#endif /* URT_CFG_PUBSUB_PROFILING */
1128

    
1129
    if (latency) {
1130
      latency = calculatedLatency;
1131
    }
1132
  }
1133

    
1134
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1135
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1136
    subscriber->minLatency = calculatedLatency;
1137
  }
1138
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1139
    subscriber->maxLatency = calculatedLatency;
1140
  }
1141
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1142
    urtMutexUnlock(subscriber->base.topic);
1143
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1144
    return URT_STATUS_JITTERVIOLATION;
1145
  }
1146
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1147

    
1148
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1149
  if (calculatedLatency < subscriber->minLatency) {
1150
    subscriber->minLatency = calculatedLatency;
1151
  }
1152
  else if (calculatedLatency > subscriber->maxLatency) {
1153
    subscriber->maxLatency = calculatedLatency;
1154
  }
1155
#endif /* URT_CFG_PUBSUB_PROFILING */
1156

    
1157
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
1158

    
1159
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1160
  urtTimerReset(subscriber->qosDeadlineTimer);
1161
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1162
}
1163

    
1164
/**
1165
 * @brief  Unsubscribes from a subscriber.
1166
 *
1167
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
1168
 *
1169
 * @return  Returns URT_STATUS_OK on sucess.
1170
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
1171
 */
1172
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
1173
{
1174
  urtDebugAssert(subscriber);
1175

    
1176
  if (subscriber->base.topic)
1177
  {
1178
    urtMutexLock(topic->lock);
1179
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
1180
    subscriber->base.topic->numHrtSubscribers--;
1181
# if (URT_CFG_PUBSUB_PROFILING == true)
1182
    subscriber->base.topic->numSubscribers--;
1183
# endif /* URT_CFG_PUBSUB_PROFILING */
1184
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1185
    //TODO: remove self from topics lsit of HRT subscribers
1186
    //TODO: ...
1187
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1188

    
1189
    urt_message_t* messageTemp = subscriber->base.lastMessage;
1190
    bool hrtZero = false;
1191
    while (messageTemp->next->originTime < messageTemp->originTime)
1192
    {
1193
        messageTemp = messageTemp->next;
1194
        messageTemp->numHrtConsumersLeft--;
1195
        if (messageTemp->numHrtConsumersLeft == 0)
1196
        {
1197
            hrtZero = true;
1198
        }
1199
# if(URT_CFG_PUBSUB_PROFILING == true)
1200
        messageTemp->numConsumersLeft--;
1201
# endif /* URT_CFG_PUBSUB_PROFILING */
1202
    }
1203
    if (hrtZero)
1204
    {
1205
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
1206
    }
1207

    
1208
    urtMutexUnlock(topic->lock);
1209
    subscriber->base.topic = NULL;
1210
    subscriber->base.lastMessage = NULL;
1211
    subscriber->base.lastMessageTime = 0;
1212
    return URT_STATUS_OK;
1213
  }
1214

    
1215
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1216
}
1217

    
1218

    
1219