urtware / src / urt_subscriber.c @ a5e142de
History | View | Annotate | Download (24.836 KB)
1 | 1fb06240 | skenneweg | /*
|
---|---|---|---|
2 | µRtWare is a lightweight publish/subscribe middleware for real-time
|
||
3 | applications. It was developed as part of the software habitat for the
|
||
4 | Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
|
||
5 | |||
6 | Copyright (C) 2018..2020 Thomas Schöpping et al.
|
||
7 | |||
8 | This program is free software: you can redistribute it and/or modify
|
||
9 | it under the terms of the GNU General Public License as published by
|
||
10 | the Free Software Foundation, either version 3 of the License, or
|
||
11 | (at your option) any later version.
|
||
12 | |||
13 | This program is distributed in the hope that it will be useful,
|
||
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
16 | GNU General Public License for more details.
|
||
17 | |||
18 | You should have received a copy of the GNU General Public License
|
||
19 | along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
20 | */
|
||
21 | |||
22 | 7d9678db | skenneweg | #include <urtware.h> |
23 | |||
24 | 1fb06240 | skenneweg | /******************************************************************************/
|
25 | /* LOCAL DEFINITIONS */
|
||
26 | /******************************************************************************/
|
||
27 | |||
28 | /******************************************************************************/
|
||
29 | /* EXPORTED VARIABLES */
|
||
30 | /******************************************************************************/
|
||
31 | |||
32 | /******************************************************************************/
|
||
33 | /* LOCAL TYPES */
|
||
34 | /******************************************************************************/
|
||
35 | |||
36 | /******************************************************************************/
|
||
37 | /* LOCAL VARIABLES */
|
||
38 | /******************************************************************************/
|
||
39 | |||
40 | /******************************************************************************/
|
||
41 | /* LOCAL FUNCTIONS */
|
||
42 | /******************************************************************************/
|
||
43 | |||
44 | /******************************************************************************/
|
||
45 | /* EXPORTED FUNCTIONS */
|
||
46 | /******************************************************************************/
|
||
47 | |||
48 | 7d9678db | skenneweg | /**
|
49 | * @brief Initialize the nrt Subscriber.
|
||
50 | *
|
||
51 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to initialize. Must not be NULL.
|
52 | 7d9678db | skenneweg | */
|
53 | 5c6cb22f | skenneweg | void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
|
54 | { |
||
55 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
56 | |||
57 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
58 | urtEventListenerInit(subscriber->base.evtListener); |
||
59 | subscriber->base.lastMessage = NULL;
|
||
60 | subscriber->base.lastMessageTime = 0;
|
||
61 | a5e142de | skenneweg | #if (URT_CFG_PUBSUB_PROFILING == true) |
62 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
63 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
64 | a5e142de | skenneweg | #endif /* URT_CFG_PUBSUB_PROFILING */ |
65 | 5c6cb22f | skenneweg | return;
|
66 | } |
||
67 | 1fb06240 | skenneweg | |
68 | 7d9678db | skenneweg | /**
|
69 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
70 | 7d9678db | skenneweg | *
|
71 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
72 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
73 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
74 | * Messages must not be associated to another topic.
|
||
75 | * Once a message has been contributed, it cannot be removed later.
|
||
76 | * May be NULL(no messages to contribute).
|
||
77 | 7d9678db | skenneweg | *
|
78 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
79 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
80 | 7d9678db | skenneweg | */
|
81 | a5e142de | skenneweg | urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages) |
82 | { |
||
83 | urtDebugAssert(subscriber); |
||
84 | urtDebugAssert(topic); |
||
85 | |||
86 | if (!subscriber->base.topic)
|
||
87 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
88 | |||
89 | subscriber->base.topic = topic; |
||
90 | //TODO: Lock topic
|
||
91 | |||
92 | if (messages)
|
||
93 | { |
||
94 | urt_message_t* lastMessageContribute = messages; |
||
95 | while (lastMessageContribute->next)
|
||
96 | { |
||
97 | lastMessageContribute = lastMessageContribute->next; |
||
98 | } |
||
99 | lastMessageContribute->next = topic->latestMessage->next; |
||
100 | topic->latestMessage->next = messages; |
||
101 | } |
||
102 | |||
103 | subscriber->base.lastMessage = topic->latestMessage; |
||
104 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
105 | |||
106 | urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
||
107 | |||
108 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
109 | topic->numHrtSubscribers--; |
||
110 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
111 | |||
112 | //TODO: Unlock topic
|
||
113 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
114 | 1fb06240 | skenneweg | } |
115 | |||
116 | 7d9678db | skenneweg | /**
|
117 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
118 | 7d9678db | skenneweg | *
|
119 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
120 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
121 | * @param[in] bytes Payload size in bytes.
|
||
122 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
123 | 7d9678db | skenneweg | *
|
124 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
125 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
126 | * Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
127 | 7d9678db | skenneweg | */
|
128 | a5e142de | skenneweg | urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
|
129 | { |
||
130 | urtDebugAssert(subscriber); |
||
131 | |||
132 | urt_message_t youngestMessage; |
||
133 | if (subscriber->base.topic)
|
||
134 | { |
||
135 | //TODO: Lock Topic
|
||
136 | urt_osTime_t localCopy; //TODO: replace with local copy
|
||
137 | if (subscriber->base.lastMessageTime == localCopy)
|
||
138 | { |
||
139 | if(subscriber->base.lastMessage->next->originTime < localCopy)
|
||
140 | { |
||
141 | youngestMessage = subscriber->base.lastMessage->next; |
||
142 | } |
||
143 | else
|
||
144 | { |
||
145 | //TODO: Unlock Topic
|
||
146 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
147 | } |
||
148 | } |
||
149 | else
|
||
150 | { |
||
151 | youngestMessage = subscriber->base.lastMessage->next; |
||
152 | while (youngestMessage.originTime < localCopy)
|
||
153 | { |
||
154 | youngestMessage = youngestMessage.next; |
||
155 | } |
||
156 | } |
||
157 | } |
||
158 | else
|
||
159 | { |
||
160 | return URT_STATUS_FETCH_NOTOPIC;
|
||
161 | } |
||
162 | |||
163 | latency = subscriber->base.lastMessageTime - youngestMessage.originTime; |
||
164 | |||
165 | //TODO: Other cases
|
||
166 | //TODO: Unlock topic
|
||
167 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
168 | 1fb06240 | skenneweg | } |
169 | |||
170 | 7d9678db | skenneweg | /**
|
171 | a5e142de | skenneweg | * @brief Fetches the latest message.
|
172 | 7d9678db | skenneweg | *
|
173 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
174 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
175 | * @param[in] bytes Payload size in bytes.
|
||
176 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
177 | 7d9678db | skenneweg | *
|
178 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
179 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
180 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
181 | 7d9678db | skenneweg | */
|
182 | 1fb06240 | skenneweg | urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
|
183 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
184 | 1fb06240 | skenneweg | } |
185 | |||
186 | 7d9678db | skenneweg | /**
|
187 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
188 | 7d9678db | skenneweg | *
|
189 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
190 | 7d9678db | skenneweg | *
|
191 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
192 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
193 | 7d9678db | skenneweg | */
|
194 | 1fb06240 | skenneweg | urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) { |
195 | a5e142de | skenneweg | if (subscriber->base.topic)
|
196 | { |
||
197 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
198 | //TODO: LOCK TOPIC
|
||
199 | subscriber->base.topic->numHrtSubscribers--; |
||
200 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
201 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
202 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
203 | //TODO: Unlock TOPIC
|
||
204 | subscriber->base.topic = NULL;
|
||
205 | subscriber->base.lastMessage = NULL;
|
||
206 | subscriber->base.lastMessageTime = 0;
|
||
207 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
208 | return URT_STATUS_OK;
|
||
209 | } |
||
210 | else
|
||
211 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
212 | 1fb06240 | skenneweg | } |
213 | |||
214 | 7d9678db | skenneweg | |
215 | /**
|
||
216 | 5198dfae | skenneweg | * @brief Initialize the srt Subscriber.
|
217 | 7d9678db | skenneweg | *
|
218 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
219 | 7d9678db | skenneweg | */
|
220 | 5c6cb22f | skenneweg | void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
|
221 | { |
||
222 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
223 | |||
224 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
225 | urtEventListenerInit(subscriber->base.evtListener); |
||
226 | subscriber->base.lastMessage = NULL;
|
||
227 | subscriber->base.lastMessageTime = 0;
|
||
228 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
229 | subscriber->base.sumLatencies = 0;
|
||
230 | subscriber->base.numMessagesReceived = 0;
|
||
231 | subscriber->usefulnesscb = NULL;
|
||
232 | subscriber->cbparams = NULL;
|
||
233 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
234 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
235 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
236 | return;
|
||
237 | } |
||
238 | 7d9678db | skenneweg | |
239 | /**
|
||
240 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
241 | *
|
||
242 | * @param[in] subscriber The SRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
243 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
244 | * @param[in] message NULL terminated list of messages to contribute to the topic.
|
||
245 | * Messages must not be associated to another topic.
|
||
246 | * Once a message has been contributed, it cannot be removed later.
|
||
247 | * May be NULL (no messages to contribute)
|
||
248 | * @param[in] usefulnesscb Pointer to a function to calculate usefulness of a message. Must not be NULL.
|
||
249 | * @param[in] cbparams Optional parameters for the usefulness callback.
|
||
250 | * May be NULL if the callback expects no parameters.
|
||
251 | *
|
||
252 | * @return Returns URT_STATUS_OK on success.
|
||
253 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
254 | 7d9678db | skenneweg | */
|
255 | urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, |
||
256 | a5e142de | skenneweg | urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
|
257 | { |
||
258 | urtDebugAssert(subscriber); |
||
259 | urtDebugAssert(topic); |
||
260 | |||
261 | if (subscriber->base.topic)
|
||
262 | { |
||
263 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
264 | } |
||
265 | else
|
||
266 | { |
||
267 | subscriber->base.topic = topic; |
||
268 | subscriber->usefulnesscb = usefulnesscb; |
||
269 | subscriber->cbparams = cbparams; |
||
270 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
271 | subscriber->base.sumLatencies = 0;
|
||
272 | subscriber->base.numMessagesReceived = 0;
|
||
273 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
274 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
275 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
276 | } |
||
277 | |||
278 | //TODO: Lock topic
|
||
279 | if (messages)
|
||
280 | { |
||
281 | urt_message_t* lastMessageContribute = messages; |
||
282 | while (lastMessageContribute->next)
|
||
283 | { |
||
284 | lastMessageContribute = lastMessageContribute->next; |
||
285 | } |
||
286 | lastMessageContribute->next = topic->latestMessage->next; |
||
287 | topic->latestMessage->next = messages; |
||
288 | } |
||
289 | |||
290 | subscriber->base.lastMessage = topic->latestMessage; |
||
291 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
292 | |||
293 | urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
||
294 | |||
295 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
296 | topic->numHrtSubscribers--; |
||
297 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
298 | |||
299 | //TODO: Unlock topic
|
||
300 | return URT_STATUS_OK;
|
||
301 | } |
||
302 | 7d9678db | skenneweg | |
303 | /**
|
||
304 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
305 | 7d9678db | skenneweg | *
|
306 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
307 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
308 | * @param[in] bytes Payload size in bytes.
|
||
309 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
310 | 7d9678db | skenneweg | *
|
311 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
312 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
313 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
314 | 7d9678db | skenneweg | */
|
315 | urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
||
316 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
317 | |||
318 | /**
|
||
319 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
320 | 7d9678db | skenneweg | *
|
321 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
322 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
323 | * @param[in] bytes Payload size in bytes.
|
||
324 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
325 | 7d9678db | skenneweg | *
|
326 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
327 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
328 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
329 | 7d9678db | skenneweg | */
|
330 | urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
||
331 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
332 | |||
333 | /**
|
||
334 | 5198dfae | skenneweg | * @brief Calculates the usefulness of the subscriber.
|
335 | 7d9678db | skenneweg | *
|
336 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
337 | * @param[in] latency Latency (of a message) as argument to calculate usefulness.
|
||
338 | 7d9678db | skenneweg | *
|
339 | 5198dfae | skenneweg | * @return Returns the usefulness as a value within [0,1].
|
340 | 7d9678db | skenneweg | */
|
341 | a5e142de | skenneweg | float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
|
342 | { |
||
343 | urtDebugAssert(subscriber); |
||
344 | |||
345 | return subscriber->usefulnesscb(latency);
|
||
346 | } |
||
347 | 7d9678db | skenneweg | |
348 | /**
|
||
349 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
350 | 7d9678db | skenneweg | *
|
351 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
352 | 7d9678db | skenneweg | *
|
353 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
354 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
355 | 7d9678db | skenneweg | */
|
356 | a5e142de | skenneweg | urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber) |
357 | { |
||
358 | urtDebugAssert(subscriber); |
||
359 | |||
360 | if (subscriber->base.topic)
|
||
361 | { |
||
362 | if (URT_CFG_PUBSUB_PROFILING == true) |
||
363 | { |
||
364 | //TODO: lock topic
|
||
365 | subscriber->base.topic->numHrtSubscribers--; |
||
366 | } |
||
367 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
368 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
369 | //TODO: unlock topic
|
||
370 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
371 | subscriber->base.topic = NULL;
|
||
372 | subscriber->base.lastMessage = NULL;
|
||
373 | subscriber->base.lastMessageTime = 0;
|
||
374 | return URT_STATUS_OK;
|
||
375 | } |
||
376 | |||
377 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
378 | } |
||
379 | 7d9678db | skenneweg | |
380 | |||
381 | /**
|
||
382 | 5198dfae | skenneweg | * @brief Initialize the FRT Subscriber.
|
383 | 7d9678db | skenneweg | *
|
384 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
385 | 7d9678db | skenneweg | */
|
386 | 5c6cb22f | skenneweg | void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
|
387 | { |
||
388 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
389 | |||
390 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
391 | urtEventListenerInit(subscriber->base.evtListener); |
||
392 | subscriber->base.lastMessage = NULL;
|
||
393 | subscriber->base.lastMessageTime = 0;
|
||
394 | |||
395 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
396 | subscriber->base.sumLatencies = 0;
|
||
397 | subscriber->base.numMessagesReceived = 0;
|
||
398 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
399 | |||
400 | #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
||
401 | subscriber->deadlineOffset = 0;
|
||
402 | #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
403 | |||
404 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
||
405 | subscriber->maxJitter = 0;
|
||
406 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
407 | |||
408 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
||
409 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
410 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
411 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
412 | return;
|
||
413 | } |
||
414 | |||
415 | 7d9678db | skenneweg | |
416 | /**
|
||
417 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
418 | *
|
||
419 | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
420 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
421 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
422 | * Messages must not be associated to another topic.
|
||
423 | * Once a message has been contributed, it cannot be removed later.
|
||
424 | * May be NULL(no messages to contribute).
|
||
425 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
426 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
427 | * A value of 0 indicates that jitter is of no concern.
|
||
428 | *
|
||
429 | * @return Returns URT_STATUS_OK on success.
|
||
430 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
431 | 7d9678db | skenneweg | */
|
432 | urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic, |
||
433 | a5e142de | skenneweg | urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter) |
434 | { |
||
435 | urtDebugAssert(subscriber); |
||
436 | urtDebugAssert(topic); |
||
437 | |||
438 | if (subscriber->base.topic)
|
||
439 | { |
||
440 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
441 | } |
||
442 | else
|
||
443 | { |
||
444 | subscriber->base.topic = topic; |
||
445 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
446 | subscriber->base.sumLatencies = 0;
|
||
447 | subscriber->base.numMessagesReceived = 0;
|
||
448 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
449 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
450 | subscriber->deadlineOffset = deadline; |
||
451 | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
452 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
453 | subscriber->maxJitter =jitter; |
||
454 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
455 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
||
456 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
457 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
458 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
459 | } |
||
460 | |||
461 | //TODO: Lock topic
|
||
462 | if (messages)
|
||
463 | { |
||
464 | urt_message_t* lastMessageContribute = messages; |
||
465 | while (lastMessageContribute->next)
|
||
466 | { |
||
467 | lastMessageContribute = lastMessageContribute->next; |
||
468 | } |
||
469 | lastMessageContribute->next = topic->latestMessage->next; |
||
470 | topic->latestMessage->next = messages; |
||
471 | } |
||
472 | |||
473 | subscriber->base.lastMessage = topic->latestMessage; |
||
474 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
475 | |||
476 | urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
||
477 | |||
478 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
479 | topic->numHrtSubscribers--; |
||
480 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
481 | |||
482 | //TODO: Unlock topic
|
||
483 | return URT_STATUS_OK;
|
||
484 | } |
||
485 | 7d9678db | skenneweg | |
486 | /**
|
||
487 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
488 | 7d9678db | skenneweg | *
|
489 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
490 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
491 | * @param[in] bytes Payload size in bytes.
|
||
492 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
493 | 7d9678db | skenneweg | *
|
494 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
495 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
496 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
497 | 7d9678db | skenneweg | */
|
498 | urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
||
499 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
500 | |||
501 | /**
|
||
502 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
503 | 7d9678db | skenneweg | *
|
504 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
505 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
506 | * @param[in] bytes Payload size in bytes.
|
||
507 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
508 | 7d9678db | skenneweg | *
|
509 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
510 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
511 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
512 | 7d9678db | skenneweg | */
|
513 | urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
||
514 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
515 | |||
516 | /**
|
||
517 | 5198dfae | skenneweg | * @brief Calculates the validity from the subscriber.
|
518 | 7d9678db | skenneweg | *
|
519 | 5198dfae | skenneweg | * @param[in] subscriber The FRT subscriber to calculate a validity for. Must not be NULL.
|
520 | * @param[in] latency Latency (of a message) as argument to calculate validity.
|
||
521 | 7d9678db | skenneweg | *
|
522 | 5198dfae | skenneweg | * @return Returns a boolean indicator whether the latency is fine.
|
523 | 7d9678db | skenneweg | */
|
524 | a5e142de | skenneweg | bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
|
525 | { |
||
526 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
527 | if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
||
528 | return true; |
||
529 | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
530 | return false; |
||
531 | } |
||
532 | 7d9678db | skenneweg | |
533 | /**
|
||
534 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
535 | 7d9678db | skenneweg | *
|
536 | 5c6cb22f | skenneweg | * @param[in] subscriber The FRT subscriber to be unsubscribed. Must not be NULL.
|
537 | 7d9678db | skenneweg | *
|
538 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
539 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
540 | 7d9678db | skenneweg | */
|
541 | urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber){return URT_STATUS_OK;}
|
||
542 | |||
543 | |||
544 | /**
|
||
545 | 5198dfae | skenneweg | * @brief Initialize the HRT Subscriber.
|
546 | 7d9678db | skenneweg | *
|
547 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to initialize. Must not be NULL.
|
548 | 7d9678db | skenneweg | */
|
549 | 5c6cb22f | skenneweg | void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
|
550 | { |
||
551 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
552 | |||
553 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
554 | urtEventListenerInit(subscriber->base.evtListener); |
||
555 | subscriber->base.lastMessage = NULL;
|
||
556 | subscriber->base.lastMessageTime = 0;
|
||
557 | |||
558 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING)
|
559 | 5c6cb22f | skenneweg | subscriber->base.sumLatencies = 0;
|
560 | subscriber->base.numMessagesReceived = 0;
|
||
561 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
562 | 5c6cb22f | skenneweg | |
563 | subscriber->next = NULL;
|
||
564 | |||
565 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
566 | 5c6cb22f | skenneweg | subscriber->deadlineOffset = 0;
|
567 | urtTimerInit(subscriber->qodDeadlineTimer); |
||
568 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
569 | 5c6cb22f | skenneweg | |
570 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_RATECHECKS)
|
571 | 5c6cb22f | skenneweg | subscriber->expectedRate = 0;
|
572 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
573 | 5c6cb22f | skenneweg | |
574 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
575 | 5c6cb22f | skenneweg | subscriber->maxJitter = 0;
|
576 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
577 | 5c6cb22f | skenneweg | |
578 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
579 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
580 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
581 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
582 | 5c6cb22f | skenneweg | return;
|
583 | } |
||
584 | |||
585 | 7d9678db | skenneweg | |
586 | /**
|
||
587 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
588 | *
|
||
589 | 5c6cb22f | skenneweg | * @param[in] subscriber The HRT subscriber which shall subscribe to a topic. Must not be NULL.
|
590 | 5198dfae | skenneweg | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
591 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
592 | * Messages must not be associated to another topic.
|
||
593 | * Once a message has been contributed, it cannot be removed later.
|
||
594 | * May be NULL(no messages to contribute).
|
||
595 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
596 | * @param[in] rate Expected minimum rate of new messages (= maximum time between consecutive messages).
|
||
597 | * A value of 0 indicates, that rate is of no concern.
|
||
598 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
599 | * A value of 0 indicates that jitter is of no concern.
|
||
600 | *
|
||
601 | * @return Returns URT_STATUS_OK on success.
|
||
602 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
603 | 7d9678db | skenneweg | */
|
604 | urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic, |
||
605 | urt_message_t* message, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter){return URT_STATUS_OK;}
|
||
606 | |||
607 | |||
608 | /**
|
||
609 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
610 | 7d9678db | skenneweg | *
|
611 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
612 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
613 | * @param[in] bytes Payload size in bytes.
|
||
614 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
615 | 7d9678db | skenneweg | *
|
616 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
617 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
618 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
619 | 7d9678db | skenneweg | */
|
620 | urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
||
621 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
622 | |||
623 | |||
624 | /**
|
||
625 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
626 | 7d9678db | skenneweg | *
|
627 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
628 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
629 | * @param[in] bytes Payload size in bytes.
|
||
630 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
631 | 7d9678db | skenneweg | *
|
632 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
633 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
634 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
635 | 7d9678db | skenneweg | */
|
636 | urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
||
637 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
638 | |||
639 | /**
|
||
640 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
641 | 7d9678db | skenneweg | *
|
642 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to be unsubscribed. Must not be NULL.
|
643 | 7d9678db | skenneweg | *
|
644 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
645 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
646 | 7d9678db | skenneweg | */
|
647 | urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber){return URT_STATUS_OK;} |