Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 65dc89cb

History | View | Annotate | Download (38.491 KB)

1 1fb06240 skenneweg
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21
22 7d9678db skenneweg
#include <urtware.h>
23
24 1fb06240 skenneweg
/******************************************************************************/
25
/* LOCAL DEFINITIONS                                                          */
26
/******************************************************************************/
27
28
/******************************************************************************/
29
/* EXPORTED VARIABLES                                                         */
30
/******************************************************************************/
31
32
/******************************************************************************/
33
/* LOCAL TYPES                                                                */
34
/******************************************************************************/
35
36
/******************************************************************************/
37
/* LOCAL VARIABLES                                                            */
38
/******************************************************************************/
39
40
/******************************************************************************/
41
/* LOCAL FUNCTIONS                                                            */
42
/******************************************************************************/
43
44 65dc89cb skenneweg
void urtFetchMessage ()
45
{
46
    //TODO: Update message pointer
47
    //TODO: Copy message origin time
48
    //TODO: Copy message payload
49
}
50
51
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
52
{
53
  while (oldestMessage->next->originTime < oldestMessage->originTime)
54
  {
55
    oldestMessage = oldestMessage->next;
56
  }
57
  return oldestMessage;
58
}
59
60
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
61
{
62
  urt_message_t* lastMessage = subscriber->base.lastMessage;
63
  while (lastMessage->next->originTime < lastMessage->originTime)
64
  {
65
    lastMessage = lastMessage->next;
66
#if (URT_CFG_PUBSUB_PROFILING == true)
67
    subscriber->base.lastMessage->numConsumersLeft--;
68
    subscriber->base->numMessagesReceived++;
69
#endif /* URT_CFG_PUBSUB_PROFILING */
70
  }
71
}
72
73 1fb06240 skenneweg
/******************************************************************************/
74
/* EXPORTED FUNCTIONS                                                         */
75
/******************************************************************************/
76
77 7d9678db skenneweg
/**
78
 * @brief   Initialize the nrt Subscriber.
79
 *
80 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
81 7d9678db skenneweg
 */
82 5c6cb22f skenneweg
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
83
{
84 a5e142de skenneweg
  urtDebugAssert(subscriber);
85
86 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
87
  urtEventListenerInit(subscriber->base.evtListener);
88
  subscriber->base.lastMessage = NULL;
89
  subscriber->base.lastMessageTime = 0;
90 a5e142de skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
91 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
92
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
93 a5e142de skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
94 5c6cb22f skenneweg
  return;
95
}
96 1fb06240 skenneweg
97 7d9678db skenneweg
/**
98 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
99 7d9678db skenneweg
 *
100 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
101
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
102
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
103
 *                      Messages must not be associated to another topic.
104
 *                      Once a message has been contributed, it cannot be removed later.
105
 *                      May be NULL(no messages to contribute).
106 7d9678db skenneweg
 *
107 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
108
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
109 7d9678db skenneweg
 */
110 a5e142de skenneweg
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
111
{
112
  urtDebugAssert(subscriber);
113
  urtDebugAssert(topic);
114
115
  if (!subscriber->base.topic)
116
      return URT_STATUS_SUBSCRIBE_TOPICSET;
117
118
  subscriber->base.topic = topic;
119 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
120 a5e142de skenneweg
121
  if (messages)
122
  {
123
    urt_message_t* lastMessageContribute = messages;
124
    while (lastMessageContribute->next)
125
    {
126
        lastMessageContribute = lastMessageContribute->next;
127
    }
128
    lastMessageContribute->next = topic->latestMessage->next;
129
    topic->latestMessage->next = messages;
130
  }
131
132
  subscriber->base.lastMessage = topic->latestMessage;
133
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
134
135 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
136 a5e142de skenneweg
137
#if (URT_CFG_PUBSUB_PROFILING == true)
138
    topic->numHrtSubscribers--;
139
#endif /* URT_CFG_PUBSUB_PROFILING */
140
141 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
142 7d9678db skenneweg
  return URT_STATUS_OK;
143 1fb06240 skenneweg
}
144
145 7d9678db skenneweg
/**
146 5198dfae skenneweg
 * @brief  Fetches the next message.
147 7d9678db skenneweg
 *
148 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
149
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
150
 * @param[in] bytes  Payload size in bytes.
151
 * @param[in] latency  The latency can be returned by reference. May be NULL.
152 7d9678db skenneweg
 *
153 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
154
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
155
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
156 7d9678db skenneweg
 */
157 a5e142de skenneweg
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
158 65dc89cb skenneweg
{   
159 a5e142de skenneweg
  urtDebugAssert(subscriber);
160 65dc89cb skenneweg
161
  if (!subscriber->base.topic)
162
      return URT_STATUS_FETCH_NOTOPIC;
163
164
  urtMutexLock(subscriber->base.topic->lock);
165
166
  urt_message_t* messageTemp = subscriber->base.lastMessage;
167
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
168
  {
169
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
170
    {
171
      urtMutexUnlock(subscriber->base.topic->lock);
172
      return URT_STATUS_FETCH_NOMESSAGE;
173
    }
174
    messageTemp = messageTemp->next;
175
  }
176
  else
177
  {
178
    messageTemp = urtFindOldestMessage(messageTemp->next);
179
  }
180
181
  urtFetchMessage();
182
183
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
184
#if(URT_CFG_PUBSUB_PROFILING == true)
185
  subscriber->base.sumLatencies += calculatedLatency;
186
187
  if (calculatedLatency < subscriber->minLatency)
188
  {
189
    subscriber->minLatency = calculatedLatency;
190
  }
191
  else if (calculatedLatency > subscriber->maxLatency)
192
  {
193
    subscriber->maxLatency = calculatedLatency;
194
  }
195
#endif /* URT_CFG_PUBSUB_PROFILING */
196
  bool temp = false;
197
  if (temp/*optional latency output argument given*/)
198
  {
199
    latency = calculatedLatency;
200
  }
201
202
#if (URT_CFG_PUBSUB_PROFILING == true)
203
  subscriber->base.lastMessage->numConsumersLeft--;
204
  subscriber->base->numMessagesReceived++;
205
#endif /* URT_CFG_PUBSUB_PROFILING */
206
207
  urtMutexUnlock(subscriber->base.topic->lock);
208 7d9678db skenneweg
  return URT_STATUS_OK;
209 1fb06240 skenneweg
}
210
211 7d9678db skenneweg
/**
212 a5e142de skenneweg
 * @brief Fetches the latest message.
213 7d9678db skenneweg
 *
214 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
215
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
216
 * @param[in] bytes  Payload size in bytes.
217
 * @param[in] latency  The latency can be returned by reference. May be NULL.
218 7d9678db skenneweg
 *
219 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
220
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
221
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
222 7d9678db skenneweg
 */
223 1fb06240 skenneweg
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
224 65dc89cb skenneweg
  urtDebugAssert(subscriber);
225
226
  if (!subscriber->base.topic)
227
      return URT_STATUS_FETCH_NOTOPIC;
228
229
  urtMutexLock(subscriber->base.topic->lock);
230
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
231
232
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
233
  {
234
    urtMutexUnlock(subscriber->base.topic->lock);
235
    return URT_STATUS_FETCH_NOMESSAGE;
236
  }
237
238
  urtFetchMessage();
239
240
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
241
#if(URT_CFG_PUBSUB_PROFILING == true)
242
  subscriber->base.sumLatencies += calculatedLatency;
243
244
  if (calculatedLatency < subscriber->minLatency)
245
  {
246
    subscriber->minLatency = calculatedLatency;
247
  }
248
  else if (calculatedLatency > subscriber->maxLatency)
249
  {
250
    subscriber->maxLatency = calculatedLatency;
251
  }
252
#endif /* URT_CFG_PUBSUB_PROFILING */
253
  bool temp = false;
254
  if (temp/*optional latency output argument given*/)
255
  {
256
    latency = calculatedLatency;
257
  }
258
259
  urtMutexUnlock(subscriber->base.topic->lock);
260 7d9678db skenneweg
  return URT_STATUS_OK;
261 1fb06240 skenneweg
}
262
263 7d9678db skenneweg
/**
264 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
265 7d9678db skenneweg
 *
266 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
267 7d9678db skenneweg
 *
268 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
269
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
270 7d9678db skenneweg
 */
271 5b7188aa skenneweg
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
272
{
273 a5e142de skenneweg
  if (subscriber->base.topic)
274
  {
275
# if(URT_CFG_PUBSUB_PROFILING == true)
276 37cd5dc2 Svenja
      urtMutexLock(topic->lock);
277 5b7188aa skenneweg
      subscriber->base.topic->numSubscribers--;
278 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
279
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
280
# if(URT_CFG_PUBSUB_PROFILING == true)
281 37cd5dc2 Svenja
      urtMutexUnlock(topic->lock);
282 a5e142de skenneweg
      subscriber->base.topic = NULL;
283
      subscriber->base.lastMessage = NULL;
284
      subscriber->base.lastMessageTime = 0;
285
#endif /* URT_CFG_PUBSUB_PROFILING */
286
    return URT_STATUS_OK;
287
  }
288 5b7188aa skenneweg
289
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
290 1fb06240 skenneweg
}
291
292 7d9678db skenneweg
293
/**
294 5198dfae skenneweg
 * @brief  Initialize the srt Subscriber.
295 7d9678db skenneweg
 *
296 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
297 7d9678db skenneweg
 */
298 5c6cb22f skenneweg
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
299
{
300 a5e142de skenneweg
  urtDebugAssert(subscriber);
301
302 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
303
  urtEventListenerInit(subscriber->base.evtListener);
304
  subscriber->base.lastMessage = NULL;
305
  subscriber->base.lastMessageTime = 0;
306
  #if (URT_CFG_PUBSUB_PROFILING)
307
    subscriber->base.sumLatencies = 0;
308
    subscriber->base.numMessagesReceived = 0;
309
    subscriber->usefulnesscb = NULL;
310
    subscriber->cbparams = NULL;
311
    subscriber->minLatency = URT_DELAY_INFINITE;
312
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
313
  #endif /* URT_CFG_PUBSUB_PROFILING */
314
  return;
315
}
316 7d9678db skenneweg
317
/**
318 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
319
 *
320
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
321
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
322
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
323
 *                     Messages must not be associated to another topic.
324
 *                     Once a message has been contributed, it cannot be removed later.
325
 *                     May be NULL (no messages to contribute)
326
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
327
 * @param[in] cbparams  Optional parameters for the usefulness callback.
328
 *                      May be NULL if the callback expects no parameters.
329
 *
330
 * @return  Returns URT_STATUS_OK on success.
331
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
332 7d9678db skenneweg
 */
333
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
334 a5e142de skenneweg
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
335
{
336
  urtDebugAssert(subscriber);
337
  urtDebugAssert(topic);
338
339
  if (subscriber->base.topic)
340
  {
341
    return URT_STATUS_SUBSCRIBE_TOPICSET;
342
  }
343 5b7188aa skenneweg
344
  subscriber->base.topic = topic;
345
  subscriber->usefulnesscb = usefulnesscb;
346
  subscriber->cbparams = cbparams;
347 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
348 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
349
  subscriber->base.numMessagesReceived = 0;
350
  subscriber->minLatency = URT_DELAY_INFINITE;
351
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
352 a5e142de skenneweg
# endif  /* URT_CFG_PUBSUB_PROFILING */
353
354 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
355 a5e142de skenneweg
  if (messages)
356
  {
357
    urt_message_t* lastMessageContribute = messages;
358
    while (lastMessageContribute->next)
359
    {
360
        lastMessageContribute = lastMessageContribute->next;
361
    }
362
    lastMessageContribute->next = topic->latestMessage->next;
363
    topic->latestMessage->next = messages;
364
  }
365
366
  subscriber->base.lastMessage = topic->latestMessage;
367
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
368
369 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
370 a5e142de skenneweg
371
# if (URT_CFG_PUBSUB_PROFILING == true)
372
    topic->numHrtSubscribers--;
373
# endif /* URT_CFG_PUBSUB_PROFILING */
374
375 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
376 a5e142de skenneweg
  return URT_STATUS_OK;
377
}
378 7d9678db skenneweg
379
/**
380 5198dfae skenneweg
 * @brief  Fetches the next message.
381 7d9678db skenneweg
 *
382 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
383
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
384
 * @param[in] bytes  Payload size in bytes.
385
 * @param[in] latency  The latency can be returned by reference. May be NULL.
386 7d9678db skenneweg
 *
387 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
388
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
389
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
390 7d9678db skenneweg
 */
391
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
392 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
393
{
394
  urtDebugAssert(subscriber);
395
396
  if (!subscriber->base.topic)
397
      return URT_STATUS_FETCH_NOTOPIC;
398
399
  urtMutexLock(subscriber->base.topic->lock);
400
401
  urt_message_t* messageTemp = subscriber->base.lastMessage;
402
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
403
  {
404
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
405
    {
406
      urtMutexUnlock(subscriber->base.topic->lock);
407
      return URT_STATUS_FETCH_NOMESSAGE;
408
    }
409
    messageTemp = messageTemp->next;
410
  }
411
  else
412
  {
413
    messageTemp = urtFindOldestMessage(messageTemp->next);
414
  }
415
416
  urtFetchMessage();
417
418
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
419
#if(URT_CFG_PUBSUB_PROFILING == true)
420
  subscriber->base.sumLatencies += calculatedLatency;
421
422
  if (calculatedLatency < subscriber->minLatency)
423
  {
424
    subscriber->minLatency = calculatedLatency;
425
  }
426
  else if (calculatedLatency > subscriber->maxLatency)
427
  {
428
    subscriber->maxLatency = calculatedLatency;
429
  }
430
#endif /* URT_CFG_PUBSUB_PROFILING */
431
  bool temp = false;
432
  if (temp/*optional latency output argument given*/)
433
  {
434
    latency = calculatedLatency;
435
  }
436
437
#if (URT_CFG_PUBSUB_PROFILING == true)
438
  subscriber->base.lastMessage->numConsumersLeft--;
439
  subscriber->base->numMessagesReceived++;
440
#endif /* URT_CFG_PUBSUB_PROFILING */
441
442
  urtMutexUnlock(subscriber->base.topic->lock);
443
  return URT_STATUS_OK;
444
}
445 7d9678db skenneweg
446
/**
447 5198dfae skenneweg
 * @brief  Fetches the latest message.
448 7d9678db skenneweg
 *
449 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
450
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
451
 * @param[in] bytes  Payload size in bytes.
452
 * @param[in] latency  The latency can be returned by reference. May be NULL.
453 7d9678db skenneweg
 *
454 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
455
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
456
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
457 7d9678db skenneweg
 */
458
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
459 65dc89cb skenneweg
                                                size_t bytes, urt_delay_t* latency)
460
{
461
  urtDebugAssert(subscriber);
462
463
  if (!subscriber->base.topic)
464
      return URT_STATUS_FETCH_NOTOPIC;
465
466
  urtMutexLock(subscriber->base.topic->lock);
467
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
468
469
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
470
  {
471
    urtMutexUnlock(subscriber->base.topic->lock);
472
    return URT_STATUS_FETCH_NOMESSAGE;
473
  }
474
475
  urtFetchMessage();
476
477
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
478
#if(URT_CFG_PUBSUB_PROFILING == true)
479
  subscriber->base.sumLatencies += calculatedLatency;
480
481
  if (calculatedLatency < subscriber->minLatency)
482
  {
483
    subscriber->minLatency = calculatedLatency;
484
  }
485
  else if (calculatedLatency > subscriber->maxLatency)
486
  {
487
    subscriber->maxLatency = calculatedLatency;
488
  }
489
#endif /* URT_CFG_PUBSUB_PROFILING */
490
  bool temp = false;
491
  if (temp/*optional latency output argument given*/)
492
  {
493
    latency = calculatedLatency;
494
  }
495
496
  urtMutexUnlock(subscriber->base.topic->lock);
497
  return URT_STATUS_OK;
498
}
499 7d9678db skenneweg
500
/**
501 5198dfae skenneweg
 * @brief  Calculates the usefulness of the subscriber.
502 7d9678db skenneweg
 *
503 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
504
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
505 7d9678db skenneweg
 *
506 5198dfae skenneweg
 * @return  Returns the usefulness as a value within [0,1].
507 7d9678db skenneweg
 */
508 a5e142de skenneweg
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
509
{
510
  urtDebugAssert(subscriber);
511
512
  return subscriber->usefulnesscb(latency);
513
}
514 7d9678db skenneweg
515
/**
516 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
517 7d9678db skenneweg
 *
518 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
519 7d9678db skenneweg
 *
520 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
521
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
522 7d9678db skenneweg
 */
523 a5e142de skenneweg
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
524
{
525
  urtDebugAssert(subscriber);
526
527
  if (subscriber->base.topic)
528
  {
529 5b7188aa skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
530 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
531 5b7188aa skenneweg
    subscriber->base.topic->numSubscribers--;
532
# endif /* URT_CFG_PUBSUB_PROFILING */
533 a5e142de skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
534
# if (URT_CFG_PUBSUB_PROFILING == true)
535 37cd5dc2 Svenja
      urtMutexUnlock(topic->lock);
536 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
537
    subscriber->base.topic = NULL;
538
    subscriber->base.lastMessage = NULL;
539
    subscriber->base.lastMessageTime = 0;
540
    return URT_STATUS_OK;
541
  }
542
543
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
544
}
545 7d9678db skenneweg
546
547
/**
548 5198dfae skenneweg
 * @brief  Initialize the FRT Subscriber.
549 7d9678db skenneweg
 *
550 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
551 7d9678db skenneweg
 */
552 5c6cb22f skenneweg
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
553
{
554 a5e142de skenneweg
  urtDebugAssert(subscriber);
555
556 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
557
  urtEventListenerInit(subscriber->base.evtListener);
558
  subscriber->base.lastMessage = NULL;
559
  subscriber->base.lastMessageTime = 0;
560
561
  #if (URT_CFG_PUBSUB_PROFILING)
562
    subscriber->base.sumLatencies = 0;
563
    subscriber->base.numMessagesReceived = 0;
564
  #endif /* URT_CFG_PUBSUB_PROFILING */
565
566
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
567
    subscriber->deadlineOffset = 0;
568
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
569
570
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
571
    subscriber->maxJitter = 0;
572
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
573
574
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
575
    subscriber->minLatency = URT_DELAY_INFINITE;
576
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
577
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
578
  return;
579
}
580
581 7d9678db skenneweg
582
/**
583 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
584
 *
585
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
586
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
587
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
588
 *                      Messages must not be associated to another topic.
589
 *                      Once a message has been contributed, it cannot be removed later.
590
 *                      May be NULL(no messages to contribute).
591
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
592
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
593
 *                    A value of 0 indicates that jitter is of no concern.
594
 *
595
 * @return  Returns URT_STATUS_OK on success.
596
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
597 7d9678db skenneweg
 */
598
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
599 a5e142de skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
600
{
601
  urtDebugAssert(subscriber);
602
  urtDebugAssert(topic);
603
604 5b7188aa skenneweg
  if (subscriber->base.topic)
605
  {
606
    return URT_STATUS_SUBSCRIBE_TOPICSET;
607
  }
608
609
  subscriber->base.topic = topic;
610 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
611 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
612
  subscriber->base.numMessagesReceived = 0;
613 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
614
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
615 5b7188aa skenneweg
  subscriber->deadlineOffset = deadline;
616 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
617
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
618 5b7188aa skenneweg
  subscriber->maxJitter =jitter;
619 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
620
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
621 5b7188aa skenneweg
  subscriber->minLatency = URT_DELAY_INFINITE;
622
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
623 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
624
625 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
626 5b7188aa skenneweg
  if (messages)
627
  {
628
    urt_message_t* lastMessageContribute = messages;
629
    while (lastMessageContribute->next)
630 a5e142de skenneweg
    {
631 5b7188aa skenneweg
      lastMessageContribute = lastMessageContribute->next;
632 a5e142de skenneweg
    }
633 5b7188aa skenneweg
    lastMessageContribute->next = topic->latestMessage->next;
634
    topic->latestMessage->next = messages;
635
  }
636 a5e142de skenneweg
637 5b7188aa skenneweg
  subscriber->base.lastMessage = topic->latestMessage;
638
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
639 a5e142de skenneweg
640 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
641 a5e142de skenneweg
642
# if (URT_CFG_PUBSUB_PROFILING == true)
643 5b7188aa skenneweg
  topic->numHrtSubscribers--;
644 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
645
646 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
647 5b7188aa skenneweg
  return URT_STATUS_OK;
648 a5e142de skenneweg
}
649 7d9678db skenneweg
650
/**
651 5198dfae skenneweg
 * @brief  Fetches the next message.
652 7d9678db skenneweg
 *
653 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
654
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
655
 * @param[in] bytes  Payload size in bytes.
656
 * @param[in] latency  The latency can be returned by reference. May be NULL.
657 7d9678db skenneweg
 *
658 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
659
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
660
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
661 7d9678db skenneweg
 */
662
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
663 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
664
{
665
  urtDebugAssert(subscriber);
666
667
  if (!subscriber->base.topic)
668
      return URT_STATUS_FETCH_NOTOPIC;
669
670
  urtMutexLock(subscriber->base.topic->lock);
671
672
  urt_message_t* messageTemp = subscriber->base.lastMessage;
673
  if(messageTemp->originTime == subscriber->base.lastMessageTime)
674
  {
675
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
676
    {
677
      urtMutexUnlock(subscriber->base.topic->lock);
678
      return URT_STATUS_FETCH_NOMESSAGE;
679
    }
680
    messageTemp = messageTemp->next;
681
  }
682
  else
683
  {
684
    messageTemp = urtFindOldestMessage(messageTemp->next);
685
  }
686
687
  urtFetchMessage();
688
689
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
690
#if(URT_CFG_PUBSUB_PROFILING == true)
691
  subscriber->base.sumLatencies += calculatedLatency;
692
#endif /* URT_CFG_PUBSUB_PROFILING */
693
694
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
695
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
696
  {
697
    subscriber->minLatency = calculatedLatency;
698
  }
699
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
700
  {
701
    subscriber->maxLatency = calculatedLatency;
702
  }
703
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
704
705
  bool temp = false;
706
  if (temp/*optional latency output argument given*/)
707
  {
708
    latency = calculatedLatency;
709
  }
710
711
#if (URT_CFG_PUBSUB_PROFILING == true)
712
  subscriber->base.lastMessage->numConsumersLeft--;
713
  subscriber->base->numMessagesReceived++;
714
#endif /* URT_CFG_PUBSUB_PROFILING */
715
716
  urtMutexUnlock(subscriber->base.topic->lock);
717
  return URT_STATUS_OK; //TODO: or urt_status_jitterviolation
718
}
719 7d9678db skenneweg
720
/**
721 5198dfae skenneweg
 * @brief  Fetches the latest message.
722 7d9678db skenneweg
 *
723 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
724
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
725
 * @param[in] bytes  Payload size in bytes.
726
 * @param[in] latency  The latency can be returned by reference. May be NULL.
727 7d9678db skenneweg
 *
728 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
729
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
730
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
731 7d9678db skenneweg
 */
732
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
733 65dc89cb skenneweg
                                                size_t bytes, urt_delay_t* latency)
734
{
735
  urtDebugAssert(subscriber);
736
737
  if (!subscriber->base.topic)
738
      return URT_STATUS_FETCH_NOTOPIC;
739
740
  urtMutexLock(subscriber->base.topic->lock);
741
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
742
743
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
744
  {
745
    urtMutexUnlock(subscriber->base.topic->lock);
746
    return URT_STATUS_FETCH_NOMESSAGE;
747
  }
748
749
  urtFetchMessage();
750
751
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
752
#if(URT_CFG_PUBSUB_PROFILING == true)
753
  subscriber->base.sumLatencies += calculatedLatency;
754
#endif /* URT_CFG_PUBSUB_PROFILING */
755
756
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) //TODO: Both must be true? otherwise jitter does not exist
757
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency-subscriber->maxJitter)
758
  {
759
    subscriber->minLatency = calculatedLatency;
760
  }
761
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency+subscriber->maxJitter)
762
  {
763
    subscriber->maxLatency = calculatedLatency;
764
  }
765
#endif /* URT_CFG_PUBSUB_PROFILING && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
766
767
  bool temp = false;
768
  if (temp/*optional latency output argument given*/)
769
  {
770
    latency = calculatedLatency;
771
  }
772
773
  urtMutexUnlock(subscriber->base.topic->lock);
774
  return URT_STATUS_OK;
775
}
776 7d9678db skenneweg
777
/**
778 5198dfae skenneweg
 * @brief  Calculates the validity from the subscriber.
779 7d9678db skenneweg
 *
780 5198dfae skenneweg
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
781
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
782 7d9678db skenneweg
 *
783 5198dfae skenneweg
 * @return  Returns a boolean indicator whether the latency is fine.
784 7d9678db skenneweg
 */
785 a5e142de skenneweg
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
786
{
787 37cd5dc2 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
788
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
789 5b7188aa skenneweg
    return true;
790 37cd5dc2 Svenja
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
791
792
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
793
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
794
      return true;
795
796
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
797
      return true;
798 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
799 5b7188aa skenneweg
800 a5e142de skenneweg
  return false;
801
}
802 7d9678db skenneweg
803
/**
804 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
805 7d9678db skenneweg
 *
806 5c6cb22f skenneweg
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
807 7d9678db skenneweg
 *
808 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
809
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
810 7d9678db skenneweg
 */
811 5b7188aa skenneweg
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
812
{
813
  urtDebugAssert(subscriber);
814
815
  if (subscriber->base.topic)
816
  {
817 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
818 5b7188aa skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
819
    //TODO: decrement topic's HRT counter
820
# if (URT_CFG_PUBSUB_PROFILING == true)
821
    subscriber->base.topic->numSubscribers--;
822
# endif /* URT_CFG_PUBSUB_PROFILING */
823
//Hier weiter
824
825
# if (URT_CFG_PUBSUB_PROFILING == true)
826 37cd5dc2 Svenja
    urtMutexUnlock(topic->lock);
827 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
828
    subscriber->base.topic = NULL;
829
    subscriber->base.lastMessage = NULL;
830
    subscriber->base.lastMessageTime = 0;
831
    return URT_STATUS_OK;
832
  }
833
834
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
835
}
836 7d9678db skenneweg
837
838
/**
839 5198dfae skenneweg
 * @brief  Initialize the HRT Subscriber.
840 7d9678db skenneweg
 *
841 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
842 7d9678db skenneweg
 */
843 5c6cb22f skenneweg
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
844
{
845 a5e142de skenneweg
  urtDebugAssert(subscriber);
846
847 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
848
  urtEventListenerInit(subscriber->base.evtListener);
849
  subscriber->base.lastMessage = NULL;
850
  subscriber->base.lastMessageTime = 0;
851
852 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING)
853 5c6cb22f skenneweg
    subscriber->base.sumLatencies = 0;
854
    subscriber->base.numMessagesReceived = 0;
855 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
856 5c6cb22f skenneweg
857
  subscriber->next = NULL;
858
859 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
860 5c6cb22f skenneweg
    subscriber->deadlineOffset = 0;
861
    urtTimerInit(subscriber->qodDeadlineTimer);
862 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
863 5c6cb22f skenneweg
864 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
865 5c6cb22f skenneweg
    subscriber->expectedRate = 0;
866 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
867 5c6cb22f skenneweg
868 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
869 5c6cb22f skenneweg
    subscriber->maxJitter = 0;
870 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
871 5c6cb22f skenneweg
872 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
873 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
874
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
875 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
876 5c6cb22f skenneweg
  return;
877
}
878
879 7d9678db skenneweg
880
/**
881 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
882
 *
883 5c6cb22f skenneweg
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
884 5198dfae skenneweg
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
885
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
886
 *                      Messages must not be associated to another topic.
887
 *                      Once a message has been contributed, it cannot be removed later.
888
 *                      May be NULL(no messages to contribute).
889
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
890
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
891
 *                  A value of 0 indicates, that rate is of no concern.
892
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
893
 *                    A value of 0 indicates that jitter is of no concern.
894
 *
895
 * @return  Returns URT_STATUS_OK on success.
896
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
897 7d9678db skenneweg
 */
898
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
899 5b7188aa skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
900
{
901
  urtDebugAssert(subscriber);
902
  urtDebugAssert(topic);
903
904
  if (subscriber->base.topic)
905
  {
906
    return URT_STATUS_SUBSCRIBE_TOPICSET;
907
  }
908
909
  subscriber->base.topic = topic;
910
# if (URT_CFG_PUBSUB_PROFILING == true)
911
  subscriber->base.sumLatencies = 0;
912
  subscriber->base.numMessagesReceived = 0;
913
# endif /* URT_CFG_PUBSUB_PROFILING */
914
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
915
  subscriber->deadlineOffset = deadline;
916
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
917
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
918
  subscriber->maxJitter =jitter;
919
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
920
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
921
  subscriber->minLatency = URT_DELAY_INFINITE;
922
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
923
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
924
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
925
  subscriber->expectedRate = rate;
926
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
927
928 37cd5dc2 Svenja
  urtMutexLock(topic->lock);
929 5b7188aa skenneweg
  if (messages)
930
  {
931
    urt_message_t* lastMessageContribute = messages;
932
    while (lastMessageContribute->next)
933
    {
934
      lastMessageContribute = lastMessageContribute->next;
935
    }
936
    lastMessageContribute->next = topic->latestMessage->next;
937
    topic->latestMessage->next = messages;
938
  }
939
940
  subscriber->base.lastMessage = topic->latestMessage;
941
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
942
943 8378a78b Svenja
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
944 5b7188aa skenneweg
945
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
946 8378a78b Svenja
  //TODO: Implement
947 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
948
949
  topic->numHrtSubscribers--;
950
# if (URT_CFG_PUBSUB_PROFILING == true)
951
  topic->numSubscribers--;
952
# endif /* URT_CFG_PUBSUB_PROFILING */
953
954 37cd5dc2 Svenja
  urtMutexUnlock(topic->lock);
955 5b7188aa skenneweg
  return URT_STATUS_OK;
956
}
957 7d9678db skenneweg
958
959
/**
960 5198dfae skenneweg
 * @brief  Fetches the next message.
961 7d9678db skenneweg
 *
962 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
963
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
964
 * @param[in] bytes  Payload size in bytes.
965
 * @param[in] latency  The latency can be returned by reference. May be NULL.
966 7d9678db skenneweg
 *
967 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
968
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
969
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
970 7d9678db skenneweg
 */
971
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
972 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
973
{
974
  urtDebugAssert(subscriber);
975
976
  if (!subscriber->base.topic)
977
      return URT_STATUS_FETCH_NOTOPIC;
978
979
  urtMutexLock(subscriber->base.topic->lock);
980
  urt_message_t* messageTemp = subscriber->base.lastMessage;
981
  if (messageTemp->next->originTime > messageTemp.originTime)
982
  {
983
    urtMutexUnlock(subscriber->base.topic->lock);
984
    return URT_STATUS_FETCH_NOMESSAGE;
985
  }
986
  messageTemp = messageTemp->next;
987
988
  uint64_t calculatedLatency = NULL; //TODO: Calculate message latency
989
#if (URT_CFG_PUBSUB_PROFILING == true)
990
  subscriber->base.sumLatencies += calculatedLatency;
991
#endif /* URT_CFG_PUBSUB_PROFILING */
992
  bool temp = false;
993
  if (temp /* optional latency output argument given */)
994
  {
995
    latency = calculatedLatency
996
  }
997
998
  subscriber->base.lastMessage->numHrtConsumersLeft--;
999
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1000
  {
1001
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1002
  }
1003
1004
#if (URT_CFG_PUBSUB_PROFILING == true)
1005
  subscriber->base.lastMessage->numConsumersLeft--;
1006
  subscriber->base->numMessagesReceived++;
1007
#endif /* URT_CFG_PUBSUB_PROFILING */
1008
1009
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
1010
  if (temp /*latency is within allowed jitter range*/)
1011
  {
1012
    if (calculatedLatency < subscriber->minLatency)
1013
    {
1014
      subscriber->minLatency = calculatedLatency;
1015
    }
1016
    else if (calculatedLatency > subscriber->maxLatency)
1017
    {
1018
      subscriber->maxLatency = calculatedLatency;
1019
    }
1020
  }
1021
  else
1022
  {
1023
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1024
    urtMutexUnlock(subscriber->base.topic->lock);
1025
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1026
    return URT_STATUS_JITTERVIOLATION;
1027
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1028
  }
1029
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
1030
1031
  urtFetchMessage();
1032
1033
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1034
  if (messageTemp->next->originTime < messageTemp->originTime)
1035
  {
1036
    //TODO: update qos deadliner timer wrt. next message
1037
  }
1038
  else
1039
  {
1040
    //TODO: reset qos deadline timer
1041
  }
1042
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1043
1044
  urtMutexUnlock(subscriber->base.topic->lock);
1045
  return URT_STATUS_OK;
1046
}
1047 7d9678db skenneweg
1048
1049
/**
1050 5198dfae skenneweg
 * @brief  Fetches the latest message.
1051 7d9678db skenneweg
 *
1052 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
1053
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
1054
 * @param[in] bytes  Payload size in bytes.
1055
 * @param[in] latency  The latency can be returned by reference. May be NULL.
1056 7d9678db skenneweg
 *
1057 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
1058
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
1059
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
1060 7d9678db skenneweg
 */
1061
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
1062
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
1063
1064
/**
1065 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
1066 7d9678db skenneweg
 *
1067 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
1068 7d9678db skenneweg
 *
1069 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
1070
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
1071 7d9678db skenneweg
 */
1072 5b7188aa skenneweg
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
1073
{
1074
  urtDebugAssert(subscriber);
1075
1076
  if (subscriber->base.topic)
1077
  {
1078 37cd5dc2 Svenja
    urtMutexLock(topic->lock);
1079 5b7188aa skenneweg
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
1080
    subscriber->base.topic->numHrtSubscribers--;
1081
# if (URT_CFG_PUBSUB_PROFILING == true)
1082
    subscriber->base.topic->numSubscribers--;
1083
# endif /* URT_CFG_PUBSUB_PROFILING */
1084
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1085
    //TODO: remove self from topics lsit of HRT subscribers
1086
    //TODO: ...
1087
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1088
1089
    urt_message_t* messageTemp = subscriber->base.lastMessage;
1090 37cd5dc2 Svenja
    bool hrtZero = false;
1091 5b7188aa skenneweg
    while (messageTemp->next->originTime < messageTemp->originTime)
1092
    {
1093
        messageTemp = messageTemp->next;
1094
        messageTemp->numHrtConsumersLeft--;
1095 37cd5dc2 Svenja
        if (messageTemp->numHrtConsumersLeft == 0)
1096
        {
1097
            hrtZero = true;
1098
        }
1099 5b7188aa skenneweg
# if(URT_CFG_PUBSUB_PROFILING == true)
1100
        messageTemp->numConsumersLeft--;
1101
# endif /* URT_CFG_PUBSUB_PROFILING */
1102
    }
1103 37cd5dc2 Svenja
    if (hrtZero)
1104 5b7188aa skenneweg
    {
1105 37cd5dc2 Svenja
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
1106 5b7188aa skenneweg
    }
1107
1108 37cd5dc2 Svenja
    urtMutexUnlock(topic->lock);
1109 5b7188aa skenneweg
    subscriber->base.topic = NULL;
1110
    subscriber->base.lastMessage = NULL;
1111
    subscriber->base.lastMessageTime = 0;
1112
    return URT_STATUS_OK;
1113
  }
1114
1115
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1116
}
1117
1118