urtware / src / urt_subscriber.c @ 982056f7
History | View | Annotate | Download (42.379 KB)
1 | 1fb06240 | skenneweg | /*
|
---|---|---|---|
2 | µRtWare is a lightweight publish/subscribe middleware for real-time
|
||
3 | applications. It was developed as part of the software habitat for the
|
||
4 | Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
|
||
5 | |||
6 | Copyright (C) 2018..2020 Thomas Schöpping et al.
|
||
7 | |||
8 | This program is free software: you can redistribute it and/or modify
|
||
9 | it under the terms of the GNU General Public License as published by
|
||
10 | the Free Software Foundation, either version 3 of the License, or
|
||
11 | (at your option) any later version.
|
||
12 | |||
13 | This program is distributed in the hope that it will be useful,
|
||
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
16 | GNU General Public License for more details.
|
||
17 | |||
18 | You should have received a copy of the GNU General Public License
|
||
19 | along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
20 | */
|
||
21 | |||
22 | 7d9678db | skenneweg | #include <urtware.h> |
23 | 982056f7 | Svenja | #include <stdio.h> |
24 | 7d9678db | skenneweg | |
25 | 1fb06240 | skenneweg | /******************************************************************************/
|
26 | /* LOCAL DEFINITIONS */
|
||
27 | /******************************************************************************/
|
||
28 | |||
29 | /******************************************************************************/
|
||
30 | /* EXPORTED VARIABLES */
|
||
31 | /******************************************************************************/
|
||
32 | |||
33 | /******************************************************************************/
|
||
34 | /* LOCAL TYPES */
|
||
35 | /******************************************************************************/
|
||
36 | |||
37 | /******************************************************************************/
|
||
38 | /* LOCAL VARIABLES */
|
||
39 | /******************************************************************************/
|
||
40 | |||
41 | /******************************************************************************/
|
||
42 | /* LOCAL FUNCTIONS */
|
||
43 | /******************************************************************************/
|
||
44 | |||
45 | 982056f7 | Svenja | void urtFetchMessage (urt_message_t* message, urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes) |
46 | 65dc89cb | skenneweg | { |
47 | 982056f7 | Svenja | subscriber->base.lastMessage = message; |
48 | *subscriber->base.lastMessageTime = message->originTime; |
||
49 | memcpy(message->payload, payload, bytes); |
||
50 | 65dc89cb | skenneweg | } |
51 | |||
52 | urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage) |
||
53 | { |
||
54 | while (oldestMessage->next->originTime < oldestMessage->originTime)
|
||
55 | { |
||
56 | oldestMessage = oldestMessage->next; |
||
57 | } |
||
58 | return oldestMessage;
|
||
59 | } |
||
60 | |||
61 | urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage) |
||
62 | { |
||
63 | urt_message_t* lastMessage = subscriber->base.lastMessage; |
||
64 | while (lastMessage->next->originTime < lastMessage->originTime)
|
||
65 | { |
||
66 | lastMessage = lastMessage->next; |
||
67 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
68 | subscriber->base.lastMessage->numConsumersLeft--; |
||
69 | subscriber->base->numMessagesReceived++; |
||
70 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
71 | } |
||
72 | } |
||
73 | |||
74 | 982056f7 | Svenja | void urtContributeMessages(urt_message_t* messages)
|
75 | { |
||
76 | urt_message_t* lastMessageContribute = messages; |
||
77 | while (lastMessageContribute->next)
|
||
78 | { |
||
79 | lastMessageContribute = lastMessageContribute->next; |
||
80 | } |
||
81 | lastMessageContribute->next = topic->latestMessage->next; |
||
82 | topic->latestMessage->next = messages; |
||
83 | } |
||
84 | |||
85 | 1fb06240 | skenneweg | /******************************************************************************/
|
86 | /* EXPORTED FUNCTIONS */
|
||
87 | /******************************************************************************/
|
||
88 | |||
89 | 7d9678db | skenneweg | /**
|
90 | * @brief Initialize the nrt Subscriber.
|
||
91 | *
|
||
92 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to initialize. Must not be NULL.
|
93 | 7d9678db | skenneweg | */
|
94 | 5c6cb22f | skenneweg | void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
|
95 | { |
||
96 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
97 | |||
98 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
99 | urtEventListenerInit(subscriber->base.evtListener); |
||
100 | subscriber->base.lastMessage = NULL;
|
||
101 | subscriber->base.lastMessageTime = 0;
|
||
102 | a5e142de | skenneweg | #if (URT_CFG_PUBSUB_PROFILING == true) |
103 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
104 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
105 | a5e142de | skenneweg | #endif /* URT_CFG_PUBSUB_PROFILING */ |
106 | 5c6cb22f | skenneweg | return;
|
107 | } |
||
108 | 1fb06240 | skenneweg | |
109 | 7d9678db | skenneweg | /**
|
110 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
111 | 7d9678db | skenneweg | *
|
112 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
113 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
114 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
115 | * Messages must not be associated to another topic.
|
||
116 | * Once a message has been contributed, it cannot be removed later.
|
||
117 | * May be NULL(no messages to contribute).
|
||
118 | 7d9678db | skenneweg | *
|
119 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
120 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
121 | 7d9678db | skenneweg | */
|
122 | a5e142de | skenneweg | urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages) |
123 | { |
||
124 | urtDebugAssert(subscriber); |
||
125 | urtDebugAssert(topic); |
||
126 | |||
127 | 982056f7 | Svenja | if (!subscriber->base.topic) {
|
128 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
129 | } |
||
130 | a5e142de | skenneweg | |
131 | subscriber->base.topic = topic; |
||
132 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
133 | a5e142de | skenneweg | |
134 | 982056f7 | Svenja | if (messages) {
|
135 | urtContributeMessages(messages); |
||
136 | a5e142de | skenneweg | } |
137 | |||
138 | subscriber->base.lastMessage = topic->latestMessage; |
||
139 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
140 | |||
141 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
142 | a5e142de | skenneweg | |
143 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
144 | topic->numHrtSubscribers--; |
||
145 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
146 | |||
147 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
148 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
149 | 1fb06240 | skenneweg | } |
150 | |||
151 | 7d9678db | skenneweg | /**
|
152 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
153 | 7d9678db | skenneweg | *
|
154 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
155 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
156 | * @param[in] bytes Payload size in bytes.
|
||
157 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
158 | 7d9678db | skenneweg | *
|
159 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
160 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
161 | * Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
162 | 7d9678db | skenneweg | */
|
163 | 982056f7 | Svenja | urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) |
164 | 65dc89cb | skenneweg | { |
165 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
166 | 65dc89cb | skenneweg | |
167 | 982056f7 | Svenja | if (!subscriber->base.topic) {
|
168 | return URT_STATUS_FETCH_NOTOPIC;
|
||
169 | } |
||
170 | 65dc89cb | skenneweg | |
171 | urtMutexLock(subscriber->base.topic->lock); |
||
172 | |||
173 | 982056f7 | Svenja | urt_message_t* oldestMessage = subscriber->base.lastMessage; |
174 | if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
|
||
175 | if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
|
||
176 | 65dc89cb | skenneweg | urtMutexUnlock(subscriber->base.topic->lock); |
177 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
178 | } |
||
179 | 982056f7 | Svenja | oldestMessage = oldestMessage->next; |
180 | 65dc89cb | skenneweg | } |
181 | 982056f7 | Svenja | else {
|
182 | oldestMessage = urtFindOldestMessage(oldestMessage->next); |
||
183 | 65dc89cb | skenneweg | } |
184 | |||
185 | 982056f7 | Svenja | urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
186 | |||
187 | if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
||
188 | uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
||
189 | 65dc89cb | skenneweg | |
190 | #if(URT_CFG_PUBSUB_PROFILING == true) |
||
191 | subscriber->base.sumLatencies += calculatedLatency; |
||
192 | |||
193 | 982056f7 | Svenja | if (calculatedLatency < subscriber->minLatency) {
|
194 | 65dc89cb | skenneweg | subscriber->minLatency = calculatedLatency; |
195 | } |
||
196 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency) { |
197 | 65dc89cb | skenneweg | subscriber->maxLatency = calculatedLatency; |
198 | } |
||
199 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
200 | 982056f7 | Svenja | |
201 | if (latency) {
|
||
202 | latency = calculatedLatency; |
||
203 | } |
||
204 | 65dc89cb | skenneweg | } |
205 | |||
206 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
207 | subscriber->base.lastMessage->numConsumersLeft--; |
||
208 | subscriber->base->numMessagesReceived++; |
||
209 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
210 | |||
211 | urtMutexUnlock(subscriber->base.topic->lock); |
||
212 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
213 | 1fb06240 | skenneweg | } |
214 | |||
215 | 7d9678db | skenneweg | /**
|
216 | a5e142de | skenneweg | * @brief Fetches the latest message.
|
217 | 7d9678db | skenneweg | *
|
218 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
219 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
220 | * @param[in] bytes Payload size in bytes.
|
||
221 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
222 | 7d9678db | skenneweg | *
|
223 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
224 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
225 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
226 | 7d9678db | skenneweg | */
|
227 | 982056f7 | Svenja | urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency) { |
228 | 65dc89cb | skenneweg | urtDebugAssert(subscriber); |
229 | |||
230 | if (!subscriber->base.topic)
|
||
231 | return URT_STATUS_FETCH_NOTOPIC;
|
||
232 | |||
233 | urtMutexLock(subscriber->base.topic->lock); |
||
234 | urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
||
235 | |||
236 | if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
||
237 | { |
||
238 | urtMutexUnlock(subscriber->base.topic->lock); |
||
239 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
240 | } |
||
241 | |||
242 | 982056f7 | Svenja | urtFetchMessage(lastMessage, subscriber, payload, bytes); |
243 | |||
244 | if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
||
245 | uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime; |
||
246 | 65dc89cb | skenneweg | |
247 | #if(URT_CFG_PUBSUB_PROFILING == true) |
||
248 | subscriber->base.sumLatencies += calculatedLatency; |
||
249 | |||
250 | 982056f7 | Svenja | if (calculatedLatency < subscriber->minLatency) {
|
251 | 65dc89cb | skenneweg | subscriber->minLatency = calculatedLatency; |
252 | } |
||
253 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency) { |
254 | 65dc89cb | skenneweg | subscriber->maxLatency = calculatedLatency; |
255 | } |
||
256 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
257 | 982056f7 | Svenja | |
258 | if (latency) {
|
||
259 | latency = calculatedLatency; |
||
260 | } |
||
261 | 65dc89cb | skenneweg | } |
262 | |||
263 | urtMutexUnlock(subscriber->base.topic->lock); |
||
264 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
265 | 1fb06240 | skenneweg | } |
266 | |||
267 | 7d9678db | skenneweg | /**
|
268 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
269 | 7d9678db | skenneweg | *
|
270 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
271 | 7d9678db | skenneweg | *
|
272 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
273 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
274 | 7d9678db | skenneweg | */
|
275 | 5b7188aa | skenneweg | urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) |
276 | { |
||
277 | a5e142de | skenneweg | if (subscriber->base.topic)
|
278 | { |
||
279 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
280 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
281 | 5b7188aa | skenneweg | subscriber->base.topic->numSubscribers--; |
282 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
283 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
284 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
285 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
286 | a5e142de | skenneweg | subscriber->base.topic = NULL;
|
287 | subscriber->base.lastMessage = NULL;
|
||
288 | subscriber->base.lastMessageTime = 0;
|
||
289 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
290 | return URT_STATUS_OK;
|
||
291 | } |
||
292 | 5b7188aa | skenneweg | |
293 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
294 | 1fb06240 | skenneweg | } |
295 | |||
296 | 7d9678db | skenneweg | |
297 | /**
|
||
298 | 5198dfae | skenneweg | * @brief Initialize the srt Subscriber.
|
299 | 7d9678db | skenneweg | *
|
300 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
301 | 7d9678db | skenneweg | */
|
302 | 5c6cb22f | skenneweg | void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
|
303 | { |
||
304 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
305 | |||
306 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
307 | urtEventListenerInit(subscriber->base.evtListener); |
||
308 | subscriber->base.lastMessage = NULL;
|
||
309 | subscriber->base.lastMessageTime = 0;
|
||
310 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
311 | subscriber->base.sumLatencies = 0;
|
||
312 | subscriber->base.numMessagesReceived = 0;
|
||
313 | subscriber->usefulnesscb = NULL;
|
||
314 | subscriber->cbparams = NULL;
|
||
315 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
316 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
317 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
318 | return;
|
||
319 | } |
||
320 | 7d9678db | skenneweg | |
321 | /**
|
||
322 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
323 | *
|
||
324 | * @param[in] subscriber The SRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
325 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
326 | * @param[in] message NULL terminated list of messages to contribute to the topic.
|
||
327 | * Messages must not be associated to another topic.
|
||
328 | * Once a message has been contributed, it cannot be removed later.
|
||
329 | * May be NULL (no messages to contribute)
|
||
330 | * @param[in] usefulnesscb Pointer to a function to calculate usefulness of a message. Must not be NULL.
|
||
331 | * @param[in] cbparams Optional parameters for the usefulness callback.
|
||
332 | * May be NULL if the callback expects no parameters.
|
||
333 | *
|
||
334 | * @return Returns URT_STATUS_OK on success.
|
||
335 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
336 | 7d9678db | skenneweg | */
|
337 | urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, |
||
338 | a5e142de | skenneweg | urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
|
339 | { |
||
340 | urtDebugAssert(subscriber); |
||
341 | urtDebugAssert(topic); |
||
342 | |||
343 | if (subscriber->base.topic)
|
||
344 | { |
||
345 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
346 | } |
||
347 | 5b7188aa | skenneweg | |
348 | subscriber->base.topic = topic; |
||
349 | subscriber->usefulnesscb = usefulnesscb; |
||
350 | subscriber->cbparams = cbparams; |
||
351 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
352 | 5b7188aa | skenneweg | subscriber->base.sumLatencies = 0;
|
353 | subscriber->base.numMessagesReceived = 0;
|
||
354 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
355 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
356 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
357 | |||
358 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
359 | a5e142de | skenneweg | if (messages)
|
360 | { |
||
361 | 982056f7 | Svenja | urtContributeMessages(messages); |
362 | a5e142de | skenneweg | } |
363 | |||
364 | subscriber->base.lastMessage = topic->latestMessage; |
||
365 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
366 | |||
367 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
368 | a5e142de | skenneweg | |
369 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
370 | topic->numHrtSubscribers--; |
||
371 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
372 | |||
373 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
374 | a5e142de | skenneweg | return URT_STATUS_OK;
|
375 | } |
||
376 | 7d9678db | skenneweg | |
377 | /**
|
||
378 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
379 | 7d9678db | skenneweg | *
|
380 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
381 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
382 | * @param[in] bytes Payload size in bytes.
|
||
383 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
384 | 7d9678db | skenneweg | *
|
385 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
386 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
387 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
388 | 7d9678db | skenneweg | */
|
389 | 982056f7 | Svenja | urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* const payload, |
390 | 65dc89cb | skenneweg | size_t bytes, urt_delay_t* latency) |
391 | { |
||
392 | urtDebugAssert(subscriber); |
||
393 | |||
394 | if (!subscriber->base.topic)
|
||
395 | return URT_STATUS_FETCH_NOTOPIC;
|
||
396 | |||
397 | urtMutexLock(subscriber->base.topic->lock); |
||
398 | |||
399 | 982056f7 | Svenja | urt_message_t* oldestMessage = subscriber->base.lastMessage; |
400 | if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
||
401 | 65dc89cb | skenneweg | { |
402 | if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
||
403 | { |
||
404 | urtMutexUnlock(subscriber->base.topic->lock); |
||
405 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
406 | } |
||
407 | 982056f7 | Svenja | oldestMessage = oldestMessage->next; |
408 | 65dc89cb | skenneweg | } |
409 | else
|
||
410 | { |
||
411 | 982056f7 | Svenja | oldestMessage = urtFindOldestMessage(oldestMessage->next); |
412 | 65dc89cb | skenneweg | } |
413 | |||
414 | 982056f7 | Svenja | urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
415 | |||
416 | if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
||
417 | uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
||
418 | 65dc89cb | skenneweg | |
419 | #if(URT_CFG_PUBSUB_PROFILING == true) |
||
420 | subscriber->base.sumLatencies += calculatedLatency; |
||
421 | |||
422 | 982056f7 | Svenja | if (calculatedLatency < subscriber->minLatency) {
|
423 | 65dc89cb | skenneweg | subscriber->minLatency = calculatedLatency; |
424 | } |
||
425 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency) { |
426 | 65dc89cb | skenneweg | subscriber->maxLatency = calculatedLatency; |
427 | } |
||
428 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
429 | 982056f7 | Svenja | |
430 | if (latency) {
|
||
431 | latency = calculatedLatency; |
||
432 | } |
||
433 | 65dc89cb | skenneweg | } |
434 | |||
435 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
436 | subscriber->base.lastMessage->numConsumersLeft--; |
||
437 | subscriber->base->numMessagesReceived++; |
||
438 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
439 | |||
440 | urtMutexUnlock(subscriber->base.topic->lock); |
||
441 | return URT_STATUS_OK;
|
||
442 | } |
||
443 | 7d9678db | skenneweg | |
444 | /**
|
||
445 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
446 | 7d9678db | skenneweg | *
|
447 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
448 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
449 | * @param[in] bytes Payload size in bytes.
|
||
450 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
451 | 7d9678db | skenneweg | *
|
452 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
453 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
454 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
455 | 7d9678db | skenneweg | */
|
456 | 982056f7 | Svenja | urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* const payload, |
457 | 65dc89cb | skenneweg | size_t bytes, urt_delay_t* latency) |
458 | { |
||
459 | urtDebugAssert(subscriber); |
||
460 | |||
461 | 982056f7 | Svenja | if (!subscriber->base.topic) {
|
462 | return URT_STATUS_FETCH_NOTOPIC;
|
||
463 | } |
||
464 | 65dc89cb | skenneweg | |
465 | urtMutexLock(subscriber->base.topic->lock); |
||
466 | urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
||
467 | |||
468 | if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
||
469 | { |
||
470 | urtMutexUnlock(subscriber->base.topic->lock); |
||
471 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
472 | } |
||
473 | |||
474 | 982056f7 | Svenja | urtFetchMessage(lastMessage, subscriber, payload, bytes); |
475 | |||
476 | if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
||
477 | uint64_t calculatedLatency = urtTimeNow() - lastMessage->originTime; |
||
478 | 65dc89cb | skenneweg | |
479 | #if(URT_CFG_PUBSUB_PROFILING == true) |
||
480 | subscriber->base.sumLatencies += calculatedLatency; |
||
481 | |||
482 | 982056f7 | Svenja | if (calculatedLatency < subscriber->minLatency) {
|
483 | 65dc89cb | skenneweg | subscriber->minLatency = calculatedLatency; |
484 | } |
||
485 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency) { |
486 | 65dc89cb | skenneweg | subscriber->maxLatency = calculatedLatency; |
487 | } |
||
488 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
489 | 982056f7 | Svenja | |
490 | if (latency) {
|
||
491 | latency = calculatedLatency; |
||
492 | } |
||
493 | 65dc89cb | skenneweg | } |
494 | |||
495 | urtMutexUnlock(subscriber->base.topic->lock); |
||
496 | return URT_STATUS_OK;
|
||
497 | } |
||
498 | 7d9678db | skenneweg | |
499 | /**
|
||
500 | 5198dfae | skenneweg | * @brief Calculates the usefulness of the subscriber.
|
501 | 7d9678db | skenneweg | *
|
502 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
503 | * @param[in] latency Latency (of a message) as argument to calculate usefulness.
|
||
504 | 7d9678db | skenneweg | *
|
505 | 5198dfae | skenneweg | * @return Returns the usefulness as a value within [0,1].
|
506 | 7d9678db | skenneweg | */
|
507 | a5e142de | skenneweg | float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
|
508 | { |
||
509 | urtDebugAssert(subscriber); |
||
510 | |||
511 | return subscriber->usefulnesscb(latency);
|
||
512 | } |
||
513 | 7d9678db | skenneweg | |
514 | /**
|
||
515 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
516 | 7d9678db | skenneweg | *
|
517 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
518 | 7d9678db | skenneweg | *
|
519 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
520 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
521 | 7d9678db | skenneweg | */
|
522 | a5e142de | skenneweg | urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber) |
523 | { |
||
524 | urtDebugAssert(subscriber); |
||
525 | |||
526 | if (subscriber->base.topic)
|
||
527 | { |
||
528 | 5b7188aa | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
529 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
530 | 5b7188aa | skenneweg | subscriber->base.topic->numSubscribers--; |
531 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
532 | a5e142de | skenneweg | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
533 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
534 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
535 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
536 | subscriber->base.topic = NULL;
|
||
537 | subscriber->base.lastMessage = NULL;
|
||
538 | subscriber->base.lastMessageTime = 0;
|
||
539 | return URT_STATUS_OK;
|
||
540 | } |
||
541 | |||
542 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
543 | } |
||
544 | 7d9678db | skenneweg | |
545 | |||
546 | /**
|
||
547 | 5198dfae | skenneweg | * @brief Initialize the FRT Subscriber.
|
548 | 7d9678db | skenneweg | *
|
549 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
550 | 7d9678db | skenneweg | */
|
551 | 5c6cb22f | skenneweg | void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
|
552 | { |
||
553 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
554 | |||
555 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
556 | urtEventListenerInit(subscriber->base.evtListener); |
||
557 | subscriber->base.lastMessage = NULL;
|
||
558 | subscriber->base.lastMessageTime = 0;
|
||
559 | |||
560 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
561 | subscriber->base.sumLatencies = 0;
|
||
562 | subscriber->base.numMessagesReceived = 0;
|
||
563 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
564 | |||
565 | #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
||
566 | subscriber->deadlineOffset = 0;
|
||
567 | #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
568 | |||
569 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
||
570 | subscriber->maxJitter = 0;
|
||
571 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
572 | |||
573 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
||
574 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
575 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
576 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
577 | return;
|
||
578 | } |
||
579 | |||
580 | 7d9678db | skenneweg | |
581 | /**
|
||
582 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
583 | *
|
||
584 | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
585 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
586 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
587 | * Messages must not be associated to another topic.
|
||
588 | * Once a message has been contributed, it cannot be removed later.
|
||
589 | * May be NULL(no messages to contribute).
|
||
590 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
591 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
592 | * A value of 0 indicates that jitter is of no concern.
|
||
593 | *
|
||
594 | * @return Returns URT_STATUS_OK on success.
|
||
595 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
596 | 7d9678db | skenneweg | */
|
597 | urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic, |
||
598 | a5e142de | skenneweg | urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter) |
599 | { |
||
600 | urtDebugAssert(subscriber); |
||
601 | urtDebugAssert(topic); |
||
602 | |||
603 | 5b7188aa | skenneweg | if (subscriber->base.topic)
|
604 | { |
||
605 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
606 | } |
||
607 | |||
608 | subscriber->base.topic = topic; |
||
609 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
610 | 5b7188aa | skenneweg | subscriber->base.sumLatencies = 0;
|
611 | subscriber->base.numMessagesReceived = 0;
|
||
612 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
613 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
614 | 5b7188aa | skenneweg | subscriber->deadlineOffset = deadline; |
615 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
616 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
617 | 5b7188aa | skenneweg | subscriber->maxJitter =jitter; |
618 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
619 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
||
620 | 5b7188aa | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
621 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
622 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
623 | |||
624 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
625 | 5b7188aa | skenneweg | if (messages)
|
626 | { |
||
627 | 982056f7 | Svenja | urtContributeMessages(messages); |
628 | 5b7188aa | skenneweg | } |
629 | a5e142de | skenneweg | |
630 | 5b7188aa | skenneweg | subscriber->base.lastMessage = topic->latestMessage; |
631 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
632 | a5e142de | skenneweg | |
633 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
634 | a5e142de | skenneweg | |
635 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
636 | 5b7188aa | skenneweg | topic->numHrtSubscribers--; |
637 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
638 | |||
639 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
640 | 5b7188aa | skenneweg | return URT_STATUS_OK;
|
641 | a5e142de | skenneweg | } |
642 | 7d9678db | skenneweg | |
643 | /**
|
||
644 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
645 | 7d9678db | skenneweg | *
|
646 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
647 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
648 | * @param[in] bytes Payload size in bytes.
|
||
649 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
650 | 7d9678db | skenneweg | *
|
651 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
652 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
653 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
654 | 7d9678db | skenneweg | */
|
655 | 982056f7 | Svenja | urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* const payload, |
656 | 65dc89cb | skenneweg | size_t bytes, urt_delay_t* latency) |
657 | { |
||
658 | urtDebugAssert(subscriber); |
||
659 | |||
660 | if (!subscriber->base.topic)
|
||
661 | return URT_STATUS_FETCH_NOTOPIC;
|
||
662 | |||
663 | urtMutexLock(subscriber->base.topic->lock); |
||
664 | |||
665 | 982056f7 | Svenja | urt_message_t* oldestMessage = subscriber->base.lastMessage; |
666 | if(oldestMessage->originTime == subscriber->base.lastMessageTime)
|
||
667 | 65dc89cb | skenneweg | { |
668 | if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime)
|
||
669 | { |
||
670 | urtMutexUnlock(subscriber->base.topic->lock); |
||
671 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
672 | } |
||
673 | 982056f7 | Svenja | oldestMessage = oldestMessage->next; |
674 | 65dc89cb | skenneweg | } |
675 | else
|
||
676 | { |
||
677 | 982056f7 | Svenja | oldestMessage = urtFindOldestMessage(oldestMessage->next); |
678 | 65dc89cb | skenneweg | } |
679 | |||
680 | 982056f7 | Svenja | urtFetchMessage(oldestMessage, subscriber, payload, bytes); |
681 | 65dc89cb | skenneweg | |
682 | 982056f7 | Svenja | if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) { |
683 | uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
||
684 | |||
685 | #if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
||
686 | 65dc89cb | skenneweg | subscriber->base.sumLatencies += calculatedLatency; |
687 | 982056f7 | Svenja | |
688 | if (calculatedLatency < subscriber->minLatency) {
|
||
689 | subscriber->minLatency = calculatedLatency; |
||
690 | } |
||
691 | else if (calculatedLatency > subscriber->maxLatency) { |
||
692 | subscriber->maxLatency = calculatedLatency; |
||
693 | } |
||
694 | 65dc89cb | skenneweg | #endif /* URT_CFG_PUBSUB_PROFILING */ |
695 | |||
696 | 982056f7 | Svenja | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
697 | if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
||
698 | 65dc89cb | skenneweg | subscriber->minLatency = calculatedLatency; |
699 | } |
||
700 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
701 | 65dc89cb | skenneweg | subscriber->maxLatency = calculatedLatency; |
702 | } |
||
703 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
704 | urtMutexUnlock(subscriber->base.topic); |
||
705 | return URT_STATUS_JITTERVIOLATION;
|
||
706 | } |
||
707 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
708 | 65dc89cb | skenneweg | |
709 | 982056f7 | Svenja | if (latency) {
|
710 | latency = calculatedLatency; |
||
711 | } |
||
712 | 65dc89cb | skenneweg | } |
713 | |||
714 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
715 | subscriber->base.lastMessage->numConsumersLeft--; |
||
716 | subscriber->base->numMessagesReceived++; |
||
717 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
718 | |||
719 | urtMutexUnlock(subscriber->base.topic->lock); |
||
720 | 982056f7 | Svenja | return URT_STATUS_OK;
|
721 | 65dc89cb | skenneweg | } |
722 | 7d9678db | skenneweg | |
723 | /**
|
||
724 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
725 | 7d9678db | skenneweg | *
|
726 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
727 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
728 | * @param[in] bytes Payload size in bytes.
|
||
729 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
730 | 7d9678db | skenneweg | *
|
731 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
732 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
733 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
734 | 7d9678db | skenneweg | */
|
735 | 982056f7 | Svenja | urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* const payload, |
736 | 65dc89cb | skenneweg | size_t bytes, urt_delay_t* latency) |
737 | { |
||
738 | urtDebugAssert(subscriber); |
||
739 | |||
740 | if (!subscriber->base.topic)
|
||
741 | return URT_STATUS_FETCH_NOTOPIC;
|
||
742 | |||
743 | urtMutexLock(subscriber->base.topic->lock); |
||
744 | urt_message_t* lastMessage = urtFindLatestMessage(subscriber->base.lastMessage); |
||
745 | |||
746 | if (lastMessage->originTime == subscriber->base.lastMessageTime)
|
||
747 | { |
||
748 | urtMutexUnlock(subscriber->base.topic->lock); |
||
749 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
750 | } |
||
751 | |||
752 | 982056f7 | Svenja | urtFetchMessage(lastMessage, subscriber, payload, bytes); |
753 | 65dc89cb | skenneweg | |
754 | 982056f7 | Svenja | if (URT_CFG_PUBSUB_PROFILING == true || URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || latency) { |
755 | uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime; |
||
756 | |||
757 | #if(URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
||
758 | 65dc89cb | skenneweg | subscriber->base.sumLatencies += calculatedLatency; |
759 | 982056f7 | Svenja | |
760 | if (calculatedLatency < subscriber->minLatency) {
|
||
761 | subscriber->minLatency = calculatedLatency; |
||
762 | } |
||
763 | else if (calculatedLatency > subscriber->maxLatency) { |
||
764 | subscriber->maxLatency = calculatedLatency; |
||
765 | } |
||
766 | 65dc89cb | skenneweg | #endif /* URT_CFG_PUBSUB_PROFILING */ |
767 | |||
768 | 982056f7 | Svenja | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
769 | if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
||
770 | 65dc89cb | skenneweg | subscriber->minLatency = calculatedLatency; |
771 | } |
||
772 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
773 | 65dc89cb | skenneweg | subscriber->maxLatency = calculatedLatency; |
774 | } |
||
775 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
776 | urtMutexUnlock(subscriber->base.topic); |
||
777 | return URT_STATUS_JITTERVIOLATION;
|
||
778 | } |
||
779 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
780 | 65dc89cb | skenneweg | |
781 | 982056f7 | Svenja | if (latency) {
|
782 | latency = calculatedLatency; |
||
783 | } |
||
784 | 65dc89cb | skenneweg | } |
785 | |||
786 | urtMutexUnlock(subscriber->base.topic->lock); |
||
787 | return URT_STATUS_OK;
|
||
788 | } |
||
789 | 7d9678db | skenneweg | |
790 | /**
|
||
791 | 5198dfae | skenneweg | * @brief Calculates the validity from the subscriber.
|
792 | 7d9678db | skenneweg | *
|
793 | 5198dfae | skenneweg | * @param[in] subscriber The FRT subscriber to calculate a validity for. Must not be NULL.
|
794 | * @param[in] latency Latency (of a message) as argument to calculate validity.
|
||
795 | 7d9678db | skenneweg | *
|
796 | 5198dfae | skenneweg | * @return Returns a boolean indicator whether the latency is fine.
|
797 | 7d9678db | skenneweg | */
|
798 | a5e142de | skenneweg | bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
|
799 | { |
||
800 | 37cd5dc2 | Svenja | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
801 | if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
|
||
802 | 5b7188aa | skenneweg | return true; |
803 | 37cd5dc2 | Svenja | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
804 | |||
805 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
806 | if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
|
||
807 | return true; |
||
808 | |||
809 | if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
|
||
810 | return true; |
||
811 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
812 | 5b7188aa | skenneweg | |
813 | a5e142de | skenneweg | return false; |
814 | } |
||
815 | 7d9678db | skenneweg | |
816 | /**
|
||
817 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
818 | 7d9678db | skenneweg | *
|
819 | 5c6cb22f | skenneweg | * @param[in] subscriber The FRT subscriber to be unsubscribed. Must not be NULL.
|
820 | 7d9678db | skenneweg | *
|
821 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
822 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
823 | 7d9678db | skenneweg | */
|
824 | 5b7188aa | skenneweg | urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber) |
825 | { |
||
826 | urtDebugAssert(subscriber); |
||
827 | |||
828 | 982056f7 | Svenja | if (!subscriber->base.topic) {
|
829 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
830 | } |
||
831 | |||
832 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
833 | urtMutexLock(topic->lock); |
||
834 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
835 | |||
836 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
837 | |||
838 | 5b7188aa | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
839 | 982056f7 | Svenja | subscriber->base.topic->numSubscribers--; |
840 | 5b7188aa | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
841 | |||
842 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
843 | 982056f7 | Svenja | urtMutexUnlock(topic->lock); |
844 | 5b7188aa | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
845 | 982056f7 | Svenja | subscriber->base.topic = NULL;
|
846 | subscriber->base.lastMessage = NULL;
|
||
847 | subscriber->base.lastMessageTime = 0;
|
||
848 | return URT_STATUS_OK;
|
||
849 | 5b7188aa | skenneweg | } |
850 | 7d9678db | skenneweg | |
851 | |||
852 | /**
|
||
853 | 5198dfae | skenneweg | * @brief Initialize the HRT Subscriber.
|
854 | 7d9678db | skenneweg | *
|
855 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to initialize. Must not be NULL.
|
856 | 7d9678db | skenneweg | */
|
857 | 5c6cb22f | skenneweg | void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
|
858 | { |
||
859 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
860 | |||
861 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
862 | urtEventListenerInit(subscriber->base.evtListener); |
||
863 | subscriber->base.lastMessage = NULL;
|
||
864 | subscriber->base.lastMessageTime = 0;
|
||
865 | |||
866 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING)
|
867 | 5c6cb22f | skenneweg | subscriber->base.sumLatencies = 0;
|
868 | subscriber->base.numMessagesReceived = 0;
|
||
869 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
870 | 5c6cb22f | skenneweg | |
871 | subscriber->next = NULL;
|
||
872 | |||
873 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
874 | 5c6cb22f | skenneweg | subscriber->deadlineOffset = 0;
|
875 | urtTimerInit(subscriber->qodDeadlineTimer); |
||
876 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
877 | 5c6cb22f | skenneweg | |
878 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_RATECHECKS)
|
879 | 5c6cb22f | skenneweg | subscriber->expectedRate = 0;
|
880 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
881 | 5c6cb22f | skenneweg | |
882 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
883 | 5c6cb22f | skenneweg | subscriber->maxJitter = 0;
|
884 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
885 | 5c6cb22f | skenneweg | |
886 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
887 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
888 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
889 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
890 | 5c6cb22f | skenneweg | return;
|
891 | } |
||
892 | |||
893 | 7d9678db | skenneweg | |
894 | /**
|
||
895 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
896 | *
|
||
897 | 5c6cb22f | skenneweg | * @param[in] subscriber The HRT subscriber which shall subscribe to a topic. Must not be NULL.
|
898 | 5198dfae | skenneweg | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
899 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
900 | * Messages must not be associated to another topic.
|
||
901 | * Once a message has been contributed, it cannot be removed later.
|
||
902 | * May be NULL(no messages to contribute).
|
||
903 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
904 | * @param[in] rate Expected minimum rate of new messages (= maximum time between consecutive messages).
|
||
905 | * A value of 0 indicates, that rate is of no concern.
|
||
906 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
907 | * A value of 0 indicates that jitter is of no concern.
|
||
908 | *
|
||
909 | * @return Returns URT_STATUS_OK on success.
|
||
910 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
911 | 7d9678db | skenneweg | */
|
912 | urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic, |
||
913 | 5b7188aa | skenneweg | urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter) |
914 | { |
||
915 | urtDebugAssert(subscriber); |
||
916 | urtDebugAssert(topic); |
||
917 | |||
918 | if (subscriber->base.topic)
|
||
919 | { |
||
920 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
921 | } |
||
922 | |||
923 | subscriber->base.topic = topic; |
||
924 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
925 | subscriber->base.sumLatencies = 0;
|
||
926 | subscriber->base.numMessagesReceived = 0;
|
||
927 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
928 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
929 | subscriber->deadlineOffset = deadline; |
||
930 | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
931 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
932 | subscriber->maxJitter =jitter; |
||
933 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
934 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
||
935 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
936 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
937 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
938 | # if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
939 | subscriber->expectedRate = rate; |
||
940 | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
||
941 | |||
942 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
943 | 5b7188aa | skenneweg | if (messages)
|
944 | { |
||
945 | 982056f7 | Svenja | urtContributeMessages(messages); |
946 | 5b7188aa | skenneweg | } |
947 | |||
948 | subscriber->base.lastMessage = topic->latestMessage; |
||
949 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
950 | |||
951 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
952 | 5b7188aa | skenneweg | |
953 | # if(URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
954 | 8378a78b | Svenja | //TODO: Implement
|
955 | 5b7188aa | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
956 | |||
957 | topic->numHrtSubscribers--; |
||
958 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
959 | topic->numSubscribers--; |
||
960 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
961 | |||
962 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
963 | 5b7188aa | skenneweg | return URT_STATUS_OK;
|
964 | } |
||
965 | 7d9678db | skenneweg | |
966 | |||
967 | /**
|
||
968 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
969 | 7d9678db | skenneweg | *
|
970 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
971 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
972 | * @param[in] bytes Payload size in bytes.
|
||
973 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
974 | 7d9678db | skenneweg | *
|
975 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
976 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
977 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
978 | 7d9678db | skenneweg | */
|
979 | 982056f7 | Svenja | urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* const payload, |
980 | 65dc89cb | skenneweg | size_t bytes, urt_delay_t* latency) |
981 | { |
||
982 | urtDebugAssert(subscriber); |
||
983 | |||
984 | 982056f7 | Svenja | if (!subscriber->base.topic) {
|
985 | return URT_STATUS_FETCH_NOTOPIC;
|
||
986 | } |
||
987 | 65dc89cb | skenneweg | |
988 | urtMutexLock(subscriber->base.topic->lock); |
||
989 | urt_message_t* messageTemp = subscriber->base.lastMessage; |
||
990 | if (messageTemp->next->originTime > messageTemp.originTime)
|
||
991 | { |
||
992 | urtMutexUnlock(subscriber->base.topic->lock); |
||
993 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
994 | } |
||
995 | messageTemp = messageTemp->next; |
||
996 | |||
997 | 982056f7 | Svenja | if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
998 | uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime; |
||
999 | |||
1000 | #if(URT_CFG_PUBSUB_PROFILING == true) |
||
1001 | 65dc89cb | skenneweg | subscriber->base.sumLatencies += calculatedLatency; |
1002 | 982056f7 | Svenja | |
1003 | if (calculatedLatency < subscriber->minLatency) {
|
||
1004 | subscriber->minLatency = calculatedLatency; |
||
1005 | } |
||
1006 | else if (calculatedLatency > subscriber->maxLatency) { |
||
1007 | subscriber->maxLatency = calculatedLatency; |
||
1008 | } |
||
1009 | 65dc89cb | skenneweg | #endif /* URT_CFG_PUBSUB_PROFILING */ |
1010 | 982056f7 | Svenja | |
1011 | if (latency) {
|
||
1012 | latency = calculatedLatency; |
||
1013 | } |
||
1014 | 65dc89cb | skenneweg | } |
1015 | |||
1016 | subscriber->base.lastMessage->numHrtConsumersLeft--; |
||
1017 | if (subscriber->base.lastMessage->numHrtConsumersLeft != 0) |
||
1018 | { |
||
1019 | urtCondvarSignal(subscriber->base.topic->hrtReleased); |
||
1020 | } |
||
1021 | |||
1022 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
1023 | subscriber->base.lastMessage->numConsumersLeft--; |
||
1024 | subscriber->base->numMessagesReceived++; |
||
1025 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
1026 | |||
1027 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
1028 | 982056f7 | Svenja | if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
1029 | subscriber->minLatency = calculatedLatency; |
||
1030 | } |
||
1031 | else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
||
1032 | subscriber->maxLatency = calculatedLatency; |
||
1033 | } |
||
1034 | else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
||
1035 | urtMutexUnlock(subscriber->base.topic); |
||
1036 | 65dc89cb | skenneweg | urtCoreStopNodes(URT_STATUS_JITTERVIOLATION); |
1037 | return URT_STATUS_JITTERVIOLATION;
|
||
1038 | 982056f7 | Svenja | } |
1039 | 65dc89cb | skenneweg | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
1040 | 982056f7 | Svenja | |
1041 | #if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
||
1042 | if (calculatedLatency < subscriber->minLatency) {
|
||
1043 | subscriber->minLatency = calculatedLatency; |
||
1044 | 65dc89cb | skenneweg | } |
1045 | 982056f7 | Svenja | else if (calculatedLatency > subscriber->maxLatency) { |
1046 | subscriber->maxLatency = calculatedLatency; |
||
1047 | } |
||
1048 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
1049 | 65dc89cb | skenneweg | |
1050 | 982056f7 | Svenja | urtFetchMessage(messageTemp, subscriber, payload, bytes); |
1051 | 65dc89cb | skenneweg | |
1052 | #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
1053 | if (messageTemp->next->originTime < messageTemp->originTime)
|
||
1054 | { |
||
1055 | 982056f7 | Svenja | //TODO: first reset?! (when ... set)
|
1056 | urtTimerSet(subscriber->qosDeadlineTimer, subscriber->deadlineOffset, urtCoreCallbackDefault, NULL);
|
||
1057 | 65dc89cb | skenneweg | } |
1058 | else
|
||
1059 | { |
||
1060 | 982056f7 | Svenja | urtTimerReset(subscriber->qosDeadlineTimer); |
1061 | 65dc89cb | skenneweg | } |
1062 | #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
1063 | |||
1064 | urtMutexUnlock(subscriber->base.topic->lock); |
||
1065 | return URT_STATUS_OK;
|
||
1066 | } |
||
1067 | 7d9678db | skenneweg | |
1068 | |||
1069 | /**
|
||
1070 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
1071 | 7d9678db | skenneweg | *
|
1072 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
1073 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
1074 | * @param[in] bytes Payload size in bytes.
|
||
1075 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
1076 | 7d9678db | skenneweg | *
|
1077 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
1078 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
1079 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
1080 | 7d9678db | skenneweg | */
|
1081 | 982056f7 | Svenja | urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* const payload, |
1082 | size_t bytes, urt_delay_t* latency) |
||
1083 | { |
||
1084 | if (!subscriber->base.topic) {
|
||
1085 | return URT_STATUS_FETCH_NOTOPIC;
|
||
1086 | } |
||
1087 | |||
1088 | urtMutexLock(subscriber->base.topic->lock); |
||
1089 | urt_message_t* lastMessage = subscriber->base.lastMessage; |
||
1090 | bool hrtZero = false; |
||
1091 | while (lastMessage->next->originTime < lastMessage->originTime) {
|
||
1092 | lastMessage = lastMessage->next; |
||
1093 | lastMessage->numHrtConsumersLeft--; |
||
1094 | if (lastMessage->numHrtConsumersLeft == 0) { |
||
1095 | hertZero = true;
|
||
1096 | } |
||
1097 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
1098 | lastMessage->numConsumersLeft--; |
||
1099 | subscriber->base.numMessagesReceived++; |
||
1100 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
1101 | #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
1102 | urtTimerReset(subscriber->qosDeadlineTimer); |
||
1103 | #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
1104 | } |
||
1105 | |||
1106 | if (hrtZero) {
|
||
1107 | urtCondvarSignal(subscriber->base.topic->hrtReleased); |
||
1108 | } |
||
1109 | |||
1110 | if (lastMessage->originTime == subscriber->base.lastMessageTime) {
|
||
1111 | urtMutexUnlock(subscriber->base.topic); |
||
1112 | return URT_STATUS_FETCH_NOMESSAGE;
|
||
1113 | } |
||
1114 | |||
1115 | if (URT_CFG_PUBSUB_PROFILING == true || latency) { |
||
1116 | uint64_t calculatedLatency = urtTimeNow() - messageTemp->originTime; |
||
1117 | |||
1118 | #if(URT_CFG_PUBSUB_PROFILING == true) |
||
1119 | subscriber->base.sumLatencies += calculatedLatency; |
||
1120 | |||
1121 | if (calculatedLatency < subscriber->minLatency) {
|
||
1122 | subscriber->minLatency = calculatedLatency; |
||
1123 | } |
||
1124 | else if (calculatedLatency > subscriber->maxLatency) { |
||
1125 | subscriber->maxLatency = calculatedLatency; |
||
1126 | } |
||
1127 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
1128 | |||
1129 | if (latency) {
|
||
1130 | latency = calculatedLatency; |
||
1131 | } |
||
1132 | } |
||
1133 | |||
1134 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
1135 | if (calculatedLatency < subscriber->minLatency && calculatedLatency > subscriber->minLatency - subscriber->maxJitter) {
|
||
1136 | subscriber->minLatency = calculatedLatency; |
||
1137 | } |
||
1138 | else if (calculatedLatency > subscriber->maxLatency && calculatedLatency < subscriber->maxLatency + subscriber->maxJitter) { |
||
1139 | subscriber->maxLatency = calculatedLatency; |
||
1140 | } |
||
1141 | else if (calculatedLatency > subscriber->maxLatency || calculatedLatency < subscriber->minLatency) { |
||
1142 | urtMutexUnlock(subscriber->base.topic); |
||
1143 | urtCoreStopNodes(URT_STATUS_JITTERVIOLATION); |
||
1144 | return URT_STATUS_JITTERVIOLATION;
|
||
1145 | } |
||
1146 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
1147 | |||
1148 | #if (URT_CFG_PUBSUB_PROFILING == true && URT_CFG_PUBSUB_QOS_JITTERCHECKS == false) |
||
1149 | if (calculatedLatency < subscriber->minLatency) {
|
||
1150 | subscriber->minLatency = calculatedLatency; |
||
1151 | } |
||
1152 | else if (calculatedLatency > subscriber->maxLatency) { |
||
1153 | subscriber->maxLatency = calculatedLatency; |
||
1154 | } |
||
1155 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
1156 | |||
1157 | urtFetchMessage(lastMessage, subscriber, payload, bytes); |
||
1158 | |||
1159 | #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
1160 | urtTimerReset(subscriber->qosDeadlineTimer); |
||
1161 | #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
1162 | } |
||
1163 | 7d9678db | skenneweg | |
1164 | /**
|
||
1165 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
1166 | 7d9678db | skenneweg | *
|
1167 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to be unsubscribed. Must not be NULL.
|
1168 | 7d9678db | skenneweg | *
|
1169 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
1170 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
1171 | 7d9678db | skenneweg | */
|
1172 | 5b7188aa | skenneweg | urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber) |
1173 | { |
||
1174 | urtDebugAssert(subscriber); |
||
1175 | |||
1176 | if (subscriber->base.topic)
|
||
1177 | { |
||
1178 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
1179 | 5b7188aa | skenneweg | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
1180 | subscriber->base.topic->numHrtSubscribers--; |
||
1181 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
1182 | subscriber->base.topic->numSubscribers--; |
||
1183 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
1184 | # if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
1185 | //TODO: remove self from topics lsit of HRT subscribers
|
||
1186 | //TODO: ...
|
||
1187 | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
||
1188 | |||
1189 | urt_message_t* messageTemp = subscriber->base.lastMessage; |
||
1190 | 37cd5dc2 | Svenja | bool hrtZero = false; |
1191 | 5b7188aa | skenneweg | while (messageTemp->next->originTime < messageTemp->originTime)
|
1192 | { |
||
1193 | messageTemp = messageTemp->next; |
||
1194 | messageTemp->numHrtConsumersLeft--; |
||
1195 | 37cd5dc2 | Svenja | if (messageTemp->numHrtConsumersLeft == 0) |
1196 | { |
||
1197 | hrtZero = true;
|
||
1198 | } |
||
1199 | 5b7188aa | skenneweg | # if(URT_CFG_PUBSUB_PROFILING == true) |
1200 | messageTemp->numConsumersLeft--; |
||
1201 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
1202 | } |
||
1203 | 37cd5dc2 | Svenja | if (hrtZero)
|
1204 | 5b7188aa | skenneweg | { |
1205 | 37cd5dc2 | Svenja | urtCondvarSignal(subscriber->base.topic->hrtReleased); |
1206 | 5b7188aa | skenneweg | } |
1207 | |||
1208 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
1209 | 5b7188aa | skenneweg | subscriber->base.topic = NULL;
|
1210 | subscriber->base.lastMessage = NULL;
|
||
1211 | subscriber->base.lastMessageTime = 0;
|
||
1212 | return URT_STATUS_OK;
|
||
1213 | } |
||
1214 | |||
1215 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
1216 | } |
||
1217 | |||
1218 |