Streaming Nonblocking Files

Overview

Use Nonblocking IO and an Event Loop to concurrently stream a large file.

Introduction

In previous articles this advent, we explored how Catalyst can work with the PSGI specification and an event loop like AnyEvent to support high concurrency. Lets put it together with what we've learned about streaming and build a Catalyst powered high concurrency, large file stream.

Using IO::AIO with AnyEvent and PSGI to enable nonblocking streams

IO::AIO is a CPAN distribution to enable non blocking file level operations and it is compatible with many event loops including AnyEvent. Using this library you can open and read a large file in a non blocking manner. Lets take a quick look at how we might use this with PSGI to serve up a file:

    use warnings;
    use strict;
    use AnyEvent::AIO;
    use IO::AIO;

    my $read_chunk;
    $read_chunk = sub {
      my ($writer, $fh, $offset) = @_;
      my $buffer = '';
      aio_read $fh, $offset, 65536, $buffer, 0, sub {
        my $status = shift;
        die "read error[$status]: $!" unless $status >= 0;
        if($status) {
          $writer->write($buffer);
          $read_chunk->($writer, $fh, ($offset + 65536));
        } else {
          $writer->close;
          aio_close $fh, sub { };
        }
      }
    };

    my $psgi_app = sub {
      my $env = shift;
      return sub {
        my $responder = shift;
        my $writer = $responder->(
          [200, [ 'Content-Type' => 'text/plain' ]]);

        aio_open 'root/lorem.txt', IO::AIO::O_RDONLY, 0, sub {
          my ($fh) = @_ or die $!;
          $read_chunk->($writer, $fh, 0);
        };
      };
    };

Again we'll use a closure since that's the simple thing to do. The most important bits here are the aio_open and aio_read. aio_open initiates the request to open the file and associates a callback that gets run when the open file handle is ready. We then recursively call aio_read on this open filehandle and with each chuck we read, we call write on the PSGI $writer object we've spoken of in such great detail previously. When there is no more data, we close the filehandle and the $writer object. And that's really there is too it!

Porting this to Catalyst

We could do a dumb port and have everything in one controller, but since we are using Catalyst and have such great tools to compartmentalize code, lets try to do the right thing. First, let's create a model for the code the reads a file and writes it.

    package MyApp::Stream;

    use Moose;
    use AnyEvent::AIO;
    use IO::AIO;

    has 'writer' => (
      is => 'bare',
      required => 1,
      handles => ['write', 'close']);

    has 'path' => (is=>'ro', required=>1);

    sub start { 
      my $self = shift;
      aio_open $self->path, IO::AIO::O_RDONLY, 0, sub {
        my ($fh) = @_ or die "${\$self->path}: $!";
        warn scalar localtime;
        $self->read_chunk($fh, 0);
      };  
    }

    sub read_chunk {
       my ($self, $fh, $offset) = @_;
       my $buffer = '';
       aio_read $fh, $offset, 65536, $buffer, 0, sub {
         my $status = shift;
         die "read error[$status]: $!" unless $status >= 0;
         if($status) {
           eval {
             $self->write($buffer); 1;
           } || warn "Cannot write, probably a closed pipe: $@";
           $self->read_chunk($fh, ($offset + 65536));
         } else {
           $self->close;
           aio_close $fh, sub { };
         }
       }
    }

One nice thing about doing this is how easy it is to write a test case and to mock the writer object.

    use MyApp;
    use MyApp::Stream;

    {
      package MockWriter;
      use Test::Most;

      sub new { bless {lines=>[]}, shift }

      sub write {
        my ($self, $line) = @_;
        push @{$self->{lines}}, $line;
      }

      sub close {
        my ($self) = @_;
        ok 1;
        ok @{$self->{lines}};
        done_testing;
      }
    }

    my $mocker = MockWriter->new;
    my $streamer = MyApp::Stream->new(
      path => MyApp->path_to('root','lorem.txt')->stringify,
      writer => $mocker);

    $streamer->start;

I know there's a bunch of Test mock objects on CPAN but I usually find it just as easy to make my own, particularly with this tricky asynchronous stuff.

Anyway, lets adapt this model into Catalyst

    package MyApp::Model::Stream;

    use Moose;

    extends 'Catalyst::Model::Factory';

    has 'path' => (is=>'ro', required=>1);

    sub prepare_arguments {
      my ($self, $c, $args) = @_;
      return +{ 
        writer => $c->res->write_fh,
        path => $self->path->stringify };
    }

    __PACKAGE__->meta->make_immutable;

This should look pretty familiar if you've read the previous articles on nonblocking and streaming.

This of course requires a bit of configuration

    package MyApp;

    use Catalyst;

    __PACKAGE__->config(
      'Model::Stream' => {
        class => 'MyApp::Stream',
        path => __PACKAGE__->path_to('root','lorem.txt') });

    __PACKAGE__->setup;

I suppose that could all just go into the adaptor class. but Catalyst makes it so easy and fun to do this type of configuaration I usually just go ahead!

Finally the controller is nice and skinny, just the way we like it!

    package MyApp::Controller::Root;

    use base 'Catalyst::Controller';

    sub streamer :Path(/) {
      my ($self, $c) = @_;
      $c->model('Stream')->start;
    }

    1;

So that should give you something to play with, and one more nonblocking example to help you figure out how to use this technique in your own projects.

For More Information

Summary

We've taken a look at one way to stream a large file with Catalyst using a nonblocking approach.

Author

John Napiorkowski jjnapiork@cpan.org