Introduction
In the AWS Lambda ecosystem, there is an awesome set of tools created to cover common scenarios when building serverless functions. They are gathered together as Lambda Powertools and are available in several languages.
At the moment there is no version of Powertools for Rust (it will be created eventually). I saw this as a great opportunity to learn by building small helpers for AWS Lambda in Rust.
I am not attempting to replicate the Powertools suite. Instead, I’ll focus on implementing specific functionalities.
Metrics
I decided to start by creating utilities for CloudWatch custom metrics. When running AWS Lambda, we have two main options for putting custom metrics into CloudWatch: using the SDK, or printing a log shaped according to the Embedded Metric Format (EMF). The latter is much cheaper and faster, but it requires some boilerplate.
EMF lets you publish up to 100 metric points at once and define up to 30 dimensions.
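To get a feel for the format, here is a minimal sketch of a single EMF log entry, based on the specification. The namespace, dimension, and metric names below are purely illustrative:
{
  "_aws": {
    "Timestamp": 1700000000000,
    "CloudWatchMetrics": [
      {
        "Namespace": "custom_lambdas",
        "Dimensions": [["service"]],
        "Metrics": [{ "Name": "test_count", "Unit": "Count" }]
      }
    ]
  },
  "service": "dummy_service",
  "test_count": 1.0
}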
Goal
I'll create a library that lets us:
- create metrics for a defined namespace and dimensions
- add more dimensions, up to the maximum limit defined by AWS
- gracefully handle going beyond the limit of 100 metric points
- publish metrics automatically once the Lambda function finishes
As you can see, not all of the functionality provided by the original Powertools is covered, yet the library should already be usable (and hopefully even useful).
Implementation
The complete code for this implementation is available in the GitHub repository.
I need two main pieces. The first is a set of types (structs, enums) that map to the EMF. I can use some serde magic to seamlessly serialize them to JSON.
The second piece is a struct with a few methods, holding the mutable state of the current metrics and allowing actions on it.
Types
The EMF format is defined here. After translating it to Rust, I got something like this:
// lib.rs
//...
/// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure_metricdefinition
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetricDefinition {
name: String,
unit: MetricUnit,
storage_resolution: u64,
}
/// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure_metricdirective
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetricDirective {
namespace: String,
dimensions: Vec<Vec<DimensionName>>,
metrics: Vec<MetricDefinition>,
}
/// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html#CloudWatch_Embedded_Metric_Format_Specification_structure_metadata
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetadataObject {
timestamp: i64,
cloud_watch_metrics: Vec<MetricDirective>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct CloudWatchMetricsLog {
#[serde(rename = "_aws")]
aws: MetadataObject,
#[serde(flatten)]
dimensions: Dimensions,
#[serde(flatten)]
metrics_values: MetricValues,
}
// ...
I use the newtype pattern to keep my types expressive:
// lib.rs
// ...
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct Dimensions(HashMap<String, String>);
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct MetricValues(HashMap<String, f64>);
#[derive(Debug, Serialize, Deserialize)]
pub struct DimensionName(String);
#[derive(Debug, Serialize, Deserialize)]
pub struct Namespace(String);
// ...
I decided to keep metric values as floats. For some metrics I would rather use integers, but I didn't want to add more complexity at this point.
Metric units are defined with an enum:
// lib.rs
// ...
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub enum MetricUnit {
    Seconds,
    Microseconds,
    Milliseconds,
    Bytes,
    Kilobytes,
    Megabytes,
    Gigabytes,
    Terabytes,
    Count,
    BytesPerSecond,
    KilobytesPerSecond,
    MegabytesPerSecond,
    GigabytesPerSecond,
    TerabytesPerSecond,
    BitsPerSecond,
    KilobitsPerSecond,
    MegabitsPerSecond,
    GigabitsPerSecond,
    TerabitsPerSecond,
    CountPerSecond,
}
// ...
The only logic I added here is the into function, which handles the conversion to a string needed for printing the log. As we will see in a moment, this implementation requires some fixes, but I intentionally leave it as is for now to showcase clippy's capabilities.
// lib.rs
// ...
impl CloudWatchMetricsLog {
pub fn into(self) -> String {
serde_json::to_string(&self).unwrap()
}
}
// ...
Domain
Let's first define the domain types:
// lib.rs
// ...
#[derive(Debug, Serialize, Deserialize)]
pub struct Metric {
name: String,
unit: MetricUnit,
value: f64,
}
// ...
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct Metrics {
namespace: Namespace,
dimensions: Dimensions,
metrics: Vec<Metric>,
}
// ...
Now we need some functionality.
Add dimension
This might fail, so whether we like it or not, we need to return a Result and leave the responsibility of handling it to the caller. I will work on error types separately, so for now the Err variant is just a String.
// lib.rs
// ...
impl Metrics {
// ...
pub fn try_add_dimension(&mut self, key: &str, value: &str) -> Result<(), String> {
if self.dimensions.0.len() >= MAX_DIMENSIONS {
Err("Too many dimensions".into())
} else {
self.dimensions.0.insert(key.to_string(), value.to_string());
Ok(())
}
}
// ...
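On the caller side, handling can be as simple as logging the problem and moving on. A small sketch, with an illustrative key and value:
// The caller decides what to do with the Result; here we just log
// the error and continue without the extra dimension.
if let Err(err) = metrics.try_add_dimension("application", "customer_service") {
    eprintln!("Could not add dimension: {err}");
}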
Add metric
At first glance, this operation should return a Result too, because we might reach the limit of 100 metrics. Additionally, we can't post two data points for the same metric in one log.
Both cases are easily solvable - it is enough to flush the current metrics and start collecting from scratch.
The trade-off is an extra flush in the middle of execution, even though we expect metrics not to impact the performance of our Lambda function. On the other hand, printing a log of limited size sounds like a better strategy than returning an Err and forcing the caller to deal with it.
// lib.rs
// ...
impl Metrics {
// ...
pub fn add_metric(&mut self, name: &str, unit: MetricUnit, value: f64) {
if self.metrics.len() >= MAX_METRICS
|| self.metrics.iter().any(|metric| metric.name == name)
{
self.flush_metrics();
}
self.metrics.push(Metric {
name: name.to_string(),
unit,
value,
});
}
// ...
Flush metrics
Flushing metrics means simply printing them to the console and removing the current entries from our object:
// lib.rs
// ...
impl Metrics {
// ...
pub fn flush_metrics(&mut self) {
let payload = self.format_metrics().into();
println!("{payload}");
self.metrics = Vec::new();
}
// ...
Format metrics
The main part of the logic, which at the same time is just a transformation of the domain types into the AWS EMF types:
// lib.rs
// ...
pub fn format_metrics(&self) -> CloudWatchMetricsLog {
let metrics_definitions = self
.metrics
.iter()
.map(|entry| entry.into())
.collect::<Vec<MetricDefinition>>();
let metrics_entries = vec![MetricDirective {
namespace: self.namespace.0.to_string(),
dimensions: vec![self
.dimensions
.0
.keys()
.map(|key| DimensionName(key.to_string()))
.collect()],
metrics: metrics_definitions,
}];
let cloudwatch_metrics = MetadataObject {
timestamp: Utc::now().timestamp_millis(),
cloud_watch_metrics: metrics_entries,
};
let metrics_values = self
.metrics
.iter()
.map(|metric| (metric.name.to_string(), metric.value))
.collect::<HashMap<_, _>>();
let cloudwatch_metrics_log = CloudWatchMetricsLog {
aws: cloudwatch_metrics,
dimensions: self.dimensions.clone(),
metrics_values: MetricValues(metrics_values),
};
cloudwatch_metrics_log
}
// ...
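The entry.into() call above relies on a conversion from the domain Metric into the EMF MetricDefinition, which is not shown here. A minimal sketch could look like this (the storage resolution of 60, i.e. standard resolution, is an assumption that matches the tests below; the repository may implement the conversion differently):
// lib.rs
// ...
// Sketch of the conversion used by format_metrics.
impl From<&Metric> for MetricDefinition {
    fn from(metric: &Metric) -> Self {
        Self {
            name: metric.name.clone(),
            unit: metric.unit.clone(),
            // Assumption: standard 60-second storage resolution.
            storage_resolution: 60,
        }
    }
}
// ...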
Flush metrics automatically at the end of the function
One more thing to implement. The user can manually flush metrics, but it is not very convenient. Let's flush metrics once the function ends, which is when our metrics object goes out of scope:
// lib.rs
// ...
impl Drop for Metrics {
fn drop(&mut self) {
println!("Dropping metrics, publishing metrics");
self.flush_metrics();
}
}
Unit tests
Business logic isn't very complex, so I can create a few test cases with basic assertions.
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn should_create_metrics() {
let mut metrics = Metrics::new("test_namespace", "service", "dummy_service");
metrics.add_metric("test_metric_count", MetricUnit::Count, 1.0);
metrics.add_metric("test_metric_seconds", MetricUnit::Seconds, 22.0);
let log = metrics.format_metrics();
assert_eq!(log.aws.cloud_watch_metrics[0].namespace, "test_namespace");
assert_eq!(log.aws.cloud_watch_metrics[0].metrics[0].name, "test_metric_count");
assert_eq!(log.aws.cloud_watch_metrics[0].metrics[0].unit, MetricUnit::Count);
assert_eq!(
log.aws.cloud_watch_metrics[0].metrics[0].storage_resolution,
60
);
assert_eq!(log.metrics_values.0.get("test_metric_count"), Some(&1.0));
assert_eq!(
log.aws.cloud_watch_metrics[0].metrics[1].name,
"test_metric_seconds"
);
assert_eq!(log.aws.cloud_watch_metrics[0].metrics[1].unit, MetricUnit::Seconds);
assert_eq!(
log.aws.cloud_watch_metrics[0].metrics[1].storage_resolution,
60
);
assert_eq!(log.dimensions.0.len(), 1);
}
#[test]
fn should_handle_duplicated_metric() {
let mut metrics = Metrics::new("test", "service", "dummy_service");
metrics.add_metric("test", MetricUnit::Count, 2.0);
metrics.add_metric("test", MetricUnit::Count, 1.0);
assert_eq!(metrics.metrics.len(), 1);
}
#[test]
fn should_not_fail_over_100_metrics() {
let mut metrics = Metrics::new("test", "service", "dummy_service");
for i in 0..100 {
metrics.add_metric(&format!("metric{i}"), MetricUnit::Count, i as f64);
}
assert_eq!(metrics.metrics.len(), 100);
metrics.add_metric("over_100", MetricUnit::Count, 11.0);
assert_eq!(metrics.metrics.len(), 1);
}
#[test]
fn should_fail_if_over_30_dimensions() {
let mut metrics = Metrics::new("test", "service", "dummy_service");
for i in 0..29 {
metrics
.try_add_dimension(&format!("key{i}"), &format!("value{i}"))
.unwrap();
}
assert!(metrics.try_add_dimension("key31", "value31").is_err());
}
}
Example
In the examples directory, I created a basic Lambda function with AWS SAM.
// ...
async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// Extract some useful info from the request
let command = event.payload.command;
let mut metrics = Metrics::new("custom_lambdas", "service", "dummy_service");
metrics.try_add_dimension("application", "customer_service");
metrics.add_metric("test_count", MetricUnit::Count, 10.4);
metrics.add_metric("test_seconds", MetricUnit::Seconds, 15.0);
metrics.add_metric("test_count", MetricUnit::Count, 10.6);
// Prepare the response
let resp = Response {
req_id: event.context.request_id,
msg: format!("Command {}.", command),
};
// Return `Response` (it will be serialized to JSON automatically by the runtime)
Ok(resp)
}
// ...
After sam build && sam deploy, we can test the function from the console.
As expected, there are two log entries with metrics. The first was emitted when we added the test_count metric for the second time, and the last one was emitted when the function finished.
Finally, I can see the metrics in CloudWatch.
Cleaning up
The library works, which is great, but there are probably things to improve. Let's run clippy, the great Rust linter:
cargo clippy -- -D clippy::pedantic
The pedantic lint group is pretty opinionated, but that is totally OK for me. All right, let's improve the code.
Panics
The new function won't panic, since there are no dimensions added yet, so adding the first one is safe. This is a good place to use an allow attribute to make it clear that this behavior is handled intentionally.
impl Metrics {
#[allow(clippy::missing_panics_doc)]
pub fn new(namespace: &str, dimension_key: &str, dimension_value: &str) -> Self {
let mut metrics = Self {
dimensions: Dimensions(HashMap::new()),
namespace: Namespace(namespace.to_string()),
metrics: Vec::new(),
};
// UNWRAP: for new metrics there is no risk of reaching max number of dimensions
metrics
.try_add_dimension(dimension_key, dimension_value)
.unwrap();
metrics
}
The second panic is more interesting. The into function for CloudWatchMetricsLog might panic on the unwrap if serialization fails - either because a Serialize implementation panics, or because serde_json returns an error, for example for a HashMap with non-string keys. So instead of a bare into function, I should implement the TryInto trait.
impl TryInto<String> for CloudWatchMetricsLog {
type Error = String;
fn try_into(self) -> Result<String, Self::Error> {
serde_json::to_string(&self).map_err(|err| err.to_string())
}
}
For now, I leave the Error as a String, because I am not going to return it to the caller. Instead, I will only print an error message:
/// Flushes the metrics to stdout in a single payload
/// If an error occurs during serialization, it will be printed to stderr
pub fn flush_metrics(&mut self) {
let serialized_metrics: Result<String, _> = self.format_metrics().try_into();
match serialized_metrics {
Ok(payload) => println!("{payload}"),
Err(err) => eprintln!("Error when serializing metrics: {err}"),
}
self.metrics = Vec::new();
}
must_use and other lints
Clippy (pedantic) checks whether public functions are candidates for the #[must_use] attribute. The new function for Metrics feels like a good match.
impl Metrics {
#[allow(clippy::missing_panics_doc)]
#[must_use]
pub fn new(namespace: &str, dimension_key: &str, dimension_value: &str) -> Self {
// ...
I have also followed other clippy suggestions, including docs formatting. Speaking of which...
Documentation
Rust has a great story for creating documentation. Doc comments let us use Markdown and are eventually transformed into a web page.
After adding comments to the crate in general, and to the public structs and functions, we get pretty nice docs out of the box.
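For instance, a doc comment on the constructor could look roughly like this (the wording here is illustrative, not the exact text from the crate):
/// Creates a new `Metrics` collector for the given namespace,
/// starting with a single dimension.
///
/// The collected metrics are printed to stdout in EMF shape
/// when they are flushed or when the value is dropped.
pub fn new(namespace: &str, dimension_key: &str, dimension_value: &str) -> Self {
    // ...
}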
I run
cargo doc --open --lib
The browser automatically opens a page with my docs:
Publish crate
OK, now I am ready to publish the first version of the library. It is exciting!
I change the name of the lib to lambda-helpers-metrics and set the version to 0.1.0-alpha.
After creating an account on crates.io and verifying my email address, I am set up to publish crates.
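Assuming the Cargo.toml metadata required by crates.io (such as description and license) is filled in, publishing boils down to a few commands:
cargo login                # paste the API token generated on crates.io
cargo publish --dry-run    # verify the package builds and contains what it should
cargo publish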
🎉 🎉 🎉
Now I can run cargo add lambda_helpers_metrics in any Lambda function project and add some metrics. It feels nice :)
Summary
I built a simple library for creating metrics using the AWS EMF spec. Thanks to that, we can avoid expensive PutMetricData calls and simply log data in a specific shape to the console.
Clippy helped me to catch a bunch of possible mistakes.
Finally, I published the crate to crates.io, so it can be added to a project with cargo add.
Next steps
There are some things I would like to work on next:
- defining Errors with proper types
- tracking cold starts
- adding CI/CD
Please add a comment if you see more useful features to be added.
Thanks!