Nonblocking and Streaming - Part 4

Overview

Modern versions of Catalyst allow you to take charge of how your write operations work. This enables better support for working inside common event loops such as AnyEvent and IO::Async to support non blocking writes to stream large files.

Lets explore how PSGI implements delayed and streaming responses and how we can use streaming response with an event loop to enable use of nonblocking IO. Then lets see how Catalyst exposes that API. This part four of a four part series,

Introduction

So now we know how to use a PSGI application under an event loop and how to write non blocking applications. Let's port our five second heartbeat application to Catalyst!

Catalyst Non Blocking Streams

In the past we've discussed in detail how the Catalyst PSGI application is created, and did a bit of tracing regarding what happens when a request to that application is made. So at this point you know that Catalyst is using the delayed form of a PSGI app, and what's more, it finalizes headers early so you have access to a $writer object, suitable for streaming. And we've discussed how one can use $response->write to write to that, (and briefly spoke about chunked transfer encoding.)

So that is all fine for streaming under a blocking server like Starman. But how might we convert the non blocking timer we wrote in the previous example?

As it turns out, its quite trivial to do so. Here's a simple translation of the 5 second timer example. This is probably not the best way to write this using Catalyst, but it would work (Catalyst is after all just Perl, and a controller is just a class, sometimes we seem to forget).

    package MyApp::Controller::Root;

    use base 'Catalyst::Controller';
    use AnyEvent;

    my $watcher;
    my $timer_model = sub {
      my $writer = shift;
      my $count = 1;
      $watcher = AnyEvent->timer(
        after => 0,
        interval => 1,
        cb => sub {
          $writer->write(scalar localtime ."\n");
          if(++$count > 5) {
            $writer->close;
            undef $watcher;
          }
        });
    };

    sub time_server :Path(/) {
      my ($self, $c) = @_;
      $timer_model->($c->response->write_fh);
    }

    1;

Its actually quite similar to the plain old PSGI version. We create a closure over an AnyEvent timer watcher, and then call that with the $writer object. In Catalyst one gets access to the $writer via the res- in Catalyst::Responsewrite_fh> method. Now, you need to know, once you've requested this object, you are expected to close the writer manually. That's because 'finalize_body' checks a flag to see if you've requested the writer and if you have, it skips the rest of the body finalization. You might have noticed that in the code to finalize_body, over in Catalyst::Engine (if you didn't, this might be a good time to have a second look). So, once you've asked for manual control you have to drive stick all the way home!

This works just as it is, but its not a great Catalyst application. The controller is pretty heavy and there's no reusable bits. Lets spend a bit of time to refactor this into something that starts to resemble what a Catalyst application should be.

First lets take that closure and convert it into a class. There's a few approaches but this comes to mind, after pondering it a bit.

    package MyApp::Timer;

    use Moose;
    use AnyEvent;

    has 'writer' => (
      is => 'bare',
      required => 1,
      handles => ['write', 'close']);

    has 'counter' => (
      traits => ['Counter'],
      is => 'ro',
      required => 1,
      handles => {
        decrement_counter => 'dec'});

    has 'watcher' => (
      is => 'ro',
      lazy_build => 1,
      init_arg => undef,
      clearer => 'clear_watcher');

    sub _build_watcher {
      my $self = shift;
      return AnyEvent->timer(
        after => 0,
        interval => 1,
        cb => sub { $self->write_or_finalize } );
    }

    sub start { shift->watcher }

    sub write_or_finalize {
      my $self = shift;
      $self->decrement_counter >= 0 
        ? $self->write_timestamp
          : $self->finalize;
    }

    sub write_timestamp {
      my $self = shift;
      $self->write(scalar localtime ."\n");
    }

    sub finalize {
      my $self = shift;
      $self->close;
      $self->clear_watcher;
    }

    __PACKAGE__->meta->make_immutable;

This version has a few additional tricks in that you can set the counter via initialization. We also try to split up the functionality a bit, and we do our best to take advantage of the bits that Moose gives us. But all in all it basically does the same thing, just this time in a neater package that you can write unit tests for, and hopefully this is a step toward reusable code.

Ok, now that we've written the class, how do we expose it to your Catalyst application. I've always found Catalyst::Model::Adaptor is pretty great at this. Here's one way to adapt this class for Catalyst, via an application specific model:

    package MyApp::Model::Timer;

    use Moose;

    extends 'Catalyst::Model::Factory';

    has 'counter' => (is=>'ro', isa=>'Num', required=>1);

    sub prepare_arguments {
      my ($self, $c, $args) = @_;
      return +{ 
        writer => $c->res->write_fh,
        counter => $self->counter};
    }

    __PACKAGE__->meta->make_immutable;

We're using the factory version of Catalyst::Model::Adaptor because we want a new timer for each request. Since this adaptor does ACCEPT_CONTEXT we can go ahead and grab the $writer right out of the request. Last, we'll configure the counter via general Catalyst configuration. Let's take a look at that next:

    package MyApp;

    use Catalyst;

    __PACKAGE__->config(
      'Model::Timer' => {
        class => 'MyApp::Timer',
        counter => 5});

    __PACKAGE__->setup;

If you are using something like the ConfigLoader plugin, you could have one version of this for development (with a debug version of the timer for example) and another for production. Its just a neat trick that Catalyst lets you do by having this clean separation.

So we need a bit of configuration and that sets up the evented timer! All that is left is the controller. What does that look like now?

    package MyApp::Controller::Root;

    use base 'Catalyst::Controller';

    sub time_server :Path(/) {
      my ($self, $c) = @_;
      $c->model('Timer')->start;
    }

    1;

Ok, that's the skinny controller I wanted to see! Now we have a much better designed application. Most of the real work is in a stand alone class, and the Catalyst bits is mostly glue to tie that model to a request. Now, if we run this application:

    plackup -Ilib -MMyApp -s Twiggy -e 'MyApp->psgi_app'
    Twiggy: Accepting connections at http://0.0.0.0:5000/
    127.0.0.1 - - [28/Nov/2013:22:26:28 -0600] "GET / HTTP/1.0" 200 - "-" "-"

We can hit it with telnet, Apache AB, whatever, and it works just like the plain old PSGI version (except I hope we have a stronger structure going forward)

    $ telnet 127.0.0.1 5000
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    GET / HTTP/1.0

    HTTP/1.0 200 OK

    Thu Nov 28 21:08:32 2013
    Thu Nov 28 21:08:33 2013
    Thu Nov 28 21:08:34 2013
    Thu Nov 28 21:08:35 2013
    Thu Nov 28 21:08:36 2013
    Connection closed by foreign host.

If you do careful testing you'll probably note that the Catalyst version has a bit more overhead overall (the five second test takes maybe 5.150 verses 5.050) due to the fact a lot more code is involved. But you get a lot for that payment, in my opinion at least!

This concludes our four part article on the basics of Catalyst streaming and non blocking. Catalyst has only had these features exposed in public API for less than a year, so its up to you, the user community, to take charge of all the great stuff and do awesome work with it. Hopefully Advent 2014 will be chock full of examples along these lines!

For More Information

Summary

You've learned how to take charge the the PSGI writer that sits underneath every Catalyst response in order to take advantage of some basic non blocking code, running under AnyEvent and Twiggy.

Author

John Napiorkowski email:jjnapiork@cpan.org