1
0
mirror of https://github.com/FreeRTOS/coreMQTT synced 2025-06-05 11:25:56 +08:00

Clear old records for clean sessions (#1148)

* Clear state records when a CONNACK's session present flag is false

Co-authored-by: Gary Wicker <14828980+gkwicker@users.noreply.github.com>
This commit is contained in:
Muneeb Ahmed 2020-08-24 13:33:45 -07:00 committed by GitHub
parent ff63b3342e
commit 0884ff0f4d
2 changed files with 42 additions and 15 deletions

View File

@ -277,14 +277,17 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
bool * pSessionPresent );
/**
* @brief Resends pending acks for a re-established MQTT session.
* @brief Resends pending acks for a re-established MQTT session, or
* clears existing state records for a clean session.
*
* @param[in] pContext Initialized MQTT context.
* @param[in] sessionPresent Session present flag received from the MQTT broker.
*
* @return #MQTTSendFailed if transport send failed;
* @return #MQTTSendFailed if transport send during resend failed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t resendPendingAcks( MQTTContext_t * pContext );
static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
bool sessionPresent );
/**
* @brief Serializes a PUBLISH message.
@ -1560,7 +1563,8 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
/*-----------------------------------------------------------*/
static MQTTStatus_t resendPendingAcks( MQTTContext_t * pContext )
static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
bool sessionPresent )
{
MQTTStatus_t status = MQTTSuccess;
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
@ -1569,16 +1573,29 @@ static MQTTStatus_t resendPendingAcks( MQTTContext_t * pContext )
assert( pContext != NULL );
/* Get the next packet Id for which a PUBREL need to be resent. */
packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
/* Resend all the PUBREL acks after session is reestablished. */
while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) )
if( sessionPresent == true )
{
status = sendPublishAcks( pContext, packetId, state );
/* Get the next packet ID for which a PUBREL need to be resent. */
packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
/* Resend all the PUBREL acks after session is reestablished. */
while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) )
{
status = sendPublishAcks( pContext, packetId, state );
packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
}
}
else
{
/* Clear any existing records if a new session is established. */
( void ) memset( pContext->outgoingPublishRecords,
0x00,
sizeof( pContext->outgoingPublishRecords ) );
( void ) memset( pContext->incomingPublishRecords,
0x00,
sizeof( pContext->incomingPublishRecords ) );
}
return status;
@ -1791,10 +1808,10 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
pSessionPresent );
}
/* Resend all the PUBREL when reestablishing a session. */
if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) )
if( status == MQTTSuccess )
{
status = resendPendingAcks( pContext );
/* Resend PUBRELs when reestablishing a session, or clear records for new sessions. */
status = handleSessionResumption( pContext, *pSessionPresent );
}
if( status == MQTTSuccess )

View File

@ -994,6 +994,7 @@ void test_MQTT_Connect_happy_path()
TransportInterface_t transport;
MQTTFixedBuffer_t networkBuffer;
MQTTPacketInfo_t incomingPacket;
MQTTPubAckInfo_t cleanRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
setupTransportInterface( &transport );
setupNetworkBuffer( &networkBuffer );
@ -1033,6 +1034,12 @@ void test_MQTT_Connect_happy_path()
mqttContext.keepAliveIntervalSec = 0;
connectInfo.cleanSession = true;
sessionPresentExpected = false;
/* Populate some state records to make sure they are cleared since a clean session
* will be established. */
mqttContext.outgoingPublishRecords[ 0 ].packetId = 1;
mqttContext.outgoingPublishRecords[ 0 ].qos = MQTTQoS2;
mqttContext.outgoingPublishRecords[ 0 ].publishState = MQTTPublishSend;
mqttContext.incomingPublishRecords[ MQTT_STATE_ARRAY_MAX_COUNT - 1 ].packetId = 1;
MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket );
MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess );
@ -1042,6 +1049,9 @@ void test_MQTT_Connect_happy_path()
TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus );
TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec );
TEST_ASSERT_FALSE( sessionPresent );
/* Test old records were cleared. */
TEST_ASSERT_EQUAL_MEMORY( cleanRecords, mqttContext.outgoingPublishRecords, sizeof( cleanRecords ) );
TEST_ASSERT_EQUAL_MEMORY( cleanRecords, mqttContext.incomingPublishRecords, sizeof( cleanRecords ) );
/* Request to establish a session if present and session present is received
* from broker. */