Passing messages between AWS IoT and an SQS queue using Lambdas written in Rust

AWS IoT Core provides a convenient way to connect IoT devices such as the ESP32 to the cloud. Typically the MQTT protocol is used for that.

Let’s suppose everything is set up on the AWS IoT side and you can see messages in the IoT Test console. Next we would like to transfer those MQTT messages to some other service that can actually do something useful with them. AWS SQS queues can be used for that.
Let’s create a rule that triggers a Lambda function every time something is sent to the topic floor/1/room/3/temperature. Actually, let’s receive temperatures from all floors and rooms.

The rule query statement for that looks like this:
SELECT *, topic() as topic FROM 'floor/+/room/+/temperature'
We cannot just do
SELECT * FROM 'floor/+/room/+/temperature'
because then we lose the information about the topic, which can be quite valuable. Each + is a wildcard that matches exactly one topic level.
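To get a feeling for which topics the filter in the rule picks up, here is a minimal sketch of MQTT-style filter matching using only the standard library. The function name topic_matches is made up for this illustration, and the matching is simplified (for example, it ignores the special handling of topics that start with $); it is not how AWS IoT implements it.

```rust
/// Returns true if `topic` matches `filter`.
/// `+` in the filter matches exactly one topic level,
/// `#` matches everything that remains (simplified sketch).
fn topic_matches(filter: &str, topic: &str) -> bool {
    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // multi-level wildcard: the rest of the topic is accepted
            (Some("#"), _) => return true,
            // single-level wildcard: any one level is accepted, keep going
            (Some("+"), Some(_)) => {}
            // literal level: must be identical, keep going
            (Some(f), Some(t)) if f == t => {}
            // both ran out at the same time: full match
            (None, None) => return true,
            // different length or different literal level
            _ => return false,
        }
    }
}

fn main() {
    let filter = "floor/+/room/+/temperature";
    let topics = ["floor/4/room/1/temperature", "floor/4/room/1/humidity"];
    for topic in topics.iter() {
        println!("{} -> {}", topic, topic_matches(filter, topic));
    }
}
```

Since one filter covers many concrete topics, the Lambda cannot know which room a reading came from unless the rule passes the topic along, which is why topic() is added to the SELECT.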
Let’s say the temperature message itself looks like this:

{
    "timestamp": 1593493172,
    "temp": 20.6
}

Notice that the rule query statement adds the topic field, so the JSON now looks like this:

{
    "timestamp": 1593493172,
    "temp": 20.6,
    "topic": "floor/4/room/1/temperature"
}

So this JSON is the event that our mqtt-to-sqs Lambda function should expect.
[Figure: mqtt-to-sqs.jpg]

Requirements #

Set up the Rust toolchain for cross-compiling.

rustup target add x86_64-unknown-linux-musl
# on macOS
brew install filosottile/musl-cross/musl-cross
# on macOS this is needed for openssl cross compiling
ln -s /usr/local/bin/x86_64-linux-musl-gcc /usr/local/bin/musl-gcc

mqtt-to-sqs #

At the time of writing, lambda_runtime is at version 0.2.1, which is quite outdated and does not play well with rusoto, the AWS SDK for Rust. We are going to use the modern async version straight from the master branch.

.cargo/config #

This configures the cross-compile target:

[target.x86_64-unknown-linux-musl]
linker = "x86_64-linux-musl-gcc"
[build]
target = "x86_64-unknown-linux-musl"

Cargo.toml #

AWS expects the binary to be called bootstrap.

[package]
name = "mqtt_to_sqs"
version = "1.0.0"
authors = ["Andres Vahter"]
edition = "2018"

# lambda exec must be called bootstrap
[[bin]]
name = "bootstrap"
path = "src/main.rs"

[dependencies]
tokio = { version = "0.2", features = ["macros"] }
lambda = { git = "https://github.com/awslabs/aws-lambda-rust-runtime/", branch = "master"}
once_cell = "1.4"
log = "0.4"
env_logger = "0.7.1"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
openssl = {version = "0.10.30", features = ["vendored"]}
rusoto_core = "0.44.0"
rusoto_sqs = "0.44.0"

main.rs #

use env_logger;
use log::{info};
use lambda::{handler_fn, Context};
use serde_json::{Value, json};
use rusoto_core::region::Region;
use rusoto_sqs::{SqsClient, Sqs, SendMessageRequest};
use once_cell::sync::OnceCell;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

fn sqs_client() -> &'static SqsClient {
    static INSTANCE: OnceCell<SqsClient> = OnceCell::new();
    INSTANCE.get_or_init(|| {
        SqsClient::new(Region::EuNorth1)
    })
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let _ = env_logger::try_init();
    let func = handler_fn(func);
    lambda::run(func).await?;
    Ok(())
}

async fn func(mut message: Value, _: Context) -> Result<Value, Error> {
    info!("message: {:?}", message);

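    // return an error instead of panicking if the event is not a JSON object
    let topic = message.as_object_mut()
                .ok_or("event is not a JSON object")?
                .remove("topic")
                .ok_or("field 'topic' not available")?;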
    info!("topic: {:?}, message: {:?}", topic, message.to_string());

    let message = json!({
        "topic": topic,
        "payload": message
    });

    let request = SendMessageRequest {
        delay_seconds: Some(0),
        message_attributes: None,
        message_body: message.to_string(),
        message_deduplication_id: None,
        message_group_id: None,
        message_system_attributes: None,
        queue_url: "https://sqs.eu-north-1.amazonaws.com/1234567890/queue".to_string(),
    };

    sqs_client().send_message(request).await?;

    Ok(json!("OK"))
}

Take a look at this:

fn sqs_client() -> &'static SqsClient {
    static INSTANCE: OnceCell<SqsClient> = OnceCell::new();
    INSTANCE.get_or_init(|| {
        SqsClient::new(Region::EuNorth1)
    })
}

Basically, it creates a global SqsClient. Otherwise the client would have to be initialized on every Lambda call, which can take quite some time. Lambdas are not always completely killed after execution, so a static that is initialized on the first call can be reused by later calls in the same warm instance, and this trick saves a significant amount of time. Without it, handling an invocation took around 300 ms; with this trick it took around 10 ms.
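To see the init-once behaviour in isolation, here is a small sketch that does not need AWS at all. It assumes Rust 1.70 or newer and uses std::sync::OnceLock as a stand-in for once_cell::sync::OnceCell (both offer get_or_init). ExpensiveClient, client, handle and INIT_COUNT are made-up names for this illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

// Counts how many times the constructor closure actually runs.
static INIT_COUNT: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for a client that is slow to construct.
struct ExpensiveClient {
    region: &'static str,
}

fn client() -> &'static ExpensiveClient {
    static INSTANCE: OnceLock<ExpensiveClient> = OnceLock::new();
    // The closure runs only on the first call; later calls get the same reference.
    INSTANCE.get_or_init(|| {
        INIT_COUNT.fetch_add(1, Ordering::SeqCst);
        ExpensiveClient { region: "eu-north-1" }
    })
}

// Stand-in for one handler invocation in a warm Lambda instance.
fn handle(n: u32) -> String {
    format!("invocation {} used the client for {}", n, client().region)
}

fn main() {
    for n in 1..=3 {
        println!("{}", handle(n));
    }
    println!("constructor ran {} time(s)", INIT_COUNT.load(Ordering::SeqCst));
}
```

The handler is called three times here, but the constructor closure runs only once, which is the effect the global SqsClient relies on between warm invocations.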

This takes the topic field out of the JSON:

let topic = message.as_object_mut()
            .ok_or("event is not a JSON object")?
            .remove("topic")
            .ok_or("field 'topic' not available")?;

...and creates a new JSON in which topic and payload are nicely separated:

let message = json!({
    "topic": topic,
    "payload": message
});

So the Lambda input event (mut message: Value):

{
    "timestamp": 1593493172,
    "temp": 20.6,
    "topic": "floor/4/room/1/temperature"
}

...becomes:

{
    "topic": "floor/4/room/1/temperature",
    "payload": {
        "timestamp": 1593493172,
        "temp": 20.6
    }
}
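The same reshaping can be tried out without serde_json. The following is only a sketch: a BTreeMap stands in for the JSON object, and Fields and split_topic are made-up names. It shows how ok_or together with ? turns a missing topic field into an error that the Lambda can return.

```rust
use std::collections::BTreeMap;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

// Stand-in for a JSON object: field name -> raw value as text.
type Fields = BTreeMap<String, String>;

/// Removes `topic` from the event and returns it together with
/// the remaining fields, which become the payload.
fn split_topic(mut event: Fields) -> Result<(String, Fields), Error> {
    // `ok_or` turns a missing field into an error, `?` converts the &str into `Error`
    let topic = event.remove("topic").ok_or("field 'topic' not available")?;
    Ok((topic, event))
}

fn main() -> Result<(), Error> {
    let mut event = Fields::new();
    event.insert("timestamp".to_string(), "1593493172".to_string());
    event.insert("temp".to_string(), "20.6".to_string());
    event.insert("topic".to_string(), "floor/4/room/1/temperature".to_string());

    let (topic, payload) = split_topic(event)?;
    println!("topic: {}, payload: {:?}", topic, payload);

    // An event without a topic field is reported as an error, not a panic.
    println!("missing topic is an error: {}", split_topic(Fields::new()).is_err());
    Ok(())
}
```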

sqs-to-mqtt #

There is not much to add. Let’s just see what the code looks like the other way around, when we receive a message from an SQS queue and forward it to the IoT service.

Cargo.toml #

[package]
name = "sqs_to_mqtt"
version = "1.0.0"
authors = ["Andres Vahter"]
edition = "2018"

# lambda exec must be called bootstrap
[[bin]]
name = "bootstrap"
path = "src/main.rs"

[dependencies]
tokio = { version = "0.2", features = ["macros"] }
lambda = { git = "https://github.com/awslabs/aws-lambda-rust-runtime/", branch = "master"}
aws_lambda_events = "0.3.0"
bytes = "0.5"
once_cell = "1.4"
log = "0.4"
env_logger = "0.7.1"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
openssl = {version = "0.10.30", features = ["vendored"]}
rusoto_core = "0.44.0"
rusoto_iot_data = "0.44.0"

main.rs #

use env_logger;
use log::{info};
use lambda::{handler_fn, Context};
use aws_lambda_events::event::sqs::SqsEvent;
use serde_json::{Value, json};
use rusoto_core::region::Region;
use rusoto_iot_data::{IotDataClient, IotData, PublishRequest};
use bytes::Bytes;
use once_cell::sync::OnceCell;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

// global client, created once and reused while the Lambda instance stays warm
fn iot_client() -> &'static IotDataClient {
    static INSTANCE: OnceCell<IotDataClient> = OnceCell::new();
    INSTANCE.get_or_init(|| {
        IotDataClient::new(Region::EuNorth1)
    })
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let _ = env_logger::try_init();
    let func = handler_fn(func);
    lambda::run(func).await?;
    Ok(())
}

async fn func(event: SqsEvent, _: Context) -> Result<Value, Error> {
    info!("event: {:?}", event);

    for record in event.records {
        // return an error instead of panicking when a record has no body
        let body = record.body.ok_or("SQS record has no body")?;
        let message: Value = serde_json::from_str(&body)?;
        info!("message: {}", message);

        let topic = message["topic"].as_str()
                    .ok_or("field 'topic' is missing or not a string")?
                    .to_string();
        let payload = serde_json::to_string(&message["payload"])?;

        let request = PublishRequest {
            payload: Some(Bytes::copy_from_slice(payload.as_bytes())),
            topic: topic,
            qos: Some(0),
        };

        iot_client().publish(request).await?;
    }

    Ok(json!("OK"))
}

Here it is expected that the body of each SQS message has the same JSON structure as defined before:

{
    "topic": "some/topic",
    "payload": "json message fields here"
}
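The handler publishes to whatever topic the queue message contains. In MQTT the wildcards + and # are only valid in subscriptions and filters, not in the topic of a published message, so a guard like the following could reject such messages before calling publish. This is just a sketch: is_publishable_topic is a made-up helper and it does not check any AWS-specific limits.

```rust
/// Returns true if `topic` is a concrete topic name:
/// non-empty and without the MQTT wildcards `+` and `#`.
fn is_publishable_topic(topic: &str) -> bool {
    !topic.is_empty() && !topic.contains('+') && !topic.contains('#')
}

fn main() {
    let topics = ["floor/4/room/1/temperature", "floor/+/room/+/temperature"];
    for topic in topics.iter() {
        println!("{} -> publishable: {}", topic, is_publishable_topic(topic));
    }
}
```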

Build and deploy #

cargo build --release
zip -j rust.zip ./target/x86_64-unknown-linux-musl/release/bootstrap

rust.zip is the file that must be uploaded to Lambda. Here is a nice tutorial on how to do it.

That’s it! This is how you can write Rust Lambdas that use rusoto modules.
