Revision 37cd5dc2 src/urt_subscriber.c
src/urt_subscriber.c | ||
---|---|---|
87 | 87 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
88 | 88 |
|
89 | 89 |
subscriber->base.topic = topic; |
90 |
//TODO: Lock topic
|
|
90 |
urtMutexLock(topic->lock);
|
|
91 | 91 |
|
92 | 92 |
if (messages) |
93 | 93 |
{ |
... | ... | |
109 | 109 |
topic->numHrtSubscribers--; |
110 | 110 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
111 | 111 |
|
112 |
//TODO: Unlock topic
|
|
112 |
urtMutexUnlock(topic->lock);
|
|
113 | 113 |
return URT_STATUS_OK; |
114 | 114 |
} |
115 | 115 |
|
... | ... | |
160 | 160 |
if (subscriber->base.topic) |
161 | 161 |
{ |
162 | 162 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
163 |
//TODO: LOCK TOPIC
|
|
163 |
urtMutexLock(topic->lock);
|
|
164 | 164 |
subscriber->base.topic->numSubscribers--; |
165 | 165 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
166 | 166 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
167 | 167 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
168 |
//TODO: Unlock TOPIC
|
|
168 |
urtMutexUnlock(topic->lock);
|
|
169 | 169 |
subscriber->base.topic = NULL; |
170 | 170 |
subscriber->base.lastMessage = NULL; |
171 | 171 |
subscriber->base.lastMessageTime = 0; |
... | ... | |
238 | 238 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
239 | 239 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
240 | 240 |
|
241 |
//TODO: Lock topic
|
|
241 |
urtMutexLock(topic->lock);
|
|
242 | 242 |
if (messages) |
243 | 243 |
{ |
244 | 244 |
urt_message_t* lastMessageContribute = messages; |
... | ... | |
259 | 259 |
topic->numHrtSubscribers--; |
260 | 260 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
261 | 261 |
|
262 |
//TODO: Unlock topic
|
|
262 |
urtMutexUnlock(topic->lock);
|
|
263 | 263 |
return URT_STATUS_OK; |
264 | 264 |
} |
265 | 265 |
|
... | ... | |
323 | 323 |
if (subscriber->base.topic) |
324 | 324 |
{ |
325 | 325 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
326 |
//TODO: lock topic
|
|
326 |
urtMutexLock(topic->lock);
|
|
327 | 327 |
subscriber->base.topic->numSubscribers--; |
328 | 328 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
329 | 329 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
330 | 330 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
331 |
//TODO: unlock topic
|
|
331 |
urtMutexUnlock(topic->lock);
|
|
332 | 332 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
333 | 333 |
subscriber->base.topic = NULL; |
334 | 334 |
subscriber->base.lastMessage = NULL; |
... | ... | |
418 | 418 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
419 | 419 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
420 | 420 |
|
421 |
//TODO: Lock topic
|
|
421 |
urtMutexLock(topic->lock);
|
|
422 | 422 |
if (messages) |
423 | 423 |
{ |
424 | 424 |
urt_message_t* lastMessageContribute = messages; |
... | ... | |
439 | 439 |
topic->numHrtSubscribers--; |
440 | 440 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
441 | 441 |
|
442 |
//TODO: Unlock topic
|
|
442 |
urtMutexUnlock(topic->lock);
|
|
443 | 443 |
return URT_STATUS_OK; |
444 | 444 |
} |
445 | 445 |
|
... | ... | |
483 | 483 |
*/ |
484 | 484 |
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency) |
485 | 485 |
{ |
486 |
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
|
487 |
if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
|
486 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
|
|
487 |
if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
|
|
488 | 488 |
return true; |
489 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
|
490 |
|
|
491 |
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
|
492 |
if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency) |
|
493 |
return true; |
|
494 |
|
|
495 |
if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency) |
|
496 |
return true; |
|
489 | 497 |
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
490 | 498 |
|
491 | 499 |
return false; |
... | ... | |
505 | 513 |
|
506 | 514 |
if (subscriber->base.topic) |
507 | 515 |
{ |
508 |
//TODO: lock topic
|
|
516 |
urtMutexLock(topic->lock);
|
|
509 | 517 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
510 | 518 |
//TODO: decrement topic's HRT counter |
511 | 519 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
... | ... | |
514 | 522 |
//Hier weiter |
515 | 523 |
|
516 | 524 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
517 |
//TODO: unlock topic
|
|
525 |
urtMutexUnlock(topic->lock);
|
|
518 | 526 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
519 | 527 |
subscriber->base.topic = NULL; |
520 | 528 |
subscriber->base.lastMessage = NULL; |
... | ... | |
616 | 624 |
subscriber->expectedRate = rate; |
617 | 625 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
618 | 626 |
|
619 |
//TODO: Lock topic
|
|
627 |
urtMutexLock(topic->lock);
|
|
620 | 628 |
if (messages) |
621 | 629 |
{ |
622 | 630 |
urt_message_t* lastMessageContribute = messages; |
... | ... | |
642 | 650 |
topic->numSubscribers--; |
643 | 651 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
644 | 652 |
|
645 |
//TODO: Unlock topic
|
|
653 |
urtMutexUnlock(topic->lock);
|
|
646 | 654 |
return URT_STATUS_OK; |
647 | 655 |
} |
648 | 656 |
|
... | ... | |
692 | 700 |
|
693 | 701 |
if (subscriber->base.topic) |
694 | 702 |
{ |
695 |
//TODO: lock topic
|
|
703 |
urtMutexLock(topic->lock);
|
|
696 | 704 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
697 | 705 |
subscriber->base.topic->numHrtSubscribers--; |
698 | 706 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
... | ... | |
704 | 712 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
705 | 713 |
|
706 | 714 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
715 |
bool hrtZero = false; |
|
707 | 716 |
while (messageTemp->next->originTime < messageTemp->originTime) |
708 | 717 |
{ |
709 | 718 |
messageTemp = messageTemp->next; |
710 | 719 |
messageTemp->numHrtConsumersLeft--; |
720 |
if (messageTemp->numHrtConsumersLeft == 0) |
|
721 |
{ |
|
722 |
hrtZero = true; |
|
723 |
} |
|
711 | 724 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
712 | 725 |
messageTemp->numConsumersLeft--; |
713 | 726 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
714 | 727 |
} |
715 |
bool temp = false; |
|
716 |
if (temp /*TODO: HRT counter of any message became 0?*/) |
|
728 |
if (hrtZero) |
|
717 | 729 |
{ |
718 |
//TODO: signal topics condition variable
|
|
730 |
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
|
719 | 731 |
} |
720 | 732 |
|
721 |
//TODO: unlock topic
|
|
733 |
urtMutexUnlock(topic->lock);
|
|
722 | 734 |
subscriber->base.topic = NULL; |
723 | 735 |
subscriber->base.lastMessage = NULL; |
724 | 736 |
subscriber->base.lastMessageTime = 0; |
Also available in: Unified diff