42 |
42 |
/* LOCAL FUNCTIONS */
|
43 |
43 |
/******************************************************************************/
|
44 |
44 |
|
45 |
|
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
|
46 |
|
{
|
47 |
|
subscriber->base.lastMessage = message;
|
48 |
|
*subscriber->base.lastMessageTime = message->originTime;
|
49 |
|
memcpy(message->payload, payload, bytes);
|
50 |
|
}
|
51 |
|
|
52 |
45 |
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
|
53 |
46 |
{
|
54 |
|
while (oldestMessage->next->originTime < oldestMessage->originTime)
|
55 |
|
{
|
|
47 |
while (oldestMessage->next->originTime < oldestMessage->originTime) {
|
56 |
48 |
oldestMessage = oldestMessage->next;
|
57 |
49 |
}
|
58 |
50 |
return oldestMessage;
|
... | ... | |
60 |
52 |
|
61 |
53 |
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
|
62 |
54 |
{
|
63 |
|
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
64 |
|
while (lastMessage->next->originTime < lastMessage->originTime)
|
65 |
|
{
|
|
55 |
urt_message_t* lastMessage = latestMessage;
|
|
56 |
while (lastMessage->next->originTime < lastMessage->originTime) {
|
66 |
57 |
lastMessage = lastMessage->next;
|
67 |
58 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
68 |
59 |
subscriber->base.lastMessage->numConsumersLeft--;
|
69 |
60 |
subscriber->base->numMessagesReceived++;
|
70 |
61 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
71 |
62 |
}
|
|
63 |
return latestMessage;
|
72 |
64 |
}
|
73 |
65 |
|
74 |
|
void urtContributeMessages(urt_message_t* messages)
|
|
66 |
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
|
75 |
67 |
{
|
76 |
68 |
urt_message_t* lastMessageContribute = messages;
|
77 |
|
while (lastMessageContribute->next)
|
78 |
|
{
|
|
69 |
while (lastMessageContribute->next) {
|
79 |
70 |
lastMessageContribute = lastMessageContribute->next;
|
80 |
71 |
}
|
81 |
72 |
lastMessageContribute->next = topic->latestMessage->next;
|
... | ... | |
129 |
120 |
}
|
130 |
121 |
|
131 |
122 |
subscriber->base.topic = topic;
|
132 |
|
urtMutexLock(topic->lock);
|
|
123 |
urtMutexLock(&topic->lock);
|
133 |
124 |
|
134 |
125 |
if (messages) {
|
135 |
|
urtContributeMessages(messages);
|
|
126 |
urtContributeMessages(messages, topic);
|
136 |
127 |
}
|
137 |
128 |
|
138 |
129 |
subscriber->base.lastMessage = topic->latestMessage;
|
139 |
130 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
140 |
131 |
|
141 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
132 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
142 |
133 |
|
143 |
134 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
144 |
135 |
topic->numHrtSubscribers--;
|
145 |
136 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
146 |
137 |
|
147 |
|
urtMutexUnlock(topic->lock);
|
|
138 |
urtMutexUnlock(&topic->lock);
|
148 |
139 |
return URT_STATUS_OK;
|
149 |
140 |
}
|
150 |
141 |
|
... | ... | |
168 |
159 |
return URT_STATUS_FETCH_NOTOPIC;
|
169 |
160 |
}
|
170 |
161 |
|
171 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
162 |
urtMutexLock(&subscriber->base.topic->lock);
|
172 |
163 |
|
173 |
164 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
174 |
165 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
|
175 |
166 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
|
176 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
167 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
177 |
168 |
return URT_STATUS_FETCH_NOMESSAGE;
|
178 |
169 |
}
|
179 |
170 |
oldestMessage = oldestMessage->next;
|
... | ... | |
182 |
173 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
183 |
174 |
}
|
184 |
175 |
|
185 |
|
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
176 |
subscriber->base.lastMessage = oldestMessage;
|
|
177 |
subscriber->base.lastMessageTime = oldestMessage->originTime;
|
|
178 |
memcpy(oldestMessage->payload, payload, bytes);
|
186 |
179 |
|
187 |
180 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
188 |
181 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
... | ... | |
199 |
192 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
200 |
193 |
|
201 |
194 |
if (latency) {
|
202 |
|
latency = calculatedLatency;
|
|
195 |
*latency = calculatedLatency;
|
203 |
196 |
}
|
204 |
197 |
}
|
205 |
198 |
|
... | ... | |
208 |
201 |
subscriber->base->numMessagesReceived++;
|
209 |
202 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
210 |
203 |
|
211 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
204 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
212 |
205 |
return URT_STATUS_OK;
|
213 |
206 |
}
|
214 |
207 |
|
... | ... | |
230 |
223 |
if (!subscriber->base.topic)
|
231 |
224 |
return URT_STATUS_FETCH_NOTOPIC;
|
232 |
225 |
|
233 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
226 |
urtMutexLock(&subscriber->base.topic->lock);
|
234 |
227 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
|
235 |
228 |
|
236 |
229 |
if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
237 |
230 |
{
|
238 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
231 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
239 |
232 |
return URT_STATUS_FETCH_NOMESSAGE;
|
240 |
233 |
}
|
241 |
234 |
|
242 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
235 |
subscriber->base.lastMessage = lastMessage;
|
|
236 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
237 |
memcpy(lastMessage->payload, payload, bytes);
|
243 |
238 |
|
244 |
239 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
245 |
240 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
... | ... | |
256 |
251 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
257 |
252 |
|
258 |
253 |
if (latency) {
|
259 |
|
latency = calculatedLatency;
|
|
254 |
*latency = calculatedLatency;
|
260 |
255 |
}
|
261 |
256 |
}
|
262 |
257 |
|
263 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
258 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
264 |
259 |
return URT_STATUS_OK;
|
265 |
260 |
}
|
266 |
261 |
|
... | ... | |
277 |
272 |
if (subscriber->base.topic)
|
278 |
273 |
{
|
279 |
274 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
280 |
|
urtMutexLock(topic->lock);
|
|
275 |
urtMutexLock(&topic->lock);
|
281 |
276 |
subscriber->base.topic->numSubscribers--;
|
282 |
277 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
283 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
278 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
284 |
279 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
285 |
|
urtMutexUnlock(topic->lock);
|
|
280 |
urtMutexUnlock(&topic->lock);
|
286 |
281 |
subscriber->base.topic = NULL;
|
287 |
282 |
subscriber->base.lastMessage = NULL;
|
288 |
283 |
subscriber->base.lastMessageTime = 0;
|
... | ... | |
307 |
302 |
urtEventListenerInit(subscriber->base.evtListener);
|
308 |
303 |
subscriber->base.lastMessage = NULL;
|
309 |
304 |
subscriber->base.lastMessageTime = 0;
|
310 |
|
#if (URT_CFG_PUBSUB_PROFILING)
|
311 |
|
subscriber->base.sumLatencies = 0;
|
312 |
|
subscriber->base.numMessagesReceived = 0;
|
313 |
|
subscriber->usefulnesscb = NULL;
|
314 |
|
subscriber->cbparams = NULL;
|
315 |
|
subscriber->minLatency = URT_DELAY_INFINITE;
|
316 |
|
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
317 |
|
#endif /* URT_CFG_PUBSUB_PROFILING */
|
|
305 |
#if (URT_CFG_PUBSUB_PROFILING)
|
|
306 |
subscriber->base.sumLatencies = 0;
|
|
307 |
subscriber->base.numMessagesReceived = 0;
|
|
308 |
subscriber->usefulnesscb = NULL;
|
|
309 |
subscriber->cbparams = NULL;
|
|
310 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
311 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
312 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
318 |
313 |
return;
|
319 |
314 |
}
|
320 |
315 |
|
... | ... | |
335 |
330 |
* Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
336 |
331 |
*/
|
337 |
332 |
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
|
338 |
|
urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
|
|
333 |
urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
|
339 |
334 |
{
|
340 |
335 |
urtDebugAssert(subscriber);
|
341 |
336 |
urtDebugAssert(topic);
|
342 |
337 |
|
343 |
|
if (subscriber->base.topic)
|
344 |
|
{
|
|
338 |
if (subscriber->base.topic) {
|
345 |
339 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
346 |
340 |
}
|
347 |
341 |
|
348 |
342 |
subscriber->base.topic = topic;
|
349 |
343 |
subscriber->usefulnesscb = usefulnesscb;
|
350 |
344 |
subscriber->cbparams = cbparams;
|
351 |
|
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
345 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
352 |
346 |
subscriber->base.sumLatencies = 0;
|
353 |
347 |
subscriber->base.numMessagesReceived = 0;
|
354 |
348 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
355 |
349 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
356 |
|
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
350 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
357 |
351 |
|
358 |
|
urtMutexLock(topic->lock);
|
359 |
|
if (messages)
|
360 |
|
{
|
361 |
|
urtContributeMessages(messages);
|
|
352 |
urtMutexLock(&topic->lock);
|
|
353 |
if (messages) {
|
|
354 |
urtContributeMessages(messages, topic);
|
362 |
355 |
}
|
363 |
356 |
|
364 |
357 |
subscriber->base.lastMessage = topic->latestMessage;
|
365 |
358 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
366 |
359 |
|
367 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
360 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
368 |
361 |
|
369 |
362 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
370 |
363 |
topic->numHrtSubscribers--;
|
371 |
364 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
372 |
365 |
|
373 |
|
urtMutexUnlock(topic->lock);
|
|
366 |
urtMutexUnlock(&topic->lock);
|
374 |
367 |
return URT_STATUS_OK;
|
375 |
368 |
}
|
376 |
369 |
|
... | ... | |
391 |
384 |
{
|
392 |
385 |
urtDebugAssert(subscriber);
|
393 |
386 |
|
394 |
|
if (!subscriber->base.topic)
|
395 |
|
return URT_STATUS_FETCH_NOTOPIC;
|
|
387 |
if (!subscriber->base.topic){
|
|
388 |
return URT_STATUS_FETCH_NOTOPIC;
|
|
389 |
}
|
396 |
390 |
|
397 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
391 |
urtMutexLock(&subscriber->base.topic->lock);
|
398 |
392 |
|
399 |
393 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
400 |
|
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
401 |
|
{
|
402 |
|
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
403 |
|
{
|
404 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
394 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime){
|
|
395 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime){
|
|
396 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
405 |
397 |
return URT_STATUS_FETCH_NOMESSAGE;
|
406 |
398 |
}
|
407 |
399 |
oldestMessage = oldestMessage->next;
|
408 |
400 |
}
|
409 |
|
else
|
410 |
|
{
|
|
401 |
else{
|
411 |
402 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
412 |
403 |
}
|
413 |
404 |
|
414 |
|
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
405 |
subscriber->base.lastMessage = oldestMessage;
|
|
406 |
subscriber->base.lastMessageTime = oldestMessage->originTime;
|
|
407 |
memcpy(oldestMessage->payload, payload, bytes);
|
415 |
408 |
|
416 |
409 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
417 |
410 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
... | ... | |
428 |
421 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
429 |
422 |
|
430 |
423 |
if (latency) {
|
431 |
|
latency = calculatedLatency;
|
|
424 |
*latency = calculatedLatency;
|
432 |
425 |
}
|
433 |
426 |
}
|
434 |
427 |
|
... | ... | |
437 |
430 |
subscriber->base->numMessagesReceived++;
|
438 |
431 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
439 |
432 |
|
440 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
433 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
441 |
434 |
return URT_STATUS_OK;
|
442 |
435 |
}
|
443 |
436 |
|
... | ... | |
462 |
455 |
return URT_STATUS_FETCH_NOTOPIC;
|
463 |
456 |
}
|
464 |
457 |
|
465 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
458 |
urtMutexLock(&subscriber->base.topic->lock);
|
466 |
459 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
|
467 |
460 |
|
468 |
461 |
if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
469 |
462 |
{
|
470 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
463 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
471 |
464 |
return URT_STATUS_FETCH_NOMESSAGE;
|
472 |
465 |
}
|
473 |
466 |
|
474 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
467 |
subscriber->base.lastMessage = lastMessage;
|
|
468 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
469 |
memcpy(lastMessage->payload, payload, bytes);
|
475 |
470 |
|
476 |
471 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
477 |
472 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
... | ... | |
487 |
482 |
}
|
488 |
483 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
489 |
484 |
|
490 |
|
if (latency) {
|
491 |
|
latency = calculatedLatency;
|
|
485 |
if (latency != NULL) {
|
|
486 |
*latency = calculatedLatency;
|
492 |
487 |
}
|
493 |
488 |
}
|
494 |
489 |
|
495 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
490 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
496 |
491 |
return URT_STATUS_OK;
|
497 |
492 |
}
|
498 |
493 |
|
... | ... | |
508 |
503 |
{
|
509 |
504 |
urtDebugAssert(subscriber);
|
510 |
505 |
|
511 |
|
return subscriber->usefulnesscb(latency);
|
|
506 |
return (*subscriber->usefulnesscb)(latency, subscriber->cbparams);
|
512 |
507 |
}
|
513 |
508 |
|
514 |
509 |
/**
|
... | ... | |
526 |
521 |
if (subscriber->base.topic)
|
527 |
522 |
{
|
528 |
523 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
529 |
|
urtMutexLock(topic->lock);
|
|
524 |
urtMutexLock(&topic->lock);
|
530 |
525 |
subscriber->base.topic->numSubscribers--;
|
531 |
526 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
532 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
527 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
533 |
528 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
534 |
|
urtMutexUnlock(topic->lock);
|
|
529 |
urtMutexUnlock(&topic->lock);
|
535 |
530 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
536 |
531 |
subscriber->base.topic = NULL;
|
537 |
532 |
subscriber->base.lastMessage = NULL;
|
... | ... | |
621 |
616 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
622 |
617 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
|
623 |
618 |
|
624 |
|
urtMutexLock(topic->lock);
|
|
619 |
urtMutexLock(&topic->lock);
|
625 |
620 |
if (messages)
|
626 |
621 |
{
|
627 |
|
urtContributeMessages(messages);
|
|
622 |
urtContributeMessages(messages, topic);
|
628 |
623 |
}
|
629 |
624 |
|
630 |
625 |
subscriber->base.lastMessage = topic->latestMessage;
|
631 |
626 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
632 |
627 |
|
633 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
628 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
634 |
629 |
|
635 |
630 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
636 |
631 |
topic->numHrtSubscribers--;
|
637 |
632 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
638 |
633 |
|
639 |
|
urtMutexUnlock(topic->lock);
|
|
634 |
urtMutexUnlock(&topic->lock);
|
640 |
635 |
return URT_STATUS_OK;
|
641 |
636 |
}
|
642 |
637 |
|
... | ... | |
660 |
655 |
if (!subscriber->base.topic)
|
661 |
656 |
return URT_STATUS_FETCH_NOTOPIC;
|
662 |
657 |
|
663 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
658 |
urtMutexLock(&subscriber->base.topic->lock);
|
664 |
659 |
|
665 |
660 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
666 |
661 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
667 |
662 |
{
|
668 |
663 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
669 |
664 |
{
|
670 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
665 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
671 |
666 |
return URT_STATUS_FETCH_NOMESSAGE;
|
672 |
667 |
}
|
673 |
668 |
oldestMessage = oldestMessage->next;
|
... | ... | |
677 |
672 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
678 |
673 |
}
|
679 |
674 |
|
680 |
|
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
675 |
subscriber->base.lastMessage = oldestMessage;
|
|
676 |
subscriber->base.lastMessageTime = oldestMessage->originTime;
|
|
677 |
memcpy(oldestMessage->payload, payload, bytes);
|
681 |
678 |
|
682 |
679 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
|
683 |
680 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
... | ... | |
701 |
698 |
subscriber->maxLatency = calculatedLatency;
|
702 |
699 |
}
|
703 |
700 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
704 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
701 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
705 |
702 |
return URT_STATUS_JITTERVIOLATION;
|
706 |
703 |
}
|
707 |
704 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
708 |
705 |
|
709 |
706 |
if (latency) {
|
710 |
|
latency = calculatedLatency;
|
|
707 |
*latency = calculatedLatency;
|
711 |
708 |
}
|
712 |
709 |
}
|
713 |
710 |
|
... | ... | |
716 |
713 |
subscriber->base->numMessagesReceived++;
|
717 |
714 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
718 |
715 |
|
719 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
716 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
720 |
717 |
return URT_STATUS_OK;
|
721 |
718 |
}
|
722 |
719 |
|
... | ... | |
740 |
737 |
if (!subscriber->base.topic)
|
741 |
738 |
return URT_STATUS_FETCH_NOTOPIC;
|
742 |
739 |
|
743 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
740 |
urtMutexLock(&subscriber->base.topic->lock);
|
744 |
741 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
|
745 |
742 |
|
746 |
743 |
if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
747 |
744 |
{
|
748 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
745 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
749 |
746 |
return URT_STATUS_FETCH_NOMESSAGE;
|
750 |
747 |
}
|
751 |
748 |
|
752 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
749 |
subscriber->base.lastMessage = lastMessage;
|
|
750 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
751 |
memcpy(lastMessage->payload, payload, bytes);
|
753 |
752 |
|
754 |
753 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
|
755 |
|
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
|
754 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
756 |
755 |
|
757 |
756 |
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
|
758 |
757 |
subscriber->base.sumLatencies += calculatedLatency;
|
... | ... | |
773 |
772 |
subscriber->maxLatency = calculatedLatency;
|
774 |
773 |
}
|
775 |
774 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
776 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
775 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
777 |
776 |
return URT_STATUS_JITTERVIOLATION;
|
778 |
777 |
}
|
779 |
778 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
780 |
779 |
|
781 |
780 |
if (latency) {
|
782 |
|
latency = calculatedLatency;
|
|
781 |
*latency = calculatedLatency;
|
783 |
782 |
}
|
784 |
783 |
}
|
785 |
784 |
|
786 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
785 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
787 |
786 |
return URT_STATUS_OK;
|
788 |
787 |
}
|
789 |
788 |
|
... | ... | |
830 |
829 |
}
|
831 |
830 |
|
832 |
831 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
833 |
|
urtMutexLock(topic->lock);
|
|
832 |
urtMutexLock(&topic->lock);
|
834 |
833 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
835 |
834 |
|
836 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
835 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
837 |
836 |
|
838 |
837 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
839 |
838 |
subscriber->base.topic->numSubscribers--;
|
840 |
839 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
841 |
840 |
|
842 |
841 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
843 |
|
urtMutexUnlock(topic->lock);
|
|
842 |
urtMutexUnlock(&topic->lock);
|
844 |
843 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
845 |
844 |
subscriber->base.topic = NULL;
|
846 |
845 |
subscriber->base.lastMessage = NULL;
|
... | ... | |
939 |
938 |
subscriber->expectedRate = rate;
|
940 |
939 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
941 |
940 |
|
942 |
|
urtMutexLock(topic->lock);
|
|
941 |
urtMutexLock(&topic->lock);
|
943 |
942 |
if (messages)
|
944 |
943 |
{
|
945 |
|
urtContributeMessages(messages);
|
|
944 |
urtContributeMessages(messages, topic);
|
946 |
945 |
}
|
947 |
946 |
|
948 |
947 |
subscriber->base.lastMessage = topic->latestMessage;
|
949 |
948 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
950 |
949 |
|
951 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
950 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
952 |
951 |
|
953 |
952 |
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
954 |
953 |
//TODO: Implement
|
... | ... | |
959 |
958 |
topic->numSubscribers--;
|
960 |
959 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
961 |
960 |
|
962 |
|
urtMutexUnlock(topic->lock);
|
|
961 |
urtMutexUnlock(&topic->lock);
|
963 |
962 |
return URT_STATUS_OK;
|
964 |
963 |
}
|
965 |
964 |
|
... | ... | |
985 |
984 |
return URT_STATUS_FETCH_NOTOPIC;
|
986 |
985 |
}
|
987 |
986 |
|
988 |
|
urtMutexLock(subscriber->base.topic->lock);
|
989 |
|
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
990 |
|
if (messageTemp->next->originTime > messageTemp.originTime)
|
|
987 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
988 |
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
|
989 |
if (lastMessage->next->originTime > lastMessage->originTime)
|
991 |
990 |
{
|
992 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
991 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
993 |
992 |
return URT_STATUS_FETCH_NOMESSAGE;
|
994 |
993 |
}
|
995 |
|
messageTemp = messageTemp->next;
|
|
994 |
lastMessage = lastMessage->next;
|
996 |
995 |
|
997 |
|
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
998 |
|
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
|
|
996 |
uint64_t calculatedLatency;
|
|
997 |
if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
|
|
998 |
calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
|
999 |
}
|
999 |
1000 |
|
1000 |
1001 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
1001 |
1002 |
subscriber->base.sumLatencies += calculatedLatency;
|
... | ... | |
1008 |
1009 |
}
|
1009 |
1010 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
1010 |
1011 |
|
1011 |
|
if (latency) {
|
1012 |
|
latency = calculatedLatency;
|
1013 |
|
}
|
|
1012 |
if (latency) {
|
|
1013 |
*latency = calculatedLatency;
|
1014 |
1014 |
}
|
1015 |
1015 |
|
1016 |
1016 |
subscriber->base.lastMessage->numHrtConsumersLeft--;
|
1017 |
1017 |
if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
|
1018 |
1018 |
{
|
1019 |
|
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
|
1019 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
1020 |
1020 |
}
|
1021 |
1021 |
|
1022 |
1022 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
... | ... | |
1032 |
1032 |
subscriber->maxLatency = calculatedLatency;
|
1033 |
1033 |
}
|
1034 |
1034 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
1035 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
1035 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
1036 |
1036 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
|
1037 |
1037 |
return URT_STATUS_JITTERVIOLATION;
|
1038 |
1038 |
}
|
... | ... | |
1047 |
1047 |
}
|
1048 |
1048 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
1049 |
1049 |
|
1050 |
|
urtFetchMessage(messageTemp, subscriber, payload, bytes);
|
|
1050 |
subscriber->base.lastMessage = lastMessage;
|
|
1051 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
1052 |
memcpy(lastMessage->payload, payload, bytes);
|
1051 |
1053 |
|
1052 |
1054 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
1053 |
|
if (messageTemp->next->originTime < messageTemp->originTime)
|
|
1055 |
if (lastMessage->next->originTime < lastMessage->originTime)
|
1054 |
1056 |
{
|
1055 |
1057 |
//TODO: first reset?! (when ... set)
|
1056 |
1058 |
urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
|
... | ... | |
1061 |
1063 |
}
|
1062 |
1064 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
1063 |
1065 |
|
1064 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
1066 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
1065 |
1067 |
return URT_STATUS_OK;
|
1066 |
1068 |
}
|
1067 |
1069 |
|
... | ... | |
1085 |
1087 |
return URT_STATUS_FETCH_NOTOPIC;
|
1086 |
1088 |
}
|
1087 |
1089 |
|
1088 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
1090 |
urtMutexLock(&subscriber->base.topic->lock);
|
1089 |
1091 |
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
1090 |
1092 |
bool hrtZero = false;
|
1091 |
1093 |
while (lastMessage->next->originTime < lastMessage->originTime) {
|
1092 |
1094 |
lastMessage = lastMessage->next;
|
1093 |
1095 |
lastMessage->numHrtConsumersLeft--;
|
1094 |
1096 |
if (lastMessage->numHrtConsumersLeft == 0) {
|
1095 |
|
hertZero = true;
|
|
1097 |
hrtZero = true;
|
1096 |
1098 |
}
|
1097 |
1099 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
1098 |
1100 |
lastMessage->numConsumersLeft--;
|
... | ... | |
1104 |
1106 |
}
|
1105 |
1107 |
|
1106 |
1108 |
if (hrtZero) {
|
1107 |
|
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
|
1109 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
1108 |
1110 |
}
|
1109 |
1111 |
|
1110 |
1112 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) {
|
1111 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
1113 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
1112 |
1114 |
return URT_STATUS_FETCH_NOMESSAGE;
|
1113 |
1115 |
}
|
1114 |
1116 |
|
1115 |
|
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
1116 |
|
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
|
|
1117 |
uint64_t calculatedLatency;
|
|
1118 |
if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
|
|
1119 |
calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
|
1120 |
}
|
1117 |
1121 |
|
1118 |
1122 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
1119 |
1123 |
subscriber->base.sumLatencies += calculatedLatency;
|
... | ... | |
1126 |
1130 |
}
|
1127 |
1131 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
1128 |
1132 |
|
1129 |
|
if (latency) {
|
1130 |
|
latency = calculatedLatency;
|
1131 |
|
}
|
|
1133 |
if (latency) {
|
|
1134 |
*latency = calculatedLatency;
|
1132 |
1135 |
}
|
1133 |
1136 |
|
1134 |
1137 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
... | ... | |
1139 |
1142 |
subscriber->maxLatency = calculatedLatency;
|
1140 |
1143 |
}
|
1141 |
1144 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
1142 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
1145 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
1143 |
1146 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
|
1144 |
1147 |
return URT_STATUS_JITTERVIOLATION;
|
1145 |
1148 |
}
|
... | ... | |
1154 |
1157 |
}
|
1155 |
1158 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
1156 |
1159 |
|
1157 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
1160 |
subscriber->base.lastMessage = lastMessage;
|
|
1161 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
1162 |
memcpy(lastMessage->payload, payload, bytes);
|
1158 |
1163 |
|
1159 |
1164 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
1160 |
1165 |
urtTimerReset(subscriber->qosDeadlineTimer);
|
1161 |
1166 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
|
1167 |
|
|
1168 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1169 |
return URT_STATUS_OK;
|
1162 |
1170 |
}
|
1163 |
1171 |
|
1164 |
1172 |
/**
|
... | ... | |
1173 |
1181 |
{
|
1174 |
1182 |
urtDebugAssert(subscriber);
|
1175 |
1183 |
|
1176 |
|
if (subscriber->base.topic)
|
1177 |
|
{
|
1178 |
|
urtMutexLock(topic->lock);
|
1179 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
1180 |
|
subscriber->base.topic->numHrtSubscribers--;
|
|
1184 |
if(!subscriber->base.topic) {
|
|
1185 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
1186 |
}
|
|
1187 |
|
|
1188 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
1189 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
|
1190 |
subscriber->base.topic->numHrtSubscribers--;
|
1181 |
1191 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
1182 |
|
subscriber->base.topic->numSubscribers--;
|
|
1192 |
subscriber->base.topic->numSubscribers--;
|
1183 |
1193 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
1184 |
1194 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
1185 |
|
//TODO: remove self from topics lsit of HRT subscribers
|
1186 |
|
//TODO: ...
|
|
1195 |
//TODO: remove self from topics list of HRT subscribers, ...
|
1187 |
1196 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
1188 |
1197 |
|
1189 |
|
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
1190 |
|
bool hrtZero = false;
|
1191 |
|
while (messageTemp->next->originTime < messageTemp->originTime)
|
1192 |
|
{
|
1193 |
|
messageTemp = messageTemp->next;
|
1194 |
|
messageTemp->numHrtConsumersLeft--;
|
1195 |
|
if (messageTemp->numHrtConsumersLeft == 0)
|
1196 |
|
{
|
1197 |
|
hrtZero = true;
|
1198 |
|
}
|
|
1198 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
|
1199 |
bool hrtZero = false;
|
|
1200 |
while (messageTemp->next->originTime < messageTemp->originTime){
|
|
1201 |
messageTemp = messageTemp->next;
|
|
1202 |
messageTemp->numHrtConsumersLeft--;
|
|
1203 |
if (messageTemp->numHrtConsumersLeft == 0) {
|
|
1204 |
hrtZero = true;
|
|
1205 |
}
|
1199 |
1206 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
1200 |
|
messageTemp->numConsumersLeft--;
|
|
1207 |
messageTemp->numConsumersLeft--;
|
1201 |
1208 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
1202 |
|
}
|
1203 |
|
if (hrtZero)
|
1204 |
|
{
|
1205 |
|
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
1206 |
|
}
|
1207 |
|
|
1208 |
|
urtMutexUnlock(topic->lock);
|
1209 |
|
subscriber->base.topic = NULL;
|
1210 |
|
subscriber->base.lastMessage = NULL;
|
1211 |
|
subscriber->base.lastMessageTime = 0;
|
1212 |
|
return URT_STATUS_OK;
|
|
1209 |
}
|
|
1210 |
if (hrtZero){
|
|
1211 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
1213 |
1212 |
}
|
1214 |
1213 |
|
1215 |
|
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
1214 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1215 |
subscriber->base.topic = NULL;
|
|
1216 |
subscriber->base.lastMessage = NULL;
|
|
1217 |
subscriber->base.lastMessageTime = 0;
|
|
1218 |
return URT_STATUS_OK;
|
1216 |
1219 |
}
|
1217 |
1220 |
|
1218 |
1221 |
|