Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 5b7188aa

History | View | Annotate | Download (27.8 KB)

1
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21

    
22
#include <urtware.h>
23

    
24
/******************************************************************************/
25
/* LOCAL DEFINITIONS                                                          */
26
/******************************************************************************/
27

    
28
/******************************************************************************/
29
/* EXPORTED VARIABLES                                                         */
30
/******************************************************************************/
31

    
32
/******************************************************************************/
33
/* LOCAL TYPES                                                                */
34
/******************************************************************************/
35

    
36
/******************************************************************************/
37
/* LOCAL VARIABLES                                                            */
38
/******************************************************************************/
39

    
40
/******************************************************************************/
41
/* LOCAL FUNCTIONS                                                            */
42
/******************************************************************************/
43

    
44
/******************************************************************************/
45
/* EXPORTED FUNCTIONS                                                         */
46
/******************************************************************************/
47

    
48
/**
49
 * @brief   Initialize the nrt Subscriber.
50
 *
51
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
52
 */
53
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
54
{
55
  urtDebugAssert(subscriber);
56

    
57
  subscriber->base.topic = NULL;
58
  urtEventListenerInit(subscriber->base.evtListener);
59
  subscriber->base.lastMessage = NULL;
60
  subscriber->base.lastMessageTime = 0;
61
#if (URT_CFG_PUBSUB_PROFILING == true)
62
    subscriber->minLatency = URT_DELAY_INFINITE;
63
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
64
#endif /* URT_CFG_PUBSUB_PROFILING */
65
  return;
66
}
67

    
68
/**
69
 * @brief  Subscribes the subscriber to a topic.
70
 *
71
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
72
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
73
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
74
 *                      Messages must not be associated to another topic.
75
 *                      Once a message has been contributed, it cannot be removed later.
76
 *                      May be NULL(no messages to contribute).
77
 *
78
 * @return  Returns URT_STATUS_OK on success.
79
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
80
 */
81
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
82
{
83
  urtDebugAssert(subscriber);
84
  urtDebugAssert(topic);
85

    
86
  if (!subscriber->base.topic)
87
      return URT_STATUS_SUBSCRIBE_TOPICSET;
88

    
89
  subscriber->base.topic = topic;
90
  //TODO: Lock topic
91

    
92
  if (messages)
93
  {
94
    urt_message_t* lastMessageContribute = messages;
95
    while (lastMessageContribute->next)
96
    {
97
        lastMessageContribute = lastMessageContribute->next;
98
    }
99
    lastMessageContribute->next = topic->latestMessage->next;
100
    topic->latestMessage->next = messages;
101
  }
102

    
103
  subscriber->base.lastMessage = topic->latestMessage;
104
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
105

    
106
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
107

    
108
#if (URT_CFG_PUBSUB_PROFILING == true)
109
    topic->numHrtSubscribers--;
110
#endif /* URT_CFG_PUBSUB_PROFILING */
111

    
112
  //TODO: Unlock topic
113
  return URT_STATUS_OK;
114
}
115

    
116
/**
117
 * @brief  Fetches the next message.
118
 *
119
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
120
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
121
 * @param[in] bytes  Payload size in bytes.
122
 * @param[in] latency  The latency can be returned by reference. May be NULL.
123
 *
124
 * @return  Returns URT_STATUS_OK on success.
125
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
126
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
127
 */
128
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
129
{
130
  urtDebugAssert(subscriber);
131
  return URT_STATUS_OK;
132
}
133

    
134
/**
135
 * @brief Fetches the latest message.
136
 *
137
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
138
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
139
 * @param[in] bytes  Payload size in bytes.
140
 * @param[in] latency  The latency can be returned by reference. May be NULL.
141
 *
142
 * @return  Returns URT_STATUS_OK on success.
143
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
144
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
145
 */
146
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
147
  return URT_STATUS_OK;
148
}
149

    
150
/**
151
 * @brief  Unsubscribes from a subscriber.
152
 *
153
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
154
 *
155
 * @return  Returns URT_STATUS_OK on sucess.
156
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
157
 */
158
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
159
{
160
  if (subscriber->base.topic)
161
  {
162
# if(URT_CFG_PUBSUB_PROFILING == true)
163
      //TODO: LOCK TOPIC
164
      subscriber->base.topic->numSubscribers--;
165
# endif /* URT_CFG_PUBSUB_PROFILING */
166
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
167
# if(URT_CFG_PUBSUB_PROFILING == true)
168
      //TODO: Unlock TOPIC
169
      subscriber->base.topic = NULL;
170
      subscriber->base.lastMessage = NULL;
171
      subscriber->base.lastMessageTime = 0;
172
#endif /* URT_CFG_PUBSUB_PROFILING */
173
    return URT_STATUS_OK;
174
  }
175

    
176
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
177
}
178

    
179

    
180
/**
181
 * @brief  Initialize the srt Subscriber.
182
 *
183
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
184
 */
185
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
186
{
187
  urtDebugAssert(subscriber);
188

    
189
  subscriber->base.topic = NULL;
190
  urtEventListenerInit(subscriber->base.evtListener);
191
  subscriber->base.lastMessage = NULL;
192
  subscriber->base.lastMessageTime = 0;
193
  #if (URT_CFG_PUBSUB_PROFILING)
194
    subscriber->base.sumLatencies = 0;
195
    subscriber->base.numMessagesReceived = 0;
196
    subscriber->usefulnesscb = NULL;
197
    subscriber->cbparams = NULL;
198
    subscriber->minLatency = URT_DELAY_INFINITE;
199
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
200
  #endif /* URT_CFG_PUBSUB_PROFILING */
201
  return;
202
}
203

    
204
/**
205
 * @brief  Subscribes the subscriber to a topic.
206
 *
207
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
208
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
209
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
210
 *                     Messages must not be associated to another topic.
211
 *                     Once a message has been contributed, it cannot be removed later.
212
 *                     May be NULL (no messages to contribute)
213
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
214
 * @param[in] cbparams  Optional parameters for the usefulness callback.
215
 *                      May be NULL if the callback expects no parameters.
216
 *
217
 * @return  Returns URT_STATUS_OK on success.
218
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
219
 */
220
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
221
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
222
{
223
  urtDebugAssert(subscriber);
224
  urtDebugAssert(topic);
225

    
226
  if (subscriber->base.topic)
227
  {
228
    return URT_STATUS_SUBSCRIBE_TOPICSET;
229
  }
230

    
231
  subscriber->base.topic = topic;
232
  subscriber->usefulnesscb = usefulnesscb;
233
  subscriber->cbparams = cbparams;
234
# if (URT_CFG_PUBSUB_PROFILING == true)
235
  subscriber->base.sumLatencies = 0;
236
  subscriber->base.numMessagesReceived = 0;
237
  subscriber->minLatency = URT_DELAY_INFINITE;
238
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
239
# endif  /* URT_CFG_PUBSUB_PROFILING */
240

    
241
  //TODO: Lock topic
242
  if (messages)
243
  {
244
    urt_message_t* lastMessageContribute = messages;
245
    while (lastMessageContribute->next)
246
    {
247
        lastMessageContribute = lastMessageContribute->next;
248
    }
249
    lastMessageContribute->next = topic->latestMessage->next;
250
    topic->latestMessage->next = messages;
251
  }
252

    
253
  subscriber->base.lastMessage = topic->latestMessage;
254
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
255

    
256
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
257

    
258
# if (URT_CFG_PUBSUB_PROFILING == true)
259
    topic->numHrtSubscribers--;
260
# endif /* URT_CFG_PUBSUB_PROFILING */
261

    
262
  //TODO: Unlock topic
263
  return URT_STATUS_OK;
264
}
265

    
266
/**
267
 * @brief  Fetches the next message.
268
 *
269
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
270
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
271
 * @param[in] bytes  Payload size in bytes.
272
 * @param[in] latency  The latency can be returned by reference. May be NULL.
273
 *
274
 * @return  Returns URT_STATUS_OK on success.
275
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
276
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
277
 */
278
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
279
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
280

    
281
/**
282
 * @brief  Fetches the latest message.
283
 *
284
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
285
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
286
 * @param[in] bytes  Payload size in bytes.
287
 * @param[in] latency  The latency can be returned by reference. May be NULL.
288
 *
289
 * @return  Returns URT_STATUS_OK on success.
290
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
291
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
292
 */
293
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
294
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
295

    
296
/**
297
 * @brief  Calculates the usefulness of the subscriber.
298
 *
299
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
300
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
301
 *
302
 * @return  Returns the usefulness as a value within [0,1].
303
 */
304
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
305
{
306
  urtDebugAssert(subscriber);
307

    
308
  return subscriber->usefulnesscb(latency);
309
}
310

    
311
/**
312
 * @brief  Unsubscribes from a subscriber.
313
 *
314
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
315
 *
316
 * @return  Returns URT_STATUS_OK on sucess.
317
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
318
 */
319
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
320
{
321
  urtDebugAssert(subscriber);
322

    
323
  if (subscriber->base.topic)
324
  {
325
# if (URT_CFG_PUBSUB_PROFILING == true)
326
    //TODO: lock topic
327
    subscriber->base.topic->numSubscribers--;
328
# endif /* URT_CFG_PUBSUB_PROFILING */
329
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
330
# if (URT_CFG_PUBSUB_PROFILING == true)
331
      //TODO: unlock topic
332
# endif /* URT_CFG_PUBSUB_PROFILING */
333
    subscriber->base.topic = NULL;
334
    subscriber->base.lastMessage = NULL;
335
    subscriber->base.lastMessageTime = 0;
336
    return URT_STATUS_OK;
337
  }
338

    
339
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
340
}
341

    
342

    
343
/**
344
 * @brief  Initialize the FRT Subscriber.
345
 *
346
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
347
 */
348
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
349
{
350
  urtDebugAssert(subscriber);
351

    
352
  subscriber->base.topic = NULL;
353
  urtEventListenerInit(subscriber->base.evtListener);
354
  subscriber->base.lastMessage = NULL;
355
  subscriber->base.lastMessageTime = 0;
356

    
357
  #if (URT_CFG_PUBSUB_PROFILING)
358
    subscriber->base.sumLatencies = 0;
359
    subscriber->base.numMessagesReceived = 0;
360
  #endif /* URT_CFG_PUBSUB_PROFILING */
361

    
362
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
363
    subscriber->deadlineOffset = 0;
364
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
365

    
366
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
367
    subscriber->maxJitter = 0;
368
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
369

    
370
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
371
    subscriber->minLatency = URT_DELAY_INFINITE;
372
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
373
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
374
  return;
375
}
376

    
377

    
378
/**
379
 * @brief  Subscribes the subscriber to a topic.
380
 *
381
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
382
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
383
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
384
 *                      Messages must not be associated to another topic.
385
 *                      Once a message has been contributed, it cannot be removed later.
386
 *                      May be NULL(no messages to contribute).
387
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
388
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
389
 *                    A value of 0 indicates that jitter is of no concern.
390
 *
391
 * @return  Returns URT_STATUS_OK on success.
392
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
393
 */
394
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
395
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
396
{
397
  urtDebugAssert(subscriber);
398
  urtDebugAssert(topic);
399

    
400
  if (subscriber->base.topic)
401
  {
402
    return URT_STATUS_SUBSCRIBE_TOPICSET;
403
  }
404

    
405
  subscriber->base.topic = topic;
406
# if (URT_CFG_PUBSUB_PROFILING == true)
407
  subscriber->base.sumLatencies = 0;
408
  subscriber->base.numMessagesReceived = 0;
409
# endif /* URT_CFG_PUBSUB_PROFILING */
410
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
411
  subscriber->deadlineOffset = deadline;
412
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
413
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
414
  subscriber->maxJitter =jitter;
415
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
416
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
417
  subscriber->minLatency = URT_DELAY_INFINITE;
418
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
419
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
420

    
421
  //TODO: Lock topic
422
  if (messages)
423
  {
424
    urt_message_t* lastMessageContribute = messages;
425
    while (lastMessageContribute->next)
426
    {
427
      lastMessageContribute = lastMessageContribute->next;
428
    }
429
    lastMessageContribute->next = topic->latestMessage->next;
430
    topic->latestMessage->next = messages;
431
  }
432

    
433
  subscriber->base.lastMessage = topic->latestMessage;
434
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
435

    
436
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
437

    
438
# if (URT_CFG_PUBSUB_PROFILING == true)
439
  topic->numHrtSubscribers--;
440
# endif /* URT_CFG_PUBSUB_PROFILING */
441

    
442
  //TODO: Unlock topic
443
  return URT_STATUS_OK;
444
}
445

    
446
/**
447
 * @brief  Fetches the next message.
448
 *
449
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
450
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
451
 * @param[in] bytes  Payload size in bytes.
452
 * @param[in] latency  The latency can be returned by reference. May be NULL.
453
 *
454
 * @return  Returns URT_STATUS_OK on success.
455
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
456
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
457
 */
458
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
459
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
460

    
461
/**
462
 * @brief  Fetches the latest message.
463
 *
464
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
465
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
466
 * @param[in] bytes  Payload size in bytes.
467
 * @param[in] latency  The latency can be returned by reference. May be NULL.
468
 *
469
 * @return  Returns URT_STATUS_OK on success.
470
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
471
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
472
 */
473
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
474
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
475

    
476
/**
477
 * @brief  Calculates the validity from the subscriber.
478
 *
479
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
480
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
481
 *
482
 * @return  Returns a boolean indicator whether the latency is fine.
483
 */
484
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
485
{
486
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
487
  if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
488
    return true;
489
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
490

    
491
  return false;
492
}
493

    
494
/**
495
 * @brief  Unsubscribes from a subscriber.
496
 *
497
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
498
 *
499
 * @return  Returns URT_STATUS_OK on sucess.
500
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
501
 */
502
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
503
{
504
  urtDebugAssert(subscriber);
505

    
506
  if (subscriber->base.topic)
507
  {
508
    //TODO: lock topic
509
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
510
    //TODO: decrement topic's HRT counter
511
# if (URT_CFG_PUBSUB_PROFILING == true)
512
    subscriber->base.topic->numSubscribers--;
513
# endif /* URT_CFG_PUBSUB_PROFILING */
514
//Hier weiter
515

    
516
# if (URT_CFG_PUBSUB_PROFILING == true)
517
    //TODO: unlock topic
518
# endif /* URT_CFG_PUBSUB_PROFILING */
519
    subscriber->base.topic = NULL;
520
    subscriber->base.lastMessage = NULL;
521
    subscriber->base.lastMessageTime = 0;
522
    return URT_STATUS_OK;
523
  }
524

    
525
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
526
}
527

    
528

    
529
/**
530
 * @brief  Initialize the HRT Subscriber.
531
 *
532
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
533
 */
534
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
535
{
536
  urtDebugAssert(subscriber);
537

    
538
  subscriber->base.topic = NULL;
539
  urtEventListenerInit(subscriber->base.evtListener);
540
  subscriber->base.lastMessage = NULL;
541
  subscriber->base.lastMessageTime = 0;
542

    
543
# if (URT_CFG_PUBSUB_PROFILING)
544
    subscriber->base.sumLatencies = 0;
545
    subscriber->base.numMessagesReceived = 0;
546
# endif /* URT_CFG_PUBSUB_PROFILING */
547

    
548
  subscriber->next = NULL;
549

    
550
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
551
    subscriber->deadlineOffset = 0;
552
    urtTimerInit(subscriber->qodDeadlineTimer);
553
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
554

    
555
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
556
    subscriber->expectedRate = 0;
557
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
558

    
559
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
560
    subscriber->maxJitter = 0;
561
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
562

    
563
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
564
    subscriber->minLatency = URT_DELAY_INFINITE;
565
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
566
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
567
  return;
568
}
569

    
570

    
571
/**
572
 * @brief  Subscribes the subscriber to a topic.
573
 *
574
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
575
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
576
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
577
 *                      Messages must not be associated to another topic.
578
 *                      Once a message has been contributed, it cannot be removed later.
579
 *                      May be NULL(no messages to contribute).
580
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
581
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
582
 *                  A value of 0 indicates, that rate is of no concern.
583
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
584
 *                    A value of 0 indicates that jitter is of no concern.
585
 *
586
 * @return  Returns URT_STATUS_OK on success.
587
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
588
 */
589
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
590
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
591
{
592
  urtDebugAssert(subscriber);
593
  urtDebugAssert(topic);
594

    
595
  if (subscriber->base.topic)
596
  {
597
    return URT_STATUS_SUBSCRIBE_TOPICSET;
598
  }
599

    
600
  subscriber->base.topic = topic;
601
# if (URT_CFG_PUBSUB_PROFILING == true)
602
  subscriber->base.sumLatencies = 0;
603
  subscriber->base.numMessagesReceived = 0;
604
# endif /* URT_CFG_PUBSUB_PROFILING */
605
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
606
  subscriber->deadlineOffset = deadline;
607
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
608
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
609
  subscriber->maxJitter =jitter;
610
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
611
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
612
  subscriber->minLatency = URT_DELAY_INFINITE;
613
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
614
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
615
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
616
  subscriber->expectedRate = rate;
617
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
618

    
619
  //TODO: Lock topic
620
  if (messages)
621
  {
622
    urt_message_t* lastMessageContribute = messages;
623
    while (lastMessageContribute->next)
624
    {
625
      lastMessageContribute = lastMessageContribute->next;
626
    }
627
    lastMessageContribute->next = topic->latestMessage->next;
628
    topic->latestMessage->next = messages;
629
  }
630

    
631
  subscriber->base.lastMessage = topic->latestMessage;
632
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
633

    
634
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
635

    
636
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
637
  urt_hrtsubscriber_t* hrtSubscriber = subscriber->base.topic->hrtSubscribers;
638
  while (!hrtSubscriber /* && expected Rate is lower */)
639
  {
640
      hrtSubscriber = hrtSubscriber->next;
641
  }
642

    
643
  if (!hrtSubscriber)
644
  {
645
    //TODO: Append self to topic's list of HRT subscribers
646
  }
647
  else
648
  {
649
    //TOOD: insert self in front of current HRT susbcriber
650
    subscriber->next = hrtSubscriber;
651
  }
652
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
653

    
654
  topic->numHrtSubscribers--;
655
# if (URT_CFG_PUBSUB_PROFILING == true)
656
  topic->numSubscribers--;
657
# endif /* URT_CFG_PUBSUB_PROFILING */
658

    
659
  //TODO: Unlock topic
660
  return URT_STATUS_OK;
661
}
662

    
663

    
664
/**
665
 * @brief  Fetches the next message.
666
 *
667
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
668
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
669
 * @param[in] bytes  Payload size in bytes.
670
 * @param[in] latency  The latency can be returned by reference. May be NULL.
671
 *
672
 * @return  Returns URT_STATUS_OK on success.
673
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
674
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
675
 */
676
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
677
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
678

    
679

    
680
/**
681
 * @brief  Fetches the latest message.
682
 *
683
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
684
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
685
 * @param[in] bytes  Payload size in bytes.
686
 * @param[in] latency  The latency can be returned by reference. May be NULL.
687
 *
688
 * @return  Returns URT_STATUS_OK on success.
689
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
690
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
691
 */
692
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
693
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
694

    
695
/**
696
 * @brief  Unsubscribes from a subscriber.
697
 *
698
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
699
 *
700
 * @return  Returns URT_STATUS_OK on sucess.
701
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
702
 */
703
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
704
{
705
  urtDebugAssert(subscriber);
706

    
707
  if (subscriber->base.topic)
708
  {
709
    //TODO: lock topic
710
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
711
    subscriber->base.topic->numHrtSubscribers--;
712
# if (URT_CFG_PUBSUB_PROFILING == true)
713
    subscriber->base.topic->numSubscribers--;
714
# endif /* URT_CFG_PUBSUB_PROFILING */
715
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
716
    //TODO: remove self from topics lsit of HRT subscribers
717
    //TODO: ...
718
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
719

    
720
    urt_message_t* messageTemp = subscriber->base.lastMessage;
721
    while (messageTemp->next->originTime < messageTemp->originTime)
722
    {
723
        messageTemp = messageTemp->next;
724
        messageTemp->numHrtConsumersLeft--;
725
# if(URT_CFG_PUBSUB_PROFILING == true)
726
        messageTemp->numConsumersLeft--;
727
# endif /* URT_CFG_PUBSUB_PROFILING */
728
    }
729
    bool temp = false;
730
    if (temp /*TODO: HRT counter of any message became 0?*/)
731
    {
732
      //TODO: signal topics condition variable
733
    }
734

    
735
    //TODO: unlock topic
736
    subscriber->base.topic = NULL;
737
    subscriber->base.lastMessage = NULL;
738
    subscriber->base.lastMessageTime = 0;
739
    return URT_STATUS_OK;
740
  }
741

    
742
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
743
}
744

    
745

    
746