Revision a5e142de

View differences:

inc/urt_subscriber.h
142 142
  urt_status_t urtNrtSubscriberUnsubscribe(urt_nrtsubscriber_t* subscriber);
143 143

  
144 144
  void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber);
145
  urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* message, urt_usefulness_f* usefulnesscb, void* cbparams);
145
  urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages, urt_usefulness_f* usefulnesscb, void* cbparams);
146 146
  urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency);
147 147
  urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency);
148 148
  float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency);
src/urt_subscriber.c
52 52
 */
53 53
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
54 54
{
55
  urtDebugAssert(subscriber);
56

  
55 57
  subscriber->base.topic = NULL;
56 58
  urtEventListenerInit(subscriber->base.evtListener);
57 59
  subscriber->base.lastMessage = NULL;
58 60
  subscriber->base.lastMessageTime = 0;
59
  #if (URT_CFG_PUBSUB_PROFILING)
61
#if (URT_CFG_PUBSUB_PROFILING == true)
60 62
    subscriber->minLatency = URT_DELAY_INFINITE;
61 63
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
62
  #endif /* URT_CFG_PUBSUB_PROFILING */
64
#endif /* URT_CFG_PUBSUB_PROFILING */
63 65
  return;
64 66
}
65 67

  
......
76 78
 * @return  Returns URT_STATUS_OK on success.
77 79
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
78 80
 */
79
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages) {
81
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
82
{
83
  urtDebugAssert(subscriber);
84
  urtDebugAssert(topic);
85

  
86
  if (!subscriber->base.topic)
87
      return URT_STATUS_SUBSCRIBE_TOPICSET;
88

  
89
  subscriber->base.topic = topic;
90
  //TODO: Lock topic
91

  
92
  if (messages)
93
  {
94
    urt_message_t* lastMessageContribute = messages;
95
    while (lastMessageContribute->next)
96
    {
97
        lastMessageContribute = lastMessageContribute->next;
98
    }
99
    lastMessageContribute->next = topic->latestMessage->next;
100
    topic->latestMessage->next = messages;
101
  }
102

  
103
  subscriber->base.lastMessage = topic->latestMessage;
104
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
105

  
106
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
107

  
108
#if (URT_CFG_PUBSUB_PROFILING == true)
109
    topic->numHrtSubscribers--;
110
#endif /* URT_CFG_PUBSUB_PROFILING */
111

  
112
  //TODO: Unlock topic
80 113
  return URT_STATUS_OK;
81 114
}
82 115

  
......
92 125
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
93 126
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
94 127
 */
95
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
128
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
129
{
130
  urtDebugAssert(subscriber);
131

  
132
  urt_message_t youngestMessage;
133
  if (subscriber->base.topic)
134
  {
135
    //TODO: Lock Topic
136
    urt_osTime_t localCopy; //TODO: replace with local copy
137
    if (subscriber->base.lastMessageTime == localCopy)
138
    {
139
      if(subscriber->base.lastMessage->next->originTime < localCopy)
140
      {
141
        youngestMessage = subscriber->base.lastMessage->next;
142
      }
143
      else
144
      {
145
        //TODO: Unlock Topic
146
        return URT_STATUS_FETCH_NOMESSAGE;
147
      }
148
    }
149
    else
150
    {
151
      youngestMessage = subscriber->base.lastMessage->next;
152
      while (youngestMessage.originTime < localCopy)
153
      {
154
        youngestMessage = youngestMessage.next;
155
      }
156
    }
157
  }
158
  else
159
  {
160
    return URT_STATUS_FETCH_NOTOPIC;
161
  }
162

  
163
  latency = subscriber->base.lastMessageTime - youngestMessage.originTime;
164

  
165
  //TODO: Other cases
166
  //TODO: Unlock topic
96 167
  return URT_STATUS_OK;
97 168
}
98 169

  
99 170
/**
100
 * @brief Fetches the lates message.
171
 * @brief Fetches the latest message.
101 172
 *
102 173
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
103 174
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
......
121 192
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
122 193
 */
123 194
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) {
124
  return URT_STATUS_OK;
195
  if (subscriber->base.topic)
196
  {
197
# if(URT_CFG_PUBSUB_PROFILING == true)
198
      //TODO: LOCK TOPIC
199
      subscriber->base.topic->numHrtSubscribers--;
200
# endif /* URT_CFG_PUBSUB_PROFILING */
201
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
202
# if(URT_CFG_PUBSUB_PROFILING == true)
203
      //TODO: Unlock TOPIC
204
      subscriber->base.topic = NULL;
205
      subscriber->base.lastMessage = NULL;
206
      subscriber->base.lastMessageTime = 0;
207
#endif /* URT_CFG_PUBSUB_PROFILING */
208
    return URT_STATUS_OK;
209
  }
210
  else
211
      return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
125 212
}
126 213

  
127 214

  
......
132 219
 */
133 220
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
134 221
{
222
  urtDebugAssert(subscriber);
223

  
135 224
  subscriber->base.topic = NULL;
136 225
  urtEventListenerInit(subscriber->base.evtListener);
137 226
  subscriber->base.lastMessage = NULL;
......
164 253
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
165 254
 */
166 255
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
167
                                       urt_message_t* message, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams) {return URT_STATUS_OK;}
256
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
257
{
258
  urtDebugAssert(subscriber);
259
  urtDebugAssert(topic);
260

  
261
  if (subscriber->base.topic)
262
  {
263
    return URT_STATUS_SUBSCRIBE_TOPICSET;
264
  }
265
  else
266
  {
267
    subscriber->base.topic = topic;
268
    subscriber->usefulnesscb = usefulnesscb;
269
    subscriber->cbparams = cbparams;
270
# if (URT_CFG_PUBSUB_PROFILING == true)
271
      subscriber->base.sumLatencies = 0;
272
      subscriber->base.numMessagesReceived = 0;
273
      subscriber->minLatency = URT_DELAY_INFINITE;
274
      subscriber->maxLatency = URT_DELAY_IMMEDIATE;
275
# endif  /* URT_CFG_PUBSUB_PROFILING */
276
  }
277

  
278
  //TODO: Lock topic
279
  if (messages)
280
  {
281
    urt_message_t* lastMessageContribute = messages;
282
    while (lastMessageContribute->next)
283
    {
284
        lastMessageContribute = lastMessageContribute->next;
285
    }
286
    lastMessageContribute->next = topic->latestMessage->next;
287
    topic->latestMessage->next = messages;
288
  }
289

  
290
  subscriber->base.lastMessage = topic->latestMessage;
291
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
292

  
293
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
294

  
295
# if (URT_CFG_PUBSUB_PROFILING == true)
296
    topic->numHrtSubscribers--;
297
# endif /* URT_CFG_PUBSUB_PROFILING */
298

  
299
  //TODO: Unlock topic
300
  return URT_STATUS_OK;
301
}
168 302

  
169 303
/**
170 304
 * @brief  Fetches the next message.
......
204 338
 *
205 339
 * @return  Returns the usefulness as a value within [0,1].
206 340
 */
207
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency){return 0;}
341
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
342
{
343
  urtDebugAssert(subscriber);
344

  
345
  return subscriber->usefulnesscb(latency);
346
}
208 347

  
209 348
/**
210 349
 * @brief  Unsubscribes from a subscriber.
......
214 353
 * @return  Returns URT_STATUS_OK on sucess.
215 354
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
216 355
 */
217
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber){return URT_STATUS_OK;}
356
urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber)
357
{
358
  urtDebugAssert(subscriber);
359

  
360
  if (subscriber->base.topic)
361
  {
362
    if (URT_CFG_PUBSUB_PROFILING == true)
363
    {
364
      //TODO: lock topic
365
      subscriber->base.topic->numHrtSubscribers--;
366
    }
367
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
368
# if (URT_CFG_PUBSUB_PROFILING == true)
369
      //TODO: unlock topic
370
# endif /* URT_CFG_PUBSUB_PROFILING */
371
    subscriber->base.topic = NULL;
372
    subscriber->base.lastMessage = NULL;
373
    subscriber->base.lastMessageTime = 0;
374
    return URT_STATUS_OK;
375
  }
376

  
377
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
378
}
218 379

  
219 380

  
220 381
/**
......
224 385
 */
225 386
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
226 387
{
388
  urtDebugAssert(subscriber);
389

  
227 390
  subscriber->base.topic = NULL;
228 391
  urtEventListenerInit(subscriber->base.evtListener);
229 392
  subscriber->base.lastMessage = NULL;
......
267 430
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
268 431
 */
269 432
urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic,
270
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter){return URT_STATUS_OK;}
433
                                       urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter)
434
{
435
  urtDebugAssert(subscriber);
436
  urtDebugAssert(topic);
437

  
438
    if (subscriber->base.topic)
439
    {
440
      return URT_STATUS_SUBSCRIBE_TOPICSET;
441
    }
442
    else
443
    {
444
      subscriber->base.topic = topic;
445
# if (URT_CFG_PUBSUB_PROFILING == true)
446
        subscriber->base.sumLatencies = 0;
447
        subscriber->base.numMessagesReceived = 0;
448
# endif /* URT_CFG_PUBSUB_PROFILING */
449
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
450
        subscriber->deadlineOffset = deadline;
451
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
452
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
453
        subscriber->maxJitter =jitter;
454
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
455
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
456
        subscriber->minLatency = URT_DELAY_INFINITE;
457
        subscriber->maxLatency = URT_DELAY_IMMEDIATE;
458
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
459
    }
460

  
461
    //TODO: Lock topic
462
    if (messages)
463
    {
464
      urt_message_t* lastMessageContribute = messages;
465
      while (lastMessageContribute->next)
466
      {
467
          lastMessageContribute = lastMessageContribute->next;
468
      }
469
      lastMessageContribute->next = topic->latestMessage->next;
470
      topic->latestMessage->next = messages;
471
    }
472

  
473
    subscriber->base.lastMessage = topic->latestMessage;
474
    subscriber->base.lastMessageTime = topic->latestMessage->originTime;
475

  
476
    urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
477

  
478
# if (URT_CFG_PUBSUB_PROFILING == true)
479
      topic->numHrtSubscribers--;
480
# endif /* URT_CFG_PUBSUB_PROFILING */
481

  
482
    //TODO: Unlock topic
483
    return URT_STATUS_OK;
484
}
271 485

  
272 486
/**
273 487
 * @brief  Fetches the next message.
......
307 521
 *
308 522
 * @return  Returns a boolean indicator whether the latency is fine.
309 523
 */
310
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency){return true;}
524
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
525
{
526
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
527
    if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
528
        return true;
529
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
530
  return false;
531
}
311 532

  
312 533
/**
313 534
 * @brief  Unsubscribes from a subscriber.
......
327 548
 */
328 549
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
329 550
{
551
  urtDebugAssert(subscriber);
552

  
330 553
  subscriber->base.topic = NULL;
331 554
  urtEventListenerInit(subscriber->base.evtListener);
332 555
  subscriber->base.lastMessage = NULL;
333 556
  subscriber->base.lastMessageTime = 0;
334 557

  
335
  #if (URT_CFG_PUBSUB_PROFILING)
558
# if (URT_CFG_PUBSUB_PROFILING)
336 559
    subscriber->base.sumLatencies = 0;
337 560
    subscriber->base.numMessagesReceived = 0;
338
  #endif /* URT_CFG_PUBSUB_PROFILING */
561
# endif /* URT_CFG_PUBSUB_PROFILING */
339 562

  
340 563
  subscriber->next = NULL;
341 564

  
342
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
565
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
343 566
    subscriber->deadlineOffset = 0;
344 567
    urtTimerInit(subscriber->qodDeadlineTimer);
345
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
568
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
346 569

  
347
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
570
# if (URT_CFG_PUBSUB_QOS_RATECHECKS)
348 571
    subscriber->expectedRate = 0;
349
  #endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
572
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
350 573

  
351
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
574
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
352 575
    subscriber->maxJitter = 0;
353
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
576
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
354 577

  
355
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
578
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
356 579
    subscriber->minLatency = URT_DELAY_INFINITE;
357 580
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
358
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
581
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
359 582
  return;
360 583
}
361 584

  

Also available in: Unified diff