| 42 |
42 |
/* LOCAL FUNCTIONS */
|
| 43 |
43 |
/******************************************************************************/
|
| 44 |
44 |
|
| 45 |
|
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
|
| 46 |
|
{
|
| 47 |
|
subscriber->base.lastMessage = message;
|
| 48 |
|
*subscriber->base.lastMessageTime = message->originTime;
|
| 49 |
|
memcpy(message->payload, payload, bytes);
|
| 50 |
|
}
|
| 51 |
|
|
| 52 |
45 |
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
|
| 53 |
46 |
{
|
| 54 |
|
while (oldestMessage->next->originTime < oldestMessage->originTime)
|
| 55 |
|
{
|
|
47 |
while (oldestMessage->next->originTime < oldestMessage->originTime) {
|
| 56 |
48 |
oldestMessage = oldestMessage->next;
|
| 57 |
49 |
}
|
| 58 |
50 |
return oldestMessage;
|
| ... | ... | |
| 60 |
52 |
|
| 61 |
53 |
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
|
| 62 |
54 |
{
|
| 63 |
|
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
| 64 |
|
while (lastMessage->next->originTime < lastMessage->originTime)
|
| 65 |
|
{
|
|
55 |
urt_message_t* lastMessage = latestMessage;
|
|
56 |
while (lastMessage->next->originTime < lastMessage->originTime) {
|
| 66 |
57 |
lastMessage = lastMessage->next;
|
| 67 |
58 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
| 68 |
59 |
subscriber->base.lastMessage->numConsumersLeft--;
|
| 69 |
60 |
subscriber->base->numMessagesReceived++;
|
| 70 |
61 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 71 |
62 |
}
|
|
63 |
return latestMessage;
|
| 72 |
64 |
}
|
| 73 |
65 |
|
| 74 |
|
void urtContributeMessages(urt_message_t* messages)
|
|
66 |
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
|
| 75 |
67 |
{
|
| 76 |
68 |
urt_message_t* lastMessageContribute = messages;
|
| 77 |
|
while (lastMessageContribute->next)
|
| 78 |
|
{
|
|
69 |
while (lastMessageContribute->next) {
|
| 79 |
70 |
lastMessageContribute = lastMessageContribute->next;
|
| 80 |
71 |
}
|
| 81 |
72 |
lastMessageContribute->next = topic->latestMessage->next;
|
| ... | ... | |
| 129 |
120 |
}
|
| 130 |
121 |
|
| 131 |
122 |
subscriber->base.topic = topic;
|
| 132 |
|
urtMutexLock(topic->lock);
|
|
123 |
urtMutexLock(&topic->lock);
|
| 133 |
124 |
|
| 134 |
125 |
if (messages) {
|
| 135 |
|
urtContributeMessages(messages);
|
|
126 |
urtContributeMessages(messages, topic);
|
| 136 |
127 |
}
|
| 137 |
128 |
|
| 138 |
129 |
subscriber->base.lastMessage = topic->latestMessage;
|
| 139 |
130 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
| 140 |
131 |
|
| 141 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
132 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
| 142 |
133 |
|
| 143 |
134 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
| 144 |
135 |
topic->numHrtSubscribers--;
|
| 145 |
136 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 146 |
137 |
|
| 147 |
|
urtMutexUnlock(topic->lock);
|
|
138 |
urtMutexUnlock(&topic->lock);
|
| 148 |
139 |
return URT_STATUS_OK;
|
| 149 |
140 |
}
|
| 150 |
141 |
|
| ... | ... | |
| 168 |
159 |
return URT_STATUS_FETCH_NOTOPIC;
|
| 169 |
160 |
}
|
| 170 |
161 |
|
| 171 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
162 |
urtMutexLock(&subscriber->base.topic->lock);
|
| 172 |
163 |
|
| 173 |
164 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
| 174 |
165 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
|
| 175 |
166 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
|
| 176 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
167 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 177 |
168 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 178 |
169 |
}
|
| 179 |
170 |
oldestMessage = oldestMessage->next;
|
| ... | ... | |
| 182 |
173 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
| 183 |
174 |
}
|
| 184 |
175 |
|
| 185 |
|
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
176 |
subscriber->base.lastMessage = oldestMessage;
|
|
177 |
subscriber->base.lastMessageTime = oldestMessage->originTime;
|
|
178 |
memcpy(oldestMessage->payload, payload, bytes);
|
| 186 |
179 |
|
| 187 |
180 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
| 188 |
181 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
| ... | ... | |
| 199 |
192 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 200 |
193 |
|
| 201 |
194 |
if (latency) {
|
| 202 |
|
latency = calculatedLatency;
|
|
195 |
*latency = calculatedLatency;
|
| 203 |
196 |
}
|
| 204 |
197 |
}
|
| 205 |
198 |
|
| ... | ... | |
| 208 |
201 |
subscriber->base->numMessagesReceived++;
|
| 209 |
202 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 210 |
203 |
|
| 211 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
204 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 212 |
205 |
return URT_STATUS_OK;
|
| 213 |
206 |
}
|
| 214 |
207 |
|
| ... | ... | |
| 230 |
223 |
if (!subscriber->base.topic)
|
| 231 |
224 |
return URT_STATUS_FETCH_NOTOPIC;
|
| 232 |
225 |
|
| 233 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
226 |
urtMutexLock(&subscriber->base.topic->lock);
|
| 234 |
227 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
|
| 235 |
228 |
|
| 236 |
229 |
if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
| 237 |
230 |
{
|
| 238 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
231 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 239 |
232 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 240 |
233 |
}
|
| 241 |
234 |
|
| 242 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
235 |
subscriber->base.lastMessage = lastMessage;
|
|
236 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
237 |
memcpy(lastMessage->payload, payload, bytes);
|
| 243 |
238 |
|
| 244 |
239 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
| 245 |
240 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
| ... | ... | |
| 256 |
251 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 257 |
252 |
|
| 258 |
253 |
if (latency) {
|
| 259 |
|
latency = calculatedLatency;
|
|
254 |
*latency = calculatedLatency;
|
| 260 |
255 |
}
|
| 261 |
256 |
}
|
| 262 |
257 |
|
| 263 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
258 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 264 |
259 |
return URT_STATUS_OK;
|
| 265 |
260 |
}
|
| 266 |
261 |
|
| ... | ... | |
| 277 |
272 |
if (subscriber->base.topic)
|
| 278 |
273 |
{
|
| 279 |
274 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
| 280 |
|
urtMutexLock(topic->lock);
|
|
275 |
urtMutexLock(&topic->lock);
|
| 281 |
276 |
subscriber->base.topic->numSubscribers--;
|
| 282 |
277 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 283 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
278 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
| 284 |
279 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
| 285 |
|
urtMutexUnlock(topic->lock);
|
|
280 |
urtMutexUnlock(&topic->lock);
|
| 286 |
281 |
subscriber->base.topic = NULL;
|
| 287 |
282 |
subscriber->base.lastMessage = NULL;
|
| 288 |
283 |
subscriber->base.lastMessageTime = 0;
|
| ... | ... | |
| 307 |
302 |
urtEventListenerInit(subscriber->base.evtListener);
|
| 308 |
303 |
subscriber->base.lastMessage = NULL;
|
| 309 |
304 |
subscriber->base.lastMessageTime = 0;
|
| 310 |
|
#if (URT_CFG_PUBSUB_PROFILING)
|
| 311 |
|
subscriber->base.sumLatencies = 0;
|
| 312 |
|
subscriber->base.numMessagesReceived = 0;
|
| 313 |
|
subscriber->usefulnesscb = NULL;
|
| 314 |
|
subscriber->cbparams = NULL;
|
| 315 |
|
subscriber->minLatency = URT_DELAY_INFINITE;
|
| 316 |
|
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
| 317 |
|
#endif /* URT_CFG_PUBSUB_PROFILING */
|
|
305 |
#if (URT_CFG_PUBSUB_PROFILING)
|
|
306 |
subscriber->base.sumLatencies = 0;
|
|
307 |
subscriber->base.numMessagesReceived = 0;
|
|
308 |
subscriber->usefulnesscb = NULL;
|
|
309 |
subscriber->cbparams = NULL;
|
|
310 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
|
311 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
|
312 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 318 |
313 |
return;
|
| 319 |
314 |
}
|
| 320 |
315 |
|
| ... | ... | |
| 335 |
330 |
* Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
| 336 |
331 |
*/
|
| 337 |
332 |
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
|
| 338 |
|
urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
|
|
333 |
urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
|
| 339 |
334 |
{
|
| 340 |
335 |
urtDebugAssert(subscriber);
|
| 341 |
336 |
urtDebugAssert(topic);
|
| 342 |
337 |
|
| 343 |
|
if (subscriber->base.topic)
|
| 344 |
|
{
|
|
338 |
if (subscriber->base.topic) {
|
| 345 |
339 |
return URT_STATUS_SUBSCRIBE_TOPICSET;
|
| 346 |
340 |
}
|
| 347 |
341 |
|
| 348 |
342 |
subscriber->base.topic = topic;
|
| 349 |
343 |
subscriber->usefulnesscb = usefulnesscb;
|
| 350 |
344 |
subscriber->cbparams = cbparams;
|
| 351 |
|
# if (URT_CFG_PUBSUB_PROFILING == true)
|
|
345 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
| 352 |
346 |
subscriber->base.sumLatencies = 0;
|
| 353 |
347 |
subscriber->base.numMessagesReceived = 0;
|
| 354 |
348 |
subscriber->minLatency = URT_DELAY_INFINITE;
|
| 355 |
349 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
| 356 |
|
# endif /* URT_CFG_PUBSUB_PROFILING */
|
|
350 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 357 |
351 |
|
| 358 |
|
urtMutexLock(topic->lock);
|
| 359 |
|
if (messages)
|
| 360 |
|
{
|
| 361 |
|
urtContributeMessages(messages);
|
|
352 |
urtMutexLock(&topic->lock);
|
|
353 |
if (messages) {
|
|
354 |
urtContributeMessages(messages, topic);
|
| 362 |
355 |
}
|
| 363 |
356 |
|
| 364 |
357 |
subscriber->base.lastMessage = topic->latestMessage;
|
| 365 |
358 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
| 366 |
359 |
|
| 367 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
360 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
| 368 |
361 |
|
| 369 |
362 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 370 |
363 |
topic->numHrtSubscribers--;
|
| 371 |
364 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 372 |
365 |
|
| 373 |
|
urtMutexUnlock(topic->lock);
|
|
366 |
urtMutexUnlock(&topic->lock);
|
| 374 |
367 |
return URT_STATUS_OK;
|
| 375 |
368 |
}
|
| 376 |
369 |
|
| ... | ... | |
| 391 |
384 |
{
|
| 392 |
385 |
urtDebugAssert(subscriber);
|
| 393 |
386 |
|
| 394 |
|
if (!subscriber->base.topic)
|
| 395 |
|
return URT_STATUS_FETCH_NOTOPIC;
|
|
387 |
if (!subscriber->base.topic){
|
|
388 |
return URT_STATUS_FETCH_NOTOPIC;
|
|
389 |
}
|
| 396 |
390 |
|
| 397 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
391 |
urtMutexLock(&subscriber->base.topic->lock);
|
| 398 |
392 |
|
| 399 |
393 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
| 400 |
|
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
| 401 |
|
{
|
| 402 |
|
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
| 403 |
|
{
|
| 404 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
394 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime){
|
|
395 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime){
|
|
396 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 405 |
397 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 406 |
398 |
}
|
| 407 |
399 |
oldestMessage = oldestMessage->next;
|
| 408 |
400 |
}
|
| 409 |
|
else
|
| 410 |
|
{
|
|
401 |
else{
|
| 411 |
402 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
| 412 |
403 |
}
|
| 413 |
404 |
|
| 414 |
|
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
405 |
subscriber->base.lastMessage = oldestMessage;
|
|
406 |
subscriber->base.lastMessageTime = oldestMessage->originTime;
|
|
407 |
memcpy(oldestMessage->payload, payload, bytes);
|
| 415 |
408 |
|
| 416 |
409 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
| 417 |
410 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
| ... | ... | |
| 428 |
421 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 429 |
422 |
|
| 430 |
423 |
if (latency) {
|
| 431 |
|
latency = calculatedLatency;
|
|
424 |
*latency = calculatedLatency;
|
| 432 |
425 |
}
|
| 433 |
426 |
}
|
| 434 |
427 |
|
| ... | ... | |
| 437 |
430 |
subscriber->base->numMessagesReceived++;
|
| 438 |
431 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 439 |
432 |
|
| 440 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
433 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 441 |
434 |
return URT_STATUS_OK;
|
| 442 |
435 |
}
|
| 443 |
436 |
|
| ... | ... | |
| 462 |
455 |
return URT_STATUS_FETCH_NOTOPIC;
|
| 463 |
456 |
}
|
| 464 |
457 |
|
| 465 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
458 |
urtMutexLock(&subscriber->base.topic->lock);
|
| 466 |
459 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
|
| 467 |
460 |
|
| 468 |
461 |
if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
| 469 |
462 |
{
|
| 470 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
463 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 471 |
464 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 472 |
465 |
}
|
| 473 |
466 |
|
| 474 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
467 |
subscriber->base.lastMessage = lastMessage;
|
|
468 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
469 |
memcpy(lastMessage->payload, payload, bytes);
|
| 475 |
470 |
|
| 476 |
471 |
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
| 477 |
472 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
| ... | ... | |
| 487 |
482 |
}
|
| 488 |
483 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 489 |
484 |
|
| 490 |
|
if (latency) {
|
| 491 |
|
latency = calculatedLatency;
|
|
485 |
if (latency != NULL) {
|
|
486 |
*latency = calculatedLatency;
|
| 492 |
487 |
}
|
| 493 |
488 |
}
|
| 494 |
489 |
|
| 495 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
490 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 496 |
491 |
return URT_STATUS_OK;
|
| 497 |
492 |
}
|
| 498 |
493 |
|
| ... | ... | |
| 508 |
503 |
{
|
| 509 |
504 |
urtDebugAssert(subscriber);
|
| 510 |
505 |
|
| 511 |
|
return subscriber->usefulnesscb(latency);
|
|
506 |
return (*subscriber->usefulnesscb)(latency, subscriber->cbparams);
|
| 512 |
507 |
}
|
| 513 |
508 |
|
| 514 |
509 |
/**
|
| ... | ... | |
| 526 |
521 |
if (subscriber->base.topic)
|
| 527 |
522 |
{
|
| 528 |
523 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 529 |
|
urtMutexLock(topic->lock);
|
|
524 |
urtMutexLock(&topic->lock);
|
| 530 |
525 |
subscriber->base.topic->numSubscribers--;
|
| 531 |
526 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 532 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
527 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
| 533 |
528 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 534 |
|
urtMutexUnlock(topic->lock);
|
|
529 |
urtMutexUnlock(&topic->lock);
|
| 535 |
530 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 536 |
531 |
subscriber->base.topic = NULL;
|
| 537 |
532 |
subscriber->base.lastMessage = NULL;
|
| ... | ... | |
| 621 |
616 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE;
|
| 622 |
617 |
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
|
| 623 |
618 |
|
| 624 |
|
urtMutexLock(topic->lock);
|
|
619 |
urtMutexLock(&topic->lock);
|
| 625 |
620 |
if (messages)
|
| 626 |
621 |
{
|
| 627 |
|
urtContributeMessages(messages);
|
|
622 |
urtContributeMessages(messages, topic);
|
| 628 |
623 |
}
|
| 629 |
624 |
|
| 630 |
625 |
subscriber->base.lastMessage = topic->latestMessage;
|
| 631 |
626 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
| 632 |
627 |
|
| 633 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
628 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
| 634 |
629 |
|
| 635 |
630 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 636 |
631 |
topic->numHrtSubscribers--;
|
| 637 |
632 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 638 |
633 |
|
| 639 |
|
urtMutexUnlock(topic->lock);
|
|
634 |
urtMutexUnlock(&topic->lock);
|
| 640 |
635 |
return URT_STATUS_OK;
|
| 641 |
636 |
}
|
| 642 |
637 |
|
| ... | ... | |
| 660 |
655 |
if (!subscriber->base.topic)
|
| 661 |
656 |
return URT_STATUS_FETCH_NOTOPIC;
|
| 662 |
657 |
|
| 663 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
658 |
urtMutexLock(&subscriber->base.topic->lock);
|
| 664 |
659 |
|
| 665 |
660 |
urt_message_t* oldestMessage = subscriber->base.lastMessage;
|
| 666 |
661 |
if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
| 667 |
662 |
{
|
| 668 |
663 |
if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
| 669 |
664 |
{
|
| 670 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
665 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 671 |
666 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 672 |
667 |
}
|
| 673 |
668 |
oldestMessage = oldestMessage->next;
|
| ... | ... | |
| 677 |
672 |
oldestMessage = urtFindOldestMessage(oldestMessage->next);
|
| 678 |
673 |
}
|
| 679 |
674 |
|
| 680 |
|
urtFetchMessage(oldestMessage, subscriber, payload, bytes);
|
|
675 |
subscriber->base.lastMessage = oldestMessage;
|
|
676 |
subscriber->base.lastMessageTime = oldestMessage->originTime;
|
|
677 |
memcpy(oldestMessage->payload, payload, bytes);
|
| 681 |
678 |
|
| 682 |
679 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
|
| 683 |
680 |
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
| ... | ... | |
| 701 |
698 |
subscriber->maxLatency = calculatedLatency;
|
| 702 |
699 |
}
|
| 703 |
700 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
| 704 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
701 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 705 |
702 |
return URT_STATUS_JITTERVIOLATION;
|
| 706 |
703 |
}
|
| 707 |
704 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
| 708 |
705 |
|
| 709 |
706 |
if (latency) {
|
| 710 |
|
latency = calculatedLatency;
|
|
707 |
*latency = calculatedLatency;
|
| 711 |
708 |
}
|
| 712 |
709 |
}
|
| 713 |
710 |
|
| ... | ... | |
| 716 |
713 |
subscriber->base->numMessagesReceived++;
|
| 717 |
714 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 718 |
715 |
|
| 719 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
716 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 720 |
717 |
return URT_STATUS_OK;
|
| 721 |
718 |
}
|
| 722 |
719 |
|
| ... | ... | |
| 740 |
737 |
if (!subscriber->base.topic)
|
| 741 |
738 |
return URT_STATUS_FETCH_NOTOPIC;
|
| 742 |
739 |
|
| 743 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
740 |
urtMutexLock(&subscriber->base.topic->lock);
|
| 744 |
741 |
urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
|
| 745 |
742 |
|
| 746 |
743 |
if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
| 747 |
744 |
{
|
| 748 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
745 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 749 |
746 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 750 |
747 |
}
|
| 751 |
748 |
|
| 752 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
749 |
subscriber->base.lastMessage = lastMessage;
|
|
750 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
751 |
memcpy(lastMessage->payload, payload, bytes);
|
| 753 |
752 |
|
| 754 |
753 |
if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
|
| 755 |
|
uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
|
|
754 |
uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
| 756 |
755 |
|
| 757 |
756 |
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
|
| 758 |
757 |
subscriber->base.sumLatencies += calculatedLatency;
|
| ... | ... | |
| 773 |
772 |
subscriber->maxLatency = calculatedLatency;
|
| 774 |
773 |
}
|
| 775 |
774 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
| 776 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
775 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 777 |
776 |
return URT_STATUS_JITTERVIOLATION;
|
| 778 |
777 |
}
|
| 779 |
778 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
|
| 780 |
779 |
|
| 781 |
780 |
if (latency) {
|
| 782 |
|
latency = calculatedLatency;
|
|
781 |
*latency = calculatedLatency;
|
| 783 |
782 |
}
|
| 784 |
783 |
}
|
| 785 |
784 |
|
| 786 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
785 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 787 |
786 |
return URT_STATUS_OK;
|
| 788 |
787 |
}
|
| 789 |
788 |
|
| ... | ... | |
| 830 |
829 |
}
|
| 831 |
830 |
|
| 832 |
831 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
| 833 |
|
urtMutexLock(topic->lock);
|
|
832 |
urtMutexLock(&topic->lock);
|
| 834 |
833 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 835 |
834 |
|
| 836 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
|
835 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
| 837 |
836 |
|
| 838 |
837 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 839 |
838 |
subscriber->base.topic->numSubscribers--;
|
| 840 |
839 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 841 |
840 |
|
| 842 |
841 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 843 |
|
urtMutexUnlock(topic->lock);
|
|
842 |
urtMutexUnlock(&topic->lock);
|
| 844 |
843 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 845 |
844 |
subscriber->base.topic = NULL;
|
| 846 |
845 |
subscriber->base.lastMessage = NULL;
|
| ... | ... | |
| 939 |
938 |
subscriber->expectedRate = rate;
|
| 940 |
939 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
| 941 |
940 |
|
| 942 |
|
urtMutexLock(topic->lock);
|
|
941 |
urtMutexLock(&topic->lock);
|
| 943 |
942 |
if (messages)
|
| 944 |
943 |
{
|
| 945 |
|
urtContributeMessages(messages);
|
|
944 |
urtContributeMessages(messages, topic);
|
| 946 |
945 |
}
|
| 947 |
946 |
|
| 948 |
947 |
subscriber->base.lastMessage = topic->latestMessage;
|
| 949 |
948 |
subscriber->base.lastMessageTime = topic->latestMessage->originTime;
|
| 950 |
949 |
|
| 951 |
|
urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
|
950 |
urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
|
| 952 |
951 |
|
| 953 |
952 |
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
| 954 |
953 |
//TODO: Implement
|
| ... | ... | |
| 959 |
958 |
topic->numSubscribers--;
|
| 960 |
959 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 961 |
960 |
|
| 962 |
|
urtMutexUnlock(topic->lock);
|
|
961 |
urtMutexUnlock(&topic->lock);
|
| 963 |
962 |
return URT_STATUS_OK;
|
| 964 |
963 |
}
|
| 965 |
964 |
|
| ... | ... | |
| 985 |
984 |
return URT_STATUS_FETCH_NOTOPIC;
|
| 986 |
985 |
}
|
| 987 |
986 |
|
| 988 |
|
urtMutexLock(subscriber->base.topic->lock);
|
| 989 |
|
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
| 990 |
|
if (messageTemp->next->originTime > messageTemp.originTime)
|
|
987 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
988 |
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
|
989 |
if (lastMessage->next->originTime > lastMessage->originTime)
|
| 991 |
990 |
{
|
| 992 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
991 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 993 |
992 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 994 |
993 |
}
|
| 995 |
|
messageTemp = messageTemp->next;
|
|
994 |
lastMessage = lastMessage->next;
|
| 996 |
995 |
|
| 997 |
|
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
| 998 |
|
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
|
|
996 |
uint64_t calculatedLatency;
|
|
997 |
if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
|
|
998 |
calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
|
999 |
}
|
| 999 |
1000 |
|
| 1000 |
1001 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
| 1001 |
1002 |
subscriber->base.sumLatencies += calculatedLatency;
|
| ... | ... | |
| 1008 |
1009 |
}
|
| 1009 |
1010 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 1010 |
1011 |
|
| 1011 |
|
if (latency) {
|
| 1012 |
|
latency = calculatedLatency;
|
| 1013 |
|
}
|
|
1012 |
if (latency) {
|
|
1013 |
*latency = calculatedLatency;
|
| 1014 |
1014 |
}
|
| 1015 |
1015 |
|
| 1016 |
1016 |
subscriber->base.lastMessage->numHrtConsumersLeft--;
|
| 1017 |
1017 |
if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
|
| 1018 |
1018 |
{
|
| 1019 |
|
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
|
1019 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
| 1020 |
1020 |
}
|
| 1021 |
1021 |
|
| 1022 |
1022 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
| ... | ... | |
| 1032 |
1032 |
subscriber->maxLatency = calculatedLatency;
|
| 1033 |
1033 |
}
|
| 1034 |
1034 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
| 1035 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
1035 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 1036 |
1036 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
|
| 1037 |
1037 |
return URT_STATUS_JITTERVIOLATION;
|
| 1038 |
1038 |
}
|
| ... | ... | |
| 1047 |
1047 |
}
|
| 1048 |
1048 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 1049 |
1049 |
|
| 1050 |
|
urtFetchMessage(messageTemp, subscriber, payload, bytes);
|
|
1050 |
subscriber->base.lastMessage = lastMessage;
|
|
1051 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
1052 |
memcpy(lastMessage->payload, payload, bytes);
|
| 1051 |
1053 |
|
| 1052 |
1054 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
| 1053 |
|
if (messageTemp->next->originTime < messageTemp->originTime)
|
|
1055 |
if (lastMessage->next->originTime < lastMessage->originTime)
|
| 1054 |
1056 |
{
|
| 1055 |
1057 |
//TODO: first reset?! (when ... set)
|
| 1056 |
1058 |
urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
|
| ... | ... | |
| 1061 |
1063 |
}
|
| 1062 |
1064 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
| 1063 |
1065 |
|
| 1064 |
|
urtMutexUnlock(subscriber->base.topic->lock);
|
|
1066 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 1065 |
1067 |
return URT_STATUS_OK;
|
| 1066 |
1068 |
}
|
| 1067 |
1069 |
|
| ... | ... | |
| 1085 |
1087 |
return URT_STATUS_FETCH_NOTOPIC;
|
| 1086 |
1088 |
}
|
| 1087 |
1089 |
|
| 1088 |
|
urtMutexLock(subscriber->base.topic->lock);
|
|
1090 |
urtMutexLock(&subscriber->base.topic->lock);
|
| 1089 |
1091 |
urt_message_t* lastMessage = subscriber->base.lastMessage;
|
| 1090 |
1092 |
bool hrtZero = false;
|
| 1091 |
1093 |
while (lastMessage->next->originTime < lastMessage->originTime) {
|
| 1092 |
1094 |
lastMessage = lastMessage->next;
|
| 1093 |
1095 |
lastMessage->numHrtConsumersLeft--;
|
| 1094 |
1096 |
if (lastMessage->numHrtConsumersLeft == 0) {
|
| 1095 |
|
hertZero = true;
|
|
1097 |
hrtZero = true;
|
| 1096 |
1098 |
}
|
| 1097 |
1099 |
#if (URT_CFG_PUBSUB_PROFILING == true)
|
| 1098 |
1100 |
lastMessage->numConsumersLeft--;
|
| ... | ... | |
| 1104 |
1106 |
}
|
| 1105 |
1107 |
|
| 1106 |
1108 |
if (hrtZero) {
|
| 1107 |
|
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
|
1109 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
| 1108 |
1110 |
}
|
| 1109 |
1111 |
|
| 1110 |
1112 |
if (lastMessage->originTime == subscriber->base.lastMessageTime) {
|
| 1111 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
1113 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 1112 |
1114 |
return URT_STATUS_FETCH_NOMESSAGE;
|
| 1113 |
1115 |
}
|
| 1114 |
1116 |
|
| 1115 |
|
if (URT_CFG_PUBSUB_PROFILING == true || latency) {
|
| 1116 |
|
uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
|
|
1117 |
uint64_t calculatedLatency;
|
|
1118 |
if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
|
|
1119 |
calculatedLatency = urtTimeNow() - lastMessage->originTime;
|
|
1120 |
}
|
| 1117 |
1121 |
|
| 1118 |
1122 |
#if(URT_CFG_PUBSUB_PROFILING == true)
|
| 1119 |
1123 |
subscriber->base.sumLatencies += calculatedLatency;
|
| ... | ... | |
| 1126 |
1130 |
}
|
| 1127 |
1131 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 1128 |
1132 |
|
| 1129 |
|
if (latency) {
|
| 1130 |
|
latency = calculatedLatency;
|
| 1131 |
|
}
|
|
1133 |
if (latency) {
|
|
1134 |
*latency = calculatedLatency;
|
| 1132 |
1135 |
}
|
| 1133 |
1136 |
|
| 1134 |
1137 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
|
| ... | ... | |
| 1139 |
1142 |
subscriber->maxLatency = calculatedLatency;
|
| 1140 |
1143 |
}
|
| 1141 |
1144 |
else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
|
| 1142 |
|
urtMutexUnlock(subscriber->base.topic);
|
|
1145 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
| 1143 |
1146 |
urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
|
| 1144 |
1147 |
return URT_STATUS_JITTERVIOLATION;
|
| 1145 |
1148 |
}
|
| ... | ... | |
| 1154 |
1157 |
}
|
| 1155 |
1158 |
#endif /* URT_CFG_PUBSUB_PROFILING */
|
| 1156 |
1159 |
|
| 1157 |
|
urtFetchMessage(lastMessage, subscriber, payload, bytes);
|
|
1160 |
subscriber->base.lastMessage = lastMessage;
|
|
1161 |
subscriber->base.lastMessageTime = lastMessage->originTime;
|
|
1162 |
memcpy(lastMessage->payload, payload, bytes);
|
| 1158 |
1163 |
|
| 1159 |
1164 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
|
| 1160 |
1165 |
urtTimerReset(subscriber->qosDeadlineTimer);
|
| 1161 |
1166 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
|
|
1167 |
|
|
1168 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1169 |
return URT_STATUS_OK;
|
| 1162 |
1170 |
}
|
| 1163 |
1171 |
|
| 1164 |
1172 |
/**
|
| ... | ... | |
| 1173 |
1181 |
{
|
| 1174 |
1182 |
urtDebugAssert(subscriber);
|
| 1175 |
1183 |
|
| 1176 |
|
if (subscriber->base.topic)
|
| 1177 |
|
{
|
| 1178 |
|
urtMutexLock(topic->lock);
|
| 1179 |
|
urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
|
| 1180 |
|
subscriber->base.topic->numHrtSubscribers--;
|
|
1184 |
if(!subscriber->base.topic) {
|
|
1185 |
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
1186 |
}
|
|
1187 |
|
|
1188 |
urtMutexLock(&subscriber->base.topic->lock);
|
|
1189 |
urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
|
|
1190 |
subscriber->base.topic->numHrtSubscribers--;
|
| 1181 |
1191 |
# if (URT_CFG_PUBSUB_PROFILING == true)
|
| 1182 |
|
subscriber->base.topic->numSubscribers--;
|
|
1192 |
subscriber->base.topic->numSubscribers--;
|
| 1183 |
1193 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 1184 |
1194 |
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
|
| 1185 |
|
//TODO: remove self from topics lsit of HRT subscribers
|
| 1186 |
|
//TODO: ...
|
|
1195 |
//TODO: remove self from topics list of HRT subscribers, ...
|
| 1187 |
1196 |
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
|
| 1188 |
1197 |
|
| 1189 |
|
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
| 1190 |
|
bool hrtZero = false;
|
| 1191 |
|
while (messageTemp->next->originTime < messageTemp->originTime)
|
| 1192 |
|
{
|
| 1193 |
|
messageTemp = messageTemp->next;
|
| 1194 |
|
messageTemp->numHrtConsumersLeft--;
|
| 1195 |
|
if (messageTemp->numHrtConsumersLeft == 0)
|
| 1196 |
|
{
|
| 1197 |
|
hrtZero = true;
|
| 1198 |
|
}
|
|
1198 |
urt_message_t* messageTemp = subscriber->base.lastMessage;
|
|
1199 |
bool hrtZero = false;
|
|
1200 |
while (messageTemp->next->originTime < messageTemp->originTime){
|
|
1201 |
messageTemp = messageTemp->next;
|
|
1202 |
messageTemp->numHrtConsumersLeft--;
|
|
1203 |
if (messageTemp->numHrtConsumersLeft == 0) {
|
|
1204 |
hrtZero = true;
|
|
1205 |
}
|
| 1199 |
1206 |
# if(URT_CFG_PUBSUB_PROFILING == true)
|
| 1200 |
|
messageTemp->numConsumersLeft--;
|
|
1207 |
messageTemp->numConsumersLeft--;
|
| 1201 |
1208 |
# endif /* URT_CFG_PUBSUB_PROFILING */
|
| 1202 |
|
}
|
| 1203 |
|
if (hrtZero)
|
| 1204 |
|
{
|
| 1205 |
|
urtCondvarSignal(subscriber->base.topic->hrtReleased);
|
| 1206 |
|
}
|
| 1207 |
|
|
| 1208 |
|
urtMutexUnlock(topic->lock);
|
| 1209 |
|
subscriber->base.topic = NULL;
|
| 1210 |
|
subscriber->base.lastMessage = NULL;
|
| 1211 |
|
subscriber->base.lastMessageTime = 0;
|
| 1212 |
|
return URT_STATUS_OK;
|
|
1209 |
}
|
|
1210 |
if (hrtZero){
|
|
1211 |
urtCondvarSignal(&subscriber->base.topic->hrtReleased);
|
| 1213 |
1212 |
}
|
| 1214 |
1213 |
|
| 1215 |
|
return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
|
1214 |
urtMutexUnlock(&subscriber->base.topic->lock);
|
|
1215 |
subscriber->base.topic = NULL;
|
|
1216 |
subscriber->base.lastMessage = NULL;
|
|
1217 |
subscriber->base.lastMessageTime = 0;
|
|
1218 |
return URT_STATUS_OK;
|
| 1216 |
1219 |
}
|
| 1217 |
1220 |
|
| 1218 |
1221 |
|