urtware / src / urt_subscriber.c @ 77bd2c61
History | View | Annotate | Download (27.99 KB)
1 | 1fb06240 | skenneweg | /*
|
---|---|---|---|
2 | µRtWare is a lightweight publish/subscribe middleware for real-time
|
||
3 | applications. It was developed as part of the software habitat for the
|
||
4 | Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
|
||
5 | |||
6 | Copyright (C) 2018..2020 Thomas Schöpping et al.
|
||
7 | |||
8 | This program is free software: you can redistribute it and/or modify
|
||
9 | it under the terms of the GNU General Public License as published by
|
||
10 | the Free Software Foundation, either version 3 of the License, or
|
||
11 | (at your option) any later version.
|
||
12 | |||
13 | This program is distributed in the hope that it will be useful,
|
||
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
16 | GNU General Public License for more details.
|
||
17 | |||
18 | You should have received a copy of the GNU General Public License
|
||
19 | along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
20 | */
|
||
21 | |||
22 | 7d9678db | skenneweg | #include <urtware.h> |
23 | |||
24 | 1fb06240 | skenneweg | /******************************************************************************/
|
25 | /* LOCAL DEFINITIONS */
|
||
26 | /******************************************************************************/
|
||
27 | |||
28 | /******************************************************************************/
|
||
29 | /* EXPORTED VARIABLES */
|
||
30 | /******************************************************************************/
|
||
31 | |||
32 | /******************************************************************************/
|
||
33 | /* LOCAL TYPES */
|
||
34 | /******************************************************************************/
|
||
35 | |||
36 | /******************************************************************************/
|
||
37 | /* LOCAL VARIABLES */
|
||
38 | /******************************************************************************/
|
||
39 | |||
40 | /******************************************************************************/
|
||
41 | /* LOCAL FUNCTIONS */
|
||
42 | /******************************************************************************/
|
||
43 | |||
44 | /******************************************************************************/
|
||
45 | /* EXPORTED FUNCTIONS */
|
||
46 | /******************************************************************************/
|
||
47 | |||
48 | 7d9678db | skenneweg | /**
|
49 | * @brief Initialize the nrt Subscriber.
|
||
50 | *
|
||
51 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to initialize. Must not be NULL.
|
52 | 7d9678db | skenneweg | */
|
53 | 5c6cb22f | skenneweg | void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
|
54 | { |
||
55 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
56 | |||
57 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
58 | urtEventListenerInit(subscriber->base.evtListener); |
||
59 | subscriber->base.lastMessage = NULL;
|
||
60 | subscriber->base.lastMessageTime = 0;
|
||
61 | a5e142de | skenneweg | #if (URT_CFG_PUBSUB_PROFILING == true) |
62 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
63 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
64 | a5e142de | skenneweg | #endif /* URT_CFG_PUBSUB_PROFILING */ |
65 | 5c6cb22f | skenneweg | return;
|
66 | } |
||
67 | 1fb06240 | skenneweg | |
68 | 7d9678db | skenneweg | /**
|
69 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
70 | 7d9678db | skenneweg | *
|
71 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
72 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
73 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
74 | * Messages must not be associated to another topic.
|
||
75 | * Once a message has been contributed, it cannot be removed later.
|
||
76 | * May be NULL(no messages to contribute).
|
||
77 | 7d9678db | skenneweg | *
|
78 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
79 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
80 | 7d9678db | skenneweg | */
|
81 | a5e142de | skenneweg | urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages) |
82 | { |
||
83 | urtDebugAssert(subscriber); |
||
84 | urtDebugAssert(topic); |
||
85 | |||
86 | if (!subscriber->base.topic)
|
||
87 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
88 | |||
89 | subscriber->base.topic = topic; |
||
90 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
91 | a5e142de | skenneweg | |
92 | if (messages)
|
||
93 | { |
||
94 | urt_message_t* lastMessageContribute = messages; |
||
95 | while (lastMessageContribute->next)
|
||
96 | { |
||
97 | lastMessageContribute = lastMessageContribute->next; |
||
98 | } |
||
99 | lastMessageContribute->next = topic->latestMessage->next; |
||
100 | topic->latestMessage->next = messages; |
||
101 | } |
||
102 | |||
103 | subscriber->base.lastMessage = topic->latestMessage; |
||
104 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
105 | |||
106 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
107 | a5e142de | skenneweg | |
108 | #if (URT_CFG_PUBSUB_PROFILING == true) |
||
109 | topic->numHrtSubscribers--; |
||
110 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
111 | |||
112 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
113 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
114 | 1fb06240 | skenneweg | } |
115 | |||
116 | 7d9678db | skenneweg | /**
|
117 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
118 | 7d9678db | skenneweg | *
|
119 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
120 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
121 | * @param[in] bytes Payload size in bytes.
|
||
122 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
123 | 7d9678db | skenneweg | *
|
124 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
125 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
126 | * Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
127 | 7d9678db | skenneweg | */
|
128 | a5e142de | skenneweg | urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
|
129 | { |
||
130 | urtDebugAssert(subscriber); |
||
131 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
132 | 1fb06240 | skenneweg | } |
133 | |||
134 | 7d9678db | skenneweg | /**
|
135 | a5e142de | skenneweg | * @brief Fetches the latest message.
|
136 | 7d9678db | skenneweg | *
|
137 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber that shall fetch the message. Must not be NULL.
|
138 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
139 | * @param[in] bytes Payload size in bytes.
|
||
140 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
141 | 7d9678db | skenneweg | *
|
142 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
143 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
144 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
145 | 7d9678db | skenneweg | */
|
146 | 1fb06240 | skenneweg | urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
|
147 | 7d9678db | skenneweg | return URT_STATUS_OK;
|
148 | 1fb06240 | skenneweg | } |
149 | |||
150 | 7d9678db | skenneweg | /**
|
151 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
152 | 7d9678db | skenneweg | *
|
153 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
154 | 7d9678db | skenneweg | *
|
155 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
156 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
157 | 7d9678db | skenneweg | */
|
158 | 5b7188aa | skenneweg | urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber) |
159 | { |
||
160 | a5e142de | skenneweg | if (subscriber->base.topic)
|
161 | { |
||
162 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
163 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
164 | 5b7188aa | skenneweg | subscriber->base.topic->numSubscribers--; |
165 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
166 | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
||
167 | # if(URT_CFG_PUBSUB_PROFILING == true) |
||
168 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
169 | a5e142de | skenneweg | subscriber->base.topic = NULL;
|
170 | subscriber->base.lastMessage = NULL;
|
||
171 | subscriber->base.lastMessageTime = 0;
|
||
172 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
173 | return URT_STATUS_OK;
|
||
174 | } |
||
175 | 5b7188aa | skenneweg | |
176 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
177 | 1fb06240 | skenneweg | } |
178 | |||
179 | 7d9678db | skenneweg | |
180 | /**
|
||
181 | 5198dfae | skenneweg | * @brief Initialize the srt Subscriber.
|
182 | 7d9678db | skenneweg | *
|
183 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
184 | 7d9678db | skenneweg | */
|
185 | 5c6cb22f | skenneweg | void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
|
186 | { |
||
187 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
188 | |||
189 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
190 | urtEventListenerInit(subscriber->base.evtListener); |
||
191 | subscriber->base.lastMessage = NULL;
|
||
192 | subscriber->base.lastMessageTime = 0;
|
||
193 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
194 | subscriber->base.sumLatencies = 0;
|
||
195 | subscriber->base.numMessagesReceived = 0;
|
||
196 | subscriber->usefulnesscb = NULL;
|
||
197 | subscriber->cbparams = NULL;
|
||
198 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
199 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
200 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
201 | return;
|
||
202 | } |
||
203 | 7d9678db | skenneweg | |
204 | /**
|
||
205 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
206 | *
|
||
207 | * @param[in] subscriber The SRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
208 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
209 | * @param[in] message NULL terminated list of messages to contribute to the topic.
|
||
210 | * Messages must not be associated to another topic.
|
||
211 | * Once a message has been contributed, it cannot be removed later.
|
||
212 | * May be NULL (no messages to contribute)
|
||
213 | * @param[in] usefulnesscb Pointer to a function to calculate usefulness of a message. Must not be NULL.
|
||
214 | * @param[in] cbparams Optional parameters for the usefulness callback.
|
||
215 | * May be NULL if the callback expects no parameters.
|
||
216 | *
|
||
217 | * @return Returns URT_STATUS_OK on success.
|
||
218 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
219 | 7d9678db | skenneweg | */
|
220 | urt_status_t urtSrtSubscriberSubscribe(urt_srtsubscriber_t* subscriber, urt_topic_t* topic, |
||
221 | a5e142de | skenneweg | urt_message_t* messages, urt_srtusefulnessfunc_t* usefulnesscb, void* cbparams)
|
222 | { |
||
223 | urtDebugAssert(subscriber); |
||
224 | urtDebugAssert(topic); |
||
225 | |||
226 | if (subscriber->base.topic)
|
||
227 | { |
||
228 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
229 | } |
||
230 | 5b7188aa | skenneweg | |
231 | subscriber->base.topic = topic; |
||
232 | subscriber->usefulnesscb = usefulnesscb; |
||
233 | subscriber->cbparams = cbparams; |
||
234 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
235 | 5b7188aa | skenneweg | subscriber->base.sumLatencies = 0;
|
236 | subscriber->base.numMessagesReceived = 0;
|
||
237 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
238 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
239 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
240 | |||
241 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
242 | a5e142de | skenneweg | if (messages)
|
243 | { |
||
244 | urt_message_t* lastMessageContribute = messages; |
||
245 | while (lastMessageContribute->next)
|
||
246 | { |
||
247 | lastMessageContribute = lastMessageContribute->next; |
||
248 | } |
||
249 | lastMessageContribute->next = topic->latestMessage->next; |
||
250 | topic->latestMessage->next = messages; |
||
251 | } |
||
252 | |||
253 | subscriber->base.lastMessage = topic->latestMessage; |
||
254 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
255 | |||
256 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
257 | a5e142de | skenneweg | |
258 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
259 | topic->numHrtSubscribers--; |
||
260 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
261 | |||
262 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
263 | a5e142de | skenneweg | return URT_STATUS_OK;
|
264 | } |
||
265 | 7d9678db | skenneweg | |
266 | /**
|
||
267 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
268 | 7d9678db | skenneweg | *
|
269 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
270 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
271 | * @param[in] bytes Payload size in bytes.
|
||
272 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
273 | 7d9678db | skenneweg | *
|
274 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
275 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
276 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
277 | 7d9678db | skenneweg | */
|
278 | urt_status_t urtSrtSubscriberFetchNextMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
||
279 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
280 | |||
281 | /**
|
||
282 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
283 | 7d9678db | skenneweg | *
|
284 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
285 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
286 | * @param[in] bytes Payload size in bytes.
|
||
287 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
288 | 7d9678db | skenneweg | *
|
289 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
290 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
291 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
292 | 7d9678db | skenneweg | */
|
293 | urt_status_t urtSrtSubscriberFetchLatestMessage(urt_srtsubscriber_t* subscriber, void* payload,
|
||
294 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
295 | |||
296 | /**
|
||
297 | 5198dfae | skenneweg | * @brief Calculates the usefulness of the subscriber.
|
298 | 7d9678db | skenneweg | *
|
299 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
300 | * @param[in] latency Latency (of a message) as argument to calculate usefulness.
|
||
301 | 7d9678db | skenneweg | *
|
302 | 5198dfae | skenneweg | * @return Returns the usefulness as a value within [0,1].
|
303 | 7d9678db | skenneweg | */
|
304 | a5e142de | skenneweg | float urtSrtSubscriberCalculateUsefulness(urt_srtsubscriber_t* subscriber, urt_delay_t latency)
|
305 | { |
||
306 | urtDebugAssert(subscriber); |
||
307 | |||
308 | return subscriber->usefulnesscb(latency);
|
||
309 | } |
||
310 | 7d9678db | skenneweg | |
311 | /**
|
||
312 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
313 | 7d9678db | skenneweg | *
|
314 | 5198dfae | skenneweg | * @param[in] subscriber The NRT subscriber to be unsubscribed. Must not be NULL.
|
315 | 7d9678db | skenneweg | *
|
316 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
317 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
318 | 7d9678db | skenneweg | */
|
319 | a5e142de | skenneweg | urt_status_t urtSrtSubscriberUnsubscribe(urt_srtsubscriber_t* subscriber) |
320 | { |
||
321 | urtDebugAssert(subscriber); |
||
322 | |||
323 | if (subscriber->base.topic)
|
||
324 | { |
||
325 | 5b7188aa | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
326 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
327 | 5b7188aa | skenneweg | subscriber->base.topic->numSubscribers--; |
328 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
329 | a5e142de | skenneweg | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
330 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
331 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
332 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
333 | subscriber->base.topic = NULL;
|
||
334 | subscriber->base.lastMessage = NULL;
|
||
335 | subscriber->base.lastMessageTime = 0;
|
||
336 | return URT_STATUS_OK;
|
||
337 | } |
||
338 | |||
339 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
340 | } |
||
341 | 7d9678db | skenneweg | |
342 | |||
343 | /**
|
||
344 | 5198dfae | skenneweg | * @brief Initialize the FRT Subscriber.
|
345 | 7d9678db | skenneweg | *
|
346 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber to initialize. Must not be NULL.
|
347 | 7d9678db | skenneweg | */
|
348 | 5c6cb22f | skenneweg | void urtFrtSubscriberInit(urt_frtsubscriber_t* subscriber)
|
349 | { |
||
350 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
351 | |||
352 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
353 | urtEventListenerInit(subscriber->base.evtListener); |
||
354 | subscriber->base.lastMessage = NULL;
|
||
355 | subscriber->base.lastMessageTime = 0;
|
||
356 | |||
357 | #if (URT_CFG_PUBSUB_PROFILING)
|
||
358 | subscriber->base.sumLatencies = 0;
|
||
359 | subscriber->base.numMessagesReceived = 0;
|
||
360 | #endif /* URT_CFG_PUBSUB_PROFILING */ |
||
361 | |||
362 | #if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
||
363 | subscriber->deadlineOffset = 0;
|
||
364 | #endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
365 | |||
366 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
||
367 | subscriber->maxJitter = 0;
|
||
368 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
369 | |||
370 | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
||
371 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
372 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
373 | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
374 | return;
|
||
375 | } |
||
376 | |||
377 | 7d9678db | skenneweg | |
378 | /**
|
||
379 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
380 | *
|
||
381 | * @param[in] subscriber The NRT subscriber which shall subscribe to a topic. Must not be NULL.
|
||
382 | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
||
383 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
384 | * Messages must not be associated to another topic.
|
||
385 | * Once a message has been contributed, it cannot be removed later.
|
||
386 | * May be NULL(no messages to contribute).
|
||
387 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
388 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
389 | * A value of 0 indicates that jitter is of no concern.
|
||
390 | *
|
||
391 | * @return Returns URT_STATUS_OK on success.
|
||
392 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
393 | 7d9678db | skenneweg | */
|
394 | urt_status_t urtFrtSubscriberSubscribe(urt_frtsubscriber_t* subscriber, urt_topic_t* topic, |
||
395 | a5e142de | skenneweg | urt_message_t* messages, urt_delay_t deadline, urt_delay_t jitter) |
396 | { |
||
397 | urtDebugAssert(subscriber); |
||
398 | urtDebugAssert(topic); |
||
399 | |||
400 | 5b7188aa | skenneweg | if (subscriber->base.topic)
|
401 | { |
||
402 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
403 | } |
||
404 | |||
405 | subscriber->base.topic = topic; |
||
406 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING == true) |
407 | 5b7188aa | skenneweg | subscriber->base.sumLatencies = 0;
|
408 | subscriber->base.numMessagesReceived = 0;
|
||
409 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
410 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
411 | 5b7188aa | skenneweg | subscriber->deadlineOffset = deadline; |
412 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
413 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
414 | 5b7188aa | skenneweg | subscriber->maxJitter =jitter; |
415 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
416 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
||
417 | 5b7188aa | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
418 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
419 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
420 | |||
421 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
422 | 5b7188aa | skenneweg | if (messages)
|
423 | { |
||
424 | urt_message_t* lastMessageContribute = messages; |
||
425 | while (lastMessageContribute->next)
|
||
426 | a5e142de | skenneweg | { |
427 | 5b7188aa | skenneweg | lastMessageContribute = lastMessageContribute->next; |
428 | a5e142de | skenneweg | } |
429 | 5b7188aa | skenneweg | lastMessageContribute->next = topic->latestMessage->next; |
430 | topic->latestMessage->next = messages; |
||
431 | } |
||
432 | a5e142de | skenneweg | |
433 | 5b7188aa | skenneweg | subscriber->base.lastMessage = topic->latestMessage; |
434 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
435 | a5e142de | skenneweg | |
436 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
437 | a5e142de | skenneweg | |
438 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
439 | 5b7188aa | skenneweg | topic->numHrtSubscribers--; |
440 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
441 | |||
442 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
443 | 5b7188aa | skenneweg | return URT_STATUS_OK;
|
444 | a5e142de | skenneweg | } |
445 | 7d9678db | skenneweg | |
446 | /**
|
||
447 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
448 | 7d9678db | skenneweg | *
|
449 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
450 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
451 | * @param[in] bytes Payload size in bytes.
|
||
452 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
453 | 7d9678db | skenneweg | *
|
454 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
455 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
456 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
457 | 7d9678db | skenneweg | */
|
458 | urt_status_t urtFrtSubscriberFetchNextMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
||
459 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
460 | |||
461 | /**
|
||
462 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
463 | 7d9678db | skenneweg | *
|
464 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
465 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
466 | * @param[in] bytes Payload size in bytes.
|
||
467 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
468 | 7d9678db | skenneweg | *
|
469 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
470 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
471 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
472 | 7d9678db | skenneweg | */
|
473 | urt_status_t urtFrtSubscriberFetchLatestMessage(urt_frtsubscriber_t* subscriber, void* payload,
|
||
474 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
475 | |||
476 | /**
|
||
477 | 5198dfae | skenneweg | * @brief Calculates the validity from the subscriber.
|
478 | 7d9678db | skenneweg | *
|
479 | 5198dfae | skenneweg | * @param[in] subscriber The FRT subscriber to calculate a validity for. Must not be NULL.
|
480 | * @param[in] latency Latency (of a message) as argument to calculate validity.
|
||
481 | 7d9678db | skenneweg | *
|
482 | 5198dfae | skenneweg | * @return Returns a boolean indicator whether the latency is fine.
|
483 | 7d9678db | skenneweg | */
|
484 | a5e142de | skenneweg | bool urtFrtSubscriberCalculateValidity(urt_frtsubscriber_t* subscriber, urt_delay_t latency)
|
485 | { |
||
486 | 37cd5dc2 | Svenja | #if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
487 | if (latency > subscriber->minLatency && latency < subscriber->maxLatency)
|
||
488 | 5b7188aa | skenneweg | return true; |
489 | 37cd5dc2 | Svenja | #endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
490 | |||
491 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
492 | if (latency < subscriber->minLatency && subscriber->maxLatency-subscriber->maxJitter < latency)
|
||
493 | return true; |
||
494 | |||
495 | if (latency > subscriber->maxLatency && subscriber->minLatency+subscriber->maxJitter > latency)
|
||
496 | return true; |
||
497 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS && URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
498 | 5b7188aa | skenneweg | |
499 | a5e142de | skenneweg | return false; |
500 | } |
||
501 | 7d9678db | skenneweg | |
502 | /**
|
||
503 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
504 | 7d9678db | skenneweg | *
|
505 | 5c6cb22f | skenneweg | * @param[in] subscriber The FRT subscriber to be unsubscribed. Must not be NULL.
|
506 | 7d9678db | skenneweg | *
|
507 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
508 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
509 | 7d9678db | skenneweg | */
|
510 | 5b7188aa | skenneweg | urt_status_t urtFrtSubscriberUnsubscribe(urt_frtsubscriber_t* subscriber) |
511 | { |
||
512 | urtDebugAssert(subscriber); |
||
513 | |||
514 | if (subscriber->base.topic)
|
||
515 | { |
||
516 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
517 | 5b7188aa | skenneweg | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
518 | //TODO: decrement topic's HRT counter
|
||
519 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
520 | subscriber->base.topic->numSubscribers--; |
||
521 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
522 | //Hier weiter
|
||
523 | |||
524 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
525 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
526 | 5b7188aa | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
527 | subscriber->base.topic = NULL;
|
||
528 | subscriber->base.lastMessage = NULL;
|
||
529 | subscriber->base.lastMessageTime = 0;
|
||
530 | return URT_STATUS_OK;
|
||
531 | } |
||
532 | |||
533 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
534 | } |
||
535 | 7d9678db | skenneweg | |
536 | |||
537 | /**
|
||
538 | 5198dfae | skenneweg | * @brief Initialize the HRT Subscriber.
|
539 | 7d9678db | skenneweg | *
|
540 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to initialize. Must not be NULL.
|
541 | 7d9678db | skenneweg | */
|
542 | 5c6cb22f | skenneweg | void urtHrtSubscriberInit(urt_hrtsubscriber_t* subscriber)
|
543 | { |
||
544 | a5e142de | skenneweg | urtDebugAssert(subscriber); |
545 | |||
546 | 5c6cb22f | skenneweg | subscriber->base.topic = NULL;
|
547 | urtEventListenerInit(subscriber->base.evtListener); |
||
548 | subscriber->base.lastMessage = NULL;
|
||
549 | subscriber->base.lastMessageTime = 0;
|
||
550 | |||
551 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_PROFILING)
|
552 | 5c6cb22f | skenneweg | subscriber->base.sumLatencies = 0;
|
553 | subscriber->base.numMessagesReceived = 0;
|
||
554 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_PROFILING */ |
555 | 5c6cb22f | skenneweg | |
556 | subscriber->next = NULL;
|
||
557 | |||
558 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS)
|
559 | 5c6cb22f | skenneweg | subscriber->deadlineOffset = 0;
|
560 | urtTimerInit(subscriber->qodDeadlineTimer); |
||
561 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
562 | 5c6cb22f | skenneweg | |
563 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_RATECHECKS)
|
564 | 5c6cb22f | skenneweg | subscriber->expectedRate = 0;
|
565 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
566 | 5c6cb22f | skenneweg | |
567 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS)
|
568 | 5c6cb22f | skenneweg | subscriber->maxJitter = 0;
|
569 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
570 | 5c6cb22f | skenneweg | |
571 | a5e142de | skenneweg | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING)
|
572 | 5c6cb22f | skenneweg | subscriber->minLatency = URT_DELAY_INFINITE; |
573 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
574 | a5e142de | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
575 | 5c6cb22f | skenneweg | return;
|
576 | } |
||
577 | |||
578 | 7d9678db | skenneweg | |
579 | /**
|
||
580 | 5198dfae | skenneweg | * @brief Subscribes the subscriber to a topic.
|
581 | *
|
||
582 | 5c6cb22f | skenneweg | * @param[in] subscriber The HRT subscriber which shall subscribe to a topic. Must not be NULL.
|
583 | 5198dfae | skenneweg | * @param[in] topic The topic to subscribe to. Must not be NULL.
|
584 | * @param[in] messages NULL terminated list of messages to contribute to the topic.
|
||
585 | * Messages must not be associated to another topic.
|
||
586 | * Once a message has been contributed, it cannot be removed later.
|
||
587 | * May be NULL(no messages to contribute).
|
||
588 | * @param[in] deadline Maximum latency to consume messages. A value of 0 indicates that latency is of no concern.
|
||
589 | * @param[in] rate Expected minimum rate of new messages (= maximum time between consecutive messages).
|
||
590 | * A value of 0 indicates, that rate is of no concern.
|
||
591 | * @param[in] jitter Maximum allowed jitter (difference between maximum and minimum latency) when consuming messages.
|
||
592 | * A value of 0 indicates that jitter is of no concern.
|
||
593 | *
|
||
594 | * @return Returns URT_STATUS_OK on success.
|
||
595 | * Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
|
||
596 | 7d9678db | skenneweg | */
|
597 | urt_status_t urtHrtSubscriberSubscribe(urt_hrtsubscriber_t* subscriber, urt_topic_t* topic, |
||
598 | 5b7188aa | skenneweg | urt_message_t* messages, urt_delay_t deadline, urt_delay_t rate, urt_delay_t jitter) |
599 | { |
||
600 | urtDebugAssert(subscriber); |
||
601 | urtDebugAssert(topic); |
||
602 | |||
603 | if (subscriber->base.topic)
|
||
604 | { |
||
605 | return URT_STATUS_SUBSCRIBE_TOPICSET;
|
||
606 | } |
||
607 | |||
608 | subscriber->base.topic = topic; |
||
609 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
610 | subscriber->base.sumLatencies = 0;
|
||
611 | subscriber->base.numMessagesReceived = 0;
|
||
612 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
613 | # if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
||
614 | subscriber->deadlineOffset = deadline; |
||
615 | # endif /* URT_CFG_PUBSUB_QOS_DEADLINECHECKS */ |
||
616 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true) |
||
617 | subscriber->maxJitter =jitter; |
||
618 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS */ |
||
619 | # if (URT_CFG_PUBSUB_QOS_JITTERCHECKS == true || URT_CFG_PUBSUB_PROFILING == true) |
||
620 | subscriber->minLatency = URT_DELAY_INFINITE; |
||
621 | subscriber->maxLatency = URT_DELAY_IMMEDIATE; |
||
622 | # endif /* URT_CFG_PUBSUB_QOS_JITTERCHECKS || URT_CFG_PUBSUB_PROFILING */ |
||
623 | # if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
624 | subscriber->expectedRate = rate; |
||
625 | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
||
626 | |||
627 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
628 | 5b7188aa | skenneweg | if (messages)
|
629 | { |
||
630 | urt_message_t* lastMessageContribute = messages; |
||
631 | while (lastMessageContribute->next)
|
||
632 | { |
||
633 | lastMessageContribute = lastMessageContribute->next; |
||
634 | } |
||
635 | lastMessageContribute->next = topic->latestMessage->next; |
||
636 | topic->latestMessage->next = messages; |
||
637 | } |
||
638 | |||
639 | subscriber->base.lastMessage = topic->latestMessage; |
||
640 | subscriber->base.lastMessageTime = topic->latestMessage->originTime; |
||
641 | |||
642 | 8378a78b | Svenja | urtEventRegister(topic->evtSource, subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags); |
643 | 5b7188aa | skenneweg | |
644 | # if(URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
645 | 8378a78b | Svenja | //TODO: Implement
|
646 | 5b7188aa | skenneweg | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
647 | |||
648 | topic->numHrtSubscribers--; |
||
649 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
650 | topic->numSubscribers--; |
||
651 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
652 | |||
653 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
654 | 5b7188aa | skenneweg | return URT_STATUS_OK;
|
655 | } |
||
656 | 7d9678db | skenneweg | |
657 | |||
658 | /**
|
||
659 | 5198dfae | skenneweg | * @brief Fetches the next message.
|
660 | 7d9678db | skenneweg | *
|
661 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
662 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
663 | * @param[in] bytes Payload size in bytes.
|
||
664 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
665 | 7d9678db | skenneweg | *
|
666 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
667 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
668 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
669 | 7d9678db | skenneweg | */
|
670 | urt_status_t urtHrtSubscriberFetchNextMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
||
671 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
672 | |||
673 | |||
674 | /**
|
||
675 | 5198dfae | skenneweg | * @brief Fetches the latest message.
|
676 | 7d9678db | skenneweg | *
|
677 | 5198dfae | skenneweg | * @param[in] subscriber The SRT subscriber that shall fetch the message. Must not be NULL.
|
678 | * @param[in] payload Pointer where to copy the payload to. May be NULL for messages without payload.
|
||
679 | * @param[in] bytes Payload size in bytes.
|
||
680 | * @param[in] latency The latency can be returned by reference. May be NULL.
|
||
681 | 7d9678db | skenneweg | *
|
682 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on success.
|
683 | * Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
|
||
684 | * Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
|
||
685 | 7d9678db | skenneweg | */
|
686 | urt_status_t urtHrtSubscriberFetchLatestMessage(urt_hrtsubscriber_t* subscriber, void* payload,
|
||
687 | size_t bytes, urt_delay_t* latency){return URT_STATUS_OK;}
|
||
688 | |||
689 | /**
|
||
690 | 5198dfae | skenneweg | * @brief Unsubscribes from a subscriber.
|
691 | 7d9678db | skenneweg | *
|
692 | 5198dfae | skenneweg | * @param[in] subscriber The HRT subscriber to be unsubscribed. Must not be NULL.
|
693 | 7d9678db | skenneweg | *
|
694 | 5198dfae | skenneweg | * @return Returns URT_STATUS_OK on sucess.
|
695 | * Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
|
||
696 | 7d9678db | skenneweg | */
|
697 | 5b7188aa | skenneweg | urt_status_t urtHrtSubscriberUnsubscribe(urt_hrtsubscriber_t* subscriber) |
698 | { |
||
699 | urtDebugAssert(subscriber); |
||
700 | |||
701 | if (subscriber->base.topic)
|
||
702 | { |
||
703 | 37cd5dc2 | Svenja | urtMutexLock(topic->lock); |
704 | 5b7188aa | skenneweg | urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener); |
705 | subscriber->base.topic->numHrtSubscribers--; |
||
706 | # if (URT_CFG_PUBSUB_PROFILING == true) |
||
707 | subscriber->base.topic->numSubscribers--; |
||
708 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
709 | # if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
||
710 | //TODO: remove self from topics lsit of HRT subscribers
|
||
711 | //TODO: ...
|
||
712 | # endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
||
713 | |||
714 | urt_message_t* messageTemp = subscriber->base.lastMessage; |
||
715 | 37cd5dc2 | Svenja | bool hrtZero = false; |
716 | 5b7188aa | skenneweg | while (messageTemp->next->originTime < messageTemp->originTime)
|
717 | { |
||
718 | messageTemp = messageTemp->next; |
||
719 | messageTemp->numHrtConsumersLeft--; |
||
720 | 37cd5dc2 | Svenja | if (messageTemp->numHrtConsumersLeft == 0) |
721 | { |
||
722 | hrtZero = true;
|
||
723 | } |
||
724 | 5b7188aa | skenneweg | # if(URT_CFG_PUBSUB_PROFILING == true) |
725 | messageTemp->numConsumersLeft--; |
||
726 | # endif /* URT_CFG_PUBSUB_PROFILING */ |
||
727 | } |
||
728 | 37cd5dc2 | Svenja | if (hrtZero)
|
729 | 5b7188aa | skenneweg | { |
730 | 37cd5dc2 | Svenja | urtCondvarSignal(subscriber->base.topic->hrtReleased); |
731 | 5b7188aa | skenneweg | } |
732 | |||
733 | 37cd5dc2 | Svenja | urtMutexUnlock(topic->lock); |
734 | 5b7188aa | skenneweg | subscriber->base.topic = NULL;
|
735 | subscriber->base.lastMessage = NULL;
|
||
736 | subscriber->base.lastMessageTime = 0;
|
||
737 | return URT_STATUS_OK;
|
||
738 | } |
||
739 | |||
740 | return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
|
||
741 | } |
||
742 | |||
743 |