Revision 65dc89cb
src/urt_subscriber.c | ||
---|---|---|
41 | 41 |
/* LOCAL FUNCTIONS */ |
42 | 42 |
/******************************************************************************/ |
43 | 43 |
|
44 |
void urtFetchMessage () |
|
45 |
{ |
|
46 |
//TODO: Update message pointer |
|
47 |
//TODO: Copy message origin time |
|
48 |
//TODO: Copy message payload |
|
49 |
} |
|
50 |
|
|
51 |
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage) |
|
52 |
{ |
|
53 |
while (oldestMessage->next->originTime < oldestMessage->originTime) |
|
54 |
{ |
|
55 |
oldestMessage = oldestMessage->next; |
|
56 |
} |
|
57 |
return oldestMessage; |
|
58 |
} |
|
59 |
|
|
60 |
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage) |
|
61 |
{ |
|
62 |
urt_message_t* lastMessage = subscriber->base.lastMessage; |
|
63 |
while (lastMessage->next->originTime < lastMessage->originTime) |
|
64 |
{ |
|
65 |
lastMessage = lastMessage->next; |
|
66 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
67 |
subscriber->base.lastMessage->numConsumersLeft--; |
|
68 |
subscriber->base->numMessagesReceived++; |
|
69 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
70 |
} |
|
71 |
} |
|
72 |
|
|
44 | 73 |
/******************************************************************************/ |
45 | 74 |
/* EXPORTED FUNCTIONS */ |
46 | 75 |
/******************************************************************************/ |
... | ... | |
126 | 155 |
* Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
127 | 156 |
*/ |
128 | 157 |
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) |
129 |
{ |
|
158 |
{
|
|
130 | 159 |
urtDebugAssert(subscriber); |
160 |
|
|
161 |
if (!subscriber->base.topic) |
|
162 |
return URT_STATUS_FETCH_NOTOPIC; |
|
163 |
|
|
164 |
urtMutexLock(subscriber->base.topic->lock); |
|
165 |
|
|
166 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
167 |
if(messageTemp->originTime == subscriber->base.lastMessageTime) |
|
168 |
{ |
|
169 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
|
170 |
{ |
|
171 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
172 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
173 |
} |
|
174 |
messageTemp = messageTemp->next; |
|
175 |
} |
|
176 |
else |
|
177 |
{ |
|
178 |
messageTemp = urtFindOldestMessage(messageTemp->next); |
|
179 |
} |
|
180 |
|
|
181 |
urtFetchMessage(); |
|
182 |
|
|
183 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
184 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
185 |
subscriber->base.sumLatencies += calculatedLatency; |
|
186 |
|
|
187 |
if (calculatedLatency < subscriber->minLatency) |
|
188 |
{ |
|
189 |
subscriber->minLatency = calculatedLatency; |
|
190 |
} |
|
191 |
else if (calculatedLatency > subscriber->maxLatency) |
|
192 |
{ |
|
193 |
subscriber->maxLatency = calculatedLatency; |
|
194 |
} |
|
195 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
196 |
bool temp = false; |
|
197 |
if (temp/*optional latency output argument given*/) |
|
198 |
{ |
|
199 |
latency = calculatedLatency; |
|
200 |
} |
|
201 |
|
|
202 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
203 |
subscriber->base.lastMessage->numConsumersLeft--; |
|
204 |
subscriber->base->numMessagesReceived++; |
|
205 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
206 |
|
|
207 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
131 | 208 |
return URT_STATUS_OK; |
132 | 209 |
} |
133 | 210 |
|
... | ... | |
144 | 221 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
145 | 222 |
*/ |
146 | 223 |
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) { |
224 |
urtDebugAssert(subscriber); |
|
225 |
|
|
226 |
if (!subscriber->base.topic) |
|
227 |
return URT_STATUS_FETCH_NOTOPIC; |
|
228 |
|
|
229 |
urtMutexLock(subscriber->base.topic->lock); |
|
230 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
|
231 |
|
|
232 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) |
|
233 |
{ |
|
234 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
235 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
236 |
} |
|
237 |
|
|
238 |
urtFetchMessage(); |
|
239 |
|
|
240 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
241 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
242 |
subscriber->base.sumLatencies += calculatedLatency; |
|
243 |
|
|
244 |
if (calculatedLatency < subscriber->minLatency) |
|
245 |
{ |
|
246 |
subscriber->minLatency = calculatedLatency; |
|
247 |
} |
|
248 |
else if (calculatedLatency > subscriber->maxLatency) |
|
249 |
{ |
|
250 |
subscriber->maxLatency = calculatedLatency; |
|
251 |
} |
|
252 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
253 |
bool temp = false; |
|
254 |
if (temp/*optional latency output argument given*/) |
|
255 |
{ |
|
256 |
latency = calculatedLatency; |
|
257 |
} |
|
258 |
|
|
259 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
147 | 260 |
return URT_STATUS_OK; |
148 | 261 |
} |
149 | 262 |
|
... | ... | |
276 | 389 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
277 | 390 |
*/ |
278 | 391 |
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload, |
279 |
size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;} |
|
392 |
size_t bytes, urt_delay_t* latency) |
|
393 |
{ |
|
394 |
urtDebugAssert(subscriber); |
|
395 |
|
|
396 |
if (!subscriber->base.topic) |
|
397 |
return URT_STATUS_FETCH_NOTOPIC; |
|
398 |
|
|
399 |
urtMutexLock(subscriber->base.topic->lock); |
|
400 |
|
|
401 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
402 |
if(messageTemp->originTime == subscriber->base.lastMessageTime) |
|
403 |
{ |
|
404 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
|
405 |
{ |
|
406 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
407 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
408 |
} |
|
409 |
messageTemp = messageTemp->next; |
|
410 |
} |
|
411 |
else |
|
412 |
{ |
|
413 |
messageTemp = urtFindOldestMessage(messageTemp->next); |
|
414 |
} |
|
415 |
|
|
416 |
urtFetchMessage(); |
|
417 |
|
|
418 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
419 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
420 |
subscriber->base.sumLatencies += calculatedLatency; |
|
421 |
|
|
422 |
if (calculatedLatency < subscriber->minLatency) |
|
423 |
{ |
|
424 |
subscriber->minLatency = calculatedLatency; |
|
425 |
} |
|
426 |
else if (calculatedLatency > subscriber->maxLatency) |
|
427 |
{ |
|
428 |
subscriber->maxLatency = calculatedLatency; |
|
429 |
} |
|
430 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
431 |
bool temp = false; |
|
432 |
if (temp/*optional latency output argument given*/) |
|
433 |
{ |
|
434 |
latency = calculatedLatency; |
|
435 |
} |
|
436 |
|
|
437 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
438 |
subscriber->base.lastMessage->numConsumersLeft--; |
|
439 |
subscriber->base->numMessagesReceived++; |
|
440 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
441 |
|
|
442 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
443 |
return URT_STATUS_OK; |
|
444 |
} |
|
280 | 445 |
|
281 | 446 |
/** |
282 | 447 |
* @brief Fetches the latest message. |
... | ... | |
291 | 456 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
292 | 457 |
*/ |
293 | 458 |
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload, |
294 |
size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;} |
|
459 |
size_t bytes, urt_delay_t* latency) |
|
460 |
{ |
|
461 |
urtDebugAssert(subscriber); |
|
462 |
|
|
463 |
if (!subscriber->base.topic) |
|
464 |
return URT_STATUS_FETCH_NOTOPIC; |
|
465 |
|
|
466 |
urtMutexLock(subscriber->base.topic->lock); |
|
467 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
|
468 |
|
|
469 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) |
|
470 |
{ |
|
471 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
472 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
473 |
} |
|
474 |
|
|
475 |
urtFetchMessage(); |
|
476 |
|
|
477 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
478 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
479 |
subscriber->base.sumLatencies += calculatedLatency; |
|
480 |
|
|
481 |
if (calculatedLatency < subscriber->minLatency) |
|
482 |
{ |
|
483 |
subscriber->minLatency = calculatedLatency; |
|
484 |
} |
|
485 |
else if (calculatedLatency > subscriber->maxLatency) |
|
486 |
{ |
|
487 |
subscriber->maxLatency = calculatedLatency; |
|
488 |
} |
|
489 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
490 |
bool temp = false; |
|
491 |
if (temp/*optional latency output argument given*/) |
|
492 |
{ |
|
493 |
latency = calculatedLatency; |
|
494 |
} |
|
495 |
|
|
496 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
497 |
return URT_STATUS_OK; |
|
498 |
} |
|
295 | 499 |
|
296 | 500 |
/** |
297 | 501 |
* @brief Calculates the usefulness of the subscriber. |
... | ... | |
456 | 660 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
457 | 661 |
*/ |
458 | 662 |
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload, |
459 |
size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;} |
|
663 |
size_t bytes, urt_delay_t* latency) |
|
664 |
{ |
|
665 |
urtDebugAssert(subscriber); |
|
666 |
|
|
667 |
if (!subscriber->base.topic) |
|
668 |
return URT_STATUS_FETCH_NOTOPIC; |
|
669 |
|
|
670 |
urtMutexLock(subscriber->base.topic->lock); |
|
671 |
|
|
672 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
673 |
if(messageTemp->originTime == subscriber->base.lastMessageTime) |
|
674 |
{ |
|
675 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
|
676 |
{ |
|
677 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
678 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
679 |
} |
|
680 |
messageTemp = messageTemp->next; |
|
681 |
} |
|
682 |
else |
|
683 |
{ |
|
684 |
messageTemp = urtFindOldestMessage(messageTemp->next); |
|
685 |
} |
|
686 |
|
|
687 |
urtFetchMessage(); |
|
688 |
|
|
689 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
690 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
691 |
subscriber->base.sumLatencies += calculatedLatency; |
|
692 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
693 |
|
|
694 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist |
|
695 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter) |
|
696 |
{ |
|
697 |
subscriber->minLatency = calculatedLatency; |
|
698 |
} |
|
699 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter) |
|
700 |
{ |
|
701 |
subscriber->maxLatency = calculatedLatency; |
|
702 |
} |
|
703 |
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
704 |
|
|
705 |
bool temp = false; |
|
706 |
if (temp/*optional latency output argument given*/) |
|
707 |
{ |
|
708 |
latency = calculatedLatency; |
|
709 |
} |
|
710 |
|
|
711 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
712 |
subscriber->base.lastMessage->numConsumersLeft--; |
|
713 |
subscriber->base->numMessagesReceived++; |
|
714 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
715 |
|
|
716 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
717 |
return URT_STATUS_OK; //TODO: or urt_status_jitterviolation |
|
718 |
} |
|
460 | 719 |
|
461 | 720 |
/** |
462 | 721 |
* @brief Fetches the latest message. |
... | ... | |
471 | 730 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
472 | 731 |
*/ |
473 | 732 |
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload, |
474 |
size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;} |
|
733 |
size_t bytes, urt_delay_t* latency) |
|
734 |
{ |
|
735 |
urtDebugAssert(subscriber); |
|
736 |
|
|
737 |
if (!subscriber->base.topic) |
|
738 |
return URT_STATUS_FETCH_NOTOPIC; |
|
739 |
|
|
740 |
urtMutexLock(subscriber->base.topic->lock); |
|
741 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
|
742 |
|
|
743 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) |
|
744 |
{ |
|
745 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
746 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
747 |
} |
|
748 |
|
|
749 |
urtFetchMessage(); |
|
750 |
|
|
751 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
752 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
|
753 |
subscriber->base.sumLatencies += calculatedLatency; |
|
754 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
755 |
|
|
756 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist |
|
757 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter) |
|
758 |
{ |
|
759 |
subscriber->minLatency = calculatedLatency; |
|
760 |
} |
|
761 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter) |
|
762 |
{ |
|
763 |
subscriber->maxLatency = calculatedLatency; |
|
764 |
} |
|
765 |
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
766 |
|
|
767 |
bool temp = false; |
|
768 |
if (temp/*optional latency output argument given*/) |
|
769 |
{ |
|
770 |
latency = calculatedLatency; |
|
771 |
} |
|
772 |
|
|
773 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
774 |
return URT_STATUS_OK; |
|
775 |
} |
|
475 | 776 |
|
476 | 777 |
/** |
477 | 778 |
* @brief Calculates the validity from the subscriber. |
... | ... | |
668 | 969 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch. |
669 | 970 |
*/ |
670 | 971 |
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload, |
671 |
size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;} |
|
972 |
size_t bytes, urt_delay_t* latency) |
|
973 |
{ |
|
974 |
urtDebugAssert(subscriber); |
|
975 |
|
|
976 |
if (!subscriber->base.topic) |
|
977 |
return URT_STATUS_FETCH_NOTOPIC; |
|
978 |
|
|
979 |
urtMutexLock(subscriber->base.topic->lock); |
|
980 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
981 |
if (messageTemp->next->originTime > messageTemp.originTime) |
|
982 |
{ |
|
983 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
984 |
return URT_STATUS_FETCH_NOMESSAGE; |
|
985 |
} |
|
986 |
messageTemp = messageTemp->next; |
|
987 |
|
|
988 |
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency |
|
989 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
990 |
subscriber->base.sumLatencies += calculatedLatency; |
|
991 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
992 |
bool temp = false; |
|
993 |
if (temp /* optional latency output argument given */) |
|
994 |
{ |
|
995 |
latency = calculatedLatency |
|
996 |
} |
|
997 |
|
|
998 |
subscriber->base.lastMessage->numHrtConsumersLeft--; |
|
999 |
if (subscriber->base.lastMessage->numHrtConsumersLeft != 0) |
|
1000 |
{ |
|
1001 |
urtCondvarSignal(subscriber->base.topic->hrtReleased); |
|
1002 |
} |
|
1003 |
|
|
1004 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
1005 |
subscriber->base.lastMessage->numConsumersLeft--; |
|
1006 |
subscriber->base->numMessagesReceived++; |
|
1007 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
1008 |
|
|
1009 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
|
1010 |
if (temp /*latency is within allowed jitter range*/) |
|
1011 |
{ |
|
1012 |
if (calculatedLatency < subscriber->minLatency) |
|
1013 |
{ |
|
1014 |
subscriber->minLatency = calculatedLatency; |
|
1015 |
} |
|
1016 |
else if (calculatedLatency > subscriber->maxLatency) |
|
1017 |
{ |
|
1018 |
subscriber->maxLatency = calculatedLatency; |
|
1019 |
} |
|
1020 |
} |
|
1021 |
else |
|
1022 |
{ |
|
1023 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
|
1024 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
1025 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION); |
|
1026 |
return URT_STATUS_JITTERVIOLATION; |
|
1027 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
1028 |
} |
|
1029 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
|
1030 |
|
|
1031 |
urtFetchMessage(); |
|
1032 |
|
|
1033 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
|
1034 |
if (messageTemp->next->originTime < messageTemp->originTime) |
|
1035 |
{ |
|
1036 |
//TODO: update qos deadliner timer wrt. next message |
|
1037 |
} |
|
1038 |
else |
|
1039 |
{ |
|
1040 |
//TODO: reset qos deadline timer |
|
1041 |
} |
|
1042 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
|
1043 |
|
|
1044 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
1045 |
return URT_STATUS_OK; |
|
1046 |
} |
|
672 | 1047 |
|
673 | 1048 |
|
674 | 1049 |
/** |
Also available in: Unified diff