1
0
mirror of https://github.com/FreeRTOS/coreMQTT synced 2025-06-05 03:15:54 +08:00
coreMQTT/source/core_mqtt_state.c
Archit Gupta b9dfc361f2 Combine config default headers
The loggging defaults were split out since they are not namespaced to
coreMQTT, and they previously leaked to all files including a coreMQTT
header. Splitting them allowed the logging defaults to only be pulled
into coreMQTT source files. Now that no header files use the config
headers, and thus all coreMQTT config only affects coreMQTT source
files, the split is no longer needed.
2024-02-01 14:44:28 -08:00

1213 lines
40 KiB
C

/*
* coreMQTT v2.1.0
* Copyright (C) 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* SPDX-License-Identifier: MIT
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file core_mqtt_state.c
* @brief Implements the functions in core_mqtt_state.h.
*/
#include <assert.h>
#include <string.h>
#include "core_mqtt_state.h"
/* Include config defaults header to get default values of configs. */
#include "core_mqtt_config_defaults.h"
/*-----------------------------------------------------------*/
/**
* @brief A global static variable used to generate the macro
* #MQTT_INVALID_STATE_COUNT of size_t length.
*/
static const size_t ZERO_SIZE_T = 0U;
/**
* @brief This macro depicts the invalid value for the state publishes.
*/
#define MQTT_INVALID_STATE_COUNT ( ~ZERO_SIZE_T )
/**
* @brief Create a 16-bit bitmap with bit set at specified position.
*
* @param[in] position The position at which the bit need to be set.
*/
#define UINT16_BITMAP_BIT_SET_AT( position ) ( ( uint16_t ) 0x01U << ( ( uint16_t ) position ) )
/**
* @brief Set a bit in an 16-bit unsigned integer.
*
* @param[in] x The 16-bit unsigned integer to set a bit.
* @param[in] position The position at which the bit need to be set.
*/
#define UINT16_SET_BIT( x, position ) ( ( x ) = ( uint16_t ) ( ( x ) | ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) )
/**
* @brief Macro for checking if a bit is set in a 16-bit unsigned integer.
*
* @param[in] x The unsigned 16-bit integer to check.
* @param[in] position Which bit to check.
*/
#define UINT16_CHECK_BIT( x, position ) ( ( ( x ) & ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) == ( UINT16_BITMAP_BIT_SET_AT( position ) ) )
/*-----------------------------------------------------------*/
/**
* @brief Test if a transition to new state is possible, when dealing with PUBLISHes.
*
* @param[in] currentState The current state.
* @param[in] newState State to transition to.
* @param[in] opType Reserve, Send, or Receive.
* @param[in] qos 0, 1, or 2.
*
* @note This function does not validate the current state, or the new state
* based on either the operation type or QoS. It assumes the new state is valid
* given the opType and QoS, which will be the case if calculated by
* MQTT_CalculateStatePublish().
*
* @return `true` if transition is possible, else `false`
*/
static bool validateTransitionPublish( MQTTPublishState_t currentState,
MQTTPublishState_t newState,
MQTTStateOperation_t opType,
MQTTQoS_t qos );
/**
* @brief Test if a transition to a new state is possible, when dealing with acks.
*
* @param[in] currentState The current state.
* @param[in] newState State to transition to.
*
* @return `true` if transition is possible, else `false`.
*/
static bool validateTransitionAck( MQTTPublishState_t currentState,
MQTTPublishState_t newState );
/**
* @brief Test if the publish corresponding to an ack is outgoing or incoming.
*
* @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP.
* @param[in] opType Send, or Receive.
*
* @return `true` if corresponds to outgoing publish, else `false`.
*/
static bool isPublishOutgoing( MQTTPubAckType_t packetType,
MQTTStateOperation_t opType );
/**
* @brief Find a packet ID in the state record.
*
* @param[in] records State record array.
* @param[in] recordCount Length of record array.
* @param[in] packetId packet ID to search for.
* @param[out] pQos QoS retrieved from record.
* @param[out] pCurrentState state retrieved from record.
*
* @return index of the packet id in the record if it exists, else the record length.
*/
static size_t findInRecord( const MQTTPubAckInfo_t * records,
size_t recordCount,
uint16_t packetId,
MQTTQoS_t * pQos,
MQTTPublishState_t * pCurrentState );
/**
* @brief Compact records.
*
* Records are arranged in the relative order to maintain message ordering.
* This will lead to fragmentation and this function will help in defragmenting
* the records array.
*
* @param[in] records State record array.
* @param[in] recordCount Length of record array.
*/
static void compactRecords( MQTTPubAckInfo_t * records,
size_t recordCount );
/**
* @brief Store a new entry in the state record.
*
* @param[in] records State record array.
* @param[in] recordCount Length of record array.
* @param[in] packetId Packet ID of new entry.
* @param[in] qos QoS of new entry.
* @param[in] publishState State of new entry.
*
* @return #MQTTSuccess, #MQTTNoMemory, or #MQTTStateCollision.
*/
static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
size_t recordCount,
uint16_t packetId,
MQTTQoS_t qos,
MQTTPublishState_t publishState );
/**
* @brief Update and possibly delete an entry in the state record.
*
* @param[in] records State record array.
* @param[in] recordIndex index of record to update.
* @param[in] newState New state to update.
* @param[in] shouldDelete Whether an existing entry should be deleted.
*/
static void updateRecord( MQTTPubAckInfo_t * records,
size_t recordIndex,
MQTTPublishState_t newState,
bool shouldDelete );
/**
* @brief Get the packet ID and index of an outgoing publish in specified
* states.
*
* @param[in] pMqttContext Initialized MQTT context.
* @param[in] searchStates The states to search for in 2-byte bit map.
* @param[in,out] pCursor Index at which to start searching.
*
* @return Packet ID of the outgoing publish.
*/
static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
uint16_t searchStates,
MQTTStateCursor_t * pCursor );
/**
* @brief Update the state records for an ACK after state transition
* validations.
*
* @param[in] records State records pointer.
* @param[in] maxRecordCount The maximum number of records.
* @param[in] recordIndex Index at which the record is stored.
* @param[in] packetId Packet id of the packet.
* @param[in] currentState Current state of the publish record.
* @param[in] newState New state of the publish.
*
* @return #MQTTIllegalState, or #MQTTSuccess.
*/
static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
size_t maxRecordCount,
size_t recordIndex,
uint16_t packetId,
MQTTPublishState_t currentState,
MQTTPublishState_t newState );
/**
* @brief Update the state record for a PUBLISH packet after validating
* the state transitions.
*
* @param[in] pMqttContext Initialized MQTT context.
* @param[in] recordIndex Index in state records at which publish record exists.
* @param[in] packetId ID of the PUBLISH packet.
* @param[in] opType Send or Receive.
* @param[in] qos 0, 1, or 2.
* @param[in] currentState Current state of the publish record.
* @param[in] newState New state of the publish record.
*
* @return #MQTTIllegalState, #MQTTStateCollision or #MQTTSuccess.
*/
static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext,
size_t recordIndex,
uint16_t packetId,
MQTTStateOperation_t opType,
MQTTQoS_t qos,
MQTTPublishState_t currentState,
MQTTPublishState_t newState );
/*-----------------------------------------------------------*/
static bool validateTransitionPublish( MQTTPublishState_t currentState,
MQTTPublishState_t newState,
MQTTStateOperation_t opType,
MQTTQoS_t qos )
{
bool isValid = false;
switch( currentState )
{
case MQTTStateNull:
/* Transitions from null occur when storing a new entry into the record. */
if( opType == MQTT_RECEIVE )
{
isValid = ( newState == MQTTPubAckSend ) || ( newState == MQTTPubRecSend );
}
break;
case MQTTPublishSend:
/* Outgoing publish. All such publishes start in this state due to
* the reserve operation. */
switch( qos )
{
case MQTTQoS1:
isValid = newState == MQTTPubAckPending;
break;
case MQTTQoS2:
isValid = newState == MQTTPubRecPending;
break;
case MQTTQoS0:
default:
/* QoS 0 is checked before calling this function. */
break;
}
break;
/* Below cases are for validating the resends of publish when a session is
* reestablished. */
case MQTTPubAckPending:
/* When a session is reestablished, outgoing QoS1 publishes in state
* #MQTTPubAckPending can be resent. The state remains the same. */
isValid = newState == MQTTPubAckPending;
break;
case MQTTPubRecPending:
/* When a session is reestablished, outgoing QoS2 publishes in state
* #MQTTPubRecPending can be resent. The state remains the same. */
isValid = newState == MQTTPubRecPending;
break;
case MQTTPubAckSend:
case MQTTPubCompPending:
case MQTTPubCompSend:
case MQTTPubRecSend:
case MQTTPubRelPending:
case MQTTPubRelSend:
case MQTTPublishDone:
default:
/* For a PUBLISH, we should not start from any other state. */
break;
}
return isValid;
}
/*-----------------------------------------------------------*/
static bool validateTransitionAck( MQTTPublishState_t currentState,
MQTTPublishState_t newState )
{
bool isValid = false;
switch( currentState )
{
case MQTTPubAckSend:
/* Incoming publish, QoS 1. */
case MQTTPubAckPending:
/* Outgoing publish, QoS 1. */
isValid = newState == MQTTPublishDone;
break;
case MQTTPubRecSend:
/* Incoming publish, QoS 2. */
isValid = newState == MQTTPubRelPending;
break;
case MQTTPubRelPending:
/* Incoming publish, QoS 2.
* There are 2 valid transitions possible.
* 1. MQTTPubRelPending -> MQTTPubCompSend : A PUBREL ack is received
* when publish record state is MQTTPubRelPending. This is the
* normal state transition without any connection interruptions.
* 2. MQTTPubRelPending -> MQTTPubRelPending : Receiving a duplicate
* QoS2 publish can result in a transition to the same state.
* This can happen in the below state transition.
* 1. Incoming publish received.
* 2. PUBREC ack sent and state is now MQTTPubRelPending.
* 3. TCP connection failure and broker didn't receive the PUBREC.
* 4. Reestablished MQTT session.
* 5. MQTT broker resent the un-acked publish.
* 6. Publish is received when publish record state is in
* MQTTPubRelPending.
* 7. Sending out a PUBREC will result in this transition
* to the same state. */
isValid = ( newState == MQTTPubCompSend ) ||
( newState == MQTTPubRelPending );
break;
case MQTTPubCompSend:
/* Incoming publish, QoS 2.
* There are 2 valid transitions possible.
* 1. MQTTPubCompSend -> MQTTPublishDone : A PUBCOMP ack is sent
* after receiving a PUBREL from broker. This is the
* normal state transition without any connection interruptions.
* 2. MQTTPubCompSend -> MQTTPubCompSend : Receiving a duplicate PUBREL
* can result in a transition to the same state.
* This can happen in the below state transition.
* 1. A TCP connection failure happened before sending a PUBCOMP
* for an incoming PUBREL.
* 2. Reestablished an MQTT session.
* 3. MQTT broker resent the un-acked PUBREL.
* 4. Receiving the PUBREL again will result in this transition
* to the same state. */
isValid = ( newState == MQTTPublishDone ) ||
( newState == MQTTPubCompSend );
break;
case MQTTPubRecPending:
/* Outgoing publish, Qos 2. */
isValid = newState == MQTTPubRelSend;
break;
case MQTTPubRelSend:
/* Outgoing publish, Qos 2. */
isValid = newState == MQTTPubCompPending;
break;
case MQTTPubCompPending:
/* Outgoing publish, Qos 2.
* There are 2 valid transitions possible.
* 1. MQTTPubCompPending -> MQTTPublishDone : A PUBCOMP is received.
* This marks the complete state transition for the publish packet.
* This is the normal state transition without any connection
* interruptions.
* 2. MQTTPubCompPending -> MQTTPubCompPending : Resending a PUBREL for
* packets in state #MQTTPubCompPending can result in this
* transition to the same state.
* This can happen in the below state transition.
* 1. A TCP connection failure happened before receiving a PUBCOMP
* for an outgoing PUBREL.
* 2. An MQTT session is reestablished.
* 3. Resending the un-acked PUBREL results in this transition
* to the same state. */
isValid = ( newState == MQTTPublishDone ) ||
( newState == MQTTPubCompPending );
break;
case MQTTPublishDone:
/* Done state should transition to invalid since it will be removed from the record. */
case MQTTPublishSend:
/* If an ack was sent/received we shouldn't have been in this state. */
case MQTTStateNull:
/* If an ack was sent/received the record should exist. */
default:
/* Invalid. */
break;
}
return isValid;
}
/*-----------------------------------------------------------*/
static bool isPublishOutgoing( MQTTPubAckType_t packetType,
MQTTStateOperation_t opType )
{
bool isOutgoing = false;
switch( packetType )
{
case MQTTPuback:
case MQTTPubrec:
case MQTTPubcomp:
isOutgoing = opType == MQTT_RECEIVE;
break;
case MQTTPubrel:
isOutgoing = opType == MQTT_SEND;
break;
default:
/* No other ack type. */
break;
}
return isOutgoing;
}
/*-----------------------------------------------------------*/
static size_t findInRecord( const MQTTPubAckInfo_t * records,
size_t recordCount,
uint16_t packetId,
MQTTQoS_t * pQos,
MQTTPublishState_t * pCurrentState )
{
size_t index = 0;
assert( packetId != MQTT_PACKET_ID_INVALID );
*pCurrentState = MQTTStateNull;
for( index = 0; index < recordCount; index++ )
{
if( records[ index ].packetId == packetId )
{
*pQos = records[ index ].qos;
*pCurrentState = records[ index ].publishState;
break;
}
}
if( index == recordCount )
{
index = MQTT_INVALID_STATE_COUNT;
}
return index;
}
/*-----------------------------------------------------------*/
static void compactRecords( MQTTPubAckInfo_t * records,
size_t recordCount )
{
size_t index = 0;
size_t emptyIndex = MQTT_INVALID_STATE_COUNT;
assert( records != NULL );
/* Find the empty spots and fill those with non empty values. */
for( ; index < recordCount; index++ )
{
/* Find the first empty spot. */
if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
{
if( emptyIndex == MQTT_INVALID_STATE_COUNT )
{
emptyIndex = index;
}
}
else
{
if( emptyIndex != MQTT_INVALID_STATE_COUNT )
{
/* Copy over the contents at non empty index to empty index. */
records[ emptyIndex ].packetId = records[ index ].packetId;
records[ emptyIndex ].qos = records[ index ].qos;
records[ emptyIndex ].publishState = records[ index ].publishState;
/* Mark the record at current non empty index as invalid. */
records[ index ].packetId = MQTT_PACKET_ID_INVALID;
records[ index ].qos = MQTTQoS0;
records[ index ].publishState = MQTTStateNull;
/* Advance the emptyIndex. */
emptyIndex++;
}
}
}
}
/*-----------------------------------------------------------*/
static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
size_t recordCount,
uint16_t packetId,
MQTTQoS_t qos,
MQTTPublishState_t publishState )
{
MQTTStatus_t status = MQTTNoMemory;
int32_t index = 0;
size_t availableIndex = recordCount;
bool validEntryFound = false;
assert( packetId != MQTT_PACKET_ID_INVALID );
assert( qos != MQTTQoS0 );
/* Check if we have to compact the records. This is known by checking if
* the last spot in the array is filled. */
if( records[ recordCount - 1U ].packetId != MQTT_PACKET_ID_INVALID )
{
compactRecords( records, recordCount );
}
/* Start from end so first available index will be populated.
* Available index is always found after the last element in the records.
* This is to make sure the relative order of the records in order to meet
* the message ordering requirement of MQTT spec 3.1.1. */
for( index = ( ( int32_t ) recordCount - 1 ); index >= 0; index-- )
{
/* Available index is only found after packet at the highest index. */
if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
{
if( validEntryFound == false )
{
availableIndex = ( size_t ) index;
}
}
else
{
/* A non-empty spot found in the records. */
validEntryFound = true;
if( records[ index ].packetId == packetId )
{
/* Collision. */
LogError( ( "Collision when adding PacketID=%u at index=%d.",
( unsigned int ) packetId,
( int ) index ) );
status = MQTTStateCollision;
availableIndex = recordCount;
break;
}
}
}
if( availableIndex < recordCount )
{
records[ availableIndex ].packetId = packetId;
records[ availableIndex ].qos = qos;
records[ availableIndex ].publishState = publishState;
status = MQTTSuccess;
}
return status;
}
/*-----------------------------------------------------------*/
static void updateRecord( MQTTPubAckInfo_t * records,
size_t recordIndex,
MQTTPublishState_t newState,
bool shouldDelete )
{
assert( records != NULL );
if( shouldDelete == true )
{
/* Mark the record as invalid. */
records[ recordIndex ].packetId = MQTT_PACKET_ID_INVALID;
records[ recordIndex ].qos = MQTTQoS0;
records[ recordIndex ].publishState = MQTTStateNull;
}
else
{
records[ recordIndex ].publishState = newState;
}
}
/*-----------------------------------------------------------*/
static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
uint16_t searchStates,
MQTTStateCursor_t * pCursor )
{
uint16_t packetId = MQTT_PACKET_ID_INVALID;
uint16_t outgoingStates = 0U;
const MQTTPubAckInfo_t * records = NULL;
size_t maxCount;
bool stateCheck = false;
assert( pMqttContext != NULL );
assert( searchStates != 0U );
assert( pCursor != NULL );
/* Create a bit map with all the outgoing publish states. */
UINT16_SET_BIT( outgoingStates, MQTTPublishSend );
UINT16_SET_BIT( outgoingStates, MQTTPubAckPending );
UINT16_SET_BIT( outgoingStates, MQTTPubRecPending );
UINT16_SET_BIT( outgoingStates, MQTTPubRelSend );
UINT16_SET_BIT( outgoingStates, MQTTPubCompPending );
/* Only outgoing publish records need to be searched. */
assert( ( outgoingStates & searchStates ) > 0U );
assert( ( ~outgoingStates & searchStates ) == 0U );
records = pMqttContext->outgoingPublishRecords;
maxCount = pMqttContext->outgoingPublishRecordMaxCount;
while( *pCursor < maxCount )
{
/* Check if any of the search states are present. */
stateCheck = UINT16_CHECK_BIT( searchStates, records[ *pCursor ].publishState );
if( stateCheck == true )
{
packetId = records[ *pCursor ].packetId;
( *pCursor )++;
break;
}
( *pCursor )++;
}
return packetId;
}
/*-----------------------------------------------------------*/
MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType,
MQTTStateOperation_t opType,
MQTTQoS_t qos )
{
MQTTPublishState_t calculatedState = MQTTStateNull;
/* There are more QoS2 cases than QoS1, so initialize to that. */
bool qosValid = qos == MQTTQoS2;
switch( packetType )
{
case MQTTPuback:
qosValid = qos == MQTTQoS1;
calculatedState = MQTTPublishDone;
break;
case MQTTPubrec:
/* Incoming publish: send PUBREC, PUBREL pending.
* Outgoing publish: receive PUBREC, send PUBREL. */
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRelPending : MQTTPubRelSend;
break;
case MQTTPubrel:
/* Incoming publish: receive PUBREL, send PUBCOMP.
* Outgoing publish: send PUBREL, PUBCOMP pending. */
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubCompPending : MQTTPubCompSend;
break;
case MQTTPubcomp:
calculatedState = MQTTPublishDone;
break;
default:
/* No other ack type. */
break;
}
/* Sanity check, make sure ack and QoS agree. */
if( qosValid == false )
{
calculatedState = MQTTStateNull;
}
return calculatedState;
}
/*-----------------------------------------------------------*/
static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
size_t maxRecordCount,
size_t recordIndex,
uint16_t packetId,
MQTTPublishState_t currentState,
MQTTPublishState_t newState )
{
MQTTStatus_t status = MQTTIllegalState;
bool shouldDeleteRecord = false;
bool isTransitionValid = false;
assert( records != NULL );
/* Record to be deleted if the state transition is completed or if a PUBREC
* is received for an outgoing QoS2 publish. When a PUBREC is received,
* record is deleted and added back to the end of the records to maintain
* ordering for PUBRELs. */
shouldDeleteRecord = ( newState == MQTTPublishDone ) || ( newState == MQTTPubRelSend );
isTransitionValid = validateTransitionAck( currentState, newState );
if( isTransitionValid == true )
{
status = MQTTSuccess;
/* Update record for acks. When sending or receiving acks for packets that
* are resent during a session reestablishment, the new state and
* current state can be the same. No update of record required in that case. */
if( currentState != newState )
{
updateRecord( records,
recordIndex,
newState,
shouldDeleteRecord );
/* For QoS2 messages, in order to preserve the message ordering, when
* a PUBREC is received for an outgoing publish, the record should be
* moved to the last. This move will help preserve the order in which
* a PUBREL needs to be resent in case of a session reestablishment. */
if( newState == MQTTPubRelSend )
{
status = addRecord( records,
maxRecordCount,
packetId,
MQTTQoS2,
MQTTPubRelSend );
}
}
}
else
{
/* Invalid state transition. */
LogError( ( "Invalid transition from state %s to state %s.",
MQTT_State_strerror( currentState ),
MQTT_State_strerror( newState ) ) );
}
return status;
}
/*-----------------------------------------------------------*/
static MQTTStatus_t updateStatePublish( const MQTTContext_t * pMqttContext,
size_t recordIndex,
uint16_t packetId,
MQTTStateOperation_t opType,
MQTTQoS_t qos,
MQTTPublishState_t currentState,
MQTTPublishState_t newState )
{
MQTTStatus_t status = MQTTSuccess;
bool isTransitionValid = false;
assert( pMqttContext != NULL );
assert( packetId != MQTT_PACKET_ID_INVALID );
assert( qos != MQTTQoS0 );
/* This will always succeed for an incoming publish. This is due to the fact
* that the passed in currentState must be MQTTStateNull, since
* #MQTT_UpdateStatePublish does not perform a lookup for receives. */
isTransitionValid = validateTransitionPublish( currentState, newState, opType, qos );
if( isTransitionValid == true )
{
/* addRecord will check for collisions. */
if( opType == MQTT_RECEIVE )
{
status = addRecord( pMqttContext->incomingPublishRecords,
pMqttContext->incomingPublishRecordMaxCount,
packetId,
qos,
newState );
}
/* Send operation. */
else
{
/* Skip updating record when publish is resend and no state
* update is required. */
if( currentState != newState )
{
updateRecord( pMqttContext->outgoingPublishRecords,
recordIndex,
newState,
false );
}
}
}
else
{
status = MQTTIllegalState;
LogError( ( "Invalid transition from state %s to state %s.",
MQTT_State_strerror( currentState ),
MQTT_State_strerror( newState ) ) );
}
return status;
}
/*-----------------------------------------------------------*/
MQTTStatus_t MQTT_ReserveState( const MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTQoS_t qos )
{
MQTTStatus_t status = MQTTSuccess;
if( qos == MQTTQoS0 )
{
status = MQTTSuccess;
}
else if( ( packetId == MQTT_PACKET_ID_INVALID ) || ( pMqttContext == NULL ) )
{
status = MQTTBadParameter;
}
else
{
/* Collisions are detected when adding the record. */
status = addRecord( pMqttContext->outgoingPublishRecords,
pMqttContext->outgoingPublishRecordMaxCount,
packetId,
qos,
MQTTPublishSend );
}
return status;
}
/*-----------------------------------------------------------*/
MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType,
MQTTQoS_t qos )
{
MQTTPublishState_t calculatedState = MQTTStateNull;
switch( qos )
{
case MQTTQoS0:
calculatedState = MQTTPublishDone;
break;
case MQTTQoS1:
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubAckPending : MQTTPubAckSend;
break;
case MQTTQoS2:
calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRecPending : MQTTPubRecSend;
break;
default:
/* No other QoS values. */
break;
}
return calculatedState;
}
/*-----------------------------------------------------------*/
MQTTStatus_t MQTT_UpdateStatePublish( const MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTStateOperation_t opType,
MQTTQoS_t qos,
MQTTPublishState_t * pNewState )
{
MQTTPublishState_t newState = MQTTStateNull;
MQTTPublishState_t currentState = MQTTStateNull;
MQTTStatus_t mqttStatus = MQTTSuccess;
size_t recordIndex = MQTT_INVALID_STATE_COUNT;
MQTTQoS_t foundQoS = MQTTQoS0;
if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
{
LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p",
( void * ) pMqttContext,
( void * ) pNewState ) );
mqttStatus = MQTTBadParameter;
}
else if( qos == MQTTQoS0 )
{
/* QoS 0 publish. Do nothing. */
*pNewState = MQTTPublishDone;
}
else if( packetId == MQTT_PACKET_ID_INVALID )
{
/* Publishes > QoS 0 need a valid packet ID. */
mqttStatus = MQTTBadParameter;
}
else if( opType == MQTT_SEND )
{
/* Search record for entry so we can check QoS. */
recordIndex = findInRecord( pMqttContext->outgoingPublishRecords,
pMqttContext->outgoingPublishRecordMaxCount,
packetId,
&foundQoS,
&currentState );
if( ( recordIndex == MQTT_INVALID_STATE_COUNT ) || ( foundQoS != qos ) )
{
/* Entry should match with supplied QoS. */
mqttStatus = MQTTBadParameter;
}
}
else
{
/* QoS 1 or 2 receive. Nothing to be done. */
}
if( ( qos != MQTTQoS0 ) && ( mqttStatus == MQTTSuccess ) )
{
newState = MQTT_CalculateStatePublish( opType, qos );
/* Validate state transition and update state records. */
mqttStatus = updateStatePublish( pMqttContext,
recordIndex,
packetId,
opType,
qos,
currentState,
newState );
/* Update output parameter on success. */
if( mqttStatus == MQTTSuccess )
{
*pNewState = newState;
}
}
return mqttStatus;
}
/*-----------------------------------------------------------*/
MQTTStatus_t MQTT_RemoveStateRecord( const MQTTContext_t * pMqttContext,
uint16_t packetId )
{
MQTTStatus_t status = MQTTSuccess;
MQTTPubAckInfo_t * records;
size_t recordIndex;
/* Current state is updated by the findInRecord function. */
MQTTPublishState_t currentState;
MQTTQoS_t qos = MQTTQoS0;
if( ( pMqttContext == NULL ) || ( ( pMqttContext->outgoingPublishRecords == NULL ) ) )
{
status = MQTTBadParameter;
}
else
{
records = pMqttContext->outgoingPublishRecords;
recordIndex = findInRecord( records,
pMqttContext->outgoingPublishRecordMaxCount,
packetId,
&qos,
&currentState );
if( currentState == MQTTStateNull )
{
status = MQTTBadParameter;
}
else if( ( qos != MQTTQoS1 ) && ( qos != MQTTQoS2 ) )
{
status = MQTTBadParameter;
}
else
{
/* Delete the record. */
updateRecord( records,
recordIndex,
MQTTStateNull,
true );
}
}
return status;
}
/*-----------------------------------------------------------*/
MQTTStatus_t MQTT_UpdateStateAck( const MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTPubAckType_t packetType,
MQTTStateOperation_t opType,
MQTTPublishState_t * pNewState )
{
MQTTPublishState_t newState = MQTTStateNull;
MQTTPublishState_t currentState = MQTTStateNull;
bool isOutgoingPublish = isPublishOutgoing( packetType, opType );
MQTTQoS_t qos = MQTTQoS0;
size_t maxRecordCount = MQTT_INVALID_STATE_COUNT;
size_t recordIndex = MQTT_INVALID_STATE_COUNT;
MQTTPubAckInfo_t * records = NULL;
MQTTStatus_t status = MQTTBadResponse;
if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
{
LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p.",
( void * ) pMqttContext,
( void * ) pNewState ) );
status = MQTTBadParameter;
}
else if( packetId == MQTT_PACKET_ID_INVALID )
{
LogError( ( "Packet ID must be nonzero." ) );
status = MQTTBadParameter;
}
else if( packetType > MQTTPubcomp )
{
LogError( ( "Invalid packet type %u.", ( unsigned int ) packetType ) );
status = MQTTBadParameter;
}
else
{
if( isOutgoingPublish == true )
{
records = pMqttContext->outgoingPublishRecords;
maxRecordCount = pMqttContext->outgoingPublishRecordMaxCount;
}
else
{
records = pMqttContext->incomingPublishRecords;
maxRecordCount = pMqttContext->incomingPublishRecordMaxCount;
}
recordIndex = findInRecord( records,
maxRecordCount,
packetId,
&qos,
&currentState );
}
if( recordIndex != MQTT_INVALID_STATE_COUNT )
{
newState = MQTT_CalculateStateAck( packetType, opType, qos );
/* Validate state transition and update state record. */
status = updateStateAck( records,
maxRecordCount,
recordIndex,
packetId,
currentState,
newState );
/* Update the output parameter. */
if( status == MQTTSuccess )
{
*pNewState = newState;
}
}
else
{
LogError( ( "No matching record found for publish: PacketId=%u.",
( unsigned int ) packetId ) );
}
return status;
}
/*-----------------------------------------------------------*/
uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext,
MQTTStateCursor_t * pCursor,
MQTTPublishState_t * pState )
{
uint16_t packetId = MQTT_PACKET_ID_INVALID;
uint16_t searchStates = 0U;
/* Validate arguments. */
if( ( pMqttContext == NULL ) || ( pCursor == NULL ) || ( pState == NULL ) )
{
LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p"
" pState=%p.",
( void * ) pMqttContext,
( void * ) pCursor,
( void * ) pState ) );
}
else
{
/* PUBREL for packets in state #MQTTPubCompPending and #MQTTPubRelSend
* would need to be resent when a session is reestablished.*/
UINT16_SET_BIT( searchStates, MQTTPubCompPending );
UINT16_SET_BIT( searchStates, MQTTPubRelSend );
packetId = stateSelect( pMqttContext, searchStates, pCursor );
/* The state needs to be in #MQTTPubRelSend for sending PUBREL. */
if( packetId != MQTT_PACKET_ID_INVALID )
{
*pState = MQTTPubRelSend;
}
}
return packetId;
}
/*-----------------------------------------------------------*/
uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext,
MQTTStateCursor_t * pCursor )
{
uint16_t packetId = MQTT_PACKET_ID_INVALID;
uint16_t searchStates = 0U;
/* Validate arguments. */
if( ( pMqttContext == NULL ) || ( pCursor == NULL ) )
{
LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p",
( void * ) pMqttContext,
( void * ) pCursor ) );
}
else
{
/* Packets in state #MQTTPublishSend, #MQTTPubAckPending and
* #MQTTPubRecPending would need to be resent when a session is
* reestablished. */
UINT16_SET_BIT( searchStates, MQTTPublishSend );
UINT16_SET_BIT( searchStates, MQTTPubAckPending );
UINT16_SET_BIT( searchStates, MQTTPubRecPending );
packetId = stateSelect( pMqttContext, searchStates, pCursor );
}
return packetId;
}
/*-----------------------------------------------------------*/
const char * MQTT_State_strerror( MQTTPublishState_t state )
{
const char * str = NULL;
switch( state )
{
case MQTTStateNull:
str = "MQTTStateNull";
break;
case MQTTPublishSend:
str = "MQTTPublishSend";
break;
case MQTTPubAckSend:
str = "MQTTPubAckSend";
break;
case MQTTPubRecSend:
str = "MQTTPubRecSend";
break;
case MQTTPubRelSend:
str = "MQTTPubRelSend";
break;
case MQTTPubCompSend:
str = "MQTTPubCompSend";
break;
case MQTTPubAckPending:
str = "MQTTPubAckPending";
break;
case MQTTPubRecPending:
str = "MQTTPubRecPending";
break;
case MQTTPubRelPending:
str = "MQTTPubRelPending";
break;
case MQTTPubCompPending:
str = "MQTTPubCompPending";
break;
case MQTTPublishDone:
str = "MQTTPublishDone";
break;
default:
/* Invalid state received. */
str = "Invalid MQTT State";
break;
}
return str;
}
/*-----------------------------------------------------------*/