Building a Log Store in Rust – Part 6

7 Apr

It’s been almost 2 months since I last posted here, and the reason is simple: asynchronous networking in Rust is HARD! There have been a few blog posts and HN comments about Tokio and how tough it is to get started; I couldn’t agree more. The concept of a future, something that will complete later and whose result you can fetch once it does, isn’t that hard to grasp. However, getting the library to do what you want, on top of Rust’s own learning curve, results in a nightmare of compiler errors that don’t seem to make sense. Thankfully the folks on the Rust User Forum are amazing!

Closures and Synthetic Types

A lot of what you’ll do when dealing with Tokio (or Hyper) is mapping one Future to another. The two go-to methods for this are map and and_then. They seem similar, but are very different. The map method passes the successful result of a Future to your closure (not the Error; for that, use map_err) and expects a plain value back. That value is then wrapped in a Future for you, using the same Error type as the original Future. The and_then method passes the same successful result to the closure, but expects you to return a new Future (or something that converts into one, such as a Result), which is then run as the next step. I want to thank softprops (aka Doug Tangren) for explaining this to me at the NYC Rust Meetup.
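To make the difference concrete without pulling in Tokio, here is a minimal sketch that uses the standard library’s Result as a stand-in, since its map, map_err, and and_then follow the same shape as the Future methods of the same names. The functions parse, doubled, and halved are made up for illustration and are not part of LogStore.

```rust
/// Parse a number, converting the parse error into a String with `map_err`.
fn parse(input: &str) -> Result<i32, String> {
    input.trim().parse::<i32>().map_err(|e| e.to_string())
}

/// `map`: the closure receives the success value and returns a plain value.
/// The wrapper and the error type are carried over unchanged.
fn doubled(input: &str) -> Result<i32, String> {
    parse(input).map(|n| n * 2)
}

/// `and_then`: the closure receives the success value and must itself return
/// a new wrapped value, so the second step is allowed to fail too.
fn halved(input: &str) -> Result<i32, String> {
    parse(input).and_then(|n| {
        if n % 2 == 0 {
            Ok(n / 2)
        } else {
            Err(format!("{} is odd", n))
        }
    })
}
```

Calling doubled("21") gives Ok(42), while halved("7") gives an Err from the second step even though parsing succeeded. With real futures the idea is the same, except that the value only becomes available once the future completes.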

The above will help a lot of people, but eventually you’ll still face something like the following error I received:

error[E0599]: no method named `unwrap` found for type `std::result::Result<(std::option::Option<&futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, futures::stream::IterOk<std::slice::Iter<'_, futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, std::io::Error>), (std::io::Error, futures::stream::IterOk<std::slice::Iter<'_, futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, std::io::Error>)>` in the current scope
--> src/main.rs:105:57
|
105 |     let clients = core.run(client_stream.into_future()).unwrap();
|                                                         ^^^^^^
|
= note: the method `unwrap` exists but the following trait bounds were not satisfied:
`(std::io::Error, futures::stream::IterOk<std::slice::Iter<'_, futures::AndThen<tokio_proto::Connect<tokio_proto::pipeline::Pipeline, rpc_server::MessageProto>, std::result::Result<tokio_proto::pipeline::ClientService<tokio_core::net::TcpStream, rpc_server::MessageProto>, std::io::Error>, [closure@src/main.rs:100:71: 100:90]>>, std::io::Error>) : std::fmt::Debug`

You’ll notice the synthetic type in the middle of that error: [closure@src/main.rs:100:71: 100:90]. These errors are very reminiscent of the errors you’d get when using the STL in C++. They’re long, confusing, and not terribly helpful. One strategy I found for dealing with these types of issues is to limit the number of closures you have, if possible.

When using Tokio and Hyper, you’ll often end up with closures nested inside closures. However, I stumbled upon this blog post about micro-services in Rust, and noticed that instead of having closures everywhere, the code elegantly called out to functions:

fn call(&self, request: Request) -> Self::Future {
    match (request.method(), request.path()) {
        (&Post, "/") => {
            let future = request
                .body()
                .concat2()
                .and_then(parse_form)
                .and_then(write_to_db)
                .then(make_post_response);
            Box::new(future)
        }
        _ => Box::new(futures::future::ok(
            Response::new().with_status(StatusCode::NotFound),
        )),
    }
}

Instead of providing an in-line closure to each and_then call, a function was simply supplied. The logic for consuming the result of one Future and producing another lived in a named function. This made the code MUCH more readable, and I found it easier to debug when the compiler complained.
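The same style can be tried out in isolation. The sketch below is an assumption-laden stand-in rather than the code from that post: it reuses the names parse_form, write_to_db, and make_post_response, but the bodies, the NewMessage type, and handle_post are invented here, and Result takes the place of a Future so that it compiles with the standard library alone.

```rust
/// Hypothetical parsed form; the field names are illustrative only.
#[derive(Debug, PartialEq)]
struct NewMessage {
    username: String,
    message: String,
}

/// Step 1: turn a body like "username=ann&message=hi" into a NewMessage.
fn parse_form(body: &str) -> Result<NewMessage, String> {
    let mut username = None;
    let mut message = None;
    for pair in body.split('&') {
        let mut parts = pair.splitn(2, '=');
        match (parts.next(), parts.next()) {
            (Some("username"), Some(v)) => username = Some(v.to_string()),
            (Some("message"), Some(v)) => message = Some(v.to_string()),
            _ => {} // ignore unknown or malformed pairs
        }
    }
    match (username, message) {
        (Some(username), Some(message)) => Ok(NewMessage { username, message }),
        _ => Err("missing username or message".to_string()),
    }
}

/// Step 2: pretend to store the entry and report how many bytes were kept.
fn write_to_db(entry: NewMessage) -> Result<usize, String> {
    if entry.message.is_empty() {
        Err("empty message".to_string())
    } else {
        Ok(entry.username.len() + entry.message.len())
    }
}

/// Step 3: like `then`, this sees both outcomes and builds a response
/// (status code and body) either way.
fn make_post_response(result: Result<usize, String>) -> (u16, String) {
    match result {
        Ok(bytes) => (200, format!("stored {} bytes", bytes)),
        Err(reason) => (400, reason),
    }
}

/// The pipeline itself names its steps instead of nesting closures.
fn handle_post(body: &str) -> (u16, String) {
    make_post_response(parse_form(body).and_then(write_to_db))
}
```

Because every step is a named function with a written-out signature, a type mismatch is reported against that function rather than against an anonymous [closure@...] type.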

Errors in Futures

While my code “works” in that it can receive logs from FileBeat and save them to the appropriate indices via RPC calls to connected clients, it cannot handle errors very well. During one trial run, I ran into a bug in the decoding code for the RPC messages. The result was a panic, the connection to the RPC client closing, and then a closed socket error every time I received another message. I could re-work the code so that the HTTP call returns an error; however, changing the code so that it tries to reconnect to the client inside of a future would be pretty difficult. I’d be very curious to see how someone much more knowledgeable than I am about these things would handle these situations.
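One possible shape for the reconnect logic, sketched synchronously so that it sidesteps the futures question entirely, is a wrapper that throws away a connection as soon as a send fails and dials again on the next call. Everything here (the Connection trait, ReconnectingClient, and the connect callback) is hypothetical and not taken from LogStore or Tokio.

```rust
use std::io;

/// Hypothetical stand-in for a connection to one RPC client.
trait Connection {
    fn send(&mut self, message: &str) -> io::Result<()>;
}

/// Wraps a connection and a way to create a fresh one.
struct ReconnectingClient<C, F>
where
    F: FnMut() -> io::Result<C>,
{
    connect: F,
    conn: Option<C>,
}

impl<C, F> ReconnectingClient<C, F>
where
    C: Connection,
    F: FnMut() -> io::Result<C>,
{
    fn new(connect: F) -> Self {
        ReconnectingClient { connect, conn: None }
    }

    /// Send one message, reporting failures as errors instead of panicking.
    fn send(&mut self, message: &str) -> io::Result<()> {
        // Reuse the live connection if there is one, otherwise dial now.
        let mut conn = match self.conn.take() {
            Some(conn) => conn,
            None => (self.connect)()?,
        };
        let result = conn.send(message);
        if result.is_ok() {
            // Only a connection that just worked is kept for the next call.
            self.conn = Some(conn);
        }
        // On error the connection is dropped here, so the next call reconnects.
        result
    }
}
```

The caller still sees the error for the message that failed, so it can answer that HTTP request with a failure, but later messages get a fresh connection rather than hitting the same closed socket.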

A Diversion

While I am by no means giving up on LogStore, I am going to take a diversion and focus on the backend file storage instead. I have a few reasons for taking this diversion:

  • As this post mentions, asynchronous networking in Rust is really hard. I lost momentum and my enjoyment of programming in Rust simply because I hadn’t a clue what to do next or how to solve the issue. Most of the usual ownership/borrow issues I run into in Rust simply require me to think through the code and re-work how I did something. The networking code, however, left me randomly changing code in the hope that it would miraculously work; it usually didn’t. Without the help of Vitaly Davidovich and Doug Tangren I would still be stuck.
  • Creating a file format is way easier to debug and test than networking code. I just feel like I’ll get further faster, and the project will keep my interest more, if I switch to something without networking. I also hope that while I take this diversion, the ergonomics of Tokio will improve.
  • Finally, I’d really like to try to implement some of the techniques used in the Anna key-value store paper. I think their approach of having multiple threads handle different keys would work really well in Rust. Rust’s ownership rules rule out data races at compile time, so it seems like a natural choice for something like Anna.
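The idea of having different threads handle different keys can be sketched with nothing but the standard library. This is only the partitioning idea, not Anna’s actual design; ShardedStore, Request, and the choice of hashing keys to pick a worker are assumptions made for the example.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Messages a worker thread understands (hypothetical).
enum Request {
    Put(String, String),
    Get(String, Sender<Option<String>>),
}

/// A store whose keys are partitioned across worker threads.
struct ShardedStore {
    workers: Vec<Sender<Request>>,
}

impl ShardedStore {
    fn new(shards: usize) -> ShardedStore {
        assert!(shards > 0, "need at least one shard");
        let workers = (0..shards)
            .map(|_| {
                let (tx, rx) = channel::<Request>();
                thread::spawn(move || {
                    // Each worker owns its map outright: no locks, no sharing.
                    let mut map: HashMap<String, String> = HashMap::new();
                    // The loop ends once every sender has been dropped.
                    for request in rx {
                        match request {
                            Request::Put(key, value) => {
                                map.insert(key, value);
                            }
                            Request::Get(key, reply) => {
                                // The caller may have gone away; ignore that.
                                let _ = reply.send(map.get(&key).cloned());
                            }
                        }
                    }
                });
                tx
            })
            .collect();
        ShardedStore { workers }
    }

    /// The same key always hashes to the same worker.
    fn shard_for(&self, key: &str) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.workers.len()
    }

    fn put(&self, key: &str, value: &str) {
        let shard = self.shard_for(key);
        self.workers[shard]
            .send(Request::Put(key.to_string(), value.to_string()))
            .expect("worker thread exited");
    }

    fn get(&self, key: &str) -> Option<String> {
        let (reply_tx, reply_rx) = channel();
        let shard = self.shard_for(key);
        self.workers[shard]
            .send(Request::Get(key.to_string(), reply_tx))
            .expect("worker thread exited");
        reply_rx.recv().expect("worker thread exited")
    }
}
```

Since a given key is only ever touched by one thread, the compiler never has to be talked into sharing a map between threads, and requests for the same key are handled in the order they were sent.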

So this will most likely be the last post about LogStore. If and when I come back to LogStore, I’ll continue posting. I do plan to continue with this blog and to keep blogging about Rust, but it’ll focus more on implementing a backend storage layer for LogStore.

As always, thanks for reading!
