package FastDownloader;
# $Id$
# vim: set ts=4 :
use strict; use warnings FATAL => 'all'; use feature qw(say state);

# This module wraps a general-purpose API around the parallel 
# downloader used by Jim Avera's version of apt-fast.
#
# The specified files are downloaded with two levels of parallelism:
#
#   1.  The "axel" program is used to download each file in pieces using 
#       several concurrent network connections (yes, it would be nice if
#       everything was implemented here in Perl, but axel works well
#       and is widely available).
#
#   2.  Multiple (default: 4) files are downloaded concurrently on separate
#       threads, trying to access different servers concurrently.  
#       This is intended to increase throughput if one server is bogged down
#       but not another.  It also overlaps post-processing of each file
#       (via callbacks).
#
# apt-fast uses this library to pre-download files needed by apt-get.  
# This whole business was inspired by Matt Parnell's 
# apt-fast.sh shell script (http://www.mattparnell.com).

use Exporter 'import';
our @EXPORT    = qw(FastDownload);

# Defaults, may be changed by application (but better to use OO api)
# Number of worker threads new() creates when 'num_threads' is not given.
our $Num_Threads = 4;  
# Fallback destination directory used by new() for filespecs without one.
our $Destdir     = ".";
# NOTE(review): presumably the default for the 'quiet' option -- confirm
# that new() actually consults it.
our $Quiet       = 0; 
# Value new() uses for 'debug' when the caller does not supply one.
our $Debug       = 0;

use threads; 
use threads::shared;
use Thread::Queue;
use Carp;
use File::Temp qw(mktemp);
use File::Spec::Functions qw(rel2abs);
use File::Basename qw(basename dirname);

# External programs that must be present; new() looks for each one under
# /usr/bin and offers to install any that are missing.
my @needed_progs = ("axel");
# Shared lock held by new() while it checks for / installs those programs.
my $global_lock : shared;
sub worker($); # forward declaration; the thread body is defined further down

#-----------------------------------------------------------
# USAGE:
#  $obj = FastDownloader->new(filespec,filespec,...)
#  $obj = FastDownloader->new(
#               filespecs    => [<filespec>, ...],
#               destdir      => "directory",  # must already exist
#               num_threads  => number,
#               dryrun       => boolean,
#               quiet        => boolean,
#               silent       => boolean,      # no prompts, no command echo
#               debug        => boolean,
#             );
#
#  $obj->abort();  # terminate prematurely now
#  $obj->wait();   # wait for normal termination
#  $hash = $obj->get_stats();
#
# Each <filespec> is a hash ref to
#   { url        => "url", 
#     local_file => "filename or path",
#     destdir    => "destination_directory",
#     callback   => "subname",  # name of sub, not a subref
#     ...
#   }
#
# {url} is required, the others optional.  Only ftp and http are supported.
#
# {local_file} indicates where to store the downloaded file.  It may be
#    an arbitrary absolute path, otherwise is relative to {destdir}.
#    If omitted, the basename of the server path is used.
#
# {destdir} defaults to the current directory.
#
# {callback} is called after the file is downloaded to a temporary
#   location, before it has been renamed to its final path.
#   It is called with arguments ($filespec, $temp_path, $final_path).
#   If the sub returns false the file will be discarded.
#   $temp_path is the file's current location, which should not be changed.
#
# Other user-defined members may be present, e.g. for use by the callback.
# To avoid name clashes, user keys should include an upper-case letter.
#   
# <filespec> may also be just a string containing the url.  This is
# equivalent to { url => string }.
#-----------------------------------------------------------
# new -- validate the options, make sure the external downloader exists,
# sort the filespecs into one queue per server and start the worker threads.
#
# Accepted argument forms:
#   new(filespec, filespec, ...)   a plain list of filespecs
#   new(option => value, ...)      named options
#   new({ option => value, ... })  the same options in a single hash ref
# An argument list is taken to be a list of filespecs when its first element
# is a url string ("scheme://...") or a hash ref containing a {url} member;
# neither can be a valid option list, so the forms cannot be confused.
#
# Returns the blessed, thread-shared object.  Croaks on invalid options,
# a missing destination directory or a malformed filespec/url; dies if a
# required program is missing and is not (or cannot be) installed.
#
# NOTE: filespecs passed by reference are completed in place (destdir,
# local_file and a package-qualified callback name are filled in).
sub new {
    my $class      = shift;
    my $caller_pkg = caller;  # package used to qualify bare callback names

    my $first = $_[0];
    my $is_filespec_list = @_ && defined($first) && (
        ref($first) eq 'HASH' ? exists $first->{url}
                              : (!ref($first) && $first =~ m{^\w+://})
    );
    my $self = $is_filespec_list                  ? { filespecs => [ @_ ] }
             : (@_ == 1 && ref($first) eq 'HASH') ? { %$first }
             :                                      { @_ };

    foreach (keys %$self) {
      croak "Unknown option '$_'\n" 
        unless /^(filespecs|destdir|num_threads|dryrun|quiet|silent|debug)$/;
    }

    # Package-level defaults apply to whatever the caller left unspecified.
    $self->{num_threads} //= $Num_Threads;
    $self->{debug}       //= $Debug;
    $self->{quiet}       //= $Quiet;
    $self->{destdir}     //= $Destdir;

    croak "Invalid number of threads: $self->{num_threads}\n" 
      unless $self->{num_threads} =~ /^[1-9]\d*$/;

    croak "'",$self->{destdir},"' is not an existing directory\n"
      unless -d $self->{destdir};

    # Install external program(s) if necessary
    foreach my $prog (@needed_progs) { # e.g. "axel"
        lock $global_lock;
        next if -x "/usr/bin/$prog";

        # Without a terminal nobody can answer the question below.
        die "$prog is not installed\n" if ! -t STDERR;
        while (1) {
            my $answer;
            if ($self->{silent}) {
                $answer = "yes";
            } else {
                print STDERR "$prog is not installed.  Do you want to install it? (y/n)\n";
                $answer = <STDIN>;
                # End-of-file on STDIN: nobody is there to answer, so do not
                # loop (or trip over an undefined value); treat it as "no".
                die "$prog is not installed\n" unless defined $answer;
            }
            if ($answer =~ /^y/i) {
                if (system("apt-get", "install", $prog, "-y", "--force-yes")==0) {
                    warn "$prog installed\n";
                } else {
                    croak "Unable to install $prog\n";
                }
                last;
            }
            elsif ($answer =~ /^n/i) {
                die "Ok, then.\n";
            }
            # Any other answer: ask again.
        }
    }

    # Sort the files into separate host_buckets for each server
    my %host_buckets; 
    foreach my $fspec ( @{$self->{filespecs}} ) {
        # A bare string is shorthand for { url => string }.
        $fspec = { url => $fspec } if ! ref($fspec);
        my $url = $fspec->{url} // croak "Filespec has no url member!";

        # The object's destdir (not the package global) is the fallback, so
        # string and hash filespecs end up in the same place.
        $fspec->{destdir} //= $self->{destdir};

        my ($protocol, $host, $remote_path, $query) 
          = ($url =~ /^(\w+):\/\/([^\/]+)(.*?)(\?.*)?$/)
            or croak "url '${url}' has invalid syntax\n";
        croak "url '${url}' has unsupported protocol '$protocol'\n"
          unless $protocol =~ /^(ftp|http)$/i;
        croak "url '${url}' has no <host>" unless $host;
        croak "url '${url}' has no <filename>" unless $remote_path;
        $fspec->{local_file} //= basename($remote_path);
        push @{ $host_buckets{$host} }, $fspec;

        if (my $cb = $fspec->{callback}) {
            # Code refs cannot be handed to the worker threads through the
            # shared object, hence names only.
            croak "callback must be a subroutine *name*, not subref\n"
              if ref $cb;
            $fspec->{callback} = $caller_pkg."::".$cb if $cb !~ /::/;
        }
    }

    $self->{parent_tid}         = threads->tid();
    $self->{started_file_count} = 0;
    $self->{file_count}         = 0;
    $self->{total_bytes}        = 0;
    $self->{aborting}           = 0;
    $self->{start_time}         = time;

    # Create a separate queue for each host.  
    # Threads will pick new work from the least-active host.
    $self->{busycounts}        = { };
    $self->{queues}            = { };
    while (my ($hostname, $list) = each %host_buckets) {
        $self->{busycounts}->{$hostname} = 0;
        my $q = $self->{queues}->{$hostname} = Thread::Queue->new();
        foreach my $fspec (@$list) {
            $q->enqueue($fspec) if defined $fspec;
        }
    }

    # Prepare to share $self with the threads
    $self = shared_clone($self);

    # Bless the object so DESTROY will be called if we abort before returning
    bless $self,$class;

    if ($self->{debug}) {
        require Data::Dumper;
        warn Data::Dumper->Dump([$self],["### new ".__PACKAGE__]);
    }

    # Create a fixed number of worker threads, which determine the 
    # number of concurrent file-download/postprocess activities.
    # Each individual file will be downloaded using "axel".
    warn "### Creating $self->{num_threads} threads...\n" if $self->{debug};
    $self->{threads} = shared_clone [
      map{threads->create(\&worker,$self)} 1..$self->{num_threads}
    ];

    warn "### new returning... \n" if $self->{debug};

    return $self; #blessed above
}

# Destructor.  Only the thread that created the object may shut the workers
# down; any other thread calling this merely logs the fact in debug mode.
sub DESTROY {
    # Keep the caller's status variables untouched by whatever happens here.
    local($., $@, $!, $^E, $?);
    my ($self) = @_;
    my $tid  = threads->tid();
    my $busy = !!(exists $self->{threads});  # workers not yet joined

    unless ($tid == $self->{parent_tid}) {
        warn "### DESTROY called by NON-parent_tid $tid (busy=$busy)\n" if $self->{debug};
        return;
    }

    warn "### DESTROY called by parent_tid $tid (busy=$busy)\n" if $self->{debug};
    $self->abort() if $busy;
}

# Stop prematurely: raise the 'aborting' flag that the workers poll, then
# join them.  Croaks if the workers have already been joined.
sub abort {
    my ($self) = @_;

    if ($self->{debug}) {
        warn "### abort() called\n";
    }
    unless ($self->{threads}) {
        croak "->abort() called when not running\n";
    }

    $self->{aborting} = 1;

    # wait() croaks when workers report "abort"; that is the expected
    # outcome here, so the exception is deliberately discarded.
    eval { $self->wait(); };

    # wait() removes {threads} once every worker has been joined.
    die "bug" if exists $self->{threads};
}

# Join all worker threads and return the statistics hash (see get_stats).
#
# Croaks if any worker died or aborted, or if the bookkeeping is
# inconsistent after a seemingly normal exit.  elapsed_secs is recorded
# before any such error is raised, so get_stats() is complete after a
# failed wait() or an abort() as well.
#
# Calling wait() again once the workers have been joined simply returns
# the statistics already recorded.
sub wait {
    my $self = shift;

    # Nothing left to join: report what was recorded earlier.
    return $self->get_stats() unless exists $self->{threads};

    # Wait for the threads to finish the downloads
    my $err;
    foreach my $thr (@{$self->{threads}}) {
        warn "[",threads->tid,"] Calling join on thr ",$thr->tid,"...\n" if $self->{debug};
        my $thr_result = $thr->join();
        warn "[",threads->tid,"] join returned ",
             (defined($thr_result) ? "\"$thr_result\"" : "UNDEF"),"\n" if $self->{debug};
        if    (! defined $thr_result)  { $err = "Thread(s) died!"; }
        elsif ($thr_result eq "ok")    { }
        # A crashed worker is the more serious condition: do not let a
        # later "abort" result hide it.
        elsif ($thr_result eq "abort") { $err //= "Thread(s) aborted"; }
    }

    delete $self->{threads};  # not-busy, stops DESTROY from aborting

    # Record the duration on every exit path, including the croak below.
    $self->{elapsed_secs} = time() - $self->{start_time};

    $err = "BUG: Threads exited 'normally' but busycounts remain"
      if (!$err) && %{ $self->{busycounts} };
    for my $hostname (sort keys %{ $self->{queues} }) {
        my $queue = $self->{queues}->{$hostname};
        $err = "BUG: Threads exited 'normally' but non-empty queue for $hostname"
          if (!$err) && $queue->pending() != 0;
    }

    croak "$err\n" if $err;

    return $self->get_stats();
}

# Return a hash ref summarising the run:
#   elapsed_secs, file_count, total_bytes, failed_file_count
# where failed_file_count is the number of files started but not counted
# as completed.  Croaks while the worker threads have not been joined yet.
sub get_stats {
    my ($self) = @_;

    if (exists $self->{threads}) {
        croak "get_stats() called before wait()\n";
    }

    my ($started, $completed) = @$self{qw/started_file_count file_count/};
    my %stats = (
        elapsed_secs      => $self->{elapsed_secs},
        file_count        => $completed,
        total_bytes       => $self->{total_bytes},
        failed_file_count => $started - $completed,
    );
    return \%stats;
}

# worker($self) -- body of every download thread.
#
# Repeatedly takes the next filespec from the queue of the host that
# currently has the fewest transfers in progress, downloads it with axel
# into a temporary file next to its final location, runs the optional
# callback and renames the file into place.
#
# Returns "ok" when no work is left and "abort" when $self->{aborting} was
# seen; dies when a download or the final rename fails (the parent then
# gets undef from join()).
#
# All access to the shared counters happens under "lock $self".
sub worker($) {
    my $self = shift;
    my ($queues, $busycounts, $dryrun, $quiet, $silent, $debug) 
      = @$self{qw/queues busycounts dryrun quiet silent debug/};
    my $tid = threads->tid();

    warn "### worker $tid started\n" if $debug;

    # Give back one unit of "busy" once a transfer from a host is over.
    # The entry may already be gone (another worker removes it when the
    # host's queue runs dry) and must not be re-created, otherwise the loop
    # below would never see an empty busycounts hash.
    my $release_host = sub {
        my $hostname = shift;
        lock $self;
        $busycounts->{$hostname} -= 1
          if exists $busycounts->{$hostname} && $busycounts->{$hostname} > 0;
        return;
    };

    # Remove an item from the queue for the least-busy host
    # (we don't actually know how busy a host is, only how many
    # files we are concurrently downloading from it).
    OUTER_LOOP:
    for (;;) {
        return "abort" if $self->{aborting};
        my ($fspec, $myseq, $hostname);
        {
            warn "[tid $tid] lock...\n" if $debug;
            lock $self;
            my $mincount;
            for my $hn (keys %$busycounts) { # 'each' broken in shared data!
                my $count = $busycounts->{$hn};
                if (!defined($mincount) || $count < $mincount) {
                    $mincount = $count;
                    $hostname = $hn;
                }
            }
            if (! defined $hostname) {
                warn "### [tid $tid] No busycounts left\n" if $debug;
                last OUTER_LOOP;
            }
            if (! defined($fspec = $queues->{$hostname}->dequeue_nb())) {
                warn "### [tid $tid] Nothing more for host $hostname\n" 
                  if $debug;
                delete $busycounts->{$hostname};
                redo OUTER_LOOP;
            }
            # Count this transfer against the host so that the other
            # workers prefer a different server while it is running.
            $busycounts->{$hostname} += 1;
            $myseq = ($self->{started_file_count} += 1);
            warn "[tid $tid] UNlocking\n" if $debug;
        }
        die "bug" unless defined $fspec;

        my ($url, $local_file, $dest_dir, $callback)
            = @$fspec{qw/url local_file destdir callback/};

        warn "### [tid $tid] dequeued $url\n" if $debug;

        my $final_path = rel2abs($local_file, $dest_dir);

        # Download to a temporary file in same directory as the final
        # location, so the rename below stays within that directory.
        my $tmp_path = mktemp($final_path."_part_XXXX") or die "mktemp:$!";

        my @cmd = ("axel", ($quiet ? "-q" : ()), "-a", "-o", $tmp_path, $url);

        if ($dryrun) {
            warn "> [tid $tid] [dryrun] [$myseq] @cmd\n" unless $silent;
            $release_host->($hostname);
            next OUTER_LOOP;
        }

        warn "> [tid $tid] [$myseq] @cmd\n" unless $silent;
        # Future: fork & exec, store pid, and make ->abort() kill the pids
        my $status = system(@cmd);   # same value as $?, kept in a lexical

        # The network transfer is over, whatever its outcome; the callback
        # below is local work and should not count against the server.
        $release_host->($hostname);

        if ($status != 0) {
            unlink $tmp_path;
            return "abort" if $self->{aborting};
            lock $self;
            die "\n\n\n$cmd[0] failed to download\n   $url\n   (exit status ", ($status >> 8), ")\ndied";
        }

        my $is_ok = 1;
        if ($callback) {
            no strict 'refs';   # the callback is a sub *name* (symbolic ref)
            $is_ok = $callback->($fspec, $tmp_path, $final_path);
        }
        if ($is_ok) {
            rename $tmp_path, $final_path 
                or die "Could not rename $tmp_path to $final_path: $!";
            lock $self;
            $self->{file_count} += 1;  
            # -s yields undef if the file vanished meanwhile; with fatal
            # warnings that must not reach the addition.
            $self->{total_bytes} += (-s $final_path) // 0;
        } else {
            unlink $tmp_path;
            warn "[tid $tid] callback return failure for $tmp_path\n" if $debug;
        }
    }
    warn "### [tid $tid] no more work, returning ok.\n" if $debug;
    return "ok";
}

1;

