Practice of Programming

プログラム とか Linuxとかの話題

Perl で WebSocket クライアント(AnyEvent)&サーバ(psgi)

こんなんでいいかなぁ。その2。

forkじゃアレなんで、AnyEventで書いてみた(https://gist.github.com/1000223)。
追記: 以下、to_stringですが、Protocol-WebSocket-0.00906 では、Protocol::WebSocket::Handshake は to_bytes に変わっているようです。

#!/usr/bin/perl

use utf8;
use strict;
use warnings;
use AnyEvent;
use Protocol::WebSocket::Frame;
use Protocol::WebSocket::Handshake::Client;
use IO::Socket;
use constant {READ => 0, WRITE => 1};

$| = 1;

run();

sub run {
  my $cv = AE::cv;
  my $s = IO::Socket::INET->new(PeerAddr => '127.0.0.1', PeerPort => 50000, Proto => 'tcp', Blocking => 0);
  if (not $s or not $s->connected) {
    client_exit();
  }

  my $hc = Protocol::WebSocket::Handshake::Client->new(url => 'ws://127.0.0.1:50000/');
  # handshare request
  $s->syswrite($hc->to_string);

  my (@messages, $wsr, $wsw, $stdin);

  my $finish = sub { undef $stdin; undef $wsr; undef $wsw; $cv->send; client_exit($s) };

  local @SIG{qw/INT TERM ALRM/} = ($finish) x 3;

  $stdin = AE::io *STDIN, READ, sub {
    my $line = <STDIN>;
    unless ($line) {
      $finish->();
    } else {
      chomp $line;
      push @messages, Encode::decode('utf8', $line)
    }
  };

  $wsw = AE::io $s, WRITE, sub {
    if ($s->connected) {
      while (my $msg = shift @messages) {
        $s->syswrite(Protocol::WebSocket::Frame->new($msg)->to_string);
      }
    } else {
      $finish->();
    }
  };

  # parse server response
  my $frame_chunk = '';
  until ($hc->is_done) {
    $s->sysread(my $buf, 1024);
    if ($buf) {
      if ($buf =~ s{(\x00.+)$}{}) {
        $frame_chunk = $1;
      }
      print $buf;
      $hc->parse($buf);
      if ($hc->error) {
        warn $hc->error;
        $finish->();
      }
    }
  }

  my $frame = Protocol::WebSocket::Frame->new();
  $frame->append($frame_chunk) if $frame_chunk;
  $wsr = AE::io $s, READ, sub {
    $s->sysread(my $buf, 100);
    $frame->append($buf);
    while (my $msg = $frame->next) {
      print Encode::encode('utf8', $msg), "\n";
    }
  };

  $cv->recv;

  client_exit($s);
}

sub client_exit {
  my $s = shift;
  close $s if $s;
  exit;
}

Protocol::WebSocket は、Handshake時にサーバに投げるリクエスト作ったり、それをparseして、レスポンス作ったり、WebSocketのフレームを作ったりするモジュールです。単体でサーバになったり、クライアントになったりするものではありません。


ミニマムなサーバの実装は、下記にあります。
http://showmetheco.de/articles/2011/3/using-protocol-websocket-with-plack.html


あんまり説明するようなこともないのですけれど。


上のクライアントのコードでクライアントがサーバにHandshakeのリクエスト送る部分は、以下の部分。

  my $s = IO::Socket::INET->new(PeerAddr => '127.0.0.1', PeerPort => 50000, Proto => 'tcp', Blocking => 0);
  # ...
  my $hc = Protocol::WebSocket::Handshake::Client->new(url => 'ws://127.0.0.1:50000/');
  # handshare request
  $s->syswrite($hc->to_string);

上に紹介したblogのコードで、クライアントからのHandshakeのリクエストをパースしてる部分。

      my $hs = Protocol::WebSocket::Handshake::Server->new_from_psgi($env);
      $hs->parse($fh) or return [400, [], [$hs->error]];

それから、クライアントにレスポンスを返している部分。

            my $h = AnyEvent::Handle->new(fh => $fh);
            # ...
            $h->push_write($hs->to_string);

クライアント側は、サーバのレスポンスを受け取るのだけど、sysreadで読んでいるので、サーバがすぐにフレームを送ってきた場合、余計に取れちゃいます。
レスポンス最後にくっついたフレームまで取っちゃいます。なので、フレームの手前を正規表現で切り取って、残りをフレームに渡すようにしました。

  # parse server response
  my $frame_chunk = '';
  until ($hc->is_done) {
    $s->sysread(my $buf, 1024);
    if ($buf) {
      if ($buf =~ s{(\x00.+)$}{}) {
        $frame_chunk = $1;
      }
      print $buf;
      $hc->parse($buf);
      # ...
    }
  }

  my $frame = Protocol::WebSocket::Frame->new();
  $frame->append($frame_chunk) if $frame_chunk;

ついでに、さっきのblogをを参考にした、簡易なchatサーバ。

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent::Handle;
use Protocol::WebSocket::Handshake::Server;
use Protocol::WebSocket::Frame;

my %member;
my $cnt = 0;

my $psgi_app = sub {
    my $env = shift;

    my $cv = AnyEvent->condvar;

    my $fh = $env->{'psgix.io'} or return [500, [], []];
    my $hs = Protocol::WebSocket::Handshake::Server->new_from_psgi($env);
    $hs->parse($fh) or return [400, [], [$hs->error]];

    return sub {
      my $respond = shift;

      my $h = AnyEvent::Handle->new(fh => $fh);
      $member{fileno($fh)} = $h;

      $h->push_write($hs->to_string);

      my $frame = Protocol::WebSocket::Frame->new;
      $h->on_read(
                  sub {
                    $frame->append($_[0]->rbuf);
                    while (my $message = $frame->next) {
                      $message = Protocol::WebSocket::Frame->new($message)->to_string;
                      $_->push_write($message) for values %member;
                    }
                  }
                 );
      $h->on_error(
                   sub {
                     my ($hdl, $fatal, $msg) = @_;
                     delete $member{fileno($fh)};
                     $hdl->destroy;
                     $cv->send;
                     undef $h;
                   });
      $h->on_eof(
                 sub {
                   my ($hdl) = @_;
                   delete $member{fileno($fh)};
                   $hdl->destroy;
                   $cv->send;
                   undef $h;
                 });

    };
};

$psgi_app;

twiggyで起動します。

% twiggy -p 50000 app.psgi

50000 にしてるのは、最初に書いたクライアントと合わせただけなので、特に意味はないです。


htmlとjsは、id:motemenさんの以下のものを参考にすれば、結構簡単に作れるんじゃないかなとか思います。
https://gist.github.com/632374


初AnyEvent、初WebSocketみたいなもんなんで、なんか変なことしてるかもしれません。
お気づきの点があれば、突っ込んで頂ければと思います。