Revision fb72e91b src/urt_publisher.c
| src/urt_publisher.c | ||
|---|---|---|
| 60 | 60 |
urtDebugAssert(publisher); |
| 61 | 61 |
urtDebugAssert(topic); |
| 62 | 62 |
|
| 63 |
urt_publisher_t.topic = topic;
|
|
| 63 |
publisher->topic = topic;
|
|
| 64 | 64 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
| 65 |
urt_publisher_t.publishAttempts = 0;
|
|
| 66 |
urt_publisher_t.publishFails = 0;
|
|
| 65 |
publisher->publishAttempts = 0;
|
|
| 66 |
publisher->publishFails = 0;
|
|
| 67 | 67 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
| 68 | 68 |
if (messages != NULL) {
|
| 69 |
urtMutexLock(topic->lock); |
|
| 69 |
urtMutexLock(&topic->lock);
|
|
| 70 | 70 |
urt_message_t* lastMessage = messages; |
| 71 | 71 |
while (lastMessage->next != NULL) {
|
| 72 | 72 |
lastMessage = lastMessage->next; |
| 73 | 73 |
} |
| 74 |
//TODO: setnextpointer of last message to contribute to the message after the topic's latest message
|
|
| 74 |
lastMessage->next = topic->latestMessage->next;
|
|
| 75 | 75 |
topic->latestMessage->next = messages; |
| 76 |
urtMutexUnlock(&topic->lock); |
|
| 76 | 77 |
} |
| 77 | 78 |
|
| 78 | 79 |
return; |
| ... | ... | |
| 92 | 93 |
*/ |
| 93 | 94 |
urt_status_t urtPublisherPublish(urt_publisher_t* publisher, void* payload, size_t bytes, urt_osTime_t t, urt_delay_t timeout) |
| 94 | 95 |
{
|
| 95 |
urtMutexLock(publisher->topic->lock); |
|
| 96 |
urt_message_t* message = publisher->topic->next->latestMessage;
|
|
| 97 |
urt_osTime_t messageTime = message->originTime;
|
|
| 96 |
urtMutexLock(&publisher->topic->lock);
|
|
| 97 |
urt_message_t* latestMessage = publisher->topic->next->latestMessage;
|
|
| 98 |
urt_osTime_t messageTime = latestMessage->originTime;
|
|
| 98 | 99 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
| 99 | 100 |
publisher->publishAttempts++; |
| 100 | 101 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
| 101 | 102 |
|
| 102 |
while (message->numHrtConsumersLeft > 0) {
|
|
| 103 |
urt_osCondvarWaitStatus_t timeout = urtCondvarWait(publisher->topic->hrtReleased, publisher->topic->lock, URT_DELAY_INFINITE);
|
|
| 103 |
while (latestMessage->numHrtConsumersLeft > 0) {
|
|
| 104 |
//TODO: urt_osCondvarWaitStatus_t timeout = urtCondvarWait(&publisher->topic->hrtReleased, &publisher->topic->lock, URT_DELAY_INFINITE);
|
|
| 104 | 105 |
if (timeout == URT_CONDVAR_WAITSTATUS_TIMEOUT) {
|
| 105 | 106 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
| 106 | 107 |
publisher->publishFails++; |
| 107 | 108 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
| 108 |
urtMutexUnlock(publisher->topic->lock); |
|
| 109 |
urtMutexUnlock(&publisher->topic->lock);
|
|
| 109 | 110 |
return URT_STATUS_PUBLISH_TIMEOUT; |
| 110 | 111 |
} |
| 111 | 112 |
|
| 112 |
if (messageTime != message->originTime) {
|
|
| 113 |
messageTime = message->originTime;
|
|
| 114 |
message = message->next;
|
|
| 115 |
while (message->originTime < messageTime) {
|
|
| 116 |
message = message->next;
|
|
| 113 |
if (messageTime != latestMessage->originTime) {
|
|
| 114 |
messageTime = latestMessage->originTime;
|
|
| 115 |
latestMessage = latestMessage->next;
|
|
| 116 |
while (latestMessage->originTime < messageTime) {
|
|
| 117 |
latestMessage = latestMessage->next;
|
|
| 117 | 118 |
} |
| 118 | 119 |
} |
| 119 | 120 |
} |
| 120 | 121 |
|
| 121 | 122 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
| 122 |
if (message->numHrtConsumersLeft > 0) { //TODO: never true because of while?
|
|
| 123 |
if (latestMessage->numConsumersLeft > 0) {
|
|
| 123 | 124 |
publisher->topic->numMessagesDiscarded++; |
| 124 | 125 |
} |
| 125 | 126 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
| 126 | 127 |
|
| 127 |
publisher->topic->latestMessage = message; //TODO: Iterate topic's pointer to this message?
|
|
| 128 |
memcpy(message->payload, payload, bytes); //TODO: copy payload to message?
|
|
| 129 |
//TODO: Set origin time of message
|
|
| 130 |
publisher->topic->numHrtSubscribers = message->numHrtConsumersLeft;
|
|
| 128 |
publisher->topic->latestMessage = latestMessage;
|
|
| 129 |
memcpy(latestMessage->payload, payload, bytes);
|
|
| 130 |
latestMessage->originTime = t;
|
|
| 131 |
publisher->topic->numHrtSubscribers = latestMessage->numHrtConsumersLeft;
|
|
| 131 | 132 |
#if (URT_CFG_PUBSUB_PROFILING == true) |
| 132 |
publisher->topic->numSubscribers = message->numHrtConsumersLeft; //TODO: Set number of comsumers?
|
|
| 133 |
publisher->topic->numSubscribers = latestMessage->numConsumersLeft;
|
|
| 133 | 134 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
| 134 | 135 |
|
| 135 | 136 |
#if (URT_CFG_PUBSUB_QOS_RATECHECKS == true) |
| 136 |
publisher->topic->qosRateTimer = NULL; //TODO: Set QoS rate timer wrt. most critical HRT Subscriber |
|
| 137 |
publisher->topic->qosRateTimer = publisher->topic->hrtSubscribers; |
|
| 138 |
//TODO: Set QoS rate timer (@topic) wrt. most critical HRT Subscriber (here different types?) |
|
| 137 | 139 |
#endif /* URT_CFG_PUBSUB_QOS_RATECHECKS */ |
| 138 | 140 |
|
| 139 | 141 |
#if (URT_CFG_PUBSUB_QOS_DEADLINECHECKS == true) |
| 140 | 142 |
urt_hrtsubscriber_t* hrtSubscriber = publisher->topic->hrtSubscribers; |
| 141 | 143 |
while (hrtSubscriber != NULL) {
|
| 142 | 144 |
if (!urtTimerIsArmed(hrtSubscriber->qosDeadlineTimer)) {
|
| 143 |
*hrtSubscriber->qosDeadlineTimer = message->originTime;
|
|
| 145 |
*hrtSubscriber->qosDeadlineTimer = latestMessage->originTime;
|
|
| 144 | 146 |
} |
| 145 | 147 |
hrtSubscriber = hrtSubscriber->next; |
| 146 | 148 |
} |
| ... | ... | |
| 150 | 152 |
publisher->topic->numMessagesPublished++; |
| 151 | 153 |
#endif /* URT_CFG_PUBSUB_PROFILING */ |
| 152 | 154 |
|
| 153 |
//TODO: Fire event
|
|
| 154 |
urtMutexUnlock(publisher->topic->lock); |
|
| 155 |
urtEventSourceBroadcast(&publisher->topic->evtSource, URT_EVENTFLAG_PROCEED);
|
|
| 156 |
urtMutexUnlock(&publisher->topic->lock);
|
|
| 155 | 157 |
return URT_STATUS_OK; |
| 156 | 158 |
} |
Also available in: Unified diff