Revision 37cd5dc2

View differences:

inc/urt_core.h
57 57
  urt_osEventMask_t urtCoreGetEventMask(void);
58 58
  urt_osMutex_t* urtCoreGetMutex(void);
59 59
  urt_osEventSource_t* urtCoreGetEvtSource(void);
60
  urt_node_t* urtCoreGetNodes(void);
61
  void urtCoreSetNodes(urt_node_t* node);
62 60
  void urtCoreStartNodes(void);
63 61
  urt_status_t urtCoreSynchronizeNodes(urt_node_t* node);
64 62
  urt_status_t urtCoreStopNodes(urt_status_t reason);
63
  urt_node_t* urtCoreGetNodes(void);
64
  void urtCoreAddNode(urt_node_t* node);
65 65

  
66 66
#if (URT_CFG_PUBSUB_ENABLED)
67
  void urtCoreAddTopic(urt_topic_t* topic);
67 68
  urt_topic_t* urtCoreGetTopic(urt_topicid_t id);
68 69
#endif /* URT_CFG_PUBSUB_ENABLED */
69 70

  
70 71
#if (URT_CFG_RPC_ENABLED)
72
  void urtCoreAddService(urt_service_t* service);
71 73
  urt_service_t urtCoreGetService(urt_serviceid_t id);
72 74
#endif /* URT_CFG_RPC_ENABLED */
73 75

  
inc/urt_topic.h
76 76
extern "C" {
77 77
#endif /* defined(__cplusplus) */
78 78

  
79
  urt_status_t urtTopicInit(urt_topic_t* topic, urt_topicid_t id);
79
  urt_status_t urtTopicInit(urt_topic_t* topic, urt_topicid_t id, urt_message_t* mandatoryMessage);
80 80
#if defined(__cplusplus)
81 81
}
82 82
#endif /* defined(__cplusplus) */
src/urt_core.c
73 73
  core._status = URT_STATUS_OK;
74 74
  urtEventSourceInit(&core._evtSource);
75 75
  urtMutexInit(&core._lock);
76
  #if (URT_CFG_PUBSUB_ENABLED)
76
# if (URT_CFG_PUBSUB_ENABLED)
77 77
    core._topics = NULL;
78
  #endif /* URT_CFG_PUBSUB_ENABLED */
79
  #if (URT_CFG_RPC_ENABLED)
78
# endif /* URT_CFG_PUBSUB_ENABLED */
79
# if (URT_CFG_RPC_ENABLED)
80 80
    core.urt_service_t = NULL;
81
  #endif /* URT_CFG_RPC_ENABLED */
81
# endif /* URT_CFG_RPC_ENABLED */
82 82
  return;
83 83
}
84 84

  
......
117 117
    return &core._evtSource;
118 118
}
119 119

  
120

  
121
/**
122
 * @brief   Get Core nodes.
123
 *
124
 * @return  Nodes registered to the core.
125
 */
126
urt_node_t* urtCoreGetNodes(void)
127
{
128
    return core._nodes;
129
}
130

  
131
void urtCoreSetNodes(urt_node_t* node)
132
{
133
    core._nodes = node;
134
    return;
135
}
136

  
137 120
/**
138 121
 * @brief   Start threads of all nodes of the Core.
139 122
 */
......
237 220
}
238 221

  
239 222
/**
223
 * @brief   Get Core nodes.
224
 *
225
 * @return  Nodes registered to the core.
226
 */
227
urt_node_t* urtCoreGetNodes(void)
228
{
229
  return core._nodes;
230
}
231

  
232

  
233
/**
234
 * @brief   Prepend node to core's list of nodes.
235
 *
236
 * @param[in] node The node to prepend. Must not be NULL.
237
 */
238
void urtCoreAddNode(urt_node_t* node)
239
{
240
  urtDebugAssert(node);
241

  
242
  node->next = core._nodes;
243
  core._nodes = node;
244
  return;
245
}
246

  
247

  
248
/**
240 249
 * @brief   Get the topic of the Core.
241 250
 *
242 251
 * @param[in] id  Identifier of the topic to retrieve.
243 252
 *
244 253
 * @return  Returns a pointer to the requested service. Returns NULL if no service matches the given ID.
245 254
 */
246
#if (URT_CFG_PUBSUB_ENABLED)
247
  urt_topic_t* urtCoreGetTopic(urt_topicid_t id)
255
# if (URT_CFG_PUBSUB_ENABLED)
256

  
257
/**
258
 * @brief   Append topic to core's list of topics.
259
 *
260
 * @param[in] node The topic to append.
261
 */
262
void urtCoreAddTopic(urt_topic_t* topic)
263
{
264
  urt_topic_t* lastTopic = core._topics;
265
  while (lastTopic->next != NULL)
248 266
  {
249
      urtMutexLock(&core._lock);
250
      urt_topic_t* topic = core._topics;
251
      while (topic != NULL && topic->id < id)
252
          topic = topic->next;
253
      urtMutexUnlock(&core._lock);
254
      if (topic != NULL && topic->id == id)
255
          return topic;
256
      else
257
        return NULL;
267
    lastTopic = lastTopic->next;
258 268
  }
259
#endif /* URT_CFG_PUBSUB_ENABLED */
269
  lastTopic->next = topic;
270
  return;
271
}
272

  
273
/**
274
 * @brief   Get core's list of topics.
275
 *
276
 * @return  The first topic of the core.
277
 */
278
urt_topic_t* urtCoreGetTopic(urt_topicid_t id)
279
{
280
  urtMutexLock(&core._lock);
281
  urt_topic_t* topic = core._topics;
282
  while (topic != NULL && topic->id < id)
283
      topic = topic->next;
284
  urtMutexUnlock(&core._lock);
285
  if (topic != NULL && topic->id == id)
286
      return topic;
287
  else
288
    return NULL;
289
}
290
# endif /* URT_CFG_PUBSUB_ENABLED */
260 291

  
261 292

  
293
# if (URT_CFG_RPC_ENABLED)
262 294
/**
263 295
 * @brief   Get the service of the Core.
264 296
 *
265 297
 * @param[in] id  Identifier of the service to retrieve.
266 298
 *
267 299
 * @return Returns a pointer to the requested service. Returns NULL if no service matches the given ID.
268
 */  
269
#if (URT_CFG_RPC_ENABLED)
270
  urt_service_t urtCoreGetService(urt_serviceid_t id) {return urt_service_t;}
271
#endif /* URT_CFG_RPC_ENABLED */
300
 */
301
urt_service_t urtCoreGetService(urt_serviceid_t id) {return urt_service_t;}
302
# endif /* URT_CFG_RPC_ENABLED */
272 303

  
273 304

  
src/urt_node.c
69 69
    urtCoreSynchronizeNodes(((urt_node_t*)arg));
70 70
  }
71 71

  
72

  
73 72
  while (urtThreadShouldTerminate())
74 73
  {
75 74
    urt_osEventMask_t temp = urtEventWait(mask, URT_EVENT_WAIT_ONE, URT_DELAY_INFINITE);
......
91 90
    ((urt_node_t*)arg)->shutdowncallback(((urt_node_t*)arg), urtCoreGetStatus(), ((urt_node_t*)arg)->shutdownparams);
92 91
  }
93 92
  urtEventUnregister(urtCoreGetEvtSource(), &((urt_node_t*)arg)->listener);
94
  urt_osThread_t* threadToTerminate = ((urt_node_t*)arg)->thread;
95
  //urt_osThread_t* threadToTerminate = urtThreadGetSelf();
96
  while (threadToTerminate->children != NULL || threadToTerminate->sibling != NULL)
93

  
94

  
95
  //Terminate all children and siblings
96
  urt_osThread_t* threadToTerminate = urtThreadGetSelf()->children;
97
  while (threadToTerminate != urtThreadGetSelf() && threadToTerminate != NULL)
97 98
  {
99
    urtThreadTerminate(threadToTerminate, URT_THREAD_TERMINATE_REQUEST);
98 100
    if (threadToTerminate->children != NULL)
99
      urtThreadTerminate(threadToTerminate->children, URT_THREAD_TERMINATE_REQUEST);
100
    if(threadToTerminate->sibling != NULL)
101
      urtThreadTerminate(threadToTerminate->sibling, URT_THREAD_TERMINATE_REQUEST);
101
    {
102
      threadToTerminate = threadToTerminate->children;
103
    }
104
    else
105
    {
106
      if (threadToTerminate->sibling != NULL)
107
      {
108
        threadToTerminate = threadToTerminate->sibling;
109
      }
110
      else
111
      {
112
        threadToTerminate = threadToTerminate->parent;
113
        while (threadToTerminate->sibling == NULL && threadToTerminate != urtThreadGetSelf())
114
        {
115
          threadToTerminate = threadToTerminate->parent;
116
        }
117
        if (threadToTerminate != urtThreadGetSelf())
118
        {
119
          threadToTerminate = threadToTerminate->sibling;
120
        }
121
      }
122
    }
102 123
  }
103 124

  
104
  urt_osThread_t* threadToJoin = ((urt_node_t*)arg)->thread;
105
  while (threadToJoin->children != NULL || threadToJoin->sibling != NULL)
125
  urt_osThread_t* threadToJoin = urtThreadGetSelf()->children;
126
  while (threadToJoin != urtThreadGetSelf() && threadToJoin != NULL)
106 127
  {
128
    urtThreadJoin(threadToJoin);
107 129
    if (threadToJoin->children != NULL)
108
      urtThreadJoin(threadToJoin->children);
109
    if(threadToJoin->sibling != NULL)
110
      urtThreadJoin(threadToJoin->sibling);
130
    {
131
      threadToJoin = threadToJoin->children;
132
    }
133
    else
134
    {
135
      if (threadToJoin->sibling != NULL)
136
      {
137
        threadToJoin = threadToJoin->sibling;
138
      }
139
      else
140
      {
141
        threadToJoin = threadToJoin->parent;
142
        while (threadToJoin->sibling == NULL && threadToJoin != urtThreadGetSelf())
143
        {
144
          threadToJoin = threadToJoin->parent;
145
        }
146
        if (threadToJoin != urtThreadGetSelf())
147
        {
148
          threadToJoin = threadToJoin->sibling;
149
        }
150
      }
151
    }
111 152
  }
112 153

  
113 154
  urtThreadExit();
......
161 202
  urt_osMutex_t* mutexTemp = urtCoreGetMutex();
162 203
  urtMutexLock(mutexTemp);
163 204
    node->next = urtCoreGetNodes();
164
    urtCoreSetNodes(node);
205
    urtCoreAddNode(node);
165 206
  urtMutexUnlock(mutexTemp);
166 207
  return;
167 208
}
src/urt_subscriber.c
87 87
      return URT_STATUS_SUBSCRIBE_TOPICSET;
88 88

  
89 89
  subscriber->base.topic = topic;
90
  //TODO: Lock topic
90
  urtMutexLock(topic->lock);
91 91

  
92 92
  if (messages)
93 93
  {
......
109 109
    topic->numHrtSubscribers--;
110 110
#endif /* URT_CFG_PUBSUB_PROFILING */
111 111

  
112
  //TODO: Unlock topic
112
  urtMutexUnlock(topic->lock);
113 113
  return URT_STATUS_OK;
114 114
}
115 115

  
......
160 160
  if (subscriber->base.topic)
161 161
  {
162 162
# if(URT_CFG_PUBSUB_PROFILING == true)
163
      //TODO: LOCK TOPIC
163
      urtMutexLock(topic->lock);
164 164
      subscriber->base.topic->numSubscribers--;
165 165
# endif /* URT_CFG_PUBSUB_PROFILING */
166 166
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
167 167
# if(URT_CFG_PUBSUB_PROFILING == true)
168
      //TODO: Unlock TOPIC
168
      urtMutexUnlock(topic->lock);
169 169
      subscriber->base.topic = NULL;
170 170
      subscriber->base.lastMessage = NULL;
171 171
      subscriber->base.lastMessageTime = 0;
......
238 238
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
239 239
# endif  /* URT_CFG_PUBSUB_PROFILING */
240 240

  
241
  //TODO: Lock topic
241
  urtMutexLock(topic->lock);
242 242
  if (messages)
243 243
  {
244 244
    urt_message_t* lastMessageContribute = messages;
......
259 259
    topic->numHrtSubscribers--;
260 260
# endif /* URT_CFG_PUBSUB_PROFILING */
261 261

  
262
  //TODO: Unlock topic
262
  urtMutexUnlock(topic->lock);
263 263
  return URT_STATUS_OK;
264 264
}
265 265

  
......
323 323
  if (subscriber->base.topic)
324 324
  {
325 325
# if (URT_CFG_PUBSUB_PROFILING == true)
326
    //TODO: lock topic
326
    urtMutexLock(topic->lock);
327 327
    subscriber->base.topic->numSubscribers--;
328 328
# endif /* URT_CFG_PUBSUB_PROFILING */
329 329
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
330 330
# if (URT_CFG_PUBSUB_PROFILING == true)
331
      //TODO: unlock topic
331
      urtMutexUnlock(topic->lock);
332 332
# endif /* URT_CFG_PUBSUB_PROFILING */
333 333
    subscriber->base.topic = NULL;
334 334
    subscriber->base.lastMessage = NULL;
......
418 418
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
419 419
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
420 420

  
421
  //TODO: Lock topic
421
  urtMutexLock(topic->lock);
422 422
  if (messages)
423 423
  {
424 424
    urt_message_t* lastMessageContribute = messages;
......
439 439
  topic->numHrtSubscribers--;
440 440
# endif /* URT_CFG_PUBSUB_PROFILING */
441 441

  
442
  //TODO: Unlock topic
442
  urtMutexUnlock(topic->lock);
443 443
  return URT_STATUS_OK;
444 444
}
445 445

  
......
483 483
 */
484 484
bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
485 485
{
486
# if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
487
  if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
486
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true)
487
  if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
488 488
    return true;
489
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
490

  
491
# if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
492
  if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
493
      return true;
494

  
495
  if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
496
      return true;
489 497
# endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */
490 498

  
491 499
  return false;
......
505 513

  
506 514
  if (subscriber->base.topic)
507 515
  {
508
    //TODO: lock topic
516
    urtMutexLock(topic->lock);
509 517
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
510 518
    //TODO: decrement topic's HRT counter
511 519
# if (URT_CFG_PUBSUB_PROFILING == true)
......
514 522
//Hier weiter
515 523

  
516 524
# if (URT_CFG_PUBSUB_PROFILING == true)
517
    //TODO: unlock topic
525
    urtMutexUnlock(topic->lock);
518 526
# endif /* URT_CFG_PUBSUB_PROFILING */
519 527
    subscriber->base.topic = NULL;
520 528
    subscriber->base.lastMessage = NULL;
......
616 624
  subscriber->expectedRate = rate;
617 625
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
618 626

  
619
  //TODO: Lock topic
627
  urtMutexLock(topic->lock);
620 628
  if (messages)
621 629
  {
622 630
    urt_message_t* lastMessageContribute = messages;
......
642 650
  topic->numSubscribers--;
643 651
# endif /* URT_CFG_PUBSUB_PROFILING */
644 652

  
645
  //TODO: Unlock topic
653
  urtMutexUnlock(topic->lock);
646 654
  return URT_STATUS_OK;
647 655
}
648 656

  
......
692 700

  
693 701
  if (subscriber->base.topic)
694 702
  {
695
    //TODO: lock topic
703
    urtMutexLock(topic->lock);
696 704
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
697 705
    subscriber->base.topic->numHrtSubscribers--;
698 706
# if (URT_CFG_PUBSUB_PROFILING == true)
......
704 712
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
705 713

  
706 714
    urt_message_t* messageTemp = subscriber->base.lastMessage;
715
    bool hrtZero = false;
707 716
    while (messageTemp->next->originTime < messageTemp->originTime)
708 717
    {
709 718
        messageTemp = messageTemp->next;
710 719
        messageTemp->numHrtConsumersLeft--;
720
        if (messageTemp->numHrtConsumersLeft == 0)
721
        {
722
            hrtZero = true;
723
        }
711 724
# if(URT_CFG_PUBSUB_PROFILING == true)
712 725
        messageTemp->numConsumersLeft--;
713 726
# endif /* URT_CFG_PUBSUB_PROFILING */
714 727
    }
715
    bool temp = false;
716
    if (temp /*TODO: HRT counter of any message became 0?*/)
728
    if (hrtZero)
717 729
    {
718
      //TODO: signal topics condition variable
730
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
719 731
    }
720 732

  
721
    //TODO: unlock topic
733
    urtMutexUnlock(topic->lock);
722 734
    subscriber->base.topic = NULL;
723 735
    subscriber->base.lastMessage = NULL;
724 736
    subscriber->base.lastMessageTime = 0;
src/urt_topic.c
50 50
 *
51 51
 * @param[in] topic  The topic to initialize. Must not be NULL.
52 52
 * @param[in] id  Unique, numeric identifier of the topic.
53
 * @param[in] mandatoryMessage Starting message of the topic. Must not be NULL.
53 54
 *
54 55
 * @return  Returns URT_STATUS_OK on success.
55 56
 *          Returns URT_STATUS_TOPIC_DUPLICATE if another topic with the same identifier already exists.
56 57
 */
57
urt_status_t urtTopicInit(urt_topic_t* topic, urt_topicid_t id)
58
urt_status_t urtTopicInit(urt_topic_t* topic, urt_topicid_t id, urt_message_t* mandatoryMessage)
58 59
{
60
  urtDebugAssert(topic);
61
  urtDebugAssert(mandatoryMessage);
62

  
59 63
  urt_topic_t.next = NULL;
60 64
  urt_topic_t.id = id;
61 65
  urtMutexInit(topic->lock);
62 66
  urtEventSourceInit(topic->evtSource);
63 67
  urt_topic_t.numHrtSubscribers = 0;
64 68
  urtCondvarInit(topic->hrtReleased);
65
  void* payload = NULL; //TODO: Replace
66
  urtMessageInit(topic->mandatoryMessage, payload);
69
  topic->mandatoryMessage = *mandatoryMessage;
67 70
  topic->latestMessage = &topic->mandatoryMessage;
68 71
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
69 72
    urt_topic_t.hrtSubscribers = nullptr;

Also available in: Unified diff