I am attempting to set up a test in which I implement this guide:
https://docs.aws.amazon.com/iot/latest/developerguide/iot-sns-rule.html
I successfully walked through the whole guide and am now trying to publish the same MQTT message from my Rust client code.
Here is the code that I have:
use anyhow::Result;
use aws_config::BehaviorVersion;
use aws_sdk_iot::Client;
use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions};
use rumqttc::{TlsConfiguration, Transport};
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::{fs::File, io::Read};
// File names of the TLS credentials that `main` loads through `read_file_to_vec`.
// They are relative paths, so they are resolved against the process's current
// working directory.

/// Root CA certificate file; its bytes are passed as `ca` in the TLS configuration.
pub const AWS_CA_FILE: &str = "AmazonRootCA.pem";
/// Client certificate file; its bytes are the certificate half of `client_auth`.
pub const AWS_PUBLIC_CERT_FILE: &str = "ThingEast1.cert.pem";
/// Client private-key file; its bytes are wrapped in `rumqttc::Key::RSA` for `client_auth`.
pub const AWS_PRIVATE_KEY_FILE: &str = "ThingEast1.private.key";
/// Wind measurement nested inside a [`DataPoint`].
///
/// Derives serde's `Serialize`/`Deserialize`, so it is encoded as part of the
/// enclosing data point.
#[derive(Debug, Serialize, Deserialize)]
pub struct Wind {
/// Wind speed. NOTE(review): the unit is not stated anywhere in this file — confirm.
pub velocity: f32,
/// Wind direction. NOTE(review): presumably compass degrees (the sample value is 180.0) — confirm.
pub bearing: f32,
}
/// One sample reading; `main` serializes it to pretty-printed JSON and uses the
/// result as the MQTT publish payload.
#[derive(Debug, Serialize, Deserialize)]
pub struct DataPoint {
/// Temperature reading. NOTE(review): unit not stated in this file — confirm.
pub temperature: f32,
/// Humidity reading. NOTE(review): the sample value 0.5 suggests a 0..1 fraction — confirm.
pub humidity: f32,
/// Barometric pressure reading. NOTE(review): unit not stated in this file — confirm.
pub barometer: f32,
/// Wind speed and direction recorded with this sample.
pub wind: Wind,
}
/// Reads the entire contents of `filename` into a freshly allocated byte vector.
///
/// # Errors
///
/// Returns the underlying [`std::io::Error`] if the file cannot be opened or
/// if reading it fails part-way through.
fn read_file_to_vec(filename: &str) -> std::io::Result<Vec<u8>> {
    File::open(filename).and_then(|mut handle| {
        let mut contents = Vec::new();
        // `read_to_end` reports the byte count; callers only want the bytes.
        handle.read_to_end(&mut contents).map(|_| contents)
    })
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Initialize AWS SDK client
let config = aws_config::load_defaults(BehaviorVersion::v2023_11_09()).await;
let client = Client::new(&config);
let certs = read_file_to_vec(AWS_PUBLIC_CERT_FILE)?;
let public_ca = read_file_to_vec(AWS_CA_FILE)?;
let private_key = read_file_to_vec(AWS_PRIVATE_KEY_FILE)?;
// Get the AWS IoT endpoint
let resp = client
.describe_endpoint()
.set_endpoint_type(Some("iot:Data-ATS".to_string()))
.send()
.await?;
let endpoint = resp
.endpoint_address
.as_deref()
.ok_or("No endpoint address")?;
let transport = Transport::Tls(TlsConfiguration::Simple {
ca: public_ca,
alpn: None,
client_auth: Some((certs, rumqttc::Key::RSA(private_key))),
});
// Configure MQTT client
let mut mqttoptions = MqttOptions::new("TestClient", endpoint, 8883);
mqttoptions.set_transport(transport);
mqttoptions.set_keep_alive(std::time::Duration::from_secs(60));
let (mqtt_client, mut eventloop) = AsyncClient::new(mqttoptions, 100);
let data = DataPoint {
temperature: 72.0,
humidity: 0.5,
barometer: 29.92,
wind: Wind {
velocity: 5.0,
bearing: 180.0,
},
};
let data_str = serde_json::to_string_pretty(&data)?;
mqtt_client
.subscribe("device/32/data", QoS::AtLeastOnce)
.await?;
mqtt_client
.publish(":aws:iot:us-east-1:device/32/data", QoS::AtLeastOnce, false, data_str)
.await?;
// Iterate to poll the eventloop for connection progress
loop {
let notification = eventloop.poll().await.unwrap();
println!("Received = {:?}", notification);
}
Ok(())
}
Here is the output I get when only subscribing:
Running `target/debug/aws_iot_mqtt_test`
Received = Incoming(ConnAck(ConnAck { session_present: false, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(100), max_qos: Some(1), retain_available: Some(1), max_packet_size: Some(149504), assigned_client_identifier: None, topic_alias_max: Some(8), reason_string: None, user_properties: [], wildcard_subscription_available: Some(1), subscription_identifiers_available: Some(0), shared_subscription_available: Some(1), server_keep_alive: Some(60), response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
Received = Outgoing(Subscribe(1))
Received = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtLeastOnce)], properties: None }))
Received = Incoming(Publish(Publish { dup: false, qos: AtMostOnce, retain: false, topic: b"device/32/data", pkid: 0, payload: b"{\n \"message\": \"Hello from AWS IoT console\"\n}", properties: None }))
So it is only an issue with publishing. Here is what I get when attempting to publish
Running `target/debug/aws_iot_mqtt_test`
Received = Incoming(ConnAck(ConnAck { session_present: false, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(100), max_qos: Some(1), retain_available: Some(1), max_packet_size: Some(149504), assigned_client_identifier: None, topic_alias_max: Some(8), reason_string: None, user_properties: [], wildcard_subscription_available: Some(1), subscription_identifiers_available: Some(0), shared_subscription_available: Some(1), server_keep_alive: Some(60), response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
Received = Outgoing(Subscribe(1))
Received = Outgoing(Publish(2))
Received = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtLeastOnce)], properties: None }))
thread 'main' panicked at src/main.rs:91:51:
called `Result::unwrap()` on an `Err` value: MqttState(PubAckFail { reason: NotAuthorized })
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
So my guess is that it may be something related to the TestPolicy that is associated with the Thing that I created:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": "arn:aws:iot:us-east-1:MYARN:client/TestClient"
},
{
"Effect": "Allow",
"Action": "iot:Publish",
"Resource": "arn:aws:iot:us-east-1:MYARN:topic/device/32/data"
},
{
"Effect": "Allow",
"Action": "iot:Subscribe",
"Resource": "arn:aws:iot:us-east-1:MYARN:topicfilter/device/32/data"
},
{
"Effect": "Allow",
"Action": "iot:Receive",
"Resource": "arn:aws:iot:us-east-1:MYARN:topic/device/32/data"
}
]
}
I'm happy to provide any other information. I hope I have shown that I have attempted to do my homework up to this point, but I am admittedly very new to AWS, so I am likely missing something basic. Thank you in advance.