Revision 982056f7 src/urt_subscriber.c
src/urt_subscriber.c | ||
---|---|---|
20 | 20 |
*/ |
21 | 21 |
|
22 | 22 |
#include <urtware.h> |
23 |
#include <stdio.h> |
|
23 | 24 |
|
24 | 25 |
/******************************************************************************/ |
25 | 26 |
/* LOCAL DEFINITIONS */ |
... | ... | |
41 | 42 |
/* LOCAL FUNCTIONS */ |
42 | 43 |
/******************************************************************************/ |
43 | 44 |
|
44 |
void urtFetchMessage () |
|
45 |
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
|
|
45 | 46 |
{ |
46 |
//TODO: Update message pointer
|
|
47 |
//TODO: Copy message origin time
|
|
48 |
//TODO: Copy message payload
|
|
47 |
subscriber->base.lastMessage = message;
|
|
48 |
*subscriber->base.lastMessageTime = message->originTime;
|
|
49 |
memcpy(message->payload, payload, bytes);
|
|
49 | 50 |
} |
50 | 51 |
|
51 | 52 |
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage) |
... | ... | |
70 | 71 |
} |
71 | 72 |
} |
72 | 73 |
|
74 |
void urtContributeMessages(urt_message_t* messages) |
|
75 |
{ |
|
76 |
urt_message_t* lastMessageContribute = messages; |
|
77 |
while (lastMessageContribute->next) |
|
78 |
{ |
|
79 |
lastMessageContribute = lastMessageContribute->next; |
|
80 |
} |
|
81 |
lastMessageContribute->next = topic->latestMessage->next; |
|
82 |
topic->latestMessage->next = messages; |
|
83 |
} |
|
84 |
|
|
73 | 85 |
/******************************************************************************/ |
74 | 86 |
/* EXPORTED FUNCTIONS */ |
75 | 87 |
/******************************************************************************/ |
... | ... | |
112 | 124 |
urtDebugAssert(subscriber); |
113 | 125 |
urtDebugAssert(topic); |
114 | 126 |
|
115 |
if (!subscriber->base.topic) |
|
116 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
|
127 |
if (!subscriber->base.topic) { |
|
128 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
|
129 |
} |
|
117 | 130 |
|
118 | 131 |
subscriber->base.topic = topic; |
119 | 132 |
urtMutexLock(topic->lock); |
120 | 133 |
|
121 |
if (messages) |
|
122 |
{ |
|
123 |
urt_message_t* lastMessageContribute = messages; |
|
124 |
while (lastMessageContribute->next) |
|
125 |
{ |
|
126 |
lastMessageContribute = lastMessageContribute->next; |
|
127 |
} |
|
128 |
lastMessageContribute->next = topic->latestMessage->next; |
|
129 |
topic->latestMessage->next = messages; |
|
134 |
if (messages) { |
|
135 |
urtContributeMessages(messages); |
|
130 | 136 |
} |
131 | 137 |
|
132 | 138 |
subscriber->base.lastMessage = topic->latestMessage; |
... | ... | |
154 | 160 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
155 | 161 |
* Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
156 | 162 |
*/ |
157 |
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) |
|
163 |
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
|
|
158 | 164 |
{ |
159 | 165 |
urtDebugAssert(subscriber); |
160 | 166 |
|
161 |
if (!subscriber->base.topic) |
|
162 |
return URT_STATUS_FETCH_NOTOPIC; |
|
167 |
if (!subscriber->base.topic) { |
|
168 |
return URT_STATUS_FETCH_NOTOPIC; |
|
169 |
} |
|
163 | 170 |
|
164 | 171 |
urtMutexLock(subscriber->base.topic->lock); |
165 | 172 |
|
166 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
167 |
if(messageTemp->originTime == subscriber->base.lastMessageTime) |
|
168 |
{ |
|
169 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
|
170 |
{ |
|
173 |
urt_message_t* oldestMessage = subscriber->base.lastMessage; |
|
174 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime) { |
|
175 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) { |
|
171 | 176 |
urtMutexUnlock(subscriber->base.topic->lock); |
172 | 177 |
return URT_STATUS_FETCH_NOMESSAGE; |
173 | 178 |
} |
174 |
messageTemp = messageTemp->next;
|
|
179 |
oldestMessage = oldestMessage->next;
|
|
175 | 180 |
} |
176 |
else |
|
177 |
{ |
|
178 |
messageTemp = urtFindOldestMessage(messageTemp->next); |
|
181 |
else { |
|
182 |
oldestMessage = urtFindOldestMessage(oldestMessage->next); |
|
179 | 183 |
} |
180 | 184 |
|
181 |
urtFetchMessage(); |
|
185 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
|
186 |
|
|
187 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
188 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
|
182 | 189 |
|
183 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
184 | 190 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
185 | 191 |
subscriber->base.sumLatencies += calculatedLatency; |
186 | 192 |
|
187 |
if (calculatedLatency < subscriber->minLatency) |
|
188 |
{ |
|
193 |
if (calculatedLatency < subscriber->minLatency) { |
|
189 | 194 |
subscriber->minLatency = calculatedLatency; |
190 | 195 |
} |
191 |
else if (calculatedLatency > subscriber->maxLatency) |
|
192 |
{ |
|
196 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
193 | 197 |
subscriber->maxLatency = calculatedLatency; |
194 | 198 |
} |
195 | 199 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
196 |
bool temp = false; |
|
197 |
if (temp/*optional latency output argument given*/)
|
|
198 |
{
|
|
199 |
latency = calculatedLatency;
|
|
200 |
|
|
201 |
if (latency) {
|
|
202 |
latency = calculatedLatency;
|
|
203 |
}
|
|
200 | 204 |
} |
201 | 205 |
|
202 | 206 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
... | ... | |
220 | 224 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
221 | 225 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
222 | 226 |
*/ |
223 |
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) { |
|
227 |
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
|
|
224 | 228 |
urtDebugAssert(subscriber); |
225 | 229 |
|
226 | 230 |
if (!subscriber->base.topic) |
... | ... | |
235 | 239 |
return URT_STATUS_FETCH_NOMESSAGE; |
236 | 240 |
} |
237 | 241 |
|
238 |
urtFetchMessage(); |
|
242 |
urtFetchMessage(lastMessage, subscriber, payload, bytes); |
|
243 |
|
|
244 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
245 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime; |
|
239 | 246 |
|
240 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
241 | 247 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
242 | 248 |
subscriber->base.sumLatencies += calculatedLatency; |
243 | 249 |
|
244 |
if (calculatedLatency < subscriber->minLatency) |
|
245 |
{ |
|
250 |
if (calculatedLatency < subscriber->minLatency) { |
|
246 | 251 |
subscriber->minLatency = calculatedLatency; |
247 | 252 |
} |
248 |
else if (calculatedLatency > subscriber->maxLatency) |
|
249 |
{ |
|
253 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
250 | 254 |
subscriber->maxLatency = calculatedLatency; |
251 | 255 |
} |
252 | 256 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
253 |
bool temp = false; |
|
254 |
if (temp/*optional latency output argument given*/)
|
|
255 |
{
|
|
256 |
latency = calculatedLatency;
|
|
257 |
|
|
258 |
if (latency) {
|
|
259 |
latency = calculatedLatency;
|
|
260 |
}
|
|
257 | 261 |
} |
258 | 262 |
|
259 | 263 |
urtMutexUnlock(subscriber->base.topic->lock); |
... | ... | |
354 | 358 |
urtMutexLock(topic->lock); |
355 | 359 |
if (messages) |
356 | 360 |
{ |
357 |
urt_message_t* lastMessageContribute = messages; |
|
358 |
while (lastMessageContribute->next) |
|
359 |
{ |
|
360 |
lastMessageContribute = lastMessageContribute->next; |
|
361 |
} |
|
362 |
lastMessageContribute->next = topic->latestMessage->next; |
|
363 |
topic->latestMessage->next = messages; |
|
361 |
urtContributeMessages(messages); |
|
364 | 362 |
} |
365 | 363 |
|
366 | 364 |
subscriber->base.lastMessage = topic->latestMessage; |
... | ... | |
388 | 386 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
389 | 387 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
390 | 388 |
*/ |
391 |
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload, |
|
389 |
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload,
|
|
392 | 390 |
size_t bytes, urt_delay_t* latency) |
393 | 391 |
{ |
394 | 392 |
urtDebugAssert(subscriber); |
... | ... | |
398 | 396 |
|
399 | 397 |
urtMutexLock(subscriber->base.topic->lock); |
400 | 398 |
|
401 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
|
402 |
if(messageTemp->originTime == subscriber->base.lastMessageTime)
|
|
399 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
|
400 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
|
403 | 401 |
{ |
404 | 402 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
405 | 403 |
{ |
406 | 404 |
urtMutexUnlock(subscriber->base.topic->lock); |
407 | 405 |
return URT_STATUS_FETCH_NOMESSAGE; |
408 | 406 |
} |
409 |
messageTemp = messageTemp->next;
|
|
407 |
oldestMessage = oldestMessage->next;
|
|
410 | 408 |
} |
411 | 409 |
else |
412 | 410 |
{ |
413 |
messageTemp = urtFindOldestMessage(messageTemp->next);
|
|
411 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
|
414 | 412 |
} |
415 | 413 |
|
416 |
urtFetchMessage(); |
|
414 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
|
415 |
|
|
416 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
417 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
|
417 | 418 |
|
418 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
419 | 419 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
420 | 420 |
subscriber->base.sumLatencies += calculatedLatency; |
421 | 421 |
|
422 |
if (calculatedLatency < subscriber->minLatency) |
|
423 |
{ |
|
422 |
if (calculatedLatency < subscriber->minLatency) { |
|
424 | 423 |
subscriber->minLatency = calculatedLatency; |
425 | 424 |
} |
426 |
else if (calculatedLatency > subscriber->maxLatency) |
|
427 |
{ |
|
425 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
428 | 426 |
subscriber->maxLatency = calculatedLatency; |
429 | 427 |
} |
430 | 428 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
431 |
bool temp = false; |
|
432 |
if (temp/*optional latency output argument given*/)
|
|
433 |
{
|
|
434 |
latency = calculatedLatency;
|
|
429 |
|
|
430 |
if (latency) {
|
|
431 |
latency = calculatedLatency;
|
|
432 |
}
|
|
435 | 433 |
} |
436 | 434 |
|
437 | 435 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
... | ... | |
455 | 453 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
456 | 454 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
457 | 455 |
*/ |
458 |
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload, |
|
456 |
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload,
|
|
459 | 457 |
size_t bytes, urt_delay_t* latency) |
460 | 458 |
{ |
461 | 459 |
urtDebugAssert(subscriber); |
462 | 460 |
|
463 |
if (!subscriber->base.topic) |
|
464 |
return URT_STATUS_FETCH_NOTOPIC; |
|
461 |
if (!subscriber->base.topic) { |
|
462 |
return URT_STATUS_FETCH_NOTOPIC; |
|
463 |
} |
|
465 | 464 |
|
466 | 465 |
urtMutexLock(subscriber->base.topic->lock); |
467 | 466 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
... | ... | |
472 | 471 |
return URT_STATUS_FETCH_NOMESSAGE; |
473 | 472 |
} |
474 | 473 |
|
475 |
urtFetchMessage(); |
|
474 |
urtFetchMessage(lastMessage, subscriber, payload, bytes); |
|
475 |
|
|
476 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
477 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime; |
|
476 | 478 |
|
477 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
478 | 479 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
479 | 480 |
subscriber->base.sumLatencies += calculatedLatency; |
480 | 481 |
|
481 |
if (calculatedLatency < subscriber->minLatency) |
|
482 |
{ |
|
482 |
if (calculatedLatency < subscriber->minLatency) { |
|
483 | 483 |
subscriber->minLatency = calculatedLatency; |
484 | 484 |
} |
485 |
else if (calculatedLatency > subscriber->maxLatency) |
|
486 |
{ |
|
485 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
487 | 486 |
subscriber->maxLatency = calculatedLatency; |
488 | 487 |
} |
489 | 488 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
490 |
bool temp = false; |
|
491 |
if (temp/*optional latency output argument given*/)
|
|
492 |
{
|
|
493 |
latency = calculatedLatency;
|
|
489 |
|
|
490 |
if (latency) {
|
|
491 |
latency = calculatedLatency;
|
|
492 |
}
|
|
494 | 493 |
} |
495 | 494 |
|
496 | 495 |
urtMutexUnlock(subscriber->base.topic->lock); |
... | ... | |
625 | 624 |
urtMutexLock(topic->lock); |
626 | 625 |
if (messages) |
627 | 626 |
{ |
628 |
urt_message_t* lastMessageContribute = messages; |
|
629 |
while (lastMessageContribute->next) |
|
630 |
{ |
|
631 |
lastMessageContribute = lastMessageContribute->next; |
|
632 |
} |
|
633 |
lastMessageContribute->next = topic->latestMessage->next; |
|
634 |
topic->latestMessage->next = messages; |
|
627 |
urtContributeMessages(messages); |
|
635 | 628 |
} |
636 | 629 |
|
637 | 630 |
subscriber->base.lastMessage = topic->latestMessage; |
... | ... | |
659 | 652 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
660 | 653 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
661 | 654 |
*/ |
662 |
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload, |
|
655 |
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload,
|
|
663 | 656 |
size_t bytes, urt_delay_t* latency) |
664 | 657 |
{ |
665 | 658 |
urtDebugAssert(subscriber); |
... | ... | |
669 | 662 |
|
670 | 663 |
urtMutexLock(subscriber->base.topic->lock); |
671 | 664 |
|
672 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
|
673 |
if(messageTemp->originTime == subscriber->base.lastMessageTime)
|
|
665 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
|
666 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
|
674 | 667 |
{ |
675 | 668 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
676 | 669 |
{ |
677 | 670 |
urtMutexUnlock(subscriber->base.topic->lock); |
678 | 671 |
return URT_STATUS_FETCH_NOMESSAGE; |
679 | 672 |
} |
680 |
messageTemp = messageTemp->next;
|
|
673 |
oldestMessage = oldestMessage->next;
|
|
681 | 674 |
} |
682 | 675 |
else |
683 | 676 |
{ |
684 |
messageTemp = urtFindOldestMessage(messageTemp->next);
|
|
677 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
|
685 | 678 |
} |
686 | 679 |
|
687 |
urtFetchMessage(); |
|
680 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
688 | 681 |
|
689 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
690 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
682 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) { |
|
683 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
|
684 |
|
|
685 |
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
|
691 | 686 |
subscriber->base.sumLatencies += calculatedLatency; |
687 |
|
|
688 |
if (calculatedLatency < subscriber->minLatency) { |
|
689 |
subscriber->minLatency = calculatedLatency; |
|
690 |
} |
|
691 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
692 |
subscriber->maxLatency = calculatedLatency; |
|
693 |
} |
|
692 | 694 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
693 | 695 |
|
694 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist |
|
695 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter) |
|
696 |
{ |
|
696 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
|
697 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) { |
|
697 | 698 |
subscriber->minLatency = calculatedLatency; |
698 | 699 |
} |
699 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter) |
|
700 |
{ |
|
700 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
|
701 | 701 |
subscriber->maxLatency = calculatedLatency; |
702 | 702 |
} |
703 |
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
703 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
|
704 |
urtMutexUnlock(subscriber->base.topic); |
|
705 |
return URT_STATUS_JITTERVIOLATION; |
|
706 |
} |
|
707 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
704 | 708 |
|
705 |
bool temp = false; |
|
706 |
if (temp/*optional latency output argument given*/) |
|
707 |
{ |
|
708 |
latency = calculatedLatency; |
|
709 |
if (latency) { |
|
710 |
latency = calculatedLatency; |
|
711 |
} |
|
709 | 712 |
} |
710 | 713 |
|
711 | 714 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
... | ... | |
714 | 717 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
715 | 718 |
|
716 | 719 |
urtMutexUnlock(subscriber->base.topic->lock); |
717 |
return URT_STATUS_OK; //TODO: or urt_status_jitterviolation
|
|
720 |
return URT_STATUS_OK; |
|
718 | 721 |
} |
719 | 722 |
|
720 | 723 |
/** |
... | ... | |
729 | 732 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
730 | 733 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
731 | 734 |
*/ |
732 |
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload, |
|
735 |
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload,
|
|
733 | 736 |
size_t bytes, urt_delay_t* latency) |
734 | 737 |
{ |
735 | 738 |
urtDebugAssert(subscriber); |
... | ... | |
746 | 749 |
return URT_STATUS_FETCH_NOMESSAGE; |
747 | 750 |
} |
748 | 751 |
|
749 |
urtFetchMessage(); |
|
752 |
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
750 | 753 |
|
751 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
752 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
754 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) { |
|
755 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
|
756 |
|
|
757 |
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
|
753 | 758 |
subscriber->base.sumLatencies += calculatedLatency; |
759 |
|
|
760 |
if (calculatedLatency < subscriber->minLatency) { |
|
761 |
subscriber->minLatency = calculatedLatency; |
|
762 |
} |
|
763 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
764 |
subscriber->maxLatency = calculatedLatency; |
|
765 |
} |
|
754 | 766 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
755 | 767 |
|
756 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist |
|
757 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter) |
|
758 |
{ |
|
768 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
|
769 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) { |
|
759 | 770 |
subscriber->minLatency = calculatedLatency; |
760 | 771 |
} |
761 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter) |
|
762 |
{ |
|
772 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
|
763 | 773 |
subscriber->maxLatency = calculatedLatency; |
764 | 774 |
} |
765 |
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
775 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
|
776 |
urtMutexUnlock(subscriber->base.topic); |
|
777 |
return URT_STATUS_JITTERVIOLATION; |
|
778 |
} |
|
779 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
766 | 780 |
|
767 |
bool temp = false; |
|
768 |
if (temp/*optional latency output argument given*/) |
|
769 |
{ |
|
770 |
latency = calculatedLatency; |
|
781 |
if (latency) { |
|
782 |
latency = calculatedLatency; |
|
783 |
} |
|
771 | 784 |
} |
772 | 785 |
|
773 | 786 |
urtMutexUnlock(subscriber->base.topic->lock); |
... | ... | |
812 | 825 |
{ |
813 | 826 |
urtDebugAssert(subscriber); |
814 | 827 |
|
815 |
if (subscriber->base.topic) |
|
816 |
{ |
|
817 |
urtMutexLock(topic->lock); |
|
818 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
|
819 |
//TODO: decrement topic's HRT counter |
|
828 |
if (!subscriber->base.topic) { |
|
829 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC; |
|
830 |
} |
|
831 |
|
|
832 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
833 |
urtMutexLock(topic->lock); |
|
834 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
835 |
|
|
836 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
|
837 |
|
|
820 | 838 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
821 |
subscriber->base.topic->numSubscribers--;
|
|
839 |
subscriber->base.topic->numSubscribers--; |
|
822 | 840 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
823 |
//Hier weiter |
|
824 | 841 |
|
825 | 842 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
826 |
urtMutexUnlock(topic->lock);
|
|
843 |
urtMutexUnlock(topic->lock); |
|
827 | 844 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
828 |
subscriber->base.topic = NULL; |
|
829 |
subscriber->base.lastMessage = NULL; |
|
830 |
subscriber->base.lastMessageTime = 0; |
|
831 |
return URT_STATUS_OK; |
|
832 |
} |
|
833 |
|
|
834 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC; |
|
845 |
subscriber->base.topic = NULL; |
|
846 |
subscriber->base.lastMessage = NULL; |
|
847 |
subscriber->base.lastMessageTime = 0; |
|
848 |
return URT_STATUS_OK; |
|
835 | 849 |
} |
836 | 850 |
|
837 | 851 |
|
... | ... | |
928 | 942 |
urtMutexLock(topic->lock); |
929 | 943 |
if (messages) |
930 | 944 |
{ |
931 |
urt_message_t* lastMessageContribute = messages; |
|
932 |
while (lastMessageContribute->next) |
|
933 |
{ |
|
934 |
lastMessageContribute = lastMessageContribute->next; |
|
935 |
} |
|
936 |
lastMessageContribute->next = topic->latestMessage->next; |
|
937 |
topic->latestMessage->next = messages; |
|
945 |
urtContributeMessages(messages); |
|
938 | 946 |
} |
939 | 947 |
|
940 | 948 |
subscriber->base.lastMessage = topic->latestMessage; |
... | ... | |
968 | 976 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
969 | 977 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
970 | 978 |
*/ |
971 |
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload, |
|
979 |
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
|
|
972 | 980 |
size_t bytes, urt_delay_t* latency) |
973 | 981 |
{ |
974 | 982 |
urtDebugAssert(subscriber); |
975 | 983 |
|
976 |
if (!subscriber->base.topic) |
|
977 |
return URT_STATUS_FETCH_NOTOPIC; |
|
984 |
if (!subscriber->base.topic) { |
|
985 |
return URT_STATUS_FETCH_NOTOPIC; |
|
986 |
} |
|
978 | 987 |
|
979 | 988 |
urtMutexLock(subscriber->base.topic->lock); |
980 | 989 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
... | ... | |
985 | 994 |
} |
986 | 995 |
messageTemp = messageTemp->next; |
987 | 996 |
|
988 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
989 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
997 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
998 |
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime; |
|
999 |
|
|
1000 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
990 | 1001 |
subscriber->base.sumLatencies += calculatedLatency; |
1002 |
|
|
1003 |
if (calculatedLatency < subscriber->minLatency) { |
|
1004 |
subscriber->minLatency = calculatedLatency; |
|
1005 |
} |
|
1006 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
1007 |
subscriber->maxLatency = calculatedLatency; |
|
1008 |
} |
|
991 | 1009 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
992 |
bool temp = false; |
|
993 |
if (temp /* optional latency output argument given */)
|
|
994 |
{
|
|
995 |
latency = calculatedLatency
|
|
1010 |
|
|
1011 |
if (latency) {
|
|
1012 |
latency = calculatedLatency;
|
|
1013 |
}
|
|
996 | 1014 |
} |
997 | 1015 |
|
998 | 1016 |
subscriber->base.lastMessage->numHrtConsumersLeft--; |
... | ... | |
1006 | 1024 |
subscriber->base->numMessagesReceived++; |
1007 | 1025 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
1008 | 1026 |
|
1009 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
|
1010 |
if (temp /*latency is within allowed jitter range*/) |
|
1011 |
{ |
|
1012 |
if (calculatedLatency < subscriber->minLatency) |
|
1013 |
{ |
|
1014 |
subscriber->minLatency = calculatedLatency; |
|
1015 |
} |
|
1016 |
else if (calculatedLatency > subscriber->maxLatency) |
|
1017 |
{ |
|
1018 |
subscriber->maxLatency = calculatedLatency; |
|
1019 |
} |
|
1020 |
} |
|
1021 |
else |
|
1022 |
{ |
|
1023 | 1027 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
1024 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
1028 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) { |
|
1029 |
subscriber->minLatency = calculatedLatency; |
|
1030 |
} |
|
1031 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
|
1032 |
subscriber->maxLatency = calculatedLatency; |
|
1033 |
} |
|
1034 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
|
1035 |
urtMutexUnlock(subscriber->base.topic); |
|
1025 | 1036 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION); |
1026 | 1037 |
return URT_STATUS_JITTERVIOLATION; |
1038 |
} |
|
1027 | 1039 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
1040 |
|
|
1041 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
|
1042 |
if (calculatedLatency < subscriber->minLatency) { |
|
1043 |
subscriber->minLatency = calculatedLatency; |
|
1028 | 1044 |
} |
1029 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
|
1045 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
1046 |
subscriber->maxLatency = calculatedLatency; |
|
1047 |
} |
|
1048 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
1030 | 1049 |
|
1031 |
urtFetchMessage(); |
|
1050 |
urtFetchMessage(messageTemp, subscriber, payload, bytes);
|
|
1032 | 1051 |
|
1033 | 1052 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
1034 | 1053 |
if (messageTemp->next->originTime < messageTemp->originTime) |
1035 | 1054 |
{ |
1036 |
//TODO: update qos deadliner timer wrt. next message |
|
1055 |
//TODO: first reset?! (when ... set) |
|
1056 |
urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL); |
|
1037 | 1057 |
} |
1038 | 1058 |
else |
1039 | 1059 |
{ |
1040 |
//TODO: reset qos deadline timer
|
|
1060 |
urtTimerReset(subscriber->qosDeadlineTimer);
|
|
1041 | 1061 |
} |
1042 | 1062 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
1043 | 1063 |
|
... | ... | |
1058 | 1078 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic. |
1059 | 1079 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
1060 | 1080 |
*/ |
1061 |
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload, |
|
1062 |
size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;} |
|
1081 |
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload, |
|
1082 |
size_t bytes, urt_delay_t* latency) |
|
1083 |
{ |
|
1084 |
if (!subscriber->base.topic) { |
|
1085 |
return URT_STATUS_FETCH_NOTOPIC; |
|
1086 |
} |
|
1087 |
|
|
1088 |
urtMutexLock(subscriber->base.topic->lock); |
|
1089 |
urt_message_t* lastMessage = subscriber->base.lastMessage; |
|
1090 |
bool hrtZero = false; |
|
1091 |
while (lastMessage->next->originTime < lastMessage->originTime) { |
|
1092 |
lastMessage = lastMessage->next; |
|
1093 |
lastMessage->numHrtConsumersLeft--; |
|
1094 |
if (lastMessage->numHrtConsumersLeft == 0) { |
|
1095 |
hertZero = true; |
|
1096 |
} |
|
1097 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
1098 |
lastMessage->numConsumersLeft--; |
|
1099 |
subscriber->base.numMessagesReceived++; |
|
1100 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
1101 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
|
1102 |
urtTimerReset(subscriber->qosDeadlineTimer); |
|
1103 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
|
1104 |
} |
|
1105 |
|
|
1106 |
if (hrtZero) { |
|
1107 |
urtCondvarSignal(subscriber->base.topic->hrtReleased); |
|
1108 |
} |
|
1109 |
|
|
1110 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) { |
|
1111 |
urtMutexUnlock(subscriber->base.topic); |
|
1112 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
1113 |
} |
|
1114 |
|
|
1115 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
1116 |
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime; |
|
1117 |
|
|
1118 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
1119 |
subscriber->base.sumLatencies += calculatedLatency; |
|
1120 |
|
|
1121 |
if (calculatedLatency < subscriber->minLatency) { |
|
1122 |
subscriber->minLatency = calculatedLatency; |
|
1123 |
} |
|
1124 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
1125 |
subscriber->maxLatency = calculatedLatency; |
|
1126 |
} |
|
1127 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
1128 |
|
|
1129 |
if (latency) { |
|
1130 |
latency = calculatedLatency; |
|
1131 |
} |
|
1132 |
} |
|
1133 |
|
|
1134 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
|
1135 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) { |
|
1136 |
subscriber->minLatency = calculatedLatency; |
|
1137 |
} |
|
1138 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
|
1139 |
subscriber->maxLatency = calculatedLatency; |
|
1140 |
} |
|
1141 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
|
1142 |
urtMutexUnlock(subscriber->base.topic); |
|
1143 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION); |
|
1144 |
return URT_STATUS_JITTERVIOLATION; |
|
1145 |
} |
|
1146 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
1147 |
|
|
1148 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
|
1149 |
if (calculatedLatency < subscriber->minLatency) { |
|
1150 |
subscriber->minLatency = calculatedLatency; |
|
1151 |
} |
|
1152 |
else if (calculatedLatency > subscriber->maxLatency) { |
|
1153 |
subscriber->maxLatency = calculatedLatency; |
|
1154 |
} |
|
1155 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
1156 |
|
|
1157 |
urtFetchMessage(lastMessage, subscriber, payload, bytes); |
|
1158 |
|
|
1159 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
|
1160 |
urtTimerReset(subscriber->qosDeadlineTimer); |
|
1161 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
|
1162 |
} |
|
1063 | 1163 |
|
1064 | 1164 |
/** |
1065 | 1165 |
* @brief Unsubscribes from a subscriber. |
Also available in: Unified diff