Revision 5b7188aa src/urt_subscriber.c

View differences:

src/urt_subscriber.c
128 128
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
129 129
{
130 130
  urtDebugAssert(subscriber);
131

  
132
  urt_message_t youngestMessage;
133
  if (subscriber->base.topic)
134
  {
135
    //TODO: Lock Topic
136
    urt_osTime_t localCopy; //TODO: replace with local copy
137
    if (subscriber->base.lastMessageTime == localCopy)
138
    {
139
      if(subscriber->base.lastMessage->next->originTime < localCopy)
140
      {
141
        youngestMessage = subscriber->base.lastMessage->next;
142
      }
143
      else
144
      {
145
        //TODO: Unlock Topic
146
        return URT_STATUS_FETCH_NOMESSAGE;
147
      }
148
    }
149
    else
150
    {
151
      youngestMessage = subscriber->base.lastMessage->next;
152
      while (youngestMessage.originTime < localCopy)
153
      {
154
        youngestMessage = youngestMessage.next;
155
      }
156
    }
157
  }
158
  else
159
  {
160
    return URT_STATUS_FETCH_NOTOPIC;
161
  }
162

  
163
  latency = subscriber->base.lastMessageTime - youngestMessage.originTime;
164

  
165
  //TODO: Other cases
166
  //TODO: Unlock topic
167 131
  return URT_STATUS_OK;
168 132
}
169 133

  
......
191 155
 * @return  Returns URT_STATUS_OK on sucess.
192 156
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
193 157
 */
194
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) {
158
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
159
{
195 160
  if (subscriber->base.topic)
196 161
  {
197 162
# if(URT_CFG_PUBSUB_PROFILING == true)
198 163
      //TODO: LOCK TOPIC
199
      subscriber->base.topic->numHrtSubscribers--;
164
      subscriber->base.topic->numSubscribers--;
200 165
# endif /* URT_CFG_PUBSUB_PROFILING */
201 166
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
202 167
# if(URT_CFG_PUBSUB_PROFILING == true)
......
207 172
#endif /* URT_CFG_PUBSUB_PROFILING */
208 173
    return URT_STATUS_OK;
209 174
  }
210
  else
211
      return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
175

  
176
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
212 177
}
213 178

  
214 179

  
......
262 227
  {
263 228
    return URT_STATUS_SUBSCRIBE_TOPICSET;
264 229
  }
265
  else
266
  {
267
    subscriber->base.topic = topic;
268
    subscriber->usefulnesscb = usefulnesscb;
269
    subscriber->cbparams = cbparams;
230

  
231
  subscriber->base.topic = topic;
232
  subscriber->usefulnesscb = usefulnesscb;
233
  subscriber->cbparams = cbparams;
270 234
# if (URT_CFG_PUBSUB_PROFILING == true)
271
      subscriber->base.sumLatencies = 0;
272
      subscriber->base.numMessagesReceived = 0;
273
      subscriber->minLatency = URT_DELAY_INFINITE;
274
      subscriber->maxLatency = URT_DELAY_IMMEDIATE;
235
  subscriber->base.sumLatencies = 0;
236
  subscriber->base.numMessagesReceived = 0;
237
  subscriber->minLatency = URT_DELAY_INFINITE;
238
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
275 239
# endif  /* URT_CFG_PUBSUB_PROFILING */
276
  }
277 240

  
278 241
  //TODO: Lock topic
279 242
  if (messages)
......
359 322

  
360 323
  if (subscriber->base.topic)
361 324
  {
362
    if (URT_CFG_PUBSUB_PROFILING == true)
363
    {
364
      //TODO: lock topic
365
      subscriber->base.topic->numHrtSubscribers--;
366
    }
325
# if (URT_CFG_PUBSUB_PROFILING == true)
326
    //TODO: lock topic
327
    subscriber->base.topic->numSubscribers--;
328
# endif /* URT_CFG_PUBSUB_PROFILING */
367 329
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
368 330
# if (URT_CFG_PUBSUB_PROFILING == true)
369 331
      //TODO: unlock topic
......
435 397
  urtDebugAssert(subscriber);
436 398
  urtDebugAssert(topic);
437 399

  
438
    if (subscriber->base.topic)
439
    {
440
      return URT_STATUS_SUBSCRIBE_TOPICSET;
441
    }
442
    else
443
    {
444
      subscriber->base.topic = topic;
400
  if (subscriber->base.topic)
401
  {
402
    return URT_STATUS_SUBSCRIBE_TOPICSET;
403
  }
404

  
405
  subscriber->base.topic = topic;
445 406
# if (URT_CFG_PUBSUB_PROFILING == true)
446
        subscriber->base.sumLatencies = 0;
447
        subscriber->base.numMessagesReceived = 0;
407
  subscriber->base.sumLatencies = 0;
408
  subscriber->base.numMessagesReceived = 0;
448 409
# endif /* URT_CFG_PUBSUB_PROFILING */
449 410
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
450
        subscriber->deadlineOffset = deadline;
411
  subscriber->deadlineOffset = deadline;
451 412
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
452 413
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
453
        subscriber->maxJitter =jitter;
414
  subscriber->maxJitter =jitter;
454 415
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
455 416
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
456
        subscriber->minLatency = URT_DELAY_INFINITE;
457
        subscriber->maxLatency = URT_DELAY_IMMEDIATE;
417
  subscriber->minLatency = URT_DELAY_INFINITE;
418
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
458 419
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
459
    }
460 420

  
461
    //TODO: Lock topic
462
    if (messages)
421
  //TODO: Lock topic
422
  if (messages)
423
  {
424
    urt_message_t* lastMessageContribute = messages;
425
    while (lastMessageContribute->next)
463 426
    {
464
      urt_message_t* lastMessageContribute = messages;
465
      while (lastMessageContribute->next)
466
      {
467
          lastMessageContribute = lastMessageContribute->next;
468
      }
469
      lastMessageContribute->next = topic->latestMessage->next;
470
      topic->latestMessage->next = messages;
427
      lastMessageContribute = lastMessageContribute->next;
471 428
    }
429
    lastMessageContribute->next = topic->latestMessage->next;
430
    topic->latestMessage->next = messages;
431
  }
472 432

  
473
    subscriber->base.lastMessage = topic->latestMessage;
474
    subscriber->base.lastMessageTime = topic->latestMessage->originTime;
433
  subscriber->base.lastMessage = topic->latestMessage;
434
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
475 435

  
476
    urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
436
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
477 437

  
478 438
# if (URT_CFG_PUBSUB_PROFILING == true)
479
      topic->numHrtSubscribers--;
439
  topic->numHrtSubscribers--;
480 440
# endif /* URT_CFG_PUBSUB_PROFILING */
481 441

  
482
    //TODO: Unlock topic
483
    return URT_STATUS_OK;
442
  //TODO: Unlock topic
443
  return URT_STATUS_OK;
484 444
}
485 445

  
486 446
/**
......
524 484
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
525 485
{
526 486
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
527
    if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
528
        return true;
487
  if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
488
    return true;
529 489
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
490

  
530 491
  return false;
531 492
}
532 493

  
......
538 499
 * @return  Returns URT_STATUS_OK on sucess.
539 500
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
540 501
 */
541
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;}
502
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
503
{
504
  urtDebugAssert(subscriber);
505

  
506
  if (subscriber->base.topic)
507
  {
508
    //TODO: lock topic
509
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
510
    //TODO: decrement topic's HRT counter
511
# if (URT_CFG_PUBSUB_PROFILING == true)
512
    subscriber->base.topic->numSubscribers--;
513
# endif /* URT_CFG_PUBSUB_PROFILING */
514
//Hier weiter
515

  
516
# if (URT_CFG_PUBSUB_PROFILING == true)
517
    //TODO: unlock topic
518
# endif /* URT_CFG_PUBSUB_PROFILING */
519
    subscriber->base.topic = NULL;
520
    subscriber->base.lastMessage = NULL;
521
    subscriber->base.lastMessageTime = 0;
522
    return URT_STATUS_OK;
523
  }
524

  
525
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
526
}
542 527

  
543 528

  
544 529
/**
......
602 587
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
603 588
 */
604 589
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
605
                                       urt_message_t* message, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter){return URT_STATUS_OK;}
590
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
591
{
592
  urtDebugAssert(subscriber);
593
  urtDebugAssert(topic);
594

  
595
  if (subscriber->base.topic)
596
  {
597
    return URT_STATUS_SUBSCRIBE_TOPICSET;
598
  }
599

  
600
  subscriber->base.topic = topic;
601
# if (URT_CFG_PUBSUB_PROFILING == true)
602
  subscriber->base.sumLatencies = 0;
603
  subscriber->base.numMessagesReceived = 0;
604
# endif /* URT_CFG_PUBSUB_PROFILING */
605
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
606
  subscriber->deadlineOffset = deadline;
607
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
608
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
609
  subscriber->maxJitter =jitter;
610
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
611
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
612
  subscriber->minLatency = URT_DELAY_INFINITE;
613
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
614
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
615
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
616
  subscriber->expectedRate = rate;
617
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
618

  
619
  //TODO: Lock topic
620
  if (messages)
621
  {
622
    urt_message_t* lastMessageContribute = messages;
623
    while (lastMessageContribute->next)
624
    {
625
      lastMessageContribute = lastMessageContribute->next;
626
    }
627
    lastMessageContribute->next = topic->latestMessage->next;
628
    topic->latestMessage->next = messages;
629
  }
630

  
631
  subscriber->base.lastMessage = topic->latestMessage;
632
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
633

  
634
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
635

  
636
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
637
  urt_hrtsubscriber_t* hrtSubscriber = subscriber->base.topic->hrtSubscribers;
638
  while (!hrtSubscriber /* && expected Rate is lower */)
639
  {
640
      hrtSubscriber = hrtSubscriber->next;
641
  }
642

  
643
  if (!hrtSubscriber)
644
  {
645
    //TODO: Append self to topic's list of HRT subscribers
646
  }
647
  else
648
  {
649
    //TOOD: insert self in front of current HRT susbcriber
650
    subscriber->next = hrtSubscriber;
651
  }
652
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
653

  
654
  topic->numHrtSubscribers--;
655
# if (URT_CFG_PUBSUB_PROFILING == true)
656
  topic->numSubscribers--;
657
# endif /* URT_CFG_PUBSUB_PROFILING */
658

  
659
  //TODO: Unlock topic
660
  return URT_STATUS_OK;
661
}
606 662

  
607 663

  
608 664
/**
......
644 700
 * @return  Returns URT_STATUS_OK on sucess.
645 701
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
646 702
 */
647
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber){return URT_STATUS_OK;}
703
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
704
{
705
  urtDebugAssert(subscriber);
706

  
707
  if (subscriber->base.topic)
708
  {
709
    //TODO: lock topic
710
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
711
    subscriber->base.topic->numHrtSubscribers--;
712
# if (URT_CFG_PUBSUB_PROFILING == true)
713
    subscriber->base.topic->numSubscribers--;
714
# endif /* URT_CFG_PUBSUB_PROFILING */
715
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
716
    //TODO: remove self from topics lsit of HRT subscribers
717
    //TODO: ...
718
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
719

  
720
    urt_message_t* messageTemp = subscriber->base.lastMessage;
721
    while (messageTemp->next->originTime < messageTemp->originTime)
722
    {
723
        messageTemp = messageTemp->next;
724
        messageTemp->numHrtConsumersLeft--;
725
# if(URT_CFG_PUBSUB_PROFILING == true)
726
        messageTemp->numConsumersLeft--;
727
# endif /* URT_CFG_PUBSUB_PROFILING */
728
    }
729
    bool temp = false;
730
    if (temp /*TODO: HRT counter of any message became 0?*/)
731
    {
732
      //TODO: signal topics condition variable
733
    }
734

  
735
    //TODO: unlock topic
736
    subscriber->base.topic = NULL;
737
    subscriber->base.lastMessage = NULL;
738
    subscriber->base.lastMessageTime = 0;
739
    return URT_STATUS_OK;
740
  }
741

  
742
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
743
}
744

  
745

  
746

  

Also available in: Unified diff