#!/usr/bin/perl
#
# Subscribe to a job from publisher que
# FILE: subscribe.perl   ArDean Leith                          Sep. 2002
#                        Added local permit files              Mar. 2005
#                        Local permit fallthru                 Apr. 2005
#                        Local & master permit checked         Sep. 2005
#                        Local permit subsidiary               Jun. 2007
#                        Partition                             Jun. 2007
#                        Seek, removed demon, removed openque  Jan. 2008
#                        In que locked msg., removed /n        Jan. 2008
#
# MASTER LOCATION: /net/bali/usr1/spider/pubsub/
#
# PURPOSE: Subscribe to job from pubsub que. Left running like a daemon.
#
# USAGE EXAMPLE:  subscribe.perl pubsub.que 
#
# INPUT:
#    queue            (argument #1) REQUIRED: que file name inside $PUBSUB_DIR (usage message and exit if missing)
#
# OUTPUT: 
#    command          (returned)  
#
#
# ------------------ subscribe command --------------------------------

#use POSIX qw(uname);
use File::Basename;

# String containing 'ps' identifier for SPIDER job

if (! $ARGV[0]) 
   {die "Usage: $0 [publish_que] 
         Purpose: Subscribe to job from publisher que
         Argument:
           publish_que is: file containing publisher que to be watched.\n"; 
   }

# NOTE: the top-level variables below are deliberately left as package
# globals (no 'my'): the subroutines further down use local() on some of
# the same names, which would not compile against file-scoped lexicals.

$master    = $ENV{'PUBSUB_MASTER'};    # site specific name of master node
# site specific name of cluster partition (can usually be left undefined)
$partition = $ENV{'PUBSUB_PARTITION'};

system("echo  "); 

$dir_pubsub = $ENV{'PUBSUB_DIR'};

# Que file and master permit file both live in $PUBSUB_DIR and both carry
# the (possibly empty) partition suffix.
$que = "$dir_pubsub/$ARGV[0]$partition";
system("echo Subscribe que: $que  \n"); 

$permitfile = "$dir_pubsub/pubsub.permit$partition";
system ("echo Permitfile: $permitfile \n");

# Converting this process to a real 'daemon' is not implemented
# (the demon code was removed, see file header: Jan. 2008).

$time_to_die  = 0;     # RUNS FOREVER!!
$checkwait    = 2;     # Wait time (seconds) before checking QUE again

until ($time_to_die)
   {
   # Find the first unlocked job in the que (job id > 0) and the location
   # field that findajob returns with it.
   my ($jobid, $locnow) = findajob($que);
   $locnow =~ s/\n$//;
   $locnow =~ s/ //;

   my $started = 0;

   if ($jobid > 0) 
      {  # Got a job from the QUE, find a node (machine) for it to run on.
         # The permit file is scanned once per job; if no machine qualifies
         # the loop falls through to the wait below and tries again.

      # A permit file of the same name in the job's location takes
      # precedence over the master permit file, but every machine it
      # lists must also appear in the master permit file.
      my $base            = basename($permitfile);
      my $localpermitfile = "$locnow/$base";
      my $localpermit     = 0;
      my $permit_fh;

      if ( -r $localpermitfile ) 
         {
         open($permit_fh, '<', $localpermitfile) ||
             die "Can not open local permissions file: $localpermitfile \n";
         $localpermit = 1;
         }
      else
         {
         open($permit_fh, '<', $permitfile) ||
             die "Can not open permissions file: $permitfile \n";
         }

      my $master_lines;    # Master permit contents, read on first use only

      PERMIT_LINE:
      while (my $line = <$permit_fh>)   # Read permit file line by line
         {
         # Skip comment lines (anything containing #) -----------------------
         next PERMIT_LINE if ($line =~ /\s*#/);

         # Fields: machine  limit  days  time1  time2  check  comment
         my ($machine,$limit,$days,$time1,$time2,$check,$comment) =
            $line =~ /(\w+)\s+(\d+)\s+(\d+)\s+(\d+:\d\d)\s+(\d+:\d\d)\s+(\d+)\s+(.*)/;

         # Blank or malformed line: nothing usable, do not run the remote
         # checks with an empty machine name.
         next PERMIT_LINE unless defined $machine;

         if ( $localpermit )
            { # Check to see if machine OK by master permit file -------------
            if (! $master_lines)
               {
               open(my $master_fh, '<', $permitfile) ||
                   die "Can not open master permissions file: $permitfile \n";
               $master_lines = [ <$master_fh> ];    # Read whole file
               close($master_fh);
               }

            my $okinmaster = 0;
            foreach my $master_line (@$master_lines)
               {
               # Machine must be mentioned, and not only after a '#'
               if ($master_line =~ m/\Q$machine\E/ &&
                   $master_line !~ m/#+.*\Q$machine\E/)
                  {
                  $okinmaster = 1;
                  last;
                  }
               }
            # Machine OK in local permit but not permitted in master .permit
            next PERMIT_LINE if (! $okinmaster);
            }

         # Is another job already locked onto this machine in the que? ------
         my $locked = `grep '$machine ' $que`;
         if ($locked)   
            {
            system("echo On $machine: a different job is already starting");
            next PERMIT_LINE;
            }   

         # Find how many SPIDER pubsub jobs are already running on machine --
         # SEARCHES FOR A STRING SPECIFIC TO COMMAND USAGE %%%% (spider)
         my $procs = `rsh -n $machine "ps -u root -N -o cmd | grep 'spider' | grep -v grep  | wc -l" 2>&1`;

         # Alternatives kept for reference:
         #$procs = `rsh -n $machine "ps -ef | grep spider | grep -v grep | wc -l" 2>&1`;  
         #$procs = `rsh -n $machine "ps -u root -N -o cmd | grep 'tcsh.*spider' | grep -v grep  | wc -l" 2>&1`;
         #$procs = `rsh -n $machine ps -ef | grep spider | wc -l`; 

         if ($procs =~ m/[A-Za-z]+/x)
            {  # Letters in what should be a count: something wrong with rsh call
            system("echo ERROR: $procs");
            next PERMIT_LINE;
            }
         chomp($procs);

         # This machine has too many jobs running
         next PERMIT_LINE if ($procs >= $limit);

         # Check to see if time OK for running on this machine --------------
         my ($min, $hour, $dayofweek) = (localtime(time()))[1, 2, 6];
         my ($hour1, $min1) = split(/:/, $time1);
         my ($hour2, $min2) = split(/:/, $time2);

         my $mins0 = $hour  * 60 + $min ;     # now, in minutes since midnight
         my $mins1 = $hour1 * 60 + $min1;     # window start
         my $mins2 = $hour2 * 60 + $min2;     # window end

         # Outside the permitted time window
         next PERMIT_LINE if (($mins0 < $mins1) || ($mins0 > $mins2));

         # days == 5: not permitted on Sunday (0) or Saturday (6)
         next PERMIT_LINE
            if (($days == 5) && (($dayofweek == 0) || ($dayofweek == 6)));

         # OK to start a job on this machine --------------------------------
         system("echo On: $machine  Current jobs: $procs OK to start: $jobid \n");

         # Lock the job in the que by giving it negative number
         my $gotid = lockajob($que, $jobid, $machine);
         if ($gotid == $jobid)
            {  # Locked OK, job is now assigned to this machine
            $started = 1;
            last PERMIT_LINE;
            }
         elsif ($gotid < 0)
            {  # Job vanished from the que, give up on it
            system("echo Error -- lockajob could not find job: $jobid  in: $que!!");
            last PERMIT_LINE;
            }
         # Any other return (que could not be opened): try the next machine
         }           # End of  while  (permit lines)

      close($permit_fh);
      }              # END of  if     ($jobid > 0)

   if (!$started) 
      {select(undef,undef,undef,$checkwait);}    # Waits $checkwait seconds and loops

   }                 # END of  until ($time_to_die)

exit;



# ----------------------- findajob --------------------------------- 
# USAGE:  findajob ("pubsub.que") 
#
# PURPOSE: Scan the que file from the top, under a shared lock, and
#          report the first job whose id is > 0 (i.e. not yet locked).
#
# INPUT:
#    Que name & location (argument #1)
#
# OUTPUT: 
#    (job id, location)  (returned)
#       job id   is 0 when no unlocked job was found.
#       location is the third field of the que line (everything after
#                the user name, trailing newline included); it is only
#                meaningful when job id > 0.
#    A plain 0 is returned when the que file can not be opened.


sub findajob 
   {  # Finds next job from que, returns job info
   if (@_ < 1)
      {die  "Usage: $0 que 
      que is: file containing publisher que. \n"
      }
   my ($que) = @_;

   # Opened read/write, exactly as before, although this sub only reads.
   my $pubque;
   if ( ! open($pubque, '+<', $que))
      { system ("echo Subscriber can not open: $que \n") ; return 0; }

   # Shared lock= 1, Exclusive lock=2, Non-blocking request=4,  Free lock=8
   unless (flock($pubque, 1))
      { die " Can not read_lock: $que \n";}
   seek($pubque,0,0) || die "Can't seek to start of: $que \n"; #al jan08

   my $foundid = 0;
   my $locnow;

   while (my $line = <$pubque>)       # Read input from que file
      {
      # Limit of 3: third field keeps the whole remainder of the line
      my ($jobidt, $user, $where) = split(/\b\s+/, $line, 3);
      $locnow = $where;

      if ($jobidt > 0)
         {   # not locked job, need to start this job
         $foundid = $jobidt;
         last;
         }
      }

   flock($pubque, 8);    # unlock que file
   close($pubque);       # do not leave the handle open between polls

   return $foundid, $locnow;
   }


# ----------------------- lockajob --------------------------------- 
# USAGE example:  lockajob ("pubsub.que",1,node1) 
#
# PURPOSE: Under an exclusive lock, find the que line whose first field
#          equals the job number and rewrite it as
#             -<job number> <user> <machine> <rest of line>
#          All other lines are copied unchanged. Only the first matching
#          line is altered.
#
# INPUT:
#    queue            (argument #1)
#    job number       (argument #2)  
#    machine          (argument #3)  
#
# OUTPUT: 
#    locked number    (returned)  
#       job number  when the job was found and locked
#       -1          when the job is not in the que (que left untouched)
#        0          when the que file can not be opened
#
 
sub lockajob 
   {  # Finds specific job in que, locks job, returns job id
   if (@_ < 3)
      {die  "Usage: $0 que jobid machine\n
      que is:      Publisher que.\n
      jobid is:    Job PID to start.\n
      machine is:  Machine to start this job on. \n"
      }
   my ($que, $jobid, $machine) = @_;

   my $pubque;
   if ( ! open($pubque, '+<', $que))
       { system ("echo lockajob can not open: $que \n") ; return 0; }

   # No buffering of output on the que handle. Plain assignment, not
   # 'local $|': a localized $| is restored on whichever handle is
   # selected at scope exit, which would be the caller's handle.
   my $old_fh = select($pubque);
   $| = 1;
   select($old_fh);

   # Shared lock= 1, Exclusive lock=2, Non-blocking request=4,  Free lock=8
   unless (flock($pubque, 2))
      { die " Can not write_lock: $que \n";}
   seek($pubque,0,0) || die "Can't seek to start of: $que \n"; #al jan08

   my $out      = '';
   my $lockedid = -1;
   my $locked   = 0;     # set once the job line has been rewritten

   while (my $line = <$pubque>)       # read input lines from que file
      {
      my ($idnow, $user, $rest) = split(/\b\s+/, $line, 3);

      if (! $locked && $idnow == $jobid)
         {   # Want to lock this job now: alter lock & copy line
         my $newline = "-$jobid $user $machine $rest";

         # A que line with no third field loses its newline in the split
         $newline .= "\n" unless ($newline =~ /\n\z/);

         $out     .= $newline;
         $lockedid = $jobid;
         $locked   = 1;
         }
      else
         {   # Another job, just copy to OUTPUT
         $out .= $line;
         }
      }

   if ($lockedid >= 0)
      {
      # Copy new publisher que over the old one, then cut off any leftover
      seek($pubque,0,0) || die "Can't seek to start of $que";
      print {$pubque} $out;
      truncate($pubque, tell($pubque));
      }

   flock($pubque, 8);
   close($pubque);

   return $lockedid;
   }

#