#!/usr/bin/perl # # Publish a job
# FILE: publish.perl   New ArDean Leith                        Feb. 2002  
#                      TRACE log added                         Jan. 2008
#                      (($jobidt !=  0)&&($jobidt != -$jobid)) Jan. 2008
#                        
# LOCATION: /net/bali/usr1/spider/pubsub/ 
#
# PURPOSE: Submit job to publisher que. 
#
# USAGE EXAMPLE:  publish.perl  "spider pam/acn @apmq 17 x77=17" 
#
# INPUT:
#    Command listing           (argument )
#
# OUTPUT: 
#    Success or failure        (returned)  

use Getopt::Std;

local($jobid, $runcommand, $n, $machine);

$dir_pubsub = $ENV{'PUBSUB_DIR'};

#Que location & name
if ($dir_pubsub) 
   {$que = "$dir_pubsub/pubsub.que";}
else
   {die "Environmenal variable: PUBSUB_DIR is not defined \n";}

###$que = "$dir_pubsub/dev/junk.que";    
$trace = "$dir_pubsub/pubsub.trace";

# Get current directory location
$locnow = $ENV{'PWD'};

# Get que host
$quehost = "samoa";  ####%%%%
if ($ENV{PUBSUB_HOST}) 
   {$quehost = $ENV{PUBSUB_HOST};}
#print  "Current que host: $quehost \n";

# Get current PID
getopt("j",);
#system("echo option: $opt_j \n");
if ($opt_j)
   {
   $jobid = $opt_j;      # PID sent from caller
   #system("echo Callers PID: $jobid \n");
   } 
else
   {$jobid = $$;}        # Current PID

$runcommand = join(" ",@ARGV);
#print "runcommand:  $runcommand  \n";

# Log location & name (in current directory)
$log = $locnow . "/pubsub.log";

$checkwait    = 3;       # Loop wait time (seconds)

# Get currentusername
$user      = $ENV{'USER'};  # Current user name

$ntry_mach = 0;            # Number of machines tried so far
$started   = 0;
@badnodes  = ();

# Find first OS command in $runcommand
$runcommand1 = $runcommand;
$runcommand1 =~ m/(\w+)/o ;
$run1 = $1;
#print "run1: $run1: \n"; 

# Substitute ';' for any ; in $runcommand to preserve ';' in rsh
$runcommands = $runcommand;
$runcommands =~ s/\;/';'/g; 

# Substitute $jobid for any PUBSUB_JOBID in $runcommand
$runcommands =~ s/PUBSUB_JOBID/$jobid/g; 

while (($started == 0) && ( $ntry_mach < 10 ) )
   { # Loop until started or >= 10 machines tried (arbitrary limit)

   #print "In publish; User: $user Location: $locnow\n";
   # Publish job now by listing in publisher que
   &publish("$que","$jobid","$user","$locnow","@badnodes");

   # Record qued time for use in log line 
   $quedtime = time();
   #system(" echo Waiting for sub to  job: $jobid for: $user \n");

   $ntry_mach++;    # Increment counter for number of machines tried

   # Wait for a subscription, find machine that subscribed
   $machine = '';
   while (!$machine)
      { 
      if ( ! open(PUBQUE, "+< $que")) 
          { system ("echo publish can not open: $que \n") ; return 0; }

      # Shared lock= 1, Exclusive lock=2, Non-blocking request=4,  Free lock=8
      unless (flock(PUBQUE, 1))
         { die " Can not read_lock: $que \n";}

      seek(PUBQUE,0,0) || die "Can't seek to start of: $que \n"; #al oct07

      # Read the  queue and see if this job is assigned to a machine yet
      while ()       #read input
         {
         $linet = $_;
         #print  "linet: $linet \n";

         chop($linet);
         ($jobidt,$user,$machval,$rest) = split(/\b\s+/,$linet,4);

         if ($jobidt == -$jobid)
            { # This job is locked, need to start this job on $machval
            $machine = $machval;
            #print "machine: $machine \n"; 
            $line = $linet;
            last;               # No need to read further
            }
         } # End of:  while ()   

      # Unlock the que file 
      flock(PUBQUE,8);
      close(PUBQUE);

      if (!$machine) 
         { # Wait $checkwait seconds and loop (will loop forever until $machine)
         select(undef,undef,undef,$checkwait);    # Waits $check seconds
         }

      }  # End of: while ($machine)

   # Machine found -------------------------------------------------------------

   #print "Publish; jobidt: " . $jobidt . "machine:" . $rest . "\n\n";

   # See if $locnow is visable from the subscriber machine
   $tolocrun = "rsh -n $machine \"ls -d $locnow\" 2>&1 ";
   #print "tolocrun:  $tolocrun  \n"; 

   $gotlocrun = `$tolocrun`;     
   $gotlocrun =~ s/^\s+//;    # Delete initial whitespace
   $gotlocrun =~ s/\s+$//;    # Delete initial whitespace

   #print "gotlocrun: $gotlocrun: \n";
   #print "locnow   : $locnow: \n";
   $locnowpsl = $locnow . "/";    # Add final slash (some PWD returns)

   if (($locnow ne $gotlocrun) && ($locnowpsl ne $gotlocrun))
      {
      system("echo ERROR Can not access: $locnow on: $machine");
      #system("echo ERROR :$locnow: NOT SAME AS :$gotlocrun:");
      del_locked_job($jobid);      # Delete this locked job from que 

      # Make this a 'bad node'
      push(@badnodes,$machine);

      # try next machine
      next;     
      }
 
   $torun = "rsh $machine cd $locnow ';' $runcommands ';' rsh -n $quehost \"$dir_pubsub/delete.perl $jobid $locnow/pubsub.log\" &";
   #print "Starting: $torun \n"; 
   system("$torun");
   $torun =~ s/';'/;/g;           # Get rid of ' around ';' 
   print "Running: $torun \n"; 

   # List starting time in log line 
   $starttime = time();
    ($sec,$min,$hour,$mday,$month,$year,$more) = localtime($starttime); 
   $time = sprintf("%04d-%02d-%02d-%02d:%02d:%02d", 
                       $year+1900,$month+1,$mday, $hour,$min,$sec);

   #system ("echo : locking log file now");
   # Open log file for appending
   if ( ! open(LOG, ">> $log"))
      { system ("echo In publish; can not open: $log \n") ; return 0; }

   # Lock the log file
   flock(LOG, 2);

   # Copy QUE info to log file and append job-group, starting time & command
   print LOG "$jobid $machine $quedtime $starttime (Start: $time) (Oper: $runcommand) \n";
   flock(LOG,8);
   close(LOG);

   $ntry        = 0;

   while ( ($started == 0) && ( $ntry < 20 ) )
      #while ( ($started == 0) && ( $ntry < 4 ) )
      {  # Loop while job not running yet, or until ntry >= 20

      #$search = "rsh -n $machine \"ps -u $user -o cmd | grep $run | grep -v tcsh | wc -l\" 2>&1";
      #$procs = `$search` ;

      #see if command is running yet on $machine
      @procst = `rsh -n $machine \"ps -u $user -o cmd\"` ;
      #print "Procst returns: @procst ";

      if ($procst =~ m/[A-Za-z]+/x )
          {  # Something wrong with 'rsh' call
          system("echo ERROR: $procst");
          last;
          }

      @proc = grep { /.$run1.*/ } @procst;
      # print "proc: @proc \n";
      $nprocs = $#proc + 1;
      #print "Found: $nprocs process containing: $run1  on: $machine \n";

      if ($nprocs > 0 )
          {  # runcommand has started, exit loop
          #system("echo runcommand has started with procs: $procs");
          $started = 1;
          }

      else
          {  # See if jobid has already finished and been deleted from que

          #$str = "grep $jobid $log | grep Runtime";
          #print "See if jobid has finished: $str \n";
          $str = "grep $jobid $log | grep Runtime | wc -l 2>&1";
          $started = `$str`;
          #print " Job: $jobid Finished already: $started \n" ;
          }

      if ( $started < 1)
         {
         # Wait $checkwait seconds and loop to see if started after wait
         select(undef,undef,undef,$checkwait); # Waits $check seconds
         $ntry++;                              # Increment starting try counter
         #print "Inner looping: $ntry";
         }

      } # End of: while ( !$started && ( $ntry < 18 ) )
 
   # Delete this job from que now (as started, over, or dead)
   del_locked_job($jobid);      # Delete this job from que 

   #$started=0; # USE THIS AS TEST FOR BADNODES

   if ( $started < 1)
      {  # Try another machine
      push(@badnodes,$machine);
      #system("echo Outer looping: $ntry_mach");
      print " Job: $jobid Re-published as it failed to run on: $machine\n" ;
      }

   }  # END of: while (($started == 0) && ( $ntry_mach < 20 ) )

   # Open trace file for appending
   if ( ! open(TRACE, ">> $trace"))
      { system ("echo In publish; can not open: $trace \n"); }
   else
      {
      # Lock the trace file
      #flock(TRACE, 2);
      print TRACE " Starting:   $time $jobid $user $machine $locnow ($runcommand) \n";
      #flock(TRACE,8);
      close(TRACE);
      }


# ---------------------- publish --------------------------------------
sub publish 
{
    if (@_ > 5)
       {die  "Usage: $0 que jobid user directory badnodes 
       Where:
       que is:        publisher que file \n
       jobid is:      job PID \n
       user is:       username \n 
       directory is:  current working directory \n 
       badnodes is:   listing of bad nodes \n"
       }

    $quefile = $_[0];
    #print "quefile: $quefile \n";

    $jobid = $_[1];
    #print "jobid: $jobid \n";

    $user = $_[2];
    #print "user: $user \n";

    $locnowt = $_[3];
    #print "locnowt is: $locnowt \n";

    my (@badnodes) = $_[4];
    #print "badnodes: @badnodes \n";
    
    # Open Publisher que file for appending
    open(PUBQUE, ">> $quefile") ||
        open(PUBQUE, "> $quefile") ||
           die "Publish can not open: $quefile \n";

   # Open trace file for appending
   $pubtime = time();
   ($sec,$min,$hour,$mday,$month,$year,$more) = localtime($pubtime); 
   $ptime = sprintf("%04d-%02d-%02d-%02d:%02d:%02d", 
                    $year+1900,$month+1,$mday, $hour,$min,$sec);
   {$trace = "$dir_pubsub/pubsub.trace";}
   if ( ! open(TRACE1, ">> $trace"))
      { system ("echo In publish(publish); Can not open: $trace \n"); }
   else
      {
      # Append publish  info to trace file
      print TRACE1 " Publishing: $ptime $jobid $user ------ $locnow \n";
      close(TRACE1);
      }

    # No buffering of  output
    my    $old_fh = select(PUBQUE);
    local $|      = 1;
    select($old_fh);
    if ($jobid , 0) 
       {$jobid=-$jobid;}

    # Shared lock= 1, Exclusive lock=2, Non-blocking request=4,  Free lock=8
    unless (flock(PUBQUE, 2 | 4))  #al dec07
       {
       print "Waiting for lock on: $quefile ";
       flock(PUBQUE, 2)  or die "Can't lock: $quefile";
       }
    seek(PUBQUE,0,2) || die "Can't seek to end of: $quefile \n"; #al oct07

    # Put jobid & username in Publisher que file 
    ##print PUBQUE "$jobid $user $locnowt @badnodes\n";
    print PUBQUE "$jobid $user $locnowt\n";

    # Close Publisher que file
    flock(PUBQUE, 8);
    close(PUBQUE);

    if ($badnodes[0]) 
       { system(" echo Publishing job: $jobid for: $user Bad nodes: @badnodes\n");}
    else
       { system(" echo Publishing jobid: $jobid for: $user\n");}

    return 1;
}


# ---------------------- del_locked_job --------------------------------------

sub del_locked_job 
    {
    if (@_ > 3)
       {die  "Usage: $0 jobid
       Where:
       jobid is:    job PID \n"
       }
    local $linet,$out,$jobidt;

    $jobid = $_[0];
    #print "Deleting locked jobid: $jobid \n";

    # Open QUE file
    if ( ! open(PUBQUE, "+< $que")) 
          { system ("echo del_locked_job can not open: $que \n") ; return 0; }

    #No buffering of  output
    my    $old_fh = select(PUBQUE);
    local $|      = 1;
    select($old_fh);

    # Shared lock= 1, Exclusive lock=2, Non-blocking request=4,  Free lock=8
    unless (flock(PUBQUE, 2))
       { die " Can not write_lock: $que \n";}
    seek(PUBQUE,0,0) || die "Can't seek to start of: $que \n"; #al oct07

    # Read the whole queue and rewrite the queue without the deleted job
    $out = '';
    while ()       # Read que input
       {
       $linet = $_;
       ($jobidt) = split(/\b\s+/,$linet,1);  # First field in line
       #print  "jobidt NOW: $jobidt";   

        if (($jobidt !=  0) && ($jobidt != -$jobid))
          { # Not for current job, keep line in que file
          $out .= $linet;
          #print  "jobidt: $jobidt jobid: $jobid : $out :";   
          #print  "out NOW: $out";   
          }
       }

   #print  "QUE NOW: $out";   

   seek(PUBQUE,0,0);      # Goto que beginning again
   print PUBQUE $out;     # Put all output lines in que file
   truncate(PUBQUE, tell(PUBQUE));

   # Unlock the que file 
   flock(PUBQUE,8);
   close(PUBQUE);

   }        
#