128 |
128 |
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
|
129 |
129 |
{
|
130 |
130 |
urtDebugAssert(subscriber);
|
131 |
|
|
132 |
|
urt_message_t youngestMessage;
|
133 |
|
if (subscriber->base.topic)
|
134 |
|
{
|
135 |
|
//TODO: Lock Topic
|
136 |
|
urt_osTime_t localCopy; //TODO: replace with local copy
|
137 |
|
if (subscriber->base.lastMessageTime == localCopy)
|
138 |
|
{
|
139 |
|
if(subscriber->base.lastMessage->next->originTime < localCopy)
|
140 |
|
{
|
141 |
|
youngestMessage = subscriber->base.lastMessage->next;
|
142 |
|
}
|
143 |
|
else
|
144 |
|
{
|
145 |
|
//TODO: Unlock Topic
|
146 |
|
return URT_STATUS_FETCH_NOMESSAGE;
|
147 |
|
}
|
148 |
|
}
|
149 |
|
else
|
150 |
|
{
|
151 |
|
youngestMessage = subscriber->base.lastMessage->next;
|
152 |
|
while (youngestMessage.originTime < localCopy)
|
153 |
|
{
|
154 |
|
youngestMessage = youngestMessage.next;
|
155 |
|
}
|
156 |
|
}
|
157 |
|
}
|
158 |
|
else
|
159 |
|
{
|
160 |
|
return URT_STATUS_FETCH_NOTOPIC;
|
161 |
|
}
|
162 |
|
|
163 |
|
latency = subscriber->base.lastMessageTime - youngestMessage.originTime;
|
164 |
|
|
165 |
|
//TODO: Other cases
|
166 |
|
//TODO: Unlock topic
|
167 |
131 |
return URT_STATUS_OK;
|
168 |
132 |
}
|
169 |
133 |
|
... | ... | |
191 |
155 |
* @return Returns URT_STATUS_OK on sucess.
|
192 |
156 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
193 |
157 |
*/
|
194 |
|
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) {
|
|
158 |
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
|
|
159 |
{
|
195 |
160 |
if (subscriber->base.topic)
|
196 |
161 |
{
|
197 |
162 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
198 |
163 |
//TODO: LOCK TOPIC
|
199 |
|
subscriber->base.topic->numHrtSubscribers--;
|
|
164 |
subscriber->base.topic->numSubscribers--;
|
200 |
165 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
201 |
166 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
202 |
167 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
... | ... | |
207 |
172 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
208 |
173 |
return URT_STATUS_OK;
|
209 |
174 |
}
|
210 |
|
else
|
211 |
|
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
175 |
|
|
176 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
212 |
177 |
}
|
213 |
178 |
|
214 |
179 |
|
... | ... | |
262 |
227 |
{
|
263 |
228 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
264 |
229 |
}
|
265 |
|
else
|
266 |
|
{
|
267 |
|
subscriber->base.topic = topic;
|
268 |
|
subscriber->usefulnesscb = usefulnesscb;
|
269 |
|
subscriber->cbparams = cbparams;
|
|
230 |
|
|
231 |
subscriber->base.topic = topic;
|
|
232 |
subscriber->usefulnesscb = usefulnesscb;
|
|
233 |
subscriber->cbparams = cbparams;
|
270 |
234 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
271 |
|
subscriber->base.sumLatencies = 0;
|
272 |
|
subscriber->base.numMessagesReceived = 0;
|
273 |
|
subscriber->minLatency = URT_DELAY_INFINITE;
|
274 |
|
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
235 |
subscriber->base.sumLatencies = 0;
|
|
236 |
subscriber->base.numMessagesReceived = 0;
|
|
237 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
238 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
275 |
239 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
276 |
|
}
|
277 |
240 |
|
278 |
241 |
//TODO: Lock topic
|
279 |
242 |
if (messages)
|
... | ... | |
359 |
322 |
|
360 |
323 |
if (subscriber->base.topic)
|
361 |
324 |
{
|
362 |
|
if (URT_CFG_PUBSUB_PROFILING == true)
|
363 |
|
{
|
364 |
|
//TODO: lock topic
|
365 |
|
subscriber->base.topic->numHrtSubscribers--;
|
366 |
|
}
|
|
325 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
326 |
//TODO: lock topic
|
|
327 |
subscriber->base.topic->numSubscribers--;
|
|
328 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
367 |
329 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
368 |
330 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
369 |
331 |
//TODO: unlock topic
|
... | ... | |
435 |
397 |
urtDebugAssert(subscriber);
|
436 |
398 |
urtDebugAssert(topic);
|
437 |
399 |
|
438 |
|
if (subscriber->base.topic)
|
439 |
|
{
|
440 |
|
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
441 |
|
}
|
442 |
|
else
|
443 |
|
{
|
444 |
|
subscriber->base.topic = topic;
|
|
400 |
if (subscriber->base.topic)
|
|
401 |
{
|
|
402 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
|
403 |
}
|
|
404 |
|
|
405 |
subscriber->base.topic = topic;
|
445 |
406 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
446 |
|
subscriber->base.sumLatencies = 0;
|
447 |
|
subscriber->base.numMessagesReceived = 0;
|
|
407 |
subscriber->base.sumLatencies = 0;
|
|
408 |
subscriber->base.numMessagesReceived = 0;
|
448 |
409 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
449 |
410 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
450 |
|
subscriber->deadlineOffset = deadline;
|
|
411 |
subscriber->deadlineOffset = deadline;
|
451 |
412 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
452 |
413 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
453 |
|
subscriber->maxJitter =jitter;
|
|
414 |
subscriber->maxJitter =jitter;
|
454 |
415 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
455 |
416 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
|
456 |
|
subscriber->minLatency = URT_DELAY_INFINITE;
|
457 |
|
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
417 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
418 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
458 |
419 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
|
459 |
|
}
|
460 |
420 |
|
461 |
|
//TODO: Lock topic
|
462 |
|
if (messages)
|
|
421 |
//TODO: Lock topic
|
|
422 |
if (messages)
|
|
423 |
{
|
|
424 |
urt_message_t* lastMessageContribute = messages;
|
|
425 |
while (lastMessageContribute->next)
|
463 |
426 |
{
|
464 |
|
urt_message_t* lastMessageContribute = messages;
|
465 |
|
while (lastMessageContribute->next)
|
466 |
|
{
|
467 |
|
lastMessageContribute = lastMessageContribute->next;
|
468 |
|
}
|
469 |
|
lastMessageContribute->next = topic->latestMessage->next;
|
470 |
|
topic->latestMessage->next = messages;
|
|
427 |
lastMessageContribute = lastMessageContribute->next;
|
471 |
428 |
}
|
|
429 |
lastMessageContribute->next = topic->latestMessage->next;
|
|
430 |
topic->latestMessage->next = messages;
|
|
431 |
}
|
472 |
432 |
|
473 |
|
subscriber->base.lastMessage = topic->latestMessage;
|
474 |
|
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
|
433 |
subscriber->base.lastMessage = topic->latestMessage;
|
|
434 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
475 |
435 |
|
476 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
|
436 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
477 |
437 |
|
478 |
438 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
479 |
|
topic->numHrtSubscribers--;
|
|
439 |
topic->numHrtSubscribers--;
|
480 |
440 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
481 |
441 |
|
482 |
|
//TODO: Unlock topic
|
483 |
|
return URT_STATUS_OK;
|
|
442 |
//TODO: Unlock topic
|
|
443 |
return URT_STATUS_OK;
|
484 |
444 |
}
|
485 |
445 |
|
486 |
446 |
/**
|
... | ... | |
524 |
484 |
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
|
525 |
485 |
{
|
526 |
486 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
527 |
|
if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
528 |
|
return true;
|
|
487 |
if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
|
488 |
return true;
|
529 |
489 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
490 |
|
530 |
491 |
return false;
|
531 |
492 |
}
|
532 |
493 |
|
... | ... | |
538 |
499 |
* @return Returns URT_STATUS_OK on sucess.
|
539 |
500 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
540 |
501 |
*/
|
541 |
|
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;}
|
|
502 |
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
|
|
503 |
{
|
|
504 |
urtDebugAssert(subscriber);
|
|
505 |
|
|
506 |
if (subscriber->base.topic)
|
|
507 |
{
|
|
508 |
//TODO: lock topic
|
|
509 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
510 |
//TODO: decrement topic's HRT counter
|
|
511 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
512 |
subscriber->base.topic->numSubscribers--;
|
|
513 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
514 |
//Hier weiter
|
|
515 |
|
|
516 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
517 |
//TODO: unlock topic
|
|
518 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
519 |
subscriber->base.topic = NULL;
|
|
520 |
subscriber->base.lastMessage = NULL;
|
|
521 |
subscriber->base.lastMessageTime = 0;
|
|
522 |
return URT_STATUS_OK;
|
|
523 |
}
|
|
524 |
|
|
525 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
526 |
}
|
542 |
527 |
|
543 |
528 |
|
544 |
529 |
/**
|
... | ... | |
602 |
587 |
* Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
603 |
588 |
*/
|
604 |
589 |
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
|
605 |
|
urt_message_t* message, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter){return URT_STATUS_OK;}
|
|
590 |
urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
|
|
591 |
{
|
|
592 |
urtDebugAssert(subscriber);
|
|
593 |
urtDebugAssert(topic);
|
|
594 |
|
|
595 |
if (subscriber->base.topic)
|
|
596 |
{
|
|
597 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
|
598 |
}
|
|
599 |
|
|
600 |
subscriber->base.topic = topic;
|
|
601 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
602 |
subscriber->base.sumLatencies = 0;
|
|
603 |
subscriber->base.numMessagesReceived = 0;
|
|
604 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
605 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
|
606 |
subscriber->deadlineOffset = deadline;
|
|
607 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
|
608 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
|
609 |
subscriber->maxJitter =jitter;
|
|
610 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
611 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
|
|
612 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
613 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
614 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
|
|
615 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
|
616 |
subscriber->expectedRate = rate;
|
|
617 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
|
618 |
|
|
619 |
//TODO: Lock topic
|
|
620 |
if (messages)
|
|
621 |
{
|
|
622 |
urt_message_t* lastMessageContribute = messages;
|
|
623 |
while (lastMessageContribute->next)
|
|
624 |
{
|
|
625 |
lastMessageContribute = lastMessageContribute->next;
|
|
626 |
}
|
|
627 |
lastMessageContribute->next = topic->latestMessage->next;
|
|
628 |
topic->latestMessage->next = messages;
|
|
629 |
}
|
|
630 |
|
|
631 |
subscriber->base.lastMessage = topic->latestMessage;
|
|
632 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
|
633 |
|
|
634 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
|
635 |
|
|
636 |
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
|
637 |
urt_hrtsubscriber_t* hrtSubscriber = subscriber->base.topic->hrtSubscribers;
|
|
638 |
while (!hrtSubscriber /* && expected Rate is lower */)
|
|
639 |
{
|
|
640 |
hrtSubscriber = hrtSubscriber->next;
|
|
641 |
}
|
|
642 |
|
|
643 |
if (!hrtSubscriber)
|
|
644 |
{
|
|
645 |
//TODO: Append self to topic's list of HRT subscribers
|
|
646 |
}
|
|
647 |
else
|
|
648 |
{
|
|
649 |
//TOOD: insert self in front of current HRT susbcriber
|
|
650 |
subscriber->next = hrtSubscriber;
|
|
651 |
}
|
|
652 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
|
653 |
|
|
654 |
topic->numHrtSubscribers--;
|
|
655 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
656 |
topic->numSubscribers--;
|
|
657 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
658 |
|
|
659 |
//TODO: Unlock topic
|
|
660 |
return URT_STATUS_OK;
|
|
661 |
}
|
606 |
662 |
|
607 |
663 |
|
608 |
664 |
/**
|
... | ... | |
644 |
700 |
* @return Returns URT_STATUS_OK on sucess.
|
645 |
701 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
646 |
702 |
*/
|
647 |
|
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber){return URT_STATUS_OK;}
|
|
703 |
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
|
|
704 |
{
|
|
705 |
urtDebugAssert(subscriber);
|
|
706 |
|
|
707 |
if (subscriber->base.topic)
|
|
708 |
{
|
|
709 |
//TODO: lock topic
|
|
710 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
711 |
subscriber->base.topic->numHrtSubscribers--;
|
|
712 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
713 |
subscriber->base.topic->numSubscribers--;
|
|
714 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
715 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
|
716 |
//TODO: remove self from topics lsit of HRT subscribers
|
|
717 |
//TODO: ...
|
|
718 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
|
719 |
|
|
720 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
|
721 |
while (messageTemp->next->originTime < messageTemp->originTime)
|
|
722 |
{
|
|
723 |
messageTemp = messageTemp->next;
|
|
724 |
messageTemp->numHrtConsumersLeft--;
|
|
725 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
|
726 |
messageTemp->numConsumersLeft--;
|
|
727 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
728 |
}
|
|
729 |
bool temp = false;
|
|
730 |
if (temp /*TODO: HRT counter of any message became 0?*/)
|
|
731 |
{
|
|
732 |
//TODO: signal topics condition variable
|
|
733 |
}
|
|
734 |
|
|
735 |
//TODO: unlock topic
|
|
736 |
subscriber->base.topic = NULL;
|
|
737 |
subscriber->base.lastMessage = NULL;
|
|
738 |
subscriber->base.lastMessageTime = 0;
|
|
739 |
return URT_STATUS_OK;
|
|
740 |
}
|
|
741 |
|
|
742 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
743 |
}
|
|
744 |
|
|
745 |
|
|
746 |
|