Revision 5b7188aa src/urt_subscriber.c
src/urt_subscriber.c | ||
---|---|---|
128 | 128 |
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) |
129 | 129 |
{ |
130 | 130 |
urtDebugAssert(subscriber); |
131 |
|
|
132 |
urt_message_t youngestMessage; |
|
133 |
if (subscriber->base.topic) |
|
134 |
{ |
|
135 |
//TODO: Lock Topic |
|
136 |
urt_osTime_t localCopy; //TODO: replace with local copy |
|
137 |
if (subscriber->base.lastMessageTime == localCopy) |
|
138 |
{ |
|
139 |
if(subscriber->base.lastMessage->next->originTime < localCopy) |
|
140 |
{ |
|
141 |
youngestMessage = subscriber->base.lastMessage->next; |
|
142 |
} |
|
143 |
else |
|
144 |
{ |
|
145 |
//TODO: Unlock Topic |
|
146 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
147 |
} |
|
148 |
} |
|
149 |
else |
|
150 |
{ |
|
151 |
youngestMessage = subscriber->base.lastMessage->next; |
|
152 |
while (youngestMessage.originTime < localCopy) |
|
153 |
{ |
|
154 |
youngestMessage = youngestMessage.next; |
|
155 |
} |
|
156 |
} |
|
157 |
} |
|
158 |
else |
|
159 |
{ |
|
160 |
return URT_STATUS_FETCH_NOTOPIC; |
|
161 |
} |
|
162 |
|
|
163 |
latency = subscriber->base.lastMessageTime - youngestMessage.originTime; |
|
164 |
|
|
165 |
//TODO: Other cases |
|
166 |
//TODO: Unlock topic |
|
167 | 131 |
return URT_STATUS_OK; |
168 | 132 |
} |
169 | 133 |
|
... | ... | |
191 | 155 |
* @return Returns URT_STATUS_OK on sucess. |
192 | 156 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic. |
193 | 157 |
*/ |
194 |
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) { |
|
158 |
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) |
|
159 |
{ |
|
195 | 160 |
if (subscriber->base.topic) |
196 | 161 |
{ |
197 | 162 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
198 | 163 |
//TODO: LOCK TOPIC |
199 |
subscriber->base.topic->numHrtSubscribers--;
|
|
164 |
subscriber->base.topic->numSubscribers--; |
|
200 | 165 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
201 | 166 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
202 | 167 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
... | ... | |
207 | 172 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
208 | 173 |
return URT_STATUS_OK; |
209 | 174 |
} |
210 |
else |
|
211 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
175 |
|
|
176 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC; |
|
212 | 177 |
} |
213 | 178 |
|
214 | 179 |
|
... | ... | |
262 | 227 |
{ |
263 | 228 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
264 | 229 |
} |
265 |
else |
|
266 |
{ |
|
267 |
subscriber->base.topic = topic; |
|
268 |
subscriber->usefulnesscb = usefulnesscb; |
|
269 |
subscriber->cbparams = cbparams; |
|
230 |
|
|
231 |
subscriber->base.topic = topic; |
|
232 |
subscriber->usefulnesscb = usefulnesscb; |
|
233 |
subscriber->cbparams = cbparams; |
|
270 | 234 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
271 |
subscriber->base.sumLatencies = 0;
|
|
272 |
subscriber->base.numMessagesReceived = 0;
|
|
273 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
274 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
235 |
subscriber->base.sumLatencies = 0; |
|
236 |
subscriber->base.numMessagesReceived = 0; |
|
237 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
238 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
275 | 239 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
276 |
} |
|
277 | 240 |
|
278 | 241 |
//TODO: Lock topic |
279 | 242 |
if (messages) |
... | ... | |
359 | 322 |
|
360 | 323 |
if (subscriber->base.topic) |
361 | 324 |
{ |
362 |
if (URT_CFG_PUBSUB_PROFILING == true) |
|
363 |
{ |
|
364 |
//TODO: lock topic |
|
365 |
subscriber->base.topic->numHrtSubscribers--; |
|
366 |
} |
|
325 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
|
326 |
//TODO: lock topic |
|
327 |
subscriber->base.topic->numSubscribers--; |
|
328 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
|
367 | 329 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
368 | 330 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
369 | 331 |
//TODO: unlock topic |
... | ... | |
435 | 397 |
urtDebugAssert(subscriber); |
436 | 398 |
urtDebugAssert(topic); |
437 | 399 |
|
438 |
if (subscriber->base.topic) |
|
439 |
{ |
|
440 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
|
441 |
} |
|
442 |
else |
|
443 |
{ |
|
444 |
subscriber->base.topic = topic; |
|
400 |
if (subscriber->base.topic) |
|
401 |
{ |
|
402 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
|
403 |
} |
|
404 |
|
|
405 |
subscriber->base.topic = topic; |
|
445 | 406 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
446 |
subscriber->base.sumLatencies = 0;
|
|
447 |
subscriber->base.numMessagesReceived = 0;
|
|
407 |
subscriber->base.sumLatencies = 0; |
|
408 |
subscriber->base.numMessagesReceived = 0; |
|
448 | 409 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
449 | 410 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
450 |
subscriber->deadlineOffset = deadline;
|
|
411 |
subscriber->deadlineOffset = deadline; |
|
451 | 412 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
452 | 413 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
453 |
subscriber->maxJitter =jitter;
|
|
414 |
subscriber->maxJitter =jitter; |
|
454 | 415 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
455 | 416 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
456 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
457 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
417 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
418 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
458 | 419 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
459 |
} |
|
460 | 420 |
|
461 |
//TODO: Lock topic |
|
462 |
if (messages) |
|
421 |
//TODO: Lock topic |
|
422 |
if (messages) |
|
423 |
{ |
|
424 |
urt_message_t* lastMessageContribute = messages; |
|
425 |
while (lastMessageContribute->next) |
|
463 | 426 |
{ |
464 |
urt_message_t* lastMessageContribute = messages; |
|
465 |
while (lastMessageContribute->next) |
|
466 |
{ |
|
467 |
lastMessageContribute = lastMessageContribute->next; |
|
468 |
} |
|
469 |
lastMessageContribute->next = topic->latestMessage->next; |
|
470 |
topic->latestMessage->next = messages; |
|
427 |
lastMessageContribute = lastMessageContribute->next; |
|
471 | 428 |
} |
429 |
lastMessageContribute->next = topic->latestMessage->next; |
|
430 |
topic->latestMessage->next = messages; |
|
431 |
} |
|
472 | 432 |
|
473 |
subscriber->base.lastMessage = topic->latestMessage;
|
|
474 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
|
433 |
subscriber->base.lastMessage = topic->latestMessage; |
|
434 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
|
475 | 435 |
|
476 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
|
436 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag? |
|
477 | 437 |
|
478 | 438 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
479 |
topic->numHrtSubscribers--;
|
|
439 |
topic->numHrtSubscribers--; |
|
480 | 440 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
481 | 441 |
|
482 |
//TODO: Unlock topic
|
|
483 |
return URT_STATUS_OK;
|
|
442 |
//TODO: Unlock topic |
|
443 |
return URT_STATUS_OK; |
|
484 | 444 |
} |
485 | 445 |
|
486 | 446 |
/** |
... | ... | |
524 | 484 |
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency) |
525 | 485 |
{ |
526 | 486 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
527 |
if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
|
528 |
return true;
|
|
487 |
if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter) |
|
488 |
return true; |
|
529 | 489 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
490 |
|
|
530 | 491 |
return false; |
531 | 492 |
} |
532 | 493 |
|
... | ... | |
538 | 499 |
* @return Returns URT_STATUS_OK on sucess. |
539 | 500 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic. |
540 | 501 |
*/ |
541 |
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;} |
|
502 |
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber) |
|
503 |
{ |
|
504 |
urtDebugAssert(subscriber); |
|
505 |
|
|
506 |
if (subscriber->base.topic) |
|
507 |
{ |
|
508 |
//TODO: lock topic |
|
509 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
|
510 |
//TODO: decrement topic's HRT counter |
|
511 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
|
512 |
subscriber->base.topic->numSubscribers--; |
|
513 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
|
514 |
//Hier weiter |
|
515 |
|
|
516 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
|
517 |
//TODO: unlock topic |
|
518 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
|
519 |
subscriber->base.topic = NULL; |
|
520 |
subscriber->base.lastMessage = NULL; |
|
521 |
subscriber->base.lastMessageTime = 0; |
|
522 |
return URT_STATUS_OK; |
|
523 |
} |
|
524 |
|
|
525 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC; |
|
526 |
} |
|
542 | 527 |
|
543 | 528 |
|
544 | 529 |
/** |
... | ... | |
602 | 587 |
* Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic. |
603 | 588 |
*/ |
604 | 589 |
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic, |
605 |
urt_message_t* message, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter){return URT_STATUS_OK;} |
|
590 |
urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter) |
|
591 |
{ |
|
592 |
urtDebugAssert(subscriber); |
|
593 |
urtDebugAssert(topic); |
|
594 |
|
|
595 |
if (subscriber->base.topic) |
|
596 |
{ |
|
597 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
|
598 |
} |
|
599 |
|
|
600 |
subscriber->base.topic = topic; |
|
601 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
|
602 |
subscriber->base.sumLatencies = 0; |
|
603 |
subscriber->base.numMessagesReceived = 0; |
|
604 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
|
605 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
|
606 |
subscriber->deadlineOffset = deadline; |
|
607 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
|
608 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
|
609 |
subscriber->maxJitter =jitter; |
|
610 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
611 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
|
612 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
613 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
614 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
|
615 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
|
616 |
subscriber->expectedRate = rate; |
|
617 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
|
618 |
|
|
619 |
//TODO: Lock topic |
|
620 |
if (messages) |
|
621 |
{ |
|
622 |
urt_message_t* lastMessageContribute = messages; |
|
623 |
while (lastMessageContribute->next) |
|
624 |
{ |
|
625 |
lastMessageContribute = lastMessageContribute->next; |
|
626 |
} |
|
627 |
lastMessageContribute->next = topic->latestMessage->next; |
|
628 |
topic->latestMessage->next = messages; |
|
629 |
} |
|
630 |
|
|
631 |
subscriber->base.lastMessage = topic->latestMessage; |
|
632 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
|
633 |
|
|
634 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag? |
|
635 |
|
|
636 |
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
|
637 |
urt_hrtsubscriber_t* hrtSubscriber = subscriber->base.topic->hrtSubscribers; |
|
638 |
while (!hrtSubscriber /* && expected Rate is lower */) |
|
639 |
{ |
|
640 |
hrtSubscriber = hrtSubscriber->next; |
|
641 |
} |
|
642 |
|
|
643 |
if (!hrtSubscriber) |
|
644 |
{ |
|
645 |
//TODO: Append self to topic's list of HRT subscribers |
|
646 |
} |
|
647 |
else |
|
648 |
{ |
|
649 |
//TOOD: insert self in front of current HRT susbcriber |
|
650 |
subscriber->next = hrtSubscriber; |
|
651 |
} |
|
652 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
|
653 |
|
|
654 |
topic->numHrtSubscribers--; |
|
655 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
|
656 |
topic->numSubscribers--; |
|
657 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
|
658 |
|
|
659 |
//TODO: Unlock topic |
|
660 |
return URT_STATUS_OK; |
|
661 |
} |
|
606 | 662 |
|
607 | 663 |
|
608 | 664 |
/** |
... | ... | |
644 | 700 |
* @return Returns URT_STATUS_OK on sucess. |
645 | 701 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic. |
646 | 702 |
*/ |
647 |
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber){return URT_STATUS_OK;} |
|
703 |
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber) |
|
704 |
{ |
|
705 |
urtDebugAssert(subscriber); |
|
706 |
|
|
707 |
if (subscriber->base.topic) |
|
708 |
{ |
|
709 |
//TODO: lock topic |
|
710 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
|
711 |
subscriber->base.topic->numHrtSubscribers--; |
|
712 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
|
713 |
subscriber->base.topic->numSubscribers--; |
|
714 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
|
715 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
|
716 |
//TODO: remove self from topics lsit of HRT subscribers |
|
717 |
//TODO: ... |
|
718 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
|
719 |
|
|
720 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
721 |
while (messageTemp->next->originTime < messageTemp->originTime) |
|
722 |
{ |
|
723 |
messageTemp = messageTemp->next; |
|
724 |
messageTemp->numHrtConsumersLeft--; |
|
725 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
|
726 |
messageTemp->numConsumersLeft--; |
|
727 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
|
728 |
} |
|
729 |
bool temp = false; |
|
730 |
if (temp /*TODO: HRT counter of any message became 0?*/) |
|
731 |
{ |
|
732 |
//TODO: signal topics condition variable |
|
733 |
} |
|
734 |
|
|
735 |
//TODO: unlock topic |
|
736 |
subscriber->base.topic = NULL; |
|
737 |
subscriber->base.lastMessage = NULL; |
|
738 |
subscriber->base.lastMessageTime = 0; |
|
739 |
return URT_STATUS_OK; |
|
740 |
} |
|
741 |
|
|
742 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC; |
|
743 |
} |
|
744 |
|
|
745 |
|
|
746 |
|
Also available in: Unified diff