Revision fb72e91b src/urt_subscriber.c

View differences:

src/urt_subscriber.c
42 42
/* LOCAL FUNCTIONS                                                            */
43 43
/******************************************************************************/
44 44

  
45
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
46
{
47
  subscriber->base.lastMessage = message;
48
  *subscriber->base.lastMessageTime = message->originTime;
49
  memcpy(message->payload, payload, bytes);
50
}
51

  
52 45
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
53 46
{
54
  while (oldestMessage->next->originTime < oldestMessage->originTime)
55
  {
47
  while (oldestMessage->next->originTime < oldestMessage->originTime) {
56 48
    oldestMessage = oldestMessage->next;
57 49
  }
58 50
  return oldestMessage;
......
60 52

  
61 53
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
62 54
{
63
  urt_message_t* lastMessage = subscriber->base.lastMessage;
64
  while (lastMessage->next->originTime < lastMessage->originTime)
65
  {
55
  urt_message_t* lastMessage = latestMessage;
56
  while (lastMessage->next->originTime < lastMessage->originTime) {
66 57
    lastMessage = lastMessage->next;
67 58
#if (URT_CFG_PUBSUB_PROFILING == true)
68 59
    subscriber->base.lastMessage->numConsumersLeft--;
69 60
    subscriber->base->numMessagesReceived++;
70 61
#endif /* URT_CFG_PUBSUB_PROFILING */
71 62
  }
63
  return latestMessage;
72 64
}
73 65

  
74
void urtContributeMessages(urt_message_t* messages)
66
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
75 67
{
76 68
  urt_message_t* lastMessageContribute = messages;
77
  while (lastMessageContribute->next)
78
  {
69
  while (lastMessageContribute->next) {
79 70
    lastMessageContribute = lastMessageContribute->next;
80 71
  }
81 72
  lastMessageContribute->next = topic->latestMessage->next;
......
129 120
  }
130 121

  
131 122
  subscriber->base.topic = topic;
132
  urtMutexLock(topic->lock);
123
  urtMutexLock(&topic->lock);
133 124

  
134 125
  if (messages) {
135
    urtContributeMessages(messages);
126
    urtContributeMessages(messages, topic);
136 127
  }
137 128

  
138 129
  subscriber->base.lastMessage = topic->latestMessage;
139 130
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
140 131

  
141
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
132
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
142 133

  
143 134
#if (URT_CFG_PUBSUB_PROFILING == true)
144 135
    topic->numHrtSubscribers--;
145 136
#endif /* URT_CFG_PUBSUB_PROFILING */
146 137

  
147
  urtMutexUnlock(topic->lock);
138
  urtMutexUnlock(&topic->lock);
148 139
  return URT_STATUS_OK;
149 140
}
150 141

  
......
168 159
    return URT_STATUS_FETCH_NOTOPIC;
169 160
  }
170 161

  
171
  urtMutexLock(subscriber->base.topic->lock);
162
  urtMutexLock(&subscriber->base.topic->lock);
172 163

  
173 164
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
174 165
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
175 166
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
176
      urtMutexUnlock(subscriber->base.topic->lock);
167
      urtMutexUnlock(&subscriber->base.topic->lock);
177 168
      return URT_STATUS_FETCH_NOMESSAGE;
178 169
    }
179 170
    oldestMessage = oldestMessage->next;
......
182 173
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
183 174
  }
184 175

  
185
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
176
  subscriber->base.lastMessage = oldestMessage;
177
  subscriber->base.lastMessageTime = oldestMessage->originTime;
178
  memcpy(oldestMessage->payload, payload, bytes);
186 179

  
187 180
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
188 181
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
......
199 192
#endif /* URT_CFG_PUBSUB_PROFILING */
200 193

  
201 194
    if (latency) {
202
      latency = calculatedLatency;
195
      *latency = calculatedLatency;
203 196
    }
204 197
  }
205 198

  
......
208 201
  subscriber->base->numMessagesReceived++;
209 202
#endif /* URT_CFG_PUBSUB_PROFILING */
210 203

  
211
  urtMutexUnlock(subscriber->base.topic->lock);
204
  urtMutexUnlock(&subscriber->base.topic->lock);
212 205
  return URT_STATUS_OK;
213 206
}
214 207

  
......
230 223
  if (!subscriber->base.topic)
231 224
      return URT_STATUS_FETCH_NOTOPIC;
232 225

  
233
  urtMutexLock(subscriber->base.topic->lock);
226
  urtMutexLock(&subscriber->base.topic->lock);
234 227
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
235 228

  
236 229
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
237 230
  {
238
    urtMutexUnlock(subscriber->base.topic->lock);
231
    urtMutexUnlock(&subscriber->base.topic->lock);
239 232
    return URT_STATUS_FETCH_NOMESSAGE;
240 233
  }
241 234

  
242
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
235
  subscriber->base.lastMessage = lastMessage;
236
  subscriber->base.lastMessageTime = lastMessage->originTime;
237
  memcpy(lastMessage->payload, payload, bytes);
243 238

  
244 239
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
245 240
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
......
256 251
#endif /* URT_CFG_PUBSUB_PROFILING */
257 252

  
258 253
    if (latency) {
259
      latency = calculatedLatency;
254
      *latency = calculatedLatency;
260 255
    }
261 256
  }
262 257

  
263
  urtMutexUnlock(subscriber->base.topic->lock);
258
  urtMutexUnlock(&subscriber->base.topic->lock);
264 259
  return URT_STATUS_OK;
265 260
}
266 261

  
......
277 272
  if (subscriber->base.topic)
278 273
  {
279 274
# if(URT_CFG_PUBSUB_PROFILING == true)
280
      urtMutexLock(topic->lock);
275
      urtMutexLock(&topic->lock);
281 276
      subscriber->base.topic->numSubscribers--;
282 277
# endif /* URT_CFG_PUBSUB_PROFILING */
283
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
278
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
284 279
# if(URT_CFG_PUBSUB_PROFILING == true)
285
      urtMutexUnlock(topic->lock);
280
      urtMutexUnlock(&topic->lock);
286 281
      subscriber->base.topic = NULL;
287 282
      subscriber->base.lastMessage = NULL;
288 283
      subscriber->base.lastMessageTime = 0;
......
307 302
  urtEventListenerInit(subscriber->base.evtListener);
308 303
  subscriber->base.lastMessage = NULL;
309 304
  subscriber->base.lastMessageTime = 0;
310
  #if (URT_CFG_PUBSUB_PROFILING)
311
    subscriber->base.sumLatencies = 0;
312
    subscriber->base.numMessagesReceived = 0;
313
    subscriber->usefulnesscb = NULL;
314
    subscriber->cbparams = NULL;
315
    subscriber->minLatency = URT_DELAY_INFINITE;
316
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
317
  #endif /* URT_CFG_PUBSUB_PROFILING */
305
#if (URT_CFG_PUBSUB_PROFILING)
306
  subscriber->base.sumLatencies = 0;
307
  subscriber->base.numMessagesReceived = 0;
308
  subscriber->usefulnesscb = NULL;
309
  subscriber->cbparams = NULL;
310
  subscriber->minLatency = URT_DELAY_INFINITE;
311
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
312
#endif /* URT_CFG_PUBSUB_PROFILING */
318 313
  return;
319 314
}
320 315

  
......
335 330
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
336 331
 */
337 332
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
338
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
333
                                       urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
339 334
{
340 335
  urtDebugAssert(subscriber);
341 336
  urtDebugAssert(topic);
342 337

  
343
  if (subscriber->base.topic)
344
  {
338
  if (subscriber->base.topic) {
345 339
    return URT_STATUS_SUBSCRIBE_TOPICSET;
346 340
  }
347 341

  
348 342
  subscriber->base.topic = topic;
349 343
  subscriber->usefulnesscb = usefulnesscb;
350 344
  subscriber->cbparams = cbparams;
351
# if (URT_CFG_PUBSUB_PROFILING == true)
345
#if (URT_CFG_PUBSUB_PROFILING == true)
352 346
  subscriber->base.sumLatencies = 0;
353 347
  subscriber->base.numMessagesReceived = 0;
354 348
  subscriber->minLatency = URT_DELAY_INFINITE;
355 349
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
356
# endif  /* URT_CFG_PUBSUB_PROFILING */
350
#endif  /* URT_CFG_PUBSUB_PROFILING */
357 351

  
358
  urtMutexLock(topic->lock);
359
  if (messages)
360
  {
361
    urtContributeMessages(messages);
352
  urtMutexLock(&topic->lock);
353
  if (messages) {
354
    urtContributeMessages(messages, topic);
362 355
  }
363 356

  
364 357
  subscriber->base.lastMessage = topic->latestMessage;
365 358
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
366 359

  
367
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
360
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
368 361

  
369 362
# if (URT_CFG_PUBSUB_PROFILING == true)
370 363
    topic->numHrtSubscribers--;
371 364
# endif /* URT_CFG_PUBSUB_PROFILING */
372 365

  
373
  urtMutexUnlock(topic->lock);
366
  urtMutexUnlock(&topic->lock);
374 367
  return URT_STATUS_OK;
375 368
}
376 369

  
......
391 384
{
392 385
  urtDebugAssert(subscriber);
393 386

  
394
  if (!subscriber->base.topic)
395
      return URT_STATUS_FETCH_NOTOPIC;
387
  if (!subscriber->base.topic){
388
    return URT_STATUS_FETCH_NOTOPIC;
389
  }
396 390

  
397
  urtMutexLock(subscriber->base.topic->lock);
391
  urtMutexLock(&subscriber->base.topic->lock);
398 392

  
399 393
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
400
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
401
  {
402
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
403
    {
404
      urtMutexUnlock(subscriber->base.topic->lock);
394
  if(oldestMessage->originTime == subscriber->base.lastMessageTime){
395
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime){
396
      urtMutexUnlock(&subscriber->base.topic->lock);
405 397
      return URT_STATUS_FETCH_NOMESSAGE;
406 398
    }
407 399
    oldestMessage = oldestMessage->next;
408 400
  }
409
  else
410
  {
401
  else{
411 402
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
412 403
  }
413 404

  
414
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
405
  subscriber->base.lastMessage = oldestMessage;
406
  subscriber->base.lastMessageTime = oldestMessage->originTime;
407
  memcpy(oldestMessage->payload, payload, bytes);
415 408

  
416 409
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
417 410
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
......
428 421
#endif /* URT_CFG_PUBSUB_PROFILING */
429 422

  
430 423
    if (latency) {
431
      latency = calculatedLatency;
424
      *latency = calculatedLatency;
432 425
    }
433 426
  }
434 427

  
......
437 430
  subscriber->base->numMessagesReceived++;
438 431
#endif /* URT_CFG_PUBSUB_PROFILING */
439 432

  
440
  urtMutexUnlock(subscriber->base.topic->lock);
433
  urtMutexUnlock(&subscriber->base.topic->lock);
441 434
  return URT_STATUS_OK;
442 435
}
443 436

  
......
462 455
    return URT_STATUS_FETCH_NOTOPIC;
463 456
  }
464 457

  
465
  urtMutexLock(subscriber->base.topic->lock);
458
  urtMutexLock(&subscriber->base.topic->lock);
466 459
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
467 460

  
468 461
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
469 462
  {
470
    urtMutexUnlock(subscriber->base.topic->lock);
463
    urtMutexUnlock(&subscriber->base.topic->lock);
471 464
    return URT_STATUS_FETCH_NOMESSAGE;
472 465
  }
473 466

  
474
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
467
  subscriber->base.lastMessage = lastMessage;
468
  subscriber->base.lastMessageTime = lastMessage->originTime;
469
  memcpy(lastMessage->payload, payload, bytes);
475 470

  
476 471
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
477 472
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
......
487 482
  }
488 483
#endif /* URT_CFG_PUBSUB_PROFILING */
489 484

  
490
    if (latency) {
491
      latency = calculatedLatency;
485
    if (latency != NULL) {
486
      *latency = calculatedLatency;
492 487
    }
493 488
  }
494 489

  
495
  urtMutexUnlock(subscriber->base.topic->lock);
490
  urtMutexUnlock(&subscriber->base.topic->lock);
496 491
  return URT_STATUS_OK;
497 492
}
498 493

  
......
508 503
{
509 504
  urtDebugAssert(subscriber);
510 505

  
511
  return subscriber->usefulnesscb(latency);
506
  return (*subscriber->usefulnesscb)(latency, subscriber->cbparams);
512 507
}
513 508

  
514 509
/**
......
526 521
  if (subscriber->base.topic)
527 522
  {
528 523
# if (URT_CFG_PUBSUB_PROFILING == true)
529
    urtMutexLock(topic->lock);
524
    urtMutexLock(&topic->lock);
530 525
    subscriber->base.topic->numSubscribers--;
531 526
# endif /* URT_CFG_PUBSUB_PROFILING */
532
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
527
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
533 528
# if (URT_CFG_PUBSUB_PROFILING == true)
534
      urtMutexUnlock(topic->lock);
529
      urtMutexUnlock(&topic->lock);
535 530
# endif /* URT_CFG_PUBSUB_PROFILING */
536 531
    subscriber->base.topic = NULL;
537 532
    subscriber->base.lastMessage = NULL;
......
621 616
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
622 617
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
623 618

  
624
  urtMutexLock(topic->lock);
619
  urtMutexLock(&topic->lock);
625 620
  if (messages)
626 621
  {
627
    urtContributeMessages(messages);
622
    urtContributeMessages(messages, topic);
628 623
  }
629 624

  
630 625
  subscriber->base.lastMessage = topic->latestMessage;
631 626
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
632 627

  
633
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
628
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
634 629

  
635 630
# if (URT_CFG_PUBSUB_PROFILING == true)
636 631
  topic->numHrtSubscribers--;
637 632
# endif /* URT_CFG_PUBSUB_PROFILING */
638 633

  
639
  urtMutexUnlock(topic->lock);
634
  urtMutexUnlock(&topic->lock);
640 635
  return URT_STATUS_OK;
641 636
}
642 637

  
......
660 655
  if (!subscriber->base.topic)
661 656
      return URT_STATUS_FETCH_NOTOPIC;
662 657

  
663
  urtMutexLock(subscriber->base.topic->lock);
658
  urtMutexLock(&subscriber->base.topic->lock);
664 659

  
665 660
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
666 661
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
667 662
  {
668 663
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
669 664
    {
670
      urtMutexUnlock(subscriber->base.topic->lock);
665
      urtMutexUnlock(&subscriber->base.topic->lock);
671 666
      return URT_STATUS_FETCH_NOMESSAGE;
672 667
    }
673 668
    oldestMessage = oldestMessage->next;
......
677 672
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
678 673
  }
679 674

  
680
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
675
  subscriber->base.lastMessage = oldestMessage;
676
  subscriber->base.lastMessageTime = oldestMessage->originTime;
677
  memcpy(oldestMessage->payload, payload, bytes);
681 678

  
682 679
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
683 680
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
......
701 698
    subscriber->maxLatency = calculatedLatency;
702 699
  }
703 700
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
704
    urtMutexUnlock(subscriber->base.topic);
701
    urtMutexUnlock(&subscriber->base.topic->lock);
705 702
    return URT_STATUS_JITTERVIOLATION;
706 703
  }
707 704
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
708 705

  
709 706
    if (latency) {
710
      latency = calculatedLatency;
707
      *latency = calculatedLatency;
711 708
    }
712 709
  }
713 710

  
......
716 713
  subscriber->base->numMessagesReceived++;
717 714
#endif /* URT_CFG_PUBSUB_PROFILING */
718 715

  
719
  urtMutexUnlock(subscriber->base.topic->lock);
716
  urtMutexUnlock(&subscriber->base.topic->lock);
720 717
  return URT_STATUS_OK;
721 718
}
722 719

  
......
740 737
  if (!subscriber->base.topic)
741 738
      return URT_STATUS_FETCH_NOTOPIC;
742 739

  
743
  urtMutexLock(subscriber->base.topic->lock);
740
  urtMutexLock(&subscriber->base.topic->lock);
744 741
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
745 742

  
746 743
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
747 744
  {
748
    urtMutexUnlock(subscriber->base.topic->lock);
745
    urtMutexUnlock(&subscriber->base.topic->lock);
749 746
    return URT_STATUS_FETCH_NOMESSAGE;
750 747
  }
751 748

  
752
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
749
    subscriber->base.lastMessage = lastMessage;
750
    subscriber->base.lastMessageTime = lastMessage->originTime;
751
    memcpy(lastMessage->payload, payload, bytes);
753 752

  
754 753
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
755
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
754
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
756 755

  
757 756
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
758 757
  subscriber->base.sumLatencies += calculatedLatency;
......
773 772
    subscriber->maxLatency = calculatedLatency;
774 773
  }
775 774
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
776
    urtMutexUnlock(subscriber->base.topic);
775
    urtMutexUnlock(&subscriber->base.topic->lock);
777 776
    return URT_STATUS_JITTERVIOLATION;
778 777
  }
779 778
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
780 779

  
781 780
    if (latency) {
782
      latency = calculatedLatency;
781
      *latency = calculatedLatency;
783 782
    }
784 783
  }
785 784

  
786
  urtMutexUnlock(subscriber->base.topic->lock);
785
  urtMutexUnlock(&subscriber->base.topic->lock);
787 786
  return URT_STATUS_OK;
788 787
}
789 788

  
......
830 829
  }
831 830

  
832 831
#if (URT_CFG_PUBSUB_PROFILING == true)
833
  urtMutexLock(topic->lock);
832
  urtMutexLock(&topic->lock);
834 833
#endif /* URT_CFG_PUBSUB_PROFILING */
835 834

  
836
  urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
835
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
837 836

  
838 837
# if (URT_CFG_PUBSUB_PROFILING == true)
839 838
  subscriber->base.topic->numSubscribers--;
840 839
# endif /* URT_CFG_PUBSUB_PROFILING */
841 840

  
842 841
# if (URT_CFG_PUBSUB_PROFILING == true)
843
  urtMutexUnlock(topic->lock);
842
  urtMutexUnlock(&topic->lock);
844 843
# endif /* URT_CFG_PUBSUB_PROFILING */
845 844
  subscriber->base.topic = NULL;
846 845
  subscriber->base.lastMessage = NULL;
......
939 938
  subscriber->expectedRate = rate;
940 939
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
941 940

  
942
  urtMutexLock(topic->lock);
941
  urtMutexLock(&topic->lock);
943 942
  if (messages)
944 943
  {
945
    urtContributeMessages(messages);
944
    urtContributeMessages(messages, topic);
946 945
  }
947 946

  
948 947
  subscriber->base.lastMessage = topic->latestMessage;
949 948
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
950 949

  
951
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
950
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
952 951

  
953 952
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
954 953
  //TODO: Implement
......
959 958
  topic->numSubscribers--;
960 959
# endif /* URT_CFG_PUBSUB_PROFILING */
961 960

  
962
  urtMutexUnlock(topic->lock);
961
  urtMutexUnlock(&topic->lock);
963 962
  return URT_STATUS_OK;
964 963
}
965 964

  
......
985 984
    return URT_STATUS_FETCH_NOTOPIC;
986 985
  }
987 986

  
988
  urtMutexLock(subscriber->base.topic->lock);
989
  urt_message_t* messageTemp = subscriber->base.lastMessage;
990
  if (messageTemp->next->originTime > messageTemp.originTime)
987
  urtMutexLock(&subscriber->base.topic->lock);
988
  urt_message_t* lastMessage = subscriber->base.lastMessage;
989
  if (lastMessage->next->originTime > lastMessage->originTime)
991 990
  {
992
    urtMutexUnlock(subscriber->base.topic->lock);
991
    urtMutexUnlock(&subscriber->base.topic->lock);
993 992
    return URT_STATUS_FETCH_NOMESSAGE;
994 993
  }
995
  messageTemp = messageTemp->next;
994
  lastMessage = lastMessage->next;
996 995

  
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
998
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
996
  uint64_t calculatedLatency;
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
998
    calculatedLatency = urtTimeNow() - lastMessage->originTime;
999
  }
999 1000

  
1000 1001
#if(URT_CFG_PUBSUB_PROFILING == true)
1001 1002
  subscriber->base.sumLatencies += calculatedLatency;
......
1008 1009
  }
1009 1010
#endif /* URT_CFG_PUBSUB_PROFILING */
1010 1011

  
1011
    if (latency) {
1012
      latency = calculatedLatency;
1013
    }
1012
  if (latency) {
1013
    *latency = calculatedLatency;
1014 1014
  }
1015 1015

  
1016 1016
  subscriber->base.lastMessage->numHrtConsumersLeft--;
1017 1017
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1018 1018
  {
1019
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1019
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1020 1020
  }
1021 1021

  
1022 1022
#if (URT_CFG_PUBSUB_PROFILING == true)
......
1032 1032
    subscriber->maxLatency = calculatedLatency;
1033 1033
  }
1034 1034
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1035
    urtMutexUnlock(subscriber->base.topic);
1035
    urtMutexUnlock(&subscriber->base.topic->lock);
1036 1036
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1037 1037
    return URT_STATUS_JITTERVIOLATION;
1038 1038
  }
......
1047 1047
  }
1048 1048
#endif /* URT_CFG_PUBSUB_PROFILING */
1049 1049

  
1050
  urtFetchMessage(messageTemp, subscriber, payload, bytes);
1050
    subscriber->base.lastMessage = lastMessage;
1051
    subscriber->base.lastMessageTime = lastMessage->originTime;
1052
    memcpy(lastMessage->payload, payload, bytes);
1051 1053

  
1052 1054
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1053
  if (messageTemp->next->originTime < messageTemp->originTime)
1055
  if (lastMessage->next->originTime < lastMessage->originTime)
1054 1056
  {
1055 1057
    //TODO: first reset?! (when ... set)
1056 1058
    urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
......
1061 1063
  }
1062 1064
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1063 1065

  
1064
  urtMutexUnlock(subscriber->base.topic->lock);
1066
  urtMutexUnlock(&subscriber->base.topic->lock);
1065 1067
  return URT_STATUS_OK;
1066 1068
}
1067 1069

  
......
1085 1087
    return URT_STATUS_FETCH_NOTOPIC;
1086 1088
  }
1087 1089

  
1088
  urtMutexLock(subscriber->base.topic->lock);
1090
  urtMutexLock(&subscriber->base.topic->lock);
1089 1091
  urt_message_t* lastMessage = subscriber->base.lastMessage;
1090 1092
  bool hrtZero = false;
1091 1093
  while (lastMessage->next->originTime < lastMessage->originTime) {
1092 1094
    lastMessage = lastMessage->next;
1093 1095
    lastMessage->numHrtConsumersLeft--;
1094 1096
    if (lastMessage->numHrtConsumersLeft == 0) {
1095
        hertZero = true;
1097
        hrtZero = true;
1096 1098
    }
1097 1099
#if (URT_CFG_PUBSUB_PROFILING == true)
1098 1100
    lastMessage->numConsumersLeft--;
......
1104 1106
  }
1105 1107

  
1106 1108
  if (hrtZero) {
1107
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1109
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1108 1110
  }
1109 1111

  
1110 1112
  if (lastMessage->originTime == subscriber->base.lastMessageTime) {
1111
    urtMutexUnlock(subscriber->base.topic);
1113
    urtMutexUnlock(&subscriber->base.topic->lock);
1112 1114
    return URT_STATUS_FETCH_NOMESSAGE;
1113 1115
  }
1114 1116

  
1115
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
1116
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
1117
  uint64_t calculatedLatency;
1118
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
1119
     calculatedLatency = urtTimeNow() - lastMessage->originTime;
1120
  }
1117 1121

  
1118 1122
#if(URT_CFG_PUBSUB_PROFILING == true)
1119 1123
  subscriber->base.sumLatencies += calculatedLatency;
......
1126 1130
  }
1127 1131
#endif /* URT_CFG_PUBSUB_PROFILING */
1128 1132

  
1129
    if (latency) {
1130
      latency = calculatedLatency;
1131
    }
1133
  if (latency) {
1134
    *latency = calculatedLatency;
1132 1135
  }
1133 1136

  
1134 1137
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
......
1139 1142
    subscriber->maxLatency = calculatedLatency;
1140 1143
  }
1141 1144
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1142
    urtMutexUnlock(subscriber->base.topic);
1145
    urtMutexUnlock(&subscriber->base.topic->lock);
1143 1146
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1144 1147
    return URT_STATUS_JITTERVIOLATION;
1145 1148
  }
......
1154 1157
  }
1155 1158
#endif /* URT_CFG_PUBSUB_PROFILING */
1156 1159

  
1157
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
1160
    subscriber->base.lastMessage = lastMessage;
1161
    subscriber->base.lastMessageTime = lastMessage->originTime;
1162
    memcpy(lastMessage->payload, payload, bytes);
1158 1163

  
1159 1164
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1160 1165
  urtTimerReset(subscriber->qosDeadlineTimer);
1161 1166
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1167

  
1168
  urtMutexUnlock(&subscriber->base.topic->lock);
1169
  return URT_STATUS_OK;
1162 1170
}
1163 1171

  
1164 1172
/**
......
1173 1181
{
1174 1182
  urtDebugAssert(subscriber);
1175 1183

  
1176
  if (subscriber->base.topic)
1177
  {
1178
    urtMutexLock(topic->lock);
1179
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
1180
    subscriber->base.topic->numHrtSubscribers--;
1184
  if(!subscriber->base.topic) {
1185
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1186
  }
1187

  
1188
  urtMutexLock(&subscriber->base.topic->lock);
1189
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
1190
  subscriber->base.topic->numHrtSubscribers--;
1181 1191
# if (URT_CFG_PUBSUB_PROFILING == true)
1182
    subscriber->base.topic->numSubscribers--;
1192
  subscriber->base.topic->numSubscribers--;
1183 1193
# endif /* URT_CFG_PUBSUB_PROFILING */
1184 1194
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1185
    //TODO: remove self from topics lsit of HRT subscribers
1186
    //TODO: ...
1195
  //TODO: remove self from topics list of HRT subscribers, ...
1187 1196
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1188 1197

  
1189
    urt_message_t* messageTemp = subscriber->base.lastMessage;
1190
    bool hrtZero = false;
1191
    while (messageTemp->next->originTime < messageTemp->originTime)
1192
    {
1193
        messageTemp = messageTemp->next;
1194
        messageTemp->numHrtConsumersLeft--;
1195
        if (messageTemp->numHrtConsumersLeft == 0)
1196
        {
1197
            hrtZero = true;
1198
        }
1198
  urt_message_t* messageTemp = subscriber->base.lastMessage;
1199
  bool hrtZero = false;
1200
  while (messageTemp->next->originTime < messageTemp->originTime){
1201
    messageTemp = messageTemp->next;
1202
    messageTemp->numHrtConsumersLeft--;
1203
    if (messageTemp->numHrtConsumersLeft == 0) {
1204
      hrtZero = true;
1205
    }
1199 1206
# if(URT_CFG_PUBSUB_PROFILING == true)
1200
        messageTemp->numConsumersLeft--;
1207
    messageTemp->numConsumersLeft--;
1201 1208
# endif /* URT_CFG_PUBSUB_PROFILING */
1202
    }
1203
    if (hrtZero)
1204
    {
1205
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
1206
    }
1207

  
1208
    urtMutexUnlock(topic->lock);
1209
    subscriber->base.topic = NULL;
1210
    subscriber->base.lastMessage = NULL;
1211
    subscriber->base.lastMessageTime = 0;
1212
    return URT_STATUS_OK;
1209
  }
1210
  if (hrtZero){
1211
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1213 1212
  }
1214 1213

  
1215
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1214
  urtMutexUnlock(&subscriber->base.topic->lock);
1215
  subscriber->base.topic = NULL;
1216
  subscriber->base.lastMessage = NULL;
1217
  subscriber->base.lastMessageTime = 0;
1218
  return URT_STATUS_OK;
1216 1219
}
1217 1220

  
1218 1221

  

Also available in: Unified diff