I created a mobile application to stream audio and display an image associated with the content being played. A third-party API supplied a URL pointing to a remote image that corresponded to the content, and I used a Flutter stream to listen to that URL and display the associated image. Originally I added logic to the stream to download and display the most recently published image at a set time interval. However, this wasn't optimal: most audio streams last about an hour, but they can range from twenty minutes to several hours, and repeatedly downloading the full image for that entire time wastes network bandwidth. A more efficient solution was necessary.
My solution was to implement a back-end service to download and hash the images corresponding to each channel in the streaming application. These hashes are exposed as endpoints on a server, and I modified the stream logic to request the SHA-256 hash of the image rather than downloading the entire image file at a set interval. As long as the hash doesn't change, there is no reason to re-download the remote image. This dramatically reduced the network and power consumption of my application. I hosted the back-end service on an AWS EC2 instance, and because AWS bills for data egressing from the instance but not for data ingressing into it, and each response is only a 64-character hash, the running cost is minimal.
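The client itself is written in Flutter, but the polling protocol is simple enough to sketch in Rust. The poll_image name, the hash_endpoint/image_url parameters, and the one-minute interval below are illustrative assumptions rather than the production client code.

use std::time::Duration;

// Hedged sketch of the client-side protocol: fetch the small hash first and
// only download the full image when the hash has changed since the last poll.
async fn poll_image(hash_endpoint: &str, image_url: &str) -> Result<(), reqwest::Error> {
    let mut last_hash = String::new();
    loop {
        // The hash endpoint returns a short hex string, so this request is cheap.
        let hash = reqwest::get(hash_endpoint).await?.text().await?;
        if hash != last_hash {
            // Only now pay for the full image download.
            let image_bytes = reqwest::get(image_url).await?.bytes().await?;
            println!("new image: {} bytes, hash {}", image_bytes.len(), hash);
            last_hash = hash;
        }
        tokio::time::sleep(Duration::from_secs(60)).await;
    }
}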
On the server side, a Config struct contains only a nested Secrets struct. This is used to load the remote image URLs without checking them into public source control. An AppState struct holds the most recent image hashes. These are guarded by a mutex, ensuring the service doesn't respond with a partial hash while the variable is being written to. This struct also contains a Notify signal (from the Tokio crate) to signal when an update has occurred. There is also an implementation block for the Config struct that reads the secret URLs from a TOML file into program memory.
use std::{
sync::{Arc, Mutex},
time::Duration,
fs::read_to_string,
};
use actix_web::{get, web, App, HttpServer, HttpResponse};
use actix_web::middleware::Logger;
use reqwest;
use sha2::{Digest, Sha256};
use serde_derive::Deserialize;
use tokio::sync::Notify;
use chrono::prelude::*;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
const REFRESH_HASH_IN_SECONDS: u64 = 60;
#[derive(Debug, Deserialize)]
struct Config {
secrets: Secrets,
}
#[derive(Debug, Deserialize)]
struct Secrets {
en_image: String,
en_image_p: String,
es_image: String,
es_image_p: String,
fr_image: String,
po_image: String,
it_image: String,
de_image: String,
}
struct AppState {
en_image_hash: Mutex<String>,
en_p_image_hash: Mutex<String>,
es_image_hash: Mutex<String>,
es_p_image_hash: Mutex<String>,
fr_image_hash: Mutex<String>,
po_image_hash: Mutex<String>,
it_image_hash: Mutex<String>,
de_image_hash: Mutex<String>,
notify: Notify,
}
impl Config {
fn load_from_file(filename: &str) -> Result<Config, String> {
let config_str = read_to_string(filename)
.map_err(|err| format!("Unable to read config file: {}", err))?;
let config: Config = toml::from_str(&config_str)
.map_err(|err| format!("Unable to parse config file: {}", err))?;
Ok(config)
}
}
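For reference, the Config.toml file that load_from_file() parses has a single [secrets] table whose keys match the Secrets struct. The URLs below are placeholders; the real values are kept out of source control.

# Config.toml (placeholder URLs shown)
[secrets]
en_image = "https://example.com/images/en.jpg"
en_image_p = "https://example.com/images/en_p.jpg"
es_image = "https://example.com/images/es.jpg"
es_image_p = "https://example.com/images/es_p.jpg"
fr_image = "https://example.com/images/fr.jpg"
po_image = "https://example.com/images/po.jpg"
it_image = "https://example.com/images/it.jpg"
de_image = "https://example.com/images/de.jpg"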
Next, the images are downloaded, hashed, and the resulting hashes are stored in the AppState struct. The download_and_hash_image macro I wrote expects three arguments, each preceded by a dollar sign, which indicates that what follows will be replaced in the generated code by the value passed to the macro invocation. $state_mu represents a mutex from the AppState struct, $image represents a string that points to a remote image URL, and $state_notify corresponds to the tokio::sync::Notify instance used to signal that a new hash is available in the AppState struct. The download_and_hash_images function reads the remote image URLs from the Config struct and passes these as arguments to the macro described above, along with the other required arguments. An infinite loop invokes the download_and_hash_image macro for each image URL as often as the constant REFRESH_HASH_IN_SECONDS specifies, using the tokio::time::sleep() function to wait between iterations.
macro_rules! download_and_hash_image {
($state_mu:expr, $image:expr, $state_notify:expr) => {
let image_data = match reqwest::get($image).await {
Ok(response) => match response.bytes().await {
Ok(data) => data,
Err(e) => {
let now: DateTime<Utc> = Utc::now();
eprintln!("{} : Error reading response bytes: {}", now, e);
continue;
}
},
Err(e) => {
let now: DateTime<Utc> = Utc::now();
eprintln!("{} : Error fetching image: {}", now, e);
continue;
}
};
let hash = format!("{:x}", Sha256::digest(&image_data));
{
let mut image_hash = $state_mu.lock().unwrap();
*image_hash = hash.clone();
$state_notify.notify_one();
}
};
}
async fn download_and_hash_images(state: Arc<AppState>, config: Config) {
let en_image = config.secrets.en_image.clone();
let en_p_image = config.secrets.en_image_p.clone();
let es_image = config.secrets.es_image.clone();
let es_p_image = config.secrets.es_image_p.clone();
let fr_image = config.secrets.fr_image.clone();
let po_image = config.secrets.po_image.clone();
let it_image = config.secrets.it_image.clone();
let de_image = config.secrets.de_image.clone();
loop {
download_and_hash_image!(state.en_image_hash, &en_image, state.notify);
download_and_hash_image!(state.en_p_image_hash, &en_p_image, state.notify);
download_and_hash_image!(state.es_image_hash, &es_image, state.notify);
download_and_hash_image!(state.es_p_image_hash, &es_p_image, state.notify);
download_and_hash_image!(state.fr_image_hash, &fr_image, state.notify);
download_and_hash_image!(state.po_image_hash, &po_image, state.notify);
download_and_hash_image!(state.it_image_hash, &it_image, state.notify);
download_and_hash_image!(state.de_image_hash, &de_image, state.notify);
tokio::time::sleep(Duration::from_secs(REFRESH_HASH_IN_SECONDS)).await;
}
}
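The macro signals notify with notify_one() each time a hash is refreshed. The snippets in this post never await that signal, but a task that wanted to react to refreshes could wait on the same tokio::sync::Notify. The sketch below is an assumption about how that might look if added to main() after app_state is created; the log-only body is illustrative.

// Hypothetical listener: wakes up whenever any image hash is refreshed.
let listener_state = Arc::clone(&app_state);
tokio::spawn(async move {
    loop {
        // Resolves after the next notify_one() call (or immediately if a
        // permit from an earlier call is already stored).
        listener_state.notify.notified().await;
        println!("an image hash was refreshed");
    }
});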
macro_rules! create_hash_endpoint {
($state_field:ident, $route:expr) => {
#[get($route)]
async fn $state_field(state: web::Data<Arc<AppState>>) -> HttpResponse {
let image_hash = state.$state_field.lock().unwrap();
HttpResponse::Ok().body(image_hash.clone())
}
};
}
create_hash_endpoint!(en_image_hash, "/en");
create_hash_endpoint!(en_p_image_hash, "/en_p");
create_hash_endpoint!(es_image_hash, "/es");
create_hash_endpoint!(es_p_image_hash, "/es_p");
create_hash_endpoint!(fr_image_hash, "/fr");
create_hash_endpoint!(po_image_hash, "/po");
create_hash_endpoint!(it_image_hash, "/it");
create_hash_endpoint!(de_image_hash, "/de");
$state_field will be the endpoint function name in the generated code, and $route will correspond to the actual endpoint on the server, shown in the #[get] attribute above the endpoint function.
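To make the code generation concrete, a single invocation such as create_hash_endpoint!(en_image_hash, "/en") expands to approximately the following handler (written out by hand here, so treat it as an approximation of the real expansion):

// Approximate expansion of create_hash_endpoint!(en_image_hash, "/en"):
#[get("/en")]
async fn en_image_hash(state: web::Data<Arc<AppState>>) -> HttpResponse {
    // Lock just long enough to copy the current hash out of shared state.
    let image_hash = state.en_image_hash.lock().unwrap();
    HttpResponse::Ok().body(image_hash.clone())
}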
The main() function has an attribute identifying it as the entry point for an Actix Web application. The AppState struct is created inside of an Arc. The Arc is used to safely share the data contained inside AppState across multiple threads concurrently. The Actix Web server handles multiple HTTP GET requests simultaneously, and each request may be processed on its own thread; without the Arc, synchronization issues and race conditions would likely occur. The Arc has an internal counter that is incremented each time the AppState struct is cloned. When no more references to the shared AppState exist, the counter reaches zero and Rust can drop the resource. This will only happen when the web server is stopped and the main() function exits. A clone of the AppState struct is used when the Tokio runtime spawns a task devoted to downloading and hashing the remote images, using the logic described earlier. The HttpServer closure captures a second clone of the AppState struct, thereby taking ownership of it.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let app_state = Arc::new(AppState {
en_image_hash: Mutex::new(String::new()),
en_p_image_hash: Mutex::new(String::new()),
es_image_hash: Mutex::new(String::new()),
es_p_image_hash: Mutex::new(String::new()),
fr_image_hash: Mutex::new(String::new()),
po_image_hash: Mutex::new(String::new()),
it_image_hash: Mutex::new(String::new()),
de_image_hash: Mutex::new(String::new()),
notify: Notify::new(),
});
let app_state_clone = Arc::clone(&app_state);
tokio::spawn(async move {
let config = Config::load_from_file("Config.toml").unwrap_or_else(|e| {
eprintln!("Error loading config: {}", e);
std::process::exit(1);
});
download_and_hash_images(app_state_clone, config).await;
});
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder
.set_private_key_file("certs/key.pem", SslFiletype::PEM)
.unwrap();
builder
.set_certificate_chain_file("certs/cert.pem")
.unwrap();
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(app_state.clone()))
.wrap(Logger::default())
.service(en_image_hash)
.service(en_p_image_hash)
.service(es_image_hash)
.service(es_p_image_hash)
.service(fr_image_hash)
.service(po_image_hash)
.service(it_image_hash)
.service(de_image_hash)
})
.bind_openssl("0.0.0.0:9191", builder)?
.run()
.await
}
I learned a lot from this project and would really like to continue using Rust for any future projects. I also wrote Actix Web middleware to rate-limit these endpoints, or any routes on an Actix Web server. Check it out here.