Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ fb72e91b

History | View | Annotate | Download (43.174 KB)

1 1fb06240 skenneweg
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21
22 7d9678db skenneweg
#include <urtware.h>
23 982056f7 Svenja
#include <stdio.h>
24 7d9678db skenneweg
25 1fb06240 skenneweg
/******************************************************************************/
26
/* LOCAL DEFINITIONS                                                          */
27
/******************************************************************************/
28
29
/******************************************************************************/
30
/* EXPORTED VARIABLES                                                         */
31
/******************************************************************************/
32
33
/******************************************************************************/
34
/* LOCAL TYPES                                                                */
35
/******************************************************************************/
36
37
/******************************************************************************/
38
/* LOCAL VARIABLES                                                            */
39
/******************************************************************************/
40
41
/******************************************************************************/
42
/* LOCAL FUNCTIONS                                                            */
43
/******************************************************************************/
44
45 65dc89cb skenneweg
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
46
{
47 fb72e91b skenneweg
  while (oldestMessage->next->originTime < oldestMessage->originTime) {
48 65dc89cb skenneweg
    oldestMessage = oldestMessage->next;
49
  }
50
  return oldestMessage;
51
}
52
53
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
54
{
55 fb72e91b skenneweg
  urt_message_t* lastMessage = latestMessage;
56
  while (lastMessage->next->originTime < lastMessage->originTime) {
57 65dc89cb skenneweg
    lastMessage = lastMessage->next;
58
#if (URT_CFG_PUBSUB_PROFILING == true)
59
    subscriber->base.lastMessage->numConsumersLeft--;
60
    subscriber->base->numMessagesReceived++;
61
#endif /* URT_CFG_PUBSUB_PROFILING */
62
  }
63 fb72e91b skenneweg
  return latestMessage;
64 65dc89cb skenneweg
}
65
66 fb72e91b skenneweg
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
67 982056f7 Svenja
{
68
  urt_message_t* lastMessageContribute = messages;
69 fb72e91b skenneweg
  while (lastMessageContribute->next) {
70 982056f7 Svenja
    lastMessageContribute = lastMessageContribute->next;
71
  }
72
  lastMessageContribute->next = topic->latestMessage->next;
73
  topic->latestMessage->next = messages;
74
}
75
76 1fb06240 skenneweg
/******************************************************************************/
77
/* EXPORTED FUNCTIONS                                                         */
78
/******************************************************************************/
79
80 7d9678db skenneweg
/**
81
 * @brief   Initialize the nrt Subscriber.
82
 *
83 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
84 7d9678db skenneweg
 */
85 5c6cb22f skenneweg
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
86
{
87 a5e142de skenneweg
  urtDebugAssert(subscriber);
88
89 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
90
  urtEventListenerInit(subscriber->base.evtListener);
91
  subscriber->base.lastMessage = NULL;
92
  subscriber->base.lastMessageTime = 0;
93 a5e142de skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
94 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
95
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
96 a5e142de skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
97 5c6cb22f skenneweg
  return;
98
}
99 1fb06240 skenneweg
100 7d9678db skenneweg
/**
101 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
102 7d9678db skenneweg
 *
103 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
104
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
105
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
106
 *                      Messages must not be associated to another topic.
107
 *                      Once a message has been contributed, it cannot be removed later.
108
 *                      May be NULL(no messages to contribute).
109 7d9678db skenneweg
 *
110 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
111
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
112 7d9678db skenneweg
 */
113 a5e142de skenneweg
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
114
{
115
  urtDebugAssert(subscriber);
116
  urtDebugAssert(topic);
117
118 982056f7 Svenja
  if (!subscriber->base.topic) {
119
    return URT_STATUS_SUBSCRIBE_TOPICSET;
120
  }
121 a5e142de skenneweg
122
  subscriber->base.topic = topic;
123 fb72e91b skenneweg
  urtMutexLock(&topic->lock);
124 a5e142de skenneweg
125 982056f7 Svenja
  if (messages) {
126 fb72e91b skenneweg
    urtContributeMessages(messages, topic);
127 a5e142de skenneweg
  }
128
129
  subscriber->base.lastMessage = topic->latestMessage;
130
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
131
132 fb72e91b skenneweg
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
133 a5e142de skenneweg
134
#if (URT_CFG_PUBSUB_PROFILING == true)
135
    topic->numHrtSubscribers--;
136
#endif /* URT_CFG_PUBSUB_PROFILING */
137
138 fb72e91b skenneweg
  urtMutexUnlock(&topic->lock);
139 7d9678db skenneweg
  return URT_STATUS_OK;
140 1fb06240 skenneweg
}
141
142 7d9678db skenneweg
/**
143 5198dfae skenneweg
 * @brief  Fetches the next message.
144 7d9678db skenneweg
 *
145 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
146
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
147
 * @param[in] bytes  Payload size in bytes.
148
 * @param[in] latency  The latency can be returned by reference. May be NULL.
149 7d9678db skenneweg
 *
150 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
151
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
152
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
153 7d9678db skenneweg
 */
154 982056f7 Svenja
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
155 65dc89cb skenneweg
{   
156 a5e142de skenneweg
  urtDebugAssert(subscriber);
157 65dc89cb skenneweg
158 982056f7 Svenja
  if (!subscriber->base.topic) {
159
    return URT_STATUS_FETCH_NOTOPIC;
160
  }
161 65dc89cb skenneweg
162 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
163 65dc89cb skenneweg
164 982056f7 Svenja
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
165
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
166
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
167 fb72e91b skenneweg
      urtMutexUnlock(&subscriber->base.topic->lock);
168 65dc89cb skenneweg
      return URT_STATUS_FETCH_NOMESSAGE;
169
    }
170 982056f7 Svenja
    oldestMessage = oldestMessage->next;
171 65dc89cb skenneweg
  }
172 982056f7 Svenja
  else {
173
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
174 65dc89cb skenneweg
  }
175
176 fb72e91b skenneweg
  subscriber->base.lastMessage = oldestMessage;
177
  subscriber->base.lastMessageTime = oldestMessage->originTime;
178
  memcpy(oldestMessage->payload, payload, bytes);
179 982056f7 Svenja
180
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
181
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
182 65dc89cb skenneweg
183
#if(URT_CFG_PUBSUB_PROFILING == true)
184
  subscriber->base.sumLatencies += calculatedLatency;
185
186 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
187 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
188
  }
189 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
190 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
191
  }
192
#endif /* URT_CFG_PUBSUB_PROFILING */
193 982056f7 Svenja
194
    if (latency) {
195 fb72e91b skenneweg
      *latency = calculatedLatency;
196 982056f7 Svenja
    }
197 65dc89cb skenneweg
  }
198
199
#if (URT_CFG_PUBSUB_PROFILING == true)
200
  subscriber->base.lastMessage->numConsumersLeft--;
201
  subscriber->base->numMessagesReceived++;
202
#endif /* URT_CFG_PUBSUB_PROFILING */
203
204 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
205 7d9678db skenneweg
  return URT_STATUS_OK;
206 1fb06240 skenneweg
}
207
208 7d9678db skenneweg
/**
209 a5e142de skenneweg
 * @brief Fetches the latest message.
210 7d9678db skenneweg
 *
211 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
212
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
213
 * @param[in] bytes  Payload size in bytes.
214
 * @param[in] latency  The latency can be returned by reference. May be NULL.
215 7d9678db skenneweg
 *
216 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
217
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
218
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
219 7d9678db skenneweg
 */
220 982056f7 Svenja
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
221 65dc89cb skenneweg
  urtDebugAssert(subscriber);
222
223
  if (!subscriber->base.topic)
224
      return URT_STATUS_FETCH_NOTOPIC;
225
226 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
227 65dc89cb skenneweg
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
228
229
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
230
  {
231 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
232 65dc89cb skenneweg
    return URT_STATUS_FETCH_NOMESSAGE;
233
  }
234
235 fb72e91b skenneweg
  subscriber->base.lastMessage = lastMessage;
236
  subscriber->base.lastMessageTime = lastMessage->originTime;
237
  memcpy(lastMessage->payload, payload, bytes);
238 982056f7 Svenja
239
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
240
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
241 65dc89cb skenneweg
242
#if(URT_CFG_PUBSUB_PROFILING == true)
243
  subscriber->base.sumLatencies += calculatedLatency;
244
245 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
246 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
247
  }
248 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
249 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
250
  }
251
#endif /* URT_CFG_PUBSUB_PROFILING */
252 982056f7 Svenja
253
    if (latency) {
254 fb72e91b skenneweg
      *latency = calculatedLatency;
255 982056f7 Svenja
    }
256 65dc89cb skenneweg
  }
257
258 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
259 7d9678db skenneweg
  return URT_STATUS_OK;
260 1fb06240 skenneweg
}
261
262 7d9678db skenneweg
/**
263 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
264 7d9678db skenneweg
 *
265 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
266 7d9678db skenneweg
 *
267 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
268
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
269 7d9678db skenneweg
 */
270 5b7188aa skenneweg
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
271
{
272 a5e142de skenneweg
  if (subscriber->base.topic)
273
  {
274
# if(URT_CFG_PUBSUB_PROFILING == true)
275 fb72e91b skenneweg
      urtMutexLock(&topic->lock);
276 5b7188aa skenneweg
      subscriber->base.topic->numSubscribers--;
277 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
278 fb72e91b skenneweg
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
279 a5e142de skenneweg
# if(URT_CFG_PUBSUB_PROFILING == true)
280 fb72e91b skenneweg
      urtMutexUnlock(&topic->lock);
281 a5e142de skenneweg
      subscriber->base.topic = NULL;
282
      subscriber->base.lastMessage = NULL;
283
      subscriber->base.lastMessageTime = 0;
284
#endif /* URT_CFG_PUBSUB_PROFILING */
285
    return URT_STATUS_OK;
286
  }
287 5b7188aa skenneweg
288
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
289 1fb06240 skenneweg
}
290
291 7d9678db skenneweg
292
/**
293 5198dfae skenneweg
 * @brief  Initialize the srt Subscriber.
294 7d9678db skenneweg
 *
295 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
296 7d9678db skenneweg
 */
297 5c6cb22f skenneweg
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
298
{
299 a5e142de skenneweg
  urtDebugAssert(subscriber);
300
301 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
302
  urtEventListenerInit(subscriber->base.evtListener);
303
  subscriber->base.lastMessage = NULL;
304
  subscriber->base.lastMessageTime = 0;
305 fb72e91b skenneweg
#if (URT_CFG_PUBSUB_PROFILING)
306
  subscriber->base.sumLatencies = 0;
307
  subscriber->base.numMessagesReceived = 0;
308
  subscriber->usefulnesscb = NULL;
309
  subscriber->cbparams = NULL;
310
  subscriber->minLatency = URT_DELAY_INFINITE;
311
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
312
#endif /* URT_CFG_PUBSUB_PROFILING */
313 5c6cb22f skenneweg
  return;
314
}
315 7d9678db skenneweg
316
/**
317 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
318
 *
319
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
320
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
321
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
322
 *                     Messages must not be associated to another topic.
323
 *                     Once a message has been contributed, it cannot be removed later.
324
 *                     May be NULL (no messages to contribute)
325
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
326
 * @param[in] cbparams  Optional parameters for the usefulness callback.
327
 *                      May be NULL if the callback expects no parameters.
328
 *
329
 * @return  Returns URT_STATUS_OK on success.
330
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
331 7d9678db skenneweg
 */
332
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
333 fb72e91b skenneweg
                                       urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
334 a5e142de skenneweg
{
335
  urtDebugAssert(subscriber);
336
  urtDebugAssert(topic);
337
338 fb72e91b skenneweg
  if (subscriber->base.topic) {
339 a5e142de skenneweg
    return URT_STATUS_SUBSCRIBE_TOPICSET;
340
  }
341 5b7188aa skenneweg
342
  subscriber->base.topic = topic;
343
  subscriber->usefulnesscb = usefulnesscb;
344
  subscriber->cbparams = cbparams;
345 fb72e91b skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
346 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
347
  subscriber->base.numMessagesReceived = 0;
348
  subscriber->minLatency = URT_DELAY_INFINITE;
349
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
350 fb72e91b skenneweg
#endif  /* URT_CFG_PUBSUB_PROFILING */
351 a5e142de skenneweg
352 fb72e91b skenneweg
  urtMutexLock(&topic->lock);
353
  if (messages) {
354
    urtContributeMessages(messages, topic);
355 a5e142de skenneweg
  }
356
357
  subscriber->base.lastMessage = topic->latestMessage;
358
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
359
360 fb72e91b skenneweg
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
361 a5e142de skenneweg
362
# if (URT_CFG_PUBSUB_PROFILING == true)
363
    topic->numHrtSubscribers--;
364
# endif /* URT_CFG_PUBSUB_PROFILING */
365
366 fb72e91b skenneweg
  urtMutexUnlock(&topic->lock);
367 a5e142de skenneweg
  return URT_STATUS_OK;
368
}
369 7d9678db skenneweg
370
/**
371 5198dfae skenneweg
 * @brief  Fetches the next message.
372 7d9678db skenneweg
 *
373 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
374
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
375
 * @param[in] bytes  Payload size in bytes.
376
 * @param[in] latency  The latency can be returned by reference. May be NULL.
377 7d9678db skenneweg
 *
378 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
379
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
380
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
381 7d9678db skenneweg
 */
382 982056f7 Svenja
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload,
383 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
384
{
385
  urtDebugAssert(subscriber);
386
387 fb72e91b skenneweg
  if (!subscriber->base.topic){
388
    return URT_STATUS_FETCH_NOTOPIC;
389
  }
390 65dc89cb skenneweg
391 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
392 65dc89cb skenneweg
393 982056f7 Svenja
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
394 fb72e91b skenneweg
  if(oldestMessage->originTime == subscriber->base.lastMessageTime){
395
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime){
396
      urtMutexUnlock(&subscriber->base.topic->lock);
397 65dc89cb skenneweg
      return URT_STATUS_FETCH_NOMESSAGE;
398
    }
399 982056f7 Svenja
    oldestMessage = oldestMessage->next;
400 65dc89cb skenneweg
  }
401 fb72e91b skenneweg
  else{
402 982056f7 Svenja
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
403 65dc89cb skenneweg
  }
404
405 fb72e91b skenneweg
  subscriber->base.lastMessage = oldestMessage;
406
  subscriber->base.lastMessageTime = oldestMessage->originTime;
407
  memcpy(oldestMessage->payload, payload, bytes);
408 982056f7 Svenja
409
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
410
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
411 65dc89cb skenneweg
412
#if(URT_CFG_PUBSUB_PROFILING == true)
413
  subscriber->base.sumLatencies += calculatedLatency;
414
415 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
416 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
417
  }
418 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
419 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
420
  }
421
#endif /* URT_CFG_PUBSUB_PROFILING */
422 982056f7 Svenja
423
    if (latency) {
424 fb72e91b skenneweg
      *latency = calculatedLatency;
425 982056f7 Svenja
    }
426 65dc89cb skenneweg
  }
427
428
#if (URT_CFG_PUBSUB_PROFILING == true)
429
  subscriber->base.lastMessage->numConsumersLeft--;
430
  subscriber->base->numMessagesReceived++;
431
#endif /* URT_CFG_PUBSUB_PROFILING */
432
433 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
434 65dc89cb skenneweg
  return URT_STATUS_OK;
435
}
436 7d9678db skenneweg
437
/**
438 5198dfae skenneweg
 * @brief  Fetches the latest message.
439 7d9678db skenneweg
 *
440 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
441
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
442
 * @param[in] bytes  Payload size in bytes.
443
 * @param[in] latency  The latency can be returned by reference. May be NULL.
444 7d9678db skenneweg
 *
445 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
446
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
447
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
448 7d9678db skenneweg
 */
449 982056f7 Svenja
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload,
450 65dc89cb skenneweg
                                                size_t bytes, urt_delay_t* latency)
451
{
452
  urtDebugAssert(subscriber);
453
454 982056f7 Svenja
  if (!subscriber->base.topic) {
455
    return URT_STATUS_FETCH_NOTOPIC;
456
  }
457 65dc89cb skenneweg
458 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
459 65dc89cb skenneweg
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
460
461
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
462
  {
463 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
464 65dc89cb skenneweg
    return URT_STATUS_FETCH_NOMESSAGE;
465
  }
466
467 fb72e91b skenneweg
  subscriber->base.lastMessage = lastMessage;
468
  subscriber->base.lastMessageTime = lastMessage->originTime;
469
  memcpy(lastMessage->payload, payload, bytes);
470 982056f7 Svenja
471
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
472
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
473 65dc89cb skenneweg
474
#if(URT_CFG_PUBSUB_PROFILING == true)
475
  subscriber->base.sumLatencies += calculatedLatency;
476
477 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
478 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
479
  }
480 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
481 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
482
  }
483
#endif /* URT_CFG_PUBSUB_PROFILING */
484 982056f7 Svenja
485 fb72e91b skenneweg
    if (latency != NULL) {
486
      *latency = calculatedLatency;
487 982056f7 Svenja
    }
488 65dc89cb skenneweg
  }
489
490 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
491 65dc89cb skenneweg
  return URT_STATUS_OK;
492
}
493 7d9678db skenneweg
494
/**
495 5198dfae skenneweg
 * @brief  Calculates the usefulness of the subscriber.
496 7d9678db skenneweg
 *
497 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
498
 * @param[in] latency  Latency (of a message) as argument to calculate usefulness.
499 7d9678db skenneweg
 *
500 5198dfae skenneweg
 * @return  Returns the usefulness as a value within [0,1].
501 7d9678db skenneweg
 */
502 a5e142de skenneweg
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
503
{
504
  urtDebugAssert(subscriber);
505
506 fb72e91b skenneweg
  return (*subscriber->usefulnesscb)(latency, subscriber->cbparams);
507 a5e142de skenneweg
}
508 7d9678db skenneweg
509
/**
510 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
511 7d9678db skenneweg
 *
512 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
513 7d9678db skenneweg
 *
514 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
515
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
516 7d9678db skenneweg
 */
517 a5e142de skenneweg
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
518
{
519
  urtDebugAssert(subscriber);
520
521
  if (subscriber->base.topic)
522
  {
523 5b7188aa skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
524 fb72e91b skenneweg
    urtMutexLock(&topic->lock);
525 5b7188aa skenneweg
    subscriber->base.topic->numSubscribers--;
526
# endif /* URT_CFG_PUBSUB_PROFILING */
527 fb72e91b skenneweg
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
528 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
529 fb72e91b skenneweg
      urtMutexUnlock(&topic->lock);
530 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
531
    subscriber->base.topic = NULL;
532
    subscriber->base.lastMessage = NULL;
533
    subscriber->base.lastMessageTime = 0;
534
    return URT_STATUS_OK;
535
  }
536
537
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
538
}
539 7d9678db skenneweg
540
541
/**
542 5198dfae skenneweg
 * @brief  Initialize the FRT Subscriber.
543 7d9678db skenneweg
 *
544 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
545 7d9678db skenneweg
 */
546 5c6cb22f skenneweg
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
547
{
548 a5e142de skenneweg
  urtDebugAssert(subscriber);
549
550 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
551
  urtEventListenerInit(subscriber->base.evtListener);
552
  subscriber->base.lastMessage = NULL;
553
  subscriber->base.lastMessageTime = 0;
554
555
  #if (URT_CFG_PUBSUB_PROFILING)
556
    subscriber->base.sumLatencies = 0;
557
    subscriber->base.numMessagesReceived = 0;
558
  #endif /* URT_CFG_PUBSUB_PROFILING */
559
560
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
561
    subscriber->deadlineOffset = 0;
562
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
563
564
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
565
    subscriber->maxJitter = 0;
566
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
567
568
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
569
    subscriber->minLatency = URT_DELAY_INFINITE;
570
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
571
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
572
  return;
573
}
574
575 7d9678db skenneweg
576
/**
577 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
578
 *
579
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
580
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
581
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
582
 *                      Messages must not be associated to another topic.
583
 *                      Once a message has been contributed, it cannot be removed later.
584
 *                      May be NULL(no messages to contribute).
585
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
586
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
587
 *                    A value of 0 indicates that jitter is of no concern.
588
 *
589
 * @return  Returns URT_STATUS_OK on success.
590
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
591 7d9678db skenneweg
 */
592
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
593 a5e142de skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
594
{
595
  urtDebugAssert(subscriber);
596
  urtDebugAssert(topic);
597
598 5b7188aa skenneweg
  if (subscriber->base.topic)
599
  {
600
    return URT_STATUS_SUBSCRIBE_TOPICSET;
601
  }
602
603
  subscriber->base.topic = topic;
604 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
605 5b7188aa skenneweg
  subscriber->base.sumLatencies = 0;
606
  subscriber->base.numMessagesReceived = 0;
607 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
608
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
609 5b7188aa skenneweg
  subscriber->deadlineOffset = deadline;
610 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
611
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
612 5b7188aa skenneweg
  subscriber->maxJitter =jitter;
613 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
614
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
615 5b7188aa skenneweg
  subscriber->minLatency = URT_DELAY_INFINITE;
616
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
617 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
618
619 fb72e91b skenneweg
  urtMutexLock(&topic->lock);
620 5b7188aa skenneweg
  if (messages)
621
  {
622 fb72e91b skenneweg
    urtContributeMessages(messages, topic);
623 5b7188aa skenneweg
  }
624 a5e142de skenneweg
625 5b7188aa skenneweg
  subscriber->base.lastMessage = topic->latestMessage;
626
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
627 a5e142de skenneweg
628 fb72e91b skenneweg
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
629 a5e142de skenneweg
630
# if (URT_CFG_PUBSUB_PROFILING == true)
631 5b7188aa skenneweg
  topic->numHrtSubscribers--;
632 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
633
634 fb72e91b skenneweg
  urtMutexUnlock(&topic->lock);
635 5b7188aa skenneweg
  return URT_STATUS_OK;
636 a5e142de skenneweg
}
637 7d9678db skenneweg
638
/**
639 5198dfae skenneweg
 * @brief  Fetches the next message.
640 7d9678db skenneweg
 *
641 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
642
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
643
 * @param[in] bytes  Payload size in bytes.
644
 * @param[in] latency  The latency can be returned by reference. May be NULL.
645 7d9678db skenneweg
 *
646 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
647
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
648
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
649 7d9678db skenneweg
 */
650 982056f7 Svenja
urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload,
651 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
652
{
653
  urtDebugAssert(subscriber);
654
655
  if (!subscriber->base.topic)
656
      return URT_STATUS_FETCH_NOTOPIC;
657
658 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
659 65dc89cb skenneweg
660 982056f7 Svenja
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
661
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
662 65dc89cb skenneweg
  {
663
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
664
    {
665 fb72e91b skenneweg
      urtMutexUnlock(&subscriber->base.topic->lock);
666 65dc89cb skenneweg
      return URT_STATUS_FETCH_NOMESSAGE;
667
    }
668 982056f7 Svenja
    oldestMessage = oldestMessage->next;
669 65dc89cb skenneweg
  }
670
  else
671
  {
672 982056f7 Svenja
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
673 65dc89cb skenneweg
  }
674
675 fb72e91b skenneweg
  subscriber->base.lastMessage = oldestMessage;
676
  subscriber->base.lastMessageTime = oldestMessage->originTime;
677
  memcpy(oldestMessage->payload, payload, bytes);
678 65dc89cb skenneweg
679 982056f7 Svenja
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
680
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
681
682
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
683 65dc89cb skenneweg
  subscriber->base.sumLatencies += calculatedLatency;
684 982056f7 Svenja
685
  if (calculatedLatency < subscriber->minLatency) {
686
    subscriber->minLatency = calculatedLatency;
687
  }
688
  else if (calculatedLatency > subscriber->maxLatency) {
689
    subscriber->maxLatency = calculatedLatency;
690
  }
691 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
692
693 982056f7 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
694
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
695 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
696
  }
697 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
698 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
699
  }
700 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
701 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
702 982056f7 Svenja
    return URT_STATUS_JITTERVIOLATION;
703
  }
704
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
705 65dc89cb skenneweg
706 982056f7 Svenja
    if (latency) {
707 fb72e91b skenneweg
      *latency = calculatedLatency;
708 982056f7 Svenja
    }
709 65dc89cb skenneweg
  }
710
711
#if (URT_CFG_PUBSUB_PROFILING == true)
712
  subscriber->base.lastMessage->numConsumersLeft--;
713
  subscriber->base->numMessagesReceived++;
714
#endif /* URT_CFG_PUBSUB_PROFILING */
715
716 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
717 982056f7 Svenja
  return URT_STATUS_OK;
718 65dc89cb skenneweg
}
719 7d9678db skenneweg
720
/**
721 5198dfae skenneweg
 * @brief  Fetches the latest message.
722 7d9678db skenneweg
 *
723 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
724
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
725
 * @param[in] bytes  Payload size in bytes.
726
 * @param[in] latency  The latency can be returned by reference. May be NULL.
727 7d9678db skenneweg
 *
728 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
729
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
730
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
731 7d9678db skenneweg
 */
732 982056f7 Svenja
urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload,
733 65dc89cb skenneweg
                                                size_t bytes, urt_delay_t* latency)
734
{
735
  urtDebugAssert(subscriber);
736
737
  if (!subscriber->base.topic)
738
      return URT_STATUS_FETCH_NOTOPIC;
739
740 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
741 65dc89cb skenneweg
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
742
743
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
744
  {
745 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
746 65dc89cb skenneweg
    return URT_STATUS_FETCH_NOMESSAGE;
747
  }
748
749 fb72e91b skenneweg
    subscriber->base.lastMessage = lastMessage;
750
    subscriber->base.lastMessageTime = lastMessage->originTime;
751
    memcpy(lastMessage->payload, payload, bytes);
752 65dc89cb skenneweg
753 982056f7 Svenja
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
754 fb72e91b skenneweg
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
755 982056f7 Svenja
756
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
757 65dc89cb skenneweg
  subscriber->base.sumLatencies += calculatedLatency;
758 982056f7 Svenja
759
  if (calculatedLatency < subscriber->minLatency) {
760
    subscriber->minLatency = calculatedLatency;
761
  }
762
  else if (calculatedLatency > subscriber->maxLatency) {
763
    subscriber->maxLatency = calculatedLatency;
764
  }
765 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
766
767 982056f7 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
768
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
769 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
770
  }
771 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
772 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
773
  }
774 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
775 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
776 982056f7 Svenja
    return URT_STATUS_JITTERVIOLATION;
777
  }
778
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
779 65dc89cb skenneweg
780 982056f7 Svenja
    if (latency) {
781 fb72e91b skenneweg
      *latency = calculatedLatency;
782 982056f7 Svenja
    }
783 65dc89cb skenneweg
  }
784
785 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
786 65dc89cb skenneweg
  return URT_STATUS_OK;
787
}
788 7d9678db skenneweg
789
/**
790 5198dfae skenneweg
 * @brief  Calculates the validity from the subscriber.
791 7d9678db skenneweg
 *
792 5198dfae skenneweg
 * @param[in] subscriber  The FRT subscriber to calculate a validity for. Must not be NULL.
793
 * @param[in] latency  Latency (of a message) as argument to calculate validity.
794 7d9678db skenneweg
 *
795 5198dfae skenneweg
 * @return  Returns a boolean indicator whether the latency is fine.
796 7d9678db skenneweg
 */
797 a5e142de skenneweg
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
798
{
799 37cd5dc2 Svenja
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
800
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
801 5b7188aa skenneweg
    return true;
802 37cd5dc2 Svenja
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
803
804
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
805
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
806
      return true;
807
808
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
809
      return true;
810 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
811 5b7188aa skenneweg
812 a5e142de skenneweg
  return false;
813
}
814 7d9678db skenneweg
815
/**
816 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
817 7d9678db skenneweg
 *
818 5c6cb22f skenneweg
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
819 7d9678db skenneweg
 *
820 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
821
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
822 7d9678db skenneweg
 */
823 5b7188aa skenneweg
urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber)
824
{
825
  urtDebugAssert(subscriber);
826
827 982056f7 Svenja
  if (!subscriber->base.topic) {
828
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
829
  }
830
831
#if (URT_CFG_PUBSUB_PROFILING == true)
832 fb72e91b skenneweg
  urtMutexLock(&topic->lock);
833 982056f7 Svenja
#endif /* URT_CFG_PUBSUB_PROFILING */
834
835 fb72e91b skenneweg
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
836 982056f7 Svenja
837 5b7188aa skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
838 982056f7 Svenja
  subscriber->base.topic->numSubscribers--;
839 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
840
841
# if (URT_CFG_PUBSUB_PROFILING == true)
842 fb72e91b skenneweg
  urtMutexUnlock(&topic->lock);
843 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
844 982056f7 Svenja
  subscriber->base.topic = NULL;
845
  subscriber->base.lastMessage = NULL;
846
  subscriber->base.lastMessageTime = 0;
847
  return URT_STATUS_OK;
848 5b7188aa skenneweg
}
849 7d9678db skenneweg
850
851
/**
852 5198dfae skenneweg
 * @brief  Initialize the HRT Subscriber.
853 7d9678db skenneweg
 *
854 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
855 7d9678db skenneweg
 */
856 5c6cb22f skenneweg
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
857
{
858 a5e142de skenneweg
  urtDebugAssert(subscriber);
859
860 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
861
  urtEventListenerInit(subscriber->base.evtListener);
862
  subscriber->base.lastMessage = NULL;
863
  subscriber->base.lastMessageTime = 0;
864
865 a5e142de skenneweg
# if (URT_CFG_PUBSUB_PROFILING)
866 5c6cb22f skenneweg
    subscriber->base.sumLatencies = 0;
867
    subscriber->base.numMessagesReceived = 0;
868 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
869 5c6cb22f skenneweg
870
  subscriber->next = NULL;
871
872 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
873 5c6cb22f skenneweg
    subscriber->deadlineOffset = 0;
874
    urtTimerInit(subscriber->qodDeadlineTimer);
875 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
876 5c6cb22f skenneweg
877 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
878 5c6cb22f skenneweg
    subscriber->expectedRate = 0;
879 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
880 5c6cb22f skenneweg
881 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
882 5c6cb22f skenneweg
    subscriber->maxJitter = 0;
883 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
884 5c6cb22f skenneweg
885 a5e142de skenneweg
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
886 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
887
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
888 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
889 5c6cb22f skenneweg
  return;
890
}
891
892 7d9678db skenneweg
893
/**
894 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
895
 *
896 5c6cb22f skenneweg
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
897 5198dfae skenneweg
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
898
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
899
 *                      Messages must not be associated to another topic.
900
 *                      Once a message has been contributed, it cannot be removed later.
901
 *                      May be NULL(no messages to contribute).
902
 * @param[in] deadline  Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
903
 * @param[in] rate  Expected minimum rate of new messages (= maximum time between consecutive messages).
904
 *                  A value of 0 indicates, that rate is of no concern.
905
 * @param[in] jitter  Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
906
 *                    A value of 0 indicates that jitter is of no concern.
907
 *
908
 * @return  Returns URT_STATUS_OK on success.
909
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
910 7d9678db skenneweg
 */
911
urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic,
912 5b7188aa skenneweg
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter)
913
{
914
  urtDebugAssert(subscriber);
915
  urtDebugAssert(topic);
916
917
  if (subscriber->base.topic)
918
  {
919
    return URT_STATUS_SUBSCRIBE_TOPICSET;
920
  }
921
922
  subscriber->base.topic = topic;
923
# if (URT_CFG_PUBSUB_PROFILING == true)
924
  subscriber->base.sumLatencies = 0;
925
  subscriber->base.numMessagesReceived = 0;
926
# endif /* URT_CFG_PUBSUB_PROFILING */
927
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
928
  subscriber->deadlineOffset = deadline;
929
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
930
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
931
  subscriber->maxJitter =jitter;
932
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
933
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
934
  subscriber->minLatency = URT_DELAY_INFINITE;
935
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
936
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
937
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
938
  subscriber->expectedRate = rate;
939
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
940
941 fb72e91b skenneweg
  urtMutexLock(&topic->lock);
942 5b7188aa skenneweg
  if (messages)
943
  {
944 fb72e91b skenneweg
    urtContributeMessages(messages, topic);
945 5b7188aa skenneweg
  }
946
947
  subscriber->base.lastMessage = topic->latestMessage;
948
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
949
950 fb72e91b skenneweg
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
951 5b7188aa skenneweg
952
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
953 8378a78b Svenja
  //TODO: Implement
954 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
955
956
  topic->numHrtSubscribers--;
957
# if (URT_CFG_PUBSUB_PROFILING == true)
958
  topic->numSubscribers--;
959
# endif /* URT_CFG_PUBSUB_PROFILING */
960
961 fb72e91b skenneweg
  urtMutexUnlock(&topic->lock);
962 5b7188aa skenneweg
  return URT_STATUS_OK;
963
}
964 7d9678db skenneweg
965
966
/**
967 5198dfae skenneweg
 * @brief  Fetches the next message.
968 7d9678db skenneweg
 *
969 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
970
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
971
 * @param[in] bytes  Payload size in bytes.
972
 * @param[in] latency  The latency can be returned by reference. May be NULL.
973 7d9678db skenneweg
 *
974 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
975
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
976
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
977 7d9678db skenneweg
 */
978 982056f7 Svenja
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
979 65dc89cb skenneweg
                                              size_t bytes, urt_delay_t* latency)
980
{
981
  urtDebugAssert(subscriber);
982
983 982056f7 Svenja
  if (!subscriber->base.topic) {
984
    return URT_STATUS_FETCH_NOTOPIC;
985
  }
986 65dc89cb skenneweg
987 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
988
  urt_message_t* lastMessage = subscriber->base.lastMessage;
989
  if (lastMessage->next->originTime > lastMessage->originTime)
990 65dc89cb skenneweg
  {
991 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
992 65dc89cb skenneweg
    return URT_STATUS_FETCH_NOMESSAGE;
993
  }
994 fb72e91b skenneweg
  lastMessage = lastMessage->next;
995 65dc89cb skenneweg
996 fb72e91b skenneweg
  uint64_t calculatedLatency;
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
998
    calculatedLatency = urtTimeNow() - lastMessage->originTime;
999
  }
1000 982056f7 Svenja
1001
#if(URT_CFG_PUBSUB_PROFILING == true)
1002 65dc89cb skenneweg
  subscriber->base.sumLatencies += calculatedLatency;
1003 982056f7 Svenja
1004
  if (calculatedLatency < subscriber->minLatency) {
1005
    subscriber->minLatency = calculatedLatency;
1006
  }
1007
  else if (calculatedLatency > subscriber->maxLatency) {
1008
    subscriber->maxLatency = calculatedLatency;
1009
  }
1010 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
1011 982056f7 Svenja
1012 fb72e91b skenneweg
  if (latency) {
1013
    *latency = calculatedLatency;
1014 65dc89cb skenneweg
  }
1015
1016
  subscriber->base.lastMessage->numHrtConsumersLeft--;
1017
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1018
  {
1019 fb72e91b skenneweg
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1020 65dc89cb skenneweg
  }
1021
1022
#if (URT_CFG_PUBSUB_PROFILING == true)
1023
  subscriber->base.lastMessage->numConsumersLeft--;
1024
  subscriber->base->numMessagesReceived++;
1025
#endif /* URT_CFG_PUBSUB_PROFILING */
1026
1027
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1028 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1029
    subscriber->minLatency = calculatedLatency;
1030
  }
1031
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1032
    subscriber->maxLatency = calculatedLatency;
1033
  }
1034
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1035 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
1036 65dc89cb skenneweg
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1037
    return URT_STATUS_JITTERVIOLATION;
1038 982056f7 Svenja
  }
1039 65dc89cb skenneweg
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1040 982056f7 Svenja
1041
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1042
  if (calculatedLatency < subscriber->minLatency) {
1043
    subscriber->minLatency = calculatedLatency;
1044 65dc89cb skenneweg
  }
1045 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
1046
    subscriber->maxLatency = calculatedLatency;
1047
  }
1048
#endif /* URT_CFG_PUBSUB_PROFILING */
1049 65dc89cb skenneweg
1050 fb72e91b skenneweg
    subscriber->base.lastMessage = lastMessage;
1051
    subscriber->base.lastMessageTime = lastMessage->originTime;
1052
    memcpy(lastMessage->payload, payload, bytes);
1053 65dc89cb skenneweg
1054
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1055 fb72e91b skenneweg
  if (lastMessage->next->originTime < lastMessage->originTime)
1056 65dc89cb skenneweg
  {
1057 982056f7 Svenja
    //TODO: first reset?! (when ... set)
1058
    urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
1059 65dc89cb skenneweg
  }
1060
  else
1061
  {
1062 982056f7 Svenja
    urtTimerReset(subscriber->qosDeadlineTimer);
1063 65dc89cb skenneweg
  }
1064
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1065
1066 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
1067 65dc89cb skenneweg
  return URT_STATUS_OK;
1068
}
1069 7d9678db skenneweg
1070
1071
/**
1072 5198dfae skenneweg
 * @brief  Fetches the latest message.
1073 7d9678db skenneweg
 *
1074 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
1075
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
1076
 * @param[in] bytes  Payload size in bytes.
1077
 * @param[in] latency  The latency can be returned by reference. May be NULL.
1078 7d9678db skenneweg
 *
1079 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
1080
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
1081
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
1082 7d9678db skenneweg
 */
1083 982056f7 Svenja
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload,
1084
                                                size_t bytes, urt_delay_t* latency)
1085
{
1086
  if (!subscriber->base.topic) {
1087
    return URT_STATUS_FETCH_NOTOPIC;
1088
  }
1089
1090 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
1091 982056f7 Svenja
  urt_message_t* lastMessage = subscriber->base.lastMessage;
1092
  bool hrtZero = false;
1093
  while (lastMessage->next->originTime < lastMessage->originTime) {
1094
    lastMessage = lastMessage->next;
1095
    lastMessage->numHrtConsumersLeft--;
1096
    if (lastMessage->numHrtConsumersLeft == 0) {
1097 fb72e91b skenneweg
        hrtZero = true;
1098 982056f7 Svenja
    }
1099
#if (URT_CFG_PUBSUB_PROFILING == true)
1100
    lastMessage->numConsumersLeft--;
1101
    subscriber->base.numMessagesReceived++;
1102
#endif /* URT_CFG_PUBSUB_PROFILING */
1103
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1104
    urtTimerReset(subscriber->qosDeadlineTimer);
1105
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1106
  }
1107
1108
  if (hrtZero) {
1109 fb72e91b skenneweg
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1110 982056f7 Svenja
  }
1111
1112
  if (lastMessage->originTime == subscriber->base.lastMessageTime) {
1113 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
1114 982056f7 Svenja
    return URT_STATUS_FETCH_NOMESSAGE;
1115
  }
1116
1117 fb72e91b skenneweg
  uint64_t calculatedLatency;
1118
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
1119
     calculatedLatency = urtTimeNow() - lastMessage->originTime;
1120
  }
1121 982056f7 Svenja
1122
#if(URT_CFG_PUBSUB_PROFILING == true)
1123
  subscriber->base.sumLatencies += calculatedLatency;
1124
1125
  if (calculatedLatency < subscriber->minLatency) {
1126
    subscriber->minLatency = calculatedLatency;
1127
  }
1128
  else if (calculatedLatency > subscriber->maxLatency) {
1129
    subscriber->maxLatency = calculatedLatency;
1130
  }
1131
#endif /* URT_CFG_PUBSUB_PROFILING */
1132
1133 fb72e91b skenneweg
  if (latency) {
1134
    *latency = calculatedLatency;
1135 982056f7 Svenja
  }
1136
1137
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
1138
  if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
1139
    subscriber->minLatency = calculatedLatency;
1140
  }
1141
  else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) {
1142
    subscriber->maxLatency = calculatedLatency;
1143
  }
1144
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1145 fb72e91b skenneweg
    urtMutexUnlock(&subscriber->base.topic->lock);
1146 982056f7 Svenja
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1147
    return URT_STATUS_JITTERVIOLATION;
1148
  }
1149
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
1150
1151
#if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
1152
  if (calculatedLatency < subscriber->minLatency) {
1153
    subscriber->minLatency = calculatedLatency;
1154
  }
1155
  else if (calculatedLatency > subscriber->maxLatency) {
1156
    subscriber->maxLatency = calculatedLatency;
1157
  }
1158
#endif /* URT_CFG_PUBSUB_PROFILING */
1159
1160 fb72e91b skenneweg
    subscriber->base.lastMessage = lastMessage;
1161
    subscriber->base.lastMessageTime = lastMessage->originTime;
1162
    memcpy(lastMessage->payload, payload, bytes);
1163 982056f7 Svenja
1164
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1165
  urtTimerReset(subscriber->qosDeadlineTimer);
1166
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1167 fb72e91b skenneweg
1168
  urtMutexUnlock(&subscriber->base.topic->lock);
1169
  return URT_STATUS_OK;
1170 982056f7 Svenja
}
1171 7d9678db skenneweg
1172
/**
1173 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
1174 7d9678db skenneweg
 *
1175 5198dfae skenneweg
 * @param[in] subscriber  The HRT subscriber to be unsubscribed. Must not be NULL.
1176 7d9678db skenneweg
 *
1177 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
1178
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
1179 7d9678db skenneweg
 */
1180 5b7188aa skenneweg
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber)
1181
{
1182
  urtDebugAssert(subscriber);
1183
1184 fb72e91b skenneweg
  if(!subscriber->base.topic) {
1185
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1186
  }
1187
1188
  urtMutexLock(&subscriber->base.topic->lock);
1189
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
1190
  subscriber->base.topic->numHrtSubscribers--;
1191 5b7188aa skenneweg
# if (URT_CFG_PUBSUB_PROFILING == true)
1192 fb72e91b skenneweg
  subscriber->base.topic->numSubscribers--;
1193 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
1194
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1195 fb72e91b skenneweg
  //TODO: remove self from topics list of HRT subscribers, ...
1196 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1197
1198 fb72e91b skenneweg
  urt_message_t* messageTemp = subscriber->base.lastMessage;
1199
  bool hrtZero = false;
1200
  while (messageTemp->next->originTime < messageTemp->originTime){
1201
    messageTemp = messageTemp->next;
1202
    messageTemp->numHrtConsumersLeft--;
1203
    if (messageTemp->numHrtConsumersLeft == 0) {
1204
      hrtZero = true;
1205
    }
1206 5b7188aa skenneweg
# if(URT_CFG_PUBSUB_PROFILING == true)
1207 fb72e91b skenneweg
    messageTemp->numConsumersLeft--;
1208 5b7188aa skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
1209 fb72e91b skenneweg
  }
1210
  if (hrtZero){
1211
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1212 5b7188aa skenneweg
  }
1213
1214 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
1215
  subscriber->base.topic = NULL;
1216
  subscriber->base.lastMessage = NULL;
1217
  subscriber->base.lastMessageTime = 0;
1218
  return URT_STATUS_OK;
1219 5b7188aa skenneweg
}
1220
1221