Revision 5c6cb22f
inc/urt_message.h | ||
---|---|---|
45 | 45 |
*/ |
46 | 46 |
typedef struct urt_message |
47 | 47 |
{ |
48 |
urt_message_t* next;
|
|
48 |
struct urt_message* next;
|
|
49 | 49 |
void* payload; |
50 | 50 |
urt_osTime_t originTime; |
51 | 51 |
unsigned int numHrtConsumersLeft; |
inc/urt_subscriber.h | ||
---|---|---|
22 | 22 |
#ifndef URTWARE_SUBSCRIBER_H |
23 | 23 |
#define URTWARE_SUBSCRIBER_H |
24 | 24 |
|
25 |
#include <urt_types.h> |
|
26 | 25 |
#include <urtware.h> |
27 | 26 |
|
28 | 27 |
/******************************************************************************/ |
... | ... | |
70 | 69 |
}urt_nrtsubscriber_t; |
71 | 70 |
|
72 | 71 |
/** |
73 |
* @brief nrt subscriber
|
|
72 |
* @brief srt subscriber
|
|
74 | 73 |
*/ |
75 | 74 |
typedef struct urt_srtsubscriber |
76 | 75 |
{ |
77 | 76 |
urt_basesubscriber_t base; |
78 |
urt_srtusefulnessfunc_t* usefulnesscb; |
|
77 |
void* cbparams; |
|
78 |
urt_usefulness_f* usefulnesscb; |
|
79 | 79 |
#if (URT_CFG_PUBSUB_PROFILING) |
80 | 80 |
urt_delay_t minLatency; |
81 | 81 |
urt_delay_t maxLatency; |
... | ... | |
83 | 83 |
}urt_srtsubscriber_t; |
84 | 84 |
|
85 | 85 |
/** |
86 |
* @brief nrt subscriber
|
|
86 |
* @brief frt subscriber
|
|
87 | 87 |
*/ |
88 | 88 |
typedef struct urt_frtsubscriber |
89 | 89 |
{ |
... | ... | |
106 | 106 |
typedef struct urt_hrtsubscriber |
107 | 107 |
{ |
108 | 108 |
urt_basesubscriber_t base; |
109 |
urt_hrtsubscriber_t* next;
|
|
109 |
struct urt_hrtsubscriber* next;
|
|
110 | 110 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS) |
111 | 111 |
urt_delay_t deadlineOffset; |
112 | 112 |
urt_osTimer_t qosDeadlineTimer; |
... | ... | |
142 | 142 |
urt_status_t urtNrtSubscriberUnsubscribe(urt_nrtsubscriber_t* subscriber); |
143 | 143 |
|
144 | 144 |
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber); |
145 |
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* message, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams);
|
|
145 |
urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* message, urt_usefulness_f* usefulnesscb, void* cbparams);
|
|
146 | 146 |
urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency); |
147 | 147 |
urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency); |
148 | 148 |
float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency); |
... | ... | |
160 | 160 |
urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency); |
161 | 161 |
urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency); |
162 | 162 |
urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber); |
163 |
|
|
164 |
float urt_srtusefulnessfunc_t (urt_delay_t dt, void* params); |
|
165 | 163 |
#if defined(__cplusplus) |
166 | 164 |
} |
167 | 165 |
#endif /* defined(__cplusplus) */ |
inc/urt_topic.h | ||
---|---|---|
22 | 22 |
#ifndef URTWARE_TOPIC_H |
23 | 23 |
#define URTWARE_TOPIC_H |
24 | 24 |
|
25 |
#include <urt_types.h> |
|
26 | 25 |
#include <urtware.h> |
27 | 26 |
|
28 | 27 |
/******************************************************************************/ |
... | ... | |
46 | 45 |
*/ |
47 | 46 |
typedef struct urt_topic |
48 | 47 |
{ |
49 |
urt_topic_t* next;
|
|
48 |
struct urt_topic* next;
|
|
50 | 49 |
urt_topicid_t id; |
51 | 50 |
urt_osMutex_t lock; |
52 | 51 |
urt_osEventSource_t evtSource; |
... | ... | |
55 | 54 |
urt_message_t mandatoryMessage; |
56 | 55 |
urt_message_t* latestMessage; |
57 | 56 |
#if (URT_CFG_PUBSUB_QOS_RATECHECKS) |
58 |
urt_hrtsubscriber_t* hrtSubscribers;
|
|
57 |
struct urt_hrtsubscriber* hrtSubscribers;
|
|
59 | 58 |
urt_osTimer_t qosRateTimer; |
60 | 59 |
#endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
61 | 60 |
#if (URT_CFG_PUBSUB_PROFILING) |
src/urt_core.c | ||
---|---|---|
244 | 244 |
* @return Returns a pointer to the requested service. Returns NULL if no service matches the given ID. |
245 | 245 |
*/ |
246 | 246 |
#if (URT_CFG_PUBSUB_ENABLED) |
247 |
urt_topic_t* urtCoreGetTopic(urt_topicid_t id) {return urt_topic_t;} |
|
247 |
urt_topic_t* urtCoreGetTopic(urt_topicid_t id) |
|
248 |
{ |
|
249 |
urtMutexLock(&core._lock); |
|
250 |
urt_topic_t* topic = core._topics; |
|
251 |
while (topic != NULL && topic->id < id) |
|
252 |
topic = topic->next; |
|
253 |
urtMutexUnlock(&core._lock); |
|
254 |
if (topic != NULL && topic->id == id) |
|
255 |
return topic; |
|
256 |
else |
|
257 |
return NULL; |
|
258 |
} |
|
248 | 259 |
#endif /* URT_CFG_PUBSUB_ENABLED */ |
249 | 260 |
|
250 | 261 |
|
src/urt_subscriber.c | ||
---|---|---|
50 | 50 |
* |
51 | 51 |
* @param[in] subscriber The NRT subscriber to initialize. Must not be NULL. |
52 | 52 |
*/ |
53 |
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber) {return;} |
|
53 |
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber) |
|
54 |
{ |
|
55 |
subscriber->base.topic = NULL; |
|
56 |
urtEventListenerInit(subscriber->base.evtListener); |
|
57 |
subscriber->base.lastMessage = NULL; |
|
58 |
subscriber->base.lastMessageTime = 0; |
|
59 |
#if (URT_CFG_PUBSUB_PROFILING) |
|
60 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
61 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
62 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
63 |
return; |
|
64 |
} |
|
54 | 65 |
|
55 | 66 |
/** |
56 | 67 |
* @brief Subscribes the subscriber to a topic. |
... | ... | |
119 | 130 |
* |
120 | 131 |
* @param[in] subscriber The SRT subscriber to initialize. Must not be NULL. |
121 | 132 |
*/ |
122 |
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber) {return;} |
|
133 |
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber) |
|
134 |
{ |
|
135 |
subscriber->base.topic = NULL; |
|
136 |
urtEventListenerInit(subscriber->base.evtListener); |
|
137 |
subscriber->base.lastMessage = NULL; |
|
138 |
subscriber->base.lastMessageTime = 0; |
|
139 |
#if (URT_CFG_PUBSUB_PROFILING) |
|
140 |
subscriber->base.sumLatencies = 0; |
|
141 |
subscriber->base.numMessagesReceived = 0; |
|
142 |
subscriber->usefulnesscb = NULL; |
|
143 |
subscriber->cbparams = NULL; |
|
144 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
145 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
146 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
147 |
return; |
|
148 |
} |
|
123 | 149 |
|
124 | 150 |
/** |
125 | 151 |
* @brief Subscribes the subscriber to a topic. |
... | ... | |
196 | 222 |
* |
197 | 223 |
* @param[in] subscriber The SRT subscriber to initialize. Must not be NULL. |
198 | 224 |
*/ |
199 |
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;} |
|
225 |
void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber) |
|
226 |
{ |
|
227 |
subscriber->base.topic = NULL; |
|
228 |
urtEventListenerInit(subscriber->base.evtListener); |
|
229 |
subscriber->base.lastMessage = NULL; |
|
230 |
subscriber->base.lastMessageTime = 0; |
|
231 |
|
|
232 |
#if (URT_CFG_PUBSUB_PROFILING) |
|
233 |
subscriber->base.sumLatencies = 0; |
|
234 |
subscriber->base.numMessagesReceived = 0; |
|
235 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
236 |
|
|
237 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS) |
|
238 |
subscriber->deadlineOffset = 0; |
|
239 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
|
240 |
|
|
241 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS) |
|
242 |
subscriber->maxJitter = 0; |
|
243 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
244 |
|
|
245 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING) |
|
246 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
247 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
248 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
|
249 |
return; |
|
250 |
} |
|
251 |
|
|
200 | 252 |
|
201 | 253 |
/** |
202 | 254 |
* @brief Subscribes the subscriber to a topic. |
... | ... | |
260 | 312 |
/** |
261 | 313 |
* @brief Unsubscribes from a subscriber. |
262 | 314 |
* |
263 |
* @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
|
315 |
* @param[in] subscriber The FRT subscriber to be unsubscribed. Must not be NULL.
|
|
264 | 316 |
* |
265 | 317 |
* @return Returns URT_STATUS_OK on sucess. |
266 | 318 |
* Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic. |
... | ... | |
273 | 325 |
* |
274 | 326 |
* @param[in] subscriber The HRT subscriber to initialize. Must not be NULL. |
275 | 327 |
*/ |
276 |
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber){return;} |
|
328 |
void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber) |
|
329 |
{ |
|
330 |
subscriber->base.topic = NULL; |
|
331 |
urtEventListenerInit(subscriber->base.evtListener); |
|
332 |
subscriber->base.lastMessage = NULL; |
|
333 |
subscriber->base.lastMessageTime = 0; |
|
334 |
|
|
335 |
#if (URT_CFG_PUBSUB_PROFILING) |
|
336 |
subscriber->base.sumLatencies = 0; |
|
337 |
subscriber->base.numMessagesReceived = 0; |
|
338 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
|
339 |
|
|
340 |
subscriber->next = NULL; |
|
341 |
|
|
342 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS) |
|
343 |
subscriber->deadlineOffset = 0; |
|
344 |
urtTimerInit(subscriber->qodDeadlineTimer); |
|
345 |
#endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
|
346 |
|
|
347 |
#if (URT_CFG_PUBSUB_QOS_RATECHECKS) |
|
348 |
subscriber->expectedRate = 0; |
|
349 |
#endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
|
350 |
|
|
351 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS) |
|
352 |
subscriber->maxJitter = 0; |
|
353 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
|
354 |
|
|
355 |
#if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING) |
|
356 |
subscriber->minLatency = URT_DELAY_INFINITE; |
|
357 |
subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
|
358 |
#endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
|
359 |
return; |
|
360 |
} |
|
361 |
|
|
277 | 362 |
|
278 | 363 |
/** |
279 | 364 |
* @brief Subscribes the subscriber to a topic. |
280 | 365 |
* |
281 |
* @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
|
366 |
* @param[in] subscriber The HRT subscriber which shall subscribe to a topic. Must not be NULL.
|
|
282 | 367 |
* @param[in] topic The topic to subscribe to. Must not be NULL. |
283 | 368 |
* @param[in] messages NULL terminated list of messages to contribute to the topic. |
284 | 369 |
* Messages must not be associated to another topic. |
src/urt_topic.c | ||
---|---|---|
58 | 58 |
{ |
59 | 59 |
urt_topic_t.next = NULL; |
60 | 60 |
urt_topic_t.id = id; |
61 |
//add later: urtmutexinit, urteventsourceinit |
|
61 |
urtMutexInit(topic->lock); |
|
62 |
urtEventSourceInit(topic->evtSource); |
|
62 | 63 |
urt_topic_t.numHrtSubscribers = 0; |
63 |
//add later: condvar, message init, latest message |
|
64 |
urtCondvarInit(topic->hrtReleased); |
|
65 |
void* payload = NULL; //TODO: Replace |
|
66 |
urtMessageInit(topic->mandatoryMessage, payload); |
|
67 |
topic->latestMessage = &topic->mandatoryMessage; |
|
64 | 68 |
#if (URT_CFG_PUBSUB_QOS_RATECHECKS) |
65 | 69 |
urt_topic_t.hrtSubscribers = nullptr; |
66 | 70 |
//add later: timer init; |
... | ... | |
70 | 74 |
urt_topic_t.numMessagesDiscarded = 0; |
71 | 75 |
urt_topic_t.numSubscribers = 0; |
72 | 76 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
73 |
//add later: close circular message buffer,... |
|
74 |
return URT_STATUS_OK; |
|
77 |
topic->latestMessage->next = topic->latestMessage; |
|
78 |
urtMutexLock(topic->lock); |
|
79 |
urt_topic_t* topicTemp = topic; |
|
80 |
while (topicTemp != NULL && topicTemp->id < id) |
|
81 |
topicTemp = topicTemp->next; |
|
82 |
|
|
83 |
if (topicTemp == NULL) |
|
84 |
{ |
|
85 |
//TODO: Append self to core's list of topic |
|
86 |
urtMutexUnlock(topic->lock); |
|
87 |
return URT_STATUS_OK; |
|
88 |
} |
|
89 |
else if (topicTemp->id > id) |
|
90 |
{ |
|
91 |
topicTemp->next = topic; |
|
92 |
urtMutexUnlock(topic->lock); |
|
93 |
return URT_STATUS_OK; |
|
94 |
} |
|
95 |
else |
|
96 |
{ |
|
97 |
urtMutexUnlock(topic->lock); |
|
98 |
return URT_STATUS_TOPIC_DUPLICATE; |
|
99 |
} |
|
75 | 100 |
} |
urtware.h | ||
---|---|---|
96 | 96 |
* TODO: Add further µRtWare includes here (e.g. urt_core.h). |
97 | 97 |
*/ |
98 | 98 |
#include <apps_urtosal.h> |
99 |
#include <urt_message.h> |
|
100 |
#include <urt_topic.h> |
|
101 |
#include <urt_subscriber.h> |
|
99 | 102 |
#include <urt_node.h> |
100 | 103 |
#include <urt_core.h> |
101 | 104 |
|
Also available in: Unified diff