urtware / src / urt_subscriber.c @ 5b7188aa
History | View | Annotate | Download (27.824 KB)
1 | 1fb06240 | skenneweg | /*
|
---|---|---|---|
2 | µRtWare is a lightweight publish/subscribe middleware for real-time
|
||
3 | applications. It was developed as part of the software habitat for the
|
||
4 | Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
|
||
5 | |||
6 | Copyright (C) 2018..2020 Thomas Schöpping et al.
|
||
7 | |||
8 | This program is free software: you can redistribute it and/or modify
|
||
9 | it under the terms of the GNU General Public License as published by
|
||
10 | the Free Software Foundation, either version 3 of the License, or
|
||
11 | (at your option) any later version.
|
||
12 | |||
13 | This program is distributed in the hope that it will be useful,
|
||
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
16 | GNU General Public License for more details.
|
||
17 | |||
18 | You should have received a copy of the GNU General Public License
|
||
19 | along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
20 | */
|
||
21 | |||
22 | 7d9678db | skenneweg | #include <urtware.h> |
23 | |||
24 | 1fb06240 | skenneweg | /******************************************************************************/
|
25 | /* LOCAL DEFINITIONS */
|
||
26 | /******************************************************************************/
|
||
27 | |||
28 | /******************************************************************************/
|
||
29 | /* EXPORTED VARIABLES */
|
||
30 | /******************************************************************************/
|
||
31 | |||
32 | /******************************************************************************/
|
||
33 | /* LOCAL TYPES */
|
||
34 | /******************************************************************************/
|
||
35 | |||
36 | /******************************************************************************/
|
||
37 | /* LOCAL VARIABLES */
|
||
38 | /******************************************************************************/
|
||
39 | |||
40 | /******************************************************************************/
|
||
41 | /* LOCAL FUNCTIONS */
|
||
42 | /******************************************************************************/
|
||
43 | |||
44 | /******************************************************************************/
|
||
45 | /* EXPORTED FUNCTIONS */
|
||
46 | /******************************************************************************/
|
||
47 | |||
48 | 7d9678db | skenneweg | /**
|
49 | * @brief Initialize the nrt Subscriber.
|
||
50 | *
|
||
51 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to initialize. Must not be NULL.
|
52 | 7d9678db | skenneweg | */
|
53 | 5c6cb22f | skenneweg | void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
|
54 | { |
||
55 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
56 | |||
57 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
58 | urtEventListenerInit(subscriber->base.evtListener); |
||
59 | subscriber->base.lastMessage = NULL;
|
||
60 | subscriber->base.lastMessageTime = 0;
|
||
61 | a5e142de | skenneweg | #if (URT_CFG_PUBSUB_PROFILING == true) |
62 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
63 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
64 | a5e142de | skenneweg | #endif /* URT_CFG_PUBSUB_PROFILING */ |
65 | 5c6cb22f | skenneweg | return;
|
66 | } |
||
67 | 1fb06240 | skenneweg | |
68 | 7d9678db | skenneweg | /**
|
69 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
70 | 7d9678db | skenneweg | *
|
71 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
72 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
73 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
74 | * Messages must not be associated to another topic.
|
||
75 | * Once a message has been contributed, it cannot be removed later.
|
||
76 | * May be NULL(no messages to contribute).
|
||
77 | 7d9678db | skenneweg | *
|
78 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
79 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
80 | 7d9678db | skenneweg | */
|
81 | a5e142de | skenneweg | urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages) |
82 | { |
||
83 | urtDebugAssert(subscriber); |
||
84 | urtDebugAssert(topic); |
||
85 | |||
86 | if (!subscriber->base.topic)
|
||
87 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
88 | |||
89 | subscriber->base.topic = topic; |
||
90 | //TODO: Lock topic
|
||
91 | |||
92 | if (messages)
|
||
93 | { |
||
94 | urt_message_t* lastMessageContribute = messages; |
||
95 | while (lastMessageContribute->next)
|
||
96 | { |
||
97 | lastMessageContribute = lastMessageContribute->next; |
||
98 | } |
||
99 | lastMessageContribute->next = topic->latestMessage->next; |
||
100 | topic->latestMessage->next = messages; |
||
101 | } |
||
102 | |||
103 | subscriber->base.lastMessage = topic->latestMessage; |
||
104 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
105 | |||
106 | urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
||
107 | |||
108 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
109 | topic->numHrtSubscribers--; |
||
110 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
111 | |||
112 | //TODO: Unlock topic
|
||
113 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
114 | 1fb06240 | skenneweg | } |
115 | |||
116 | 7d9678db | skenneweg | /**
|
117 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
118 | 7d9678db | skenneweg | *
|
119 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
120 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
121 | * @param[in] bytes Payload size in bytes.
|
||
122 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
123 | 7d9678db | skenneweg | *
|
124 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
125 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
126 | * Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
127 | 7d9678db | skenneweg | */
|
128 | a5e142de | skenneweg | urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
|
129 | { |
||
130 | urtDebugAssert(subscriber); |
||
131 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
132 | 1fb06240 | skenneweg | } |
133 | |||
134 | 7d9678db | skenneweg | /**
|
135 | a5e142de | skenneweg | * @brief Fetches the latest message.
|
136 | 7d9678db | skenneweg | *
|
137 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
138 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
139 | * @param[in] bytes Payload size in bytes.
|
||
140 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
141 | 7d9678db | skenneweg | *
|
142 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
143 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
144 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
145 | 7d9678db | skenneweg | */
|
146 | 1fb06240 | skenneweg | urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
|
147 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
148 | 1fb06240 | skenneweg | } |
149 | |||
150 | 7d9678db | skenneweg | /**
|
151 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
152 | 7d9678db | skenneweg | *
|
153 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
154 | 7d9678db | skenneweg | *
|
155 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
156 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
157 | 7d9678db | skenneweg | */
|
158 | 5b7188aa | skenneweg | urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) |
159 | { |
||
160 | a5e142de | skenneweg | if (subscriber->base.topic)
|
161 | { |
||
162 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
163 | //TODO: LOCK TOPIC
|
||
164 | 5b7188aa | skenneweg | subscriber->base.topic->numSubscribers--; |
165 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
166 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
167 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
168 | //TODO: Unlock TOPIC
|
||
169 | subscriber->base.topic = NULL;
|
||
170 | subscriber->base.lastMessage = NULL;
|
||
171 | subscriber->base.lastMessageTime = 0;
|
||
172 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
173 | return URT_STATUS_OK;
|
||
174 | } |
||
175 | 5b7188aa | skenneweg | |
176 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
177 | 1fb06240 | skenneweg | } |
178 | |||
179 | 7d9678db | skenneweg | |
180 | /**
|
||
181 | 5198dfae | skenneweg | * @brief Initialize the srt Subscriber.
|
182 | 7d9678db | skenneweg | *
|
183 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
184 | 7d9678db | skenneweg | */
|
185 | 5c6cb22f | skenneweg | void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
|
186 | { |
||
187 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
188 | |||
189 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
190 | urtEventListenerInit(subscriber->base.evtListener); |
||
191 | subscriber->base.lastMessage = NULL;
|
||
192 | subscriber->base.lastMessageTime = 0;
|
||
193 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
194 | subscriber->base.sumLatencies = 0;
|
||
195 | subscriber->base.numMessagesReceived = 0;
|
||
196 | subscriber->usefulnesscb = NULL;
|
||
197 | subscriber->cbparams = NULL;
|
||
198 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
199 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
200 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
201 | return;
|
||
202 | } |
||
203 | 7d9678db | skenneweg | |
204 | /**
|
||
205 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
206 | *
|
||
207 | * @param[in] subscriber The SRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
208 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
209 | * @param[in] message NULL terminated list of messages to contribute to the topic.
|
||
210 | * Messages must not be associated to another topic.
|
||
211 | * Once a message has been contributed, it cannot be removed later.
|
||
212 | * May be NULL (no messages to contribute)
|
||
213 | * @param[in] usefulnesscb Pointer to a function to calculate usefulness of a message. Must not be NULL.
|
||
214 | * @param[in] cbparams Optional parameters for the usefulness callback.
|
||
215 | * May be NULL if the callback expects no parameters.
|
||
216 | *
|
||
217 | * @return Returns URT_STATUS_OK on success.
|
||
218 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
219 | 7d9678db | skenneweg | */
|
220 | urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, |
||
221 | a5e142de | skenneweg | urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
|
222 | { |
||
223 | urtDebugAssert(subscriber); |
||
224 | urtDebugAssert(topic); |
||
225 | |||
226 | if (subscriber->base.topic)
|
||
227 | { |
||
228 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
229 | } |
||
230 | 5b7188aa | skenneweg | |
231 | subscriber->base.topic = topic; |
||
232 | subscriber->usefulnesscb = usefulnesscb; |
||
233 | subscriber->cbparams = cbparams; |
||
234 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
235 | 5b7188aa | skenneweg | subscriber->base.sumLatencies = 0;
|
236 | subscriber->base.numMessagesReceived = 0;
|
||
237 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
238 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
239 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
240 | |||
241 | //TODO: Lock topic
|
||
242 | if (messages)
|
||
243 | { |
||
244 | urt_message_t* lastMessageContribute = messages; |
||
245 | while (lastMessageContribute->next)
|
||
246 | { |
||
247 | lastMessageContribute = lastMessageContribute->next; |
||
248 | } |
||
249 | lastMessageContribute->next = topic->latestMessage->next; |
||
250 | topic->latestMessage->next = messages; |
||
251 | } |
||
252 | |||
253 | subscriber->base.lastMessage = topic->latestMessage; |
||
254 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
255 | |||
256 | urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
||
257 | |||
258 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
259 | topic->numHrtSubscribers--; |
||
260 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
261 | |||
262 | //TODO: Unlock topic
|
||
263 | return URT_STATUS_OK;
|
||
264 | } |
||
265 | 7d9678db | skenneweg | |
266 | /**
|
||
267 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
268 | 7d9678db | skenneweg | *
|
269 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
270 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
271 | * @param[in] bytes Payload size in bytes.
|
||
272 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
273 | 7d9678db | skenneweg | *
|
274 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
275 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
276 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
277 | 7d9678db | skenneweg | */
|
278 | urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
||
279 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
280 | |||
281 | /**
|
||
282 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
283 | 7d9678db | skenneweg | *
|
284 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
285 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
286 | * @param[in] bytes Payload size in bytes.
|
||
287 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
288 | 7d9678db | skenneweg | *
|
289 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
290 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
291 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
292 | 7d9678db | skenneweg | */
|
293 | urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
||
294 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
295 | |||
296 | /**
|
||
297 | 5198dfae | skenneweg | * @brief Calculates the usefulness of the subscriber.
|
298 | 7d9678db | skenneweg | *
|
299 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
300 | * @param[in] latency Latency (of a message) as argument to calculate usefulness.
|
||
301 | 7d9678db | skenneweg | *
|
302 | 5198dfae | skenneweg | * @return Returns the usefulness as a value within [0,1].
|
303 | 7d9678db | skenneweg | */
|
304 | a5e142de | skenneweg | float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
|
305 | { |
||
306 | urtDebugAssert(subscriber); |
||
307 | |||
308 | return subscriber->usefulnesscb(latency);
|
||
309 | } |
||
310 | 7d9678db | skenneweg | |
311 | /**
|
||
312 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
313 | 7d9678db | skenneweg | *
|
314 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
315 | 7d9678db | skenneweg | *
|
316 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
317 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
318 | 7d9678db | skenneweg | */
|
319 | a5e142de | skenneweg | urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber) |
320 | { |
||
321 | urtDebugAssert(subscriber); |
||
322 | |||
323 | if (subscriber->base.topic)
|
||
324 | { |
||
325 | 5b7188aa | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
326 | //TODO: lock topic
|
||
327 | subscriber->base.topic->numSubscribers--; |
||
328 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
329 | a5e142de | skenneweg | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
330 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
331 | //TODO: unlock topic
|
||
332 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
333 | subscriber->base.topic = NULL;
|
||
334 | subscriber->base.lastMessage = NULL;
|
||
335 | subscriber->base.lastMessageTime = 0;
|
||
336 | return URT_STATUS_OK;
|
||
337 | } |
||
338 | |||
339 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
340 | } |
||
341 | 7d9678db | skenneweg | |
342 | |||
343 | /**
|
||
344 | 5198dfae | skenneweg | * @brief Initialize the FRT Subscriber.
|
345 | 7d9678db | skenneweg | *
|
346 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
347 | 7d9678db | skenneweg | */
|
348 | 5c6cb22f | skenneweg | void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
|
349 | { |
||
350 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
351 | |||
352 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
353 | urtEventListenerInit(subscriber->base.evtListener); |
||
354 | subscriber->base.lastMessage = NULL;
|
||
355 | subscriber->base.lastMessageTime = 0;
|
||
356 | |||
357 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
358 | subscriber->base.sumLatencies = 0;
|
||
359 | subscriber->base.numMessagesReceived = 0;
|
||
360 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
361 | |||
362 | #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
||
363 | subscriber->deadlineOffset = 0;
|
||
364 | #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
365 | |||
366 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
||
367 | subscriber->maxJitter = 0;
|
||
368 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
369 | |||
370 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
||
371 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
372 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
373 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
374 | return;
|
||
375 | } |
||
376 | |||
377 | 7d9678db | skenneweg | |
378 | /**
|
||
379 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
380 | *
|
||
381 | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
382 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
383 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
384 | * Messages must not be associated to another topic.
|
||
385 | * Once a message has been contributed, it cannot be removed later.
|
||
386 | * May be NULL(no messages to contribute).
|
||
387 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
388 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
389 | * A value of 0 indicates that jitter is of no concern.
|
||
390 | *
|
||
391 | * @return Returns URT_STATUS_OK on success.
|
||
392 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
393 | 7d9678db | skenneweg | */
|
394 | urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic, |
||
395 | a5e142de | skenneweg | urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter) |
396 | { |
||
397 | urtDebugAssert(subscriber); |
||
398 | urtDebugAssert(topic); |
||
399 | |||
400 | 5b7188aa | skenneweg | if (subscriber->base.topic)
|
401 | { |
||
402 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
403 | } |
||
404 | |||
405 | subscriber->base.topic = topic; |
||
406 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
407 | 5b7188aa | skenneweg | subscriber->base.sumLatencies = 0;
|
408 | subscriber->base.numMessagesReceived = 0;
|
||
409 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
410 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
411 | 5b7188aa | skenneweg | subscriber->deadlineOffset = deadline; |
412 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
413 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
414 | 5b7188aa | skenneweg | subscriber->maxJitter =jitter; |
415 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
416 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
||
417 | 5b7188aa | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
418 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
419 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
420 | |||
421 | 5b7188aa | skenneweg | //TODO: Lock topic
|
422 | if (messages)
|
||
423 | { |
||
424 | urt_message_t* lastMessageContribute = messages; |
||
425 | while (lastMessageContribute->next)
|
||
426 | a5e142de | skenneweg | { |
427 | 5b7188aa | skenneweg | lastMessageContribute = lastMessageContribute->next; |
428 | a5e142de | skenneweg | } |
429 | 5b7188aa | skenneweg | lastMessageContribute->next = topic->latestMessage->next; |
430 | topic->latestMessage->next = messages; |
||
431 | } |
||
432 | a5e142de | skenneweg | |
433 | 5b7188aa | skenneweg | subscriber->base.lastMessage = topic->latestMessage; |
434 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
435 | a5e142de | skenneweg | |
436 | 5b7188aa | skenneweg | urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
437 | a5e142de | skenneweg | |
438 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
439 | 5b7188aa | skenneweg | topic->numHrtSubscribers--; |
440 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
441 | |||
442 | 5b7188aa | skenneweg | //TODO: Unlock topic
|
443 | return URT_STATUS_OK;
|
||
444 | a5e142de | skenneweg | } |
445 | 7d9678db | skenneweg | |
446 | /**
|
||
447 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
448 | 7d9678db | skenneweg | *
|
449 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
450 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
451 | * @param[in] bytes Payload size in bytes.
|
||
452 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
453 | 7d9678db | skenneweg | *
|
454 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
455 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
456 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
457 | 7d9678db | skenneweg | */
|
458 | urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
||
459 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
460 | |||
461 | /**
|
||
462 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
463 | 7d9678db | skenneweg | *
|
464 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
465 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
466 | * @param[in] bytes Payload size in bytes.
|
||
467 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
468 | 7d9678db | skenneweg | *
|
469 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
470 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
471 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
472 | 7d9678db | skenneweg | */
|
473 | urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
||
474 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
475 | |||
476 | /**
|
||
477 | 5198dfae | skenneweg | * @brief Calculates the validity from the subscriber.
|
478 | 7d9678db | skenneweg | *
|
479 | 5198dfae | skenneweg | * @param[in] subscriber The FRT subscriber to calculate a validity for. Must not be NULL.
|
480 | * @param[in] latency Latency (of a message) as argument to calculate validity.
|
||
481 | 7d9678db | skenneweg | *
|
482 | 5198dfae | skenneweg | * @return Returns a boolean indicator whether the latency is fine.
|
483 | 7d9678db | skenneweg | */
|
484 | a5e142de | skenneweg | bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
|
485 | { |
||
486 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
487 | 5b7188aa | skenneweg | if (latency < subscriber->deadlineOffset && latency < subscriber->maxJitter)
|
488 | return true; |
||
489 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
490 | 5b7188aa | skenneweg | |
491 | a5e142de | skenneweg | return false; |
492 | } |
||
493 | 7d9678db | skenneweg | |
494 | /**
|
||
495 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
496 | 7d9678db | skenneweg | *
|
497 | 5c6cb22f | skenneweg | * @param[in] subscriber The FRT subscriber to be unsubscribed. Must not be NULL.
|
498 | 7d9678db | skenneweg | *
|
499 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
500 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
501 | 7d9678db | skenneweg | */
|
502 | 5b7188aa | skenneweg | urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber) |
503 | { |
||
504 | urtDebugAssert(subscriber); |
||
505 | |||
506 | if (subscriber->base.topic)
|
||
507 | { |
||
508 | //TODO: lock topic
|
||
509 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
510 | //TODO: decrement topic's HRT counter
|
||
511 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
512 | subscriber->base.topic->numSubscribers--; |
||
513 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
514 | //Hier weiter
|
||
515 | |||
516 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
517 | //TODO: unlock topic
|
||
518 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
519 | subscriber->base.topic = NULL;
|
||
520 | subscriber->base.lastMessage = NULL;
|
||
521 | subscriber->base.lastMessageTime = 0;
|
||
522 | return URT_STATUS_OK;
|
||
523 | } |
||
524 | |||
525 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
526 | } |
||
527 | 7d9678db | skenneweg | |
528 | |||
529 | /**
|
||
530 | 5198dfae | skenneweg | * @brief Initialize the HRT Subscriber.
|
531 | 7d9678db | skenneweg | *
|
532 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to initialize. Must not be NULL.
|
533 | 7d9678db | skenneweg | */
|
534 | 5c6cb22f | skenneweg | void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
|
535 | { |
||
536 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
537 | |||
538 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
539 | urtEventListenerInit(subscriber->base.evtListener); |
||
540 | subscriber->base.lastMessage = NULL;
|
||
541 | subscriber->base.lastMessageTime = 0;
|
||
542 | |||
543 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING)
|
544 | 5c6cb22f | skenneweg | subscriber->base.sumLatencies = 0;
|
545 | subscriber->base.numMessagesReceived = 0;
|
||
546 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
547 | 5c6cb22f | skenneweg | |
548 | subscriber->next = NULL;
|
||
549 | |||
550 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
551 | 5c6cb22f | skenneweg | subscriber->deadlineOffset = 0;
|
552 | urtTimerInit(subscriber->qodDeadlineTimer); |
||
553 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
554 | 5c6cb22f | skenneweg | |
555 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_RATECHECKS)
|
556 | 5c6cb22f | skenneweg | subscriber->expectedRate = 0;
|
557 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
558 | 5c6cb22f | skenneweg | |
559 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
560 | 5c6cb22f | skenneweg | subscriber->maxJitter = 0;
|
561 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
562 | 5c6cb22f | skenneweg | |
563 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
564 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
565 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
566 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
567 | 5c6cb22f | skenneweg | return;
|
568 | } |
||
569 | |||
570 | 7d9678db | skenneweg | |
571 | /**
|
||
572 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
573 | *
|
||
574 | 5c6cb22f | skenneweg | * @param[in] subscriber The HRT subscriber which shall subscribe to a topic. Must not be NULL.
|
575 | 5198dfae | skenneweg | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
576 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
577 | * Messages must not be associated to another topic.
|
||
578 | * Once a message has been contributed, it cannot be removed later.
|
||
579 | * May be NULL(no messages to contribute).
|
||
580 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
581 | * @param[in] rate Expected minimum rate of new messages (= maximum time between consecutive messages).
|
||
582 | * A value of 0 indicates, that rate is of no concern.
|
||
583 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
584 | * A value of 0 indicates that jitter is of no concern.
|
||
585 | *
|
||
586 | * @return Returns URT_STATUS_OK on success.
|
||
587 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
588 | 7d9678db | skenneweg | */
|
589 | urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic, |
||
590 | 5b7188aa | skenneweg | urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter) |
591 | { |
||
592 | urtDebugAssert(subscriber); |
||
593 | urtDebugAssert(topic); |
||
594 | |||
595 | if (subscriber->base.topic)
|
||
596 | { |
||
597 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
598 | } |
||
599 | |||
600 | subscriber->base.topic = topic; |
||
601 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
602 | subscriber->base.sumLatencies = 0;
|
||
603 | subscriber->base.numMessagesReceived = 0;
|
||
604 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
605 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
606 | subscriber->deadlineOffset = deadline; |
||
607 | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
608 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
609 | subscriber->maxJitter =jitter; |
||
610 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
611 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
||
612 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
613 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
614 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
615 | # if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
616 | subscriber->expectedRate = rate; |
||
617 | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
||
618 | |||
619 | //TODO: Lock topic
|
||
620 | if (messages)
|
||
621 | { |
||
622 | urt_message_t* lastMessageContribute = messages; |
||
623 | while (lastMessageContribute->next)
|
||
624 | { |
||
625 | lastMessageContribute = lastMessageContribute->next; |
||
626 | } |
||
627 | lastMessageContribute->next = topic->latestMessage->next; |
||
628 | topic->latestMessage->next = messages; |
||
629 | } |
||
630 | |||
631 | subscriber->base.lastMessage = topic->latestMessage; |
||
632 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
633 | |||
634 | urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
|
||
635 | |||
636 | # if(URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
637 | urt_hrtsubscriber_t* hrtSubscriber = subscriber->base.topic->hrtSubscribers; |
||
638 | while (!hrtSubscriber /* && expected Rate is lower */) |
||
639 | { |
||
640 | hrtSubscriber = hrtSubscriber->next; |
||
641 | } |
||
642 | |||
643 | if (!hrtSubscriber)
|
||
644 | { |
||
645 | //TODO: Append self to topic's list of HRT subscribers
|
||
646 | } |
||
647 | else
|
||
648 | { |
||
649 | //TOOD: insert self in front of current HRT susbcriber
|
||
650 | subscriber->next = hrtSubscriber; |
||
651 | } |
||
652 | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
||
653 | |||
654 | topic->numHrtSubscribers--; |
||
655 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
656 | topic->numSubscribers--; |
||
657 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
658 | |||
659 | //TODO: Unlock topic
|
||
660 | return URT_STATUS_OK;
|
||
661 | } |
||
662 | 7d9678db | skenneweg | |
663 | |||
664 | /**
|
||
665 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
666 | 7d9678db | skenneweg | *
|
667 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
668 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
669 | * @param[in] bytes Payload size in bytes.
|
||
670 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
671 | 7d9678db | skenneweg | *
|
672 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
673 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
674 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
675 | 7d9678db | skenneweg | */
|
676 | urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
||
677 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
678 | |||
679 | |||
680 | /**
|
||
681 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
682 | 7d9678db | skenneweg | *
|
683 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
684 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
685 | * @param[in] bytes Payload size in bytes.
|
||
686 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
687 | 7d9678db | skenneweg | *
|
688 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
689 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
690 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
691 | 7d9678db | skenneweg | */
|
692 | urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
||
693 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
694 | |||
695 | /**
|
||
696 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
697 | 7d9678db | skenneweg | *
|
698 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to be unsubscribed. Must not be NULL.
|
699 | 7d9678db | skenneweg | *
|
700 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
701 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
702 | 7d9678db | skenneweg | */
|
703 | 5b7188aa | skenneweg | urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber) |
704 | { |
||
705 | urtDebugAssert(subscriber); |
||
706 | |||
707 | if (subscriber->base.topic)
|
||
708 | { |
||
709 | //TODO: lock topic
|
||
710 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
711 | subscriber->base.topic->numHrtSubscribers--; |
||
712 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
713 | subscriber->base.topic->numSubscribers--; |
||
714 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
715 | # if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
716 | //TODO: remove self from topics lsit of HRT subscribers
|
||
717 | //TODO: ...
|
||
718 | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
||
719 | |||
720 | urt_message_t* messageTemp = subscriber->base.lastMessage; |
||
721 | while (messageTemp->next->originTime < messageTemp->originTime)
|
||
722 | { |
||
723 | messageTemp = messageTemp->next; |
||
724 | messageTemp->numHrtConsumersLeft--; |
||
725 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
726 | messageTemp->numConsumersLeft--; |
||
727 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
728 | } |
||
729 | bool temp = false; |
||
730 | if (temp /*TODO: HRT counter of any message became 0?*/) |
||
731 | { |
||
732 | //TODO: signal topics condition variable
|
||
733 | } |
||
734 | |||
735 | //TODO: unlock topic
|
||
736 | subscriber->base.topic = NULL;
|
||
737 | subscriber->base.lastMessage = NULL;
|
||
738 | subscriber->base.lastMessageTime = 0;
|
||
739 | return URT_STATUS_OK;
|
||
740 | } |
||
741 | |||
742 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
743 | } |
||
744 | |||
745 |