20 |
20 |
*/
|
21 |
21 |
|
22 |
22 |
#include <urtware.h>
|
|
23 |
#include <stdio.h>
|
23 |
24 |
|
24 |
25 |
/******************************************************************************/
|
25 |
26 |
/* LOCAL DEFINITIONS */
|
... | ... | |
41 |
42 |
/* LOCAL FUNCTIONS */
|
42 |
43 |
/******************************************************************************/
|
43 |
44 |
|
44 |
|
void urtFetchMessage ()
|
|
45 |
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
|
45 |
46 |
{
|
46 |
|
//TODO: Update message pointer
|
47 |
|
//TODO: Copy message origin time
|
48 |
|
//TODO: Copy message payload
|
|
47 |
subscriber->base.lastMessage = message;
|
|
48 |
*subscriber->base.lastMessageTime = message->originTime;
|
|
49 |
memcpy(message->payload, payload, bytes);
|
49 |
50 |
}
|
50 |
51 |
|
51 |
52 |
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
|
... | ... | |
70 |
71 |
}
|
71 |
72 |
}
|
72 |
73 |
|
|
74 |
void urtContributeMessages(urt_message_t* messages)
|
|
75 |
{
|
|
76 |
urt_message_t* lastMessageContribute = messages;
|
|
77 |
while (lastMessageContribute->next)
|
|
78 |
{
|
|
79 |
lastMessageContribute = lastMessageContribute->next;
|
|
80 |
}
|
|
81 |
lastMessageContribute->next = topic->latestMessage->next;
|
|
82 |
topic->latestMessage->next = messages;
|
|
83 |
}
|
|
84 |
|
73 |
85 |
/******************************************************************************/
|
74 |
86 |
/* EXPORTED FUNCTIONS */
|
75 |
87 |
/******************************************************************************/
|
... | ... | |
112 |
124 |
urtDebugAssert(subscriber);
|
113 |
125 |
urtDebugAssert(topic);
|
114 |
126 |
|
115 |
|
if (!subscriber->base.topic)
|
116 |
|
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
|
127 |
if (!subscriber->base.topic) {
|
|
128 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
|
129 |
}
|
117 |
130 |
|
118 |
131 |
subscriber->base.topic = topic;
|
119 |
132 |
urtMutexLock(topic->lock);
|
120 |
133 |
|
121 |
|
if (messages)
|
122 |
|
{
|
123 |
|
urt_message_t* lastMessageContribute = messages;
|
124 |
|
while (lastMessageContribute->next)
|
125 |
|
{
|
126 |
|
lastMessageContribute = lastMessageContribute->next;
|
127 |
|
}
|
128 |
|
lastMessageContribute->next = topic->latestMessage->next;
|
129 |
|
topic->latestMessage->next = messages;
|
|
134 |
if (messages) {
|
|
135 |
urtContributeMessages(messages);
|
130 |
136 |
}
|
131 |
137 |
|
132 |
138 |
subscriber->base.lastMessage = topic->latestMessage;
|
... | ... | |
154 |
160 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
155 |
161 |
* Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
156 |
162 |
*/
|
157 |
|
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
|
|
163 |
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
|
158 |
164 |
{
|
159 |
165 |
urtDebugAssert(subscriber);
|
160 |
166 |
|
161 |
|
if (!subscriber->base.topic)
|
162 |
|
return URT_STATUS_FETCH_NOTOPIC;
|
|
167 |
if (!subscriber->base.topic) {
|
|
168 |
return URT_STATUS_FETCH_NOTOPIC;
|
|
169 |
}
|
163 |
170 |
|
164 |
171 |
urtMutexLock(subscriber->base.topic->lock);
|
165 |
172 |
|
166 |
|
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
167 |
|
if(messageTemp->originTime == subscriber->base.lastMessageTime)
|
168 |
|
{
|
169 |
|
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
170 |
|
{
|
|
173 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
|
174 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
|
|
175 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
|
171 |
176 |
urtMutexUnlock(subscriber->base.topic->lock);
|
172 |
177 |
return URT_STATUS_FETCH_NOMESSAGE;
|
173 |
178 |
}
|
174 |
|
messageTemp = messageTemp->next;
|
|
179 |
oldestMessage = oldestMessage->next;
|
175 |
180 |
}
|
176 |
|
else
|
177 |
|
{
|
178 |
|
messageTemp = urtFindOldestMessage(messageTemp->next);
|
|
181 |
else {
|
|
182 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
179 |
183 |
}
|
180 |
184 |
|
181 |
|
urtFetchMessage();
|
|
185 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
186 |
|
|
187 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
|
188 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
182 |
189 |
|
183 |
|
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
|
184 |
190 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
185 |
191 |
subscriber->base.sumLatencies += calculatedLatency;
|
186 |
192 |
|
187 |
|
if (calculatedLatency < subscriber->minLatency)
|
188 |
|
{
|
|
193 |
if (calculatedLatency < subscriber->minLatency) {
|
189 |
194 |
subscriber->minLatency = calculatedLatency;
|
190 |
195 |
}
|
191 |
|
else if (calculatedLatency > subscriber->maxLatency)
|
192 |
|
{
|
|
196 |
else if (calculatedLatency > subscriber->maxLatency) {
|
193 |
197 |
subscriber->maxLatency = calculatedLatency;
|
194 |
198 |
}
|
195 |
199 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
196 |
|
bool temp = false;
|
197 |
|
if (temp/*optional latency output argument given*/)
|
198 |
|
{
|
199 |
|
latency = calculatedLatency;
|
|
200 |
|
|
201 |
if (latency) {
|
|
202 |
latency = calculatedLatency;
|
|
203 |
}
|
200 |
204 |
}
|
201 |
205 |
|
202 |
206 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
... | ... | |
220 |
224 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
221 |
225 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
222 |
226 |
*/
|
223 |
|
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
|
|
227 |
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
|
224 |
228 |
urtDebugAssert(subscriber);
|
225 |
229 |
|
226 |
230 |
if (!subscriber->base.topic)
|
... | ... | |
235 |
239 |
return URT_STATUS_FETCH_NOMESSAGE;
|
236 |
240 |
}
|
237 |
241 |
|
238 |
|
urtFetchMessage();
|
|
242 |
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
243 |
|
|
244 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
|
245 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
239 |
246 |
|
240 |
|
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
|
241 |
247 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
242 |
248 |
subscriber->base.sumLatencies += calculatedLatency;
|
243 |
249 |
|
244 |
|
if (calculatedLatency < subscriber->minLatency)
|
245 |
|
{
|
|
250 |
if (calculatedLatency < subscriber->minLatency) {
|
246 |
251 |
subscriber->minLatency = calculatedLatency;
|
247 |
252 |
}
|
248 |
|
else if (calculatedLatency > subscriber->maxLatency)
|
249 |
|
{
|
|
253 |
else if (calculatedLatency > subscriber->maxLatency) {
|
250 |
254 |
subscriber->maxLatency = calculatedLatency;
|
251 |
255 |
}
|
252 |
256 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
253 |
|
bool temp = false;
|
254 |
|
if (temp/*optional latency output argument given*/)
|
255 |
|
{
|
256 |
|
latency = calculatedLatency;
|
|
257 |
|
|
258 |
if (latency) {
|
|
259 |
latency = calculatedLatency;
|
|
260 |
}
|
257 |
261 |
}
|
258 |
262 |
|
259 |
263 |
urtMutexUnlock(subscriber->base.topic->lock);
|
... | ... | |
354 |
358 |
urtMutexLock(topic->lock);
|
355 |
359 |
if (messages)
|
356 |
360 |
{
|
357 |
|
urt_message_t* lastMessageContribute = messages;
|
358 |
|
while (lastMessageContribute->next)
|
359 |
|
{
|
360 |
|
lastMessageContribute = lastMessageContribute->next;
|
361 |
|
}
|
362 |
|
lastMessageContribute->next = topic->latestMessage->next;
|
363 |
|
topic->latestMessage->next = messages;
|
|
361 |
urtContributeMessages(messages);
|
364 |
362 |
}
|
365 |
363 |
|
366 |
364 |
subscriber->base.lastMessage = topic->latestMessage;
|
... | ... | |
388 |
386 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
389 |
387 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
390 |
388 |
*/
|
391 |
|
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
|
389 |
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload,
|
392 |
390 |
size_t bytes, urt_delay_t* latency)
|
393 |
391 |
{
|
394 |
392 |
urtDebugAssert(subscriber);
|
... | ... | |
398 |
396 |
|
399 |
397 |
urtMutexLock(subscriber->base.topic->lock);
|
400 |
398 |
|
401 |
|
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
402 |
|
if(messageTemp->originTime == subscriber->base.lastMessageTime)
|
|
399 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
|
400 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
403 |
401 |
{
|
404 |
402 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
405 |
403 |
{
|
406 |
404 |
urtMutexUnlock(subscriber->base.topic->lock);
|
407 |
405 |
return URT_STATUS_FETCH_NOMESSAGE;
|
408 |
406 |
}
|
409 |
|
messageTemp = messageTemp->next;
|
|
407 |
oldestMessage = oldestMessage->next;
|
410 |
408 |
}
|
411 |
409 |
else
|
412 |
410 |
{
|
413 |
|
messageTemp = urtFindOldestMessage(messageTemp->next);
|
|
411 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
414 |
412 |
}
|
415 |
413 |
|
416 |
|
urtFetchMessage();
|
|
414 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
415 |
|
|
416 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
|
417 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
417 |
418 |
|
418 |
|
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
|
419 |
419 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
420 |
420 |
subscriber->base.sumLatencies += calculatedLatency;
|
421 |
421 |
|
422 |
|
if (calculatedLatency < subscriber->minLatency)
|
423 |
|
{
|
|
422 |
if (calculatedLatency < subscriber->minLatency) {
|
424 |
423 |
subscriber->minLatency = calculatedLatency;
|
425 |
424 |
}
|
426 |
|
else if (calculatedLatency > subscriber->maxLatency)
|
427 |
|
{
|
|
425 |
else if (calculatedLatency > subscriber->maxLatency) {
|
428 |
426 |
subscriber->maxLatency = calculatedLatency;
|
429 |
427 |
}
|
430 |
428 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
431 |
|
bool temp = false;
|
432 |
|
if (temp/*optional latency output argument given*/)
|
433 |
|
{
|
434 |
|
latency = calculatedLatency;
|
|
429 |
|
|
430 |
if (latency) {
|
|
431 |
latency = calculatedLatency;
|
|
432 |
}
|
435 |
433 |
}
|
436 |
434 |
|
437 |
435 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
... | ... | |
455 |
453 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
456 |
454 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
457 |
455 |
*/
|
458 |
|
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
|
456 |
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload,
|
459 |
457 |
size_t bytes, urt_delay_t* latency)
|
460 |
458 |
{
|
461 |
459 |
urtDebugAssert(subscriber);
|
462 |
460 |
|
463 |
|
if (!subscriber->base.topic)
|
464 |
|
return URT_STATUS_FETCH_NOTOPIC;
|
|
461 |
if (!subscriber->base.topic) {
|
|
462 |
return URT_STATUS_FETCH_NOTOPIC;
|
|
463 |
}
|
465 |
464 |
|
466 |
465 |
urtMutexLock(subscriber->base.topic->lock);
|
467 |
466 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
|
... | ... | |
472 |
471 |
return URT_STATUS_FETCH_NOMESSAGE;
|
473 |
472 |
}
|
474 |
473 |
|
475 |
|
urtFetchMessage();
|
|
474 |
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
475 |
|
|
476 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
|
477 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
476 |
478 |
|
477 |
|
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
|
478 |
479 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
479 |
480 |
subscriber->base.sumLatencies += calculatedLatency;
|
480 |
481 |
|
481 |
|
if (calculatedLatency < subscriber->minLatency)
|
482 |
|
{
|
|
482 |
if (calculatedLatency < subscriber->minLatency) {
|
483 |
483 |
subscriber->minLatency = calculatedLatency;
|
484 |
484 |
}
|
485 |
|
else if (calculatedLatency > subscriber->maxLatency)
|
486 |
|
{
|
|
485 |
else if (calculatedLatency > subscriber->maxLatency) {
|
487 |
486 |
subscriber->maxLatency = calculatedLatency;
|
488 |
487 |
}
|
489 |
488 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
490 |
|
bool temp = false;
|
491 |
|
if (temp/*optional latency output argument given*/)
|
492 |
|
{
|
493 |
|
latency = calculatedLatency;
|
|
489 |
|
|
490 |
if (latency) {
|
|
491 |
latency = calculatedLatency;
|
|
492 |
}
|
494 |
493 |
}
|
495 |
494 |
|
496 |
495 |
urtMutexUnlock(subscriber->base.topic->lock);
|
... | ... | |
625 |
624 |
urtMutexLock(topic->lock);
|
626 |
625 |
if (messages)
|
627 |
626 |
{
|
628 |
|
urt_message_t* lastMessageContribute = messages;
|
629 |
|
while (lastMessageContribute->next)
|
630 |
|
{
|
631 |
|
lastMessageContribute = lastMessageContribute->next;
|
632 |
|
}
|
633 |
|
lastMessageContribute->next = topic->latestMessage->next;
|
634 |
|
topic->latestMessage->next = messages;
|
|
627 |
urtContributeMessages(messages);
|
635 |
628 |
}
|
636 |
629 |
|
637 |
630 |
subscriber->base.lastMessage = topic->latestMessage;
|
... | ... | |
659 |
652 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
660 |
653 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
661 |
654 |
*/
|
662 |
|
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
|
655 |
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload,
|
663 |
656 |
size_t bytes, urt_delay_t* latency)
|
664 |
657 |
{
|
665 |
658 |
urtDebugAssert(subscriber);
|
... | ... | |
669 |
662 |
|
670 |
663 |
urtMutexLock(subscriber->base.topic->lock);
|
671 |
664 |
|
672 |
|
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
673 |
|
if(messageTemp->originTime == subscriber->base.lastMessageTime)
|
|
665 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
|
666 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
674 |
667 |
{
|
675 |
668 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
676 |
669 |
{
|
677 |
670 |
urtMutexUnlock(subscriber->base.topic->lock);
|
678 |
671 |
return URT_STATUS_FETCH_NOMESSAGE;
|
679 |
672 |
}
|
680 |
|
messageTemp = messageTemp->next;
|
|
673 |
oldestMessage = oldestMessage->next;
|
681 |
674 |
}
|
682 |
675 |
else
|
683 |
676 |
{
|
684 |
|
messageTemp = urtFindOldestMessage(messageTemp->next);
|
|
677 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
685 |
678 |
}
|
686 |
679 |
|
687 |
|
urtFetchMessage();
|
|
680 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
688 |
681 |
|
689 |
|
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
|
690 |
|
#if(URT_CFG_PUBSUB_PROFILING == true)
|
|
682 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
|
|
683 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
|
684 |
|
|
685 |
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
|
691 |
686 |
subscriber->base.sumLatencies += calculatedLatency;
|
|
687 |
|
|
688 |
if (calculatedLatency < subscriber->minLatency) {
|
|
689 |
subscriber->minLatency = calculatedLatency;
|
|
690 |
}
|
|
691 |
else if (calculatedLatency > subscriber->maxLatency) {
|
|
692 |
subscriber->maxLatency = calculatedLatency;
|
|
693 |
}
|
692 |
694 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
693 |
695 |
|
694 |
|
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
|
695 |
|
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
|
696 |
|
{
|
|
696 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
|
697 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
697 |
698 |
subscriber->minLatency = calculatedLatency;
|
698 |
699 |
}
|
699 |
|
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
|
700 |
|
{
|
|
700 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
|
701 |
701 |
subscriber->maxLatency = calculatedLatency;
|
702 |
702 |
}
|
703 |
|
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
703 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
|
704 |
urtMutexUnlock(subscriber->base.topic);
|
|
705 |
return URT_STATUS_JITTERVIOLATION;
|
|
706 |
}
|
|
707 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
704 |
708 |
|
705 |
|
bool temp = false;
|
706 |
|
if (temp/*optional latency output argument given*/)
|
707 |
|
{
|
708 |
|
latency = calculatedLatency;
|
|
709 |
if (latency) {
|
|
710 |
latency = calculatedLatency;
|
|
711 |
}
|
709 |
712 |
}
|
710 |
713 |
|
711 |
714 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
... | ... | |
714 |
717 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
715 |
718 |
|
716 |
719 |
urtMutexUnlock(subscriber->base.topic->lock);
|
717 |
|
return URT_STATUS_OK; //TODO: or urt_status_jitterviolation
|
|
720 |
return URT_STATUS_OK;
|
718 |
721 |
}
|
719 |
722 |
|
720 |
723 |
/**
|
... | ... | |
729 |
732 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
730 |
733 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
731 |
734 |
*/
|
732 |
|
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
|
735 |
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload,
|
733 |
736 |
size_t bytes, urt_delay_t* latency)
|
734 |
737 |
{
|
735 |
738 |
urtDebugAssert(subscriber);
|
... | ... | |
746 |
749 |
return URT_STATUS_FETCH_NOMESSAGE;
|
747 |
750 |
}
|
748 |
751 |
|
749 |
|
urtFetchMessage();
|
|
752 |
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
750 |
753 |
|
751 |
|
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
|
752 |
|
#if(URT_CFG_PUBSUB_PROFILING == true)
|
|
754 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
|
|
755 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
|
756 |
|
|
757 |
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
|
753 |
758 |
subscriber->base.sumLatencies += calculatedLatency;
|
|
759 |
|
|
760 |
if (calculatedLatency < subscriber->minLatency) {
|
|
761 |
subscriber->minLatency = calculatedLatency;
|
|
762 |
}
|
|
763 |
else if (calculatedLatency > subscriber->maxLatency) {
|
|
764 |
subscriber->maxLatency = calculatedLatency;
|
|
765 |
}
|
754 |
766 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
755 |
767 |
|
756 |
|
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
|
757 |
|
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
|
758 |
|
{
|
|
768 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
|
769 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
759 |
770 |
subscriber->minLatency = calculatedLatency;
|
760 |
771 |
}
|
761 |
|
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
|
762 |
|
{
|
|
772 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
|
763 |
773 |
subscriber->maxLatency = calculatedLatency;
|
764 |
774 |
}
|
765 |
|
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
775 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
|
776 |
urtMutexUnlock(subscriber->base.topic);
|
|
777 |
return URT_STATUS_JITTERVIOLATION;
|
|
778 |
}
|
|
779 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
766 |
780 |
|
767 |
|
bool temp = false;
|
768 |
|
if (temp/*optional latency output argument given*/)
|
769 |
|
{
|
770 |
|
latency = calculatedLatency;
|
|
781 |
if (latency) {
|
|
782 |
latency = calculatedLatency;
|
|
783 |
}
|
771 |
784 |
}
|
772 |
785 |
|
773 |
786 |
urtMutexUnlock(subscriber->base.topic->lock);
|
... | ... | |
812 |
825 |
{
|
813 |
826 |
urtDebugAssert(subscriber);
|
814 |
827 |
|
815 |
|
if (subscriber->base.topic)
|
816 |
|
{
|
817 |
|
urtMutexLock(topic->lock);
|
818 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
819 |
|
//TODO: decrement topic's HRT counter
|
|
828 |
if (!subscriber->base.topic) {
|
|
829 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
830 |
}
|
|
831 |
|
|
832 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
|
833 |
urtMutexLock(topic->lock);
|
|
834 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
|
835 |
|
|
836 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
837 |
|
820 |
838 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
821 |
|
subscriber->base.topic->numSubscribers--;
|
|
839 |
subscriber->base.topic->numSubscribers--;
|
822 |
840 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
823 |
|
//Hier weiter
|
824 |
841 |
|
825 |
842 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
826 |
|
urtMutexUnlock(topic->lock);
|
|
843 |
urtMutexUnlock(topic->lock);
|
827 |
844 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
828 |
|
subscriber->base.topic = NULL;
|
829 |
|
subscriber->base.lastMessage = NULL;
|
830 |
|
subscriber->base.lastMessageTime = 0;
|
831 |
|
return URT_STATUS_OK;
|
832 |
|
}
|
833 |
|
|
834 |
|
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
845 |
subscriber->base.topic = NULL;
|
|
846 |
subscriber->base.lastMessage = NULL;
|
|
847 |
subscriber->base.lastMessageTime = 0;
|
|
848 |
return URT_STATUS_OK;
|
835 |
849 |
}
|
836 |
850 |
|
837 |
851 |
|
... | ... | |
928 |
942 |
urtMutexLock(topic->lock);
|
929 |
943 |
if (messages)
|
930 |
944 |
{
|
931 |
|
urt_message_t* lastMessageContribute = messages;
|
932 |
|
while (lastMessageContribute->next)
|
933 |
|
{
|
934 |
|
lastMessageContribute = lastMessageContribute->next;
|
935 |
|
}
|
936 |
|
lastMessageContribute->next = topic->latestMessage->next;
|
937 |
|
topic->latestMessage->next = messages;
|
|
945 |
urtContributeMessages(messages);
|
938 |
946 |
}
|
939 |
947 |
|
940 |
948 |
subscriber->base.lastMessage = topic->latestMessage;
|
... | ... | |
968 |
976 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
969 |
977 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
970 |
978 |
*/
|
971 |
|
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
|
979 |
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
|
972 |
980 |
size_t bytes, urt_delay_t* latency)
|
973 |
981 |
{
|
974 |
982 |
urtDebugAssert(subscriber);
|
975 |
983 |
|
976 |
|
if (!subscriber->base.topic)
|
977 |
|
return URT_STATUS_FETCH_NOTOPIC;
|
|
984 |
if (!subscriber->base.topic) {
|
|
985 |
return URT_STATUS_FETCH_NOTOPIC;
|
|
986 |
}
|
978 |
987 |
|
979 |
988 |
urtMutexLock(subscriber->base.topic->lock);
|
980 |
989 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
... | ... | |
985 |
994 |
}
|
986 |
995 |
messageTemp = messageTemp->next;
|
987 |
996 |
|
988 |
|
uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
|
989 |
|
#if (URT_CFG_PUBSUB_PROFILING == true)
|
|
997 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
|
998 |
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
|
|
999 |
|
|
1000 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
990 |
1001 |
subscriber->base.sumLatencies += calculatedLatency;
|
|
1002 |
|
|
1003 |
if (calculatedLatency < subscriber->minLatency) {
|
|
1004 |
subscriber->minLatency = calculatedLatency;
|
|
1005 |
}
|
|
1006 |
else if (calculatedLatency > subscriber->maxLatency) {
|
|
1007 |
subscriber->maxLatency = calculatedLatency;
|
|
1008 |
}
|
991 |
1009 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
992 |
|
bool temp = false;
|
993 |
|
if (temp /* optional latency output argument given */)
|
994 |
|
{
|
995 |
|
latency = calculatedLatency
|
|
1010 |
|
|
1011 |
if (latency) {
|
|
1012 |
latency = calculatedLatency;
|
|
1013 |
}
|
996 |
1014 |
}
|
997 |
1015 |
|
998 |
1016 |
subscriber->base.lastMessage->numHrtConsumersLeft--;
|
... | ... | |
1006 |
1024 |
subscriber->base->numMessagesReceived++;
|
1007 |
1025 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
1008 |
1026 |
|
1009 |
|
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
|
1010 |
|
if (temp /*latency is within allowed jitter range*/)
|
1011 |
|
{
|
1012 |
|
if (calculatedLatency < subscriber->minLatency)
|
1013 |
|
{
|
1014 |
|
subscriber->minLatency = calculatedLatency;
|
1015 |
|
}
|
1016 |
|
else if (calculatedLatency > subscriber->maxLatency)
|
1017 |
|
{
|
1018 |
|
subscriber->maxLatency = calculatedLatency;
|
1019 |
|
}
|
1020 |
|
}
|
1021 |
|
else
|
1022 |
|
{
|
1023 |
1027 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
1024 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
1028 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
|
1029 |
subscriber->minLatency = calculatedLatency;
|
|
1030 |
}
|
|
1031 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
|
|
1032 |
subscriber->maxLatency = calculatedLatency;
|
|
1033 |
}
|
|
1034 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
|
1035 |
urtMutexUnlock(subscriber->base.topic);
|
1025 |
1036 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
|
1026 |
1037 |
return URT_STATUS_JITTERVIOLATION;
|
|
1038 |
}
|
1027 |
1039 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
1040 |
|
|
1041 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
|
|
1042 |
if (calculatedLatency < subscriber->minLatency) {
|
|
1043 |
subscriber->minLatency = calculatedLatency;
|
1028 |
1044 |
}
|
1029 |
|
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
|
|
1045 |
else if (calculatedLatency > subscriber->maxLatency) {
|
|
1046 |
subscriber->maxLatency = calculatedLatency;
|
|
1047 |
}
|
|
1048 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
1030 |
1049 |
|
1031 |
|
urtFetchMessage();
|
|
1050 |
urtFetchMessage(messageTemp, subscriber, payload, bytes);
|
1032 |
1051 |
|
1033 |
1052 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
1034 |
1053 |
if (messageTemp->next->originTime < messageTemp->originTime)
|
1035 |
1054 |
{
|
1036 |
|
//TODO: update qos deadliner timer wrt. next message
|
|
1055 |
//TODO: first reset?! (when ... set)
|
|
1056 |
urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
|
1037 |
1057 |
}
|
1038 |
1058 |
else
|
1039 |
1059 |
{
|
1040 |
|
//TODO: reset qos deadline timer
|
|
1060 |
urtTimerReset(subscriber->qosDeadlineTimer);
|
1041 |
1061 |
}
|
1042 |
1062 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
1043 |
1063 |
|
... | ... | |
1058 |
1078 |
* Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
1059 |
1079 |
* Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
1060 |
1080 |
*/
|
1061 |
|
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
1062 |
|
size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
|
1081 |
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
|
|
1082 |
size_t bytes, urt_delay_t* latency)
|
|
1083 |
{
|
|
1084 |
if (!subscriber->base.topic) {
|
|
1085 |
return URT_STATUS_FETCH_NOTOPIC;
|
|
1086 |
}
|
|
1087 |
|
|
1088 |
urtMutexLock(subscriber->base.topic->lock);
|
|
1089 |
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
|
1090 |
bool hrtZero = false;
|
|
1091 |
while (lastMessage->next->originTime < lastMessage->originTime) {
|
|
1092 |
lastMessage = lastMessage->next;
|
|
1093 |
lastMessage->numHrtConsumersLeft--;
|
|
1094 |
if (lastMessage->numHrtConsumersLeft == 0) {
|
|
1095 |
hertZero = true;
|
|
1096 |
}
|
|
1097 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
|
1098 |
lastMessage->numConsumersLeft--;
|
|
1099 |
subscriber->base.numMessagesReceived++;
|
|
1100 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
|
1101 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
|
1102 |
urtTimerReset(subscriber->qosDeadlineTimer);
|
|
1103 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
|
1104 |
}
|
|
1105 |
|
|
1106 |
if (hrtZero) {
|
|
1107 |
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
|
1108 |
}
|
|
1109 |
|
|
1110 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) {
|
|
1111 |
urtMutexUnlock(subscriber->base.topic);
|
|
1112 |
return URT_STATUS_FETCH_NOMESSAGE;
|
|
1113 |
}
|
|
1114 |
|
|
1115 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
|
1116 |
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
|
|
1117 |
|
|
1118 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
|
1119 |
subscriber->base.sumLatencies += calculatedLatency;
|
|
1120 |
|
|
1121 |
if (calculatedLatency < subscriber->minLatency) {
|
|
1122 |
subscriber->minLatency = calculatedLatency;
|
|
1123 |
}
|
|
1124 |
else if (calculatedLatency > subscriber->maxLatency) {
|
|
1125 |
subscriber->maxLatency = calculatedLatency;
|
|
1126 |
}
|
|
1127 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
|
1128 |
|
|
1129 |
if (latency) {
|
|
1130 |
latency = calculatedLatency;
|
|
1131 |
}
|
|
1132 |
}
|
|
1133 |
|
|
1134 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
|
1135 |
if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
|
1136 |
subscriber->minLatency = calculatedLatency;
|
|
1137 |
}
|
|
1138 |
else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
|
|
1139 |
subscriber->maxLatency = calculatedLatency;
|
|
1140 |
}
|
|
1141 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
|
1142 |
urtMutexUnlock(subscriber->base.topic);
|
|
1143 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
|
|
1144 |
return URT_STATUS_JITTERVIOLATION;
|
|
1145 |
}
|
|
1146 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
|
1147 |
|
|
1148 |
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
|
|
1149 |
if (calculatedLatency < subscriber->minLatency) {
|
|
1150 |
subscriber->minLatency = calculatedLatency;
|
|
1151 |
}
|
|
1152 |
else if (calculatedLatency > subscriber->maxLatency) {
|
|
1153 |
subscriber->maxLatency = calculatedLatency;
|
|
1154 |
}
|
|
1155 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
|
1156 |
|
|
1157 |
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
1158 |
|
|
1159 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
|
1160 |
urtTimerReset(subscriber->qosDeadlineTimer);
|
|
1161 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
|
1162 |
}
|
1063 |
1163 |
|
1064 |
1164 |
/**
|
1065 |
1165 |
* @brief Unsubscribes from a subscriber.
|