Written by Mario Zupan✏️
In a previous post on this blog, we covered how to create a Rust web service using Actix and Diesel. This time around, we’ll create a lightweight, fully asynchronous web service using the warp web framework and tokio-postgres.
Warp is based on the well-known and battle-tested hyper HTTP library, which provides a robust and very fast basis. Another cool feature of warp is it’s filter system, a functional approach to composing web applications.
Essentially, filters are just functions that can be composed together. In warp, they’re used for everything from routing, to middleware, to passing values to handlers. See warp’s release post for a deeper dive. In this tutorial, you’ll see some filters in action and we’ll demonstrate how you can write one yourself.
Setup
To follow along, all you need is a reasonably recent Rust installation (1.39+) and a way to run a Postgres database (e.g., Docker).
First, create your test project.
cargo new warp-example
cd warp-example
Next, edit the Cargo.toml
file and add the dependencies you’ll need.
[dependencies]
tokio = { version = "0.2", features = ["macros"] }
warp = "0.2"
mobc-postgres = { version = "0.5", features = ["with-chrono-0_4"] }
mobc = "0.5"
serde = {version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0"
thiserror = "1.0"
chrono = { version = "0.4", features = ["serde"] }
In case you’re wondering what all of this means:
-
tokio
is our async runtime, which we need to execute futures -
warp
is our web framework -
mobc / mobc-postgres
represents an asynchronous connection pool for our database connections -
serde
is for serializing and deserializing objects (e.g., to/from JSON) -
thiserror
is a utility library we’ll use for error handling -
chrono
represents time and date utilities
To avoid just dumping everything into one file, let’s add a bit of structure to main.rs
.
mod data;
mod db;
mod error;
mod handler;
For each of these modules, we’ll also create a file (e.g., data.rs
).
For the first step, create a web server running on port 8000 with a /health
endpoint that returns a 200 OK
.
In main.rs
, add:
#[tokio::main]
async fn main() {
let health_route = warp::path!("health")
.map(|| StatusCode::OK);
let routes = health_route
.with(warp::cors().allow_any_origin());
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}
In the above snippet, we defined our health_route
, which matches on GET /health
and returns 200 OK
. Then, to demonstrate how to add middleware, set up this route with the warp::cors
filter, which allows the service to be called from any origin. Finally, run the server using warp::serve
.
Test whether it works by starting the application using cargo run
and cURL
.
curl http://localhost:8000/health
Connecting the web service to a database
So far, so good! The next step is to set up your Postgres database and add a check for a working database connection in the /health
handler.
To start a Postgres database, you can either use Docker or a local Postgres installation. With Docker, you can simply execute:
docker run -p 7878:5432 -d postgres:9.6.12
This command starts a Postgres DB on port 7878 with user postgres
, database postgres
, and no password.
Now that you have a running database, the next step is to talk to this database from your warp application. To do so, you can use mobc, an async connection pool, to spawn multiple database connections and reuse them between requests.
Setting this up only takes a couple of lines. First, define some convenience types in main.rs
.
use mobc::{Connection, Pool};
use mobc_postgres::{tokio_postgres, PgConnectionManager};
use tokio_postgres::NoTls;
type DBCon = Connection<PgConnectionManager<NoTls>>;
type DBPool = Pool<PgConnectionManager<NoTls>>;
Next, create your connection pool in db.rs
.
use crate::{DBCon, DBPool};
use mobc_postgres::{tokio_postgres, PgConnectionManager};
use tokio_postgres::{Config, Error, NoTls};
use std::fs;
use std::str::FromStr;
use std::time::Duration;
const DB_POOL_MAX_OPEN: u64 = 32;
const DB_POOL_MAX_IDLE: u64 = 8;
const DB_POOL_TIMEOUT_SECONDS: u64 = 15;
pub fn create_pool() -> std::result::Result<DBPool, mobc::Error<Error>> {
let config = Config::from_str("postgres://postgres@127.0.0.1:7878/postgres")?;
let manager = PgConnectionManager::new(config, NoTls);
Ok(Pool::builder()
.max_open(DB_POOL_MAX_OPEN)
.max_idle(DB_POOL_MAX_IDLE)
.get_timeout(Some(Duration::from_secs(DB_POOL_TIMEOUT_SECONDS)))
.build(manager))
}
The create_pool
function simply creates a Postgres connection string and defines some parameters for the connection pool, such as minimum and maximum open connections, as well as a connection timeout.
The next step is to simply build the pool and return it. At this point, no database connection is actually created, just the pool.
Since we’re already here, let’s also create a function for initializing the database on startup.
const INIT_SQL: &str = "./db.sql";
pub async fn get_db_con(db_pool: &DBPool) -> Result<DBCon> {
db_pool.get().await.map_err(DBPoolError)
}
pub async fn init_db(db_pool: &DBPool) -> Result<()> {
let init_file = fs::read_to_string(INIT_SQL)?;
let con = get_db_con(db_pool).await?;
con
.batch_execute(init_file.as_str())
.await
.map_err(DBInitError)?;
Ok(())
}
With the get_db_con
utility, we tried to get a new database connection from the pool. Don’t worry about the error right now — we’ll talk about error handling later on.
To create a database table from the db.sql
file on startup, the init_db
function is called. This reads the file into a string and executes the query.
The init query looks like this:
CREATE TABLE IF NOT EXISTS todo
(
id SERIAL PRIMARY KEY NOT NULL,
name VARCHAR(255),
created_at timestamp with time zone DEFAULT (now() at time zone 'utc'),
checked boolean DEFAULT false
);
Back in our main function, we can now call our database setup functions.
let db_pool = db::create_pool().expect("database pool can be created");
db::init_db(&db_pool)
.await
.expect("database can be initialized");
If any of the database setup code fails, we can throw our hands up and panic because it won’t make sense to continue.
Assuming it doesn’t fail, it’s finally time to tackle the primary goal of this section: to add a database check to the /health
handler.
To do this, we need a way to pass the db_pool
to the handler. This is a perfect opportunity to write our first warp filter
.
In main.rs
, add the following with_db
filter.
use std::convert::Infallible;
use warp::{Filter, Rejection};
fn with_db(db_pool: DBPool) -> impl Filter<Extract = (DBPool,), Error = Infallible> + Clone {
warp::any().map(move || db_pool.clone())
}
This is a simple extract filter. The above means that for any route (any()
), you want to extract a DBPool
and pass it along.
If you’re interested in learning more about filters, the docs are quite helpful.
The filter is then simply added to the handler definition with the .and()
operator:
let health_route = warp::path!("health")
.and(with_db(db_pool.clone()))
.and_then(handler::health_handler);
Move the health handler to the handler.rs
file and add the database check.
use crate::{db, DBPool};
use warp::{http::StatusCode, reject, Reply, Rejection};
pub async fn health_handler(db_pool: DBPool) -> std::result::Result<impl Reply, Rejection> {
let db = db::get_db_con(&db_pool)
.await
.map_err(|e| reject::custom(e))?;
db.execute("SELECT 1", &[])
.await
.map_err(|e| reject::custom(DBQueryError(e)))?;
Ok(StatusCode::OK)
}
Now the handler receives a DBPool
, which you can use to get a connection and initiate a sanity check query against the database.
If an error occurs during the check, use reject::custom
to return a custom error.
Next, as promised, let’s take a look at error handling with warp.
Handling errors
Clean error handling is one of the most important and oft-overlooked things in any web application. The goal to provide helpful errors to API consumers while not leaking internal details.
We’ll use the thiserror library to conveniently create custom errors.
Start in error.rs
and define an Error
enum, which has a variant for all of your errors.
use mobc_postgres::tokio_postgres;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("error getting connection from DB pool: {0}")]
DBPoolError(mobc::Error<tokio_postgres::Error>),
#[error("error executing DB query: {0}")]
DBQueryError(#[from] tokio_postgres::Error),
#[error("error creating table: {0}")]
DBInitError(tokio_postgres::Error),
#[error("error reading file: {0}")]
ReadFileError(#[from] std::io::Error),
}
If we could find a way to transform these and other errors to meaningful API responses, we could simply return one of our custom errors from a handler and the caller would automatically get the correct error message and status code.
To do this, we’ll use warp’s concept of rejections
.
First, add a convenience type to main.rs
for fallible results.
type Result<T> = std::result::Result<T, warp::Rejection>;
Next, make sure your custom errors are recognized as rejections by warp by implementing the Reject
trait.
impl warp::reject::Reject for Error {}
Define a rejection handler, which turns rejections to nice error responses in the following form.
#[derive(Serialize)]
struct ErrorResponse {
message: String,
}
Such a rejection handler might look like this:
pub async fn handle_rejection(err: Rejection) -> std::result::Result<impl Reply, Infallible> {
let code;
let message;
if err.is_not_found() {
code = StatusCode::NOT_FOUND;
message = "Not Found";
} else if let Some(_) = err.find::<warp::filters::body::BodyDeserializeError>() {
code = StatusCode::BAD_REQUEST;
message = "Invalid Body";
} else if let Some(e) = err.find::<Error>() {
match e {
Error::DBQueryError(_) => {
code = StatusCode::BAD_REQUEST;
message = "Could not Execute request";
}
_ => {
eprintln!("unhandled application error: {:?}", err);
code = StatusCode::INTERNAL_SERVER_ERROR;
message = "Internal Server Error";
}
}
} else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
code = StatusCode::METHOD_NOT_ALLOWED;
message = "Method Not Allowed";
} else {
eprintln!("unhandled error: {:?}", err);
code = StatusCode::INTERNAL_SERVER_ERROR;
message = "Internal Server Error";
}
let json = warp::reply::json(&ErrorResponse {
message: message.into(),
});
Ok(warp::reply::with_status(json, code))
}
Basically, we get a Rejection
from a handler. Then, depending on the type of error, we set the message and status code for the response.
As you can see, we can handle both generic errors, such as not found
, and specific problems, such as an error encountered while parsing the JSON body of a request.
The fallback handlers return a generic 500 error to the user and log what went wrong, so you can investigate if necessary without leaking internals.
In the routing definition, simply add this error handler using the recover
filter:
let routes = health_route
.with(warp::cors().allow_any_origin())
.recover(error::handle_rejection);
Perfect! We’ve made a lot of progress already. All that’s left is to actually implement CRUD handlers for your to-do app.
Implementing the CRUD API
Now we have a web server running and connected to a database as well as a way to handle errors gracefully. The only thing missing from our app is, well, the actual application logic.
We’ll implement four handlers:
-
GET /todo/?search={searchString}
to list all todos, filtered by an optional search string -
POST /todo/
to create a todo -
PUT /todo/{id}
to update the todo with the given ID -
DELETE /todo/{id}
to delete the todo with the given ID
The first step is to create todos because without them we won’t be able to test the other endpoints conveniently.
In db.rs
, add a function for inserting todos into the database.
const TABLE: &str = "todo";
pub async fn create_todo(db_pool: &DBPool, body: TodoRequest) -> Result<Todo> {
let con = get_db_con(db_pool).await?;
let query = format!("INSERT INTO {} (name) VALUES ($1) RETURNING *", TABLE);
let row = con
.query_one(query.as_str(), &[&body.name])
.await
.map_err(DBQueryError)?;
Ok(row_to_todo(&row))
}
fn row_to_todo(row: &Row) -> Todo {
let id: i32 = row.get(0);
let name: String = row.get(1);
let created_at: DateTime<Utc> = row.get(2);
let checked: bool = row.get(3);
Todo {
id,
name,
created_at,
checked,
}
}
This establishes a connection from the pool, sends an insert query, and transforms the returned row to a Todo
.
For this to work, you’ll need some data objects, which are defined in data.rs
.
use chrono::prelude::*;
use serde_derive::{Deserialize, Serialize};
#[derive(Deserialize)]
pub struct Todo {
pub id: i32,
pub name: String,
pub created_at: DateTime<Utc>,
pub checked: bool,
}
#[derive(Deserialize)]
pub struct TodoRequest {
pub name: String,
}
#[derive(Deserialize)]
pub struct TodoUpdateRequest {
pub name: String,
pub checked: bool,
}
#[derive(Serialize)]
pub struct TodoResponse {
pub id: i32,
pub name: String,
pub checked: bool,
}
impl TodoResponse {
pub fn of(todo: Todo) -> TodoResponse {
TodoResponse {
id: todo.id,
name: todo.name,
checked: todo.checked,
}
}
}
The Todo
struct is essentially a mirror of your database table. tokio-postgres
can use chrono’s DateTime<Utc>
to map to and from timestamps. The other structs are the JSON requests you expect for creating and updating a todo and the response you send back in your list, update, and create handlers.
You can now create your actual create handler in handler.rs
.
pub async fn create_todo_handler(body: TodoRequest, db_pool: DBPool) -> Result<impl Reply> {
Ok(json(&TodoResponse::of(
db::create_todo(&db_pool, body)
.await
.map_err(|e| reject::custom(e))?,
)))
}
In this case, you get both the request body parsed to a TodoRequest
and the db_pool
passed into the handler. Once in there, simply call the database function, map it to a TodoResponse
, and use warp’s reply::json
helper to serialize it to JSON.
If an error happens, handle it with warp’s reject::custom
, which enables you to create a rejection out of our custom error type.
The only thing missing is the routing definition in main.rs
.
let todo = warp::path("todo");
let todo_routes = todo
.and(warp::post())
.and(warp::body::json())
.and(with_db(db_pool.clone()))
.and_then(handler::create_todo_handler));
let routes = health_route
.or(todo_routes)
.with(warp::cors().allow_any_origin())
.recover(error::handle_rejection);
You’ll use warp::path
at/todo/
for several routes. Then, using warp’s filters, compose your create handler.
Add the post
method, specify that you expect a JSON body, and use your with_db
filter to signal that you need database access. Finally, finish it up by telling the route which handler to use.
All of that is then passed to the routes with an or
operator.
Test it with the following command.
curl -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"name": "Some Todo"}'
{"id":1,"name":"Some Todo","checked":false}
Great! Now that you know how it works, you can do the other three handlers all at once. Again, start by adding the database helpers.
const SELECT_FIELDS: &str = "id, name, created_at, checked";
pub async fn fetch_todos(db_pool: &DBPool, search: Option<String>) -> Result<Vec<Todo>> {
let con = get_db_con(db_pool).await?;
let where_clause = match search {
Some(_) => "WHERE name like $1",
None => "",
};
let query = format!(
"SELECT {} FROM {} {} ORDER BY created_at DESC",
SELECT_FIELDS, TABLE, where_clause
);
let q = match search {
Some(v) => con.query(query.as_str(), &[&v]).await,
None => con.query(query.as_str(), &[]).await,
};
let rows = q.map_err(DBQueryError)?;
Ok(rows.iter().map(|r| row_to_todo(&r)).collect())
}
pub async fn update_todo(db_pool: &DBPool, id: i32, body: TodoUpdateRequest) -> Result<Todo> {
let con = get_db_con(db_pool).await?;
let query = format!(
"UPDATE {} SET name = $1, checked = $2 WHERE id = $3 RETURNING *",
TABLE
);
let row = con
.query_one(query.as_str(), &[&body.name, &body.checked, &id])
.await
.map_err(DBQueryError)?;
Ok(row_to_todo(&row))
}
pub async fn delete_todo(db_pool: &DBPool, id: i32) -> Result<u64> {
let con = get_db_con(db_pool).await?;
let query = format!("DELETE FROM {} WHERE id = $1", TABLE);
con.execute(query.as_str(), &[&id])
.await
.map_err(DBQueryError)
}
These are essentially the same as in the create
case, except for fetch_todos
, where you’d create a different query if there is a search term.
Let’s look at the handlers next.
#[derive(Deserialize)]
pub struct SearchQuery {
search: Option<String>,
}
pub async fn list_todos_handler(query: SearchQuery, db_pool: DBPool) -> Result<impl Reply> {
let todos = db::fetch_todos(&db_pool, query.search)
.await
.map_err(|e| reject::custom(e))?;
Ok(json::<Vec<_>>(
&todos.into_iter().map(|t| TodoResponse::of(t)).collect(),
))
}
pub async fn update_todo_handler(
id: i32,
body: TodoUpdateRequest,
db_pool: DBPool,
) -> Result<impl Reply> {
Ok(json(&TodoResponse::of(
db::update_todo(&db_pool, id, body)
.await
.map_err(|e| reject::custom(e))?,
)))
}
pub async fn delete_todo_handler(id: i32, db_pool: DBPool) -> Result<impl Reply> {
db::delete_todo(&db_pool, id)
.await
.map_err(|e| reject::custom(e))?;
Ok(StatusCode::OK)
}
Again, you’ll see some familiar things. Every handler calls the database layer, handles the error, and creates a return value for the caller if everything goes well.
The one interesting exception is list_todos_handler
, where the aforementioned query parameter is passed in, already parsed to a SearchQuery
.
This is how you deal with query parameters in warp. If you had more parameters with different types, you could simply add them to the SearchQuery
struct and they would be automatically parsed.
Let’s wire everything up and do one final test.
let todo_routes = todo
.and(warp::get())
.and(warp::query())
.and(with_db(db_pool.clone()))
.and_then(handler::list_todos_handler)
.or(todo
.and(warp::post())
.and(warp::body::json())
.and(with_db(db_pool.clone()))
.and_then(handler::create_todo_handler))
.or(todo
.and(warp::put())
.and(warp::path::param())
.and(warp::body::json())
.and(with_db(db_pool.clone()))
.and_then(handler::update_todo_handler))
.or(todo
.and(warp::delete())
.and(warp::path::param())
.and(with_db(db_pool.clone()))
.and_then(handler::delete_todo_handler));
There are some new things here. To get the query parameter to the list handler, you need to use warp::query()
. To get the id
parameter for update and delete, use warp::path::param()
. Combine the different routes with or
operators, and your todo routes are set up.
There are many different ways to create and structure routes in warp. It’s just functions being composed together, so the process is very flexible. For more examples, check out the official docs.
Now let’s test the whole thing.
First, see if the error handling actually works.
curl -v -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"wrong": "Some Todo"}'
HTTP/1.1 400 Bad Request
{"message":"Invalid Body"}
Next, add another Todo
, check it off immediately, and try to update a nonexistent Todo
.
curl -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"name": "Done Todo"}'
{"id":2,"name":"Done Todo","checked":false}
curl -X PUT 'http://localhost:8000/todo/2' -H 'Content-Type: application/json' -d '{"name": "Done Todo", "checked": true}'
{"id":2,"name":"Done Todo","checked":true}
curl -X PUT 'http://localhost:8000/todo/2000' -H 'Content-Type: application/json' -d '{"name": "Done Todo", "checked": true}'
{"message":"Could not Execute request"}
So far, so good! Now list them, filter the list, delete one of them, and list them again.
curl -X GET 'http://localhost:8000/todo/' -H 'Content-Type: application/json'
[{"id":1,"name":"Some Todo","checked":false},{"id":2,"name":"Done Todo","checked":true}]
curl -X GET 'http://localhost:8000/todo/?search=Done%20Todo' -H 'Content-Type: application/json'
[{"id":2,"name":"Done Todo","checked":true}]
curl -v -X DELETE 'http://localhost:8000/todo/2' -H 'Content-Type: application/json'
HTTP/1.1 200 OK
curl -X GET 'http://localhost:8000/todo/' -H 'Content-Type: application/json'
[{"id":1,"name":"Some Todo","checked":false}]
Perfect! Everything works as expected. You can find the full code for this example on GitHub.
Conclusion
What a ride! In this tutorial, we demonstrated how to create a fully asynchronous web application using warp and tokio-postgres. We managed to get a basic database-backed CRUD web service off the ground with error handling in less than 300 lines of Rust code. Not too shabby!
warp seems very promising; it’s lightweight, modern, fast, and I love the functional approach. The framework is still young and has yet to stand the test of time, but so far, it looks great.
Plug: LogRocket, a DVR for web apps
LogRocket is a frontend logging tool that lets you replay problems as if they happened in your own browser. Instead of guessing why errors happen, or asking users for screenshots and log dumps, LogRocket lets you replay the session to quickly understand what went wrong. It works perfectly with any app, regardless of framework, and has plugins to log additional context from Redux, Vuex, and @ngrx/store.
In addition to logging Redux actions and state, LogRocket records console logs, JavaScript errors, stacktraces, network requests/responses with headers + bodies, browser metadata, and custom logs. It also instruments the DOM to record the HTML and CSS on the page, recreating pixel-perfect videos of even the most complex single-page apps.
Try it for free.
The post Create an async CRUD web service in Rust with warp appeared first on LogRocket Blog.
Top comments (5)
The party of the best Progressive Electric Music in this web recording has made me unendingly content with secure. Visit punctuationcheck.org/learn-more-ab... get to realize our-accentuation instrument free/[/url] for extra nuances. I have never found such stunning tunes in a specific spot.
Really, I never examined Reiki in my life now. By then, at that point, all of a sudden I got pulled in to it by energies of your site. The gave up inescapable results of this system are doing getting. Visit online tool for additional subtleties.
During Christmas time, the Ormskirk was impacting a ton. It's beginning and end coterminous' a stunning show. Click sentencestructure.org/sentence-gra... to commit mistake free work. I will suspend for my work to your incompleted direction.
Genuinely, I have not involved the Ashampoo contraption for a long degree of time now I am tending to thriving with it. Visit sentencechecker.org/how-to-write-a... to make work on time. Preferably, this will my optimal program.
There are different choices for the students to begin the sensible life following finishing affected bearing. Read this article for extra nuances. Started bearing limits the astounding ways to deal with the students to begin far as shown by their perspective.