diff --git a/lexicon.txt b/lexicon.txt index fe9a78ca..52d12d80 100644 --- a/lexicon.txt +++ b/lexicon.txt @@ -56,6 +56,7 @@ iot iotlink iotlistdouble iotlog +iotlogdebug iotmqtt iotmqttcallbackinfo iotmqttconnectinfo @@ -85,6 +86,7 @@ keepaliveseconds lu lwt malloc +misra mqtt mqttconnection mqttoperation diff --git a/src/iot_mqtt_api.c b/src/iot_mqtt_api.c index cfc01977..605fbc62 100644 --- a/src/iot_mqtt_api.c +++ b/src/iot_mqtt_api.c @@ -438,13 +438,11 @@ static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * IotLogError( "Network information cannot be NULL." ); status = IOT_NETWORK_BAD_PARAMETER; - goto cleanup; } - - /* Create a new network connection if requested. Otherwise, copy the existing - * network connection. */ - if( pNetworkInfo->createNetworkConnection == true ) + else if( pNetworkInfo->createNetworkConnection == true ) { + /* Create a new network connection if requested. Otherwise, copy the existing + * network connection. */ status = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo, pNetworkInfo->u.setup.pNetworkCredentialInfo, pNetworkConnection ); @@ -458,8 +456,6 @@ static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * else { IotLogError( "Failed to create network connection: %d", status ); - - goto cleanup; } } else @@ -470,7 +466,6 @@ static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * *pCreatedNewNetworkConnection = false; } -cleanup: return status; } @@ -492,7 +487,6 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode, IotLogError( "Failed to allocate memory for new connection." ); status = false; - goto cleanup; } else { @@ -505,50 +499,47 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode, /* Start a new MQTT connection with a reference count of 1. */ pMqttConnection->references = 1; - } - /* Create the references mutex for a new connection. It is a recursive mutex. */ - referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true ); + /* Create the references mutex for a new connection. It is a recursive mutex. */ + referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true ); - if( referencesMutexCreated == false ) - { - IotLogError( "Failed to create references mutex for new connection." ); - - status = false; - goto cleanup; - } - - /* Create the subscription mutex for a new connection. */ - subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false ); - - if( subscriptionMutexCreated == false ) - { - IotLogError( "Failed to create subscription mutex for new connection." ); - - status = false; - goto cleanup; - } - - /* Create the new connection's subscription and operation lists. */ - IotListDouble_Create( &( pMqttConnection->subscriptionList ) ); - IotListDouble_Create( &( pMqttConnection->pendingProcessing ) ); - IotListDouble_Create( &( pMqttConnection->pendingResponse ) ); - - /* Check if keep-alive is active for this connection. */ - if( keepAliveSeconds != 0 ) - { - if( _createKeepAliveOperation( pNetworkInfo, - keepAliveSeconds, - pMqttConnection ) == false ) + if( referencesMutexCreated == false ) { + IotLogError( "Failed to create references mutex for new connection." ); + status = false; - goto cleanup; + } + } + + if( status == true ) + { + /* Create the subscription mutex for a new connection. */ + subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false ); + + if( subscriptionMutexCreated == false ) + { + IotLogError( "Failed to create subscription mutex for new connection." ); + + status = false; + } + else + { + /* Create the new connection's subscription and operation lists. */ + IotListDouble_Create( &( pMqttConnection->subscriptionList ) ); + IotListDouble_Create( &( pMqttConnection->pendingProcessing ) ); + IotListDouble_Create( &( pMqttConnection->pendingResponse ) ); + + /* Check if keep-alive is active for this connection. */ + if( keepAliveSeconds != 0 ) + { + status = _createKeepAliveOperation( pNetworkInfo, + keepAliveSeconds, + pMqttConnection ); + } } } /* Clean up mutexes and connection if this function failed. */ -cleanup: - if( status == false ) { if( subscriptionMutexCreated == true ) @@ -654,33 +645,34 @@ static IotMqttError_t _subscriptionCommonSetup( IotMqttOperationType_t operation if( _checkInit() == false ) { status = IOT_MQTT_NOT_INITIALIZED; - goto cleanup; } - - /* Check that all elements in the subscription list are valid. */ - if( _IotMqtt_ValidateSubscriptionList( operation, - mqttConnection->awsIotMqttMode, - pSubscriptionList, - subscriptionCount ) == false ) + else { - status = IOT_MQTT_BAD_PARAMETER; - goto cleanup; - } - - /* Check that a reference pointer is provided for a waitable operation. */ - if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ) - { - if( pOperationReference == NULL ) + /* Check that all elements in the subscription list are valid. */ + if( _IotMqtt_ValidateSubscriptionList( operation, + mqttConnection->awsIotMqttMode, + pSubscriptionList, + subscriptionCount ) == false ) { - IotLogError( "Reference must be provided for a waitable %s.", - IotMqtt_OperationType( operation ) ); - status = IOT_MQTT_BAD_PARAMETER; - goto cleanup; } } -cleanup: + if( status == IOT_MQTT_SUCCESS ) + { + /* Check that a reference pointer is provided for a waitable operation. */ + if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ) + { + if( pOperationReference == NULL ) + { + IotLogError( "Reference must be provided for a waitable %s.", + IotMqtt_OperationType( operation ) ); + + status = IOT_MQTT_BAD_PARAMETER; + } + } + } + return status; } @@ -704,37 +696,31 @@ static IotMqttError_t _subscriptionCreateAndSerialize( IotMqttOperationType_t op pCallbackInfo, ppSubscriptionOperation ); - if( status != IOT_MQTT_SUCCESS ) - { - goto cleanup; - } - else + if( status == IOT_MQTT_SUCCESS ) { pSubscriptionOperation = ( *ppSubscriptionOperation ); + + + /* Check the subscription operation data and set the operation type. */ + IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); + IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 ); + pSubscriptionOperation->u.operation.type = operation; + + /* Generate a subscription packet from the subscription list. */ + status = serializeSubscription( pSubscriptionList, + subscriptionCount, + &( pSubscriptionOperation->u.operation.pMqttPacket ), + &( pSubscriptionOperation->u.operation.packetSize ), + &( pSubscriptionOperation->u.operation.packetIdentifier ) ); } - /* Check the subscription operation data and set the operation type. */ - IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); - IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 ); - pSubscriptionOperation->u.operation.type = operation; - - /* Generate a subscription packet from the subscription list. */ - status = serializeSubscription( pSubscriptionList, - subscriptionCount, - &( pSubscriptionOperation->u.operation.pMqttPacket ), - &( pSubscriptionOperation->u.operation.packetSize ), - &( pSubscriptionOperation->u.operation.packetIdentifier ) ); - - if( status != IOT_MQTT_SUCCESS ) + if( status == IOT_MQTT_SUCCESS ) { - goto cleanup; + /* Check the serialized MQTT packet. */ + IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL ); + IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 ); } - /* Check the serialized MQTT packet. */ - IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL ); - IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 ); - -cleanup: return status; } @@ -762,62 +748,54 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation, pCallbackInfo, &pSubscriptionOperation ); - if( status != IOT_MQTT_SUCCESS ) + if( status == IOT_MQTT_SUCCESS ) { - goto cleanup; - } - - /* Add the subscription list for a SUBSCRIBE. */ - if( operation == IOT_MQTT_SUBSCRIBE ) - { - status = _IotMqtt_AddSubscriptions( mqttConnection, - pSubscriptionOperation->u.operation.packetIdentifier, - pSubscriptionList, - subscriptionCount ); - - if( status != IOT_MQTT_SUCCESS ) + /* Add the subscription list for a SUBSCRIBE. */ + if( operation == IOT_MQTT_SUBSCRIBE ) { - goto cleanup; + status = _IotMqtt_AddSubscriptions( mqttConnection, + pSubscriptionOperation->u.operation.packetIdentifier, + pSubscriptionList, + subscriptionCount ); } } - /* Set the reference, if provided. */ - _setOperationReference( pOperationReference, pSubscriptionOperation ); - - /* Send the SUBSCRIBE packet. */ - if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) + if( status == IOT_MQTT_SUCCESS ) { - _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pSubscriptionOperation->job, pSubscriptionOperation ); - } - else - { - status = _IotMqtt_ScheduleOperation( pSubscriptionOperation, - _IotMqtt_ProcessSend, - 0 ); + /* Set the reference, if provided. */ + _setOperationReference( pOperationReference, pSubscriptionOperation ); - if( status != IOT_MQTT_SUCCESS ) + /* Send the SUBSCRIBE packet. */ + if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) { - IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.", - mqttConnection, - IotMqtt_OperationType( operation ) ); + _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pSubscriptionOperation->job, pSubscriptionOperation ); + } + else + { + status = _IotMqtt_ScheduleOperation( pSubscriptionOperation, + _IotMqtt_ProcessSend, + 0 ); - if( operation == IOT_MQTT_SUBSCRIBE ) + if( status != IOT_MQTT_SUCCESS ) { - _IotMqtt_RemoveSubscriptionByPacket( mqttConnection, - pSubscriptionOperation->u.operation.packetIdentifier, - MQTT_REMOVE_ALL_SUBSCRIPTIONS ); + IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.", + mqttConnection, + IotMqtt_OperationType( operation ) ); + + if( operation == IOT_MQTT_SUBSCRIBE ) + { + _IotMqtt_RemoveSubscriptionByPacket( mqttConnection, + pSubscriptionOperation->u.operation.packetIdentifier, + MQTT_REMOVE_ALL_SUBSCRIPTIONS ); + } + + /* Clear the previously set (and now invalid) reference. */ + _setOperationReference( pOperationReference, IOT_MQTT_OPERATION_INITIALIZER ); } - - /* Clear the previously set (and now invalid) reference. */ - _setOperationReference( pOperationReference, IOT_MQTT_OPERATION_INITIALIZER ); - - goto cleanup; } } /* Clean up if this function failed. */ -cleanup: - if( status != IOT_MQTT_SUCCESS ) { if( pSubscriptionOperation != NULL ) @@ -911,9 +889,9 @@ bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection { ( pMqttConnection->references )++; - /* In some implementations IotLog() maps to C standard printing API - * that need specific primitive types for format specifiers. Also, - * inttypes.h may not be available on some C99 compilers, despite + /* In some implementations IotLogDebug() maps to C standard printing API + * that needs specific primitive types for format specifiers. Also, + * inttypes.h may not be available on some C99 compilers, despite * stdint.h being available. */ /* coverity[misra_c_2012_directive_4_6_violation] */ IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.", @@ -944,9 +922,9 @@ void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection ( pMqttConnection->references )--; IotMqtt_Assert( pMqttConnection->references >= 0 ); - /* In some implementations IotLog() maps to C standard printing API - * that need specific primitive types for format specifiers. Also, - * inttypes.h may not be available on some C99 compilers, despite stdint.h + /* In some implementations IotLogDebug() maps to C standard printing API + * that needs specific primitive types for format specifiers. Also, + * inttypes.h may not be available on some C99 compilers, despite stdint.h * being available. */ /* coverity[misra_c_2012_directive_4_6_violation] */ IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.", @@ -1054,39 +1032,41 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, if( _checkInit() == false ) { status = IOT_MQTT_NOT_INITIALIZED; - goto cleanup; } - /* Validate network interface and connect info. */ - if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) + else if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) { status = IOT_MQTT_BAD_PARAMETER; - goto cleanup; - } - - networkStatus = _createNetworkConnection( pNetworkInfo, - &pNetworkConnection, - &ownNetworkConnection ); - - if( networkStatus != IOT_NETWORK_SUCCESS ) - { - status = IOT_MQTT_NETWORK_ERROR; - goto cleanup; - } - - IotLogInfo( "Establishing new MQTT connection." ); - - /* Initialize a new MQTT connection object. */ - pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode, - pNetworkInfo, - pConnectInfo->keepAliveSeconds ); - - if( pNewMqttConnection == NULL ) - { - status = IOT_MQTT_NO_MEMORY; - goto cleanup; } else + { + networkStatus = _createNetworkConnection( pNetworkInfo, + &pNetworkConnection, + &ownNetworkConnection ); + + if( networkStatus != IOT_NETWORK_SUCCESS ) + { + status = IOT_MQTT_NETWORK_ERROR; + } + } + + if( status == IOT_MQTT_SUCCESS ) + { + IotLogInfo( "Establishing new MQTT connection." ); + + /* Initialize a new MQTT connection object. */ + pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode, + pNetworkInfo, + pConnectInfo->keepAliveSeconds ); + + if( pNewMqttConnection == NULL ) + { + status = IOT_MQTT_NO_MEMORY; + } + } + + /* Set the MQTT receive callback. */ + if( status == IOT_MQTT_SUCCESS ) { /* Set the network connection associated with the MQTT connection. */ pNewMqttConnection->pNetworkConnection = pNetworkConnection; @@ -1098,82 +1078,76 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, #else pNewMqttConnection->pSerializer = NULL; #endif - } - /* Set the MQTT receive callback. */ - networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection, - IotMqtt_ReceiveCallback, - pNewMqttConnection ); + networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection, + IotMqtt_ReceiveCallback, + pNewMqttConnection ); - if( networkStatus != IOT_NETWORK_SUCCESS ) - { - IotLogError( "Failed to set MQTT network receive callback." ); - - status = IOT_MQTT_NETWORK_ERROR; - goto cleanup; - } - - /* Create a CONNECT operation. */ - status = _IotMqtt_CreateOperation( pNewMqttConnection, - IOT_MQTT_FLAG_WAITABLE, - NULL, - &pOperation ); - - if( status != IOT_MQTT_SUCCESS ) - { - goto cleanup; - } - - /* Ensure the members set by operation creation and serialization - * are appropriate for a blocking CONNECT. */ - IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); - IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) - == IOT_MQTT_FLAG_WAITABLE ); - IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 ); - - /* Set the operation type. */ - pOperation->u.operation.type = IOT_MQTT_CONNECT; - - /* Add previous session subscriptions. */ - if( pConnectInfo->pPreviousSubscriptions != NULL ) - { - /* Previous subscription count should have been validated as nonzero. */ - IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 ); - - status = _IotMqtt_AddSubscriptions( pNewMqttConnection, - 2, - pConnectInfo->pPreviousSubscriptions, - pConnectInfo->previousSubscriptionCount ); - - if( status != IOT_MQTT_SUCCESS ) + if( networkStatus != IOT_NETWORK_SUCCESS ) { - goto cleanup; + IotLogError( "Failed to set MQTT network receive callback." ); + + status = IOT_MQTT_NETWORK_ERROR; + } + else + { + /* Create a CONNECT operation. */ + status = _IotMqtt_CreateOperation( pNewMqttConnection, + IOT_MQTT_FLAG_WAITABLE, + NULL, + &pOperation ); } } - /* Convert the connect info and will info objects to an MQTT CONNECT packet. */ - status = _getMqttConnectSerializer( pNetworkInfo->pMqttSerializer )( pConnectInfo, - &( pOperation->u.operation.pMqttPacket ), - &( pOperation->u.operation.packetSize ) ); - - if( status != IOT_MQTT_SUCCESS ) + if( status == IOT_MQTT_SUCCESS ) { - goto cleanup; + /* Ensure the members set by operation creation and serialization + * are appropriate for a blocking CONNECT. */ + IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); + IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) + == IOT_MQTT_FLAG_WAITABLE ); + IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 ); + + /* Set the operation type. */ + pOperation->u.operation.type = IOT_MQTT_CONNECT; + + /* Add previous session subscriptions. */ + if( pConnectInfo->pPreviousSubscriptions != NULL ) + { + /* Previous subscription count should have been validated as nonzero. */ + IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 ); + + status = _IotMqtt_AddSubscriptions( pNewMqttConnection, + 2, + pConnectInfo->pPreviousSubscriptions, + pConnectInfo->previousSubscriptionCount ); + } } - /* Check the serialized MQTT packet. */ - IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL ); - IotMqtt_Assert( pOperation->u.operation.packetSize > 0 ); + if( status == IOT_MQTT_SUCCESS ) + { + /* Convert the connect info and will info objects to an MQTT CONNECT packet. */ + status = _getMqttConnectSerializer( pNetworkInfo->pMqttSerializer )( pConnectInfo, + &( pOperation->u.operation.pMqttPacket ), + &( pOperation->u.operation.packetSize ) ); + } - /* Send the CONNECT packet. */ - _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); + if( status == IOT_MQTT_SUCCESS ) + { + /* Check the serialized MQTT packet. */ + IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL ); + IotMqtt_Assert( pOperation->u.operation.packetSize > 0 ); - /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */ - status = IotMqtt_Wait( pOperation, timeoutMs ); + /* Send the CONNECT packet. */ + _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); - /* The call to wait cleans up the CONNECT operation, so set the pointer - * to NULL. */ - pOperation = NULL; + /* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */ + status = IotMqtt_Wait( pOperation, timeoutMs ); + + /* The call to wait cleans up the CONNECT operation, so set the pointer + * to NULL. */ + pOperation = NULL; + } /* When a connection is successfully established, schedule keep-alive job. */ if( status == IOT_MQTT_SUCCESS ) @@ -1190,13 +1164,10 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, if( taskPoolStatus != IOT_TASKPOOL_SUCCESS ) { status = IOT_MQTT_SCHEDULING_ERROR; - goto cleanup; } } } -cleanup: - if( status != IOT_MQTT_SUCCESS ) { IotLogError( "Failed to establish new MQTT connection, error %s.", @@ -1259,7 +1230,7 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection, uint32_t flags ) { bool disconnected = false, initCalled = false; - IotMqttError_t status = IOT_MQTT_STATUS_PENDING; + IotMqttError_t status = IOT_MQTT_SUCCESS; _mqttOperation_t * pOperation = NULL; /* Check that IotMqtt_Init was called. */ @@ -1267,50 +1238,58 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection, if( initCalled == false ) { - goto cleanup; + status = IOT_MQTT_STATUS_PENDING; } - - /* Only send a DISCONNECT packet if the connection is active and the "cleanup only" - * flag is not set. */ - if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == IOT_MQTT_FLAG_CLEANUP_ONLY ) + else { - goto cleanup; + /* Only send a DISCONNECT packet if the connection is active and the "cleanup only" + * flag is not set. */ + if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == IOT_MQTT_FLAG_CLEANUP_ONLY ) + { + status = IOT_MQTT_STATUS_PENDING; + } } - /* Read the connection status. */ - IotMutex_Lock( &( mqttConnection->referencesMutex ) ); - disconnected = mqttConnection->disconnected; - IotMutex_Unlock( &( mqttConnection->referencesMutex ) ); - - if( disconnected == true ) - { - goto cleanup; - } - - IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection ); - - /* Create a DISCONNECT operation. This function blocks until the DISCONNECT - * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */ - status = _IotMqtt_CreateOperation( mqttConnection, - IOT_MQTT_FLAG_WAITABLE, - NULL, - &pOperation ); - if( status == IOT_MQTT_SUCCESS ) { - /* Ensure that the members set by operation creation and serialization - * are appropriate for a blocking DISCONNECT. */ - IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); - IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) - == IOT_MQTT_FLAG_WAITABLE ); - IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 ); + /* Read the connection status. */ + IotMutex_Lock( &( mqttConnection->referencesMutex ) ); + disconnected = mqttConnection->disconnected; + IotMutex_Unlock( &( mqttConnection->referencesMutex ) ); - /* Set the operation type. */ - pOperation->u.operation.type = IOT_MQTT_DISCONNECT; + if( disconnected == true ) + { + status = IOT_MQTT_STATUS_PENDING; + } + } - /* Generate a DISCONNECT packet. */ - status = _getMqttDisconnectSerializer( mqttConnection->pSerializer )( &( pOperation->u.operation.pMqttPacket ), - &( pOperation->u.operation.packetSize ) ); + if( status == IOT_MQTT_SUCCESS ) + { + IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection ); + + /* Create a DISCONNECT operation. This function blocks until the DISCONNECT + * packet is sent, so it sets IOT_MQTT_FLAG_WAITABLE. */ + status = _IotMqtt_CreateOperation( mqttConnection, + IOT_MQTT_FLAG_WAITABLE, + NULL, + &pOperation ); + + if( status == IOT_MQTT_SUCCESS ) + { + /* Ensure that the members set by operation creation and serialization + * are appropriate for a blocking DISCONNECT. */ + IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); + IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) + == IOT_MQTT_FLAG_WAITABLE ); + IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 ); + + /* Set the operation type. */ + pOperation->u.operation.type = IOT_MQTT_DISCONNECT; + + /* Generate a DISCONNECT packet. */ + status = _getMqttDisconnectSerializer( mqttConnection->pSerializer )( &( pOperation->u.operation.pMqttPacket ), + &( pOperation->u.operation.packetSize ) ); + } } if( status == IOT_MQTT_SUCCESS ) @@ -1343,10 +1322,6 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection, } } - /* This function has no return value and no cleanup, but uses the cleanup - * label to exit on error. */ -cleanup: - if( initCalled == true ) { /* Close the underlying network connection. This also cleans up keep-alive. @@ -1541,102 +1516,94 @@ IotMqttError_t IotMqtt_PublishAsync( IotMqttConnection_t mqttConnection, if( _checkInit() == false ) { status = IOT_MQTT_NOT_INITIALIZED; - goto cleanup; } - - if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode, - pPublishInfo, - flags, - pCallbackInfo, - pPublishOperation ) == false ) - { - status = IOT_MQTT_BAD_PARAMETER; - goto cleanup; - } - - /* Create a PUBLISH operation. */ - status = _IotMqtt_CreateOperation( mqttConnection, + else if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode, + pPublishInfo, flags, pCallbackInfo, - &pOperation ); - - if( status != IOT_MQTT_SUCCESS ) + pPublishOperation ) == false ) { - goto cleanup; - } - - /* Check the PUBLISH operation data and set the operation type. */ - IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); - pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER; - - /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */ - if( mqttConnection->awsIotMqttMode == true ) - { - pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh ); - } - - /* Generate a PUBLISH packet from pPublishInfo. */ - status = _getMqttPublishSerializer( mqttConnection->pSerializer )( pPublishInfo, - &( pOperation->u.operation.pMqttPacket ), - &( pOperation->u.operation.packetSize ), - &( pOperation->u.operation.packetIdentifier ), - pPacketIdentifierHigh ); - - if( status != IOT_MQTT_SUCCESS ) - { - goto cleanup; - } - - /* Check the serialized MQTT packet. */ - IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL ); - IotMqtt_Assert( pOperation->u.operation.packetSize > 0 ); - - /* Initialize PUBLISH retry if retryLimit is set. */ - if( pPublishInfo->retryLimit > 0 ) - { - /* A QoS 0 PUBLISH may not be retried. */ - if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) - { - pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit; - pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs; - } - } - - /* Set the reference, if provided. */ - if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) - { - _setOperationReference( pPublishOperation, pOperation ); - } - - /* Send the PUBLISH packet. */ - if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) - { - _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); + status = IOT_MQTT_BAD_PARAMETER; } else { - status = _IotMqtt_ScheduleOperation( pOperation, - _IotMqtt_ProcessSend, - 0 ); + /* Create a PUBLISH operation. */ + status = _IotMqtt_CreateOperation( mqttConnection, + flags, + pCallbackInfo, + &pOperation ); + } - if( status != IOT_MQTT_SUCCESS ) + if( status == IOT_MQTT_SUCCESS ) + { + /* Check the PUBLISH operation data and set the operation type. */ + IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); + pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER; + + /* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */ + if( mqttConnection->awsIotMqttMode == true ) { - IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.", - mqttConnection ); + pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh ); + } - /* Clear the previously set (and now invalid) reference. */ + /* Generate a PUBLISH packet from pPublishInfo. */ + status = _getMqttPublishSerializer( mqttConnection->pSerializer )( pPublishInfo, + &( pOperation->u.operation.pMqttPacket ), + &( pOperation->u.operation.packetSize ), + &( pOperation->u.operation.packetIdentifier ), + pPacketIdentifierHigh ); + } + + if( status == IOT_MQTT_SUCCESS ) + { + /* Check the serialized MQTT packet. */ + IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL ); + IotMqtt_Assert( pOperation->u.operation.packetSize > 0 ); + + /* Initialize PUBLISH retry if retryLimit is set. */ + if( pPublishInfo->retryLimit > 0 ) + { + /* A QoS 0 PUBLISH may not be retried. */ if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) { - _setOperationReference( pPublishOperation, IOT_MQTT_OPERATION_INITIALIZER ); + pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit; + pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs; } + } - goto cleanup; + /* Set the reference, if provided. */ + if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) + { + _setOperationReference( pPublishOperation, pOperation ); + } + + /* Send the PUBLISH packet. */ + if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) + { + _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); + } + else + { + status = _IotMqtt_ScheduleOperation( pOperation, + _IotMqtt_ProcessSend, + 0 ); + + if( status != IOT_MQTT_SUCCESS ) + { + IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.", + mqttConnection ); + + /* Clear the previously set (and now invalid) reference. */ + if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) + { + _setOperationReference( pPublishOperation, IOT_MQTT_OPERATION_INITIALIZER ); + } + } } } /* Clean up the PUBLISH operation if this function fails. Otherwise, set the * appropriate return code based on QoS. */ -cleanup: if( status != IOT_MQTT_SUCCESS ) { @@ -1711,18 +1678,17 @@ IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation, if( _checkInit() == false ) { status = IOT_MQTT_NOT_INITIALIZED; - goto cleanup; } - /* Validate the given operation reference. */ - if( _IotMqtt_ValidateOperation( operation ) == false ) + else if( _IotMqtt_ValidateOperation( operation ) == false ) { status = IOT_MQTT_BAD_PARAMETER; - goto cleanup; } - - /* Check the MQTT connection status. */ - pMqttConnection = operation->pMqttConnection; + else + { + /* Check the MQTT connection status. */ + pMqttConnection = operation->pMqttConnection; + } if( status == IOT_MQTT_SUCCESS ) { @@ -1761,7 +1727,6 @@ IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation, } } -cleanup: return status; } diff --git a/src/iot_mqtt_operation.c b/src/iot_mqtt_operation.c index 1d2d9425..dfb7a60f 100644 --- a/src/iot_mqtt_operation.c +++ b/src/iot_mqtt_operation.c @@ -270,9 +270,9 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation ) pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING; } - /* In some implementations IotLog() maps to C standard printing API - * that need specific primitive types for format specifiers. Also - * inttypes.h may not be available on some C99 compilers, despite + /* In some implementations IotLog() maps to C standard printing API + * that need specific primitive types for format specifiers. Also + * inttypes.h may not be available on some C99 compilers, despite * stdint.h being available. */ /* coverity[misra_c_2012_directive_4_6_violation] */ IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.", @@ -454,96 +454,103 @@ IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection, IotLogError( "Callback should not be set for a waitable operation." ); status = IOT_MQTT_BAD_PARAMETER; - goto cleanup; } } - IotLogDebug( "(MQTT connection %p) Creating new operation record.", - pMqttConnection ); - - /* Increment the reference count for the MQTT connection when creating a new - * operation. */ - if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false ) + if( status == IOT_MQTT_SUCCESS ) { - IotLogError( "(MQTT connection %p) New operation record cannot be created" - " for a closed connection", + IotLogDebug( "(MQTT connection %p) Creating new operation record.", pMqttConnection ); - status = IOT_MQTT_NETWORK_ERROR; - goto cleanup; - } - else - { - /* Reference count will need to be decremented on error. */ - decrementOnError = true; - } - - /* Allocate memory for a new operation. */ - pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) ); - - if( pOperation == NULL ) - { - IotLogError( "(MQTT connection %p) Failed to allocate memory for new " - "operation record.", - pMqttConnection ); - - status = IOT_MQTT_NO_MEMORY; - goto cleanup; - } - else - { - /* Clear the operation data. */ - ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) ); - - /* Initialize some members of the new operation. */ - pOperation->pMqttConnection = pMqttConnection; - pOperation->u.operation.jobReference = 1; - pOperation->u.operation.flags = flags; - pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING; - } - - /* Check if the waitable flag is set. If it is, create a semaphore to - * wait on. */ - if( waitable == true ) - { - /* Create a semaphore to wait on for a waitable operation. */ - if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false ) + /* Increment the reference count for the MQTT connection when creating a new + * operation. */ + if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false ) { - IotLogError( "(MQTT connection %p) Failed to create semaphore for " - "waitable operation.", + IotLogError( "(MQTT connection %p) New operation record cannot be created" + " for a closed connection", pMqttConnection ); - status = IOT_MQTT_NO_MEMORY; - goto cleanup; + status = IOT_MQTT_NETWORK_ERROR; } else { - /* A waitable operation is created with an additional reference for the - * Wait function. */ - ( pOperation->u.operation.jobReference )++; + /* Reference count will need to be decremented on error. */ + decrementOnError = true; } } - else + + if( status == IOT_MQTT_SUCCESS ) { - /* If the waitable flag isn't set but a callback is, copy the callback - * information. */ - if( pCallbackInfo != NULL ) + /* Allocate memory for a new operation. */ + pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) ); + + if( pOperation == NULL ) { - pOperation->u.operation.notify.callback = *pCallbackInfo; + IotLogError( "(MQTT connection %p) Failed to allocate memory for new " + "operation record.", + pMqttConnection ); + + status = IOT_MQTT_NO_MEMORY; + } + else + { + /* Clear the operation data. */ + ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) ); + + /* Initialize some members of the new operation. */ + pOperation->pMqttConnection = pMqttConnection; + pOperation->u.operation.jobReference = 1; + pOperation->u.operation.flags = flags; + pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING; } } - /* Add this operation to the MQTT connection's operation list. */ - IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); - IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ), - &( pOperation->link ) ); - IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); + if( status == IOT_MQTT_SUCCESS ) + { + /* Check if the waitable flag is set. If it is, create a semaphore to + * wait on. */ + if( waitable == true ) + { + /* Create a semaphore to wait on for a waitable operation. */ + if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false ) + { + IotLogError( "(MQTT connection %p) Failed to create semaphore for " + "waitable operation.", + pMqttConnection ); - /* Set the output parameter. */ - *pNewOperation = pOperation; + status = IOT_MQTT_NO_MEMORY; + } + else + { + /* A waitable operation is created with an additional reference for the + * Wait function. */ + ( pOperation->u.operation.jobReference )++; + } + } + else + { + /* If the waitable flag isn't set but a callback is, copy the callback + * information. */ + if( pCallbackInfo != NULL ) + { + pOperation->u.operation.notify.callback = *pCallbackInfo; + } + } + + if( status == IOT_MQTT_SUCCESS ) + { + /* Add this operation to the MQTT connection's operation list. */ + IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); + IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ), + &( pOperation->link ) ); + IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); + + /* Set the output parameter. */ + *pNewOperation = pOperation; + } + } /* Clean up operation and decrement reference count if this function failed. */ -cleanup: if( status != IOT_MQTT_SUCCESS ) { @@ -592,9 +599,9 @@ bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation, IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); pOperation->u.operation.jobReference--; - /* In some implementations IotLog() maps to C standard printing API - * that need specific primitive types for format specifiers. Also - * inttypes.h may not be available on some C99 compilers, despite + /* In some implementations IotLog() maps to C standard printing API + * that need specific primitive types for format specifiers. Also + * inttypes.h may not be available on some C99 compilers, despite * stdint.h being available. */ /* coverity[misra_c_2012_directive_4_6_violation] */ IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed" @@ -812,9 +819,9 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) { - /* In some implementations IotLog() maps to a C standard printing API + /* In some implementations IotLog() maps to a C standard printing API * that need specific primitive types for format specifiers. Also, - * inttypes.h may not be available on some C99 compilers, despite + * inttypes.h may not be available on some C99 compilers, despite * stdint.h being available. */ /* coverity[misra_c_2012_directive_4_6_violation] */ IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.", @@ -1122,9 +1129,9 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection, { ( pResult->u.operation.jobReference )++; - /* In some implementations IotLog() maps to C standard printing API - * that need specific primitive types for format specifiers. Also - * inttypes.h may not be available on some C99 compilers, despite + /* In some implementations IotLog() maps to C standard printing API + * that need specific primitive types for format specifiers. Also + * inttypes.h may not be available on some C99 compilers, despite * stdint.h being available. */ /* coverity[misra_c_2012_directive_4_6_violation] */ IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.", diff --git a/src/iot_mqtt_subscription.c b/src/iot_mqtt_subscription.c index e0dcae25..5908f955 100644 --- a/src/iot_mqtt_subscription.c +++ b/src/iot_mqtt_subscription.c @@ -162,24 +162,21 @@ static bool _matchEndWildcards( const char * pTopicFilter, { /* Determine if the topic filter ends with the '#' wildcard. */ status = ( pTopicFilter[ filterIndex + 1 ] == '/' ) && ( pTopicFilter[ filterIndex + 2 ] == '#' ); + } - if( status == true ) + if( status == false ) + { + /* Determine if the last character is reached for both topic name and topic + * filter for the '+' wildcard. */ + endChar = ( nameIndex == topicNameLength - 1 ) && ( filterIndex == topicFilterLength - 2 ); + + if( endChar == true ) { - goto cleanup; + /* Filter "sport/+" also matches the "sport/" but not "sport". */ + status = ( pTopicFilter[ filterIndex + 1 ] == '+' ); } } - /* Determine if the last character is reached for both topic name and topic - * filter for the '+' wildcard. */ - endChar = ( nameIndex == topicNameLength - 1 ) && ( filterIndex == topicFilterLength - 2 ); - - if( endChar == true ) - { - /* Filter "sport/+" also matches the "sport/" but not "sport". */ - status = ( pTopicFilter[ filterIndex + 1 ] == '+' ); - } - -cleanup: *pMatch = status; return status; @@ -251,7 +248,7 @@ static bool _topicFilterMatch( const char * pTopicName, filterIndex, &status ) == true ) { - goto cleanup; + break; } } else @@ -264,7 +261,7 @@ static bool _topicFilterMatch( const char * pTopicName, &nameIndex, &status ) == true ) { - goto cleanup; + break; } } @@ -273,14 +270,12 @@ static bool _topicFilterMatch( const char * pTopicName, filterIndex++; } - /* If the end of both strings has been reached, they match. */ - if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) ) + if( status == false ) { - status = true; + /* If the end of both strings has been reached, they match. */ + status = ( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) ); } -cleanup: - return status; }