Statistics
| Branch: | Revision:

urtware / src / urt_subscriber.c @ fb72e91b

History | View | Annotate | Download (43.174 KB)

1 1fb06240 skenneweg
/*
2
µRtWare is a lightweight publish/subscribe middleware for real-time
3
applications. It was developed as part of the software habitat for the
4
Autonomous Mini Robot [1] (AMiRo) but can be used for other purposes as well.
5

6
Copyright (C) 2018..2020  Thomas Schöpping et al.
7

8
This program is free software: you can redistribute it and/or modify
9
it under the terms of the GNU General Public License as published by
10
the Free Software Foundation, either version 3 of the License, or
11
(at your option) any later version.
12

13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
GNU General Public License for more details.
17

18
You should have received a copy of the GNU General Public License
19
along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
*/
21
22 7d9678db skenneweg
#include <urtware.h>
23 982056f7 Svenja
#include <stdio.h>
24 7d9678db skenneweg
25 1fb06240 skenneweg
/******************************************************************************/
26
/* LOCAL DEFINITIONS                                                          */
27
/******************************************************************************/
28
29
/******************************************************************************/
30
/* EXPORTED VARIABLES                                                         */
31
/******************************************************************************/
32
33
/******************************************************************************/
34
/* LOCAL TYPES                                                                */
35
/******************************************************************************/
36
37
/******************************************************************************/
38
/* LOCAL VARIABLES                                                            */
39
/******************************************************************************/
40
41
/******************************************************************************/
42
/* LOCAL FUNCTIONS                                                            */
43
/******************************************************************************/
44
45 65dc89cb skenneweg
urt_message_t* urtFindOldestMessage(urt_message_t* oldestMessage)
46
{
47 fb72e91b skenneweg
  while (oldestMessage->next->originTime < oldestMessage->originTime) {
48 65dc89cb skenneweg
    oldestMessage = oldestMessage->next;
49
  }
50
  return oldestMessage;
51
}
52
53
urt_message_t* urtFindLatestMessage(urt_message_t* latestMessage)
54
{
55 fb72e91b skenneweg
  urt_message_t* lastMessage = latestMessage;
56
  while (lastMessage->next->originTime < lastMessage->originTime) {
57 65dc89cb skenneweg
    lastMessage = lastMessage->next;
58
#if (URT_CFG_PUBSUB_PROFILING == true)
59
    subscriber->base.lastMessage->numConsumersLeft--;
60
    subscriber->base->numMessagesReceived++;
61
#endif /* URT_CFG_PUBSUB_PROFILING */
62
  }
63 fb72e91b skenneweg
  return latestMessage;
64 65dc89cb skenneweg
}
65
66 fb72e91b skenneweg
void urtContributeMessages(urt_message_t* messages, urt_topic_t* topic)
67 982056f7 Svenja
{
68
  urt_message_t* lastMessageContribute = messages;
69 fb72e91b skenneweg
  while (lastMessageContribute->next) {
70 982056f7 Svenja
    lastMessageContribute = lastMessageContribute->next;
71
  }
72
  lastMessageContribute->next = topic->latestMessage->next;
73
  topic->latestMessage->next = messages;
74
}
75
76 1fb06240 skenneweg
/******************************************************************************/
77
/* EXPORTED FUNCTIONS                                                         */
78
/******************************************************************************/
79
80 7d9678db skenneweg
/**
81
 * @brief   Initialize the nrt Subscriber.
82
 *
83 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber to initialize. Must not be NULL.
84 7d9678db skenneweg
 */
85 5c6cb22f skenneweg
void urtNrtSubscriberInit (urt_nrtsubscriber_t* subscriber)
86
{
87 a5e142de skenneweg
  urtDebugAssert(subscriber);
88
89 5c6cb22f skenneweg
  subscriber->base.topic = NULL;
90
  urtEventListenerInit(subscriber->base.evtListener);
91
  subscriber->base.lastMessage = NULL;
92
  subscriber->base.lastMessageTime = 0;
93 a5e142de skenneweg
#if (URT_CFG_PUBSUB_PROFILING == true)
94 5c6cb22f skenneweg
    subscriber->minLatency = URT_DELAY_INFINITE;
95
    subscriber->maxLatency = URT_DELAY_IMMEDIATE;
96 a5e142de skenneweg
#endif /* URT_CFG_PUBSUB_PROFILING */
97 5c6cb22f skenneweg
  return;
98
}
99 1fb06240 skenneweg
100 7d9678db skenneweg
/**
101 5198dfae skenneweg
 * @brief  Subscribes the subscriber to a topic.
102 7d9678db skenneweg
 *
103 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber which shall subscribe to a topic. Must not be NULL.
104
 * @param[in] topic  The topic to subscribe to. Must not be NULL.
105
 * @param[in] messages  NULL terminated list of messages to contribute to the topic.
106
 *                      Messages must not be associated to another topic.
107
 *                      Once a message has been contributed, it cannot be removed later.
108
 *                      May be NULL(no messages to contribute).
109 7d9678db skenneweg
 *
110 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
111
 *          Returns URT_STATUS_SUBSCRIBE_TOPICSET if the subscriber is already associated to a topic.
112 7d9678db skenneweg
 */
113 a5e142de skenneweg
urt_status_t urtNrtSubscriberSubscribe (urt_nrtsubscriber_t* subscriber, urt_topic_t* topic, urt_message_t* messages)
114
{
115
  urtDebugAssert(subscriber);
116
  urtDebugAssert(topic);
117
118 982056f7 Svenja
  if (!subscriber->base.topic) {
119
    return URT_STATUS_SUBSCRIBE_TOPICSET;
120
  }
121 a5e142de skenneweg
122
  subscriber->base.topic = topic;
123 fb72e91b skenneweg
  urtMutexLock(&topic->lock);
124 a5e142de skenneweg
125 982056f7 Svenja
  if (messages) {
126 fb72e91b skenneweg
    urtContributeMessages(messages, topic);
127 a5e142de skenneweg
  }
128
129
  subscriber->base.lastMessage = topic->latestMessage;
130
  subscriber->base.lastMessageTime = topic->latestMessage->originTime;
131
132 fb72e91b skenneweg
  urtEventRegister(&topic->evtSource, &subscriber->base.evtListener, subscriber->base.evtListener.events, subscriber->base.evtListener.flags);
133 a5e142de skenneweg
134
#if (URT_CFG_PUBSUB_PROFILING == true)
135
    topic->numHrtSubscribers--;
136
#endif /* URT_CFG_PUBSUB_PROFILING */
137
138 fb72e91b skenneweg
  urtMutexUnlock(&topic->lock);
139 7d9678db skenneweg
  return URT_STATUS_OK;
140 1fb06240 skenneweg
}
141
142 7d9678db skenneweg
/**
143 5198dfae skenneweg
 * @brief  Fetches the next message.
144 7d9678db skenneweg
 *
145 5198dfae skenneweg
 * @param[in] subscriber  The NRT subscriber that shall fetch the message. Must not be NULL.
146
 * @param[in] payload  Pointer where to copy the payload to. May be NULL for messages without payload.
147
 * @param[in] bytes  Payload size in bytes.
148
 * @param[in] latency  The latency can be returned by reference. May be NULL.
149 7d9678db skenneweg
 *
150 5198dfae skenneweg
 * @return  Returns URT_STATUS_OK on success.
151
 *          Returns URT_STATUS_FETCH_NOTOPIC if the subscriber is not associated to a topic.
152
 *          Retruns URT_STATUS_FETCH_NOMESSAGE if there is no new message to fetch.
153 7d9678db skenneweg
 */
154 982056f7 Svenja
urt_status_t urtNrtSubscriberFetchNextMessage (urt_nrtsubscriber_t* subscriber, void* const payload, size_t bytes, urt_delay_t* latency)
155 65dc89cb skenneweg
{   
156 a5e142de skenneweg
  urtDebugAssert(subscriber);
157 65dc89cb skenneweg
158 982056f7 Svenja
  if (!subscriber->base.topic) {
159
    return URT_STATUS_FETCH_NOTOPIC;
160
  }
161 65dc89cb skenneweg
162 fb72e91b skenneweg
  urtMutexLock(&subscriber->base.topic->lock);
163 65dc89cb skenneweg
164 982056f7 Svenja
  urt_message_t* oldestMessage = subscriber->base.lastMessage;
165
  if(oldestMessage->originTime == subscriber->base.lastMessageTime) {
166
    if (subscriber->base.lastMessage->next->originTime > subscriber->base.lastMessageTime) {
167 fb72e91b skenneweg
      urtMutexUnlock(&subscriber->base.topic->lock);
168 65dc89cb skenneweg
      return URT_STATUS_FETCH_NOMESSAGE;
169
    }
170 982056f7 Svenja
    oldestMessage = oldestMessage->next;
171 65dc89cb skenneweg
  }
172 982056f7 Svenja
  else {
173
    oldestMessage = urtFindOldestMessage(oldestMessage->next);
174 65dc89cb skenneweg
  }
175
176 fb72e91b skenneweg
  subscriber->base.lastMessage = oldestMessage;
177
  subscriber->base.lastMessageTime = oldestMessage->originTime;
178
  memcpy(oldestMessage->payload, payload, bytes);
179 982056f7 Svenja
180
  if (URT_CFG_PUBSUB_PROFILING == true || latency) {
181
    uint64_t calculatedLatency = urtTimeNow() - oldestMessage->originTime;
182 65dc89cb skenneweg
183
#if(URT_CFG_PUBSUB_PROFILING == true)
184
  subscriber->base.sumLatencies += calculatedLatency;
185
186 982056f7 Svenja
  if (calculatedLatency < subscriber->minLatency) {
187 65dc89cb skenneweg
    subscriber->minLatency = calculatedLatency;
188
  }
189 982056f7 Svenja
  else if (calculatedLatency > subscriber->maxLatency) {
190 65dc89cb skenneweg
    subscriber->maxLatency = calculatedLatency;
191
  }
192
#endif /* URT_CFG_PUBSUB_PROFILING */
193 982056f7 Svenja
194
    if (latency) {
195 fb72e91b skenneweg
      *latency = calculatedLatency;
196 982056f7 Svenja
    }
197 65dc89cb skenneweg
  }
198
199
#if (URT_CFG_PUBSUB_PROFILING == true)
200
  subscriber->base.lastMessage->numConsumersLeft--;
201
  subscriber->base->numMessagesReceived++;
202
#endif /* URT_CFG_PUBSUB_PROFILING */
203
204 fb72e91b skenneweg
  urtMutexUnlock(&subscriber->base.topic->lock);
205 7d9678db skenneweg
  return URT_STATUS_OK;
206 1fb06240 skenneweg
}
207
208