Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ fb72e91b

History | View | Annotate | Download (43.174 KB)

1
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21

    
22
#include <urtware.h>
23
#include <stdio.h>
24

    
25
/******************************************************************************/
26
/* LOCAL DEFINITIONS                                                          */
27
/******************************************************************************/
28

    
29
/******************************************************************************/
30
/* EXPORTED VARIABLES                                                         */
31
/******************************************************************************/
32

    
33
/******************************************************************************/
34
/* LOCAL TYPES                                                                */
35
/******************************************************************************/
36

    
37
/******************************************************************************/
38
/* LOCAL VARIABLES                                                            */
39
/******************************************************************************/
40

    
41
/******************************************************************************/
42
/* LOCAL FUNCTIONS                                                            */
43
/******************************************************************************/
44

    
45
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
46
{
47
  while (oldestMessage->next->originTime < oldestMessage->originTime) {
48
    oldestMessage = oldestMessage->next;
49
  }
50
  return oldestMessage;
51
}
52

    
53
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
54
{
55
  urt_message_t* lastMessage = latestMessage;
56
  while (lastMessage->next->originTime < lastMessage->originTime) {
57
    lastMessage = lastMessage->next;
58
#if (URT_CFG_PUBSUB_PROFILING == true)
59
    subscriber->base.lastMessage->numConsumersLeft--;
60
    subscriber->base->numMessagesReceived++;
61
#endif /* URT_CFG_PUBSUB_PROFILING */
62
  }
63
  return latestMessage;
64
}
65

    
66
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
67
{
68
  urt_message_t* lastMessageContribute = messages;
69
  while (lastMessageContribute->next) {
70
    lastMessageContribute = lastMessageContribute->next;
71
  }
72
  lastMessageContribute->next = topic->latestMessage->next;
73
  topic->latestMessage->next = messages;
74
}
75

    
76
/******************************************************************************/
77
/* EXPORTED FUNCTIONS                                                         */
78
/******************************************************************************/
79

    
80
/**
81
 * @brief   Initialize the nrt Subscriber.
82
 *
83
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
84
 */
85
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
86
{
87
  urtDebugAssert(subscriber);
88

    
89
  subscriber->base.topic = NULL;
90
  urtEventListenerInit(subscriber->base.evtListener);
91
  subscriber->base.lastMessage = NULL;
92
  subscriber->base.lastMessageTime = 0;
93
#if (URT_CFG_PUBSUB_PROFILING == true)
94
    subscriber->minLatency = URT_DELAY_INFINITE;
95
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
96
#endif /* URT_CFG_PUBSUB_PROFILING */
97
  return;
98
}
99

    
100
/**
101
 * @brief  Subscribes the subscriber to a topic.
102
 *
103
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
104
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
105
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
106
 *                      Messages must not be associated to another topic.
107
 *                      Once a message has been contributed, it cannot be removed later.
108
 *                      May be NULL(no messages to contribute).
109
 *
110
 * @return  Returns URT_STATUS_OK on success.
111
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
112
 */
113
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
114
{
115
  urtDebugAssert(subscriber);
116
  urtDebugAssert(topic);
117

    
118
  if (!subscriber->base.topic) {
119
    return URT_STATUS_SUBSCRIBE_TOPICSET;
120
  }
121

    
122
  subscriber->base.topic = topic;
123
  urtMutexLock(&topic->lock);
124

    
125
  if (messages) {
126
    urtContributeMessages(messages, topic);
127
  }
128

    
129
  subscriber->base.lastMessage = topic->latestMessage;
130
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
131

    
132
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
133

    
134
#if (URT_CFG_PUBSUB_PROFILING == true)
135
    topic->numHrtSubscribers--;
136
#endif /* URT_CFG_PUBSUB_PROFILING */
137

    
138
  urtMutexUnlock(&topic->lock);
139
  return URT_STATUS_OK;
140
}
141

    
142
/**
143
 * @brief  Fetches the next message.
144
 *
145
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
146
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
147
 * @param[in] bytes  Payload size in bytes.
148
 * @param[in] latency  The latency can be returned by reference. May be NULL.
149
 *
150
 * @return  Returns URT_STATUS_OK on success.
151
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
152
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
153
 */
154
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
155
{   
156
  urtDebugAssert(subscriber);
157

    
158
  if (!subscriber->base.topic) {
159
    return URT_STATUS_FETCH_NOTOPIC;
160
  }
161

    
162
  urtMutexLock(&subscriber->base.topic->lock);
163

    
164
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
165
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
166
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
167
      urtMutexUnlock(&subscriber->base.topic->lock);
168
      return URT_STATUS_FETCH_NOMESSAGE;
169
    }
170
    oldestMessage = oldestMessage->next;
171
  }
172
  else {
173
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
174
  }
175

    
176
  subscriber->base.lastMessage = oldestMessage;
177
  subscriber->base.lastMessageTime = oldestMessage->originTime;
178
  memcpy(oldestMessage->payload, payload, bytes);
179

    
180
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
181
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
182

    
183
#if(URT_CFG_PUBSUB_PROFILING == true)
184
  subscriber->base.sumLatencies += calculatedLatency;
185

    
186
  if (calculatedLatency < subscriber->minLatency) {
187
    subscriber->minLatency = calculatedLatency;
188
  }
189
  else if (calculatedLatency > subscriber->maxLatency) {
190
    subscriber->maxLatency = calculatedLatency;
191
  }
192
#endif /* URT_CFG_PUBSUB_PROFILING */
193

    
194
    if (latency) {
195
      *latency = calculatedLatency;
196
    }
197
  }
198

    
199
#if (URT_CFG_PUBSUB_PROFILING == true)
200
  subscriber->base.lastMessage->numConsumersLeft--;
201
  subscriber->base->numMessagesReceived++;
202
#endif /* URT_CFG_PUBSUB_PROFILING */
203

    
204
  urtMutexUnlock(&subscriber->base.topic->lock);
205
  return URT_STATUS_OK;
206
}
207

    
208
/**
209
 * @brief Fetches the latest message.
210
 *
211
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
212
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
213
 * @param[in] bytes  Payload size in bytes.
214
 * @param[in] latency  The latency can be returned by reference. May be NULL.
215
 *
216
 * @return  Returns URT_STATUS_OK on success.
217
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
218
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
219
 */
220
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) {
221
  urtDebugAssert(subscriber);
222

    
223
  if (!subscriber->base.topic)
224
      return URT_STATUS_FETCH_NOTOPIC;
225

    
226
  urtMutexLock(&subscriber->base.topic->lock);
227
  urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage);
228

    
229
  if (lastMessage->originTime == subscriber->base.lastMessageTime)
230
  {
231
    urtMutexUnlock(&subscriber->base.topic->lock);
232
    return URT_STATUS_FETCH_NOMESSAGE;
233
  }
234

    
235
  subscriber->base.lastMessage = lastMessage;
236
  subscriber->base.lastMessageTime = lastMessage->originTime;
237
  memcpy(lastMessage->payload, payload, bytes);
238

    
239
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
240
    uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime;
241

    
242
#if(URT_CFG_PUBSUB_PROFILING == true)
243
  subscriber->base.sumLatencies += calculatedLatency;
244

    
245
  if (calculatedLatency < subscriber->minLatency) {
246
    subscriber->minLatency = calculatedLatency;
247
  }
248
  else if (calculatedLatency > subscriber->maxLatency) {
249
    subscriber->maxLatency = calculatedLatency;
250
  }
251
#endif /* URT_CFG_PUBSUB_PROFILING */
252

    
253
    if (latency) {
254
      *latency = calculatedLatency;
255
    }
256
  }
257

    
258
  urtMutexUnlock(&subscriber->base.topic->lock);
259
  return URT_STATUS_OK;
260
}
261

    
262
/**
263
 * @brief  Unsubscribes from a subscriber.
264
 *
265
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
266
 *
267
 * @return  Returns URT_STATUS_OK on sucess.
268
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
269
 */
270
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
271
{
272
  if (subscriber->base.topic)
273
  {
274
# if(URT_CFG_PUBSUB_PROFILING == true)
275
      urtMutexLock(&topic->lock);
276
      subscriber->base.topic->numSubscribers--;
277
# endif /* URT_CFG_PUBSUB_PROFILING */
278
    urtEventUnregister(&subscriber->base.topic->evtSource, &subscriber->base.evtListener);
279
# if(URT_CFG_PUBSUB_PROFILING == true)
280
      urtMutexUnlock(&topic->lock);
281
      subscriber->base.topic = NULL;
282
      subscriber->base.lastMessage = NULL;
283
      subscriber->base.lastMessageTime = 0;
284
#endif /* URT_CFG_PUBSUB_PROFILING */
285
    return URT_STATUS_OK;
286
  }
287

    
288
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
289
}
290

    
291

    
292
/**
293
 * @brief  Initialize the srt Subscriber.
294
 *
295
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
296
 */
297
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
298
{
299
  urtDebugAssert(subscriber);
300

    
301
  subscriber->base.topic = NULL;
302
  urtEventListenerInit(subscriber->base.evtListener);
303
  subscriber->base.lastMessage = NULL;
304
  subscriber->base.lastMessageTime = 0;
305
#if (URT_CFG_PUBSUB_PROFILING)
306
  subscriber->base.sumLatencies = 0;
307
  subscriber->base.numMessagesReceived = 0;
308
  subscriber->usefulnesscb = NULL;
309
  subscriber->cbparams = NULL;
310
  subscriber->minLatency = URT_DELAY_INFINITE;
311
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
312
#endif /* URT_CFG_PUBSUB_PROFILING */
313
  return;
314
}
315

    
316
/**
317
 * @brief  Subscribes the subscriber to a topic.
318
 *
319
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
320
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
321
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
322
 *                     Messages must not be associated to another topic.
323
 *                     Once a message has been contributed, it cannot be removed later.
324
 *                     May be NULL (no messages to contribute)
325
 * @param[in] usefulnesscb  Pointer to a function to calculate usefulness of a message. Must not be NULL.
326
 * @param[in] cbparams  Optional parameters for the usefulness callback.
327
 *                      May be NULL if the callback expects no parameters.
328
 *
329
 * @return  Returns URT_STATUS_OK on success.
330
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
331
 */
332
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic,
333
                                       urt_message_t* messages, urt_usefulness_f* usefulnesscb,void* cbparams)
334
{
335
  urtDebugAssert(subscriber);
336
  urtDebugAssert(topic);
337

    
338
  if (subscriber->base.topic) {
339
    return URT_STATUS_SUBSCRIBE_TOPICSET;
340
  }
341

    
342
  subscriber->base.topic = topic;
343
  subscriber->usefulnesscb = usefulnesscb;
344
  subscriber->cbparams = cbparams;
345
#if (URT_CFG_PUBSUB_PROFILING == true)
346
  subscriber->base.sumLatencies = 0;
347
  subscriber->base.numMessagesReceived = 0;
348
  subscriber->minLatency = URT_DELAY_INFINITE;
349
  subscriber->maxLatency = URT_DELAY_IMMEDIATE;
350
#endif  /* URT_CFG_PUBSUB_PROFILING */
351

    
352
  urtMutexLock(&topic->lock);
353
  if (messages) {
354
    urtContributeMessages(messages, topic);
355
  }
356

    
357
  subscriber->base.lastMessage = topic->latestMessage;
358
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
359

    
360
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
361

    
362
# if (URT_CFG_PUBSUB_PROFILING == true)
363
    topic->numHrtSubscribers--;
364
# endif /* URT_CFG_PUBSUB_PROFILING */
365

    
366
  urtMutexUnlock(&topic->lock);
367
  return URT_STATUS_OK;
368
}
369

    
370
/**
371
 * @brief  Fetches the next message.
372
 *
373
 * @param[in] subscriber  The SRT subscriber that shall fetch the message. Must not be NULL.
374
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.