Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ 5b7188aa

History | View | Annotate | Download (27.824 KB)

1 1fb06240 skenneweg
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21
22 7d9678db skenneweg
#include <urtware.h>
23
24 1fb06240 skenneweg
/******************************************************************************/
25
/* LOCAL DEFINITIONS                                                          */
26
/******************************************************************************/
27
28
/******************************************************************************/
29
/* EXPORTED VARIABLES                                                         */
30
/******************************************************************************/
31
32
/******************************************************************************/
33
/* LOCAL TYPES                                                                */
34
/******************************************************************************/
35
36
/******************************************************************************/
37
/* LOCAL VARIABLES                                                            */
38
/******************************************************************************/
39
40
/******************************************************************************/
41
/* LOCAL FUNCTIONS                                                            */
42
/******************************************************************************/
43
44
/******************************************************************************/
45
/* EXPORTED FUNCTIONS                                                         */
46
/******************************************************************************/
47
48 7d9678db skenneweg
/**
49
 * @brief   Initialize the nrt Subscriber.
50
 *
51 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
52 7d9678db skenneweg
 */
53 5c6cb22f skenneweg
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
54
{
55 a5e142de skenneweg
  urtDebugAssert(subscriber);
56
57 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
58
  urtEventListenerInit(subscriber->base.evtListener);
59
  subscriber->base.lastMessage = NULL;
60
  subscriber->base.lastMessageTime = 0;
61 a5e142de skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
62 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
63
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
64 a5e142de skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
65 5c6cb22f skenneweg
  return;
66
}
67 1fb06240 skenneweg
68 7d9678db skenneweg
/**
69 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
70 7d9678db skenneweg
 *
71 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
72
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
73
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
74
 *                      Messages must not be associated to another topic.
75
 *                      Once a message has been contributed, it cannot be removed later.
76
 *                      May be NULL(no messages to contribute).
77 7d9678db skenneweg
 *
78 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
79
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
80 7d9678db skenneweg
 */
81 a5e142de skenneweg
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
82
{
83
  urtDebugAssert(subscriber);
84
  urtDebugAssert(topic);
85
86
  if (!subscriber->base.topic)
87
      return URT_STATUS_SUBSCRIBE_TOPICSET;
88
89
  subscriber->base.topic = topic;
90
  //TODO: Lock topic
91
92
  if (messages)
93
  {
94
    urt_message_t* lastMessageContribute = messages;
95
    while (lastMessageContribute->next)
96
    {
97
        lastMessageContribute = lastMessageContribute->next;
98
    }
99
    lastMessageContribute->next = topic->latestMessage->next;
100
    topic->latestMessage->next = messages;
101
  }
102
103
  subscriber->base.lastMessage = topic->latestMessage;
104
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
105
106
  urtEventRegister(topic->evtSource, subscriber->base.evtListener, URT_EVENTMASK_MAXPRIO, URT_EVENTFLAG_PROCEED); //TODO: Correct mask, flag?
107
108
#if (URT_CFG_PUBSUB_PROFILING == true)
109
    topic->numHrtSubscribers--;
110
#endif /* URT_CFG_PUBSUB_PROFILING */
111
112
  //TODO: Unlock topic
113 7d9678db skenneweg
  return URT_STATUS_OK;
114 1fb06240 skenneweg
}
115
116 7d9678db skenneweg
/**
117 5198dfae skenneweg
 * @brief  Fetches the next message.
118 7d9678db skenneweg
 *
119 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
120
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
121
 * @param[in] bytes  Payload size in bytes.
122
 * @param[in] latency  The latency can be returned by reference. May be NULL.
123 7d9678db skenneweg
 *
124 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
125
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
126
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
127 7d9678db skenneweg
 */
128 a5e142de skenneweg
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency)
129
{
130
  urtDebugAssert(subscriber);
131 7d9678db skenneweg
  return URT_STATUS_OK;
132 1fb06240 skenneweg
}
133
134 7d9678db skenneweg
/**
135 a5e142de skenneweg
 * @brief Fetches the latest message.
136 7d9678db skenneweg
 *
137 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
138
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
139
 * @param[in] bytes  Payload size in bytes.
140
 * @param[in] latency  The latency can be returned by reference. May be NULL.
141 7d9678db skenneweg
 *
142 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
143
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
144
 *          Returns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
145 7d9678db skenneweg
 */
146 1fb06240 skenneweg
urt_status_t urtNrtSubscriberFetchLatestMessage (urt_nrtsubscriber_t* subscriber, void* payload, size_t bytes, urt_delay_t* latency) {
147 7d9678db skenneweg
  return URT_STATUS_OK;
148 1fb06240 skenneweg
}
149
150 7d9678db skenneweg
/**
151 5198dfae skenneweg
 * @brief  Unsubscribes from a subscriber.
152 7d9678db skenneweg
 *
153 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to be unsubscribed. Must not be NULL.
154 7d9678db skenneweg
 *
155 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on sucess.
156
 *          Returns URT_STATUS_UNSUBSCRIBE_NOTOPIC if the subscriber is not associated to a topic.
157 7d9678db skenneweg
 */
158 5b7188aa skenneweg
urt_status_t urtNrtSubscriberUnsubscribe (urt_nrtsubscriber_t* subscriber)
159
{
160 a5e142de skenneweg
  if (subscriber->base.topic)
161
  {
162
# if(URT_CFG_PUBSUB_PROFILING == true)
163
      //TODO: LOCK TOPIC
164 5b7188aa skenneweg
      subscriber->base.topic->numSubscribers--;
165 a5e142de skenneweg
# endif /* URT_CFG_PUBSUB_PROFILING */
166
    urtEventUnregister(subscriber->base.topic->evtSource, subscriber->base.evtListener);
167
# if(URT_CFG_PUBSUB_PROFILING == true)
168
      //TODO: Unlock TOPIC
169
      subscriber->base.topic = NULL;
170
      subscriber->base.lastMessage = NULL;
171
      subscriber->base.lastMessageTime = 0;
172
#endif /* URT_CFG_PUBSUB_PROFILING */
173
    return URT_STATUS_OK;
174
  }
175 5b7188aa skenneweg
176
  return URT_STATUS_UNSUBSCRIBE_NOTOPIC;
177 1fb06240 skenneweg
}
178
179 7d9678db skenneweg
180
/**
181 5198dfae skenneweg
 * @brief  Initialize the srt Subscriber.
182 7d9678db skenneweg
 *
183 5198dfae skenneweg
 * @param[in] subscriber  The SRT subscriber to initialize. Must not be NULL.
184 7d9678db skenneweg
 */
185 5c6cb22f skenneweg
void urtSrtSubscriberInit(urt_srtsubscriber_t* subscriber)
186
{
187 a5e142de skenneweg
  urtDebugAssert(subscriber);
188
189 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
190
  urtEventListenerInit(subscriber->base.evtListener);
191
  subscriber->base.lastMessage = NULL;
192
  subscriber->base.lastMessageTime = 0;
193
  #if (URT_CFG_PUBSUB_PROFILING)
194
    subscriber->base.sumLatencies = 0;
195
    subscriber->base.numMessagesReceived = 0;
196
    subscriber->usefulnesscb = NULL;
197
    subscriber->cbparams = NULL;
198
    subscriber->minLatency = URT_DELAY_INFINITE;
199
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
200
  #endif /* URT_CFG_PUBSUB_PROFILING */
201
  return;
202
}
203 7d9678db skenneweg
204
/**
205 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
206
 *
207
 * @param[in] subscriber  The SRT subscriber which shall subscribe to a topic. Must not be NULL.
208
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
209
 * @param[in] message  NULL terminated list of messages to contribute to the topic.
210
 *                     Messages must not be associated to another topic.
211
 *                     Once a message has been contributed, it cannot be removed later.
212
 *                     May be NULL (no messages to contribute)
213
 * @param[in] us