13.2 Remote Procedure Calls (RPC)In this section, we use the Msg library to implement a Remote Procedure Call module, RPC.pm . The idea of RPC is to transparently invoke a subroutine in another process space and have it behave exactly as if it had been invoked in its own process. The following are the features we take for granted while calling ordinary subroutines, which the RPC module takes into account:
13.2.1 Using RPCLet us take a look at a sample use of the RPC module. The client is shown first: # Client stuff use RPC; my $conn = RPC->connect($host, $port); my $answer = $conn->rpc('ask_sheep', "Ba ba black sheep, have you any wool ?"); print "$answer\n"; The client sets up an RPC connection, given a host and port. A subroutine that is normally invoked as $answer = ask_sheep ($question); is invoked by using RPC as follows: $answer = $conn->rpc ("ask_sheep", $question); The client code knows it is making an RPC call. Making this transparent (as typical RPC systems do) is quite simple, really. Using eval , we can dynamically create a dummy client stub called ask_sheep on the caller's side and have it make the call to rpc() . The called subroutine, however, does not know whether it has been invoked locally or from a remote process (unless of course, it uses caller() to find out). The remote process (call it the RPC server) provides the required subroutines and invokes new_server and event_loop to accept incoming RPC calls; ask_sheep will get called at the right time. Simple! # Server stuff RPC->new_rpc_server($host, $port); RPC->event_loop(); sub ask_sheep { # Sample subroutine to be invoked from client print "Question: @_\n"; return "No"; }
Now, let us look at an example of using RPC between
peer processes
. Process 1 (identified by
$host1
,
$port1
) calls subroutine Process 1 looks like this: sub one { print "One called\n"; } $conn2 = RPC->new_rpc_server($host2, $port2); $conn2->rpc ("two"); Process 2 looks like this: sub two { print "Two called\n"; } $conn1 = RPC->new_rpc_server($host1, $port1); $conn1->rpc ("one"); Each process calls new_rpc_server to establish a listening port. Since the rpc call listens to incoming messages while it is still sending stuff out, neither process needs to call event_loop explicitly. A process that intends to hang around for a while should, of course, do so. 13.2.2 RPC: ImplementationThe RPC implementation is surprisingly small, thanks to the Msg and FreezeThaw modules. It inherits from Msg to provide the same connection and event loop abstractions. Let us examine the calling side first: package RPC; use Msg; use strict; use Carp; @RPC::ISA = qw(Msg); use FreezeThaw qw(freeze thaw); sub connect { my ($pkg, $host, $port) = @_; my $conn = $pkg->SUPER::connect($host,$port, \&_incoming_msg); return $conn; } connect simply calls Msg's connect , with _incoming_msg as the subroutine to notify on all incoming messages (including responses to subroutine calls and end-of-file indications). It leaves it to Msg's connect to create a connection object and bless it under RPC's auspices. Both Msg and RPC have been written so that they can be inherited by another module; the package name is not hardcoded. my $g_msg_id = 0; my $send_err = 0; sub handle_send_err { $send_err = $!; } handle_send_err overrides Msg::handle_send_err and stores any errors encountered while sending a message. This error code is checked in rpc , as shown next. The error handling in both RPC and Msg is definitely not up to snuff and needs a considerable amount of work before it can be reliably used in a production application. sub rpc { my $conn = shift; my $subname = shift; $subname = (caller() . '::' . $subname) unless $subname =~ /:/; my $gimme = wantarray ? 'a' : 's'; # Array or scalar my $msg_id = ++$g_msg_id; my $serialized_msg = freeze ('>', $msg_id, $gimme, @_); # Send and Receive $conn->send_later ($serialized_msg); do { Msg->event_loop(1); # Dispatch other messages until we # get a response } until (exists $conn->{rcvd}->{$msg_id} || $send_err); if ($send_err) { die "RPC Error: $send_err"; } # Dequeue message. my $rl_retargs = delete $conn->{rcvd}->{$msg_id}; # ref to list if (ref($rl_retargs->[0]) eq 'RPC::Error') { die ${$rl_retargs->[0]}; } wantarray ? @$rl_retargs : $rl_retargs->[0]; } rpc uses the FreezeThaw module's freeze method to bundle the following pieces of information into one big string:
The freeze method accounts for cyclic data structures and objects and returns one ASCII string, which means that we don't have to worry about the size of native integers or doubles or their memory layout (byte order). Msg->send_later() is used because it triggers nonblocking I/O where available. The message is really sent only when event_loop is called, because it determines when the socket is writable. At the same time, event_loop tracks other incoming messages and dispatches them. The count of 1 forces the event loop to return right after dispatching one round of messages, so we can retain control. When the response comes from the remote host, event_loop calls _incoming_msgs , which decodes it and hangs the return arguments on the connection object. Read on. Let us now take a look at the receiving side: sub new_server { my ($pkg, $my_host, $my_port) = @_; $pkg->SUPER::new_server($my_host, $my_port, sub {$pkg->_login(@_)}); } sub _login { \&_incoming_msg; } new_server , like connect , is a simple wrapper over its Msg counterpart. All incoming connections are unconditionally accepted by default, and messages are directed towards the subroutine _incoming_msg , shown next. Calling the _login procedure indirectly via $pkg gives you the opportunity to subclass RPC and supply your own _login procedure and refuse the connection if needed. sub _incoming_msg { my ($conn, $msg, $err) = @_; return if ($err); # Need better error handling. return unless defined($msg); my ($dir, $id, @args) = thaw ($msg); my ($result, @results); if ($dir eq '>') { # New request message my $gimme = shift @args; my $sub_name = shift @args; eval { no strict 'refs'; # Because we call the subroutine using # a symbolic reference if ($gimme eq 'a') { # Want an array back @results = &{$sub_name} (@args); } else { $result = &{$sub_name} (@args); } }; if ($@) { $msg = bless \$@, "RPC::Error"; $msg = freeze('<', $id, $msg); } elsif ($gimme eq 'a') { $msg = freeze('<', $id, @results); } else { $msg = freeze('<', $id, $result); } $conn->send_later($msg); } else { # Response to a message we had sent out earlier $conn->{rcvd}->{$id} = \@args; } } _incoming_msg is the counterpart to the rpc method. It unpacks the message sent by rpc and checks the direction (whether it is a request or a response). If it is a request, it calls the required subroutine using a symbolic reference. Notice that depending on the wantarray indication, it provides a scalar or vector result parameter. If eval reports an error, the $@ variable is stamped with an RPC::Error module tag and shipped back to the calling process (which invokes die ) . |
|