Revision 982056f7

View differences:

inc/urt_basesubscriber.h
1
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

  
6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

  
8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

  
13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

  
18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21

  
22
#ifndef URTWARE_BASESUBSCRIBER_H
23
#define URTWARE_BASESUBSCRIBER_H
24

  
25
#include <urt_osal.h>
26
#include <urt_confcheck.h>
27

  
28
/******************************************************************************/
29
/* CONSTANTS                                                                  */
30
/******************************************************************************/
31

  
32
/******************************************************************************/
33
/* SETTINGS                                                                   */
34
/******************************************************************************/
35

  
36
/******************************************************************************/
37
/* CHECKS                                                                     */
38
/******************************************************************************/
39

  
40
/******************************************************************************/
41
/* DATA STRUCTURES AND TYPES                                                  */
42
/******************************************************************************/
43

  
44
struct urt_topic;
45

  
46
/**
47
 * @brief  Base subscriber
48
 * @details The base subscriber is included in each specialized subscriber
49
 */
50
typedef struct urt_basesubscriber
51
{
52
  struct urt_topic* topic;
53
  urt_osEventListener_t evtListener;
54
  urt_message_t* lastMessage;
55
  urt_osTime_t lastMessageTime;
56
  #if (URT_CFG_PUBSUB_PROFILING)
57
    uint64_t sumLatencies;
58
    uint64_t numMessagesReceived;
59
  #endif /* URT_CFG_PUBSUB_PROFILING */
60
}urt_basesubscriber_t;
61

  
62
/******************************************************************************/
63
/* MACROS                                                                     */
64
/******************************************************************************/
65

  
66
/******************************************************************************/
67
/* EXTERN DECLARATIONS                                                        */
68
/******************************************************************************/
69

  
70
/******************************************************************************/
71
/* INLINE FUNCTIONS                                                           */
72
/******************************************************************************/
73

  
74
#endif /* URTWARE_BASESUBSCRIBER_H */
inc/urt_core.h
73 73
  urt_service_t urtCoreGetService(urt_serviceid_t id);
74 74
#endif /* URT_CFG_RPC_ENABLED */
75 75

  
76
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
77
  urtCoreCallbackDefault(void* params)
78
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
79

  
76 80
#if defined(__cplusplus)
77 81
}
78 82
#endif /* defined(__cplusplus) */
inc/urt_subscriber.h
23 23
#define URTWARE_SUBSCRIBER_H
24 24

  
25 25
#include <urtware.h>
26
#include <urt_basesubscriber.h>
26 27

  
27 28
/******************************************************************************/
28 29
/* CONSTANTS                                                                  */
......
41 42
/******************************************************************************/
42 43

  
43 44
/**
44
 * @brief  Base subscriber
45
 * @details The base subscriber is included in each specialized subscriber
46
 */
47
typedef struct urt_basesubscriber
48
{
49
  urt_topic_t* topic;
50
  urt_osEventListener_t evtListener;
51
  urt_message_t* lastMessage;
52
  urt_osTime_t lastMessageTime;
53
  #if (URT_CFG_PUBSUB_PROFILING)
54
    uint64_t sumLatencies;
55
    uint64_t numMessagesReceived;
56
  #endif /* URT_CFG_PUBSUB_PROFILING */
57
}urt_basesubscriber_t;
58

  
59
/**
60 45
 * @brief  nrt subscriber
61 46
 */
62 47
typedef struct urt_nrtsubscriber
inc/urt_topic.h
54 54
  urt_message_t mandatoryMessage;
55 55
  urt_message_t* latestMessage;
56 56
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
57
    struct urt_hrtsubscriber* hrtSubscribers;
57
    urt_hrtsubscriber_t* hrtSubscribers;
58 58
    urt_osTimer_t qosRateTimer;
59 59
  #endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
60 60
  #if (URT_CFG_PUBSUB_PROFILING)
src/urt_subscriber.c
20 20
*/
21 21

  
22 22
#include <urtware.h>
23
#include <stdio.h>
23 24

  
24 25
/******************************************************************************/
25 26
/* LOCAL DEFINITIONS                                                          */
......
41 42
/* LOCAL FUNCTIONS                                                            */
42 43
/******************************************************************************/
43 44

  
44
void urtFetchMessage ()
45
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
45 46
{
46
    //TODO: Update message pointer
47
    //TODO: Copy message origin time
48
    //TODO: Copy message payload
47
  subscriber->base.lastMessage = message;
48
  *subscriber->base.lastMessageTime = message->originTime;
49
  memcpy(message->payload, payload, bytes);
49 50
}
50 51

  
51 52
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
......
70 71
  }
71 72
}
72 73

  
74
void urtContributeMessages(urt_message_t* messages)
75
{
76
  urt_message_t* lastMessageContribute = messages;
77
  while (lastMessageContribute->next)
78
  {
79
    lastMessageContribute = lastMessageContribute->next;
80
  }
81
  lastMessageContribute->next = topic->latestMessage->next;
82
  topic->latestMessage->next = messages;
83
}
84

  
73 85
/******************************************************************************/
74 86
/* EXPORTED FUNCTIONS                                                         */
75 87
/******************************************************************************/
......
112 124
  urtDebugAssert(subscriber);
113 125
  urtDebugAssert(topic);
114 126

  
115
  if (!subscriber->base.topic)
116
      return URT_STATUS_SUBSCRIBE_TOPICSET;
127
  if (!subscriber->base.topic) {
128
    return URT_STATUS_SUBSCRIBE_TOPICSET;
129
  }
117 130

  
118 131
  subscriber->base.topic = topic;
119 132
  urtMutexLock(topic->lock);
120 133

  
121
  if (messages)
122
  {
123
    urt_message_t* lastMessageContribute = messages;
124
    while (lastMessageContribute->next)
125
    {
126
        lastMessageContribute = lastMessageContribute->next;
127
    }
128
    lastMessageContribute->next = topic->latestMessage->next;
129
    topic->latestMessage->next = messages;
134
  if (messages) {
135
    urtContributeMessages(messages);
130 136
  }
131 137

  
132 138
  subscriber->base.lastMessage = topic->latestMessage;
......
154 160
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
155 161
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
156 162
 */
157
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
163
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
158 164
{   
159 165
  urtDebugAssert(subscriber);
160 166

  
161
  if (!subscriber->base.topic)
162
      return URT_STATUS_FETCH_NOTOPIC;
167
  if (!subscriber->base.topic) {
168
    return URT_STATUS_FETCH_NOTOPIC;
169
  }
163 170

  
164 171
  urtMutexLock(subscriber->base.topic->lock);
165 172

  
166
  urt_message_t* messageTemp = subscriber->base.lastMessage;
167
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
168
  {
169
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
170
    {
173
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
174
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
175
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
171 176
      urtMutexUnlock(subscriber->base.topic->lock);
172 177
      return URT_STATUS_FETCH_NOMESSAGE;
173 178
    }
174
    messageTemp = messageTemp->next;
179
    oldestMessage = oldestMessage->next;
175 180
  }
176
  else
177
  {
178
    messageTemp = urtFindOldestMessage(messageTemp->next);
181
  else {
182
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
179 183
  }
180 184

  
181
  urtFetchMessage();
185
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
186

  
187
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
188
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
182 189

  
183
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
184 190
#if(URT_CFG_PUBSUB_PROFILING == true)
185 191
  subscriber->base.sumLatencies += calculatedLatency;
186 192

  
187
  if (calculatedLatency < subscriber->minLatency)
188
  {
193
  if (calculatedLatency < subscriber->minLatency) {
189 194
    subscriber->minLatency = calculatedLatency;
190 195
  }
191
  else if (calculatedLatency > subscriber->maxLatency)
192
  {
196
  else if (calculatedLatency > subscriber->maxLatency) {
193 197
    subscriber->maxLatency = calculatedLatency;
194 198
  }
195 199
#endif /* URT_CFG_PUBSUB_PROFILING */
196
  bool temp = false;
197
  if (temp/*optional latency output argument given*/)
198
  {
199
    latency = calculatedLatency;
200

  
201
    if (latency) {
202
      latency = calculatedLatency;
203
    }
200 204
  }
201 205

  
202 206
#if (URT_CFG_PUBSUB_PROFILING == true)
......
220 224
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
221 225
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
222 226
 */
223
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
227
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
224 228
  urtDebugAssert(subscriber);
225 229

  
226 230
  if (!subscriber->base.topic)
......
235 239
    return URT_STATUS_FETCH_NOMESSAGE;
236 240
  }
237 241

  
238
  urtFetchMessage();
242
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
243

  
244
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
245
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
239 246

  
240
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
241 247
#if(URT_CFG_PUBSUB_PROFILING == true)
242 248
  subscriber->base.sumLatencies += calculatedLatency;
243 249

  
244
  if (calculatedLatency < subscriber->minLatency)
245
  {
250
  if (calculatedLatency < subscriber->minLatency) {
246 251
    subscriber->minLatency = calculatedLatency;
247 252
  }
248
  else if (calculatedLatency > subscriber->maxLatency)
249
  {
253
  else if (calculatedLatency > subscriber->maxLatency) {
250 254
    subscriber->maxLatency = calculatedLatency;
251 255
  }
252 256
#endif /* URT_CFG_PUBSUB_PROFILING */
253
  bool temp = false;
254
  if (temp/*optional latency output argument given*/)
255
  {
256
    latency = calculatedLatency;
257

  
258
    if (latency) {
259
      latency = calculatedLatency;
260
    }
257 261
  }
258 262

  
259 263
  urtMutexUnlock(subscriber->base.topic->lock);
......
354 358
  urtMutexLock(topic->lock);
355 359
  if (messages)
356 360
  {
357
    urt_message_t* lastMessageContribute = messages;
358
    while (lastMessageContribute->next)
359
    {
360
        lastMessageContribute = lastMessageContribute->next;
361
    }
362
    lastMessageContribute->next = topic->latestMessage->next;
363
    topic->latestMessage->next = messages;
361
    urtContributeMessages(messages);
364 362
  }
365 363

  
366 364
  subscriber->base.lastMessage = topic->latestMessage;
......
388 386
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
389 387
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
390 388
 */
391
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
389
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload,
392 390
                                              size_t bytes, urt_delay_t* latency)
393 391
{
394 392
  urtDebugAssert(subscriber);
......
398 396

  
399 397
  urtMutexLock(subscriber->base.topic->lock);
400 398

  
401
  urt_message_t* messageTemp = subscriber->base.lastMessage;
402
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
399
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
400
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
403 401
  {
404 402
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
405 403
    {
406 404
      urtMutexUnlock(subscriber->base.topic->lock);
407 405
      return URT_STATUS_FETCH_NOMESSAGE;
408 406
    }
409
    messageTemp = messageTemp->next;
407
    oldestMessage = oldestMessage->next;
410 408
  }
411 409
  else
412 410
  {
413
    messageTemp = urtFindOldestMessage(messageTemp->next);
411
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
414 412
  }
415 413

  
416
  urtFetchMessage();
414
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
415

  
416
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
417
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
417 418

  
418
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
419 419
#if(URT_CFG_PUBSUB_PROFILING == true)
420 420
  subscriber->base.sumLatencies += calculatedLatency;
421 421

  
422
  if (calculatedLatency < subscriber->minLatency)
423
  {
422
  if (calculatedLatency < subscriber->minLatency) {
424 423
    subscriber->minLatency = calculatedLatency;
425 424
  }
426
  else if (calculatedLatency > subscriber->maxLatency)
427
  {
425
  else if (calculatedLatency > subscriber->maxLatency) {
428 426
    subscriber->maxLatency = calculatedLatency;
429 427
  }
430 428
#endif /* URT_CFG_PUBSUB_PROFILING */
431
  bool temp = false;
432
  if (temp/*optional latency output argument given*/)
433
  {
434
    latency = calculatedLatency;
429

  
430
    if (latency) {
431
      latency = calculatedLatency;
432
    }
435 433
  }
436 434

  
437 435
#if (URT_CFG_PUBSUB_PROFILING == true)
......
455 453
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
456 454
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
457 455
 */
458
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
456
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload,
459 457
                                                size_t bytes, urt_delay_t* latency)
460 458
{
461 459
  urtDebugAssert(subscriber);
462 460

  
463
  if (!subscriber->base.topic)
464
      return URT_STATUS_FETCH_NOTOPIC;
461
  if (!subscriber->base.topic) {
462
    return URT_STATUS_FETCH_NOTOPIC;
463
  }
465 464

  
466 465
  urtMutexLock(subscriber->base.topic->lock);
467 466
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
......
472 471
    return URT_STATUS_FETCH_NOMESSAGE;
473 472
  }
474 473

  
475
  urtFetchMessage();
474
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
475

  
476
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
477
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
476 478

  
477
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
478 479
#if(URT_CFG_PUBSUB_PROFILING == true)
479 480
  subscriber->base.sumLatencies += calculatedLatency;
480 481

  
481
  if (calculatedLatency < subscriber->minLatency)
482
  {
482
  if (calculatedLatency < subscriber->minLatency) {
483 483
    subscriber->minLatency = calculatedLatency;
484 484
  }
485
  else if (calculatedLatency > subscriber->maxLatency)
486
  {
485
  else if (calculatedLatency > subscriber->maxLatency) {
487 486
    subscriber->maxLatency = calculatedLatency;
488 487
  }
489 488
#endif /* URT_CFG_PUBSUB_PROFILING */
490
  bool temp = false;
491
  if (temp/*optional latency output argument given*/)
492
  {
493
    latency = calculatedLatency;
489

  
490
    if (latency) {
491
      latency = calculatedLatency;
492
    }
494 493
  }
495 494

  
496 495
  urtMutexUnlock(subscriber->base.topic->lock);
......
625 624
  urtMutexLock(topic->lock);
626 625
  if (messages)
627 626
  {
628
    urt_message_t* lastMessageContribute = messages;
629
    while (lastMessageContribute->next)
630
    {
631
      lastMessageContribute = lastMessageContribute->next;
632
    }
633
    lastMessageContribute->next = topic->latestMessage->next;
634
    topic->latestMessage->next = messages;
627
    urtContributeMessages(messages);
635 628
  }
636 629

  
637 630
  subscriber->base.lastMessage = topic->latestMessage;
......
659 652
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
660 653
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
661 654
 */
662
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
655
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload,
663 656
                                              size_t bytes, urt_delay_t* latency)
664 657
{
665 658
  urtDebugAssert(subscriber);
......
669 662

  
670 663
  urtMutexLock(subscriber->base.topic->lock);
671 664

  
672
  urt_message_t* messageTemp = subscriber->base.lastMessage;
673
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
665
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
666
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
674 667
  {
675 668
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
676 669
    {
677 670
      urtMutexUnlock(subscriber->base.topic->lock);
678 671
      return URT_STATUS_FETCH_NOMESSAGE;
679 672
    }
680
    messageTemp = messageTemp->next;
673
    oldestMessage = oldestMessage->next;
681 674
  }
682 675
  else
683 676
  {
684
    messageTemp = urtFindOldestMessage(messageTemp->next);
677
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
685 678
  }
686 679

  
687
  urtFetchMessage();
680
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
688 681

  
689
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
690
#if(URT_CFG_PUBSUB_PROFILING == true)
682
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
683
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
684

  
685
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
691 686
  subscriber->base.sumLatencies += calculatedLatency;
687

  
688
  if (calculatedLatency < subscriber->minLatency) {
689
    subscriber->minLatency = calculatedLatency;
690
  }
691
  else if (calculatedLatency > subscriber->maxLatency) {
692
    subscriber->maxLatency = calculatedLatency;
693
  }
692 694
#endif /* URT_CFG_PUBSUB_PROFILING */
693 695

  
694
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
695
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
696
  {
696
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
697
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
697 698
    subscriber->minLatency = calculatedLatency;
698 699
  }
699
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
700
  {
700
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
701 701
    subscriber->maxLatency = calculatedLatency;
702 702
  }
703
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
703
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
704
    urtMutexUnlock(subscriber->base.topic);
705
    return URT_STATUS_JITTERVIOLATION;
706
  }
707
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
704 708

  
705
  bool temp = false;
706
  if (temp/*optional latency output argument given*/)
707
  {
708
    latency = calculatedLatency;
709
    if (latency) {
710
      latency = calculatedLatency;
711
    }
709 712
  }
710 713

  
711 714
#if (URT_CFG_PUBSUB_PROFILING == true)
......
714 717
#endif /* URT_CFG_PUBSUB_PROFILING */
715 718

  
716 719
  urtMutexUnlock(subscriber->base.topic->lock);
717
  return URT_STATUS_OK; //TODO: or urt_status_jitterviolation
720
  return URT_STATUS_OK;
718 721
}
719 722

  
720 723
/**
......
729 732
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
730 733
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
731 734
 */
732
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
735
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload,
733 736
                                                size_t bytes, urt_delay_t* latency)
734 737
{
735 738
  urtDebugAssert(subscriber);
......
746 749
    return URT_STATUS_FETCH_NOMESSAGE;
747 750
  }
748 751

  
749
  urtFetchMessage();
752
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
750 753

  
751
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
752
#if(URT_CFG_PUBSUB_PROFILING == true)
754
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
755
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
756

  
757
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
753 758
  subscriber->base.sumLatencies += calculatedLatency;
759

  
760
  if (calculatedLatency < subscriber->minLatency) {
761
    subscriber->minLatency = calculatedLatency;
762
  }
763
  else if (calculatedLatency > subscriber->maxLatency) {
764
    subscriber->maxLatency = calculatedLatency;
765
  }
754 766
#endif /* URT_CFG_PUBSUB_PROFILING */
755 767

  
756
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
757
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
758
  {
768
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
769
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
759 770
    subscriber->minLatency = calculatedLatency;
760 771
  }
761
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
762
  {
772
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
763 773
    subscriber->maxLatency = calculatedLatency;
764 774
  }
765
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
775
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
776
    urtMutexUnlock(subscriber->base.topic);
777
    return URT_STATUS_JITTERVIOLATION;
778
  }
779
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
766 780

  
767
  bool temp = false;
768
  if (temp/*optional latency output argument given*/)
769
  {
770
    latency = calculatedLatency;
781
    if (latency) {
782
      latency = calculatedLatency;
783
    }
771 784
  }
772 785

  
773 786
  urtMutexUnlock(subscriber->base.topic->lock);
......
812 825
{
813 826
  urtDebugAssert(subscriber);
814 827

  
815
  if (subscriber->base.topic)
816
  {
817
    urtMutexLock(topic->lock);
818
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
819
    //TODO: decrement topic's HRT counter
828
  if (!subscriber->base.topic) {
829
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
830
  }
831

  
832
#if (URT_CFG_PUBSUB_PROFILING == true)
833
  urtMutexLock(topic->lock);
834
#endif /* URT_CFG_PUBSUB_PROFILING */
835

  
836
  urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
837

  
820 838
# if (URT_CFG_PUBSUB_PROFILING == true)
821
    subscriber->base.topic->numSubscribers--;
839
  subscriber->base.topic->numSubscribers--;
822 840
# endif /* URT_CFG_PUBSUB_PROFILING */
823
//Hier weiter
824 841

  
825 842
# if (URT_CFG_PUBSUB_PROFILING == true)
826
    urtMutexUnlock(topic->lock);
843
  urtMutexUnlock(topic->lock);
827 844
# endif /* URT_CFG_PUBSUB_PROFILING */
828
    subscriber->base.topic = NULL;
829
    subscriber->base.lastMessage = NULL;
830
    subscriber->base.lastMessageTime = 0;
831
    return URT_STATUS_OK;
832
  }
833

  
834
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
845
  subscriber->base.topic = NULL;
846
  subscriber->base.lastMessage = NULL;
847
  subscriber->base.lastMessageTime = 0;
848
  return URT_STATUS_OK;
835 849
}
836 850

  
837 851

  
......
928 942
  urtMutexLock(topic->lock);
929 943
  if (messages)
930 944
  {
931
    urt_message_t* lastMessageContribute = messages;
932
    while (lastMessageContribute->next)
933
    {
934
      lastMessageContribute = lastMessageContribute->next;
935
    }
936
    lastMessageContribute->next = topic->latestMessage->next;
937
    topic->latestMessage->next = messages;
945
    urtContributeMessages(messages);
938 946
  }
939 947

  
940 948
  subscriber->base.lastMessage = topic->latestMessage;
......
968 976
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
969 977
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
970 978
 */
971
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
979
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
972 980
                                              size_t bytes, urt_delay_t* latency)
973 981
{
974 982
  urtDebugAssert(subscriber);
975 983

  
976
  if (!subscriber->base.topic)
977
      return URT_STATUS_FETCH_NOTOPIC;
984
  if (!subscriber->base.topic) {
985
    return URT_STATUS_FETCH_NOTOPIC;
986
  }
978 987

  
979 988
  urtMutexLock(subscriber->base.topic->lock);
980 989
  urt_message_t* messageTemp = subscriber->base.lastMessage;
......
985 994
  }
986 995
  messageTemp = messageTemp->next;
987 996

  
988
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
989
#if (URT_CFG_PUBSUB_PROFILING == true)
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
998
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
999

  
1000
#if(URT_CFG_PUBSUB_PROFILING == true)
990 1001
  subscriber->base.sumLatencies += calculatedLatency;
1002

  
1003
  if (calculatedLatency < subscriber->minLatency) {
1004
    subscriber->minLatency = calculatedLatency;
1005
  }
1006
  else if (calculatedLatency > subscriber->maxLatency) {
1007
    subscriber->maxLatency = calculatedLatency;
1008
  }
991 1009
#endif /* URT_CFG_PUBSUB_PROFILING */
992
  bool temp = false;
993
  if (temp /* optional latency output argument given */)
994
  {
995
    latency = calculatedLatency
1010

  
1011
    if (latency) {
1012
      latency = calculatedLatency;
1013
    }
996 1014
  }
997 1015

  
998 1016
  subscriber->base.lastMessage->numHrtConsumersLeft--;
......
1006 1024
  subscriber->base->numMessagesReceived++;
1007 1025
#endif /* URT_CFG_PUBSUB_PROFILING */
1008 1026

  
1009
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
1010
  if (temp /*latency is within allowed jitter range*/)
1011
  {
1012
    if (calculatedLatency < subscriber->minLatency)
1013
    {
1014
      subscriber->minLatency = calculatedLatency;
1015
    }
1016
    else if (calculatedLatency > subscriber->maxLatency)
1017
    {
1018
      subscriber->maxLatency = calculatedLatency;
1019
    }
1020
  }
1021
  else
1022
  {
1023 1027
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1024
    urtMutexUnlock(subscriber->base.topic->lock);
1028
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1029
    subscriber->minLatency = calculatedLatency;
1030
  }
1031
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1032
    subscriber->maxLatency = calculatedLatency;
1033
  }
1034
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1035
    urtMutexUnlock(subscriber->base.topic);
1025 1036
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1026 1037
    return URT_STATUS_JITTERVIOLATION;
1038
  }
1027 1039
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1040

  
1041
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1042
  if (calculatedLatency < subscriber->minLatency) {
1043
    subscriber->minLatency = calculatedLatency;
1028 1044
  }
1029
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
1045
  else if (calculatedLatency > subscriber->maxLatency) {
1046
    subscriber->maxLatency = calculatedLatency;
1047
  }
1048
#endif /* URT_CFG_PUBSUB_PROFILING */
1030 1049

  
1031
  urtFetchMessage();
1050
  urtFetchMessage(messageTemp, subscriber, payload, bytes);
1032 1051

  
1033 1052
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1034 1053
  if (messageTemp->next->originTime < messageTemp->originTime)
1035 1054
  {
1036
    //TODO: update qos deadliner timer wrt. next message
1055
    //TODO: first reset?! (when ... set)
1056
    urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
1037 1057
  }
1038 1058
  else
1039 1059
  {
1040
    //TODO: reset qos deadline timer
1060
    urtTimerReset(subscriber->qosDeadlineTimer);
1041 1061
  }
1042 1062
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1043 1063

  
......
1058 1078
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
1059 1079
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
1060 1080
 */
1061
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
1062
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
1081
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
1082
                                                size_t bytes, urt_delay_t* latency)
1083
{
1084
  if (!subscriber->base.topic) {
1085
    return URT_STATUS_FETCH_NOTOPIC;
1086
  }
1087

  
1088
  urtMutexLock(subscriber->base.topic->lock);
1089
  urt_message_t* lastMessage = subscriber->base.lastMessage;
1090
  bool hrtZero = false;
1091
  while (lastMessage->next->originTime < lastMessage->originTime) {
1092
    lastMessage = lastMessage->next;
1093
    lastMessage->numHrtConsumersLeft--;
1094
    if (lastMessage->numHrtConsumersLeft == 0) {
1095
        hertZero = true;
1096
    }
1097
#if (URT_CFG_PUBSUB_PROFILING == true)
1098
    lastMessage->numConsumersLeft--;
1099
    subscriber->base.numMessagesReceived++;
1100
#endif /* URT_CFG_PUBSUB_PROFILING */
1101
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1102
    urtTimerReset(subscriber->qosDeadlineTimer);
1103
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1104
  }
1105

  
1106
  if (hrtZero) {
1107
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1108
  }
1109

  
1110
  if (lastMessage->originTime == subscriber->base.lastMessageTime) {
1111
    urtMutexUnlock(subscriber->base.topic);
1112
    return URT_STATUS_FETCH_NOMESSAGE;
1113
  }
1114

  
1115
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
1116
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
1117

  
1118
#if(URT_CFG_PUBSUB_PROFILING == true)
1119
  subscriber->base.sumLatencies += calculatedLatency;
1120

  
1121
  if (calculatedLatency < subscriber->minLatency) {
1122
    subscriber->minLatency = calculatedLatency;
1123
  }
1124
  else if (calculatedLatency > subscriber->maxLatency) {
1125
    subscriber->maxLatency = calculatedLatency;
1126
  }
1127
#endif /* URT_CFG_PUBSUB_PROFILING */
1128

  
1129
    if (latency) {
1130
      latency = calculatedLatency;
1131
    }
1132
  }
1133

  
1134
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1135
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1136
    subscriber->minLatency = calculatedLatency;
1137
  }
1138
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1139
    subscriber->maxLatency = calculatedLatency;
1140
  }
1141
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1142
    urtMutexUnlock(subscriber->base.topic);
1143
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1144
    return URT_STATUS_JITTERVIOLATION;
1145
  }
1146
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1147

  
1148
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1149
  if (calculatedLatency < subscriber->minLatency) {
1150
    subscriber->minLatency = calculatedLatency;
1151
  }
1152
  else if (calculatedLatency > subscriber->maxLatency) {
1153
    subscriber->maxLatency = calculatedLatency;
1154
  }
1155
#endif /* URT_CFG_PUBSUB_PROFILING */
1156

  
1157
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
1158

  
1159
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1160
  urtTimerReset(subscriber->qosDeadlineTimer);
1161
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1162
}
1063 1163

  
1064 1164
/**
1065 1165
 * @brief  Unsubscribes from a subscriber.

Also available in: Unified diff