Revision fb72e91b src/urt_subscriber.c
src/urt_subscriber.c | ||
---|---|---|
42 | 42 |
/* LOCAL FUNCTIONS */ |
43 | 43 |
/******************************************************************************/ |
44 | 44 |
|
45 |
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes) |
|
46 |
{ |
|
47 |
subscriber->base.lastMessage = message; |
|
48 |
*subscriber->base.lastMessageTime = message->originTime; |
|
49 |
memcpy(message->payload, payload, bytes); |
|
50 |
} |
|
51 |
|
|
52 | 45 |
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage) |
53 | 46 |
{ |
54 |
while (oldestMessage->next->originTime < oldestMessage->originTime) |
|
55 |
{ |
|
47 |
while (oldestMessage->next->originTime < oldestMessage->originTime) { |
|
56 | 48 |
oldestMessage = oldestMessage->next; |
57 | 49 |
} |
58 | 50 |
return oldestMessage; |
... | ... | |
60 | 52 |
|
61 | 53 |
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage) |
62 | 54 |
{ |
63 |
urt_message_t* lastMessage = subscriber->base.lastMessage; |
|
64 |
while (lastMessage->next->originTime < lastMessage->originTime) |
|
65 |
{ |
|
55 |
urt_message_t* lastMessage = latestMessage; |
|
56 |
while (lastMessage->next->originTime < lastMessage->originTime) { |
|
66 | 57 |
lastMessage = lastMessage->next; |
67 | 58 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
68 | 59 |
subscriber->base.lastMessage->numConsumersLeft--; |
69 | 60 |
subscriber->base->numMessagesReceived++; |
70 | 61 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
71 | 62 |
} |
63 |
return latestMessage; |
|
72 | 64 |
} |
73 | 65 |
|
74 |
void urtContributeMessages(urt_message_t* messages) |
|
66 |
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
|
|
75 | 67 |
{ |
76 | 68 |
urt_message_t* lastMessageContribute = messages; |
77 |
while (lastMessageContribute->next) |
|
78 |
{ |
|
69 |
while (lastMessageContribute->next) { |
|
79 | 70 |
lastMessageContribute = lastMessageContribute->next; |
80 | 71 |
} |
81 | 72 |
lastMessageContribute->next = topic->latestMessage->next; |
... | ... | |
129 | 120 |
} |
130 | 121 |
|
131 | 122 |
subscriber->base.topic = topic; |
132 |
urtMutexLock(topic->lock); |
|
123 |
urtMutexLock(&topic->lock);
|
|
133 | 124 |
|
134 | 125 |
if (messages) { |
135 |
urtContributeMessages(messages); |
|
126 |
urtContributeMessages(messages, topic);
|
|
136 | 127 |
} |
137 | 128 |
|
138 | 129 |
subscriber->base.lastMessage = topic->latestMessage; |
139 | 130 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
140 | 131 |
|
141 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
132 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
142 | 133 |
|
143 | 134 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
144 | 135 |
topic->numHrtSubscribers--; |
145 | 136 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
146 | 137 |
|
147 |
urtMutexUnlock(topic->lock); |
|
138 |
urtMutexUnlock(&topic->lock);
|
|
148 | 139 |
return URT_STATUS_OK; |
149 | 140 |
} |
150 | 141 |
|
... | ... | |
168 | 159 |
return URT_STATUS_FETCH_NOTOPIC; |
169 | 160 |
} |
170 | 161 |
|
171 |
urtMutexLock(subscriber->base.topic->lock); |
|
162 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
172 | 163 |
|
173 | 164 |
urt_message_t* oldestMessage = subscriber->base.lastMessage; |
174 | 165 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime) { |
175 | 166 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) { |
176 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
167 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
177 | 168 |
return URT_STATUS_FETCH_NOMESSAGE; |
178 | 169 |
} |
179 | 170 |
oldestMessage = oldestMessage->next; |
... | ... | |
182 | 173 |
oldestMessage = urtFindOldestMessage(oldestMessage->next); |
183 | 174 |
} |
184 | 175 |
|
185 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
|
176 |
subscriber->base.lastMessage = oldestMessage; |
|
177 |
subscriber->base.lastMessageTime = oldestMessage->originTime; |
|
178 |
memcpy(oldestMessage->payload, payload, bytes); |
|
186 | 179 |
|
187 | 180 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
188 | 181 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
... | ... | |
199 | 192 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
200 | 193 |
|
201 | 194 |
if (latency) { |
202 |
latency = calculatedLatency; |
|
195 |
*latency = calculatedLatency;
|
|
203 | 196 |
} |
204 | 197 |
} |
205 | 198 |
|
... | ... | |
208 | 201 |
subscriber->base->numMessagesReceived++; |
209 | 202 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
210 | 203 |
|
211 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
204 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
212 | 205 |
return URT_STATUS_OK; |
213 | 206 |
} |
214 | 207 |
|
... | ... | |
230 | 223 |
if (!subscriber->base.topic) |
231 | 224 |
return URT_STATUS_FETCH_NOTOPIC; |
232 | 225 |
|
233 |
urtMutexLock(subscriber->base.topic->lock); |
|
226 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
234 | 227 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
235 | 228 |
|
236 | 229 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) |
237 | 230 |
{ |
238 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
231 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
239 | 232 |
return URT_STATUS_FETCH_NOMESSAGE; |
240 | 233 |
} |
241 | 234 |
|
242 |
urtFetchMessage(lastMessage, subscriber, payload, bytes); |
|
235 |
subscriber->base.lastMessage = lastMessage; |
|
236 |
subscriber->base.lastMessageTime = lastMessage->originTime; |
|
237 |
memcpy(lastMessage->payload, payload, bytes); |
|
243 | 238 |
|
244 | 239 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
245 | 240 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime; |
... | ... | |
256 | 251 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
257 | 252 |
|
258 | 253 |
if (latency) { |
259 |
latency = calculatedLatency; |
|
254 |
*latency = calculatedLatency;
|
|
260 | 255 |
} |
261 | 256 |
} |
262 | 257 |
|
263 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
258 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
264 | 259 |
return URT_STATUS_OK; |
265 | 260 |
} |
266 | 261 |
|
... | ... | |
277 | 272 |
if (subscriber->base.topic) |
278 | 273 |
{ |
279 | 274 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
280 |
urtMutexLock(topic->lock); |
|
275 |
urtMutexLock(&topic->lock);
|
|
281 | 276 |
subscriber->base.topic->numSubscribers--; |
282 | 277 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
283 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
278 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
|
284 | 279 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
285 |
urtMutexUnlock(topic->lock); |
|
280 |
urtMutexUnlock(&topic->lock);
|
|
286 | 281 |
subscriber->base.topic = NULL; |
287 | 282 |
subscriber->base.lastMessage = NULL; |
288 | 283 |
subscriber->base.lastMessageTime = 0; |
... | ... | |
307 | 302 |
urtEventListenerInit(subscriber->base.evtListener); |
308 | 303 |
subscriber->base.lastMessage = NULL; |
309 | 304 |
subscriber->base.lastMessageTime = 0; |
310 |
#if (URT_CFG_PUBSUB_PROFILING)
|
|
311 |
subscriber->base.sumLatencies = 0;
|
|
312 |
subscriber->base.numMessagesReceived = 0;
|
|
313 |
subscriber->usefulnesscb = NULL;
|
|
314 |
subscriber->cbparams = NULL;
|
|
315 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
316 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
317 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
|
305 |
#if (URT_CFG_PUBSUB_PROFILING) |
|
306 |
subscriber->base.sumLatencies = 0; |
|
307 |
subscriber->base.numMessagesReceived = 0; |
|
308 |
subscriber->usefulnesscb = NULL; |
|
309 |
subscriber->cbparams = NULL; |
|
310 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
311 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
312 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
318 | 313 |
return; |
319 | 314 |
} |
320 | 315 |
|
... | ... | |
335 | 330 |
* Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic. |
336 | 331 |
*/ |
337 | 332 |
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, |
338 |
urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
|
|
333 |
urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
|
|
339 | 334 |
{ |
340 | 335 |
urtDebugAssert(subscriber); |
341 | 336 |
urtDebugAssert(topic); |
342 | 337 |
|
343 |
if (subscriber->base.topic) |
|
344 |
{ |
|
338 |
if (subscriber->base.topic) { |
|
345 | 339 |
return URT_STATUS_SUBSCRIBE_TOPICSET; |
346 | 340 |
} |
347 | 341 |
|
348 | 342 |
subscriber->base.topic = topic; |
349 | 343 |
subscriber->usefulnesscb = usefulnesscb; |
350 | 344 |
subscriber->cbparams = cbparams; |
351 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
345 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
|
352 | 346 |
subscriber->base.sumLatencies = 0; |
353 | 347 |
subscriber->base.numMessagesReceived = 0; |
354 | 348 |
subscriber->minLatency = URT_DELAY_INFINITE; |
355 | 349 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
356 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
350 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
357 | 351 |
|
358 |
urtMutexLock(topic->lock); |
|
359 |
if (messages) |
|
360 |
{ |
|
361 |
urtContributeMessages(messages); |
|
352 |
urtMutexLock(&topic->lock); |
|
353 |
if (messages) { |
|
354 |
urtContributeMessages(messages, topic); |
|
362 | 355 |
} |
363 | 356 |
|
364 | 357 |
subscriber->base.lastMessage = topic->latestMessage; |
365 | 358 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
366 | 359 |
|
367 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
360 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
368 | 361 |
|
369 | 362 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
370 | 363 |
topic->numHrtSubscribers--; |
371 | 364 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
372 | 365 |
|
373 |
urtMutexUnlock(topic->lock); |
|
366 |
urtMutexUnlock(&topic->lock);
|
|
374 | 367 |
return URT_STATUS_OK; |
375 | 368 |
} |
376 | 369 |
|
... | ... | |
391 | 384 |
{ |
392 | 385 |
urtDebugAssert(subscriber); |
393 | 386 |
|
394 |
if (!subscriber->base.topic) |
|
395 |
return URT_STATUS_FETCH_NOTOPIC; |
|
387 |
if (!subscriber->base.topic){ |
|
388 |
return URT_STATUS_FETCH_NOTOPIC; |
|
389 |
} |
|
396 | 390 |
|
397 |
urtMutexLock(subscriber->base.topic->lock); |
|
391 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
398 | 392 |
|
399 | 393 |
urt_message_t* oldestMessage = subscriber->base.lastMessage; |
400 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime) |
|
401 |
{ |
|
402 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
|
403 |
{ |
|
404 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
394 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime){ |
|
395 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime){ |
|
396 |
urtMutexUnlock(&subscriber->base.topic->lock); |
|
405 | 397 |
return URT_STATUS_FETCH_NOMESSAGE; |
406 | 398 |
} |
407 | 399 |
oldestMessage = oldestMessage->next; |
408 | 400 |
} |
409 |
else |
|
410 |
{ |
|
401 |
else{ |
|
411 | 402 |
oldestMessage = urtFindOldestMessage(oldestMessage->next); |
412 | 403 |
} |
413 | 404 |
|
414 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
|
405 |
subscriber->base.lastMessage = oldestMessage; |
|
406 |
subscriber->base.lastMessageTime = oldestMessage->originTime; |
|
407 |
memcpy(oldestMessage->payload, payload, bytes); |
|
415 | 408 |
|
416 | 409 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
417 | 410 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
... | ... | |
428 | 421 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
429 | 422 |
|
430 | 423 |
if (latency) { |
431 |
latency = calculatedLatency; |
|
424 |
*latency = calculatedLatency;
|
|
432 | 425 |
} |
433 | 426 |
} |
434 | 427 |
|
... | ... | |
437 | 430 |
subscriber->base->numMessagesReceived++; |
438 | 431 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
439 | 432 |
|
440 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
433 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
441 | 434 |
return URT_STATUS_OK; |
442 | 435 |
} |
443 | 436 |
|
... | ... | |
462 | 455 |
return URT_STATUS_FETCH_NOTOPIC; |
463 | 456 |
} |
464 | 457 |
|
465 |
urtMutexLock(subscriber->base.topic->lock); |
|
458 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
466 | 459 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
467 | 460 |
|
468 | 461 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) |
469 | 462 |
{ |
470 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
463 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
471 | 464 |
return URT_STATUS_FETCH_NOMESSAGE; |
472 | 465 |
} |
473 | 466 |
|
474 |
urtFetchMessage(lastMessage, subscriber, payload, bytes); |
|
467 |
subscriber->base.lastMessage = lastMessage; |
|
468 |
subscriber->base.lastMessageTime = lastMessage->originTime; |
|
469 |
memcpy(lastMessage->payload, payload, bytes); |
|
475 | 470 |
|
476 | 471 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
477 | 472 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime; |
... | ... | |
487 | 482 |
} |
488 | 483 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
489 | 484 |
|
490 |
if (latency) { |
|
491 |
latency = calculatedLatency; |
|
485 |
if (latency != NULL) {
|
|
486 |
*latency = calculatedLatency;
|
|
492 | 487 |
} |
493 | 488 |
} |
494 | 489 |
|
495 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
490 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
496 | 491 |
return URT_STATUS_OK; |
497 | 492 |
} |
498 | 493 |
|
... | ... | |
508 | 503 |
{ |
509 | 504 |
urtDebugAssert(subscriber); |
510 | 505 |
|
511 |
return subscriber->usefulnesscb(latency);
|
|
506 |
return (*subscriber->usefulnesscb)(latency, subscriber->cbparams);
|
|
512 | 507 |
} |
513 | 508 |
|
514 | 509 |
/** |
... | ... | |
526 | 521 |
if (subscriber->base.topic) |
527 | 522 |
{ |
528 | 523 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
529 |
urtMutexLock(topic->lock); |
|
524 |
urtMutexLock(&topic->lock);
|
|
530 | 525 |
subscriber->base.topic->numSubscribers--; |
531 | 526 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
532 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
527 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
|
533 | 528 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
534 |
urtMutexUnlock(topic->lock); |
|
529 |
urtMutexUnlock(&topic->lock);
|
|
535 | 530 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
536 | 531 |
subscriber->base.topic = NULL; |
537 | 532 |
subscriber->base.lastMessage = NULL; |
... | ... | |
621 | 616 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
622 | 617 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
623 | 618 |
|
624 |
urtMutexLock(topic->lock); |
|
619 |
urtMutexLock(&topic->lock);
|
|
625 | 620 |
if (messages) |
626 | 621 |
{ |
627 |
urtContributeMessages(messages); |
|
622 |
urtContributeMessages(messages, topic);
|
|
628 | 623 |
} |
629 | 624 |
|
630 | 625 |
subscriber->base.lastMessage = topic->latestMessage; |
631 | 626 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
632 | 627 |
|
633 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
628 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
634 | 629 |
|
635 | 630 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
636 | 631 |
topic->numHrtSubscribers--; |
637 | 632 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
638 | 633 |
|
639 |
urtMutexUnlock(topic->lock); |
|
634 |
urtMutexUnlock(&topic->lock);
|
|
640 | 635 |
return URT_STATUS_OK; |
641 | 636 |
} |
642 | 637 |
|
... | ... | |
660 | 655 |
if (!subscriber->base.topic) |
661 | 656 |
return URT_STATUS_FETCH_NOTOPIC; |
662 | 657 |
|
663 |
urtMutexLock(subscriber->base.topic->lock); |
|
658 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
664 | 659 |
|
665 | 660 |
urt_message_t* oldestMessage = subscriber->base.lastMessage; |
666 | 661 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime) |
667 | 662 |
{ |
668 | 663 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) |
669 | 664 |
{ |
670 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
665 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
671 | 666 |
return URT_STATUS_FETCH_NOMESSAGE; |
672 | 667 |
} |
673 | 668 |
oldestMessage = oldestMessage->next; |
... | ... | |
677 | 672 |
oldestMessage = urtFindOldestMessage(oldestMessage->next); |
678 | 673 |
} |
679 | 674 |
|
680 |
urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
|
675 |
subscriber->base.lastMessage = oldestMessage; |
|
676 |
subscriber->base.lastMessageTime = oldestMessage->originTime; |
|
677 |
memcpy(oldestMessage->payload, payload, bytes); |
|
681 | 678 |
|
682 | 679 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) { |
683 | 680 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
... | ... | |
701 | 698 |
subscriber->maxLatency = calculatedLatency; |
702 | 699 |
} |
703 | 700 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
704 |
urtMutexUnlock(subscriber->base.topic);
|
|
701 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
705 | 702 |
return URT_STATUS_JITTERVIOLATION; |
706 | 703 |
} |
707 | 704 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
708 | 705 |
|
709 | 706 |
if (latency) { |
710 |
latency = calculatedLatency; |
|
707 |
*latency = calculatedLatency;
|
|
711 | 708 |
} |
712 | 709 |
} |
713 | 710 |
|
... | ... | |
716 | 713 |
subscriber->base->numMessagesReceived++; |
717 | 714 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
718 | 715 |
|
719 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
716 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
720 | 717 |
return URT_STATUS_OK; |
721 | 718 |
} |
722 | 719 |
|
... | ... | |
740 | 737 |
if (!subscriber->base.topic) |
741 | 738 |
return URT_STATUS_FETCH_NOTOPIC; |
742 | 739 |
|
743 |
urtMutexLock(subscriber->base.topic->lock); |
|
740 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
744 | 741 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
745 | 742 |
|
746 | 743 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) |
747 | 744 |
{ |
748 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
745 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
749 | 746 |
return URT_STATUS_FETCH_NOMESSAGE; |
750 | 747 |
} |
751 | 748 |
|
752 |
urtFetchMessage(lastMessage, subscriber, payload, bytes); |
|
749 |
subscriber->base.lastMessage = lastMessage; |
|
750 |
subscriber->base.lastMessageTime = lastMessage->originTime; |
|
751 |
memcpy(lastMessage->payload, payload, bytes); |
|
753 | 752 |
|
754 | 753 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) { |
755 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
|
754 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
|
756 | 755 |
|
757 | 756 |
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
758 | 757 |
subscriber->base.sumLatencies += calculatedLatency; |
... | ... | |
773 | 772 |
subscriber->maxLatency = calculatedLatency; |
774 | 773 |
} |
775 | 774 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
776 |
urtMutexUnlock(subscriber->base.topic);
|
|
775 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
777 | 776 |
return URT_STATUS_JITTERVIOLATION; |
778 | 777 |
} |
779 | 778 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
780 | 779 |
|
781 | 780 |
if (latency) { |
782 |
latency = calculatedLatency; |
|
781 |
*latency = calculatedLatency;
|
|
783 | 782 |
} |
784 | 783 |
} |
785 | 784 |
|
786 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
785 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
787 | 786 |
return URT_STATUS_OK; |
788 | 787 |
} |
789 | 788 |
|
... | ... | |
830 | 829 |
} |
831 | 830 |
|
832 | 831 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
833 |
urtMutexLock(topic->lock); |
|
832 |
urtMutexLock(&topic->lock);
|
|
834 | 833 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
835 | 834 |
|
836 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
835 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
|
837 | 836 |
|
838 | 837 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
839 | 838 |
subscriber->base.topic->numSubscribers--; |
840 | 839 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
841 | 840 |
|
842 | 841 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
843 |
urtMutexUnlock(topic->lock); |
|
842 |
urtMutexUnlock(&topic->lock);
|
|
844 | 843 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
845 | 844 |
subscriber->base.topic = NULL; |
846 | 845 |
subscriber->base.lastMessage = NULL; |
... | ... | |
939 | 938 |
subscriber->expectedRate = rate; |
940 | 939 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
941 | 940 |
|
942 |
urtMutexLock(topic->lock); |
|
941 |
urtMutexLock(&topic->lock);
|
|
943 | 942 |
if (messages) |
944 | 943 |
{ |
945 |
urtContributeMessages(messages); |
|
944 |
urtContributeMessages(messages, topic);
|
|
946 | 945 |
} |
947 | 946 |
|
948 | 947 |
subscriber->base.lastMessage = topic->latestMessage; |
949 | 948 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
950 | 949 |
|
951 |
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
950 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
952 | 951 |
|
953 | 952 |
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
954 | 953 |
//TODO: Implement |
... | ... | |
959 | 958 |
topic->numSubscribers--; |
960 | 959 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
961 | 960 |
|
962 |
urtMutexUnlock(topic->lock); |
|
961 |
urtMutexUnlock(&topic->lock);
|
|
963 | 962 |
return URT_STATUS_OK; |
964 | 963 |
} |
965 | 964 |
|
... | ... | |
985 | 984 |
return URT_STATUS_FETCH_NOTOPIC; |
986 | 985 |
} |
987 | 986 |
|
988 |
urtMutexLock(subscriber->base.topic->lock); |
|
989 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
|
990 |
if (messageTemp->next->originTime > messageTemp.originTime)
|
|
987 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
988 |
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
|
989 |
if (lastMessage->next->originTime > lastMessage->originTime)
|
|
991 | 990 |
{ |
992 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
991 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
993 | 992 |
return URT_STATUS_FETCH_NOMESSAGE; |
994 | 993 |
} |
995 |
messageTemp = messageTemp->next;
|
|
994 |
lastMessage = lastMessage->next;
|
|
996 | 995 |
|
997 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
998 |
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime; |
|
996 |
uint64_t calculatedLatency; |
|
997 |
if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) { |
|
998 |
calculatedLatency = urtTimeNow() - lastMessage->originTime; |
|
999 |
} |
|
999 | 1000 |
|
1000 | 1001 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
1001 | 1002 |
subscriber->base.sumLatencies += calculatedLatency; |
... | ... | |
1008 | 1009 |
} |
1009 | 1010 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
1010 | 1011 |
|
1011 |
if (latency) { |
|
1012 |
latency = calculatedLatency; |
|
1013 |
} |
|
1012 |
if (latency) { |
|
1013 |
*latency = calculatedLatency; |
|
1014 | 1014 |
} |
1015 | 1015 |
|
1016 | 1016 |
subscriber->base.lastMessage->numHrtConsumersLeft--; |
1017 | 1017 |
if (subscriber->base.lastMessage->numHrtConsumersLeft != 0) |
1018 | 1018 |
{ |
1019 |
urtCondvarSignal(subscriber->base.topic->hrtReleased); |
|
1019 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
|
1020 | 1020 |
} |
1021 | 1021 |
|
1022 | 1022 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
... | ... | |
1032 | 1032 |
subscriber->maxLatency = calculatedLatency; |
1033 | 1033 |
} |
1034 | 1034 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
1035 |
urtMutexUnlock(subscriber->base.topic);
|
|
1035 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1036 | 1036 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION); |
1037 | 1037 |
return URT_STATUS_JITTERVIOLATION; |
1038 | 1038 |
} |
... | ... | |
1047 | 1047 |
} |
1048 | 1048 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
1049 | 1049 |
|
1050 |
urtFetchMessage(messageTemp, subscriber, payload, bytes); |
|
1050 |
subscriber->base.lastMessage = lastMessage; |
|
1051 |
subscriber->base.lastMessageTime = lastMessage->originTime; |
|
1052 |
memcpy(lastMessage->payload, payload, bytes); |
|
1051 | 1053 |
|
1052 | 1054 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
1053 |
if (messageTemp->next->originTime < messageTemp->originTime)
|
|
1055 |
if (lastMessage->next->originTime < lastMessage->originTime)
|
|
1054 | 1056 |
{ |
1055 | 1057 |
//TODO: first reset?! (when ... set) |
1056 | 1058 |
urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL); |
... | ... | |
1061 | 1063 |
} |
1062 | 1064 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
1063 | 1065 |
|
1064 |
urtMutexUnlock(subscriber->base.topic->lock); |
|
1066 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1065 | 1067 |
return URT_STATUS_OK; |
1066 | 1068 |
} |
1067 | 1069 |
|
... | ... | |
1085 | 1087 |
return URT_STATUS_FETCH_NOTOPIC; |
1086 | 1088 |
} |
1087 | 1089 |
|
1088 |
urtMutexLock(subscriber->base.topic->lock); |
|
1090 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
1089 | 1091 |
urt_message_t* lastMessage = subscriber->base.lastMessage; |
1090 | 1092 |
bool hrtZero = false; |
1091 | 1093 |
while (lastMessage->next->originTime < lastMessage->originTime) { |
1092 | 1094 |
lastMessage = lastMessage->next; |
1093 | 1095 |
lastMessage->numHrtConsumersLeft--; |
1094 | 1096 |
if (lastMessage->numHrtConsumersLeft == 0) { |
1095 |
hertZero = true;
|
|
1097 |
hrtZero = true; |
|
1096 | 1098 |
} |
1097 | 1099 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
1098 | 1100 |
lastMessage->numConsumersLeft--; |
... | ... | |
1104 | 1106 |
} |
1105 | 1107 |
|
1106 | 1108 |
if (hrtZero) { |
1107 |
urtCondvarSignal(subscriber->base.topic->hrtReleased); |
|
1109 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
|
1108 | 1110 |
} |
1109 | 1111 |
|
1110 | 1112 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) { |
1111 |
urtMutexUnlock(subscriber->base.topic);
|
|
1113 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1112 | 1114 |
return URT_STATUS_FETCH_NOMESSAGE; |
1113 | 1115 |
} |
1114 | 1116 |
|
1115 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
|
1116 |
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime; |
|
1117 |
uint64_t calculatedLatency; |
|
1118 |
if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) { |
|
1119 |
calculatedLatency = urtTimeNow() - lastMessage->originTime; |
|
1120 |
} |
|
1117 | 1121 |
|
1118 | 1122 |
#if(URT_CFG_PUBSUB_PROFILING == true) |
1119 | 1123 |
subscriber->base.sumLatencies += calculatedLatency; |
... | ... | |
1126 | 1130 |
} |
1127 | 1131 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
1128 | 1132 |
|
1129 |
if (latency) { |
|
1130 |
latency = calculatedLatency; |
|
1131 |
} |
|
1133 |
if (latency) { |
|
1134 |
*latency = calculatedLatency; |
|
1132 | 1135 |
} |
1133 | 1136 |
|
1134 | 1137 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
... | ... | |
1139 | 1142 |
subscriber->maxLatency = calculatedLatency; |
1140 | 1143 |
} |
1141 | 1144 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
1142 |
urtMutexUnlock(subscriber->base.topic);
|
|
1145 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1143 | 1146 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION); |
1144 | 1147 |
return URT_STATUS_JITTERVIOLATION; |
1145 | 1148 |
} |
... | ... | |
1154 | 1157 |
} |
1155 | 1158 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
1156 | 1159 |
|
1157 |
urtFetchMessage(lastMessage, subscriber, payload, bytes); |
|
1160 |
subscriber->base.lastMessage = lastMessage; |
|
1161 |
subscriber->base.lastMessageTime = lastMessage->originTime; |
|
1162 |
memcpy(lastMessage->payload, payload, bytes); |
|
1158 | 1163 |
|
1159 | 1164 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
1160 | 1165 |
urtTimerReset(subscriber->qosDeadlineTimer); |
1161 | 1166 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
1167 |
|
|
1168 |
urtMutexUnlock(&subscriber->base.topic->lock); |
|
1169 |
return URT_STATUS_OK; |
|
1162 | 1170 |
} |
1163 | 1171 |
|
1164 | 1172 |
/** |
... | ... | |
1173 | 1181 |
{ |
1174 | 1182 |
urtDebugAssert(subscriber); |
1175 | 1183 |
|
1176 |
if (subscriber->base.topic) |
|
1177 |
{ |
|
1178 |
urtMutexLock(topic->lock); |
|
1179 |
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
|
1180 |
subscriber->base.topic->numHrtSubscribers--; |
|
1184 |
if(!subscriber->base.topic) { |
|
1185 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC; |
|
1186 |
} |
|
1187 |
|
|
1188 |
urtMutexLock(&subscriber->base.topic->lock); |
|
1189 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener); |
|
1190 |
subscriber->base.topic->numHrtSubscribers--; |
|
1181 | 1191 |
# if (URT_CFG_PUBSUB_PROFILING == true) |
1182 |
subscriber->base.topic->numSubscribers--;
|
|
1192 |
subscriber->base.topic->numSubscribers--; |
|
1183 | 1193 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
1184 | 1194 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
1185 |
//TODO: remove self from topics lsit of HRT subscribers |
|
1186 |
//TODO: ... |
|
1195 |
//TODO: remove self from topics list of HRT subscribers, ... |
|
1187 | 1196 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
1188 | 1197 |
|
1189 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
1190 |
bool hrtZero = false; |
|
1191 |
while (messageTemp->next->originTime < messageTemp->originTime) |
|
1192 |
{ |
|
1193 |
messageTemp = messageTemp->next; |
|
1194 |
messageTemp->numHrtConsumersLeft--; |
|
1195 |
if (messageTemp->numHrtConsumersLeft == 0) |
|
1196 |
{ |
|
1197 |
hrtZero = true; |
|
1198 |
} |
|
1198 |
urt_message_t* messageTemp = subscriber->base.lastMessage; |
|
1199 |
bool hrtZero = false; |
|
1200 |
while (messageTemp->next->originTime < messageTemp->originTime){ |
|
1201 |
messageTemp = messageTemp->next; |
|
1202 |
messageTemp->numHrtConsumersLeft--; |
|
1203 |
if (messageTemp->numHrtConsumersLeft == 0) { |
|
1204 |
hrtZero = true; |
|
1205 |
} |
|
1199 | 1206 |
# if(URT_CFG_PUBSUB_PROFILING == true) |
1200 |
messageTemp->numConsumersLeft--;
|
|
1207 |
messageTemp->numConsumersLeft--; |
|
1201 | 1208 |
# endif /* URT_CFG_PUBSUB_PROFILING */ |
1202 |
} |
|
1203 |
if (hrtZero) |
|
1204 |
{ |
|
1205 |
urtCondvarSignal(subscriber->base.topic->hrtReleased); |
|
1206 |
} |
|
1207 |
|
|
1208 |
urtMutexUnlock(topic->lock); |
|
1209 |
subscriber->base.topic = NULL; |
|
1210 |
subscriber->base.lastMessage = NULL; |
|
1211 |
subscriber->base.lastMessageTime = 0; |
|
1212 |
return URT_STATUS_OK; |
|
1209 |
} |
|
1210 |
if (hrtZero){ |
|
1211 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased); |
|
1213 | 1212 |
} |
1214 | 1213 |
|
1215 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC; |
|
1214 |
urtMutexUnlock(&subscriber->base.topic->lock); |
|
1215 |
subscriber->base.topic = NULL; |
|
1216 |
subscriber->base.lastMessage = NULL; |
|
1217 |
subscriber->base.lastMessageTime = 0; |
|
1218 |
return URT_STATUS_OK; |
|
1216 | 1219 |
} |
1217 | 1220 |
|
1218 | 1221 |
|
Also available in: Unified diff