Revision fb72e91b

View differences:

inc/urt_core.h
74 74
#endif /* URT_CFG_RPC_ENABLED */
75 75

  
76 76
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
77
  urt_osTimerCallback_t urtCoreCallbackDefault(void* params)
77
  urt_osTimerCallback_t urtCoreCallbackDefault(void* params);
78 78
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
79 79

  
80 80
#if defined(__cplusplus)
inc/urt_node.h
41 41
/******************************************************************************/
42 42

  
43 43
/*
44
 * forward declarations
44
 * forward declaration
45 45
 */
46 46
typedef struct urt_node urt_node_t;
47 47

  
48 48
/**
49 49
 * @brief
50 50
 *
51
 * @param[in]   node   .
52
 * @param[in]   cause  .
53
 * @param[in]   arg    .
54
 *
55
 * @return    .
56
 *
57 51
 */
58 52
typedef void (*urt_nodeShutdownCallback_t)(urt_node_t* node, urt_status_t cause, void* arg);
59 53

  
......
61 55
/**
62 56
 * @brief
63 57
 *
64
 * @param[in]   node  .
65
 * @param[in]   arg   .
66
 *
67
 * @return    .
68
 *
69 58
 */
70 59
typedef urt_osEventMask_t (*urt_nodeSetupCallback_t)(urt_node_t* node, void* arg);
71 60

  
72 61

  
73 62
/**
74 63
 * @brief
75
 *
76
 * @param[in]   node  .
77
 * @param[in]   events    .
78
 * @param[in]   arg    .
79
 *
80
 * @return    .
81 64
 */
82 65
typedef urt_osEventMask_t (*urt_nodeLoopCallback_t)(urt_node_t* node, urt_osEventMask_t events, void* arg);
83 66

  
inc/urt_primitives.h
23 23
#define URT_PRIMITIVES_H
24 24

  
25 25
#include <urtware.h>
26

  
27 26
#include <stdint.h>
28 27

  
29 28
/*============================================================================*/
src/urt_core.c
298 298
#endif /* URT_CFG_PUBSUB_ENABLED */
299 299

  
300 300

  
301
# if (URT_CFG_RPC_ENABLED)
301
#if (URT_CFG_RPC_ENABLED == true)
302 302
/**
303 303
 * @brief   Prepend service to core's list of services.
304 304
 *
......
325 325
 * @return Returns a pointer to the requested service. Returns NULL if no service matches the given ID.
326 326
 */
327 327
urt_service_t* urtCoreGetService(urt_serviceid_t id) {return urt_service_t;}
328
# endif /* URT_CFG_RPC_ENABLED */
328
#endif /* URT_CFG_RPC_ENABLED */
329 329

  
330 330
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
331 331
urt_osTimerCallback_t urtCoreCallbackDefault(void* params)
332 332
{
333
  urtMutexLock(&core._lock);
334
  while (params) {
335
    urtPrintf("Danger");
336
    urtThreadSleep(1);
337
  }
338
  urtMutexUnlock(&core._lock);
339
  return
333
urtMutexLock(&core._lock);
334
while (params) {
335
  urtPrintf("Danger");
336
  urtThreadSleep(1);
337
}
338
urtMutexUnlock(&core._lock);
339
return;
340 340
}
341 341
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
342 342

  
src/urt_message.c
53 53
 */
54 54
void urtMessageInit(urt_message_t* message, void* payload)
55 55
{
56
  urt_message_t.next = NULL;
57
  urt_message_t.payload = payload;
58
  urt_message_t.originTime = 0;
59
  urt_message_t.numHrtConsumersLeft = 0;
56
  message->next = NULL;
57
  message->payload = payload;
58
  message->originTime = 0;
59
  message->numHrtConsumersLeft = 0;
60 60
  #if (URT_CFG_PUBSUB_PROFILING)
61
    urt_message_t.numConsumersLeft = 0;
61
    message->numConsumersLeft = 0;
62 62
  #endif /* URT_CFG_PUBSUB_PROFILING */
63 63

  
64 64
  return;
src/urt_publisher.c
60 60
  urtDebugAssert(publisher);
61 61
  urtDebugAssert(topic);
62 62

  
63
  urt_publisher_t.topic = topic;
63
  publisher->topic = topic;
64 64
#if (URT_CFG_PUBSUB_PROFILING == true)
65
  urt_publisher_t.publishAttempts = 0;
66
  urt_publisher_t.publishFails = 0;
65
  publisher->publishAttempts = 0;
66
  publisher->publishFails = 0;
67 67
#endif /* URT_CFG_PUBSUB_PROFILING */
68 68
  if (messages != NULL) {
69
    urtMutexLock(topic->lock);
69
    urtMutexLock(&topic->lock);
70 70
    urt_message_t* lastMessage = messages;
71 71
    while (lastMessage->next != NULL) {
72 72
      lastMessage = lastMessage->next;
73 73
    }
74
    //TODO: setnextpointer of last message to contribute to the message after the topic's latest message
74
    lastMessage->next = topic->latestMessage->next;
75 75
    topic->latestMessage->next = messages;
76
    urtMutexUnlock(&topic->lock);
76 77
  }
77 78

  
78 79
  return;
......
92 93
 */
93 94
urt_status_t urtPublisherPublish(urt_publisher_t* publisher, void* payload, size_t bytes, urt_osTime_t t, urt_delay_t timeout)
94 95
{
95
  urtMutexLock(publisher->topic->lock);
96
  urt_message_t* message = publisher->topic->next->latestMessage;
97
  urt_osTime_t messageTime =  message->originTime;
96
  urtMutexLock(&publisher->topic->lock);
97
  urt_message_t* latestMessage = publisher->topic->next->latestMessage;
98
  urt_osTime_t messageTime =  latestMessage->originTime;
98 99
#if (URT_CFG_PUBSUB_PROFILING == true)
99 100
  publisher->publishAttempts++;
100 101
#endif /* URT_CFG_PUBSUB_PROFILING */
101 102

  
102
  while (message->numHrtConsumersLeft > 0) {
103
    urt_osCondvarWaitStatus_t timeout = urtCondvarWait(publisher->topic->hrtReleased, publisher->topic->lock, URT_DELAY_INFINITE);
103
  while (latestMessage->numHrtConsumersLeft > 0) {
104
    //TODO: urt_osCondvarWaitStatus_t timeout = urtCondvarWait(&publisher->topic->hrtReleased, &publisher->topic->lock, URT_DELAY_INFINITE);
104 105
    if (timeout == URT_CONDVAR_WAITSTATUS_TIMEOUT) {
105 106
#if (URT_CFG_PUBSUB_PROFILING == true)
106 107
      publisher->publishFails++;
107 108
#endif /* URT_CFG_PUBSUB_PROFILING */
108
      urtMutexUnlock(publisher->topic->lock);
109
      urtMutexUnlock(&publisher->topic->lock);
109 110
      return URT_STATUS_PUBLISH_TIMEOUT;
110 111
    }
111 112

  
112
    if (messageTime != message->originTime) {
113
      messageTime = message->originTime;
114
      message = message->next;
115
      while (message->originTime < messageTime) {
116
        message = message->next;
113
    if (messageTime != latestMessage->originTime) {
114
      messageTime = latestMessage->originTime;
115
      latestMessage = latestMessage->next;
116
      while (latestMessage->originTime < messageTime) {
117
        latestMessage = latestMessage->next;
117 118
      }
118 119
    }
119 120
  }
120 121

  
121 122
#if (URT_CFG_PUBSUB_PROFILING == true)
122
  if (message->numHrtConsumersLeft > 0) { //TODO: never true because of while?
123
  if (latestMessage->numConsumersLeft > 0) {
123 124
    publisher->topic->numMessagesDiscarded++;
124 125
  }
125 126
#endif /* URT_CFG_PUBSUB_PROFILING */
126 127

  
127
  publisher->topic->latestMessage = message; //TODO: Iterate topic's pointer to this message?
128
  memcpy(message->payload, payload, bytes); //TODO: copy payload to message?
129
  //TODO: Set origin time of message
130
  publisher->topic->numHrtSubscribers = message->numHrtConsumersLeft;
128
  publisher->topic->latestMessage = latestMessage;
129
  memcpy(latestMessage->payload, payload, bytes);
130
  latestMessage->originTime = t;
131
  publisher->topic->numHrtSubscribers = latestMessage->numHrtConsumersLeft;
131 132
#if (URT_CFG_PUBSUB_PROFILING == true)
132
  publisher->topic->numSubscribers = message->numHrtConsumersLeft; //TODO: Set number of comsumers?
133
  publisher->topic->numSubscribers = latestMessage->numConsumersLeft;
133 134
#endif /* URT_CFG_PUBSUB_PROFILING */
134 135

  
135 136
#if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
136
  publisher->topic->qosRateTimer =  NULL; //TODO: Set QoS rate timer wrt. most critical HRT Subscriber
137
  publisher->topic->qosRateTimer =  publisher->topic->hrtSubscribers;
138
  //TODO: Set QoS rate timer (@topic) wrt. most critical HRT Subscriber (here different types?)
137 139
#endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
138 140

  
139 141
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
140 142
  urt_hrtsubscriber_t* hrtSubscriber = publisher->topic->hrtSubscribers;
141 143
  while (hrtSubscriber != NULL) {
142 144
    if (!urtTimerIsArmed(hrtSubscriber->qosDeadlineTimer)) {
143
      *hrtSubscriber->qosDeadlineTimer = message->originTime;
145
      *hrtSubscriber->qosDeadlineTimer = latestMessage->originTime;
144 146
    }
145 147
    hrtSubscriber = hrtSubscriber->next;
146 148
  }
......
150 152
  publisher->topic->numMessagesPublished++;
151 153
#endif /* URT_CFG_PUBSUB_PROFILING */
152 154

  
153
  //TODO: Fire event
154
  urtMutexUnlock(publisher->topic->lock);
155
  urtEventSourceBroadcast(&publisher->topic->evtSource, URT_EVENTFLAG_PROCEED);
156
  urtMutexUnlock(&publisher->topic->lock);
155 157
  return URT_STATUS_OK;
156 158
}
src/urt_subscriber.c
42 42
/* LOCAL FUNCTIONS                                                            */
43 43
/******************************************************************************/
44 44

  
45
void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes)
46
{
47
  subscriber->base.lastMessage = message;
48
  *subscriber->base.lastMessageTime = message->originTime;
49
  memcpy(message->payload, payload, bytes);
50
}
51

  
52 45
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
53 46
{
54
  while (oldestMessage->next->originTime < oldestMessage->originTime)
55
  {
47
  while (oldestMessage->next->originTime < oldestMessage->originTime) {
56 48
    oldestMessage = oldestMessage->next;
57 49
  }
58 50
  return oldestMessage;
......
60 52

  
61 53
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
62 54
{
63
  urt_message_t* lastMessage = subscriber->base.lastMessage;
64
  while (lastMessage->next->originTime < lastMessage->originTime)
65
  {
55
  urt_message_t* lastMessage = latestMessage;
56
  while (lastMessage->next->originTime < lastMessage->originTime) {
66 57
    lastMessage = lastMessage->next;
67 58
#if (URT_CFG_PUBSUB_PROFILING == true)
68 59
    subscriber->base.lastMessage->numConsumersLeft--;
69 60
    subscriber->base->numMessagesReceived++;
70 61
#endif /* URT_CFG_PUBSUB_PROFILING */
71 62
  }
63
  return latestMessage;
72 64
}
73 65

  
74
void urtContributeMessages(urt_message_t* messages)
66
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
75 67
{
76 68
  urt_message_t* lastMessageContribute = messages;
77
  while (lastMessageContribute->next)
78
  {
69
  while (lastMessageContribute->next) {
79 70
    lastMessageContribute = lastMessageContribute->next;
80 71
  }
81 72
  lastMessageContribute->next = topic->latestMessage->next;
......
129 120
  }
130 121

  
131 122
  subscriber->base.topic = topic;
132
  urtMutexLock(topic->lock);
123
  urtMutexLock(&topic->lock);
133 124

  
134 125
  if (messages) {
135
    urtContributeMessages(messages);
126
    urtContributeMessages(messages, topic);
136 127
  }
137 128

  
138 129
  subscriber->base.lastMessage = topic->latestMessage;
139 130
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
140 131

  
141
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
132
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
142 133

  
143 134
#if (URT_CFG_PUBSUB_PROFILING == true)
144 135
    topic->numHrtSubscribers--;
145 136
#endif /* URT_CFG_PUBSUB_PROFILING */
146 137

  
147
  urtMutexUnlock(topic->lock);
138
  urtMutexUnlock(&topic->lock);
148 139
  return URT_STATUS_OK;
149 140
}
150 141

  
......
168 159
    return URT_STATUS_FETCH_NOTOPIC;
169 160
  }
170 161

  
171
  urtMutexLock(subscriber->base.topic->lock);
162
  urtMutexLock(&subscriber->base.topic->lock);
172 163

  
173 164
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
174 165
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
175 166
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
176
      urtMutexUnlock(subscriber->base.topic->lock);
167
      urtMutexUnlock(&subscriber->base.topic->lock);
177 168
      return URT_STATUS_FETCH_NOMESSAGE;
178 169
    }
179 170
    oldestMessage = oldestMessage->next;
......
182 173
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
183 174
  }
184 175

  
185
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
176
  subscriber->base.lastMessage = oldestMessage;
177
  subscriber->base.lastMessageTime = oldestMessage->originTime;
178
  memcpy(oldestMessage->payload, payload, bytes);
186 179

  
187 180
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
188 181
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
......
199 192
#endif /* URT_CFG_PUBSUB_PROFILING */
200 193

  
201 194
    if (latency) {
202
      latency = calculatedLatency;
195
      *latency = calculatedLatency;
203 196
    }
204 197
  }
205 198

  
......
208 201
  subscriber->base->numMessagesReceived++;
209 202
#endif /* URT_CFG_PUBSUB_PROFILING */
210 203

  
211
  urtMutexUnlock(subscriber->base.topic->lock);
204
  urtMutexUnlock(&subscriber->base.topic->lock);
212 205
  return URT_STATUS_OK;
213 206
}
214 207

  
......
230 223
  if (!subscriber->base.topic)
231 224
      return URT_STATUS_FETCH_NOTOPIC;
232 225

  
233
  urtMutexLock(subscriber->base.topic->lock);
226
  urtMutexLock(&subscriber->base.topic->lock);
234 227
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
235 228

  
236 229
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
237 230
  {
238
    urtMutexUnlock(subscriber->base.topic->lock);
231
    urtMutexUnlock(&subscriber->base.topic->lock);
239 232
    return URT_STATUS_FETCH_NOMESSAGE;
240 233
  }
241 234

  
242
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
235
  subscriber->base.lastMessage = lastMessage;
236
  subscriber->base.lastMessageTime = lastMessage->originTime;
237
  memcpy(lastMessage->payload, payload, bytes);
243 238

  
244 239
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
245 240
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
......
256 251
#endif /* URT_CFG_PUBSUB_PROFILING */
257 252

  
258 253
    if (latency) {
259
      latency = calculatedLatency;
254
      *latency = calculatedLatency;
260 255
    }
261 256
  }
262 257

  
263
  urtMutexUnlock(subscriber->base.topic->lock);
258
  urtMutexUnlock(&subscriber->base.topic->lock);
264 259
  return URT_STATUS_OK;
265 260
}
266 261

  
......
277 272
  if (subscriber->base.topic)
278 273
  {
279 274
# if(URT_CFG_PUBSUB_PROFILING == true)
280
      urtMutexLock(topic->lock);
275
      urtMutexLock(&topic->lock);
281 276
      subscriber->base.topic->numSubscribers--;
282 277
# endif /* URT_CFG_PUBSUB_PROFILING */
283
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
278
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
284 279
# if(URT_CFG_PUBSUB_PROFILING == true)
285
      urtMutexUnlock(topic->lock);
280
      urtMutexUnlock(&topic->lock);
286 281
      subscriber->base.topic = NULL;
287 282
      subscriber->base.lastMessage = NULL;
288 283
      subscriber->base.lastMessageTime = 0;
......
307 302
  urtEventListenerInit(subscriber->base.evtListener);
308 303
  subscriber->base.lastMessage = NULL;
309 304
  subscriber->base.lastMessageTime = 0;
310
  #if (URT_CFG_PUBSUB_PROFILING)
311
    subscriber->base.sumLatencies = 0;
312
    subscriber->base.numMessagesReceived = 0;
313
    subscriber->usefulnesscb = NULL;
314
    subscriber->cbparams = NULL;
315
    subscriber->minLatency = URT_DELAY_INFINITE;
316
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
317
  #endif /* URT_CFG_PUBSUB_PROFILING */
305
#if (URT_CFG_PUBSUB_PROFILING)
306
  subscriber->base.sumLatencies = 0;
307
  subscriber->base.numMessagesReceived = 0;
308
  subscriber->usefulnesscb = NULL;
309
  subscriber->cbparams = NULL;
310
  subscriber->minLatency = URT_DELAY_INFINITE;
311
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
312
#endif /* URT_CFG_PUBSUB_PROFILING */
318 313
  return;
319 314
}
320 315

  
......
335 330
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
336 331
 */
337 332
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
338
                                       urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
333
                                       urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
339 334
{
340 335
  urtDebugAssert(subscriber);
341 336
  urtDebugAssert(topic);
342 337

  
343
  if (subscriber->base.topic)
344
  {
338
  if (subscriber->base.topic) {
345 339
    return URT_STATUS_SUBSCRIBE_TOPICSET;
346 340
  }
347 341

  
348 342
  subscriber->base.topic = topic;
349 343
  subscriber->usefulnesscb = usefulnesscb;
350 344
  subscriber->cbparams = cbparams;
351
# if (URT_CFG_PUBSUB_PROFILING == true)
345
#if (URT_CFG_PUBSUB_PROFILING == true)
352 346
  subscriber->base.sumLatencies = 0;
353 347
  subscriber->base.numMessagesReceived = 0;
354 348
  subscriber->minLatency = URT_DELAY_INFINITE;
355 349
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
356
# endif  /* URT_CFG_PUBSUB_PROFILING */
350
#endif  /* URT_CFG_PUBSUB_PROFILING */
357 351

  
358
  urtMutexLock(topic->lock);
359
  if (messages)
360
  {
361
    urtContributeMessages(messages);
352
  urtMutexLock(&topic->lock);
353
  if (messages) {
354
    urtContributeMessages(messages, topic);
362 355
  }
363 356

  
364 357
  subscriber->base.lastMessage = topic->latestMessage;
365 358
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
366 359

  
367
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
360
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
368 361

  
369 362
# if (URT_CFG_PUBSUB_PROFILING == true)
370 363
    topic->numHrtSubscribers--;
371 364
# endif /* URT_CFG_PUBSUB_PROFILING */
372 365

  
373
  urtMutexUnlock(topic->lock);
366
  urtMutexUnlock(&topic->lock);
374 367
  return URT_STATUS_OK;
375 368
}
376 369

  
......
391 384
{
392 385
  urtDebugAssert(subscriber);
393 386

  
394
  if (!subscriber->base.topic)
395
      return URT_STATUS_FETCH_NOTOPIC;
387
  if (!subscriber->base.topic){
388
    return URT_STATUS_FETCH_NOTOPIC;
389
  }
396 390

  
397
  urtMutexLock(subscriber->base.topic->lock);
391
  urtMutexLock(&subscriber->base.topic->lock);
398 392

  
399 393
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
400
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
401
  {
402
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
403
    {
404
      urtMutexUnlock(subscriber->base.topic->lock);
394
  if(oldestMessage->originTime == subscriber->base.lastMessageTime){
395
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime){
396
      urtMutexUnlock(&subscriber->base.topic->lock);
405 397
      return URT_STATUS_FETCH_NOMESSAGE;
406 398
    }
407 399
    oldestMessage = oldestMessage->next;
408 400
  }
409
  else
410
  {
401
  else{
411 402
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
412 403
  }
413 404

  
414
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
405
  subscriber->base.lastMessage = oldestMessage;
406
  subscriber->base.lastMessageTime = oldestMessage->originTime;
407
  memcpy(oldestMessage->payload, payload, bytes);
415 408

  
416 409
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
417 410
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
......
428 421
#endif /* URT_CFG_PUBSUB_PROFILING */
429 422

  
430 423
    if (latency) {
431
      latency = calculatedLatency;
424
      *latency = calculatedLatency;
432 425
    }
433 426
  }
434 427

  
......
437 430
  subscriber->base->numMessagesReceived++;
438 431
#endif /* URT_CFG_PUBSUB_PROFILING */
439 432

  
440
  urtMutexUnlock(subscriber->base.topic->lock);
433
  urtMutexUnlock(&subscriber->base.topic->lock);
441 434
  return URT_STATUS_OK;
442 435
}
443 436

  
......
462 455
    return URT_STATUS_FETCH_NOTOPIC;
463 456
  }
464 457

  
465
  urtMutexLock(subscriber->base.topic->lock);
458
  urtMutexLock(&subscriber->base.topic->lock);
466 459
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
467 460

  
468 461
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
469 462
  {
470
    urtMutexUnlock(subscriber->base.topic->lock);
463
    urtMutexUnlock(&subscriber->base.topic->lock);
471 464
    return URT_STATUS_FETCH_NOMESSAGE;
472 465
  }
473 466

  
474
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
467
  subscriber->base.lastMessage = lastMessage;
468
  subscriber->base.lastMessageTime = lastMessage->originTime;
469
  memcpy(lastMessage->payload, payload, bytes);
475 470

  
476 471
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
477 472
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
......
487 482
  }
488 483
#endif /* URT_CFG_PUBSUB_PROFILING */
489 484

  
490
    if (latency) {
491
      latency = calculatedLatency;
485
    if (latency != NULL) {
486
      *latency = calculatedLatency;
492 487
    }
493 488
  }
494 489

  
495
  urtMutexUnlock(subscriber->base.topic->lock);
490
  urtMutexUnlock(&subscriber->base.topic->lock);
496 491
  return URT_STATUS_OK;
497 492
}
498 493

  
......
508 503
{
509 504
  urtDebugAssert(subscriber);
510 505

  
511
  return subscriber->usefulnesscb(latency);
506
  return (*subscriber->usefulnesscb)(latency, subscriber->cbparams);
512 507
}
513 508

  
514 509
/**
......
526 521
  if (subscriber->base.topic)
527 522
  {
528 523
# if (URT_CFG_PUBSUB_PROFILING == true)
529
    urtMutexLock(topic->lock);
524
    urtMutexLock(&topic->lock);
530 525
    subscriber->base.topic->numSubscribers--;
531 526
# endif /* URT_CFG_PUBSUB_PROFILING */
532
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
527
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
533 528
# if (URT_CFG_PUBSUB_PROFILING == true)
534
      urtMutexUnlock(topic->lock);
529
      urtMutexUnlock(&topic->lock);
535 530
# endif /* URT_CFG_PUBSUB_PROFILING */
536 531
    subscriber->base.topic = NULL;
537 532
    subscriber->base.lastMessage = NULL;
......
621 616
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
622 617
# endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */
623 618

  
624
  urtMutexLock(topic->lock);
619
  urtMutexLock(&topic->lock);
625 620
  if (messages)
626 621
  {
627
    urtContributeMessages(messages);
622
    urtContributeMessages(messages, topic);
628 623
  }
629 624

  
630 625
  subscriber->base.lastMessage = topic->latestMessage;
631 626
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
632 627

  
633
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
628
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
634 629

  
635 630
# if (URT_CFG_PUBSUB_PROFILING == true)
636 631
  topic->numHrtSubscribers--;
637 632
# endif /* URT_CFG_PUBSUB_PROFILING */
638 633

  
639
  urtMutexUnlock(topic->lock);
634
  urtMutexUnlock(&topic->lock);
640 635
  return URT_STATUS_OK;
641 636
}
642 637

  
......
660 655
  if (!subscriber->base.topic)
661 656
      return URT_STATUS_FETCH_NOTOPIC;
662 657

  
663
  urtMutexLock(subscriber->base.topic->lock);
658
  urtMutexLock(&subscriber->base.topic->lock);
664 659

  
665 660
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
666 661
  if(oldestMessage->originTime == subscriber->base.lastMessageTime)
667 662
  {
668 663
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
669 664
    {
670
      urtMutexUnlock(subscriber->base.topic->lock);
665
      urtMutexUnlock(&subscriber->base.topic->lock);
671 666
      return URT_STATUS_FETCH_NOMESSAGE;
672 667
    }
673 668
    oldestMessage = oldestMessage->next;
......
677 672
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
678 673
  }
679 674

  
680
  urtFetchMessage(oldestMessage, subscriber, payload, bytes);
675
  subscriber->base.lastMessage = oldestMessage;
676
  subscriber->base.lastMessageTime = oldestMessage->originTime;
677
  memcpy(oldestMessage->payload, payload, bytes);
681 678

  
682 679
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
683 680
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
......
701 698
    subscriber->maxLatency = calculatedLatency;
702 699
  }
703 700
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
704
    urtMutexUnlock(subscriber->base.topic);
701
    urtMutexUnlock(&subscriber->base.topic->lock);
705 702
    return URT_STATUS_JITTERVIOLATION;
706 703
  }
707 704
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
708 705

  
709 706
    if (latency) {
710
      latency = calculatedLatency;
707
      *latency = calculatedLatency;
711 708
    }
712 709
  }
713 710

  
......
716 713
  subscriber->base->numMessagesReceived++;
717 714
#endif /* URT_CFG_PUBSUB_PROFILING */
718 715

  
719
  urtMutexUnlock(subscriber->base.topic->lock);
716
  urtMutexUnlock(&subscriber->base.topic->lock);
720 717
  return URT_STATUS_OK;
721 718
}
722 719

  
......
740 737
  if (!subscriber->base.topic)
741 738
      return URT_STATUS_FETCH_NOTOPIC;
742 739

  
743
  urtMutexLock(subscriber->base.topic->lock);
740
  urtMutexLock(&subscriber->base.topic->lock);
744 741
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
745 742

  
746 743
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
747 744
  {
748
    urtMutexUnlock(subscriber->base.topic->lock);
745
    urtMutexUnlock(&subscriber->base.topic->lock);
749 746
    return URT_STATUS_FETCH_NOMESSAGE;
750 747
  }
751 748

  
752
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
749
    subscriber->base.lastMessage = lastMessage;
750
    subscriber->base.lastMessageTime = lastMessage->originTime;
751
    memcpy(lastMessage->payload, payload, bytes);
753 752

  
754 753
  if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) {
755
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
754
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
756 755

  
757 756
#if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false)
758 757
  subscriber->base.sumLatencies += calculatedLatency;
......
773 772
    subscriber->maxLatency = calculatedLatency;
774 773
  }
775 774
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
776
    urtMutexUnlock(subscriber->base.topic);
775
    urtMutexUnlock(&subscriber->base.topic->lock);
777 776
    return URT_STATUS_JITTERVIOLATION;
778 777
  }
779 778
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */
780 779

  
781 780
    if (latency) {
782
      latency = calculatedLatency;
781
      *latency = calculatedLatency;
783 782
    }
784 783
  }
785 784

  
786
  urtMutexUnlock(subscriber->base.topic->lock);
785
  urtMutexUnlock(&subscriber->base.topic->lock);
787 786
  return URT_STATUS_OK;
788 787
}
789 788

  
......
830 829
  }
831 830

  
832 831
#if (URT_CFG_PUBSUB_PROFILING == true)
833
  urtMutexLock(topic->lock);
832
  urtMutexLock(&topic->lock);
834 833
#endif /* URT_CFG_PUBSUB_PROFILING */
835 834

  
836
  urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
835
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
837 836

  
838 837
# if (URT_CFG_PUBSUB_PROFILING == true)
839 838
  subscriber->base.topic->numSubscribers--;
840 839
# endif /* URT_CFG_PUBSUB_PROFILING */
841 840

  
842 841
# if (URT_CFG_PUBSUB_PROFILING == true)
843
  urtMutexUnlock(topic->lock);
842
  urtMutexUnlock(&topic->lock);
844 843
# endif /* URT_CFG_PUBSUB_PROFILING */
845 844
  subscriber->base.topic = NULL;
846 845
  subscriber->base.lastMessage = NULL;
......
939 938
  subscriber->expectedRate = rate;
940 939
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
941 940

  
942
  urtMutexLock(topic->lock);
941
  urtMutexLock(&topic->lock);
943 942
  if (messages)
944 943
  {
945
    urtContributeMessages(messages);
944
    urtContributeMessages(messages, topic);
946 945
  }
947 946

  
948 947
  subscriber->base.lastMessage = topic->latestMessage;
949 948
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
950 949

  
951
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
950
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
952 951

  
953 952
# if(URT_CFG_PUBSUB_QOS_RATECHECKS == true)
954 953
  //TODO: Implement
......
959 958
  topic->numSubscribers--;
960 959
# endif /* URT_CFG_PUBSUB_PROFILING */
961 960

  
962
  urtMutexUnlock(topic->lock);
961
  urtMutexUnlock(&topic->lock);
963 962
  return URT_STATUS_OK;
964 963
}
965 964

  
......
985 984
    return URT_STATUS_FETCH_NOTOPIC;
986 985
  }
987 986

  
988
  urtMutexLock(subscriber->base.topic->lock);
989
  urt_message_t* messageTemp = subscriber->base.lastMessage;
990
  if (messageTemp->next->originTime > messageTemp.originTime)
987
  urtMutexLock(&subscriber->base.topic->lock);
988
  urt_message_t* lastMessage = subscriber->base.lastMessage;
989
  if (lastMessage->next->originTime > lastMessage->originTime)
991 990
  {
992
    urtMutexUnlock(subscriber->base.topic->lock);
991
    urtMutexUnlock(&subscriber->base.topic->lock);
993 992
    return URT_STATUS_FETCH_NOMESSAGE;
994 993
  }
995
  messageTemp = messageTemp->next;
994
  lastMessage = lastMessage->next;
996 995

  
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
998
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
996
  uint64_t calculatedLatency;
997
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
998
    calculatedLatency = urtTimeNow() - lastMessage->originTime;
999
  }
999 1000

  
1000 1001
#if(URT_CFG_PUBSUB_PROFILING == true)
1001 1002
  subscriber->base.sumLatencies += calculatedLatency;
......
1008 1009
  }
1009 1010
#endif /* URT_CFG_PUBSUB_PROFILING */
1010 1011

  
1011
    if (latency) {
1012
      latency = calculatedLatency;
1013
    }
1012
  if (latency) {
1013
    *latency = calculatedLatency;
1014 1014
  }
1015 1015

  
1016 1016
  subscriber->base.lastMessage->numHrtConsumersLeft--;
1017 1017
  if (subscriber->base.lastMessage->numHrtConsumersLeft != 0)
1018 1018
  {
1019
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1019
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1020 1020
  }
1021 1021

  
1022 1022
#if (URT_CFG_PUBSUB_PROFILING == true)
......
1032 1032
    subscriber->maxLatency = calculatedLatency;
1033 1033
  }
1034 1034
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1035
    urtMutexUnlock(subscriber->base.topic);
1035
    urtMutexUnlock(&subscriber->base.topic->lock);
1036 1036
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1037 1037
    return URT_STATUS_JITTERVIOLATION;
1038 1038
  }
......
1047 1047
  }
1048 1048
#endif /* URT_CFG_PUBSUB_PROFILING */
1049 1049

  
1050
  urtFetchMessage(messageTemp, subscriber, payload, bytes);
1050
    subscriber->base.lastMessage = lastMessage;
1051
    subscriber->base.lastMessageTime = lastMessage->originTime;
1052
    memcpy(lastMessage->payload, payload, bytes);
1051 1053

  
1052 1054
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1053
  if (messageTemp->next->originTime < messageTemp->originTime)
1055
  if (lastMessage->next->originTime < lastMessage->originTime)
1054 1056
  {
1055 1057
    //TODO: first reset?! (when ... set)
1056 1058
    urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
......
1061 1063
  }
1062 1064
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1063 1065

  
1064
  urtMutexUnlock(subscriber->base.topic->lock);
1066
  urtMutexUnlock(&subscriber->base.topic->lock);
1065 1067
  return URT_STATUS_OK;
1066 1068
}
1067 1069

  
......
1085 1087
    return URT_STATUS_FETCH_NOTOPIC;
1086 1088
  }
1087 1089

  
1088
  urtMutexLock(subscriber->base.topic->lock);
1090
  urtMutexLock(&subscriber->base.topic->lock);
1089 1091
  urt_message_t* lastMessage = subscriber->base.lastMessage;
1090 1092
  bool hrtZero = false;
1091 1093
  while (lastMessage->next->originTime < lastMessage->originTime) {
1092 1094
    lastMessage = lastMessage->next;
1093 1095
    lastMessage->numHrtConsumersLeft--;
1094 1096
    if (lastMessage->numHrtConsumersLeft == 0) {
1095
        hertZero = true;
1097
        hrtZero = true;
1096 1098
    }
1097 1099
#if (URT_CFG_PUBSUB_PROFILING == true)
1098 1100
    lastMessage->numConsumersLeft--;
......
1104 1106
  }
1105 1107

  
1106 1108
  if (hrtZero) {
1107
    urtCondvarSignal(subscriber->base.topic->hrtReleased);
1109
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1108 1110
  }
1109 1111

  
1110 1112
  if (lastMessage->originTime == subscriber->base.lastMessageTime) {
1111
    urtMutexUnlock(subscriber->base.topic);
1113
    urtMutexUnlock(&subscriber->base.topic->lock);
1112 1114
    return URT_STATUS_FETCH_NOMESSAGE;
1113 1115
  }
1114 1116

  
1115
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
1116
    uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime;
1117
  uint64_t calculatedLatency;
1118
  if (URT_CFG_PUBSUB_PROFILING == true || latency || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) {
1119
     calculatedLatency = urtTimeNow() - lastMessage->originTime;
1120
  }
1117 1121

  
1118 1122
#if(URT_CFG_PUBSUB_PROFILING == true)
1119 1123
  subscriber->base.sumLatencies += calculatedLatency;
......
1126 1130
  }
1127 1131
#endif /* URT_CFG_PUBSUB_PROFILING */
1128 1132

  
1129
    if (latency) {
1130
      latency = calculatedLatency;
1131
    }
1133
  if (latency) {
1134
    *latency = calculatedLatency;
1132 1135
  }
1133 1136

  
1134 1137
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true)
......
1139 1142
    subscriber->maxLatency = calculatedLatency;
1140 1143
  }
1141 1144
  else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) {
1142
    urtMutexUnlock(subscriber->base.topic);
1145
    urtMutexUnlock(&subscriber->base.topic->lock);
1143 1146
    urtCoreStopNodes(URT_STATUS_JITTERVIOLATION);
1144 1147
    return URT_STATUS_JITTERVIOLATION;
1145 1148
  }
......
1154 1157
  }
1155 1158
#endif /* URT_CFG_PUBSUB_PROFILING */
1156 1159

  
1157
  urtFetchMessage(lastMessage, subscriber, payload, bytes);
1160
    subscriber->base.lastMessage = lastMessage;
1161
    subscriber->base.lastMessageTime = lastMessage->originTime;
1162
    memcpy(lastMessage->payload, payload, bytes);
1158 1163

  
1159 1164
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true)
1160 1165
  urtTimerReset(subscriber->qosDeadlineTimer);
1161 1166
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */
1167

  
1168
  urtMutexUnlock(&subscriber->base.topic->lock);
1169
  return URT_STATUS_OK;
1162 1170
}
1163 1171

  
1164 1172
/**
......
1173 1181
{
1174 1182
  urtDebugAssert(subscriber);
1175 1183

  
1176
  if (subscriber->base.topic)
1177
  {
1178
    urtMutexLock(topic->lock);
1179
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
1180
    subscriber->base.topic->numHrtSubscribers--;
1184
  if(!subscriber->base.topic) {
1185
    return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1186
  }
1187

  
1188
  urtMutexLock(&subscriber->base.topic->lock);
1189
  urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
1190
  subscriber->base.topic->numHrtSubscribers--;
1181 1191
# if (URT_CFG_PUBSUB_PROFILING == true)
1182
    subscriber->base.topic->numSubscribers--;
1192
  subscriber->base.topic->numSubscribers--;
1183 1193
# endif /* URT_CFG_PUBSUB_PROFILING */
1184 1194
# if (URT_CFG_PUBSUB_QOS_RATECHECKS == true)
1185
    //TODO: remove self from topics lsit of HRT subscribers
1186
    //TODO: ...
1195
  //TODO: remove self from topics list of HRT subscribers, ...
1187 1196
# endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
1188 1197

  
1189
    urt_message_t* messageTemp = subscriber->base.lastMessage;
1190
    bool hrtZero = false;
1191
    while (messageTemp->next->originTime < messageTemp->originTime)
1192
    {
1193
        messageTemp = messageTemp->next;
1194
        messageTemp->numHrtConsumersLeft--;
1195
        if (messageTemp->numHrtConsumersLeft == 0)
1196
        {
1197
            hrtZero = true;
1198
        }
1198
  urt_message_t* messageTemp = subscriber->base.lastMessage;
1199
  bool hrtZero = false;
1200
  while (messageTemp->next->originTime < messageTemp->originTime){
1201
    messageTemp = messageTemp->next;
1202
    messageTemp->numHrtConsumersLeft--;
1203
    if (messageTemp->numHrtConsumersLeft == 0) {
1204
      hrtZero = true;
1205
    }
1199 1206
# if(URT_CFG_PUBSUB_PROFILING == true)
1200
        messageTemp->numConsumersLeft--;
1207
    messageTemp->numConsumersLeft--;
1201 1208
# endif /* URT_CFG_PUBSUB_PROFILING */
1202
    }
1203
    if (hrtZero)
1204
    {
1205
      urtCondvarSignal(subscriber->base.topic->hrtReleased);
1206
    }
1207

  
1208
    urtMutexUnlock(topic->lock);
1209
    subscriber->base.topic = NULL;
1210
    subscriber->base.lastMessage = NULL;
1211
    subscriber->base.lastMessageTime = 0;
1212
    return URT_STATUS_OK;
1209
  }
1210
  if (hrtZero){
1211
    urtCondvarSignal(&subscriber->base.topic->hrtReleased);
1213 1212
  }
1214 1213

  
1215
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
1214
  urtMutexUnlock(&subscriber->base.topic->lock);
1215
  subscriber->base.topic = NULL;
1216
  subscriber->base.lastMessage = NULL;
1217
  subscriber->base.lastMessageTime = 0;
1218
  return URT_STATUS_OK;
1216 1219
}
1217 1220

  
1218 1221

  
src/urt_topic.c
60 60
  urtDebugAssert(topic);
61 61
  urtDebugAssert(mandatoryMessage);
62 62

  
63
  urt_topic_t.next = NULL;
64
  urt_topic_t.id = id;
65
  urtMutexInit(topic->lock);
66
  urtEventSourceInit(topic->evtSource);
67
  urt_topic_t.numHrtSubscribers = 0;
68
  urtCondvarInit(topic->hrtReleased);
63
  topic->next = NULL;
64
  topic->id = id;
65
  urtMutexInit(&topic->lock);
66
  urtEventSourceInit(&topic->evtSource);
67
  topic->numHrtSubscribers = 0;
68
  urtCondvarInit(&topic->hrtReleased);
69 69
  topic->mandatoryMessage = *mandatoryMessage;
70 70
  topic->latestMessage = &topic->mandatoryMessage;
71 71
  #if (URT_CFG_PUBSUB_QOS_RATECHECKS)
72
    urt_topic_t.hrtSubscribers = nullptr;
72
    topic->hrtSubscribers = nullptr;
73 73
    //add later: timer init;
74 74
  #endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */
75 75
  #if (URT_CFG_PUBSUB_PROFILING)
76
    urt_topic_t.numMessagesPublished = 0;
77
    urt_topic_t.numMessagesDiscarded = 0;
78
    urt_topic_t.numSubscribers = 0;
76
    topic->numMessagesPublished = 0;
77
    topic->numMessagesDiscarded = 0;
78
    topic->numSubscribers = 0;
79 79
  #endif /* URT_CFG_PUBSUB_PROFILING */
80 80
  topic->latestMessage->next = topic->latestMessage;
81
  urtMutexLock(topic->lock);
81
  urtMutexLock(&topic->lock);
82 82
  urt_topic_t* topicTemp = topic;
83 83
  while (topicTemp != NULL && topicTemp->id < id)
84 84
      topicTemp = topicTemp->next;
85 85

  
86 86
  if (topicTemp == NULL)
87 87
  {
88
    //TODO: Append self to core's list of topic
89
    urtMutexUnlock(topic->lock);
88
    urtCoreAddTopic(topic);
89
    urtMutexUnlock(&topic->lock);
90 90
    return URT_STATUS_OK;
91 91
  }
92 92
  else if (topicTemp->id > id)
93 93
  {
94 94
    topicTemp->next = topic;
95
    urtMutexUnlock(topic->lock);
95
    urtMutexUnlock(&topic->lock);
96 96
    return URT_STATUS_OK;
97 97
  }
98 98
  else
99 99
  {
100
    urtMutexUnlock(topic->lock);
100
    urtMutexUnlock(&topic->lock);
101 101
    return URT_STATUS_TOPIC_DUPLICATE;
102 102
  }
103 103
}
urtware.h
95 95
/*
96 96
 * TODO: Add further ┬ÁRtWare includes here (e.g. urt_core.h).
97 97
 */
98
//Necessary for memcpy
99
#include <stdio.h>
100
#include <string.h>
101

  
98 102
#include <apps_urtosal.h>
99 103
#include <urt_message.h>
100 104
#include <urt_topic.h>

Also available in: Unified diff