Revision 65dc89cb src/urt_subscriber.c

View differences:

src/urt_subscriber.c
41 41
/* LOCAL FUNCTIONS                                                            */
42 42
/******************************************************************************/
43 43

  
44
void urtFetchMessage ()
45
{
46
    //TODO: Update message pointer
47
    //TODO: Copy message origin time
48
    //TODO: Copy message payload
49
}
50

  
51
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
52
{
53
  while (oldestMessage->next->originTime < oldestMessage->originTime)
54
  {
55
    oldestMessage = oldestMessage->next;
56
  }
57
  return oldestMessage;
58
}
59

  
60
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
61
{
62
  urt_message_t* lastMessage = subscriber->base.lastMessage;
63
  while (lastMessage->next->originTime < lastMessage->originTime)
64
  {
65
    lastMessage = lastMessage->next;
66
#if (URT_CFG_PUBSUB_PROFILING == true)
67
    subscriber->base.lastMessage->numConsumersLeft--;
68
    subscriber->base->numMessagesReceived++;
69
#endif /* URT_CFG_PUBSUB_PROFILING */
70
  }
71
}
72

  
44 73
/******************************************************************************/
45 74
/* EXPORTED FUNCTIONS                                                         */
46 75
/******************************************************************************/
......
126 155
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
127 156
 */
128 157
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
129
{
158
{   
130 159
  urtDebugAssert(subscriber);
160

  
161
  if (!subscriber->base.topic)
162
      return URT_STATUS_FETCH_NOTOPIC;
163

  
164
  urtMutexLock(subscriber->base.topic->lock);
165

  
166
  urt_message_t* messageTemp = subscriber->base.lastMessage;
167
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
168
  {
169
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
170
    {
171
      urtMutexUnlock(subscriber->base.topic->lock);
172
      return URT_STATUS_FETCH_NOMESSAGE;
173
    }
174
    messageTemp = messageTemp->next;
175
  }
176
  else
177
  {
178
    messageTemp = urtFindOldestMessage(messageTemp->next);
179
  }
180

  
181
  urtFetchMessage();
182

  
183
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
184
#if(URT_CFG_PUBSUB_PROFILING == true)
185
  subscriber->base.sumLatencies += calculatedLatency;
186

  
187
  if (calculatedLatency < subscriber->minLatency)
188
  {
189
    subscriber->minLatency = calculatedLatency;
190
  }
191
  else if (calculatedLatency > subscriber->maxLatency)
192
  {
193
    subscriber->maxLatency = calculatedLatency;
194
  }
195
#endif /* URT_CFG_PUBSUB_PROFILING */
196
  bool temp = false;
197
  if (temp/*optional latency output argument given*/)
198
  {
199
    latency = calculatedLatency;
200
  }
201

  
202
#if (URT_CFG_PUBSUB_PROFILING == true)
203
  subscriber->base.lastMessage->numConsumersLeft--;
204
  subscriber->base->numMessagesReceived++;
205
#endif /* URT_CFG_PUBSUB_PROFILING */
206

  
207
  urtMutexUnlock(subscriber->base.topic->lock);
131 208
  return URT_STATUS_OK;
132 209
}
133 210

  
......
144 221
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
145 222
 */
146 223
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
224
  urtDebugAssert(subscriber);
225

  
226
  if (!subscriber->base.topic)
227
      return URT_STATUS_FETCH_NOTOPIC;
228

  
229
  urtMutexLock(subscriber->base.topic->lock);
230
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
231

  
232
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
233
  {
234
    urtMutexUnlock(subscriber->base.topic->lock);
235
    return URT_STATUS_FETCH_NOMESSAGE;
236
  }
237

  
238
  urtFetchMessage();
239

  
240
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
241
#if(URT_CFG_PUBSUB_PROFILING == true)
242
  subscriber->base.sumLatencies += calculatedLatency;
243

  
244
  if (calculatedLatency < subscriber->minLatency)
245
  {
246
    subscriber->minLatency = calculatedLatency;
247
  }
248
  else if (calculatedLatency > subscriber->maxLatency)
249
  {
250
    subscriber->maxLatency = calculatedLatency;
251
  }
252
#endif /* URT_CFG_PUBSUB_PROFILING */
253
  bool temp = false;
254
  if (temp/*optional latency output argument given*/)
255
  {
256
    latency = calculatedLatency;
257
  }
258

  
259
  urtMutexUnlock(subscriber->base.topic->lock);
147 260
  return URT_STATUS_OK;
148 261
}
149 262

  
......
276 389
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
277 390
 */
278 391
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
279
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
392
                                              size_t bytes, urt_delay_t* latency)
393
{
394
  urtDebugAssert(subscriber);
395

  
396
  if (!subscriber->base.topic)
397
      return URT_STATUS_FETCH_NOTOPIC;
398

  
399
  urtMutexLock(subscriber->base.topic->lock);
400

  
401
  urt_message_t* messageTemp = subscriber->base.lastMessage;
402
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
403
  {
404
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
405
    {
406
      urtMutexUnlock(subscriber->base.topic->lock);
407
      return URT_STATUS_FETCH_NOMESSAGE;
408
    }
409
    messageTemp = messageTemp->next;
410
  }
411
  else
412
  {
413
    messageTemp = urtFindOldestMessage(messageTemp->next);
414
  }
415

  
416
  urtFetchMessage();
417

  
418
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
419
#if(URT_CFG_PUBSUB_PROFILING == true)
420
  subscriber->base.sumLatencies += calculatedLatency;
421

  
422
  if (calculatedLatency < subscriber->minLatency)
423
  {
424
    subscriber->minLatency = calculatedLatency;
425
  }
426
  else if (calculatedLatency > subscriber->maxLatency)
427
  {
428
    subscriber->maxLatency = calculatedLatency;
429
  }
430
#endif /* URT_CFG_PUBSUB_PROFILING */
431
  bool temp = false;
432
  if (temp/*optional latency output argument given*/)
433
  {
434
    latency = calculatedLatency;
435
  }
436

  
437
#if (URT_CFG_PUBSUB_PROFILING == true)
438
  subscriber->base.lastMessage->numConsumersLeft--;
439
  subscriber->base->numMessagesReceived++;
440
#endif /* URT_CFG_PUBSUB_PROFILING */
441

  
442
  urtMutexUnlock(subscriber->base.topic->lock);
443
  return URT_STATUS_OK;
444
}
280 445

  
281 446
/**
282 447
 * @brief  Fetches the latest message.
......
291 456
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
292 457
 */
293 458
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
294
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
459
                                                size_t bytes, urt_delay_t* latency)
460
{
461
  urtDebugAssert(subscriber);
462

  
463
  if (!subscriber->base.topic)
464
      return URT_STATUS_FETCH_NOTOPIC;
465

  
466
  urtMutexLock(subscriber->base.topic->lock);
467
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
468

  
469
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
470
  {
471
    urtMutexUnlock(subscriber->base.topic->lock);
472
    return URT_STATUS_FETCH_NOMESSAGE;
473
  }
474

  
475
  urtFetchMessage();
476

  
477
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
478
#if(URT_CFG_PUBSUB_PROFILING == true)
479
  subscriber->base.sumLatencies += calculatedLatency;
480

  
481
  if (calculatedLatency < subscriber->minLatency)
482
  {
483
    subscriber->minLatency = calculatedLatency;
484
  }
485
  else if (calculatedLatency > subscriber->maxLatency)
486
  {
487
    subscriber->maxLatency = calculatedLatency;
488
  }
489
#endif /* URT_CFG_PUBSUB_PROFILING */
490
  bool temp = false;
491
  if (temp/*optional latency output argument given*/)
492
  {
493
    latency = calculatedLatency;
494
  }
495

  
496
  urtMutexUnlock(subscriber->base.topic->lock);
497
  return URT_STATUS_OK;
498
}
295 499

  
296 500
/**
297 501
 * @brief  Calculates the usefulness of the subscriber.
......
456 660
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
457 661
 */
458 662
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
459
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
663
                                              size_t bytes, urt_delay_t* latency)
664
{
665
  urtDebugAssert(subscriber);
666

  
667
  if (!subscriber->base.topic)
668
      return URT_STATUS_FETCH_NOTOPIC;
669

  
670
  urtMutexLock(subscriber->base.topic->lock);
671

  
672
  urt_message_t* messageTemp = subscriber->base.lastMessage;
673
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
674
  {
675
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
676
    {
677
      urtMutexUnlock(subscriber->base.topic->lock);
678
      return URT_STATUS_FETCH_NOMESSAGE;
679
    }
680
    messageTemp = messageTemp->next;
681
  }
682
  else
683
  {
684
    messageTemp = urtFindOldestMessage(messageTemp->next);
685
  }
686

  
687
  urtFetchMessage();
688

  
689
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
690
#if(URT_CFG_PUBSUB_PROFILING == true)
691
  subscriber->base.sumLatencies += calculatedLatency;
692
#endif /* URT_CFG_PUBSUB_PROFILING */
693

  
694
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
695
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
696
  {
697
    subscriber->minLatency = calculatedLatency;
698
  }
699
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
700
  {
701
    subscriber->maxLatency = calculatedLatency;
702
  }
703
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
704

  
705
  bool temp = false;
706
  if (temp/*optional latency output argument given*/)
707
  {
708
    latency = calculatedLatency;
709
  }
710

  
711
#if (URT_CFG_PUBSUB_PROFILING == true)
712
  subscriber->base.lastMessage->numConsumersLeft--;
713
  subscriber->base->numMessagesReceived++;
714
#endif /* URT_CFG_PUBSUB_PROFILING */
715

  
716
  urtMutexUnlock(subscriber->base.topic->lock);
717
  return URT_STATUS_OK; //TODO: or urt_status_jitterviolation
718
}
460 719

  
461 720
/**
462 721
 * @brief  Fetches the latest message.
......
471 730
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
472 731
 */
473 732
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
474
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
733
                                                size_t bytes, urt_delay_t* latency)
734
{
735
  urtDebugAssert(subscriber);
736

  
737
  if (!subscriber->base.topic)
738
      return URT_STATUS_FETCH_NOTOPIC;
739

  
740
  urtMutexLock(subscriber->base.topic->lock);
741
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
742

  
743
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
744
  {
745
    urtMutexUnlock(subscriber->base.topic->lock);
746
    return URT_STATUS_FETCH_NOMESSAGE;
747
  }
748

  
749
  urtFetchMessage();
750

  
751
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
752
#if(URT_CFG_PUBSUB_PROFILING == true)
753
  subscriber->base.sumLatencies += calculatedLatency;
754
#endif /* URT_CFG_PUBSUB_PROFILING */
755

  
756
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
757
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
758
  {
759
    subscriber->minLatency = calculatedLatency;
760
  }
761
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
762
  {
763
    subscriber->maxLatency = calculatedLatency;
764
  }
765
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
766

  
767
  bool temp = false;
768
  if (temp/*optional latency output argument given*/)
769
  {
770
    latency = calculatedLatency;
771
  }
772

  
773
  urtMutexUnlock(subscriber->base.topic->lock);
774
  return URT_STATUS_OK;
775
}
475 776

  
476 777
/**
477 778
 * @brief  Calculates the validity from the subscriber.
......
668 969
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
669 970
 */
670 971
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
671
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
972
                                              size_t bytes, urt_delay_t* latency)
973
{
974
  urtDebugAssert(subscriber);
975

  
976
  if (!subscriber->base.topic)
977
      return URT_STATUS_FETCH_NOTOPIC;
978

  
979
  urtMutexLock(subscriber->base.topic->lock);
980
  urt_message_t* messageTemp = subscriber->base.lastMessage;
981
  if (messageTemp->next->originTime > messageTemp.originTime)
982
  {
983
    urtMutexUnlock(subscriber->base.topic->lock);
984
    return URT_STATUS_FETCH_NOMESSAGE;
985
  }
986
  messageTemp = messageTemp->next;
987

  
988
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
989
#if (URT_CFG_PUBSUB_PROFILING == true)
990
  subscriber->base.sumLatencies += calculatedLatency;
991
#endif /* URT_CFG_PUBSUB_PROFILING */
992
  bool temp = false;
993
  if (temp /* optional latency output argument given */)
994
  {
995
    latency = calculatedLatency
996
  }
997

  
998
  subscriber->base.lastMessage->numHrtConsumersLeft--;
999
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1000
  {
1001
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1002
  }
1003

  
1004
#if (URT_CFG_PUBSUB_PROFILING == true)
1005
  subscriber->base.lastMessage->numConsumersLeft--;
1006
  subscriber->base->numMessagesReceived++;
1007
#endif /* URT_CFG_PUBSUB_PROFILING */
1008

  
1009
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
1010
  if (temp /*latency is within allowed jitter range*/)
1011
  {
1012
    if (calculatedLatency < subscriber->minLatency)
1013
    {
1014
      subscriber->minLatency = calculatedLatency;
1015
    }
1016
    else if (calculatedLatency > subscriber->maxLatency)
1017
    {
1018
      subscriber->maxLatency = calculatedLatency;
1019
    }
1020
  }
1021
  else
1022
  {
1023
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1024
    urtMutexUnlock(subscriber->base.topic->lock);
1025
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1026
    return URT_STATUS_JITTERVIOLATION;
1027
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1028
  }
1029
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
1030

  
1031
  urtFetchMessage();
1032

  
1033
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1034
  if (messageTemp->next->originTime < messageTemp->originTime)
1035
  {
1036
    //TODO: update qos deadliner timer wrt. next message
1037
  }
1038
  else
1039
  {
1040
    //TODO: reset qos deadline timer
1041
  }
1042
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1043

  
1044
  urtMutexUnlock(subscriber->base.topic->lock);
1045
  return URT_STATUS_OK;
1046
}
672 1047

  
673 1048

  
674 1049
/**

Also available in: Unified diff