Skip to content

Inconsistency between named device shadow document in cloud and in device

0

Greetings

I have this quite difficult problem where I have a named shadow for a Greengrass Thing and there is some weird behavior on the device side when updating the document in cloud. I'm using the awsiot python SDK version 1.19.0 (constrained to this for now at least), Nucleus component is version 2.15.1 and ShadowManager is version 2.3.13. This problem appears consistently if I change one value in the shadow document json from false to true (doing it the other way works), according to the devices logs the MQTT message does not go through to my self made Python component but in the greengrass.log file it shows that it receives an update from cloud.

greengrass.log after sending the "non working" update from cloud

2026-04-30T11:41:15.985Z [INFO] (pool-3-thread-14) com.aws.greengrass.shadowmanager.sync.strategy.BaseSyncStrategy: sync. Executing sync request. {Type=LocalUpdateSyncRequest, thing name=<thing_name>, shadow name=configuration}
2026-04-30T11:41:16.003Z [INFO] (pool-3-thread-14) com.aws.greengrass.shadowmanager.ShadowManagerDAOImpl: Updating sync info. {thing name=<thing_name>, shadow name=configuration, cloud-version=374, local-version=185}

These are the python methods that I use for subscribing/listening to the shadow updates:

def _subscribe_to_shadow_updates(self):
        """
        Subscribe to shadow delta updates via IoT Core MQTT topics.
        AWS IoT publishes to shadow/update/delta when desired != reported.
        This avoids processing the device's own reported updates.
        """
        if self.ipc_client is None:
            logger.error("Cannot subscribe to shadow updates: IPC client not connected")
            return

        try:
            # Subscribe to the shadow delta topic
            # Only publishes when desired state differs from reported state
            # This prevents double-processing when device sends reported updates
            topic = f"{self.topic_prefix}/update/delta"

            req = SubscribeToIoTCoreRequest()
            req.topic_name = topic
            req.qos = QOS.AT_LEAST_ONCE

            handler = self.ShadowUpdateHandler(self)
            self.sub_operation = self.ipc_client.new_subscribe_to_iot_core(handler)
            self.sub_operation.activate(req)
            future = self.sub_operation.get_response()
            future.result(self.timeout)
            logger.info(
                f"Successfully subscribed to shadow delta updates for "
                f"'{self.thing_name}/{self.shadow_name}'"
            )
        except Exception as e:
            logger.exception(f"Failed to subscribe to shadow updates: {e}")
def _report_shadow_update(
        self, results: dict[str, TypeConfig | AppRemoteConfig | ProcessConfig]
    ):
        """
        Called after updating configuration shadow and notifying observers.
        Will send the 'reported' object back to cloud through the ipc client mqtt.

        Args:
            results (dict): Dictionary mapping observer names to their updated config objects
        """
        if not results:
            logger.info("No results to send")
            return
        if not self.ipc_client:
            raise Exception("IPC client not available to report shadow")
        shadow_report = self._construct_shadow_report(results)

        reported = shadow_report.model_dump(by_alias=True)

        payload = {
            "state": {
                "reported": reported,
            }
        }

        try:
            req = UpdateThingShadowRequest(
                thing_name=self.thing_name,
                shadow_name=self.shadow_name,
                payload=json.dumps(payload).encode("utf-8"),
            )
            logger.info(f"Reporting shadow update: {json.dumps(payload)}")
            self.ipc_client.new_update_thing_shadow().activate(req)
            logger.info("Shadow update sent to cloud")

        except Exception as e:
            logger.exception(f"Exception in shadow update report: {e!s}")
            return
class ShadowUpdateHandler(client.SubscribeToIoTCoreStreamHandler):
        """
        Class that handles shadow update messages from cloud
        """

        def __init__(self, parent):
            self.parent: DeviceShadow = parent
            super().__init__()

        def on_stream_event(self, event: IoTCoreMessage) -> None:
            """
            Called when IPC receives a message from IoT Core regarding
            shadow delta updates (desired != reported).

            Delta message format:
            {
              "state": {
                "fieldName": "newDesiredValue",
                ...
              },
              "metadata": {...},
              "version": 123,
              "timestamp": 1234567890
            }

            Args:
                event (IoTCoreMessage): Event that is to be processed
            """
            try:
                if event.message is None or event.message.payload is None:
                    logger.info("Received empty shadow delta message")
                    return

                delta_update = json.loads(event.message.payload)

                logger.info(f"Shadow delta received: {delta_update}")

                # Delta messages have the changed fields directly in "state"
                if "state" in delta_update:
                    config_data = delta_update["state"]
                else:
                    logger.error("Shadow delta message did not contain 'state' field")
                    raise ValueError("Shadow delta message did not contain expected structure")

                # Validate the incoming data
                try:
                    DeviceConfiguration.model_validate(config_data)
                except ValidationError as e:
                    logger.exception(f"Validation error in shadow delta: {e}")
                    raise

                self._update_shadow_fields(config_data)
                logger.info("Local shadow document updated from cloud delta")

            except Exception as e:
                logger.exception(f"Error processing shadow delta: {e}")

And I have verified from the device's logs that when I update the shadow from true to false it sends the correct reported object back to cloud (or at least to the shadow manager component)

Now while writing this I managed to get this error message to greengrass.log:

2026-04-30T11:53:21.202Z [INFO] (AwsEventLoop 1) com.aws.greengrass.shadowmanager.ipc.UpdateThingShadowRequestHandler: Successfully updated shadow. {service-name=<service_name>, thing name=<thing_name>, shadow name=configuration, local-version=189}
2026-04-30T11:53:21.203Z [INFO] (pool-3-thread-14) com.aws.greengrass.shadowmanager.sync.strategy.BaseSyncStrategy: sync. Executing sync request. {Type=CloudUpdateSyncRequest, thing name=<thing_name>, shadow name=configuration}
2026-04-30T11:53:21.515Z [WARN] (pool-3-thread-14) com.aws.greengrass.shadowmanager.sync.strategy.BaseSyncStrategy: sync. Received conflict when processing request. Retrying as a full sync. {thing name=<thing_name>, shadow name=configuration}
software.amazon.awssdk.services.iotdataplane.model.ConflictException: Version conflict (Service: IotDataPlane, Status Code: 409, Request ID: b82d2b25-d1f6-14b5-f790-b1d269faa4cc)

If someone has any idea what could cause this kind of behavior, I would be grateful for some pointers on how to fix this.

  • Forgot to put this in the actual post but this is my ShadowManager component configuration:

        "strategy": {
          "type": "realTime"
        },
        "synchronize": {
          "coreThing": {
            "classic": false,
            "namedShadows": [
              "configuration"
            ]
          }
        }
    

    I haven't specified the direction there because documentation says that it is betweenDeviceAndCloud by default

3 Answers
1

The root cause is likely a race condition between the IoT Core MQTT subscription and the IPC Shadow Manager.

While the previous answer correctly identifies the 409 Conflict, it misses the specific reason why this is happening in your code. You are currently mixing two different communication paths:

  1. Subscription: You are using SubscribeToIoTCoreRequest. This talks directly to the MQTT broker (cloud/proxy).
  2. Update: You are using UpdateThingShadowRequest. This is an IPC call to the local Greengrass Shadow Manager.

Why the "false to true" update fails: When you update the shadow in the cloud, the Shadow Manager and your Python script (via MQTT sub) receive the message at almost the same time. If your script processes the delta and sends an IPC UpdateThingShadow report while the Shadow Manager is still busy committing the cloud update to its local database, a version mismatch (409) occurs.

How to fix it: Stop using the IoT Core MQTT topic subscription. Instead, use the dedicated Greengrass IPC event stream. This ensures your component stays in sync with the Shadow Manager’s local state before you attempt an update.

Change your subscription logic: Instead of SubscribeToIoTCoreRequest, use SubscribeToNamedShadowUpdateEventsRequest.

from awsiot.greengrasscoreipc.model import SubscribeToNamedShadowUpdateEventsRequest

def _subscribe_to_shadow_updates(self):
    # Use the Shadow Manager IPC event stream instead of raw MQTT
    try:
        req = SubscribeToNamedShadowUpdateEventsRequest()
        req.thing_name = self.thing_name
        req.shadow_name = self.shadow_name
        
        # Your handler will now receive 'NamedShadowUpdateEvent'
        self.sub_operation = self.ipc_client.new_subscribe_to_named_shadow_update_events(handler)
        self.sub_operation.activate(req)
        # ... rest of your activation logic
    except Exception as e:
        logger.exception(f"Failed to subscribe: {e}")

Key Advantages:

  • Atomicity: You only get notified once the Shadow Manager has successfully processed the change.
  • Consistency: It eliminates the race condition where your component tries to "report" a state while the local manager is still updating the "desired" state.
  • Efficiency: You don't need to construct MQTT topic strings manually.
EXPERT
answered 18 days ago
  • Thanks for the answer and sorry for the late reply! My SDK version is 1.19.0 and awsiot.greengrasscoreipc.model does not have SubscribeToNamedShadowUpdateEventsRequest. I could upgrade it but it was constrained to 1.19.0 before I started working on this project and I'm not sure if there was some specific reason for this so my preferred solution would be to not change the version. But if there's not other sensible options, I will upgrade my SDK and try after that.

    EDIT: I also tried looking through the AWS IoT SDK Python Github and could not find that symbol from the model.py file under greengrasscoreipc so does that even exist?

0
Accepted Answer

Fixed this by changing the update/delta listening to use greengrasscoreipc instead of direct IoTCore subscription. Changed also to clientv2 to reduce written code in the codebase for clarity and now it seems to be working consistently.

Updated methods:

def _subscribe_to_shadow_updates(self):
        """
        Subscribe to shadow delta updates via IoT Core MQTT topics.
        AWS IoT publishes to shadow/update/delta when desired != reported.
        This avoids processing the device's own reported updates.
        """
        if self.ipc_client is None:
            logger.error("Cannot subscribe to shadow updates: IPC client not connected")
            return

        try:
            # Subscribe to the shadow delta topic
            # Only publishes when desired state differs from reported state
            # This prevents double-processing when device sends reported updates
            topic = f"{self.topic_prefix}/update/delta"

            self.sub_operation = self.ipc_client.subscribe_to_topic(
                topic=topic,
                on_stream_event=self._on_stream_event,
                on_stream_error=self._on_stream_error,
            )
            logger.info(
                f"Successfully subscribed to shadow delta updates for "
                f"'{self.thing_name}/{self.shadow_name}'"
            )
        except Exception as e:
            logger.exception(f"Failed to subscribe to shadow updates: {e}")

def _report_shadow_update(
        self, results: dict[str, TypeConfig | AppRemoteConfig | ProcessConfig]
    ):
        """
        Called after updating configuration shadow and notifying observers.
        Will send the 'reported' object back to cloud through the ipc client mqtt.

        Args:
            results (dict): Dictionary mapping observer names to their updated config objects
        """
        if not results:
            logger.warning("No results from observers, reporting full shadow state instead")
            # If no observers returned results, report the current shadow state
            shadow_report = self.shadow_doc
        else:
            shadow_report = self._construct_shadow_report(results)

        if not self.ipc_client:
            raise Exception("IPC client not available to report shadow")

        # Convert to dict with aliases for cloud compatibility
        reported = shadow_report.model_dump(by_alias=True)

        payload = {
            "state": {
                "reported": reported,
            }
        }

        try:
            logger.info(f"Reporting shadow update: {json.dumps(payload)}")

            self.ipc_client.update_thing_shadow(
                thing_name=self.thing_name,
                shadow_name=self.shadow_name,
                payload=json.dumps(payload).encode("utf-8"),
            )

            logger.info("Shadow update sent to cloud")

        except Exception as e:
            logger.exception(f"Exception in shadow update report: {e!s}")
            raise  # Re-raise to ensure caller knows update failed

And since updating to clientv2 removes the need for a separate handler class, I removed the ShadowUpdateHandler class and made the on_stream* methods members of the original parent class

answered 14 days ago
0

Based on the error message and behavior you're describing, this appears to be a version conflict issue during shadow synchronization between your Greengrass device and AWS IoT Core.

The key indicator is the ConflictException with status code 409 and the message "Version conflict". This occurs when the shadow document version on the device doesn't match the version in the cloud when an update is attempted. The shadow manager uses version numbers to implement optimistic locking - if the current version of the state document doesn't match the version supplied in an update request, the service rejects the request with an HTTP 409 conflict response.

Here's what's likely happening in your scenario:

  1. When you update the shadow from cloud (changing false to true), the Shadow Manager receives the update and logs it in greengrass.log
  2. Your Python component may be processing this update and sending back a reported state
  3. However, there's a timing issue where the local shadow version and cloud shadow version have diverged
  4. When the Shadow Manager tries to sync the reported state back to the cloud, it encounters a version mismatch, resulting in the 409 conflict

The Shadow Manager is designed to handle these conflicts through merge strategies. When there's a merge conflict:

  • For the deviceToCloud sync direction, the local shadow document value overwrites the cloud
  • For the cloudToDevice sync direction, the cloud value overwrites the local document
  • For bidirectional sync, conflicts in desired state are resolved in favor of the cloud, while conflicts in reported state are resolved in favor of the device

The asymmetric behavior you're seeing (working one direction but not the other) could be related to timing differences in how quickly your component processes and responds to the delta updates, leading to version mismatches in one direction but not the other.

To troubleshoot this issue:

  1. Check your Shadow Manager sync configuration - Verify the sync direction and strategy settings in your Shadow Manager component configuration
  2. Review timing in your code - Ensure your Python component isn't sending multiple rapid updates that could cause version conflicts
  3. Add version tracking - Consider logging the version numbers from both delta messages and your reported updates to identify where the version divergence occurs
  4. Verify shadow document structure - Ensure your reported state updates include all necessary fields and don't inadvertently create conflicts with the desired state

The warning message "Received conflict when processing request. Retrying as a full sync" indicates the Shadow Manager is attempting to recover by performing a full synchronization, which should eventually resolve the inconsistency, though it may take a moment to complete.
Sources
Sync local device shadows with AWS IoT Core - AWS IoT Greengrass
Tutorial: Installing the Device SDK and running the sample application for Device Shadows - AWS IoT Core
Troubleshooting AWS IoT Greengrass - AWS IoT Greengrass

answered 18 days ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.