#!/usr/bin/perl -w

# ==============================================================================
# IMIP MULTIPLEXOR
# ==============================================================================

use IO::Socket;

require 'node.pl';


# ------------------------------------------------------------------------------
# constants

# backlog passed as 'Listen' when the local listening socket is opened
my $MAX_LISTEN          = 20;
# upper bound applied to the 'timeout' field of a forwarded request
# (units are whatever set_timeout() expects - presumably seconds; TODO confirm)
my $MAX_FWD_TIMEOUT     = 30;
# used when a request carries no (or a false) 'timeout' field
my $DEFAULT_FWD_TIMEOUT = $MAX_FWD_TIMEOUT;
# time limit armed on a freshly accepted connection; it is cancelled when a
# 'hello_mid' is accepted, otherwise listen_timeout() aborts the connection
my $HELLO_TIMEOUT       = 20;
# interval of the per-mid 'ping' timer; a false value (0) means the timer
# is never installed
my $PING_MID_INTERVAL   = 0; # 20
# not referenced anywhere in the visible part of this file
my $MAX_FWD_ATTEMPTS    = 10;


# ------------------------------------------------------------------------------
# command line arguments

@ARGV >= 4 or die "syntax: $0 [-d|-D] listen_sock hub_host hub_port deployment (customer)*";

# the four mandatory arguments come first; everything after them is a customer
my ($mux_socket, $hub_host, $hub_port, $deployment, @customers) = @ARGV;

# the customers are handed to the hub as one space-separated string
my $customers = join(' ', @customers);


# ------------------------------------------------------------------------------
# file-scoped (my) variables

# sequence number for my_refs; the same counter is used for outgoing requests,
# incoming requests and the initial 'hello_mux', so refs never repeat
# TODO: get this to wrap around if it gets really big
# (check occasionally, not necessarily every time)
my $my_ref_seq = 1;

# active requests, both keyed by the fwd_my_ref we allocated:
# - outgoing: { from_con, req_your_ref, fields } for requests a local mid
#   sent towards the hub
# - incoming: { req_your_ref, fwd_my_ref, fields } for requests the hub
#   sent towards a local mid
# the queue holds incoming requests that have not been handed to a mid yet
my %active_outgoing_requests = ();
my %active_incoming_requests = ();
my @incoming_request_queue = ();

# hub connection: the raw socket and the connection returned by
# add_connection(); both are assigned in connect_to_hub()
my $sock_hub;
my $con_hub;

# listening socket
# Remove a socket file left behind by a previous run.  The result is
# deliberately not checked: the usual "failure" is that the file does not
# exist, and a real problem shows up as an error from the constructor below.
unlink $mux_socket;
my $sock_listen = IO::Socket::UNIX->new( Local  => $mux_socket,
                                         Listen => $MAX_LISTEN )
    or die "cannot open listening socket: $!";

# Open the socket to every local user.  chmod returns the number of files
# changed, so 0 means it failed; the mux can still serve its own user in that
# case, hence a warning rather than an abort.
chmod 0777, $mux_socket
    or warn "cannot chmod listening socket '$mux_socket': $!\n";

# keeping track of mids
# number of mid connections whose mid_state is 'free'
# (kept up to date by mid_change_state)
my $n_free_mids = 0;
# index into @mid_cons of the mid that was last handed a request;
# -1 means none has been chosen yet
my $focus_mid_con = -1;
# ring of all connections that have completed 'hello_mid', in arrival order
my @mid_cons = ();


# ------------------------------------------------------------------------------
# 'listen' connection handler

# The listening socket became readable: accept the pending connection and
# arm the hello timer on it, with listen_timeout() as its callback.
set_read_handler('listen', sub {
    my ($listen_con) = @_;

    # kept as a scalar assignment so accept_connection() is still called in
    # scalar context
    my $accepted = accept_connection($listen_con);

    set_timeout($accepted, $HELLO_TIMEOUT, \&listen_timeout);
});

# Callback of the hello timer: the peer did not identify itself in time,
# so log that and abort the connection.
sub listen_timeout {
    my ($expired_con) = @_;

    message($expired_con, "timeout: time limit for connection has expired");
    abort_connection($expired_con);
}

# ------------------------------------------------------------------------------
# 'new' connection handler

# Handlers for connections of type 'new', i.e. before the peer has been
# accepted as a mid.  'hello_mid' is the only message type with a handler
# of its own; every other type ends up in DEFAULT, which aborts the
# connection.
set_connection_handler('new',
    # received a 'hello_mid' message
    hello_mid => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        # time_check() decides whether the hello is acceptable; on a false
        # result nothing further is done here
        return unless time_check($con, $your_ref, $fields);

        # state of the mid is initially 'new'
        # - it should change state to 'free' some time
        $con->{mid_state} = 'new';

        # send a welcoming reply
        send_message($con, 'welcome', $your_ref, '.');

        # switch the connection over to the handlers registered for 'mid'
        set_connection_type($con, 'mid');

        message($con, 'connection from mid activated');

        # the hello has arrived, so the hello timer must not fire any more
        cancel_timeout($con);

        # optional periodic timer; skipped entirely while the interval is 0
        if ($PING_MID_INTERVAL) {
            set_timeout($con, $PING_MID_INTERVAL, sub {
                my $con = shift;
                message($con, "should 'ping' the mid now!");
                reset_timeout($con);
            });
        }

        # add this connection to the ring of connections
        push @mid_cons, $con;
    },

    # unknown message
    DEFAULT => sub {
        my ($con, $msg_type) = @_;
        message($con, "unknown message type '". $msg_type ."'");
        # the connection to abort has to be passed explicitly, exactly as
        # listen_timeout() does
        abort_connection($con);
    },

    # if the connection is closed by the peer
    CLOSING => sub {
        my $con = shift;
        message($con, "new connection closed");
    },

    # if there is an error on this connection
    ERROR => sub {
        my $con = shift;
        message($con, "error on new connection");
    },
);


# ------------------------------------------------------------------------------
# 'mid' connection handler

# Handlers for connections of type 'mid' (a local mid that has said hello).
#   request  - the mid asks for a remote object; forwarded to the hub
#   response - the mid answers a request the hub sent us; forwarded to the hub
#   state    - the mid announces a state change
#   CLOSING  - the mid connection goes away; clean up all bookkeeping
set_connection_handler('mid',
    # a request from a local mid for a remote object
    # we forward it to our hub
    request => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        # remember who asked, under a fresh ref of our own, so the hub's
        # reply can be routed back to this mid
        my $fwd_my_ref = $my_ref_seq ++;
        $active_outgoing_requests{$fwd_my_ref} = { from_con     => $con,
                                                   req_your_ref => $your_ref,
                                                   fields       => $fields };

        # the mid must be busy_local now! or perhaps it might be requesting
        # an object as part of filling a request?
        mid_change_state($con, 'busy_local') unless $con->{mid_state} eq 'busy_local';

        # forward the request
        send_message($con_hub, 'request', '.', $fwd_my_ref, %$fields);

        # TODO: this timeout is not strictly necessary - if the network doesn't break!
        # It is switched off, like the cancel_timeout("fwd:...") call in the
        # CLOSING handler below.  To switch it back on, restore the code
        # that follows:
        #
        #   my $timeout = $fields->{timeout} || $DEFAULT_FWD_TIMEOUT;
        #   $timeout = $MAX_FWD_TIMEOUT if $timeout > $MAX_FWD_TIMEOUT;
        #
        #   set_timeout("fwd:$fwd_my_ref", $timeout, sub {
        #       my ($con, $your_ref, $fwd_my_ref) = @{shift()};
        #       # the response did not arrive in time
        #       send_message( $con, 'error', $your_ref, '.',
        #                     code    => 3,
        #                     message => 'response did not arrive before timeout' );
        #       delete $active_outgoing_requests{$fwd_my_ref};
        #       message($con, 'response did not arrive before timeout');
        #   }, [$con, $your_ref, $fwd_my_ref]);
        return;
    },

    # a response from a local mid to a remote request
    # we forward it to our hub if the ref is valid
    response => sub {
        my ($con, $my_ref, $your_ref, $fields, $body) = @_;

        my $request = delete $active_incoming_requests{$my_ref};
        unless ($request) {
            # drop expired / spurious responses
            message($con, 'received expired / spurious response');
            return;
        }
        # sanity check: the record stores the ref it was filed under
        unless ($my_ref eq $request->{fwd_my_ref}) {
            message($con, "severe protocol error! response ref $my_ref does not match expected ref $request->{fwd_my_ref}");
            return;
        }

        # allow state change to be included in response
        # ('state' is removed from the fields, so it is not forwarded)
        if (my $state = delete $fields->{state}) {
            mid_change_state($con, $state);
        }

        # forward the response
        send_message( $con_hub, 'response', $request->{req_your_ref}, '.',
                      %$fields, $body );

        # the answer is on its way, so the timer armed when the hub's
        # request came in must not fire any more
        cancel_timeout("fwd:$my_ref");
    },

    # TODO: forward errors from local mids to remote mids?

    # a mid is making a state change
    state => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        mid_change_state($con, $fields->{state});
    },

    # the mid connection is closing!
    CLOSING => sub {
        my $con = shift;

        # was it free? then decrement free count
        # any active incoming request will be posted to another mid! (this works)
        mid_change_state($con, 'closed');

        # remove the connection from the ring; if it is not in the ring
        # (e.g. CLOSING delivered twice) there is nothing to remove, and
        # searching for it must not run past the end of the array
        my ($i) = grep { $mid_cons[$_] == $con } 0 .. $#mid_cons;
        if (defined $i) {
            splice @mid_cons, $i, 1;
            # keep the focus on the same mid now that later entries moved down
            -- $focus_mid_con if $focus_mid_con >= $i;
        }

        message($con, 'mid closed connection');

        # drop any active outgoing requests - we don't send a 'cancel' message or anything
        # (iterate over a snapshot of the keys, since entries are deleted on the way)
        for my $fwd_my_ref (keys %active_outgoing_requests) {
            next unless $active_outgoing_requests{$fwd_my_ref}{from_con} == $con;

            delete $active_outgoing_requests{$fwd_my_ref};
            message($con, '  dropped outgoing request');
            # also, need to kill the timeout
            # TODO: this timeout is not strictly necessary - if the network doesn't break!
#           cancel_timeout("fwd:$fwd_my_ref");
        }
    },
);


# ------------------------------------------------------------------------------
# 'hub' connection handler

# Handlers for the single connection of type 'hub'.
#   request  - the hub passes on a request for a local object; queued for a mid
#   response - the hub answers a request of a local mid; forwarded to that mid
#   error    - the hub reports an error; fatal for code 1, otherwise forwarded
#   welcome  - acknowledgement of our 'hello_mux'; ignored
#   CLOSING  - the hub connection is gone; the process exits
set_connection_handler('hub',
    # a request from a remote mid for a local object
    # we forward it to a 'free' mid.  If the mid goes 'busy_remote', this means it was okay.
    # If it goes 'busy_local' (very unlikely), this means that it had changed state before
    # it got the request, and therefore we should try another mid.
    # if no mids are free, ??? we might return an error to the hub ??? or wait ??? - params?
    request => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        my $fwd_my_ref = $my_ref_seq ++;

        # note - this is indexed by fwd_my_ref; the copy stored inside the
        # record is what the mid 'response' handler checks against
        my $request = $active_incoming_requests{$fwd_my_ref} = { #from_con     => $con, - this is always from the hub anyhow
                                                                 req_your_ref => $your_ref,
                                                                 fwd_my_ref   => $fwd_my_ref,
                                                                 fields       => $fields };

        # we need to set a timeout in case no reply comes back
        my $delay = $fields->{timeout} || $DEFAULT_FWD_TIMEOUT;
        $delay = $MAX_FWD_TIMEOUT if $delay > $MAX_FWD_TIMEOUT;

        set_timeout("fwd:$fwd_my_ref", $delay,
                    \&timeout_response, [$con, $your_ref, $fwd_my_ref]);

        # hand it to a mid right away if one is free, otherwise it waits
        push @incoming_request_queue, $request;

        fwd_incoming_requests();
    },

    # a response from a remote mid to a local request
    # we forward it to the mid if the ref is valid
    response => sub {
        my ($con, $my_ref, $your_ref, $fields, $body) = @_;

        my $request = delete $active_outgoing_requests{$my_ref};
        unless ($request) {
            # drop expired / spurious responses
            message($con, 'received expired / spurious response');
            return;
        }

        # forward the response
        send_message( $request->{from_con}, 'response', $request->{req_your_ref}, '.',
                      %$fields, $body);

        # TODO: this timeout is not strictly necessary - if the network doesn't break!
#       cancel_timeout("fwd:$my_ref");
    },

    # the hub has sent us an error
    error => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        # an error without a 'code' field is handled like any code other than 1
        my $code = defined $fields->{code} ? $fields->{code} : 0;

        if ($code == 1) {
            # this means the clocks are not synchronized, and we're not welcome :`(
            message($con, "please synchronize clock with the hub");
            exit 1;
        } else {
            # perhaps the hub cannot service a request?
            message($con, "received error from hub: $fields->{message}");
            my $request = delete $active_outgoing_requests{$my_ref};
            if ($request) {
                # no "fwd:" timeout is ever armed for an outgoing request
                # (the mid 'request' handler does not set one), so there is
                # nothing to cancel here - same as in the 'response' handler
                # TODO: this timeout is not strictly necessary - if the network doesn't break!
#               cancel_timeout("fwd:$my_ref");
                # forward the error to the mid
                send_message( $request->{from_con}, 'error', $request->{req_your_ref}, '.',
                              %$fields );
                message($con, 'hub replied to request with an error');
            } else {
                message($con, 'received error for expired / spurious request');
            }
            # TODO: check if my_ref matches; forward this to mid?
        }
    },

    # acknowledgement of connection to hub
    welcome => sub {
        # nice to know...!
        # TODO - move this to a different connection state?
    },

    # the hub connection is closing
    CLOSING => sub {
        # TODO: find a new hub, pronto!
        exit 0;
    },
);

# Timer callback for a request that came from the hub and was not answered
# in time.  Its single argument is the array ref given to set_timeout():
# [ connection to send the error to, the requester's ref, our forwarding ref ].
sub timeout_response {
    my $args = shift;
    my ($reply_con, $req_ref, $expired_ref) = @$args;

    # the response did not arrive in time: tell the requester ...
    send_message( $reply_con, 'error', $req_ref, '.',
                  code    => 3,
                  message => 'response did not arrive before timeout' );

    # ... and forget the request, so a late response is dropped as expired
    delete $active_incoming_requests{$expired_ref};

    message($reply_con, 'response did not arrive before timeout');
}


# ------------------------------------------------------------------------------
# change the state of a mid connection
# TODO: check only valid transitions?

# Record a new state for a mid connection and update everything that
# depends on it.
#   $con       - the mid connection (its state lives in $con->{mid_state})
#   $new_state - the state to switch to
# Keeps $n_free_mids in step with the number of 'free' mids, puts a request
# back on the queue when the mid it was offered to is not going to handle
# it, and finally tries to hand queued requests to free mids.
sub mid_change_state {
    my ($con, $new_state) = @_;
    my $old_state = $con->{mid_state};

    vmessage($con, "STATE CHANGED: $old_state -> $new_state");

    $con->{mid_state} = $new_state;

    --$n_free_mids if $old_state eq 'free';
    ++$n_free_mids if $new_state eq 'free';

    if ($old_state eq 'requested') {
        # we asked this mid to do something - is it going to?
        my $request = delete $con->{request};

        # no, it isn't! so we need to find a new mid to do it - hopefully.
        # A mid can be in state 'requested' without a stored request (for
        # instance if it announced that state itself); then there is nothing
        # to put back, and an undef entry must not end up in the queue.
        if ($request and $new_state ne 'busy_remote') {
            unshift @incoming_request_queue, $request;
        }
    }

    # we don't need to call this every time...
    fwd_incoming_requests();
}


# ------------------------------------------------------------------------------
# forward incoming requests from the queue to mids

# Hand queued incoming requests to free mids, one request per mid, for as
# long as there is both a free mid and a queued request.
sub fwd_incoming_requests {
    while ($n_free_mids && @incoming_request_queue) {
        my $queued = shift @incoming_request_queue;

        # a request that is no longer active (answered or timed out in the
        # meantime) is silently discarded
        exists $active_incoming_requests{ $queued->{fwd_my_ref} } or next;

        # choose a mid to take the request; we cycle through them:
        # step to the next slot of the ring, then keep stepping until the
        # slot holds a free mid
        $focus_mid_con = ($focus_mid_con + 1) % @mid_cons;
        until ($mid_cons[$focus_mid_con]{mid_state} eq 'free') {
            $focus_mid_con = ($focus_mid_con + 1) % @mid_cons;
        }

        my $target = $mid_cons[$focus_mid_con];

        # forward the request
        send_message( $target, 'request', '.', $queued->{fwd_my_ref},
                      %{ $queued->{fields} } );

        mid_change_state($target, 'requested');

        # request has not been confirmed yet, we need to remember it on the
        # connection in case the connection is sending us a 'busy_local'
        # before it receives the request
        $target->{request} = $queued;
    }
}


# ------------------------------------------------------------------------------
# begin work

# install_signal_handlers(), add_connection() and main_loop() are not defined
# in this file (presumably they come from node.pl - TODO confirm)
install_signal_handlers();

# the hub connection is made first; connect_to_hub() dies if it cannot
# connect, so nothing below runs without a hub
connect_to_hub();

# register the listening socket as a connection of type 'listen', which is
# the type the read handler near the top of this file is registered for
add_connection($sock_listen, 'listen');

main_loop();


# ------------------------------------------------------------------------------
# connect to the hub

# Open the TCP connection to the hub, register it as the 'hub' connection
# and introduce ourselves with a 'hello_mux' message.
# Sets the file-level $sock_hub and $con_hub.  Dies if the connection cannot
# be established; the message names the hub and the reason for the failure.
sub connect_to_hub {
    # establish connection to hub
    # (IO::Socket::INET leaves the reason for a failed connect in $@)
    $sock_hub = IO::Socket::INET->new( Proto    => 'tcp',
                                       PeerAddr => $hub_host,
                                       PeerPort => $hub_port )
        or die "cannot connect to hub ${hub_host}:${hub_port}: $@";

    $con_hub = add_connection($sock_hub, 'hub');

    # say hello!
    send_message( $con_hub, 'hello_mux', '.', $my_ref_seq++,
                  deployment => $deployment,
                  customers  => $customers,
                  time       => time );

    # the reply will be handled by the connection handler
}
