Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ a5e142de

History | View | Annotate | Download (24.836 KB)

1 1fb06240 skenneweg
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21
22 7d9678db skenneweg
#include <urtware.h>
23
24 1fb06240 skenneweg
/******************************************************************************/
25
/* LOCAL DEFINITIONS                                                          */
26
/******************************************************************************/
27
28
/******************************************************************************/
29
/* EXPORTED VARIABLES                                                         */
30
/******************************************************************************/
31
32
/******************************************************************************/
33
/* LOCAL TYPES                                                                */
34
/******************************************************************************/
35
36
/******************************************************************************/
37
/* LOCAL VARIABLES                                                            */
38
/******************************************************************************/
39
40
/******************************************************************************/
41
/* LOCAL FUNCTIONS                                                            */
42
/******************************************************************************/
43
44
/******************************************************************************/
45
/* EXPORTED FUNCTIONS                                                         */
46
/******************************************************************************/
47
48 7d9678db skenneweg
/**
49
 * @brief   Initialize the nrt Subscriber.
50
 *
51 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
52 7d9678db skenneweg
 */
53 5c6cb22f skenneweg
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
54
{
55 a5e142de skenneweg
  urtDebugAssert(subscriber);
56
57 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
58
  urtEventListenerInit(subscriber->base.evtListener);
59
  subscriber->base.lastMessage = NULL;
60
  subscriber->base.lastMessageTime = 0;
61 a5e142de skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
62 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
63
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
64 a5e142de skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
65 5c6cb22f skenneweg
  return;
66
}
67 1fb06240 skenneweg
68 7d9678db skenneweg
/**
69 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
70 7d9678db skenneweg
 *
71 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
72
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
73
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
74
 *                      Messages must not be associated to another topic.
75
 *                      Once a message has been contributed, it cannot be removed later.
76
 *                      May be NULL(no messages to contribute).
77 7d9678db skenneweg
 *
78 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
79
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
80 7d9678db skenneweg
 */
81 a5e142de skenneweg
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
82
{
83
  urtDebugAssert(subscriber);
84
  urtDebugAssert(topic);
85
86
  if (!subscriber->base.topic)
87
      return URT_STATUS_SUBSCRIBE_TOPICSET;
88
89
  subscriber->base.topic = topic;
90
  //TODO: Lock topic
91
92
  if (messages)
93
  {
94
    urt_message_t* lastMessageContribute = messages;
95
    while (lastMessageContribute->next)
96
    {
97
        lastMessageContribute = lastMessageContribute->next;
98
    }
99
    lastMessageContribute->next = topic->latestMessage->next;
100
    topic->latestMessage->next = messages;
101
  }
102
103
  subscriber->base.lastMessage = topic->latestMessage;
104
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
105
106
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
107
108
#if (URT_CFG_PUBSUB_PROFILING == true)
109
    topic->numHrtSubscribers--;
110
#endif /* URT_CFG_PUBSUB_PROFILING */
111
112
  //TODO: Unlock topic
113 7d9678db skenneweg
  return URT_STATUS_OK;
114 1fb06240 skenneweg
}
115
116 7d9678db skenneweg
/**
117 5198dfae skenneweg
 * @brief  Fetches the next message.
118 7d9678db skenneweg
 *
119 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
120
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
121
 * @param[in] bytes  Payload size in bytes.
122
 * @param[in] latency  The latency can be returned by reference. May be NULL.
123 7d9678db skenneweg
 *
124 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
125
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
126
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
127 7d9678db skenneweg
 */
128 a5e142de skenneweg
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
129
{
130
  urtDebugAssert(subscriber);
131
132
  urt_message_t youngestMessage;
133
  if (subscriber->base.topic)
134
  {
135
    //TODO: Lock Topic
136
    urt_osTime_t localCopy; //TODO: replace with local copy
137
    if (subscriber->base.lastMessageTime == localCopy)
138
    {
139
      if(subscriber->base.lastMessage->next->originTime < localCopy)
140
      {
141
        youngestMessage = subscriber->base.lastMessage->next;
142
      }
143
      else
144
      {
145
        //TODO: Unlock Topic
146
        return URT_STATUS_FETCH_NOMESSAGE;
147
      }
148
    }
149
    else
150
    {
151
      youngestMessage = subscriber->base.lastMessage->next;
152
      while (youngestMessage.originTime < localCopy)
153
      {
154
        youngestMessage = youngestMessage.next;
155
      }
156
    }
157
  }
158
  else
159
  {
160
    return URT_STATUS_FETCH_NOTOPIC;
161
  }
162
163
  latency = subscriber->base.lastMessageTime - youngestMessage.originTime;
164
165
  //TODO: Other cases
166
  //TODO: Unlock topic
167 7d9678db skenneweg
  return URT_STATUS_OK;
168 1fb06240 skenneweg
}
169
170 7d9678db skenneweg
/**
171 a5e142de skenneweg
 * @brief Fetches the latest message.
172 7d9678db skenneweg
 *
173 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
174
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
175
 * @param[in] bytes  Payload size in bytes.
176
 * @param[in] latency  The latency can be returned by reference. May be NULL.
177 7d9678db skenneweg
 *
178 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
179
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
180
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
181 7d9678db skenneweg
 */
182 1fb06240 skenneweg
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
183 7d9678db skenneweg
  return URT_STATUS_OK;
184 1fb06240 skenneweg
}
185
186 7d9678db skenneweg
/**
187 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
188 7d9678db skenneweg
 *
189 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
190 7d9678db skenneweg
 *
191 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
192
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
193 7d9678db skenneweg
 */
194 1fb06240 skenneweg
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) {
195 a5e142de skenneweg
  if (subscriber->base.topic)
196
  {
197
# if(URT_CFG_PUBSUB_PROFILING == true)
198
      //TODO: LOCK TOPIC
199
      subscriber->base.topic->numHrtSubscribers--;
200
# endif /* URT_CFG_PUBSUB_PROFILING */
201
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
202
# if(URT_CFG_PUBSUB_PROFILING == true)
203
      //TODO: Unlock TOPIC
204
      subscriber->base.topic = NULL;
205
      subscriber->base.lastMessage = NULL;
206
      subscriber->base.lastMessageTime = 0;
207
#endif /* URT_CFG_PUBSUB_PROFILING */
208
    return URT_STATUS_OK;
209
  }
210
  else
211
      return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
212 1fb06240 skenneweg
}
213
214 7d9678db skenneweg
215
/**
216 5198dfae skenneweg
 * @brief  Initialize the srt Subscriber.
217 7d9678db skenneweg
 *
218 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
219 7d9678db skenneweg
 */
220 5c6cb22f skenneweg
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
221
{
222 a5e142de skenneweg
  urtDebugAssert(subscriber);
223
224 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
225
  urtEventListenerInit(subscriber->base.evtListener);
226
  subscriber->base.lastMessage = NULL;
227
  subscriber->base.lastMessageTime = 0;
228
  #if (URT_CFG_PUBSUB_PROFILING)
229
    subscriber->base.sumLatencies = 0;
230
    subscriber->base.numMessagesReceived = 0;
231
    subscriber->usefulnesscb = NULL;
232
    subscriber->cbparams = NULL;
233
    subscriber->minLatency = URT_DELAY_INFINITE;
234
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
235
  #endif /* URT_CFG_PUBSUB_PROFILING */
236
  return;
237
}
238 7d9678db skenneweg
239
/**
240 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
241
 *
242
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
243
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
244
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
245
 *                     Messages must not be associated to another topic.
246
 *                     Once a message has been contributed, it cannot be removed later.
247
 *                     May be NULL (no messages to contribute)
248
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
249
 * @param[in] cbparams  Optional parameters for the usefulness callback.
250
 *                      May be NULL if the callback expects no parameters.
251
 *
252
 * @return  Returns URT_STATUS_OK on success.
253
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
254 7d9678db skenneweg
 */
255
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
256 a5e142de skenneweg
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
257
{
258
  urtDebugAssert(subscriber);
259
  urtDebugAssert(topic);
260
261
  if (subscriber->base.topic)
262
  {
263
    return URT_STATUS_SUBSCRIBE_TOPICSET;
264
  }
265
  else
266
  {
267
    subscriber->base.topic = topic;
268
    subscriber->usefulnesscb = usefulnesscb;
269
    subscriber->cbparams = cbparams;
270
# if (URT_CFG_PUBSUB_PROFILING == true)
271
      subscriber->base.sumLatencies = 0;
272
      subscriber->base.numMessagesReceived = 0;
273
      subscriber->minLatency = URT_DELAY_INFINITE;
274
      subscriber->maxLatency = URT_DELAY_IMMEDIATE;
275
# endif  /* URT_CFG_PUBSUB_PROFILING */
276
  }
277
278
  //TODO: Lock topic
279
  if (messages)
280
  {
281
    urt_message_t* lastMessageContribute = messages;
282
    while (lastMessageContribute->next)
283
    {
284
        lastMessageContribute = lastMessageContribute->next;
285
    }
286
    lastMessageContribute->next = topic->latestMessage->next;
287
    topic->latestMessage->next = messages;
288
  }
289
290
  subscriber->base.lastMessage = topic->latestMessage;
291
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
292
293
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
294
295
# if (URT_CFG_PUBSUB_PROFILING == true)
296
    topic->numHrtSubscribers--;
297
# endif /* URT_CFG_PUBSUB_PROFILING */
298
299
  //TODO: Unlock topic
300
  return URT_STATUS_OK;
301
}
302 7d9678db skenneweg
303
/**
304 5198dfae skenneweg
 * @brief  Fetches the next message.
305 7d9678db skenneweg
 *
306 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
307
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
308
 * @param[in] bytes  Payload size in bytes.
309
 * @param[in] latency  The latency can be returned by reference. May be NULL.
310 7d9678db skenneweg
 *
311 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
312
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
313
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
314 7d9678db skenneweg
 */
315
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
316
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
317
318
/**
319 5198dfae skenneweg
 * @brief  Fetches the latest message.
320 7d9678db skenneweg
 *
321 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
322
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
323
 * @param[in] bytes  Payload size in bytes.
324
 * @param[in] latency  The latency can be returned by reference. May be NULL.
325 7d9678db skenneweg
 *
326 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
327
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
328
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
329 7d9678db skenneweg
 */
330
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
331
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
332
333
/**
334 5198dfae skenneweg
 * @brief  Calculates the usefulness of the subscriber.
335 7d9678db skenneweg
 *
336 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
337
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
338 7d9678db skenneweg
 *
339 5198dfae skenneweg
 * @return  Returns the usefulness as a value within [0,1].
340 7d9678db skenneweg
 */
341 a5e142de skenneweg
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
342
{
343
  urtDebugAssert(subscriber);
344
345
  return subscriber->usefulnesscb(latency);
346
}
347 7d9678db skenneweg
348
/**
349 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
350 7d9678db skenneweg
 *
351 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
352 7d9678db skenneweg
 *
353 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
354
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
355 7d9678db skenneweg
 */
356 a5e142de skenneweg
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
357
{
358
  urtDebugAssert(subscriber);
359
360
  if (subscriber->base.topic)
361
  {
362
    if (URT_CFG_PUBSUB_PROFILING == true)
363
    {
364
      //TODO: lock topic
365
      subscriber->base.topic->numHrtSubscribers--;
366
    }
367
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
368
# if (URT_CFG_PUBSUB_PROFILING == true)
369
      //TODO: unlock topic
370
# endif /* URT_CFG_PUBSUB_PROFILING */
371
    subscriber->base.topic = NULL;
372
    subscriber->base.lastMessage = NULL;
373
    subscriber->base.lastMessageTime = 0;
374
    return URT_STATUS_OK;
375
  }
376
377
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
378
}
379 7d9678db skenneweg
380
381
/**
382 5198dfae skenneweg
 * @brief  Initialize the FRT Subscriber.
383 7d9678db skenneweg
 *
384 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
385 7d9678db skenneweg
 */
386 5c6cb22f skenneweg
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
387
{
388 a5e142de skenneweg
  urtDebugAssert(subscriber);
389
390 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
391
  urtEventListenerInit(subscriber->base.evtListener);
392
  subscriber->base.lastMessage = NULL;
393
  subscriber->base.lastMessageTime = 0;
394
395
  #if (URT_CFG_PUBSUB_PROFILING)
396
    subscriber->base.sumLatencies = 0;
397
    subscriber->base.numMessagesReceived = 0;
398
  #endif /* URT_CFG_PUBSUB_PROFILING */
399
400
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
401
    subscriber->deadlineOffset = 0;
402
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
403
404
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
405
    subscriber->maxJitter = 0;
406
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
407
408
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
409
    subscriber->minLatency = URT_DELAY_INFINITE;
410
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
411
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
412
  return;
413
}
414
415 7d9678db skenneweg
416
/**
417 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
418
 *
419
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
420
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
421
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
422
 *                      Messages must not be associated to another topic.
423
 *                      Once a message has been contributed, it cannot be removed later.
424
 *                      May be NULL(no messages to contribute).
425
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
426
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
427
 *                    A value of 0 indicates that jitter is of no concern.
428
 *
429
 * @return  Returns URT_STATUS_OK on success.
430
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
431 7d9678db skenneweg
 */
432
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
433 a5e142de skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
434
{
435
  urtDebugAssert(subscriber);
436
  urtDebugAssert(topic);
437
438
    if (subscriber->base.topic)
439
    {
440
      return URT_STATUS_SUBSCRIBE_TOPICSET;
441
    }
442
    else
443
    {
444
      subscriber->base.topic = topic;
445
# if (URT_CFG_PUBSUB_PROFILING == true)
446
        subscriber->base.sumLatencies = 0;
447
        subscriber->base.numMessagesReceived = 0;
448
# endif /* URT_CFG_PUBSUB_PROFILING */
449
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
450
        subscriber->deadlineOffset = deadline;
451
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
452
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
453
        subscriber->maxJitter =jitter;
454
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
455
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
456
        subscriber->minLatency = URT_DELAY_INFINITE;
457
        subscriber->maxLatency = URT_DELAY_IMMEDIATE;
458
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
459
    }
460
461
    //TODO: Lock topic
462
    if (messages)
463
    {
464
      urt_message_t* lastMessageContribute = messages;
465
      while (lastMessageContribute->next)
466
      {
467
          lastMessageContribute = lastMessageContribute->next;
468
      }
469
      lastMessageContribute->next = topic->latestMessage->next;
470
      topic->latestMessage->next = messages;
471
    }
472
473
    subscriber->base.lastMessage = topic->latestMessage;
474
    subscriber->base.lastMessageTime = topic->latestMessage->originTime;
475
476
    urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
477
478
# if (URT_CFG_PUBSUB_PROFILING == true)
479
      topic->numHrtSubscribers--;
480
# endif /* URT_CFG_PUBSUB_PROFILING */
481
482
    //TODO: Unlock topic
483
    return URT_STATUS_OK;
484
}
485 7d9678db skenneweg
486
/**
487 5198dfae skenneweg
 * @brief  Fetches the next message.
488 7d9678db skenneweg
 *
489 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
490
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
491
 * @param[in] bytes  Payload size in bytes.
492
 * @param[in] latency  The latency can be returned by reference. May be NULL.
493 7d9678db skenneweg
 *
494 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
495
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
496
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
497 7d9678db skenneweg
 */
498
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
499
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
500
501
/**
502 5198dfae skenneweg
 * @brief  Fetches the latest message.
503 7d9678db skenneweg
 *
504 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
505
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
506
 * @param[in] bytes  Payload size in bytes.
507
 * @param[in] latency  The latency can be returned by reference. May be NULL.
508 7d9678db skenneweg
 *
509 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
510
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
511
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
512 7d9678db skenneweg
 */
513
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
514
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
515
516
/**
517 5198dfae skenneweg
 * @brief  Calculates the validity from the subscriber.
518 7d9678db skenneweg
 *
519 5198dfae skenneweg
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
520
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
521 7d9678db skenneweg
 *
522 5198dfae skenneweg
 * @return  Returns a boolean indicator whether the latency is fine.
523 7d9678db skenneweg
 */
524 a5e142de skenneweg
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
525
{
526
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
527
    if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
528
        return true;
529
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
530
  return false;
531
}
532 7d9678db skenneweg
533
/**
534 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
535 7d9678db skenneweg
 *
536 5c6cb22f skenneweg
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
537 7d9678db skenneweg
 *
538 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
539
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
540 7d9678db skenneweg
 */
541
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;}
542
543
544
/**
545 5198dfae skenneweg
 * @brief  Initialize the HRT Subscriber.
546 7d9678db skenneweg
 *
547 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
548 7d9678db skenneweg
 */
549 5c6cb22f skenneweg
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
550
{
551 a5e142de skenneweg
  urtDebugAssert(subscriber);
552
553 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
554
  urtEventListenerInit(subscriber->base.evtListener);
555
  subscriber->base.lastMessage = NULL;
556
  subscriber->base.lastMessageTime = 0;
557
558 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING)
559 5c6cb22f skenneweg
    subscriber->base.sumLatencies = 0;
560
    subscriber->base.numMessagesReceived = 0;
561 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
562 5c6cb22f skenneweg
563
  subscriber->next = NULL;
564
565 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
566 5c6cb22f skenneweg
    subscriber->deadlineOffset = 0;
567
    urtTimerInit(subscriber->qodDeadlineTimer);
568 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
569 5c6cb22f skenneweg
570 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
571 5c6cb22f skenneweg
    subscriber->expectedRate = 0;
572 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
573 5c6cb22f skenneweg
574 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
575 5c6cb22f skenneweg
    subscriber->maxJitter = 0;
576 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
577 5c6cb22f skenneweg
578 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
579 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
580
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
581 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
582 5c6cb22f skenneweg
  return;
583
}
584
585 7d9678db skenneweg
586
/**
587 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
588
 *
589 5c6cb22f skenneweg
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
590 5198dfae skenneweg
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
591
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
592
 *                      Messages must not be associated to another topic.
593
 *                      Once a message has been contributed, it cannot be removed later.
594
 *                      May be NULL(no messages to contribute).
595
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
596
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
597
 *                  A value of 0 indicates, that rate is of no concern.
598
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
599
 *                    A value of 0 indicates that jitter is of no concern.
600
 *
601
 * @return  Returns URT_STATUS_OK on success.
602
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
603 7d9678db skenneweg
 */
604
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
605
                                       urt_message_t* message, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter){return URT_STATUS_OK;}
606
607
608
/**
609 5198dfae skenneweg
 * @brief  Fetches the next message.
610 7d9678db skenneweg
 *
611 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
612
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
613
 * @param[in] bytes  Payload size in bytes.
614
 * @param[in] latency  The latency can be returned by reference. May be NULL.
615 7d9678db skenneweg
 *
616 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
617
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
618
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
619 7d9678db skenneweg
 */
620
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
621
                                              size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
622
623
624
/**
625 5198dfae skenneweg
 * @brief  Fetches the latest message.
626 7d9678db skenneweg
 *
627 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
628
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
629
 * @param[in] bytes  Payload size in bytes.
630
 * @param[in] latency  The latency can be returned by reference. May be NULL.
631 7d9678db skenneweg
 *
632 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
633
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
634
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
635 7d9678db skenneweg
 */
636
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
637
                                                size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
638
639
/**
640 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
641 7d9678db skenneweg
 *
642 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
643 7d9678db skenneweg
 *
644 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
645
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
646 7d9678db skenneweg
 */
647
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber){return URT_STATUS_OK;}