Revision fb72e91b src/urt_publisher.c
src/urt_publisher.c | ||
---|---|---|
60 | 60 |
urtDebugAssert(publisher); |
61 | 61 |
urtDebugAssert(topic); |
62 | 62 |
|
63 |
urt_publisher_t.topic = topic;
|
|
63 |
publisher->topic = topic;
|
|
64 | 64 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
65 |
urt_publisher_t.publishAttempts = 0;
|
|
66 |
urt_publisher_t.publishFails = 0;
|
|
65 |
publisher->publishAttempts = 0;
|
|
66 |
publisher->publishFails = 0;
|
|
67 | 67 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
68 | 68 |
if (messages != NULL) { |
69 |
urtMutexLock(topic->lock); |
|
69 |
urtMutexLock(&topic->lock);
|
|
70 | 70 |
urt_message_t* lastMessage = messages; |
71 | 71 |
while (lastMessage->next != NULL) { |
72 | 72 |
lastMessage = lastMessage->next; |
73 | 73 |
} |
74 |
//TODO: setnextpointer of last message to contribute to the message after the topic's latest message
|
|
74 |
lastMessage->next = topic->latestMessage->next;
|
|
75 | 75 |
topic->latestMessage->next = messages; |
76 |
urtMutexUnlock(&topic->lock); |
|
76 | 77 |
} |
77 | 78 |
|
78 | 79 |
return; |
... | ... | |
92 | 93 |
*/ |
93 | 94 |
urt_status_t urtPublisherPublish(urt_publisher_t* publisher, void* payload, size_t bytes, urt_osTime_t t, urt_delay_t timeout) |
94 | 95 |
{ |
95 |
urtMutexLock(publisher->topic->lock); |
|
96 |
urt_message_t* message = publisher->topic->next->latestMessage;
|
|
97 |
urt_osTime_t messageTime = message->originTime;
|
|
96 |
urtMutexLock(&publisher->topic->lock);
|
|
97 |
urt_message_t* latestMessage = publisher->topic->next->latestMessage;
|
|
98 |
urt_osTime_t messageTime = latestMessage->originTime;
|
|
98 | 99 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
99 | 100 |
publisher->publishAttempts++; |
100 | 101 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
101 | 102 |
|
102 |
while (message->numHrtConsumersLeft > 0) {
|
|
103 |
urt_osCondvarWaitStatus_t timeout = urtCondvarWait(publisher->topic->hrtReleased, publisher->topic->lock, URT_DELAY_INFINITE);
|
|
103 |
while (latestMessage->numHrtConsumersLeft > 0) {
|
|
104 |
//TODO: urt_osCondvarWaitStatus_t timeout = urtCondvarWait(&publisher->topic->hrtReleased, &publisher->topic->lock, URT_DELAY_INFINITE);
|
|
104 | 105 |
if (timeout == URT_CONDVAR_WAITSTATUS_TIMEOUT) { |
105 | 106 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
106 | 107 |
publisher->publishFails++; |
107 | 108 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
108 |
urtMutexUnlock(publisher->topic->lock); |
|
109 |
urtMutexUnlock(&publisher->topic->lock);
|
|
109 | 110 |
return URT_STATUS_PUBLISH_TIMEOUT; |
110 | 111 |
} |
111 | 112 |
|
112 |
if (messageTime != message->originTime) {
|
|
113 |
messageTime = message->originTime;
|
|
114 |
message = message->next;
|
|
115 |
while (message->originTime < messageTime) {
|
|
116 |
message = message->next;
|
|
113 |
if (messageTime != latestMessage->originTime) {
|
|
114 |
messageTime = latestMessage->originTime;
|
|
115 |
latestMessage = latestMessage->next;
|
|
116 |
while (latestMessage->originTime < messageTime) {
|
|
117 |
latestMessage = latestMessage->next;
|
|
117 | 118 |
} |
118 | 119 |
} |
119 | 120 |
} |
120 | 121 |
|
121 | 122 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
122 |
if (message->numHrtConsumersLeft > 0) { //TODO: never true because of while?
|
|
123 |
if (latestMessage->numConsumersLeft > 0) {
|
|
123 | 124 |
publisher->topic->numMessagesDiscarded++; |
124 | 125 |
} |
125 | 126 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
126 | 127 |
|
127 |
publisher->topic->latestMessage = message; //TODO: Iterate topic's pointer to this message?
|
|
128 |
memcpy(message->payload, payload, bytes); //TODO: copy payload to message?
|
|
129 |
//TODO: Set origin time of message
|
|
130 |
publisher->topic->numHrtSubscribers = message->numHrtConsumersLeft;
|
|
128 |
publisher->topic->latestMessage = latestMessage;
|
|
129 |
memcpy(latestMessage->payload, payload, bytes);
|
|
130 |
latestMessage->originTime = t;
|
|
131 |
publisher->topic->numHrtSubscribers = latestMessage->numHrtConsumersLeft;
|
|
131 | 132 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
132 |
publisher->topic->numSubscribers = message->numHrtConsumersLeft; //TODO: Set number of comsumers?
|
|
133 |
publisher->topic->numSubscribers = latestMessage->numConsumersLeft;
|
|
133 | 134 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
134 | 135 |
|
135 | 136 |
#if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
136 |
publisher->topic->qosRateTimer = NULL; //TODO: Set QoS rate timer wrt. most critical HRT Subscriber |
|
137 |
publisher->topic->qosRateTimer = publisher->topic->hrtSubscribers; |
|
138 |
//TODO: Set QoS rate timer (@topic) wrt. most critical HRT Subscriber (here different types?) |
|
137 | 139 |
#endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
138 | 140 |
|
139 | 141 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
140 | 142 |
urt_hrtsubscriber_t* hrtSubscriber = publisher->topic->hrtSubscribers; |
141 | 143 |
while (hrtSubscriber != NULL) { |
142 | 144 |
if (!urtTimerIsArmed(hrtSubscriber->qosDeadlineTimer)) { |
143 |
*hrtSubscriber->qosDeadlineTimer = message->originTime;
|
|
145 |
*hrtSubscriber->qosDeadlineTimer = latestMessage->originTime;
|
|
144 | 146 |
} |
145 | 147 |
hrtSubscriber = hrtSubscriber->next; |
146 | 148 |
} |
... | ... | |
150 | 152 |
publisher->topic->numMessagesPublished++; |
151 | 153 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
152 | 154 |
|
153 |
//TODO: Fire event
|
|
154 |
urtMutexUnlock(publisher->topic->lock); |
|
155 |
urtEventSourceBroadcast(&publisher->topic->evtSource, URT_EVENTFLAG_PROCEED);
|
|
156 |
urtMutexUnlock(&publisher->topic->lock);
|
|
155 | 157 |
return URT_STATUS_OK; |
156 | 158 |
} |
Also available in: Unified diff