Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 65dc89cb

History | View | Annotate | Download (38.491 KB)

1
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21

    
22
#include <urtware.h>
23

    
24
/******************************************************************************/
25
/* LOCAL DEFINITIONS                                                          */
26
/******************************************************************************/
27

    
28
/******************************************************************************/
29
/* EXPORTED VARIABLES                                                         */
30
/******************************************************************************/
31

    
32
/******************************************************************************/
33
/* LOCAL TYPES                                                                */
34
/******************************************************************************/
35

    
36
/******************************************************************************/
37
/* LOCAL VARIABLES                                                            */
38
/******************************************************************************/
39

    
40
/******************************************************************************/
41
/* LOCAL FUNCTIONS                                                            */
42
/******************************************************************************/
43

    
44
void urtFetchMessage ()
45
{
46
    //TODO: Update message pointer
47
    //TODO: Copy message origin time
48
    //TODO: Copy message payload
49
}
50

    
51
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
52
{
53
  while (oldestMessage->next->originTime < oldestMessage->originTime)
54
  {
55
    oldestMessage = oldestMessage->next;
56
  }
57
  return oldestMessage;
58
}
59

    
60
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
61
{
62
  urt_message_t* lastMessage = subscriber->base.lastMessage;
63
  while (lastMessage->next->originTime < lastMessage->originTime)
64
  {
65
    lastMessage = lastMessage->next;
66
#if (URT_CFG_PUBSUB_PROFILING == true)
67
    subscriber->base.lastMessage->numConsumersLeft--;
68
    subscriber->base->numMessagesReceived++;
69
#endif /* URT_CFG_PUBSUB_PROFILING */
70
  }
71
}
72

    
73
/******************************************************************************/
74
/* EXPORTED FUNCTIONS                                                         */
75
/******************************************************************************/
76

    
77
/**
78
 * @brief   Initialize the nrt Subscriber.
79
 *
80
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
81
 */
82
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
83
{
84
  urtDebugAssert(subscriber);
85

    
86
  subscriber->base.topic = NULL;
87
  urtEventListenerInit(subscriber->base.evtListener);
88
  subscriber->base.lastMessage = NULL;
89
  subscriber->base.lastMessageTime = 0;
90
#if (URT_CFG_PUBSUB_PROFILING == true)
91
    subscriber->minLatency = URT_DELAY_INFINITE;
92
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
93
#endif /* URT_CFG_PUBSUB_PROFILING */
94
  return;
95
}
96

    
97
/**
98
 * @brief  Subscribes the subscriber to a topic.
99
 *
100
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
101
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
102
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
103
 *                      Messages must not be associated to another topic.
104
 *                      Once a message has been contributed, it cannot be removed later.
105
 *                      May be NULL(no messages to contribute).
106
 *
107
 * @return  Returns URT_STATUS_OK on success.
108
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
109
 */
110
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
111
{
112
  urtDebugAssert(subscriber);
113
  urtDebugAssert(topic);
114

    
115
  if (!subscriber->base.topic)
116
      return URT_STATUS_SUBSCRIBE_TOPICSET;
117

    
118
  subscriber->base.topic = topic;
119
  urtMutexLock(topic->lock);
120

    
121
  if (messages)
122
  {
123
    urt_message_t* lastMessageContribute = messages;
124
    while (lastMessageContribute->next)
125
    {
126
        lastMessageContribute = lastMessageContribute->next;
127
    }
128
    lastMessageContribute->next = topic->latestMessage->next;
129
    topic->latestMessage->next = messages;
130
  }
131

    
132
  subscriber->base.lastMessage = topic->latestMessage;
133
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
134

    
135
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
136

    
137
#if (URT_CFG_PUBSUB_PROFILING == true)
138
    topic->numHrtSubscribers--;
139
#endif /* URT_CFG_PUBSUB_PROFILING */
140

    
141
  urtMutexUnlock(topic->lock);
142
  return URT_STATUS_OK;
143
}
144

    
145
/**
146
 * @brief  Fetches the next message.
147
 *
148
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
149
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
150
 * @param[in] bytes  Payload size in bytes.
151
 * @param[in] latency  The latency can be returned by reference. May be NULL.
152
 *
153
 * @return  Returns URT_STATUS_OK on success.
154
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
155
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
156
 */
157
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
158
{   
159
  urtDebugAssert(subscriber);
160

    
161
  if (!subscriber->base.topic)
162
      return URT_STATUS_FETCH_NOTOPIC;
163

    
164
  urtMutexLock(subscriber->base.topic->lock);
165

    
166
  urt_message_t* messageTemp = subscriber->base.lastMessage;
167
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
168
  {
169
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
170
    {
171
      urtMutexUnlock(subscriber->base.topic->lock);
172
      return URT_STATUS_FETCH_NOMESSAGE;
173
    }
174
    messageTemp = messageTemp->next;
175
  }
176
  else
177
  {
178
    messageTemp = urtFindOldestMessage(messageTemp->next);
179
  }
180

    
181
  urtFetchMessage();
182

    
183
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
184
#if(URT_CFG_PUBSUB_PROFILING == true)
185
  subscriber->base.sumLatencies += calculatedLatency;
186

    
187
  if (calculatedLatency < subscriber->minLatency)
188
  {
189
    subscriber->minLatency = calculatedLatency;
190
  }
191
  else if (calculatedLatency > subscriber->maxLatency)
192
  {
193
    subscriber->maxLatency = calculatedLatency;
194
  }
195
#endif /* URT_CFG_PUBSUB_PROFILING */
196
  bool temp = false;
197
  if (temp/*optional latency output argument given*/)
198
  {
199
    latency = calculatedLatency;
200
  }
201

    
202
#if (URT_CFG_PUBSUB_PROFILING == true)
203
  subscriber->base.lastMessage->numConsumersLeft--;
204
  subscriber->base->numMessagesReceived++;
205
#endif /* URT_CFG_PUBSUB_PROFILING */
206

    
207
  urtMutexUnlock(subscriber->base.topic->lock);
208
  return URT_STATUS_OK;
209
}
210

    
211
/**
212
 * @brief Fetches the latest message.
213
 *
214
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
215
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
216
 * @param[in] bytes  Payload size in bytes.
217
 * @param[in] latency  The latency can be returned by reference. May be NULL.
218
 *
219
 * @return  Returns URT_STATUS_OK on success.
220
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
221
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
222
 */
223
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
224
  urtDebugAssert(subscriber);
225

    
226
  if (!subscriber->base.topic)
227
      return URT_STATUS_FETCH_NOTOPIC;
228

    
229
  urtMutexLock(subscriber->base.topic->lock);
230
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
231

    
232
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
233
  {
234
    urtMutexUnlock(subscriber->base.topic->lock);
235
    return URT_STATUS_FETCH_NOMESSAGE;
236
  }
237

    
238
  urtFetchMessage();
239

    
240
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
241
#if(URT_CFG_PUBSUB_PROFILING == true)
242
  subscriber->base.sumLatencies += calculatedLatency;
243

    
244
  if (calculatedLatency < subscriber->minLatency)
245
  {
246
    subscriber->minLatency = calculatedLatency;
247
  }
248
  else if (calculatedLatency > subscriber->maxLatency)
249
  {
250
    subscriber->maxLatency = calculatedLatency;
251
  }
252
#endif /* URT_CFG_PUBSUB_PROFILING */
253
  bool temp = false;
254
  if (temp/*optional latency output argument given*/)
255
  {
256
    latency = calculatedLatency;
257
  }
258

    
259
  urtMutexUnlock(subscriber->base.topic->lock);
260
  return URT_STATUS_OK;
261
}
262

    
263
/**
264
 * @brief  Unsubscribes from a subscriber.
265
 *
266
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
267
 *
268
 * @return  Returns URT_STATUS_OK on sucess.
269
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
270
 */
271
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
272
{
273
  if (subscriber->base.topic)
274
  {
275
# if(URT_CFG_PUBSUB_PROFILING == true)
276
      urtMutexLock(topic->lock);
277
      subscriber->base.topic->numSubscribers--;
278
# endif /* URT_CFG_PUBSUB_PROFILING */
279
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
280
# if(URT_CFG_PUBSUB_PROFILING == true)
281
      urtMutexUnlock(topic->lock);
282
      subscriber->base.topic = NULL;
283
      subscriber->base.lastMessage = NULL;
284
      subscriber->base.lastMessageTime = 0;
285
#endif /* URT_CFG_PUBSUB_PROFILING */
286
    return URT_STATUS_OK;
287
  }
288

    
289
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
290
}
291

    
292

    
293
/**
294
 * @brief  Initialize the srt Subscriber.
295
 *
296
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
297
 */
298
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
299
{
300
  urtDebugAssert(subscriber);
301

    
302
  subscriber->base.topic = NULL;
303
  urtEventListenerInit(subscriber->base.evtListener);
304
  subscriber->base.lastMessage = NULL;
305
  subscriber->base.lastMessageTime = 0;
306
  #if (URT_CFG_PUBSUB_PROFILING)
307
    subscriber->base.sumLatencies = 0;
308
    subscriber->base.numMessagesReceived = 0;
309
    subscriber->usefulnesscb = NULL;
310
    subscriber->cbparams = NULL;
311
    subscriber->minLatency = URT_DELAY_INFINITE;
312
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
313
  #endif /* URT_CFG_PUBSUB_PROFILING */
314
  return;
315
}
316

    
317
/**
318
 * @brief  Subscribes the subscriber to a topic.
319
 *
320
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
321
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
322
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
323
 *                     Messages must not be associated to another topic.
324
 *                     Once a message has been contributed, it cannot be removed later.
325
 *                     May be NULL (no messages to contribute)
326
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
327
 * @param[in] cbparams  Optional parameters for the usefulness callback.
328
 *                      May be NULL if the callback expects no parameters.
329
 *
330
 * @return  Returns URT_STATUS_OK on success.
331
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
332
 */
333
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
334
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
335
{
336
  urtDebugAssert(subscriber);
337
  urtDebugAssert(topic);
338

    
339
  if (subscriber->base.topic)
340
  {
341
    return URT_STATUS_SUBSCRIBE_TOPICSET;
342
  }
343

    
344
  subscriber->base.topic = topic;
345
  subscriber->usefulnesscb = usefulnesscb;
346
  subscriber->cbparams = cbparams;
347
# if (URT_CFG_PUBSUB_PROFILING == true)
348
  subscriber->base.sumLatencies = 0;
349
  subscriber->base.numMessagesReceived = 0;
350
  subscriber->minLatency = URT_DELAY_INFINITE;
351
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
352
# endif  /* URT_CFG_PUBSUB_PROFILING */
353

    
354
  urtMutexLock(topic->lock);
355
  if (messages)
356
  {
357
    urt_message_t* lastMessageContribute = messages;
358
    while (lastMessageContribute->next)
359
    {
360
        lastMessageContribute = lastMessageContribute->next;
361
    }
362
    lastMessageContribute->next = topic->latestMessage->next;
363
    topic->latestMessage->next = messages;
364
  }
365

    
366
  subscriber->base.lastMessage = topic->latestMessage;
367
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
368

    
369
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
370

    
371
# if (URT_CFG_PUBSUB_PROFILING == true)
372
    topic->numHrtSubscribers--;
373
# endif /* URT_CFG_PUBSUB_PROFILING */
374

    
375
  urtMutexUnlock(topic->lock);
376
  return URT_STATUS_OK;
377
}
378

    
379
/**
380
 * @brief  Fetches the next message.
381
 *
382
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
383
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
384
 * @param[in] bytes  Payload size in bytes.
385
 * @param[in] latency  The latency can be returned by reference. May be NULL.
386
 *
387
 * @return  Returns URT_STATUS_OK on success.
388
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
389
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
390
 */
391
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
392
                                              size_t bytes, urt_delay_t* latency)
393
{
394
  urtDebugAssert(subscriber);
395

    
396
  if (!subscriber->base.topic)
397
      return URT_STATUS_FETCH_NOTOPIC;
398

    
399
  urtMutexLock(subscriber->base.topic->lock);
400

    
401
  urt_message_t* messageTemp = subscriber->base.lastMessage;
402
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
403
  {
404
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
405
    {
406
      urtMutexUnlock(subscriber->base.topic->lock);
407
      return URT_STATUS_FETCH_NOMESSAGE;
408
    }
409
    messageTemp = messageTemp->next;
410
  }
411
  else
412
  {
413
    messageTemp = urtFindOldestMessage(messageTemp->next);
414
  }
415

    
416
  urtFetchMessage();
417

    
418
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
419
#if(URT_CFG_PUBSUB_PROFILING == true)
420
  subscriber->base.sumLatencies += calculatedLatency;
421

    
422
  if (calculatedLatency < subscriber->minLatency)
423
  {
424
    subscriber->minLatency = calculatedLatency;
425
  }
426
  else if (calculatedLatency > subscriber->maxLatency)
427
  {
428
    subscriber->maxLatency = calculatedLatency;
429
  }
430
#endif /* URT_CFG_PUBSUB_PROFILING */
431
  bool temp = false;
432
  if (temp/*optional latency output argument given*/)
433
  {
434
    latency = calculatedLatency;
435
  }
436

    
437
#if (URT_CFG_PUBSUB_PROFILING == true)
438
  subscriber->base.lastMessage->numConsumersLeft--;
439
  subscriber->base->numMessagesReceived++;
440
#endif /* URT_CFG_PUBSUB_PROFILING */
441

    
442
  urtMutexUnlock(subscriber->base.topic->lock);
443
  return URT_STATUS_OK;
444
}
445

    
446
/**
447
 * @brief  Fetches the latest message.
448
 *
449
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
450
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
451
 * @param[in] bytes  Payload size in bytes.
452
 * @param[in] latency  The latency can be returned by reference. May be NULL.
453
 *
454
 * @return  Returns URT_STATUS_OK on success.
455
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
456
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
457
 */
458
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
459
                                                size_t bytes, urt_delay_t* latency)
460
{
461
  urtDebugAssert(subscriber);
462

    
463
  if (!subscriber->base.topic)
464
      return URT_STATUS_FETCH_NOTOPIC;
465

    
466
  urtMutexLock(subscriber->base.topic->lock);
467
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
468

    
469
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
470
  {
471
    urtMutexUnlock(subscriber->base.topic->lock);
472
    return URT_STATUS_FETCH_NOMESSAGE;
473
  }
474

    
475
  urtFetchMessage();
476

    
477
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
478
#if(URT_CFG_PUBSUB_PROFILING == true)
479
  subscriber->base.sumLatencies += calculatedLatency;
480

    
481
  if (calculatedLatency < subscriber->minLatency)
482
  {
483
    subscriber->minLatency = calculatedLatency;
484
  }
485
  else if (calculatedLatency > subscriber->maxLatency)
486
  {
487
    subscriber->maxLatency = calculatedLatency;
488
  }
489
#endif /* URT_CFG_PUBSUB_PROFILING */
490
  bool temp = false;
491
  if (temp/*optional latency output argument given*/)
492
  {
493
    latency = calculatedLatency;
494
  }
495

    
496
  urtMutexUnlock(subscriber->base.topic->lock);
497
  return URT_STATUS_OK;
498
}
499

    
500
/**
501
 * @brief  Calculates the usefulness of the subscriber.
502
 *
503
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
504
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
505
 *
506
 * @return  Returns the usefulness as a value within [0,1].
507
 */
508
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
509
{
510
  urtDebugAssert(subscriber);
511

    
512
  return subscriber->usefulnesscb(latency);
513
}
514

    
515
/**
516
 * @brief  Unsubscribes from a subscriber.
517
 *
518
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
519
 *
520
 * @return  Returns URT_STATUS_OK on sucess.
521
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
522
 */
523
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
524
{
525
  urtDebugAssert(subscriber);
526

    
527
  if (subscriber->base.topic)
528
  {
529
# if (URT_CFG_PUBSUB_PROFILING == true)
530
    urtMutexLock(topic->lock);
531
    subscriber->base.topic->numSubscribers--;
532
# endif /* URT_CFG_PUBSUB_PROFILING */
533
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
534
# if (URT_CFG_PUBSUB_PROFILING == true)
535
      urtMutexUnlock(topic->lock);
536
# endif /* URT_CFG_PUBSUB_PROFILING */
537
    subscriber->base.topic = NULL;
538
    subscriber->base.lastMessage = NULL;
539
    subscriber->base.lastMessageTime = 0;
540
    return URT_STATUS_OK;
541
  }
542

    
543
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
544
}
545

    
546

    
547
/**
548
 * @brief  Initialize the FRT Subscriber.
549
 *
550
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
551
 */
552
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
553
{
554
  urtDebugAssert(subscriber);
555

    
556
  subscriber->base.topic = NULL;
557
  urtEventListenerInit(subscriber->base.evtListener);
558
  subscriber->base.lastMessage = NULL;
559
  subscriber->base.lastMessageTime = 0;
560

    
561
  #if (URT_CFG_PUBSUB_PROFILING)
562
    subscriber->base.sumLatencies = 0;
563
    subscriber->base.numMessagesReceived = 0;
564
  #endif /* URT_CFG_PUBSUB_PROFILING */
565

    
566
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
567
    subscriber->deadlineOffset = 0;
568
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
569

    
570
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
571
    subscriber->maxJitter = 0;
572
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
573

    
574
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
575
    subscriber->minLatency = URT_DELAY_INFINITE;
576
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
577
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
578
  return;
579
}
580

    
581

    
582
/**
583
 * @brief  Subscribes the subscriber to a topic.
584
 *
585
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
586
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
587
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
588
 *                      Messages must not be associated to another topic.
589
 *                      Once a message has been contributed, it cannot be removed later.
590
 *                      May be NULL(no messages to contribute).
591
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
592
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
593
 *                    A value of 0 indicates that jitter is of no concern.
594
 *
595
 * @return  Returns URT_STATUS_OK on success.
596
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
597
 */
598
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
599
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
600
{
601
  urtDebugAssert(subscriber);
602
  urtDebugAssert(topic);
603

    
604
  if (subscriber->base.topic)
605
  {
606
    return URT_STATUS_SUBSCRIBE_TOPICSET;
607
  }
608

    
609
  subscriber->base.topic = topic;
610
# if (URT_CFG_PUBSUB_PROFILING == true)
611
  subscriber->base.sumLatencies = 0;
612
  subscriber->base.numMessagesReceived = 0;
613
# endif /* URT_CFG_PUBSUB_PROFILING */
614
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
615
  subscriber->deadlineOffset = deadline;
616
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
617
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
618
  subscriber->maxJitter =jitter;
619
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
620
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
621
  subscriber->minLatency = URT_DELAY_INFINITE;
622
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
623
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
624

    
625
  urtMutexLock(topic->lock);
626
  if (messages)
627
  {
628
    urt_message_t* lastMessageContribute = messages;
629
    while (lastMessageContribute->next)
630
    {
631
      lastMessageContribute = lastMessageContribute->next;
632
    }
633
    lastMessageContribute->next = topic->latestMessage->next;
634
    topic->latestMessage->next = messages;
635
  }
636

    
637
  subscriber->base.lastMessage = topic->latestMessage;
638
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
639

    
640
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
641

    
642
# if (URT_CFG_PUBSUB_PROFILING == true)
643
  topic->numHrtSubscribers--;
644
# endif /* URT_CFG_PUBSUB_PROFILING */
645

    
646
  urtMutexUnlock(topic->lock);
647
  return URT_STATUS_OK;
648
}
649

    
650
/**
651
 * @brief  Fetches the next message.
652
 *
653
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
654
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
655
 * @param[in] bytes  Payload size in bytes.
656
 * @param[in] latency  The latency can be returned by reference. May be NULL.
657
 *
658
 * @return  Returns URT_STATUS_OK on success.
659
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
660
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
661
 */
662
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
663
                                              size_t bytes, urt_delay_t* latency)
664
{
665
  urtDebugAssert(subscriber);
666

    
667
  if (!subscriber->base.topic)
668
      return URT_STATUS_FETCH_NOTOPIC;
669

    
670
  urtMutexLock(subscriber->base.topic->lock);
671

    
672
  urt_message_t* messageTemp = subscriber->base.lastMessage;
673
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
674
  {
675
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
676
    {
677
      urtMutexUnlock(subscriber->base.topic->lock);
678
      return URT_STATUS_FETCH_NOMESSAGE;
679
    }
680
    messageTemp = messageTemp->next;
681
  }
682
  else
683
  {
684
    messageTemp = urtFindOldestMessage(messageTemp->next);
685
  }
686

    
687
  urtFetchMessage();
688

    
689
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
690
#if(URT_CFG_PUBSUB_PROFILING == true)
691
  subscriber->base.sumLatencies += calculatedLatency;
692
#endif /* URT_CFG_PUBSUB_PROFILING */
693

    
694
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
695
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
696
  {
697
    subscriber->minLatency = calculatedLatency;
698
  }
699
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
700
  {
701
    subscriber->maxLatency = calculatedLatency;
702
  }
703
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
704

    
705
  bool temp = false;
706
  if (temp/*optional latency output argument given*/)
707
  {
708
    latency = calculatedLatency;
709
  }
710

    
711
#if (URT_CFG_PUBSUB_PROFILING == true)
712
  subscriber->base.lastMessage->numConsumersLeft--;
713
  subscriber->base->numMessagesReceived++;
714
#endif /* URT_CFG_PUBSUB_PROFILING */
715

    
716
  urtMutexUnlock(subscriber->base.topic->lock);
717
  return URT_STATUS_OK; //TODO: or urt_status_jitterviolation
718
}
719

    
720
/**
721
 * @brief  Fetches the latest message.
722
 *
723
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
724
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
725
 * @param[in] bytes  Payload size in bytes.
726
 * @param[in] latency  The latency can be returned by reference. May be NULL.
727
 *
728
 * @return  Returns URT_STATUS_OK on success.
729
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
730
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
731
 */
732
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
733
                                                size_t bytes, urt_delay_t* latency)
734
{
735
  urtDebugAssert(subscriber);
736

    
737
  if (!subscriber->base.topic)
738
      return URT_STATUS_FETCH_NOTOPIC;
739

    
740
  urtMutexLock(subscriber->base.topic->lock);
741
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
742

    
743
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
744
  {
745
    urtMutexUnlock(subscriber->base.topic->lock);
746
    return URT_STATUS_FETCH_NOMESSAGE;
747
  }
748

    
749
  urtFetchMessage();
750

    
751
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
752
#if(URT_CFG_PUBSUB_PROFILING == true)
753
  subscriber->base.sumLatencies += calculatedLatency;
754
#endif /* URT_CFG_PUBSUB_PROFILING */
755

    
756
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
757
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
758
  {
759
    subscriber->minLatency = calculatedLatency;
760
  }
761
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
762
  {
763
    subscriber->maxLatency = calculatedLatency;
764
  }
765
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
766

    
767
  bool temp = false;
768
  if (temp/*optional latency output argument given*/)
769
  {
770
    latency = calculatedLatency;
771
  }
772

    
773
  urtMutexUnlock(subscriber->base.topic->lock);
774
  return URT_STATUS_OK;
775
}
776

    
777
/**
778
 * @brief  Calculates the validity from the subscriber.
779
 *
780
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
781
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
782
 *
783
 * @return  Returns a boolean indicator whether the latency is fine.
784
 */
785
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
786
{
787
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
788
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
789
    return true;
790
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
791

    
792
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
793
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
794
      return true;
795

    
796
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
797
      return true;
798
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
799

    
800
  return false;
801
}
802

    
803
/**
804
 * @brief  Unsubscribes from a subscriber.
805
 *
806
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
807
 *
808
 * @return  Returns URT_STATUS_OK on sucess.
809
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
810
 */
811
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
812
{
813
  urtDebugAssert(subscriber);
814

    
815
  if (subscriber->base.topic)
816
  {
817
    urtMutexLock(topic->lock);
818
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
819
    //TODO: decrement topic's HRT counter
820
# if (URT_CFG_PUBSUB_PROFILING == true)
821
    subscriber->base.topic->numSubscribers--;
822
# endif /* URT_CFG_PUBSUB_PROFILING */
823
//Hier weiter
824

    
825
# if (URT_CFG_PUBSUB_PROFILING == true)
826
    urtMutexUnlock(topic->lock);
827
# endif /* URT_CFG_PUBSUB_PROFILING */
828
    subscriber->base.topic = NULL;
829
    subscriber->base.lastMessage = NULL;
830
    subscriber->base.lastMessageTime = 0;
831
    return URT_STATUS_OK;
832
  }
833

    
834
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
835
}
836

    
837

    
838
/**
839
 * @brief  Initialize the HRT Subscriber.
840
 *
841
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
842
 */
843
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
844
{
845
  urtDebugAssert(subscriber);
846

    
847
  subscriber->base.topic = NULL;
848
  urtEventListenerInit(subscriber->base.evtListener);
849
  subscriber->base.lastMessage = NULL;
850
  subscriber->base.lastMessageTime = 0;
851

    
852
# if (URT_CFG_PUBSUB_PROFILING)
853
    subscriber->base.sumLatencies = 0;
854
    subscriber->base.numMessagesReceived = 0;
855
# endif /* URT_CFG_PUBSUB_PROFILING */
856

    
857
  subscriber->next = NULL;
858

    
859
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
860
    subscriber->deadlineOffset = 0;
861
    urtTimerInit(subscriber->qodDeadlineTimer);
862
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
863

    
864
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
865
    subscriber->expectedRate = 0;
866
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
867

    
868
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
869
    subscriber->maxJitter = 0;
870
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
871

    
872
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
873
    subscriber->minLatency = URT_DELAY_INFINITE;
874
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
875
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
876
  return;
877
}
878

    
879

    
880
/**
881
 * @brief  Subscribes the subscriber to a topic.
882
 *
883
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
884
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
885
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
886
 *                      Messages must not be associated to another topic.
887
 *                      Once a message has been contributed, it cannot be removed later.
888
 *                      May be NULL(no messages to contribute).
889
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
890
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
891
 *                  A value of 0 indicates, that rate is of no concern.
892
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
893
 *                    A value of 0 indicates that jitter is of no concern.
894
 *
895
 * @return  Returns URT_STATUS_OK on success.
896
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
897
 */
898
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
899
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
900
{
901
  urtDebugAssert(subscriber);
902
  urtDebugAssert(topic);
903

    
904
  if (subscriber->base.topic)
905
  {
906
    return URT_STATUS_SUBSCRIBE_TOPICSET;
907
  }
908

    
909
  subscriber->base.topic = topic;
910
# if (URT_CFG_PUBSUB_PROFILING == true)
911
  subscriber->base.sumLatencies = 0;
912
  subscriber->base.numMessagesReceived = 0;
913
# endif /* URT_CFG_PUBSUB_PROFILING */
914
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
915
  subscriber->deadlineOffset = deadline;
916
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
917
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
918
  subscriber->maxJitter =jitter;
919
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
920
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
921
  subscriber->minLatency = URT_DELAY_INFINITE;
922
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
923
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
924
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
925
  subscriber->expectedRate = rate;
926
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
927

    
928
  urtMutexLock(topic->lock);
929
  if (messages)
930
  {
931
    urt_message_t* lastMessageContribute = messages;
932
    while (lastMessageContribute->next)
933
    {
934
      lastMessageContribute = lastMessageContribute->next;
935
    }
936
    lastMessageContribute->next = topic->latestMessage->next;
937
    topic->latestMessage->next = messages;
938
  }
939

    
940
  subscriber->base.lastMessage = topic->latestMessage;
941
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
942

    
943
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
944

    
945
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
946
  //TODO: Implement
947
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
948

    
949
  topic->numHrtSubscribers--;
950
# if (URT_CFG_PUBSUB_PROFILING == true)
951
  topic->numSubscribers--;
952
# endif /* URT_CFG_PUBSUB_PROFILING */
953

    
954
  urtMutexUnlock(topic->lock);
955
  return URT_STATUS_OK;
956
}
957

    
958

    
959
/**
960
 * @brief  Fetches the next message.
961
 *
962
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
963
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
964
 * @param[in] bytes  Payload size in bytes.
965
 * @param[in] latency  The latency can be returned by reference. May be NULL.
966
 *
967
 * @return  Returns URT_STATUS_OK on success.
968
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
969
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
970
 */
971
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
972
                                              size_t bytes, urt_delay_t* latency)
973
{
974
  urtDebugAssert(subscriber);
975

    
976
  if (!subscriber->base.topic)
977
      return URT_STATUS_FETCH_NOTOPIC;
978

    
979
  urtMutexLock(subscriber->base.topic->lock);
980
  urt_message_t* messageTemp = subscriber->base.lastMessage;
981
  if (messageTemp->next->originTime > messageTemp.originTime)
982
  {
983
    urtMutexUnlock(subscriber->base.topic->lock);
984
    return URT_STATUS_FETCH_NOMESSAGE;
985
  }
986
  messageTemp = messageTemp->next;
987

    
988
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
989
#if (URT_CFG_PUBSUB_PROFILING == true)
990
  subscriber->base.sumLatencies += calculatedLatency;
991
#endif /* URT_CFG_PUBSUB_PROFILING */
992
  bool temp = false;
993
  if (temp /* optional latency output argument given */)
994
  {
995
    latency = calculatedLatency
996
  }
997

    
998
  subscriber->base.lastMessage->numHrtConsumersLeft--;
999
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1000
  {
1001
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1002
  }
1003

    
1004
#if (URT_CFG_PUBSUB_PROFILING == true)
1005
  subscriber->base.lastMessage->numConsumersLeft--;
1006
  subscriber->base->numMessagesReceived++;
1007
#endif /* URT_CFG_PUBSUB_PROFILING */
1008

    
1009
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
1010
  if (temp /*latency is within allowed jitter range*/)
1011
  {
1012
    if (calculatedLatency < subscriber->minLatency)
1013
    {
1014
      subscriber->minLatency = calculatedLatency;
1015
    }
1016
    else if (calculatedLatency > subscriber->maxLatency)
1017
    {
1018
      subscriber->maxLatency = calculatedLatency;
1019
    }
1020
  }
1021
  else
1022
  {
1023
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1024
    urtMutexUnlock(subscriber->base.topic->lock);
1025
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1026
    return URT_STATUS_JITTERVIOLATION;
1027
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1028
  }
1029
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
1030

    
1031
  urtFetchMessage();
1032

    
1033
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1034
  if (messageTemp->next->originTime < messageTemp->originTime)
1035
  {
1036
    //TODO: update qos deadliner timer wrt. next message
1037
  }
1038
  else
1039
  {
1040
    //TODO: reset qos deadline timer
1041
  }
1042
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1043

    
1044
  urtMutexUnlock(subscriber->base.topic->lock);
1045
  return URT_STATUS_OK;
1046
}
1047

    
1048

    
1049
/**
1050
 * @brief  Fetches the latest message.
1051
 *
1052
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
1053
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
1054
 * @param[in] bytes  Payload size in bytes.
1055
 * @param[in] latency  The latency can be returned by reference. May be NULL.
1056
 *
1057
 * @return  Returns URT_STATUS_OK on success.
1058
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
1059
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
1060
 */
1061
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
1062
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
1063

    
1064
/**
1065
 * @brief  Unsubscribes from a subscriber.
1066
 *
1067
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
1068
 *
1069
 * @return  Returns URT_STATUS_OK on sucess.
1070
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
1071
 */
1072
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
1073
{
1074
  urtDebugAssert(subscriber);
1075

    
1076
  if (subscriber->base.topic)
1077
  {
1078
    urtMutexLock(topic->lock);
1079
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
1080
    subscriber->base.topic->numHrtSubscribers--;
1081
# if (URT_CFG_PUBSUB_PROFILING == true)
1082
    subscriber->base.topic->numSubscribers--;
1083
# endif /* URT_CFG_PUBSUB_PROFILING */
1084
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1085
    //TODO: remove self from topics lsit of HRT subscribers
1086
    //TODO: ...
1087
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1088

    
1089
    urt_message_t* messageTemp = subscriber->base.lastMessage;
1090
    bool hrtZero = false;
1091
    while (messageTemp->next->originTime < messageTemp->originTime)
1092
    {
1093
        messageTemp = messageTemp->next;
1094
        messageTemp->numHrtConsumersLeft--;
1095
        if (messageTemp->numHrtConsumersLeft == 0)
1096
        {
1097
            hrtZero = true;
1098
        }
1099
# if(URT_CFG_PUBSUB_PROFILING == true)
1100
        messageTemp->numConsumersLeft--;
1101
# endif /* URT_CFG_PUBSUB_PROFILING */
1102
    }
1103
    if (hrtZero)
1104
    {
1105
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
1106
    }
1107

    
1108
    urtMutexUnlock(topic->lock);
1109
    subscriber->base.topic = NULL;
1110
    subscriber->base.lastMessage = NULL;
1111
    subscriber->base.lastMessageTime = 0;
1112
    return URT_STATUS_OK;
1113
  }
1114

    
1115
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1116
}
1117

    
1118

    
1119