#!/usr/bin/perl -w

# ==============================================================================
# IMIP HUB
# ==============================================================================

# TODO: periodic time-checks? resynchronisation?  - would need support for LONG timeouts!

use IO::Socket;

require 'node.pl';


# ------------------------------------------------------------------------------
# constants

# Backlog for the listening socket (passed as Listen when it is created).
my $MAX_LISTEN          = 20;
# Forwarded-request timeouts: a request's own 'timeout' field is capped at
# $MAX_FWD_TIMEOUT, and $DEFAULT_FWD_TIMEOUT is used when it has none.
# NOTE(review): forward_request computes this value, but the set_timeout call
# that would use it is currently commented out.
my $MAX_FWD_TIMEOUT     = 30;
my $DEFAULT_FWD_TIMEOUT = $MAX_FWD_TIMEOUT;
# Time a newly accepted connection has to send its hello_* message, and time
# another hub has to answer our 'hello_hub' with 'welcome'.  Units are those
# of set_timeout (from node.pl) - presumably seconds, TODO confirm.
my $HELLO_TIMEOUT       = 20;
my $HUB_WELCOME_TIMEOUT = 5;
# Keep-alive timer intervals for mux / hub connections; 0 disables the timer
# (the timers currently only log that a ping is due).
my $PING_MUX_INTERVAL   = 0; # 20
my $PING_HUB_INTERVAL   = 0; # 20
# UNIX-domain socket on which the cache daemon listens.
my $CACHE_SOCKET        = '/usr/mi/var/cache.sock';
# Regex fragments for the cache's text protocol: a line end (optional CR,
# then LF), and a message terminator (a line consisting of a single '.').
my $MSG_EOL_ACCEPT      = '\015?\012';
my $MSG_TERM_ACCEPT     = "$MSG_EOL_ACCEPT\\.$MSG_EOL_ACCEPT";


# ------------------------------------------------------------------------------
# command line arguments

# Arguments: [-d|-D] listen_port [other_hub_host:port ...]
# NOTE(review): -d/-D are not parsed in this file; presumably node.pl
# (required above, before this runs) consumes them from @ARGV - confirm.
die "syntax: $0 [-d|-D] listen_port (other_hub_host:port)*" unless @ARGV >= 1;
# The first argument is removed from @ARGV; the rest name other hubs to
# connect to at startup.
my $hub_port = shift;
my @other_hubs_host_port = @ARGV;

# Our own "host:port", advertised to other hubs in 'hello_hub' messages.
my $hub_host = get_sensible_host_name();
my $hub_host_port = "$hub_host:$hub_port";


# ------------------------------------------------------------------------------
# file my variables

# Sequence number used to generate my_ref values (for forwarded requests and
# for 'hello_hub' messages).
# TODO: get this to wrap around if it gets really big
# (check occasionally, not necessarily every time)
my $my_ref_seq = 1;

# Routing indexes.
# $con_by_deployment_customer{$deployment}{$customer} = the connection (a mux,
#   or another hub) through which that customer is reached.
# $cons_by_deployment{$deployment} = list of connections serving the
#   deployment, cycled round-robin for requests whose customer is 'NULL'.
#   A hub connection appears once per mux reachable through it.
my %con_by_deployment_customer = ();
my %cons_by_deployment = ();

# Requests forwarded and awaiting a response/error, keyed by the my_ref we
# sent; values are { from_con, req_your_ref, fields }.
my %active_requests = ();

# Cache: the connection to the cache daemon, and a FIFO of
# [$con, $your_ref, $fields] for queries awaiting a reply (replies are
# matched to queries in the order they were sent).
my $con_cache;
my @cache_requests = ();

# connections *made by us* to other hubs, by my_ref used in 'hello_hub' message
# This is only used in the initial connection stage... seems to be unnecessary!
#my %con_other_hub_by_ref = ();

# Monitoring counters, reported to 'mon' connections by send_stats.
my $start_time = time;
my $n_requests = 0;
my $n_responses = 0;
my $n_errors = 0;
my $n_cache_hits = 0;
my $n_obj_cached = 0;


# ------------------------------------------------------------------------------
# listening socket
# TCP socket on which muxes, monitors and other hubs connect to us.
my $sock_listen = IO::Socket::INET->new(
    Proto     => 'tcp',
    Listen    => $MAX_LISTEN,
    LocalPort => $hub_port,
    Reuse     => 1,
);
die "cannot open listening socket: $!" unless $sock_listen;


# ------------------------------------------------------------------------------
# connect to the cache
# (the cache daemon is reached over a local UNIX-domain socket)
my $sock_cache = IO::Socket::UNIX->new( Peer => $CACHE_SOCKET );
die "cannot connect to cache: $!" unless $sock_cache;


# ------------------------------------------------------------------------------
# 'listen' connection handler

# Accept each incoming client.  It must say hello (hello_mux / hello_mon /
# hello_hub, presumably dispatched to the 'new' handler) within
# $HELLO_TIMEOUT, otherwise it is dropped.
set_read_handler('listen', sub {
    my ($listen_con) = @_;

    my $client = accept_connection($listen_con);

    my $on_expiry = sub {
        my ($expired) = @_;
        message($expired, "timeout: time limit for connection has expired");
        abort_connection($expired);
    };
    set_timeout($client, $HELLO_TIMEOUT, $on_expiry);
});


# ------------------------------------------------------------------------------
# 'new' connection handler

# Handler for freshly accepted connections that have not yet identified
# themselves.  Each hello_* message turns the connection into a 'mux', 'mon'
# or 'hub' connection; anything else aborts it.
set_connection_handler('new',
    # received a 'hello_mux' message - from a mux on an mi-server in some deployment
    # fields: deployment, customers (space separated), plus whatever
    # time_check inspects (presumably the peer's clock - TODO confirm)
    hello_mux => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        return unless time_check($con, $your_ref, $fields);

        my ($deployment, $customers) = @$fields{qw(deployment customers)};
        # split ' ' (awk-style) ignores leading whitespace, so no empty
        # customer name can be indexed
        my @customers = split ' ', $customers;

        $con->{deployment} = $deployment;
        $con->{customers} = \@customers;

        # index this connection - customer_names are only unique within a deployment
        push @{ $cons_by_deployment{$deployment} }, $con;
        for my $customer (@customers) {
            $con_by_deployment_customer{$deployment}{$customer} = $con;
        }

        # send a welcoming reply
        send_message($con, 'welcome', $your_ref, '.');

        set_connection_type($con, 'mux');

        message($con, 'connection from mux activated');

        cancel_timeout($con);

        if ($PING_MUX_INTERVAL) {
            set_timeout($con, $PING_MUX_INTERVAL, sub {
                my $con = shift;
                message($con, "should 'ping' the mux now!");
                reset_timeout($con);
            });
        }

        # send message to all other hubs - with new routing info.
        # Re-join the parsed list: other hubs split on single spaces, so this
        # way they index exactly the customers we indexed.
        my $customer_list = join ' ', @customers;
        for my $hub_con (@{ get_connections('hub') }) {
            send_message( $hub_con, 'new_mux', '.', '.',
                          deployment => $deployment,
                          customers  => $customer_list );
        }
    },

    # received a 'hello_mon' message - wants to monitor stats
    hello_mon => sub {
        my ($con, $my_ref, $your_ref) = @_;

        send_message($con, 'welcome', $your_ref, '.');
        set_connection_type($con, 'mon');

        cancel_timeout($con);
    },

    # received a 'hello_hub' message - from another hub in this hub LAN
    # All of these network topology communications should be rare, so we don't care if they're a bit slow
    hello_hub => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        return unless time_check($con, $your_ref, $fields);

        # TODO: what if it already has some muxes connected to it?
        # well, we WON'T ALLOW THAT!! it would be easy to do, though

        # Send the hub the list of deployments / customers that we can reach *directly* from here
        # For each deployment, we include the _number_ of servers (muxes) through which the hub can access that
        # deployment.  This is so that load can be balanced evenly.

        my %cust_by_depl = ();
        my %n_serv_by_depl = ();
        for my $mux_con (@{ get_connections('mux') }) {
            push @{ $cust_by_depl{$mux_con->{deployment}} }, @{ $mux_con->{customers} };
            $n_serv_by_depl{$mux_con->{deployment}}++;
        }
        my $deployment_customer = join ' ; ',
            map { "$_ $n_serv_by_depl{$_}: " . join(' ', @{ $cust_by_depl{$_} }) }
            keys %cust_by_depl;
        # syntax e.g. vic 3: chathamps blahps borghs ... ; vicceo 2: asf sdaf werqw asd ... ; ...

        # send also a list of other hubs that we know about, so it can connect to them also if it wants to
        # this does NOT include the new one (its type is set to 'hub' only below), nor us!!

        print "there are ".@{ get_connections('hub') }." other hubs!\n";

        my $other_hubs = join ' ', map { $_->{host_port} } @{ get_connections('hub') };

        # send a welcoming reply, with all customers that can be reached directly from here
        send_message( $con, 'welcome', $your_ref, '.',
                      deployment_customer => $deployment_customer,
                      other_hubs          => $other_hubs );

        set_connection_type($con, 'hub');
        $con->{host_port} = $fields->{host_port};

        message($con, 'connection from other hub activated');

        cancel_timeout($con);

        # can we put all the 'ping' timeouts together?
        if ($PING_HUB_INTERVAL) {
            set_timeout($con, $PING_HUB_INTERVAL, sub {
                my $con = shift;
                message($con, "should 'ping' the other hub now!");
                reset_timeout($con);
            });
        }
    },

    # unknown message - abort *this* connection
    DEFAULT => sub {
        my ($con, $msg_type) = @_;
        message($con, "unknown message type '". $msg_type ."'");
        abort_connection($con);
    },

    # if the connection is closed by the peer
    CLOSING => sub {
        my $con = shift;

        message($con, "new connection closed");
    },

    # if there is an error on this connection
    ERROR => sub {
        my $con = shift;

        message($con, "error on new connection");
    },
);


# ------------------------------------------------------------------------------
# 'mon' (monitor) connection handler

# A 'mon' connection is a monitoring client.  It can ask for the stats once
# ('stats'), have them pushed every 'period' (default 5, in set_timeout's
# units) after an immediate first report ('auto'), or stop the pushes
# ('auto_off').
set_connection_handler('mon',
    'stats' => sub {
        my ($mon_con) = @_;
        send_stats($mon_con);
    },

    'auto' => sub {
        my ($mon_con, undef, undef, $fields) = @_;
        my $period = $fields->{period} || 5;
        my $resend = sub {
            my ($tick_con) = @_;
            send_stats($tick_con);
            reset_timeout($tick_con);
        };

        send_stats($mon_con);
        set_timeout($mon_con, $period, $resend);
    },

    'auto_off' => sub {
        my ($mon_con) = @_;
        cancel_timeout($mon_con);
    }
);


# ------------------------------------------------------------------------------
# send current stats to the monitor

sub send_stats {
    # Send the current hub statistics to a monitor connection as a 'stats'
    # message.  Rates are averaged over the whole up time.  Each ratio is
    # guarded on its own denominator (reported as 0 while it is 0), so a
    # monitor always gets a reply - even right after start-up or before the
    # first request.
    my $con = shift;
    my $up_time = time - $start_time;

    my ($n_selects, $n_msg_recv, $n_msg_sent) = get_node_stats();

    my $n_msg_tot = $n_msg_recv + $n_msg_sent;
    my $req_per_sec      = $up_time ? $n_requests / $up_time : 0;
    my $sel_per_sec      = $up_time ? $n_selects  / $up_time : 0;
    my $msg_recv_per_sec = $up_time ? $n_msg_recv / $up_time : 0;
    my $msg_sent_per_sec = $up_time ? $n_msg_sent / $up_time : 0;
    my $msg_tot_per_sec  = $up_time ? $n_msg_tot  / $up_time : 0;
    my $frac_cache_hits  = $n_requests ? $n_cache_hits / $n_requests : 0;

    send_message( $con, 'stats', '.', '.',
                  up_time             => $up_time,
                  n_requests          => $n_requests,
                  n_responses         => $n_responses,
                  n_errors            => $n_errors,
                  n_selects           => $n_selects,
                  n_msg_recv          => $n_msg_recv,
                  n_msg_sent          => $n_msg_sent,
                  n_msg_tot           => $n_msg_tot,
                  req_per_sec         => $req_per_sec,
                  sel_per_sec         => $sel_per_sec,
                  msg_recv_per_sec    => $msg_recv_per_sec,
                  msg_sent_per_sec    => $msg_sent_per_sec,
                  msg_tot_per_sec     => $msg_tot_per_sec,
                  n_obj_cached        => $n_obj_cached,
                  n_cache_hits        => $n_cache_hits,
                  frac_cache_hits     => $frac_cache_hits,
                  n_active_reqs       => scalar(keys %active_requests),
                  n_active_cache_reqs => scalar(@cache_requests) );
}


# ------------------------------------------------------------------------------
# 'mux' connection handler

set_connection_handler('mux',
    # a request from another node for a remote object
    # we forward it to an appropriate destination (another hub, or remote mux)
    request => \&handle_request,

    # a response to a request
    # we forward it to the node that sent us the request (another hub, or remote mux)
    response => \&handle_response,

    # something went wrong, in response to a forwarded request
    # (at the moment, that is the only case when an 'error' is sent)
    error => \&handle_error,

    # The mux has closed the connection for some reason - this should be *rare*!
    # We remove its entries from the routing tables.
    # TODO: reply with errors for all active connections through this mux (or re-route)? should we bother?
    CLOSING => sub {
        my $con = shift;

        message($con, 'mux closing - removing from routing tables');

        my $deployment = $con->{deployment};
        my $customers = $con->{customers};

        # Only drop customer routes that still point at this mux: another
        # connection may have re-registered the same customer since.
        if (my $routes = $con_by_deployment_customer{$deployment}) {
            for my $customer (@$customers) {
                delete $routes->{$customer}
                    if $routes->{$customer} && $routes->{$customer} == $con;
            }
            delete $con_by_deployment_customer{$deployment} unless %$routes;
        }

        # remove this connection from the round-robin list for its
        # deployment; drop the list entirely once it is empty (an empty
        # array ref would still look like a usable route)
        if (my $cons = $cons_by_deployment{$deployment}) {
            @$cons = grep { $_ != $con } @$cons;
            delete $cons_by_deployment{$deployment} unless @$cons;
        }

        # send a message to all other hubs, informing them of the closed mux
        for my $hub_con (@{ get_connections('hub') }) {
            send_message( $hub_con, 'mux_closed', '.', '.',
                          deployment => $deployment,
                          customers  => (join ' ', @$customers) );
        }
    }
);


# ------------------------------------------------------------------------------
# 'hub' connection handler

# Handler for connections to / from other hubs.  Routes learned from another
# hub are recorded both in the global routing indexes and in
# $con->{cust_by_depl}, so they can be removed again when muxes behind that
# hub close or when the hub itself goes away.
set_connection_handler('hub',
    # The other hub accepted our 'hello_hub'.
    # fields: deployment_customer - e.g. "vic 3: cust1 cust2 ; vicceo 2: cust3"
    #                               (deployment, number of muxes, customers)
    #         other_hubs          - space separated host:port of its other hubs
    welcome => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        message($con, 'connection to other hub activated');

        # register all the deployments / customers for this hub
        for my $depl_custs (split / ; /, $fields->{deployment_customer}) {
            my ($depl, $n_serv, $custs) = $depl_custs =~ /^(\S+) (\d+): (.*)$/;
            unless (defined $depl) {
                message($con, "ignoring malformed routing entry '$depl_custs'");
                next;
            }

            my @custs = split / /, $custs;

            push @{ $con->{cust_by_depl}{$depl} }, @custs;
            for my $cust (@custs) {
                $con_by_deployment_customer{$depl}{$cust} = $con;
            }
            # routing by deployment via another hub: one entry per mux behind
            # that hub, so the round-robin load is proportional to its servers
            push @{ $cons_by_deployment{$depl} }, ($con) x $n_serv;
            # TODO: distribute these more randomly somehow?
            # shuffle after each insert? Floyd-Steinberg error diffusion?!
        }

        # are there any hubs we don't know of in the list of other hubs returned?
        # 1. hash what hubs we DO know about!
        my %connected_hubs;
        $connected_hubs{ $_->{host_port} } = 1 for @{ get_connections('hub') };

        # 2. check the new ones
        for my $other_hub_host_port (split / /, $fields->{other_hubs}) {
            next if $connected_hubs{$other_hub_host_port};  # already connected to it
            next if $other_hub_host_port eq $hub_host_port;  # never connect to ourselves

            # let's connect to this one too!
            connect_to_other_hub($other_hub_host_port);
        }

        # cancel the 'welcome timeout'
        cancel_timeout($con);
    },

    # a mux has connected to the other hub; fields: deployment, customers
    new_mux => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;

        my $depl = $fields->{deployment};
        my @custs = split / /, $fields->{customers};

        push @{ $con->{cust_by_depl}{$depl} }, @custs;
        # routing by deployment via another hub - %cons_by_deployment
        push @{ $cons_by_deployment{$depl} }, $con;
        for my $cust (@custs) {
            $con_by_deployment_customer{$depl}{$cust} = $con;
        }
    },

    # a request from another node for a remote object
    # we forward it to an appropriate destination (another hub, or remote mux)
    request => \&handle_request,

    # a response to a request
    # we forward it to the node that sent us the request (another hub, or remote mux)
    response => \&handle_response,

    # something went wrong, in response to a forwarded request
    # (at the moment, that is the only case when an 'error' is sent)
    error => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;
        if ($fields->{code} == 1) {
            # this means the clocks are not synchronized, and we're not welcome :`(
            message($con, "please synchronize clock with the other hub/s");
            exit 1;
        } else {
            handle_error(@_);
        }
    },

    # a mux attached to another hub has closed; fields: deployment, customers
    mux_closed => sub {
        my ($con, $my_ref, $your_ref, $fields) = @_;
        message($con, 'mux on another hub closing - removing from routing tables');

        my $depl = $fields->{deployment};
        my @custs = split / /, $fields->{customers};

        my %dead_custs;
        @dead_custs{@custs} = ();

        # forget these customers in this hub's own bookkeeping (the
        # deployment key itself is kept: CLOSING uses it to find the
        # deployment lists this hub may still appear in)
        if (my $cust_by_depl = $con->{cust_by_depl}{$depl}) {
            @$cust_by_depl = grep { !exists $dead_custs{$_} } @$cust_by_depl;
        }

        # only drop customer routes that still point at this hub
        if (my $routes = $con_by_deployment_customer{$depl}) {
            for my $cust (@custs) {
                delete $routes->{$cust}
                    if $routes->{$cust} && $routes->{$cust} == $con;
            }
        }

        # routing by deployment via another hub - %cons_by_deployment
        # we want to remove _one_ of the instances of this connection only.
        if (my $cons = $cons_by_deployment{$depl}) {
            for my $i (0 .. $#$cons) {
                if ($cons->[$i] == $con) {
                    splice @$cons, $i, 1;
                    last;
                }
            }
            # an empty array ref would still look like a usable route
            delete $cons_by_deployment{$depl} unless @$cons;
        }
    },

    # another hub is closing - remove every route that goes through it
    # TODO: reply with errors for all active connections through this hub (or re-route)? should we bother?
    CLOSING => sub {
        my $con = shift;
        message($con, 'hub closing - removing from routing tables');

        my $cust_by_depl = $con->{cust_by_depl} || {};
        for my $deployment (keys %$cust_by_depl) {
            # per-customer routes - only those still pointing at this hub
            if (my $routes = $con_by_deployment_customer{$deployment}) {
                for my $customer (@{ $cust_by_depl->{$deployment} }) {
                    delete $routes->{$customer}
                        if $routes->{$customer} && $routes->{$customer} == $con;
                }
            }

            # per-deployment routes - every instance of this connection
            if (my $cons = $cons_by_deployment{$deployment}) {
                @$cons = grep { $_ != $con } @$cons;
                # this was the only connection for its deployment
                delete $cons_by_deployment{$deployment} unless @$cons;
            }
        }
    },
);


# ------------------------------------------------------------------------------
# a request from another node for a remote object
# we forward it to an appropriate destination (another hub, or remote mux)

sub handle_request {
    # A request from another node for a remote object.  Before forwarding it
    # (to another hub, or a remote mux) we query the cache.  The cache's reply
    # is handled by the 'cache' read handler, which pairs replies with
    # @cache_requests in the order queries were sent and then either answers
    # from the cache or calls forward_request.
    my ($con, $my_ref, $your_ref, $fields) = @_;

    ++$n_requests;

    # TODO: timeouts for cache queries!!

    # The cache protocol is line based: a CR or LF inside a field would split
    # the query into several messages and misalign the replies, so fold them.
    my @query = map { defined $_ ? $_ : '' } @$fields{qw(type identifier requester)};
    s/[\015\012]+/ /g for @query;
    my $message = 'query ' . join(' ', @query) . "\n.\n";

    send_data($con_cache, \$message);
    vmessage($con, "CACHE QUERY:\n$message");

    # remember who asked; this array ref is exactly what forward_request takes
    push @cache_requests, [$con, $your_ref, $fields];
}


# ------------------------------------------------------------------------------
# a response to a request
# we forward it to the node that sent us the request (another hub, or remote mux)

sub handle_response {
    # A response to a request we forwarded: optionally store it in the cache
    # (when it carries 'cache_until'), then relay it to the node that sent us
    # the original request.  $body is a reference to the response body.
    my ($con, $my_ref, $your_ref, $fields, $body) = @_;

    ++$n_responses;

    my $request = delete $active_requests{$my_ref};
    unless ($request) {
        # drop expired / spurious responses
        message($con, 'received response for expired / spurious request');
        return;
    }

    # cache the response - do not need to check time here - we can assume it's in the future.
    # cache messages will not cope with binary data
    # there is no response from the cache to a store (we hope)
    if ($fields->{cache_until}) {
        my $text = ($body && defined $$body) ? $$body : '';
        if ($text =~ /^\.\015?$/m) {
            # A line holding only '.' would end the store message early and
            # the rest of the body would reach the cache as new commands; if
            # the cache answered those, its replies would no longer line up
            # with @cache_requests.  Such bodies are simply not cached.
            vmessage($con, "NOT CACHED (body contains a terminator line): $fields->{cache_key}");
        } else {
            my $message = "store $fields->{cache_key} $fields->{cache_until}\n$text\n.\n";
            send_data($con_cache, \$message);

            vmessage($con, "CACHED: $fields->{cache_key} $fields->{cache_until}\n$text");
            ++$n_obj_cached;
        }
    }

    # forward the response
    send_message( $request->{from_con}, 'response', $request->{req_your_ref}, '.',
                  %$fields, $body);

    # TODO: this timeout is not strictly necessary - if the network doesn't break!
#    cancel_timeout("fwd:$my_ref");
}


# ------------------------------------------------------------------------------
# something went wrong in response to a forwarded request
# (at the moment, that is the only case when an 'error' is sent)

sub handle_error {
    # Something went wrong in response to a request we forwarded (at the
    # moment that is the only case in which an 'error' is sent to us).
    # Relay the error fields to whoever sent us the original request.
    my ($con, $my_ref, undef, $fields) = @_;

    $n_errors++;

    my $request = delete $active_requests{$my_ref};
    if (!$request) {
        # expired, or never ours - nothing to relay
        message($con, 'received error for expired / spurious request');
        return;
    }

    # TODO: try another server?
    # TODO: the forwarding timeout ("fwd:$my_ref") would be cancelled here if it is ever re-enabled

    my ($origin_con, $origin_ref) = @$request{qw(from_con req_your_ref)};
    send_message($origin_con, 'error', $origin_ref, '.', %$fields);
}


# ------------------------------------------------------------------------------
# forward a request to the appropriate server

sub forward_request {
    # Forward a request (after a cache miss) to the connection that can reach
    # its destination.  Takes one array ref [$con, $your_ref, $fields] as
    # queued by handle_request.  $fields->{identifier} is "deployment customer";
    # customer 'NULL' means any server of the deployment will do.  Unknown
    # destinations get an 'error' (code 2) back on $con.
    my ($con, $your_ref, $fields) = @{shift()};

    my $identifier = defined $fields->{identifier} ? $fields->{identifier} : '';
    my ($deployment, $customer) = split ' ', $identifier;
    $deployment = '' unless defined $deployment;
    $customer   = '' unless defined $customer;

    my $server_con;

    if ($customer eq 'NULL') {
        # route simply by deployment when it doesn't matter which customer/box it goes to.
        # For the moment, we simply distribute the load (of NULL customer
        # requests) evenly amongst the servers: take the first, rotate it to the end.
        my $server_cons = $cons_by_deployment{$deployment};
        if ($server_cons && @$server_cons) {
            $server_con = shift @$server_cons;
            push @$server_cons, $server_con;
        }
    } elsif (length $customer) {
        # look up without autovivifying entries for unknown deployments
        my $routes = $con_by_deployment_customer{$deployment};
        $server_con = $routes->{$customer} if $routes;
    }

    unless ($server_con) {
        # the combination of deployment and customer is unknown - reply with an error
        send_message($con, 'error', $your_ref, '.',
                     code    => 2,
                     message => "cannot reach customer $deployment:$customer");
        return;
    }

    my $fwd_my_ref = $my_ref_seq++;
    $active_requests{$fwd_my_ref} = { from_con     => $con,
                                      req_your_ref => $your_ref,
                                      fields       => $fields };

    # forward the request
    send_message($server_con, 'request', '.', $fwd_my_ref, %$fields);

    # NOTE: $timeout is only used by the disabled timeout below.
    my $timeout = $fields->{timeout} || $DEFAULT_FWD_TIMEOUT;
    $timeout = $MAX_FWD_TIMEOUT if $timeout > $MAX_FWD_TIMEOUT;

    # TODO: this timeout is not strictly necessary - if the network doesn't break!
    # ALSO - nested closures break Perl!!
#    set_timeout("fwd:$fwd_my_ref", $timeout, sub {
#        my ($con, $your_ref, $fwd_my_ref) = @{shift()};
#        # the response did not arrive in time
#        send_message( $con, 'error', $your_ref, '.',
#                      code    => 3,
#                      message => 'response did not arrive before timeout' );
#        delete $active_requests{$fwd_my_ref};
#        message($con, 'response did not arrive before timeout');
#    }, [$con, $your_ref, $fwd_my_ref]);
}


# ------------------------------------------------------------------------------
# 'cache' connection handler
# this uses a simple \n.\n terminated textual message

# Read replies from the cache.  Each reply is text terminated by a line
# holding only '.'; its first line is a status ('000 OK' on success) and the
# rest, if any, is the cached data.  Replies arrive in the order the queries
# were sent, so each one is paired with the oldest entry in @cache_requests.
set_read_handler('cache', sub {
    my $con = shift;
    return unless read_connection($con);

    # process every complete reply currently in the buffer
    while ($con->{read_buf} =~ s/^(.*?)$MSG_TERM_ACCEPT//so) {
        my $reply = $1;
        vmessage($con, "FROM CACHE:\n$reply");

        my ($status, $data) = split /$MSG_EOL_ACCEPT/, $reply, 2;
        $status = '' unless defined $status;

        my $cache_request_param = shift @cache_requests;
        unless ($cache_request_param) {
            message($con, "cache reply without a pending request: $status");
            next;
        }

        if ($status ne '000 OK') {
            message($con, "cache error: $status");
            # TODO: what? should never happen - treat it as a miss, and keep
            # going so later replies already in the buffer are not stranded
            handle_cache_response($cache_request_param);
            next;
        }
#TODO: cancel_timeout($con);
        handle_cache_response($cache_request_param,
                              (defined $data && length $data) ? \$data : undef);
    }
});

# TODO: close handler for the cache ; cache independence - or at least die gracefully


# ------------------------------------------------------------------------------
# handle a hit or miss response from the cache

sub handle_cache_response {
    # Act on one cache reply for a request queued by handle_request.
    # $param is that request's [$con, $your_ref, $fields] array ref; $data is
    # a reference to the cached body on a hit, and undef (or absent) on a miss.
    my ($param, $data) = @_;
    my ($con, $your_ref) = @$param;

    unless ($data) {
        # the cache was not helpful - forward the request to a server
        vmessage($con, "CACHE MISS");
        return forward_request($param);
    }

    # the cache gave us the data - answer straight away
    vmessage($con, "CACHE HIT");
    $n_cache_hits++;
    send_message($con, 'response', $your_ref, '.',
                 #TODO: cache_hit => 1, ?
                 $data);
}


# ------------------------------------------------------------------------------
# work out the (a?) hostname for this machine
# Is there a less dodgy way to do this?

sub get_sensible_host_name {
    # Return the first host name in the host database (gethostent) that
    # contains a '.', i.e. a qualified name rather than 'localhost' and
    # friends; fall back to 'localhost' when there is none.
    #
    # The original used `last` inside `do {} until`, which is not a loop
    # block, so exhausting the database died with "Can't last outside a loop
    # block".  The database is now also rewound and closed so repeated calls
    # give the same answer.
    my $hub_host = 'localhost';    # last resort

    sethostent(0);                 # start from the top of the host database
    while (my @entry = gethostent()) {
        my $name = $entry[0];
        next unless defined $name and $name =~ /\./;    # not 'localhost'!
        $hub_host = $name;
        last;
    }
    endhostent();

    return $hub_host;
}


# ------------------------------------------------------------------------------
# begin work

# Register the signal handlers and the two sockets opened above with the node
# framework.  The cache connection is kept in $con_cache because the request
# and response handlers send to it.
install_signal_handlers();

add_connection($sock_listen, 'listen');

$con_cache = add_connection($sock_cache, 'cache');


# connect to other hub/s that were specified on the command line
connect_to_other_hub($_) for @other_hubs_host_port;


# ------------------------------------------------------------------------------
# connect to another hub

sub connect_to_other_hub {
    # Open a TCP connection to another hub given as "host:port", register it
    # as a 'hub' connection and introduce ourselves with 'hello_hub'.  If no
    # 'welcome' arrives within $HUB_WELCOME_TIMEOUT the whole process dies.
    # Invalid addresses and failed connects are logged and skipped.
    my ($peer) = @_;
    my ($host, $port) = split ':', $peer;

    # 'localhost' and our real host name would look like two different hubs
    warn "trying to connect to $peer - localhost - is dangerous!"
        if $host eq 'localhost';

    if (!$port) {
        message(undef, "Invalid host:port '$peer'");
        return;
    }

    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
    );
    unless ($sock) {
        message(undef, "cannot open socket to hub on $host:$port: $!");
        return;
    }

    my $hub_con = add_connection($sock, 'hub');
    $hub_con->{host_port} = $peer;

    # the ref is required by the message format but not otherwise used here
    my $my_ref = $my_ref_seq++;
    send_message($hub_con, 'hello_hub', '.', $my_ref,
                 time      => time,
                 host_port => $hub_host_port);

    # one welcome timeout per hub we connect to
    set_timeout($hub_con, $HUB_WELCOME_TIMEOUT, sub {
        my ($late_con) = @_;
        message($late_con, "timeout: time limit for welcome from other hub has expired");
        die 'cannot connect to other hubs as specified';
    });
}

# Hand control to the event loop.  main_loop is not defined in this file -
# presumably it comes from node.pl and runs until the process exits.
main_loop();
