| 128 |
128 |
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
|
| 129 |
129 |
{
|
| 130 |
130 |
urtDebugAssert(subscriber);
|
| 131 |
|
|
| 132 |
|
urt_message_t youngestMessage;
|
| 133 |
|
if (subscriber->base.topic)
|
| 134 |
|
{
|
| 135 |
|
//TODO: Lock Topic
|
| 136 |
|
urt_osTime_t localCopy; //TODO: replace with local copy
|
| 137 |
|
if (subscriber->base.lastMessageTime == localCopy)
|
| 138 |
|
{
|
| 139 |
|
if(subscriber->base.lastMessage->next->originTime < localCopy)
|
| 140 |
|
{
|
| 141 |
|
youngestMessage = subscriber->base.lastMessage->next;
|
| 142 |
|
}
|
| 143 |
|
else
|
| 144 |
|
{
|
| 145 |
|
//TODO: Unlock Topic
|
| 146 |
|
return URT_STATUS_FETCH_NOMESSAGE;
|
| 147 |
|
}
|
| 148 |
|
}
|
| 149 |
|
else
|
| 150 |
|
{
|
| 151 |
|
youngestMessage = subscriber->base.lastMessage->next;
|
| 152 |
|
while (youngestMessage.originTime < localCopy)
|
| 153 |
|
{
|
| 154 |
|
youngestMessage = youngestMessage.next;
|
| 155 |
|
}
|
| 156 |
|
}
|
| 157 |
|
}
|
| 158 |
|
else
|
| 159 |
|
{
|
| 160 |
|
return URT_STATUS_FETCH_NOTOPIC;
|
| 161 |
|
}
|
| 162 |
|
|
| 163 |
|
latency = subscriber->base.lastMessageTime - youngestMessage.originTime;
|
| 164 |
|
|
| 165 |
|
//TODO: Other cases
|
| 166 |
|
//TODO: Unlock topic
|
| 167 |
131 |
return URT_STATUS_OK;
|
| 168 |
132 |
}
|
| 169 |
133 |
|
| ... | ... | |
| 191 |
155 |
* @return Returns URT_STATUS_OK on sucess.
|
| 192 |
156 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
| 193 |
157 |
*/
|
| 194 |
|
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) {
|
|
158 |
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
|
|
159 |
{
|
| 195 |
160 |
if (subscriber->base.topic)
|
| 196 |
161 |
{
|
| 197 |
162 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
| 198 |
163 |
//TODO: LOCK TOPIC
|
| 199 |
|
subscriber->base.topic->numHrtSubscribers--;
|
|
164 |
subscriber->base.topic->numSubscribers--;
|
| 200 |
165 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 201 |
166 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
| 202 |
167 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
| ... | ... | |
| 207 |
172 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 208 |
173 |
return URT_STATUS_OK;
|
| 209 |
174 |
}
|
| 210 |
|
else
|
| 211 |
|
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
175 |
|
|
176 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
| 212 |
177 |
}
|
| 213 |
178 |
|
| 214 |
179 |
|
| ... | ... | |
| 262 |
227 |
{
|
| 263 |
228 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
| 264 |
229 |
}
|
| 265 |
|
else
|
| 266 |
|
{
|
| 267 |
|
subscriber->base.topic = topic;
|
| 268 |
|
subscriber->usefulnesscb = usefulnesscb;
|
| 269 |
|
subscriber->cbparams = cbparams;
|
|
230 |
|
|
231 |
subscriber->base.topic = topic;
|
|
232 |
subscriber->usefulnesscb = usefulnesscb;
|
|
233 |
subscriber->cbparams = cbparams;
|
| 270 |
234 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 271 |
|
subscriber->base.sumLatencies = 0;
|
| 272 |
|
subscriber->base.numMessagesReceived = 0;
|
| 273 |
|
subscriber->minLatency = URT_DELAY_INFINITE;
|
| 274 |
|
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
235 |
subscriber->base.sumLatencies = 0;
|
|
236 |
subscriber->base.numMessagesReceived = 0;
|
|
237 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
238 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
| 275 |
239 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 276 |
|
}
|
| 277 |
240 |
|
| 278 |
241 |
//TODO: Lock topic
|
| 279 |
242 |
if (messages)
|
| ... | ... | |
| 359 |
322 |
|
| 360 |
323 |
if (subscriber->base.topic)
|
| 361 |
324 |
{
|
| 362 |
|
if (URT_CFG_PUBSUB_PROFILING == true)
|
| 363 |
|
{
|
| 364 |
|
//TODO: lock topic
|
| 365 |
|
subscriber->base.topic->numHrtSubscribers--;
|
| 366 |
|
}
|
|
325 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
326 |
//TODO: lock topic
|
|
327 |
subscriber->base.topic->numSubscribers--;
|
|
328 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 367 |
329 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
| 368 |
330 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 369 |
331 |
//TODO: unlock topic
|
| ... | ... | |
| 435 |
397 |
urtDebugAssert(subscriber);
|
| 436 |
398 |
urtDebugAssert(topic);
|
| 437 |
399 |
|
| 438 |
|
if (subscriber->base.topic)
|
| 439 |
|
{
|
| 440 |
|
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
| 441 |
|
}
|
| 442 |
|
else
|
| 443 |
|
{
|
| 444 |
|
subscriber->base.topic = topic;
|
|
400 |
if (subscriber->base.topic)
|
|
401 |
{
|
|
402 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
|
403 |
}
|
|
404 |
|
|
405 |
subscriber->base.topic = topic;
|
| 445 |
406 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 446 |
|
subscriber->base.sumLatencies = 0;
|
| 447 |
|
subscriber->base.numMessagesReceived = 0;
|
|
407 |
subscriber->base.sumLatencies = 0;
|
|
408 |
subscriber->base.numMessagesReceived = 0;
|
| 448 |
409 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 449 |
410 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
| 450 |
|
subscriber->deadlineOffset = deadline;
|
|
411 |
subscriber->deadlineOffset = deadline;
|
| 451 |
412 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
| 452 |
413 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
| 453 |
|
subscriber->maxJitter =jitter;
|
|
414 |
subscriber->maxJitter =jitter;
|
| 454 |
415 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
| 455 |
416 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
|
| 456 |
|
subscriber->minLatency = URT_DELAY_INFINITE;
|
| 457 |
|
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
417 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
418 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
| 458 |
419 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
|
| 459 |
|
}
|
| 460 |
420 |
|
| 461 |
|
//TODO: Lock topic
|
| 462 |
|
if (messages)
|
|
421 |
//TODO: Lock topic
|
|
422 |
if (messages)
|
|
423 |
{
|
|
424 |
urt_message_t* lastMessageContribute = messages;
|
|
425 |
while (lastMessageContribute->next)
|
| 463 |
426 |
{
|
| 464 |
|
urt_message_t* lastMessageContribute = messages;
|
| 465 |
|
while (lastMessageContribute->next)
|
| 466 |
|
{
|
| 467 |
|
lastMessageContribute = lastMessageContribute->next;
|
| 468 |
|
}
|
| 469 |
|
lastMessageContribute->next = topic->latestMessage->next;
|
| 470 |
|
topic->latestMessage->next = messages;
|
|
427 |
lastMessageContribute = lastMessageContribute->next;
|
| 471 |
428 |
}
|
|
429 |
lastMessageContribute->next = topic->latestMessage->next;
|
|
430 |
topic->latestMessage->next = messages;
|
|
431 |
}
|
| 472 |
432 |
|
| 473 |
|
subscriber->base.lastMessage = topic->latestMessage;
|
| 474 |
|
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
|
433 |
subscriber->base.lastMessage = topic->latestMessage;
|
|
434 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
| 475 |
435 |
|
| 476 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
|
436 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
| 477 |
437 |
|
| 478 |
438 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 479 |
|
topic->numHrtSubscribers--;
|
|
439 |
topic->numHrtSubscribers--;
|
| 480 |
440 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 481 |
441 |
|
| 482 |
|
//TODO: Unlock topic
|
| 483 |
|
return URT_STATUS_OK;
|
|
442 |
//TODO: Unlock topic
|
|
443 |
return URT_STATUS_OK;
|
| 484 |
444 |
}
|
| 485 |
445 |
|
| 486 |
446 |
/**
|
| ... | ... | |
| 524 |
484 |
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
|
| 525 |
485 |
{
|
| 526 |
486 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
| 527 |
|
if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
| 528 |
|
return true;
|
|
487 |
if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
|
488 |
return true;
|
| 529 |
489 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
490 |
|
| 530 |
491 |
return false;
|
| 531 |
492 |
}
|
| 532 |
493 |
|
| ... | ... | |
| 538 |
499 |
* @return Returns URT_STATUS_OK on sucess.
|
| 539 |
500 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
| 540 |
501 |
*/
|
| 541 |
|
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;}
|
|
502 |
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
|
|
503 |
{
|
|
504 |
urtDebugAssert(subscriber);
|
|
505 |
|
|
506 |
if (subscriber->base.topic)
|
|
507 |
{
|
|
508 |
//TODO: lock topic
|
|
509 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
510 |
//TODO: decrement topic's HRT counter
|
|
511 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
512 |
subscriber->base.topic->numSubscribers--;
|
|
513 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
514 |
//Hier weiter
|
|
515 |
|
|
516 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
517 |
//TODO: unlock topic
|
|
518 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
519 |
subscriber->base.topic = NULL;
|
|
520 |
subscriber->base.lastMessage = NULL;
|
|
521 |
subscriber->base.lastMessageTime = 0;
|
|
522 |
return URT_STATUS_OK;
|
|
523 |
}
|
|
524 |
|
|
525 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
526 |
}
|
| 542 |
527 |
|
| 543 |
528 |
|
| 544 |
529 |
/**
|
| ... | ... | |
| 602 |
587 |
* Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
| 603 |
588 |
*/
|
| 604 |
589 |
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
|
| 605 |
|
urt_message_t* message, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter){return URT_STATUS_OK;}
|
|
590 |
urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
|
|
591 |
{
|
|
592 |
urtDebugAssert(subscriber);
|
|
593 |
urtDebugAssert(topic);
|
|
594 |
|
|
595 |
if (subscriber->base.topic)
|
|
596 |
{
|
|
597 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
|
598 |
}
|
|
599 |
|
|
600 |
subscriber->base.topic = topic;
|
|
601 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
602 |
subscriber->base.sumLatencies = 0;
|
|
603 |
subscriber->base.numMessagesReceived = 0;
|
|
604 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
605 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
|
606 |
subscriber->deadlineOffset = deadline;
|
|
607 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
|
608 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
|
609 |
subscriber->maxJitter =jitter;
|
|
610 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
611 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
|
|
612 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
613 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
614 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
|
|
615 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
|
616 |
subscriber->expectedRate = rate;
|
|
617 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
|
618 |
|
|
619 |
//TODO: Lock topic
|
|
620 |
if (messages)
|
|
621 |
{
|
|
622 |
urt_message_t* lastMessageContribute = messages;
|
|
623 |
while (lastMessageContribute->next)
|
|
624 |
{
|
|
625 |
lastMessageContribute = lastMessageContribute->next;
|
|
626 |
}
|
|
627 |
lastMessageContribute->next = topic->latestMessage->next;
|
|
628 |
topic->latestMessage->next = messages;
|
|
629 |
}
|
|
630 |
|
|
631 |
subscriber->base.lastMessage = topic->latestMessage;
|
|
632 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
|
633 |
|
|
634 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
|
635 |
|
|
636 |
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
|
637 |
urt_hrtsubscriber_t* hrtSubscriber = subscriber->base.topic->hrtSubscribers;
|
|
638 |
while (!hrtSubscriber /* && expected Rate is lower */)
|
|
639 |
{
|
|
640 |
hrtSubscriber = hrtSubscriber->next;
|
|
641 |
}
|
|
642 |
|
|
643 |
if (!hrtSubscriber)
|
|
644 |
{
|
|
645 |
//TODO: Append self to topic's list of HRT subscribers
|
|
646 |
}
|
|
647 |
else
|
|
648 |
{
|
|
649 |
//TOOD: insert self in front of current HRT susbcriber
|
|
650 |
subscriber->next = hrtSubscriber;
|
|
651 |
}
|
|
652 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
|
653 |
|
|
654 |
topic->numHrtSubscribers--;
|
|
655 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
656 |
topic->numSubscribers--;
|
|
657 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
658 |
|
|
659 |
//TODO: Unlock topic
|
|
660 |
return URT_STATUS_OK;
|
|
661 |
}
|
| 606 |
662 |
|
| 607 |
663 |
|
| 608 |
664 |
/**
|
| ... | ... | |
| 644 |
700 |
* @return Returns URT_STATUS_OK on sucess.
|
| 645 |
701 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
| 646 |
702 |
*/
|
| 647 |
|
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber){return URT_STATUS_OK;}
|
|
703 |
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
|
|
704 |
{
|
|
705 |
urtDebugAssert(subscriber);
|
|
706 |
|
|
707 |
if (subscriber->base.topic)
|
|
708 |
{
|
|
709 |
//TODO: lock topic
|
|
710 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
711 |
subscriber->base.topic->numHrtSubscribers--;
|
|
712 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
713 |
subscriber->base.topic->numSubscribers--;
|
|
714 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
715 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
|
716 |
//TODO: remove self from topics lsit of HRT subscribers
|
|
717 |
//TODO: ...
|
|
718 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
|
719 |
|
|
720 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
|
721 |
while (messageTemp->next->originTime < messageTemp->originTime)
|
|
722 |
{
|
|
723 |
messageTemp = messageTemp->next;
|
|
724 |
messageTemp->numHrtConsumersLeft--;
|
|
725 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
|
726 |
messageTemp->numConsumersLeft--;
|
|
727 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
728 |
}
|
|
729 |
bool temp = false;
|
|
730 |
if (temp /*TODO: HRT counter of any message became 0?*/)
|
|
731 |
{
|
|
732 |
//TODO: signal topics condition variable
|
|
733 |
}
|
|
734 |
|
|
735 |
//TODO: unlock topic
|
|
736 |
subscriber->base.topic = NULL;
|
|
737 |
subscriber->base.lastMessage = NULL;
|
|
738 |
subscriber->base.lastMessageTime = 0;
|
|
739 |
return URT_STATUS_OK;
|
|
740 |
}
|
|
741 |
|
|
742 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
743 |
}
|
|
744 |
|
|
745 |
|
|
746 |
|