Websocket Multiuser Chat

Overview

Following up on our Websockets Echo example, let's build a basic but functional multiuser chat application.

Introduction

The main claim to fame (AFAIK) for Websockets is in how it makes it much easier to keep a persistent, bi-directional connection open between a client (like a web browser) and a server. The low overhead, low latency websocket is ideal for applications where you want to let lots of people interact in near real time. One such application could be a multiuser chat application, similar to IRC.

Although in theory one could build such a system on top of a blocking server such as Starman, you'd be limited in the number of allowed persistent connections. As you might recall from earlier articles, a blocking server can only be connected to as many clients at once as it has workers (be they forked or threaded). As a result it is common to build persistent, websocket based applications on top of a non-blocking server driven by an event loop (such as Twiggy, which runs on AnyEvent). In this example we will use that approach.

Scope

A true multiuser chat would need significantly more security hardening than this example will show. There's no reason you could not build such security on top of existing tools in the Catalyst ecosystem.

The Code

Although I usually try to break my model down more carefully, sometimes for prototyping, or for building a proof of concept, I make uber controllers. Since all Catalyst::Controllers play nice with Moose, this makes it easy to build some simple models and attributes on your controller. Once you nail down the API, you can start to refactor the code more correctly. But it makes a nice first draft!

Let's look at the first part of the controller:

    package MyApp::Controller::Root;

    use Moose;
    use MooseX::MethodAttributes;
    use AnyEvent::Handle;
    use Protocol::WebSocket::Handshake::Server;
    use JSON;

    extends 'Catalyst::Controller';

    has 'history' => (
      is => 'bare',
      traits => ['Array'],
      isa  => 'ArrayRef[HashRef]',
      default => sub { +[] },
      handles => {
        history => 'elements',
        add_to_history => 'push'});

    has 'clients' => (
      is => 'bare',
      traits => ['Array'],
      default => sub { +[] },
      handles => {
        clients => 'elements',
        add_client => 'push'});

The start pulls in everything we need from CPAN and sets this class up as a Catalyst::Controller. We then create two attributes to hold the two basic models for this chat application. The first one is the history of the chat, which is an arrayref of hashrefs, each holding a username and a message:

    [
      { username => 'john', message => 'Hello bob' },
      { username => 'bob', message => 'Hey john' },
      ...
    ]

The clients attribute is an arrayref of the connected clients. Ultimately, as you will see, this is an arrayref of handles to the websockets.

Since we are going to run this under a single process server, we can get away with creating a model like this. Controllers are typically singletons associated with the application, so when running under a single process server these attributes are always going to be available. If you ran this under a forking server, each child in the fork would have its own copy. In that case you'd need to save everything to shared storage, such as a database.
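To make the per-process point concrete, here is a minimal plain-Perl sketch (core functions only, no Catalyst or Moose) of what happens to in-memory state across a fork. The @history array is only a stand-in for the controller attribute, and the variable names are illustrative assumptions.

```perl
use strict;
use warnings;

# Stand-in for the controller's in-memory 'history' attribute.
my @history = ({ username => 'john', message => 'Hello bob' });

# A pipe lets the child report back what it sees.
pipe(my $reader, my $writer) or die "pipe failed: $!";

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child: this push only changes the child's private copy.
    close $reader;
    push @history, { username => 'bob', message => 'Hey john' };
    print {$writer} scalar(@history), "\n";
    close $writer;
    exit 0;
}

# Parent: read the child's count, then compare it with our own.
close $writer;
chomp(my $child_count = <$reader>);
close $reader;
waitpid($pid, 0);

my $parent_count = scalar @history;
print "child saw $child_count entries, parent still has $parent_count\n";
```

The child ends up with two entries while the parent still has one, which is why a forking deployment would need shared storage for the history and the client list.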

Let's look at an action:

    sub index :Path(/) {
      my ($self, $c) = @_;
      (my $url = $c->uri_for_action($self->action_for('ws')))
        ->scheme('ws');

      $c->stash(websocket_url => $url);
      $c->forward($c->view('HTML'));
    }

So this is going to be the root page. We just create a single link to what is going to be the websocket URL and pass it on to the view. Let's see the view template:

    <!DOCTYPE html>
    <html lang="en" ng-app>
      <head>
        <title>Example Chat Server</title>
        <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.1.4/angular.min.js"></script>
        <script>
          function ChatCtrl($scope) {

            $scope.status = "Disconnected";
            $scope.username = "";
            $scope.socket = "";
            $scope.checked = 1;
            $scope.history = new Array;

            $scope.join = function() {
              var socket = new WebSocket('[% websocket_url %]');

              socket.addEventListener("open", function(event) {
                $scope.$apply(function() {
                  $scope.status = "Connected";
                  $scope.checked = 0;
                  $scope.username = $scope.new_username;
                  socket.send(
                    angular.toJson({new: $scope.new_username})
                  );
                });
              });

              socket.addEventListener("close", function(event) {
                $scope.$apply(function() {
                  $scope.status = "Disconnected";
                });
              });

              socket.addEventListener("message", function(event) {
                $scope.$apply(function() {
                  $scope.history.push(JSON.parse(event.data));
                });
              });

              $scope.socket = socket;
            };

            $scope.send = function () {
              $scope.socket.send(
              angular.toJson({
                username: $scope.username,
                message: $scope.message}));

              $scope.message = '';
            }
          }
        </script>
      </head>
      <body ng-controller="ChatCtrl">
        <h1>The Chatroom Example</h1>
        <ul id="history">
          <li ng-repeat="item in history">
            <span ng-bind="item.username"></span>: 
            <span ng-bind="item.message"></span>
          </li>
        </ul>
        <div>
          Total Items: <span id="item-count" ng-bind="history.length"></span>
        </div>
        <div id="chatbox">
          Status: <span id="status" ng-bind="status"></span><br />

          <div ng-show="checked">
            <input id="join" type="button" value="Join" ng-click="join()" />&nbsp;
            <input id="new_username" ng-model="new_username" /><br />
          </div>
          <div ng-hide="checked">  
            <input id="send" type="button" value="Send" ng-click="send()" />&nbsp;
            <input id="text" ng-model="message" /><br />
          </div>
        </div>
      </body>
    </html>

We're using angular.js, which is a Javascript framework that makes it easy to do your user interface work in a more model-view-controller manner. I'm not going to go into details here since I'm still learning angular.js and my code is likely not the best possible approach. However, if you know a bit of Javascript it should be pretty clear. There's a script section which defines some variables under a controller and sets events to listen for and things to do when those events happen.

The second part is some HTML with the angular.js tags which indicate binds to the variables under the controller. One really cool part of angular.js is in how if the variables are changed, HTML that binds to them automatically updates. This saves you a lot of low level plumbing details where you need to have the event update the data, and then pass control to a function that changes the UI. Not only does angular.js save that boilerplate, you end up with controller code that is much less tightly bound to the trivial details of your HTML markup.

So what happens here is that when someone clicks on the 'Join' button, that creates a websocket connection and sends some JSON across that looks like this:

    { "new": "$name" }

Where $name is the value entered into the new_username input box. We also change the UI a bit, hide the Join button and replace it with a send message button. That way after a person joins she can send messages.
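The client therefore sends two shapes of message: a join message with a single new key, and an ordinary chat line with username and message keys. Here is a small sketch of telling them apart on the Perl side. The classify_message helper is hypothetical (it is not part of the application), and JSON::PP is used as a core stand-in for the JSON module.

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json decode_json);

# Hypothetical helper: a 'new' key means "user joined", anything else
# is treated as an ordinary chat line.
sub classify_message {
    my ($raw) = @_;
    my $decoded = decode_json($raw);
    return exists $decoded->{new}
        ? { type => 'join', username => $decoded->{new} }
        : { type     => 'chat',
            username => $decoded->{username},
            message  => $decoded->{message} };
}

# The two payloads the browser code above would produce.
my $join = classify_message(encode_json({ new => 'john' }));
my $chat = classify_message(
    encode_json({ username => 'john', message => 'Hello bob' }));

print "$join->{type}: $join->{username}\n";
print "$chat->{type}: $chat->{username} says $chat->{message}\n";
```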

Let's now look at the action that handles that websocket, bi-directional messaging:

    sub ws :Path(/ws) {
      my ($self, $ctx) = @_;
      my $hs = Protocol::WebSocket::Handshake::Server->new_from_psgi($ctx->req->env);
      my $hd = AnyEvent::Handle->new(
        fh => $ctx->req->io_fh,
        on_error => sub { warn "Error ".pop });

      $hs->parse($hd->fh);
      $hd->push_write($hs->to_string);
      $hd->on_read(sub {
        (my $frame = $hs->build_frame)->append($_[0]->rbuf);
        while (my $message = $frame->next) {
          my $decoded = decode_json $message;
          if(my $user = $decoded->{new}) {
            $decoded = {username=>$user, message=>"Joined!"};
            foreach my $item ($self->history) {
              $hd->push_write(
                $hs->build_frame(buffer=>encode_json($item))
                  ->to_bytes);
            }            
          } 

          $self->add_to_history($decoded);
          foreach my $client($self->clients) {
            $client->push_write(
              $hs->build_frame(buffer=>encode_json($decoded))
                ->to_bytes);
          }
        }
      });

      $self->add_client($hd);
    }

So there's a lot going on, let's break it down:

    sub ws :Path(/ws) {
      my ($self, $ctx) = @_;

This is just the normal Catalyst stuff, should be no surprises.

      my $hs = Protocol::WebSocket::Handshake::Server->new_from_psgi($ctx->req->env);
      my $hd = AnyEvent::Handle->new(
        fh => $ctx->req->io_fh,
        on_error => sub { warn "Error ".pop });

You might recall us creating the server end of the websocket protocol from the echo example. Again, this is a parser for the server end, not a server all by itself! In the second bit we wrap the raw IO socket in an AnyEvent::Handle and set a callback for coping with errors.

      $hs->parse($hd->fh);
      $hd->push_write($hs->to_string);

This slurps up the start of the websocket request (it actually modifies the filehandle by sucking up all the HTTP lines and the call for upgrading to a websocket connection.) We then add to the write queue the lines that are the server side of the 'handshake', which basically informs the client that yes indeed we can do websockets and finalizes the communication.
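For the curious, the heart of that server-side handshake in the current version of the protocol (RFC 6455) is a single header value derived from the key the client sent. Protocol::WebSocket does this for you, so the following is only a sketch of the calculation using core modules. The accept_key function name is an assumption made for illustration.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha1);
use MIME::Base64 qw(encode_base64);

# Fixed GUID defined by RFC 6455 for the opening handshake.
my $WS_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

# Compute the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key:
# base64 of the SHA-1 digest of the key concatenated with the GUID.
sub accept_key {
    my ($client_key) = @_;
    return encode_base64(sha1($client_key . $WS_GUID), '');
}

# Sample key taken from the example in RFC 6455.
my $accept = accept_key('dGhlIHNhbXBsZSBub25jZQ==');
print "Sec-WebSocket-Accept: $accept\n";   # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The server returns this value in its 101 Switching Protocols response, which is how the client knows the other end really speaks websockets.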

The next bit is a callback we install onto the websocket to handle read events (in other words the event that happens when the client sends some traffic over the websocket).

      $hd->on_read(sub {
        (my $frame = $hs->build_frame)->append($_[0]->rbuf);
        while (my $message = $frame->next) {
          my $decoded = decode_json $message;
          if(my $user = $decoded->{new}) {
            $decoded = {username=>$user, message=>"Joined!"};
            foreach my $item ($self->history) {
              $hd->push_write(
                $hs->build_frame(buffer=>encode_json($item))
                  ->to_bytes);
            }            
          } 

Websockets are raw, so it's up to you to declare your own data encoding and protocol. Most people just use JSON since that's easy and also pretty compatible. Data comes down from the client in websocket frames, so you need to iterate over those frames. Since each message I send fits in a single frame, I can just go ahead and decode the JSON, but there's nothing stopping you from building a more complex communication protocol. Remember, it's just a socket, and it's low level and raw!
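To give a feel for what a frame looks like on the wire, here is a deliberately tiny sketch of RFC 6455 framing for short text payloads (under 126 bytes). It is not how Protocol::WebSocket::Frame is implemented, and it ignores longer payloads, fragmentation and control frames. The function names are assumptions made for illustration.

```perl
use strict;
use warnings;

# Build an unmasked text frame (server to client), payload under 126 bytes.
sub build_text_frame {
    my ($payload) = @_;
    die "payload too long for this sketch" if length($payload) > 125;
    # 0x81 = FIN bit set + opcode 0x1 (text); second byte is the length.
    return pack('CC', 0x81, length($payload)) . $payload;
}

# Parse one masked text frame (client to server), payload under 126 bytes.
sub parse_masked_frame {
    my ($bytes) = @_;
    my ($first, $second) = unpack('CC', $bytes);
    my $len = $second & 0x7f;              # low 7 bits: payload length
    die "unmasked or long frame" unless ($second & 0x80) && $len <= 125;
    my @mask = unpack('C4', substr($bytes, 2, 4));
    my @data = unpack('C*', substr($bytes, 6, $len));
    # Each payload byte is XORed with the mask byte at position (i mod 4).
    return join '', map { chr($data[$_] ^ $mask[$_ % 4]) } 0 .. $#data;
}

# Hand-build the kind of masked frame a browser would send.
my $json  = '{"new":"john"}';
my @mask  = (0x12, 0x34, 0x56, 0x78);
my @plain = unpack('C*', $json);
my $masked = pack('CC', 0x81, 0x80 | length($json))
           . pack('C4', @mask)
           . pack('C*', map { $plain[$_] ^ $mask[$_ % 4] } 0 .. $#plain);

my $decoded = parse_masked_frame($masked);
my $frame   = build_text_frame($json);
print "client sent: $decoded\n";
print "server frame is ", length($frame), " bytes\n";
```

Frames from the browser are always masked and frames from the server are not, which is why the two directions are handled differently here.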

I'm looking for incoming messages from clients, but in the case where I see the new key exists, that's the code for 'new connected user'. So when that happens, we send a message like user => 'Joined!', and also we send to the newly joined user the full history of the chat. Obviously this isn't going to scale but limiting it to the last 100 lines would also be easy to do.
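As a sketch of that "last 100 lines" idea, here is one way to cap the history using a plain Perl array. The $MAX_HISTORY constant and the add_to_capped_history function are hypothetical. In the controller you would more likely expose a second native trait method such as shift on the history attribute.

```perl
use strict;
use warnings;

my $MAX_HISTORY = 100;   # hypothetical cap, from the "last 100 lines" idea
my @history;

# Append an entry and drop the oldest ones once the cap is exceeded.
sub add_to_capped_history {
    my ($entry) = @_;
    push @history, $entry;
    shift @history while @history > $MAX_HISTORY;
    return scalar @history;
}

# Simulate 150 chat lines arriving.
add_to_capped_history({ username => 'john', message => "line $_" })
    for 1 .. 150;

print scalar(@history), " entries kept, oldest is '$history[0]{message}'\n";
```

After 150 messages only lines 51 through 150 remain, so a newly joined user receives at most 100 frames of backlog.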

          $self->add_to_history($decoded);
          foreach my $client($self->clients) {
            $client->push_write(
              $hs->build_frame(buffer=>encode_json($decoded))
                ->to_bytes);
          }

Here we take the incoming message, add it to the history and then we iterate over all the connected clients and send the message. Again, for very large numbers of connected clients you'd probably need to do something smarter here, but this would be fine for even a few thousand connections.
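One small step toward "something smarter" is to serialize the message once and hand the same bytes to every client, rather than re-encoding inside the loop. The sketch below is only an illustration of that idea: FakeClient is a hypothetical stand-in for AnyEvent::Handle that just records writes, the broadcast function is an assumed name, and the websocket framing step is left out.

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json);

# Hypothetical stand-in for AnyEvent::Handle: it only records what is written.
package FakeClient {
    sub new {
        my ($class, $name) = @_;
        return bless { name => $name, out => [] }, $class;
    }
    sub push_write {
        my ($self, $bytes) = @_;
        push @{ $self->{out} }, $bytes;
        return;
    }
    sub written { @{ $_[0]{out} } }
}

# Serialize the message once, then give the same bytes to every client.
sub broadcast {
    my ($message, @clients) = @_;
    my $payload = encode_json($message);
    $_->push_write($payload) for @clients;
    return $payload;
}

my @clients = map { FakeClient->new($_) } qw(john bob alice);
my $payload = broadcast(
    { username => 'john', message => 'Hello bob' }, @clients);

print scalar(@clients), " clients each received ",
    length($payload), " bytes\n";
```

In the real action the payload would still need to be wrapped in a frame before being written, but that too only needs to happen once per message.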

      $self->add_client($hd);
    }

In this last bit we make sure to add the newly connected client to the big list of clients.

Remember, this websocket stuff in Catalyst is currently still new stuff, so yes there's not a lot of hand holding or sugar. Nothing stopping us from adding that, if it turns out people are actually building applications! But here you at least can give it a try!

To see it all at once, here's the full controller all together:

    package MyApp::Controller::Root;

    use Moose;
    use MooseX::MethodAttributes;
    use AnyEvent::Handle;
    use Protocol::WebSocket::Handshake::Server;
    use JSON;

    extends 'Catalyst::Controller';

    has 'history' => (
      is => 'bare',
      traits => ['Array'],
      isa  => 'ArrayRef[HashRef]',
      default => sub { +[] },
      handles => {
        history => 'elements',
        add_to_history => 'push'});

    has 'clients' => (
      is => 'bare',
      traits => ['Array'],
      default => sub { +[] },
      handles => {
        clients => 'elements',
        add_client => 'push'});

    sub index :Path(/) {
      my ($self, $c) = @_;
      (my $url = $c->uri_for_action($self->action_for('ws')))
        ->scheme('ws');

      $c->stash(websocket_url => $url);
      $c->forward($c->view('HTML'));
    }

    sub ws :Path(/ws) {
      my ($self, $ctx) = @_;
      my $hs = Protocol::WebSocket::Handshake::Server->new_from_psgi($ctx->req->env);
      my $hd = AnyEvent::Handle->new(
        fh => $ctx->req->io_fh,
        on_error => sub { warn "Error ".pop });

      $hs->parse($hd->fh);
      $hd->push_write($hs->to_string);
      $hd->on_read(sub {
        (my $frame = $hs->build_frame)->append($_[0]->rbuf);
        while (my $message = $frame->next) {
          my $decoded = decode_json $message;
          if(my $user = $decoded->{new}) {
            $decoded = {username=>$user, message=>"Joined!"};
            foreach my $item ($self->history) {
              $hd->push_write(
                $hs->build_frame(buffer=>encode_json($item))
                  ->to_bytes);
            }
          }

          $self->add_to_history($decoded);
          foreach my $client($self->clients) {
            $client->push_write(
              $hs->build_frame(buffer=>encode_json($decoded))
                ->to_bytes);
          }
        }
      });

      $self->add_client($hd);
    }

    __PACKAGE__->meta->make_immutable;
    __PACKAGE__->config( namespace => '');

Also See

Summary

Building on our knowledge of how Catalyst exposes a low level, bi-directional socket based on the PSGI specification, we explored using AnyEvent to build a multiuser chat application.

Author

John Napiorkowski email:jjnapiork@cpan.org