#/usr/bin/perl -w

# ==============================================================================
# IMIP MULTIPLEXOR - must be catted onto the end of node.pl
# ==============================================================================

use IO::Socket;

require 'node.pl';


# ------------------------------------------------------------------------------
# globals

use vars qw( $debug
	     $con
	     $msg_type
	     $my_ref
	     $your_ref
	     %fields
	     $body );

$debug = 0;


# ------------------------------------------------------------------------------
# constants

my $MAX_TIME_DIFF       = 5; # a time difference of 5 seconds between computers is unacceptable
my $MAX_LISTEN          = 20;
my $MAX_FWD_TIMEOUT     = 30;
my $DEFAULT_FWD_TIMEOUT = $MAX_FWD_TIMEOUT;
my $HELLO_TIMEOUT       = 20;
my $PING_MID_INTERVAL   = 0; # 20
my $MAX_FWD_ATTEMPTS    = 10;


# ------------------------------------------------------------------------------
# command line arguments

die "syntax: $0 listen_sock hub_host hub_port deployment [customer]*" unless @ARGV >= 4;
my ($mux_socket, $hub_host, $hub_port, $deployment, @customers) = @ARGV;
my $customers = join ' ', @customers;


# ------------------------------------------------------------------------------
# file my variables

# sequence number for my_refs
my $my_ref_seq = 1;

# active requests
my %active_outgoing_requests = ();
my %active_incoming_requests = ();
my @incoming_request_queue = ();

# hub connection
my $sock_hub;
my $con_hub;

# listening socket
unlink $mux_socket;
my $sock_listen = IO::Socket::UNIX->new( Local     => $mux_socket,
					 Listen    => $MAX_LISTEN )
    or die "cannot open listening socket: $!";
chmod 0777, $mux_socket;

# keeping track of mids
my $n_free_mids = 0;
my $focus_mid_con = -1;
my @mid_cons = ();

# ------------------------------------------------------------------------------
# 'listen' connection handler

set_readh('listen', sub {
    accept_connection();

  { my $con = $con;
    set_timeout($HELLO_TIMEOUT, sub {
	message($con, "timeout: time limit for connection has expired");
	abort_connection($con);
    }); }
});


# ------------------------------------------------------------------------------
# 'new' connection handler

set_conh('new',
    # received a 'hello_mux' message
    hello_mid => sub {
	my $time = $fields{time};

	if (abs(time - $time) > $MAX_TIME_DIFF) {
	    # the time difference is too great - reply with an error
	    send_message( 'error', $your_ref, '.',
			  code    => 1,
			  message => 'clocks are not synchronized' );
	    reset_timeout();
	    return;
	}

	# state of the mid is initially 'new'
	# - it should change state to 'free' some time
	$con->{mid_state} = 'new';

	# send a welcoming reply - TODO: could do this earlier?
	send_message('welcome', $your_ref, '.');
	
	set_connection_type('mid');

	message('connection from mid activated');

	cancel_timeout();
	
        if ($PING_MID_INTERVAL) {
	    my $con = $con;
	    set_timeout($PING_MID_INTERVAL, sub {
		message($con, "should 'ping' the mid now!");
		reset_timeout($con);
	    });
	}

	# add this connection to the ring of connections
	push @mid_cons, $con;
    },

    # unknown message
    DEFAULT => sub {
	message("unknown message type '". $msg_type ."'");
	abort_connection();
    },

    # if the connection is closed by the peer
    CLOSING => sub {
	message("new connection closed");
    },

    # if there is an error on this connection
    ERROR => sub {
	message("error on new connection");
    },
);


# ------------------------------------------------------------------------------
# 'mid' connection handler

set_conh('mid',
    # a request from a local mid for a remote object
    # we forward it to our hub
    request => sub {
	my $fwd_my_ref = $my_ref_seq ++;
	$active_outgoing_requests{$fwd_my_ref} = { from_con     => $con,
						   req_your_ref => $your_ref,
						   fields       => {%fields} };

	# forward the request
	send_message($con_hub, 'request', '.', $fwd_my_ref, %fields);

	my $timeout = $fields{timeout} || $DEFAULT_FWD_TIMEOUT;
	$timeout = $MAX_FWD_TIMEOUT if $timeout > $MAX_FWD_TIMEOUT;

      { my ($con, $your_ref) = ($con, $your_ref);
	set_timeout("fwd:$fwd_my_ref", $timeout, sub {
	    # the response did not arrive in time
	    send_message( $con, 'error', $your_ref, '.',
			  code    => 3,
			  message => 'response did not arrive before timeout' );
	    delete $active_outgoing_requests{$fwd_my_ref};
	    message($con, 'response did not arrive before timeout');
	}); }
    },

    # a response from a local mid to a remote request
    # we forward it to our hub if the ref is valid
    response => sub {
	my $request = delete $active_incoming_requests{$my_ref};
	unless ($request) {
	    # drop expired / spurious responses
	    message('received expired / spurious response');
	    return;
	}
	unless ($my_ref eq $request->{fwd_my_ref}) {
	    message("severe protocol error! response ref $my_ref does not match expected ref $request->{fwd_my_ref}");
	    return;
	}

	# forward the response
	send_message( $con_hub, 'response', $request->{req_your_ref}, '.',
		      %fields, $body );

	cancel_timeout("fwd:$my_ref");
    },

    # a mid is making a state change
    state => sub {
	mid_change_state($fields{state});
    },

    # the mid connection is closing!
    CLOSING => sub {
	# was it free? then decrement free count
	mid_change_state('closed');

	# remove the connection from the ring
	my $i = 0;
	++$i while $mid_cons[$i] ne $con;
	splice @mid_cons, $i, 1;
	-- $focus_mid_con if $focus_mid_con >= $i;
    },
);


# ------------------------------------------------------------------------------
# 'hub' connection handler

set_conh('hub',
    # a request from a remote mid for a local object
    # we forward it to a 'free' mid.  If the mid goes 'busy_remote', this means it was okay.
    # If it goes 'busy_local' (very unlikely), this means that it had changed state before
    # it got the request, and therefore we should try another mid.
    # if no mids are free, ??? we might return an error to the hub ??? or wait ??? - params?
    request => sub {
	my $fwd_my_ref = $my_ref_seq ++;

	# note - this is indexed by mid connection, not the fwd_my_ref, which is a check
	my $request = $active_incoming_requests{$fwd_my_ref} = { from_con     => $con,
								 req_your_ref => $your_ref,
								 fwd_my_ref   => $fwd_my_ref,
								 fields       => {%fields} };

	# we need to set a timeout in case no reply comes back
	my $delay = $fields{timeout} || $DEFAULT_FWD_TIMEOUT;
	$delay = $MAX_FWD_TIMEOUT if $delay > $MAX_FWD_TIMEOUT;
	
      { my ($con, $your_ref) = ($con, $your_ref);
	set_timeout("fwd:$fwd_my_ref", $delay, sub {
	    # the response did not arrive in time
	    send_message( $con, 'error', $your_ref, '.',
			  code    => 3,
			  message => 'response did not arrive before timeout' );
	    delete $active_incoming_requests{$fwd_my_ref};
	    message($con, 'response did not arrive before timeout');
	}); }

	push @incoming_request_queue, $request;

	fwd_incoming_requests();
    },

    # a response from a remote mid to a local request
    # we forward it to the mid if the ref is valid
    response => sub {
	my $request = delete $active_outgoing_requests{$my_ref};
	unless ($request) {
	    # drop expired / spurious responses
	    message('received expired / spurious response');
	    return;
	}

	# forward the response
	send_message( $request->{from_con}, 'response', $request->{req_your_ref}, '.',
		      %fields, $body);

	cancel_timeout("fwd:$my_ref");
    },

    # the hub has sent us an error
    error => sub {
	if ($fields{code} == 1) {
	    # this means the clocks are not syncronized, and we're not welcome :`(
	    message("please synchronize clock with the hub");
	    exit 1;
	} else {
	    # perhaps the hub cannot service a request?
	    message("received error from hub: $fields{message}");
	    my $request = $active_outgoing_requests{$my_ref};
	    if ($request) {
		cancel_timeout("fwd:$my_ref");
		# forward the error to the mid
		send_message( $request->{from_con}, 'error', $request->{req_your_ref}, '.',
			      %fields );
	    }
            # TODO: check if my_ref matches; forward this to mid?
	}
    },

    # acknowlegement of connection to hub
    welcome => sub {
	# nice to know...!
	# TODO - move this to a different connection state?
    },

    # the hub connection is closing - we need to find a new hub, pronto!
    CLOSING => sub {
	# TODO
	exit 0;
    },
);


# ------------------------------------------------------------------------------
# change the state of a mid connection
# TODO: check only valid transitions?

sub mid_change_state {
    my $con = @_ == 2 ? shift : $con;
    my $new_state = shift;
    my $old_state = $con->{mid_state};

    vmessage("STATE CHANGED: $con->{mid_state} -> $new_state");

    $con->{mid_state} = $new_state;

    --$n_free_mids if $old_state eq 'free';
    ++$n_free_mids if $new_state eq 'free';

    if ($old_state eq 'requested') {
	my $request = delete $con->{request};
	# we asked this mid to do something - is it going to?
	if ($new_state ne 'busy_remote') {
	    # no, it isn't! so we need to find a new mid to do it - hopefully
	    unshift @incoming_request_queue, $request;
	}
    }

    # we don't need to call this every time...q
    fwd_incoming_requests();
}


# ------------------------------------------------------------------------------
# forward incoming requests from the queue to mids

sub fwd_incoming_requests {
    while ($n_free_mids and @incoming_request_queue) {
	my $request = shift @incoming_request_queue;

	next unless exists $active_incoming_requests{$request->{fwd_my_ref}};

	# choose a mid to take the request; we cycle through them
	do { $focus_mid_con = ($focus_mid_con + 1) % @mid_cons }
	while $mid_cons[$focus_mid_con]{mid_state} ne 'free';
	
	my $mid_con = $mid_cons[$focus_mid_con];
	
	# forward the request
	send_message( $mid_con, 'request', '.', $request->{fwd_my_ref},
		      %{ $request->{fields} } );
	
	mid_change_state($mid_con, 'requested');
	
	# request has not been confirmed yet, we need to remember it on the connection
	# in case the connection is sending us a 'busy_local' before it receives the request
	$mid_con->{request} = $request;
    };
}


# ------------------------------------------------------------------------------
# begin work

install_signal_handlers();

connect_to_hub();

add_connection($sock_listen, 'listen');

main_loop();


# ------------------------------------------------------------------------------
# connect to the hub

sub connect_to_hub {
    # establish connection to hub
    $sock_hub = IO::Socket::INET->new( Proto    => 'tcp',
				       PeerAddr => $hub_host,
				       PeerPort => $hub_port )
	or die 'cannot connect to hub';

    $con_hub = add_connection($sock_hub, 'hub');

    # say hello!
    send_message( 'hello_mux', '.', $my_ref_seq++,
		  deployment => $deployment,
		  customers  => $customers,
		  time       => time );

    # the reply will be handled by the connection handler
}
