mirror of
https://github.com/FreeRTOS/coreMQTT
synced 2025-10-24 19:38:01 +08:00
Fixes to timeout of sendMessageVector and refactor of sendBuffer for consistency (#224)
* Fixes to timeout of sendMessageVector and refactor of sendBuffer for consistency.
* Update size table.
* Fixing some small MISRA related issues
* Formatting fix
* Minor fixes for CBMC.
* Updated logical flow to break instead.
* Revert "Updated logical flow to break instead."
This reverts commit 0ac1c6a618.
* Updated unit tests for coverage.
* Fix MQTT_Publish Proof
* Fix proofs for connect/sub/unsub API functions
* New timing scheme.
* Update config defaults to reflect new timing change.
* Fix doxygen. Fix formatting. Fix memory table.
* Doxygen fixes.
* Fix CBMC proofs
* Added License identifier back.
* Swapped from warning to error for Visual Studio.
Co-authored-by: Jason Carroll <czjaso@amazon.com>
Co-authored-by: Soren Ptak <skptak@amazon.com>
Co-authored-by: Aniruddha Kanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
This commit is contained in:
@@ -114,7 +114,7 @@ which is defined by @ref MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT.
|
||||
For @ref mqtt_processloop_function and @ref mqtt_receiveloop_function, the timeout value represents the <i>minimum</i> duration that will be spent in the function, provided there are no network errors.
|
||||
Should the timeout be set to 0, then the loop will run for a single iteration. A single iteration of a loop consists of an attempt to receive a single byte from the network, and
|
||||
if the single byte receive was successful, then attempt(s) to receive the rest of the packet (with retry attempts governed by @ref MQTT_RECV_POLLING_TIMEOUT_MS), followed by sending acknowledgement response, if needed
|
||||
(with retry attempts governed by @ref MQTT_SEND_RETRY_TIMEOUT_MS), and then, finally deserialization of the packet received and a call to the application callback.
|
||||
(with retry attempts governed by @ref MQTT_SEND_TIMEOUT_MS), and then, finally deserialization of the packet received and a call to the application callback.
|
||||
If the first read did not succeed, then instead the library checks if a ping request needs to be sent (only for the process loop).
|
||||
|
||||
See the below diagrams for a representation of the above flows:
|
||||
@@ -154,8 +154,8 @@ Some configuration settings are C pre-processor constants, and some are function
|
||||
@section MQTT_RECV_POLLING_TIMEOUT_MS
|
||||
@copydoc MQTT_RECV_POLLING_TIMEOUT_MS
|
||||
|
||||
@section MQTT_SEND_RETRY_TIMEOUT_MS
|
||||
@copydoc MQTT_SEND_RETRY_TIMEOUT_MS
|
||||
@section MQTT_SEND_TIMEOUT_MS
|
||||
@copydoc MQTT_SEND_TIMEOUT_MS
|
||||
|
||||
@section MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT
|
||||
@copydoc MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT
|
||||
|
||||
@@ -9,7 +9,7 @@ The timeouts and the recommended configurations are listed below.
|
||||
2. [MQTT Keep Alive interval](@ref mqtt_timeouts_keep_alive)
|
||||
3. [MQTT Ping Response timeout](@ref mqtt_timeouts_ping_response)
|
||||
4. [MQTT Receive Polling timeout](@ref mqtt_timeouts_receive_polling)
|
||||
5. [MQTT Send Retry timeout](@ref mqtt_timeouts_send_retry)
|
||||
5. [MQTT Send timeout](@ref mqtt_timeouts_send)
|
||||
6. [Timeouts for MQTT_ProcessLoop and MQTT_ReceiveLoop APIs](@ref mqtt_timeouts_process_receive_loop)
|
||||
7. [Timeout for MQTT_Connect](@ref mqtt_timeouts_connect)
|
||||
|
||||
@@ -67,10 +67,8 @@ without any data received, we recommend using a value larger than the Transport
|
||||
|
||||
The MQTT Receive Polling timeout can be set by defining the configuration @ref MQTT_RECV_POLLING_TIMEOUT_MS.
|
||||
|
||||
@section mqtt_timeouts_send_retry MQTT Send Retry timeout
|
||||
MQTT Send Retry timeout is the maximum duration between non-empty network transmissions while sending an MQTT packet via the
|
||||
@ref mqtt_processloop_function or @ref mqtt_receiveloop_function API functions. This timeout represents the maximum duration
|
||||
that is allowed for no data transmission over the network through the Transport Send function.
|
||||
@section mqtt_timeouts_send MQTT Send timeout
|
||||
MQTT Send timeout is the maximum duration allowed to send an MQTT packet over the transport interface.
|
||||
|
||||
It is important to note that having this timeout too short will result in MQTT being disconnected due to the possibility
|
||||
of partial data being sent. If you have small TCP buffers and a high latency network, the optimum value for the timeout
|
||||
@@ -80,7 +78,7 @@ hitting a timeout of Transport Send before any data could be sent to transport l
|
||||
than the Transport Send timeout. If a dummy implementation of the @ref MQTTGetCurrentTimeFunc_t timer function,
|
||||
that always returns 0, is used, then this timeout must be set to 0.
|
||||
|
||||
The MQTT Send Retry timeout can be set by defining the configuration @ref MQTT_SEND_RETRY_TIMEOUT_MS.
|
||||
The MQTT Send timeout can be set by defining the configuration @ref MQTT_SEND_TIMEOUT_MS.
|
||||
|
||||
@section mqtt_timeouts_process_receive_loop Timeouts for MQTT_ProcessLoop and MQTT_ReceiveLoop APIs
|
||||
This timeout is passed as an argument to @ref mqtt_processloop_function or @ref mqtt_receiveloop_function API functions.
|
||||
|
||||
@@ -83,8 +83,8 @@
|
||||
* repeatedly to send bytes over the network until either:
|
||||
* 1. The requested number of bytes @a bytesToSend have been sent.
|
||||
* OR
|
||||
* 2. No byte cannot be sent over the network for the MQTT_SEND_RETRY_TIMEOUT_MS
|
||||
* duration.
|
||||
* 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
|
||||
* function.
|
||||
* OR
|
||||
* 3. There is an error in sending data over the network.
|
||||
*
|
||||
@@ -107,8 +107,8 @@ static int32_t sendBuffer( MQTTContext_t * pContext,
|
||||
* repeatedly to send bytes over the network until either:
|
||||
* 1. The requested number of bytes @a remainingLength have been sent.
|
||||
* OR
|
||||
* 2. No byte cannot be sent over the network for the MQTT_SEND_RETRY_TIMEOUT_MS
|
||||
* duration.
|
||||
* 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
|
||||
* function.
|
||||
* OR
|
||||
* 3. There is an error in sending data over the network.
|
||||
*
|
||||
@@ -122,7 +122,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
|
||||
/**
|
||||
* @brief Sends the vector array passed through the parameters over the network.
|
||||
*
|
||||
* @note The preference is given to 'write' function if it is present in the
|
||||
* @note The preference is given to 'writev' function if it is present in the
|
||||
* transport interface. Otherwise, a send call is made repeatedly to achieve the
|
||||
* result.
|
||||
*
|
||||
@@ -130,6 +130,15 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext,
|
||||
* @param[in] pIoVec The vector array to be sent.
|
||||
* @param[in] ioVecCount The number of elements in the array.
|
||||
*
|
||||
* @note This operation may call the transport send or writev functions
|
||||
* repeatedly to send bytes over the network until either:
|
||||
* 1. The requested number of bytes have been sent.
|
||||
* OR
|
||||
* 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
|
||||
* function.
|
||||
* OR
|
||||
* 3. There is an error in sending data over the network.
|
||||
*
|
||||
* @return The total number of bytes sent or the error code as received from the
|
||||
* transport interface.
|
||||
*/
|
||||
@@ -735,11 +744,12 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
|
||||
TransportOutVector_t * pIoVec,
|
||||
size_t ioVecCount )
|
||||
{
|
||||
uint32_t timeoutTime;
|
||||
uint32_t bytesToSend = 0U;
|
||||
int32_t bytesSentOrError = 0;
|
||||
int32_t sendResult;
|
||||
uint32_t timeoutMs;
|
||||
TransportOutVector_t * pIoVectIterator;
|
||||
size_t vectorsToBeSent = ioVecCount;
|
||||
size_t bytesToSend = 0U;
|
||||
int32_t bytesSentOrError = 0;
|
||||
|
||||
assert( pContext != NULL );
|
||||
assert( pIoVec != NULL );
|
||||
@@ -747,23 +757,20 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
|
||||
/* Send must always be defined */
|
||||
assert( pContext->transportInterface.send != NULL );
|
||||
|
||||
timeoutTime = pContext->getTime() + MQTT_SEND_RETRY_TIMEOUT_MS;
|
||||
|
||||
/* Count the total number of bytes to be sent as outlined in the vector. */
|
||||
for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
|
||||
{
|
||||
bytesToSend += ( uint32_t ) pIoVectIterator->iov_len;
|
||||
bytesToSend += pIoVectIterator->iov_len;
|
||||
}
|
||||
|
||||
/* Reset the iterator to point to the first entry in the array. */
|
||||
pIoVectIterator = pIoVec;
|
||||
|
||||
while( ( bytesSentOrError < ( int32_t ) bytesToSend ) &&
|
||||
( bytesSentOrError >= 0 ) )
|
||||
{
|
||||
int32_t sendResult;
|
||||
uint32_t bytesSentThisVector = 0U;
|
||||
/* Set the timeout. */
|
||||
timeoutMs = pContext->getTime() + MQTT_SEND_TIMEOUT_MS;
|
||||
|
||||
while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
|
||||
{
|
||||
if( pContext->transportInterface.writev != NULL )
|
||||
{
|
||||
sendResult = pContext->transportInterface.writev( pContext->transportInterface.pNetworkContext,
|
||||
@@ -772,48 +779,60 @@ static int32_t sendMessageVector( MQTTContext_t * pContext,
|
||||
}
|
||||
else
|
||||
{
|
||||
sendResult = sendBuffer( pContext,
|
||||
pIoVectIterator->iov_base,
|
||||
pIoVectIterator->iov_len );
|
||||
sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
|
||||
pIoVectIterator->iov_base,
|
||||
pIoVectIterator->iov_len );
|
||||
}
|
||||
|
||||
if( sendResult >= 0 )
|
||||
if( sendResult > 0 )
|
||||
{
|
||||
/* It is a bug in the application's transport send implementation if
|
||||
* more bytes than expected are sent. */
|
||||
assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
|
||||
|
||||
bytesSentOrError += sendResult;
|
||||
bytesSentThisVector += ( uint32_t ) sendResult;
|
||||
|
||||
/* Set last transmission time. */
|
||||
pContext->lastPacketTxTime = pContext->getTime();
|
||||
|
||||
LogDebug( ( "sendMessageVector: Bytes Sent=%ld, Bytes Remaining=%lu",
|
||||
( long int ) sendResult,
|
||||
( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
|
||||
}
|
||||
else if( sendResult < 0 )
|
||||
{
|
||||
bytesSentOrError = sendResult;
|
||||
LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) );
|
||||
}
|
||||
else
|
||||
{
|
||||
bytesSentOrError = sendResult;
|
||||
|
||||
/* We do not need to break here as the condition is checked in the loop.
|
||||
* The following statements will not execute as bytesSentThisVector is not
|
||||
* updated and is still 0. */
|
||||
/* MISRA Empty body */
|
||||
}
|
||||
|
||||
while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) &&
|
||||
( bytesSentThisVector >= pIoVectIterator->iov_len ) )
|
||||
/* Check for timeout. */
|
||||
if( pContext->getTime() >= timeoutMs )
|
||||
{
|
||||
bytesSentThisVector -= ( uint32_t ) pIoVectIterator->iov_len;
|
||||
pIoVectIterator++;
|
||||
LogError( ( "sendMessageVector: Unable to send packet: Timed out." ) );
|
||||
break;
|
||||
}
|
||||
|
||||
/* Update the send pointer to the correct vector and offset. */
|
||||
while( ( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) &&
|
||||
( sendResult >= ( int32_t ) pIoVectIterator->iov_len ) )
|
||||
{
|
||||
sendResult -= ( int32_t ) pIoVectIterator->iov_len;
|
||||
pIoVectIterator++;
|
||||
/* Update the number of vector which are yet to be sent. */
|
||||
vectorsToBeSent--;
|
||||
}
|
||||
|
||||
/* Some of the bytes from this vector were sent as well, update the length
|
||||
* and the pointer to data in this vector. */
|
||||
if( ( bytesSentThisVector > 0U ) &&
|
||||
if( ( sendResult > 0 ) &&
|
||||
( pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ) ) )
|
||||
{
|
||||
pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ bytesSentThisVector ] );
|
||||
pIoVectIterator->iov_len -= bytesSentThisVector;
|
||||
}
|
||||
|
||||
/* Check for timeout. */
|
||||
if( pContext->getTime() > timeoutTime )
|
||||
{
|
||||
break;
|
||||
pIoVectIterator->iov_base = ( const void * ) &( ( ( const uint8_t * ) pIoVectIterator->iov_base )[ sendResult ] );
|
||||
pIoVectIterator->iov_len -= ( size_t ) sendResult;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -824,74 +843,60 @@ static int32_t sendBuffer( MQTTContext_t * pContext,
|
||||
const uint8_t * pBufferToSend,
|
||||
size_t bytesToSend )
|
||||
{
|
||||
int32_t sendResult;
|
||||
uint32_t timeoutMs;
|
||||
int32_t bytesSentOrError = 0;
|
||||
const uint8_t * pIndex = pBufferToSend;
|
||||
size_t bytesRemaining;
|
||||
int32_t totalBytesSent = 0, bytesSent;
|
||||
uint32_t lastSendTimeMs = 0U, timeSinceLastSendMs = 0U;
|
||||
bool sendError = false;
|
||||
|
||||
assert( pContext != NULL );
|
||||
assert( pContext->getTime != NULL );
|
||||
assert( pContext->transportInterface.send != NULL );
|
||||
assert( pIndex != NULL );
|
||||
|
||||
bytesRemaining = bytesToSend;
|
||||
/* Set the timeout. */
|
||||
timeoutMs = pContext->getTime() + MQTT_SEND_TIMEOUT_MS;
|
||||
|
||||
/* Loop until the entire packet is sent. */
|
||||
while( ( bytesRemaining > 0UL ) && ( sendError == false ) )
|
||||
while( ( bytesSentOrError < ( int32_t ) bytesToSend ) && ( bytesSentOrError >= 0 ) )
|
||||
{
|
||||
bytesSent = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
|
||||
pIndex,
|
||||
bytesRemaining );
|
||||
sendResult = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
|
||||
pIndex,
|
||||
bytesToSend - ( size_t ) bytesSentOrError );
|
||||
|
||||
if( bytesSent < 0 )
|
||||
if( sendResult > 0 )
|
||||
{
|
||||
LogError( ( "Transport send failed. Error code=%ld.", ( long int ) bytesSent ) );
|
||||
totalBytesSent = bytesSent;
|
||||
sendError = true;
|
||||
}
|
||||
else if( bytesSent > 0 )
|
||||
{
|
||||
/* Record the most recent time of successful transmission. */
|
||||
lastSendTimeMs = pContext->getTime();
|
||||
|
||||
/* It is a bug in the application's transport send implementation if
|
||||
* more bytes than expected are sent. To avoid a possible overflow
|
||||
* in converting bytesRemaining from unsigned to signed, this assert
|
||||
* must exist after the check for bytesSent being negative. */
|
||||
assert( ( size_t ) bytesSent <= bytesRemaining );
|
||||
* more bytes than expected are sent. */
|
||||
assert( sendResult <= ( ( int32_t ) bytesToSend - bytesSentOrError ) );
|
||||
|
||||
bytesRemaining -= ( size_t ) bytesSent;
|
||||
totalBytesSent += bytesSent;
|
||||
/* Increment the index. */
|
||||
pIndex = &pIndex[ bytesSent ];
|
||||
LogDebug( ( "BytesSent=%ld, BytesRemaining=%lu",
|
||||
( long int ) bytesSent,
|
||||
( unsigned long ) bytesRemaining ) );
|
||||
bytesSentOrError += sendResult;
|
||||
pIndex = &pIndex[ sendResult ];
|
||||
|
||||
/* Set last transmission time. */
|
||||
pContext->lastPacketTxTime = pContext->getTime();
|
||||
|
||||
LogDebug( ( "sendBuffer: Bytes Sent=%ld, Bytes Remaining=%lu",
|
||||
( long int ) sendResult,
|
||||
( unsigned long ) ( bytesToSend - ( size_t ) bytesSentOrError ) ) );
|
||||
}
|
||||
else if( sendResult < 0 )
|
||||
{
|
||||
bytesSentOrError = sendResult;
|
||||
LogError( ( "sendBuffer: Unable to send packet: Network Error." ) );
|
||||
}
|
||||
else
|
||||
{
|
||||
/* No bytes were sent over the network. */
|
||||
timeSinceLastSendMs = calculateElapsedTime( pContext->getTime(), lastSendTimeMs );
|
||||
/* MISRA Empty body */
|
||||
}
|
||||
|
||||
/* Check for timeout if we have been waiting to send any data over the network. */
|
||||
if( timeSinceLastSendMs >= MQTT_SEND_RETRY_TIMEOUT_MS )
|
||||
{
|
||||
LogError( ( "Unable to send packet: Timed out in transport send." ) );
|
||||
sendError = true;
|
||||
}
|
||||
/* Check for timeout. */
|
||||
if( pContext->getTime() >= timeoutMs )
|
||||
{
|
||||
LogError( ( "sendBuffer: Unable to send packet: Timed out." ) );
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Update time of last transmission if the entire packet is successfully sent. */
|
||||
if( totalBytesSent > 0 )
|
||||
{
|
||||
pContext->lastPacketTxTime = lastSendTimeMs;
|
||||
LogDebug( ( "Successfully sent packet at time %lu.",
|
||||
( unsigned long ) lastSendTimeMs ) );
|
||||
}
|
||||
|
||||
return totalBytesSent;
|
||||
return bytesSentOrError;
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------*/
|
||||
@@ -1243,7 +1248,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
|
||||
{
|
||||
MQTTStatus_t status = MQTTSuccess;
|
||||
MQTTPublishState_t newState = MQTTStateNull;
|
||||
int32_t bytesSent = 0;
|
||||
int32_t sendResult = 0;
|
||||
uint8_t packetTypeByte = 0U;
|
||||
MQTTPubAckType_t packetType;
|
||||
MQTTFixedBuffer_t localBuffer;
|
||||
@@ -1270,14 +1275,14 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
|
||||
|
||||
/* Here, we are not using the vector approach for efficiency. There is just one buffer
|
||||
* to be sent which can be achieved with a normal send call. */
|
||||
bytesSent = sendBuffer( pContext,
|
||||
localBuffer.pBuffer,
|
||||
MQTT_PUBLISH_ACK_PACKET_SIZE );
|
||||
sendResult = sendBuffer( pContext,
|
||||
localBuffer.pBuffer,
|
||||
MQTT_PUBLISH_ACK_PACKET_SIZE );
|
||||
|
||||
MQTT_POST_SEND_HOOK( pContext );
|
||||
}
|
||||
|
||||
if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
|
||||
if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
|
||||
{
|
||||
pContext->controlPacketSent = true;
|
||||
|
||||
@@ -1301,7 +1306,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
|
||||
{
|
||||
LogError( ( "Failed to send ACK packet: PacketType=%02x, SentBytes=%ld, "
|
||||
"PacketSize=%lu.",
|
||||
( unsigned int ) packetTypeByte, ( long int ) bytesSent,
|
||||
( unsigned int ) packetTypeByte, ( long int ) sendResult,
|
||||
MQTT_PUBLISH_ACK_PACKET_SIZE ) );
|
||||
status = MQTTSendFailed;
|
||||
}
|
||||
@@ -2830,7 +2835,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
|
||||
|
||||
MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
|
||||
{
|
||||
int32_t bytesSent = 0;
|
||||
int32_t sendResult = 0;
|
||||
MQTTStatus_t status = MQTTSuccess;
|
||||
size_t packetSize = 0U;
|
||||
/* MQTT ping packets are of fixed length. */
|
||||
@@ -2878,15 +2883,15 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
|
||||
* Here, we do not use the vectored IO approach for efficiency as the
|
||||
* Ping packet does not have numerous fields which need to be copied
|
||||
* from the user provided buffers. Thus it can be sent directly. */
|
||||
bytesSent = sendBuffer( pContext,
|
||||
localBuffer.pBuffer,
|
||||
2U );
|
||||
sendResult = sendBuffer( pContext,
|
||||
localBuffer.pBuffer,
|
||||
2U );
|
||||
|
||||
/* Give the mutex away. */
|
||||
MQTT_POST_SEND_HOOK( pContext );
|
||||
|
||||
/* It is an error to not send the entire PINGREQ packet. */
|
||||
if( bytesSent < ( int32_t ) packetSize )
|
||||
if( sendResult < ( int32_t ) packetSize )
|
||||
{
|
||||
LogError( ( "Transport send failed for PINGREQ packet." ) );
|
||||
status = MQTTSendFailed;
|
||||
@@ -2896,7 +2901,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
|
||||
pContext->pingReqSendTimeMs = pContext->lastPacketTxTime;
|
||||
pContext->waitingForPingResp = true;
|
||||
LogDebug( ( "Sent %ld bytes of PINGREQ packet.",
|
||||
( long int ) bytesSent ) );
|
||||
( long int ) sendResult ) );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2953,7 +2958,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
|
||||
MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
|
||||
{
|
||||
size_t packetSize = 0U;
|
||||
int32_t bytesSent = 0;
|
||||
int32_t sendResult = 0;
|
||||
MQTTStatus_t status = MQTTSuccess;
|
||||
MQTTFixedBuffer_t localBuffer;
|
||||
uint8_t disconnectPacket[ 2U ];
|
||||
@@ -2990,14 +2995,14 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
|
||||
/* Here we do not use vectors as the disconnect packet has fixed fields
|
||||
* which do not reside in user provided buffers. Thus, it can be sent
|
||||
* using a simple send call. */
|
||||
bytesSent = sendBuffer( pContext,
|
||||
localBuffer.pBuffer,
|
||||
packetSize );
|
||||
sendResult = sendBuffer( pContext,
|
||||
localBuffer.pBuffer,
|
||||
packetSize );
|
||||
|
||||
/* Give the mutex away. */
|
||||
MQTT_POST_SEND_HOOK( pContext );
|
||||
|
||||
if( bytesSent < ( int32_t ) packetSize )
|
||||
if( sendResult < ( int32_t ) packetSize )
|
||||
{
|
||||
LogError( ( "Transport send failed for DISCONNECT packet." ) );
|
||||
status = MQTTSendFailed;
|
||||
@@ -3005,7 +3010,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
|
||||
else
|
||||
{
|
||||
LogDebug( ( "Sent %ld bytes of DISCONNECT packet.",
|
||||
( long int ) bytesSent ) );
|
||||
( long int ) sendResult ) );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -258,7 +258,7 @@ typedef struct MQTTDeserializedInfo
|
||||
* there is no time implementation, it is the responsibility of the application
|
||||
* to provide a dummy function to always return 0, provide 0 timeouts for
|
||||
* all calls to #MQTT_Connect, #MQTT_ProcessLoop, and #MQTT_ReceiveLoop and configure
|
||||
* the #MQTT_RECV_POLLING_TIMEOUT_MS and #MQTT_SEND_RETRY_TIMEOUT_MS configurations
|
||||
* the #MQTT_RECV_POLLING_TIMEOUT_MS and #MQTT_SEND_TIMEOUT_MS configurations
|
||||
* to be 0. This will result in loop functions running for a single iteration, and
|
||||
* #MQTT_Connect relying on #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT to receive the CONNACK packet.
|
||||
*
|
||||
@@ -420,7 +420,7 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
|
||||
*
|
||||
* @note If a dummy #MQTTGetCurrentTimeFunc_t was passed to #MQTT_Init, then a
|
||||
* timeout value of 0 MUST be passed to the API, and the #MQTT_RECV_POLLING_TIMEOUT_MS
|
||||
* and #MQTT_SEND_RETRY_TIMEOUT_MS timeout configurations MUST be set to 0.
|
||||
* and #MQTT_SEND_TIMEOUT_MS timeout configurations MUST be set to 0.
|
||||
*
|
||||
* @param[in] pContext Initialized MQTT context.
|
||||
* @param[in] pConnectInfo MQTT CONNECT packet information.
|
||||
@@ -736,7 +736,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext );
|
||||
*
|
||||
* @note Calling this function blocks the calling context for a time period that
|
||||
* depends on the passed the configuration macros, #MQTT_RECV_POLLING_TIMEOUT_MS
|
||||
* and #MQTT_SEND_RETRY_TIMEOUT_MS, and the underlying transport interface implementation
|
||||
* and #MQTT_SEND_TIMEOUT_MS, and the underlying transport interface implementation
|
||||
* timeouts, unless an error occurs. The blocking period also depends on the execution time of the
|
||||
* #MQTTEventCallback_t callback supplied to the library. It is recommended that the supplied
|
||||
* #MQTTEventCallback_t callback does not contain blocking operations to prevent potential
|
||||
@@ -787,14 +787,14 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext );
|
||||
* keep alive.
|
||||
*
|
||||
* @note If a dummy #MQTTGetCurrentTimeFunc_t was passed to #MQTT_Init, then the
|
||||
* #MQTT_RECV_POLLING_TIMEOUT_MS and #MQTT_SEND_RETRY_TIMEOUT_MS timeout configurations
|
||||
* #MQTT_RECV_POLLING_TIMEOUT_MS and #MQTT_SEND_TIMEOUT_MS timeout configurations
|
||||
* MUST be set to 0.
|
||||
*
|
||||
* @param[in] pContext Initialized and connected MQTT context.
|
||||
*
|
||||
* @note Calling this function blocks the calling context for a time period that
|
||||
* depends on the the configuration macros, #MQTT_RECV_POLLING_TIMEOUT_MS and
|
||||
* #MQTT_SEND_RETRY_TIMEOUT_MS, and the underlying transport interface implementation
|
||||
* #MQTT_SEND_TIMEOUT_MS, and the underlying transport interface implementation
|
||||
* timeouts, unless an error occurs. The blocking period also depends on the execution time of the
|
||||
* #MQTTEventCallback_t callback supplied to the library. It is recommended that the supplied
|
||||
* #MQTTEventCallback_t callback does not contain blocking operations to prevent potential
|
||||
|
||||
@@ -169,28 +169,30 @@
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief The maximum duration between non-empty network transmissions while
|
||||
* sending an MQTT packet via the #MQTT_ProcessLoop or #MQTT_ReceiveLoop
|
||||
* API functions.
|
||||
* @brief The maximum duration allowed to send an MQTT packet over the transport
|
||||
* interface.
|
||||
*
|
||||
* When sending an MQTT packet, the transport send function may be called multiple
|
||||
* times until all of the required number of bytes are sent.
|
||||
* This timeout represents the maximum duration that is allowed for no data
|
||||
* transmission over the network through the transport send function.
|
||||
* When sending an MQTT packet, the transport send or writev functions may be
|
||||
* called multiple times until all of the required number of bytes are sent.
|
||||
* This timeout represents the maximum duration that is allowed to send the MQTT
|
||||
* packet while calling the transport send or writev functions.
|
||||
*
|
||||
* If the timeout expires, the #MQTT_ProcessLoop and #MQTT_ReceiveLoop functions
|
||||
* return #MQTTSendFailed.
|
||||
* If the timeout expires, #MQTTSendFailed will be returned by the public API
|
||||
* functions.
|
||||
*
|
||||
* @note If a dummy implementation of the #MQTTGetCurrentTimeFunc_t timer function,
|
||||
* is supplied to the library, then #MQTT_SEND_RETRY_TIMEOUT_MS MUST be set to 0.
|
||||
* is supplied to the library, then #MQTT_SEND_TIMEOUT_MS MUST be set to 0.
|
||||
*
|
||||
* <b>Possible values:</b> Any positive 32 bit integer. Recommended to use a small
|
||||
* timeout value. <br>
|
||||
* <b>Default value:</b> `10`
|
||||
* <b>Possible values:</b> Any positive 32 bit integer. <br>
|
||||
* <b>Default value:</b> `20000`
|
||||
*
|
||||
*/
|
||||
#ifndef MQTT_SEND_RETRY_TIMEOUT_MS
|
||||
#define MQTT_SEND_RETRY_TIMEOUT_MS ( 10U )
|
||||
#ifndef MQTT_SEND_TIMEOUT_MS
|
||||
#define MQTT_SEND_TIMEOUT_MS ( 20000U )
|
||||
#endif
|
||||
|
||||
#ifdef MQTT_SEND_RETRY_TIMEOUT_MS
|
||||
#error MQTT_SEND_RETRY_TIMEOUT_MS is deprecated. Instead use MQTT_SEND_TIMEOUT_MS.
|
||||
#endif
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
|
||||
@@ -29,6 +29,31 @@
|
||||
#include "core_mqtt.h"
|
||||
#include "mqtt_cbmc_state.h"
|
||||
|
||||
/**
|
||||
* @brief Implement a get time function to return timeout after certain
|
||||
* iterations have been made in the code. This ensures that we do not hit
|
||||
* unwinding error in CBMC. In real life scenarios, the send function will
|
||||
* not just keep accepting 1 byte at a time for a long time since it just
|
||||
* gets added to the TCP buffer.
|
||||
*
|
||||
* @return The global system time.
|
||||
*/
|
||||
static uint32_t ulGetTimeFunction( void )
|
||||
{
|
||||
static uint32_t systemTime = 0;
|
||||
|
||||
if( systemTime >= MAX_NETWORK_SEND_TRIES )
|
||||
{
|
||||
systemTime = systemTime + MQTT_SEND_TIMEOUT_MS + 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
systemTime = systemTime + 1;
|
||||
}
|
||||
|
||||
return systemTime;
|
||||
}
|
||||
|
||||
void harness()
|
||||
{
|
||||
MQTTContext_t * pContext;
|
||||
@@ -43,6 +68,8 @@ void harness()
|
||||
__CPROVER_assume( pContext != NULL );
|
||||
__CPROVER_assume( pContext->networkBuffer.pBuffer != NULL );
|
||||
|
||||
pContext->getTime = ulGetTimeFunction;
|
||||
|
||||
pConnectInfo = allocateMqttConnectInfo( NULL );
|
||||
__CPROVER_assume( isValidMqttConnectInfo( pConnectInfo ) );
|
||||
|
||||
|
||||
@@ -29,6 +29,31 @@
|
||||
#include "core_mqtt.h"
|
||||
#include "mqtt_cbmc_state.h"
|
||||
|
||||
/**
|
||||
* @brief Implement a get time function to return timeout after certain
|
||||
* iterations have been made in the code. This ensures that we do not hit
|
||||
* unwinding error in CBMC. In real life scenarios, the send function will
|
||||
* not just keep accepting 1 byte at a time for a long time since it just
|
||||
* gets added to the TCP buffer.
|
||||
*
|
||||
* @return The global system time.
|
||||
*/
|
||||
static uint32_t ulGetTimeFunction( void )
|
||||
{
|
||||
static uint32_t systemTime = 0;
|
||||
|
||||
if( systemTime >= MAX_NETWORK_SEND_TRIES )
|
||||
{
|
||||
systemTime = systemTime + MQTT_SEND_TIMEOUT_MS + 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
systemTime = systemTime + 1;
|
||||
}
|
||||
|
||||
return systemTime;
|
||||
}
|
||||
|
||||
void harness()
|
||||
{
|
||||
MQTTContext_t * pContext;
|
||||
@@ -38,6 +63,11 @@ void harness()
|
||||
pContext = allocateMqttContext( NULL );
|
||||
__CPROVER_assume( isValidMqttContext( pContext ) );
|
||||
|
||||
if( pContext != NULL )
|
||||
{
|
||||
pContext->getTime = ulGetTimeFunction;
|
||||
}
|
||||
|
||||
pPublishInfo = allocateMqttPublishInfo( NULL );
|
||||
__CPROVER_assume( isValidMqttPublishInfo( pPublishInfo ) );
|
||||
|
||||
|
||||
@@ -36,12 +36,6 @@ INCLUDES +=
|
||||
|
||||
REMOVE_FUNCTION_BODY +=
|
||||
REMOVE_FUNCTION_BODY +=
|
||||
# Unlike recvExact, sendBuffer is not bounded by the timeout. The loop in
|
||||
# sendBuffer will continue until all the bytes are sent or a network error
|
||||
# occurs. Please see NetworkInterfaceReceiveStub in
|
||||
# libraries\standard\mqtt\cbmc\stubs\network_interface_stubs.c for more
|
||||
# information.
|
||||
UNWINDSET += __CPROVER_file_local_core_mqtt_c_sendBuffer.0:$(MAX_NETWORK_SEND_TRIES)
|
||||
# These loops will run for the maximum number of publishes pending acknowledgement.
|
||||
# This is set in test/cbmc/include/core_mqtt_config.h.
|
||||
UNWINDSET += __CPROVER_file_local_core_mqtt_state_c_addRecord.0:$(MQTT_STATE_ARRAY_MAX_COUNT)
|
||||
|
||||
@@ -29,6 +29,31 @@
|
||||
#include "core_mqtt.h"
|
||||
#include "mqtt_cbmc_state.h"
|
||||
|
||||
/**
|
||||
* @brief Implement a get time function to return timeout after certain
|
||||
* iterations have been made in the code. This ensures that we do not hit
|
||||
* unwinding error in CBMC. In real life scenarios, the send function will
|
||||
* not just keep accepting 1 byte at a time for a long time since it just
|
||||
* gets added to the TCP buffer.
|
||||
*
|
||||
* @return The global system time.
|
||||
*/
|
||||
static uint32_t ulGetTimeFunction( void )
|
||||
{
|
||||
static uint32_t systemTime = 0;
|
||||
|
||||
if( systemTime >= MAX_NETWORK_SEND_TRIES )
|
||||
{
|
||||
systemTime = systemTime + MQTT_SEND_TIMEOUT_MS + 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
systemTime = systemTime + 1;
|
||||
}
|
||||
|
||||
return systemTime;
|
||||
}
|
||||
|
||||
void harness()
|
||||
{
|
||||
MQTTContext_t * pContext;
|
||||
@@ -39,6 +64,11 @@ void harness()
|
||||
pContext = allocateMqttContext( NULL );
|
||||
__CPROVER_assume( isValidMqttContext( pContext ) );
|
||||
|
||||
if( pContext != NULL )
|
||||
{
|
||||
pContext->getTime = ulGetTimeFunction;
|
||||
}
|
||||
|
||||
/* Please see the default bound description on SUBSCRIPTION_COUNT_MAX in
|
||||
* mqtt_cbmc_state.c for more information. */
|
||||
__CPROVER_assume( subscriptionCount < SUBSCRIPTION_COUNT_MAX );
|
||||
|
||||
@@ -29,6 +29,31 @@
|
||||
#include "core_mqtt.h"
|
||||
#include "mqtt_cbmc_state.h"
|
||||
|
||||
/**
|
||||
* @brief Implement a get time function to return timeout after certain
|
||||
* iterations have been made in the code. This ensures that we do not hit
|
||||
* unwinding error in CBMC. In real life scenarios, the send function will
|
||||
* not just keep accepting 1 byte at a time for a long time since it just
|
||||
* gets added to the TCP buffer.
|
||||
*
|
||||
* @return The global system time.
|
||||
*/
|
||||
static uint32_t ulGetTimeFunction( void )
|
||||
{
|
||||
static uint32_t systemTime = 0;
|
||||
|
||||
if( systemTime >= MAX_NETWORK_SEND_TRIES )
|
||||
{
|
||||
systemTime = systemTime + MQTT_SEND_TIMEOUT_MS + 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
systemTime = systemTime + 1;
|
||||
}
|
||||
|
||||
return systemTime;
|
||||
}
|
||||
|
||||
void harness()
|
||||
{
|
||||
MQTTContext_t * pContext;
|
||||
@@ -39,6 +64,11 @@ void harness()
|
||||
pContext = allocateMqttContext( NULL );
|
||||
__CPROVER_assume( isValidMqttContext( pContext ) );
|
||||
|
||||
if( pContext != NULL )
|
||||
{
|
||||
pContext->getTime = ulGetTimeFunction;
|
||||
}
|
||||
|
||||
/* Please see the default bound description on SUBSCRIPTION_COUNT_MAX in
|
||||
* mqtt_cbmc_state.c for more information. */
|
||||
__CPROVER_assume( subscriptionCount < SUBSCRIPTION_COUNT_MAX );
|
||||
|
||||
@@ -4701,6 +4701,9 @@ void test_MQTT_Ping_error_path( void )
|
||||
/* Case when there is timeout in sending data through transport send. */
|
||||
transport.recv = transportRecvSuccess;
|
||||
transport.send = transportSendNoBytes; /* Use the mock function that returns zero bytes sent. */
|
||||
/* Initialize context. */
|
||||
mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
|
||||
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
|
||||
MQTT_GetPingreqPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess );
|
||||
MQTT_GetPingreqPacketSize_ReturnThruPtr_pPacketSize( &pingreqSize );
|
||||
MQTT_SerializePingreq_ExpectAnyArgsAndReturn( MQTTSuccess );
|
||||
|
||||
Reference in New Issue
Block a user