#!/usr/bin/perl # #
# FILE: subscribe.perl ArDean Leith Sep. 2002
# Added local permit files Mar. 2005
# Local permit fallthru Apr. 2005
# Local & master permit checked Sep. 2005
# Local permit subsidiary Jun. 2007
# Partition Jun. 2007
# Seek, removed demon, removed openque Jan. 2008
# In que locked msg., removed /n Jan. 2008
#
# MASTER LOCATION: /net/bali/usr1/spider/pubsub/
#
# PURPOSE: Subscribe to jobs from the pubsub que. Meant to be left running, like a daemon.
#
# USAGE EXAMPLE: subscribe.perl pubsub.que
#
# INPUT:
# queue (argument #1) REQUIRED: que file name; $PUBSUB_DIR/ is prepended
#   and $PUBSUB_PARTITION (if set) is appended to form the que path.
#
# OUTPUT:
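# Each job it places is locked in the que file (job id negated, machine
# name inserted); progress messages are printed. Nothing is returned.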
#
#
# ------------------ subscribe command --------------------------------
#use POSIX qw(uname);
use File::Basename;
# ------------------------------------------------------------------
# Main program
#
# Environment:
#   PUBSUB_DIR        directory holding the que and permit files
#   PUBSUB_PARTITION  optional suffix appended to the que and permit file
#                     names (site specific cluster partition; can usually
#                     be left undefined)
#
# Polls the que forever. For each waiting job it walks a permit file and
# locks the job to the first machine that is permitted, not already
# starting a job, below its job limit, and inside its allowed time window.
# ------------------------------------------------------------------
if (! $ARGV[0])
{die "Usage: $0 [publish_que]
Purpose: Subscribe to job from publisher que
Argument:
publish_que is: file containing publisher que to be watched.\n";
}

$| = 1;    # Unbuffered STDOUT so progress messages appear immediately

# Lexicals live in a bare block so they are not visible to the subs below.
{
    my $partition = $ENV{'PUBSUB_PARTITION'};
    $partition = '' unless defined $partition;
    my $dir_pubsub  = $ENV{'PUBSUB_DIR'};
    my $que         = "$dir_pubsub/$ARGV[0]$partition";
    my $permitfile  = "$dir_pubsub/pubsub.permit$partition";
    my $permitbase  = basename($permitfile);
    my $checkwait   = 2;    # Seconds to wait before checking the que again
    my $time_to_die = 0;    # Never set: the subscriber RUNS FOREVER

    print "\n";
    print "Subscribe que: $que\n";
    print "Permitfile: $permitfile\n";

    until ($time_to_die)
    {
        my ($jobid, $locnow) = findajob($que);
        my $started = 0;
        if ($jobid > 0)
        {
            # findajob hands back the rest of the que line after the user
            # field; trim it to the location whose permit file is consulted
            # below (presumably the job's directory -- TODO confirm).
            $locnow = '' unless defined $locnow;
            $locnow =~ s/\n$//;
            $locnow =~ s/ //;
            $started = place_job($que, $jobid, $permitfile,
                                 "$locnow/$permitbase");
        }
        # Nothing was started: wait before polling the que again
        select(undef, undef, undef, $checkwait) unless $started;
    }
}
exit;

# Walk a permit file and lock job $jobid to the first suitable machine.
# The local permit file is used when readable, otherwise the master permit
# file. Machines named in a local permit must also appear (uncommented) in
# the master permit. Locking the job in the que is this script's hand-off;
# the job itself is not launched here.
# Returns 1 if the job was locked to a machine, else 0.
sub place_job
{
    my ($que, $jobid, $permitfile, $localpermitfile) = @_;
    my ($permit_fh, $localpermit, @master_lines);

    if ( -r $localpermitfile )
    {
        open($permit_fh, '<', $localpermitfile) ||
            die "Can not open local permissions file: $localpermitfile \n";
        $localpermit  = 1;
        @master_lines = read_master_permit($permitfile);
    }
    else
    {
        open($permit_fh, '<', $permitfile) ||
            die "Can not open permissions file: $permitfile \n";
        $localpermit = 0;
    }

    my $started = 0;
    PERMIT_LINE:
    while (my $entry = <$permit_fh>)
    {
        # Comment lines: anything containing '#'
        next PERMIT_LINE if $entry =~ /#/;

        # machine limit days time1 time2 check comment
        my ($machine, $limit, $days, $time1, $time2) =
            $entry =~ /(\w+)\s+(\d+)\s+(\d+)\s+(\d+:\d\d)\s+(\d+:\d\d)\s+(\d+)\s+(.*)/;
        # Blank or malformed line: without this guard an undefined machine
        # name would turn the checks below into empty-pattern matches.
        next PERMIT_LINE unless defined $machine;

        # Machine OK in local permit but not permitted in master permit
        next PERMIT_LINE
            if $localpermit && ! machine_in_master($machine, @master_lines);

        if (machine_busy_in_que($machine, $que))
        {
            print "On $machine: a different job is already starting\n";
            next PERMIT_LINE;
        }

        my $procs = count_spider_jobs($machine);
        next PERMIT_LINE unless defined $procs;        # rsh failed
        next PERMIT_LINE if $procs >= $limit;          # too many jobs running
        next PERMIT_LINE unless time_window_ok($days, $time1, $time2);

        print "On: $machine Current jobs: $procs OK to start: $jobid\n";
        # Lock the job in the que by giving it a negative number
        my $gotid = lockajob($que, $jobid, $machine);
        if ($gotid == $jobid)
        {
            $started = 1;
            last PERMIT_LINE;
        }
        if ($gotid < 0)
        {
            print "Error -- lockajob could not find job: $jobid in: $que!!\n";
            last PERMIT_LINE;
        }
        # $gotid == 0: the que could not be opened; try the next machine
    }
    close($permit_fh);
    return $started;
}

# Return all lines of the master permit file; dies if it cannot be opened.
sub read_master_permit
{
    my ($permitfile) = @_;
    open(my $fh, '<', $permitfile) ||
        die "Can not open master permissions file: $permitfile \n";
    my @lines = <$fh>;
    close($fh);
    return @lines;
}

# True if $machine appears as a whole word on some master permit line
# without a preceding '#' (i.e. not commented out on that line).
sub machine_in_master
{
    my ($machine, @master_lines) = @_;
    foreach my $line (@master_lines)
    {
        return 1 if $line =~ /\b$machine\b/ && $line !~ /#.*\b$machine\b/;
    }
    return 0;
}

# True if any que line contains "$machine " -- locked que lines carry the
# machine name, so this means a job is already being started there.
# An unreadable que counts as "not busy".
sub machine_busy_in_que
{
    my ($machine, $que) = @_;
    open(my $fh, '<', $que) || return 0;
    my $busy = 0;
    while (my $line = <$fh>)
    {
        if (index($line, "$machine ") >= 0)
        {
            $busy = 1;
            last;
        }
    }
    close($fh);
    return $busy;
}

# Count processes on $machine not owned by root (ps -u root -N) whose
# command line mentions 'spider', via rsh. Returns the count (whitespace
# trimmed), or undef after printing the output if the rsh call failed
# (detected by letters in its output).
sub count_spider_jobs
{
    my ($machine) = @_;
    # SEARCHES FOR A STRING SPECIFIC TO COMMAND USAGE %%%% (spider)
    my $procs = `rsh -n $machine "ps -u root -N -o cmd | grep 'spider' | grep -v grep | wc -l" 2>&1`;
    if ($procs =~ /[A-Za-z]/)
    {
        # Printed, never passed to a shell: rsh error text is arbitrary
        chomp(my $msg = $procs);
        print "ERROR: $msg\n";
        return;
    }
    $procs =~ s/^\s+//;
    $procs =~ s/\s+$//;
    return $procs;
}

# True if the current local time lies in [time1, time2] (HH:MM, inclusive)
# and, when $days == 5, today is a weekday (not Sunday=0 or Saturday=6).
sub time_window_ok
{
    my ($days, $time1, $time2) = @_;
    my ($sec, $min, $hour, $mday, $month, $year, $dayofweek) = localtime(time());
    my ($hour1, $min1) = split(/:/, $time1);
    my ($hour2, $min2) = split(/:/, $time2);
    my $mins0 = $hour  * 60 + $min;
    my $mins1 = $hour1 * 60 + $min1;
    my $mins2 = $hour2 * 60 + $min2;
    return 0 if ($mins0 < $mins1) || ($mins0 > $mins2);
    return 0 if ($days == 5) && (($dayofweek == 0) || ($dayofweek == 6));
    return 1;
}
# ----------------------- findajob ---------------------------------
# USAGE: findajob ("pubsub.que")
#
# Scans the que under a shared lock and returns the first job whose id is
# positive. lockajob negates the id of a job it has locked, so a positive
# id means the job is still waiting.
#
# INPUT:
# Que name & location (argument #1)
#
# OUTPUT:
# (job id, rest-of-line) -- job id is 0 if no waiting job was found.
# rest-of-line is everything after the user field of the found line,
# including its trailing newline (the caller trims it). It is unspecified
# when the job id is 0. Returns just 0 if the que cannot be opened.
sub findajob
{ # Finds next job from que, returns job info
    if (@_ < 1)
    {die "Usage: $0 que
que is: file containing publisher que. \n"
    }
    my ($que)   = @_;
    my $foundid = 0;
    my $locnow;

    my $pubque;
    if ( ! open($pubque, '+<', $que))
    { print "Subscriber can not open: $que\n"; return 0; }

    # Shared lock= 1, Exclusive lock=2, Non-blocking request=4, Free lock=8
    unless (flock($pubque, 1))
    { die " Can not read_lock: $que \n";}
    seek($pubque,0,0) || die "Can't seek to start of: $que \n"; #al jan08

    while (my $line = <$pubque>)    # Read input from que file
    {
        # Split into at most 3 fields: jobid, user, and the rest of the line
        my ($jobidt, $user);
        ($jobidt, $user, $locnow) = split(/\b\s+/, $line, 3);
        if ($jobidt > 0)
        { # not locked job, need to start this job
            $foundid = $jobidt;
            last;
        }
    }
    flock($pubque, 8);    # unlock que file
    close($pubque);
    return ($foundid, $locnow);
}
# ----------------------- lockajob ---------------------------------
# USAGE example: lockajob ("pubsub.que",1,node1)
#
# Under an exclusive lock, rewrites the first que line whose id equals the
# job number from "jobid user rest" to "-jobid user machine rest".
# All other lines are copied unchanged.
#
# INPUT:
# queue (argument #1)
# job number (argument #2)
# machine (argument #3)
#
# OUTPUT:
# locked number (returned): the job number if the job was locked, -1 if no
# line with that id was found (e.g. already locked, hence negative), and
# 0 if the que could not be opened.
#
sub lockajob
{ # Finds specific job in que, locks job, returns job id
    if (@_ < 3)
    {die "Usage: $0 que jobid machine\n
que is: Publisher que.\n
jobid is: Job PID to start.\n
machine is: Machine to start this job on. \n"
    }
    my ($que, $jobid, $machine) = @_;

    my $pubque;
    if ( ! open($pubque, '+<', $que))
    { print "lockajob can not open: $que\n"; return 0; }

    # No buffering of output to the que file. Plain (not local) $| so the
    # setting is not "restored" later onto whichever handle is then selected.
    my $old_fh = select($pubque);
    $| = 1;
    select($old_fh);

    # Shared lock= 1, Exclusive lock=2, Non-blocking request=4, Free lock=8
    unless (flock($pubque, 2))
    { die " Can not write_lock: $que \n";}
    seek($pubque,0,0) || die "Can't seek to start of: $que \n"; #al jan08

    my $out   = '';
    my $found = 0;
    while (my $line = <$pubque>)    # read input lines from que file
    {
        if (! $found)
        {
            my ($idnow, $user, $rest) = split(/\b\s+/, $line, 3);
            if ($idnow == $jobid)
            { # Want to lock this job now: negate id and insert machine
                $rest = '' unless defined $rest;
                my $locked = "-$jobid $user $machine $rest";
                # A two-field line ("id user\n") loses its newline in the
                # split; put it back so it does not merge with the next line.
                $locked .= "\n" if $line =~ /\n\z/ && $locked !~ /\n\z/;
                $out  .= $locked;
                $found = 1;
                next;
            }
        }
        $out .= $line;    # Another job, just copy to OUTPUT
    }

    if ($found)
    {
        # Copy new publisher que back over the que file
        seek($pubque,0,0) || die "Can't seek to start of $que";
        print $pubque $out;
        truncate($pubque, tell($pubque));
    }
    flock($pubque, 8);
    close($pubque);
    return $found ? $jobid : -1;
}
#