Revision 5c6cb22f

View differences:

inc/urt_message.h
45 45
 */
46 46
typedef struct urt_message
47 47
{
48
  urt_message_t* next;
48
  struct urt_message* next;
49 49
  void* payload;
50 50
  urt_osTime_t originTime;
51 51
  unsigned int numHrtConsumersLeft;
inc/urt_subscriber.h
22 22
#ifndef URTWARE_SUBSCRIBER_H
23 23
#define URTWARE_SUBSCRIBER_H
24 24

  
25
#include <urt_types.h>
26 25
#include <urtware.h>
27 26

  
28 27
/******************************************************************************/
......
70 69
}urt_nrtsubscriber_t;
71 70

  
72 71
/**
73
 * @brief  nrt subscriber
72
 * @brief  srt subscriber
74 73
 */
75 74
typedef struct urt_srtsubscriber
76 75
{
77 76
  urt_basesubscriber_t base;
78
  urt_srtusefulnessfunc_t* usefulnesscb;
77
  void* cbparams;
78
  urt_usefulness_f* usefulnesscb;
79 79
  #if (URT_CFG_PUBSUB_PROFILING)
80 80
    urt_delay_t minLatency;
81 81
    urt_delay_t maxLatency;
......
83 83
}urt_srtsubscriber_t;
84 84

  
85 85
/**
86
 * @brief  nrt subscriber
86
 * @brief  frt subscriber
87 87
 */
88 88
typedef struct urt_frtsubscriber
89 89
{
......
106 106
typedef struct urt_hrtsubscriber
107 107
{
108 108
  urt_basesubscriber_t base;
109
  urt_hrtsubscriber_t* next;
109
  struct urt_hrtsubscriber* next;
110 110
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
111 111
    urt_delay_t deadlineOffset;
112 112
    urt_osTimer_t qosDeadlineTimer;
......
142 142
  urt_status_t urtNrtSubscriberUnsubscribe(urt_nrtsubscriber_t* subscriber);
143 143

  
144 144
  void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber);
145
  urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* message, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams);
145
  urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* message, urt_usefulness_f* usefulnesscb, void* cbparams);
146 146
  urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency);
147 147
  urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency);
148 148
  float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency);
......
160 160
  urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency);
161 161
  urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency);
162 162
  urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber);
163

  
164
  float urt_srtusefulnessfunc_t (urt_delay_t dt, void* params);
165 163
#if defined(__cplusplus)
166 164
}
167 165
#endif /* defined(__cplusplus) */
inc/urt_topic.h
22 22
#ifndef URTWARE_TOPIC_H
23 23
#define URTWARE_TOPIC_H
24 24

  
25
#include <urt_types.h>
26 25
#include <urtware.h>
27 26

  
28 27
/******************************************************************************/
......
46 45
 */
47 46
typedef struct urt_topic
48 47
{
49
  urt_topic_t* next;
48
  struct urt_topic* next;
50 49
  urt_topicid_t id;
51 50
  urt_osMutex_t lock;
52 51
  urt_osEventSource_t evtSource;
......
55 54
  urt_message_t mandatoryMessage;
56 55
  urt_message_t* latestMessage;
57 56
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
58
    urt_hrtsubscriber_t* hrtSubscribers;
57
    struct urt_hrtsubscriber* hrtSubscribers;
59 58
    urt_osTimer_t qosRateTimer;
60 59
  #endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
61 60
  #if (URT_CFG_PUBSUB_PROFILING)
src/urt_core.c
244 244
 * @return  Returns a pointer to the requested service. Returns NULL if no service matches the given ID.
245 245
 */
246 246
#if (URT_CFG_PUBSUB_ENABLED)
247
  urt_topic_t* urtCoreGetTopic(urt_topicid_t id) {return urt_topic_t;}
247
  urt_topic_t* urtCoreGetTopic(urt_topicid_t id)
248
  {
249
      urtMutexLock(&core._lock);
250
      urt_topic_t* topic = core._topics;
251
      while (topic != NULL && topic->id < id)
252
          topic = topic->next;
253
      urtMutexUnlock(&core._lock);
254
      if (topic != NULL && topic->id == id)
255
          return topic;
256
      else
257
        return NULL;
258
  }
248 259
#endif /* URT_CFG_PUBSUB_ENABLED */
249 260

  
250 261

  
src/urt_subscriber.c
50 50
 *
51 51
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
52 52
 */
53
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber) {return;}
53
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
54
{
55
  subscriber->base.topic = NULL;
56
  urtEventListenerInit(subscriber->base.evtListener);
57
  subscriber->base.lastMessage = NULL;
58
  subscriber->base.lastMessageTime = 0;
59
  #if (URT_CFG_PUBSUB_PROFILING)
60
    subscriber->minLatency = URT_DELAY_INFINITE;
61
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
62
  #endif /* URT_CFG_PUBSUB_PROFILING */
63
  return;
64
}
54 65

  
55 66
/**
56 67
 * @brief  Subscribes the subscriber to a topic.
......
119 130
 *
120 131
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
121 132
 */
122
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber) {return;}
133
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
134
{
135
  subscriber->base.topic = NULL;
136
  urtEventListenerInit(subscriber->base.evtListener);
137
  subscriber->base.lastMessage = NULL;
138
  subscriber->base.lastMessageTime = 0;
139
  #if (URT_CFG_PUBSUB_PROFILING)
140
    subscriber->base.sumLatencies = 0;
141
    subscriber->base.numMessagesReceived = 0;
142
    subscriber->usefulnesscb = NULL;
143
    subscriber->cbparams = NULL;
144
    subscriber->minLatency = URT_DELAY_INFINITE;
145
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
146
  #endif /* URT_CFG_PUBSUB_PROFILING */
147
  return;
148
}
123 149

  
124 150
/**
125 151
 * @brief  Subscribes the subscriber to a topic.
......
196 222
 *
197 223
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
198 224
 */
199
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;}
225
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
226
{
227
  subscriber->base.topic = NULL;
228
  urtEventListenerInit(subscriber->base.evtListener);
229
  subscriber->base.lastMessage = NULL;
230
  subscriber->base.lastMessageTime = 0;
231

  
232
  #if (URT_CFG_PUBSUB_PROFILING)
233
    subscriber->base.sumLatencies = 0;
234
    subscriber->base.numMessagesReceived = 0;
235
  #endif /* URT_CFG_PUBSUB_PROFILING */
236

  
237
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
238
    subscriber->deadlineOffset = 0;
239
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
240

  
241
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
242
    subscriber->maxJitter = 0;
243
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
244

  
245
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
246
    subscriber->minLatency = URT_DELAY_INFINITE;
247
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
248
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
249
  return;
250
}
251

  
200 252

  
201 253
/**
202 254
 * @brief  Subscribes the subscriber to a topic.
......
260 312
/**
261 313
 * @brief  Unsubscribes from a subscriber.
262 314
 *
263
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
315
 * @param[in] subscriber  The FRT subscriber to be unsubscribed. Must not be NULL.
264 316
 *
265 317
 * @return  Returns URT_STATUS_OK on sucess.
266 318
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
......
273 325
 *
274 326
 * @param[in] subscriber  The HRT subscriber to initialize. Must not be NULL.
275 327
 */
276
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber){return;}
328
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
329
{
330
  subscriber->base.topic = NULL;
331
  urtEventListenerInit(subscriber->base.evtListener);
332
  subscriber->base.lastMessage = NULL;
333
  subscriber->base.lastMessageTime = 0;
334

  
335
  #if (URT_CFG_PUBSUB_PROFILING)
336
    subscriber->base.sumLatencies = 0;
337
    subscriber->base.numMessagesReceived = 0;
338
  #endif /* URT_CFG_PUBSUB_PROFILING */
339

  
340
  subscriber->next = NULL;
341

  
342
  #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
343
    subscriber->deadlineOffset = 0;
344
    urtTimerInit(subscriber->qodDeadlineTimer);
345
  #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
346

  
347
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
348
    subscriber->expectedRate = 0;
349
  #endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
350

  
351
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
352
    subscriber->maxJitter = 0;
353
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
354

  
355
  #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
356
    subscriber->minLatency = URT_DELAY_INFINITE;
357
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
358
  #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
359
  return;
360
}
361

  
277 362

  
278 363
/**
279 364
 * @brief  Subscribes the subscriber to a topic.
280 365
 *
281
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
366
 * @param[in] subscriber  The HRT subscriber which shall subscribe to a topic. Must not be NULL.
282 367
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
283 368
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
284 369
 *                      Messages must not be associated to another topic.
src/urt_topic.c
58 58
{
59 59
  urt_topic_t.next = NULL;
60 60
  urt_topic_t.id = id;
61
  //add later: urtmutexinit, urteventsourceinit
61
  urtMutexInit(topic->lock);
62
  urtEventSourceInit(topic->evtSource);
62 63
  urt_topic_t.numHrtSubscribers = 0;
63
  //add later: condvar, message init, latest message
64
  urtCondvarInit(topic->hrtReleased);
65
  void* payload = NULL; //TODO: Replace
66
  urtMessageInit(topic->mandatoryMessage, payload);
67
  topic->latestMessage = &topic->mandatoryMessage;
64 68
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
65 69
    urt_topic_t.hrtSubscribers = nullptr;
66 70
    //add later: timer init;
......
70 74
    urt_topic_t.numMessagesDiscarded = 0;
71 75
    urt_topic_t.numSubscribers = 0;
72 76
  #endif /* URT_CFG_PUBSUB_PROFILING */
73
  //add later: close circular message buffer,...
74
  return URT_STATUS_OK;
77
  topic->latestMessage->next = topic->latestMessage;
78
  urtMutexLock(topic->lock);
79
  urt_topic_t* topicTemp = topic;
80
  while (topicTemp != NULL && topicTemp->id < id)
81
      topicTemp = topicTemp->next;
82

  
83
  if (topicTemp == NULL)
84
  {
85
    //TODO: Append self to core's list of topic
86
    urtMutexUnlock(topic->lock);
87
    return URT_STATUS_OK;
88
  }
89
  else if (topicTemp->id > id)
90
  {
91
    topicTemp->next = topic;
92
    urtMutexUnlock(topic->lock);
93
    return URT_STATUS_OK;
94
  }
95
  else
96
  {
97
    urtMutexUnlock(topic->lock);
98
    return URT_STATUS_TOPIC_DUPLICATE;
99
  }
75 100
}
urtware.h
96 96
 * TODO: Add further ┬ÁRtWare includes here (e.g. urt_core.h).
97 97
 */
98 98
#include <apps_urtosal.h>
99
#include <urt_message.h>
100
#include <urt_topic.h>
101
#include <urt_subscriber.h>
99 102
#include <urt_node.h>
100 103
#include <urt_core.h>
101 104

  

Also available in: Unified diff