A flexible peer-to-peer RPC system.
This egg is a thin but flexible layer on top of tcp-server and s11n providing remote-procedure-call based communications. Special support for callbacks is provided, which makes the interface more a peer-to-peer than a client-server solution.
Registers procedure to be callable by incoming RPC requests under the name name. Names of procedures are matched using equal?.
If callback-outgoing? is true, a reverse lookup entry associating the procedure with its name is also created. The table of reverse lookup entries is used by rpc:procedure to send a callback stub to the remote machine instead of the procedure itself, should the procedure be one of the parameters of a RPC call.
Unregisters the given name-or-procedure as an externally callable object. If a procedure is passed for the name-or-procedure parameter, it can only successfully be removed if a reverse lookup entry for this procedure exists.
As mutex lock intervals are kept as short as possible, it may happen, that a currently active server thread calls the procedure once more immediately after its removal before it becomes completely unavailable to the outside world.
The standard port number to establish RPC connections to. The default value is 29296.
The procedure used to establish network connections for RPC. Defaults to tcp-connect and must be signature-compatible with it.
Determines whether an RPC connection to the given host and port is active. The table of active connections is thread-local.
Retrieves an existing RPC connection to the given host and port or creates a new one. The table of active connections is thread-local.
Closes an existing RPC connection to the given host and port. Fails if no such connection exists. The table of active connections is thread-local.
Closes all existing RPC connections of the current thread.
Inside the server threads processing RPC requests, this parameter is set to the address (as a string) of the peer on behalf of which the thread is executing.
Consider this parameter read-only unless you really know what you are doing. You may seriously mess up communications otherwise.
Creates a procedure that can be called with any number of parameters to invoke the externally callable procedure published as name on the server at host:port with the given arguments.
The arguments are scanned for procedures and if any such are found, those in the reverse lookup table are replaced with callback stubs before all the parameters are serialized over the network connection. Callback stubs are small procedures that use the value of the rpc:current-peer and rpc:default-server-port parameters in the remote server thread in order to determine where they came from and to use rpc:procedure again to connect back to their home and execute their real counterpart.
Some care has been taken to isolate code executing in an RPC server thread properly:
Uses make-tcp-server to create a server procedure. The server threads spawned by this procedure are continuously processing RPC requests from their clients until the connection is closed.
This is a simple database server and client using sqlite3. As sqlite3 is not perfectly thread-safe and as there are of course better database servers around the example is perhaps a little academic, but it illustrates the use of this extension quite nicely.
Note that in this example the rpc:default-server-port parameter in the server can be set by the client because the server does not know where the client is listening. In a similar way, rpc:current-peer may be reset if the client knows its public IP better than the server.
;;;; rpc-demo.scm ;;;; Simple database server / client (require-extension (srfi 18) extras tcp rpc sqlite3) ;;; Common things (define operation (string->symbol (car (command-line-arguments)))) (define param (cadr (command-line-arguments))) (define rpc:listener (if (eq? operation 'server) (tcp-listen (rpc:default-server-port)) (tcp-listen 0))) ;; Start server thread (define rpc:server (make-thread (cute (rpc:make-server rpc:listener) "rpc:server") 'rpc:server)) (thread-start! rpc:server) ;;; Server side (define (server) (rpc:publish-procedure! 'change-response-port (lambda (port) (rpc:default-server-port port)) #f) (let ((db (sqlite3:open param))) (set-finalizer! db sqlite3:finalize!) (rpc:publish-procedure! 'query (lambda (sql callback) (print "Executing query '" sql "' ...") (sqlite3:for-each-row callback db sql)))) (thread-join! rpc:server)) ;;; Client side (define (callback1 . columns) (let loop ((c columns) (i 0)) (unless (null? c) (printf "~a=~s " i (car c)) (loop (cdr c) (+ i 1)))) (newline)) (define callback2-results '()) (define (callback2 . columns) (set! callback2-results (cons columns callback2-results))) (define (client) ((rpc:procedure 'change-response-port "localhost") (tcp-listener-port rpc:listener)) ((rpc:procedure 'query "localhost") param callback1) (rpc:publish-procedure! 'callback2 callback2) ((rpc:procedure 'query "localhost") param callback2) (pp callback2-results)) ;;; Run it (if (eq? operation 'server) (server) (client))
Copyright (c) 2005, Thomas Chust <email@example.com>. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. Neither the name of the author nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.