From c8729fb7796f2d7fd7a6639e03c9ce6799a02f51 Mon Sep 17 00:00:00 2001 From: rajarshi Date: Fri, 8 Mar 2024 17:05:06 +0000 Subject: [PATCH 1/4] converted to python3 --- taskfarm | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/taskfarm b/taskfarm index 142af07..656fc3a 100755 --- a/taskfarm +++ b/taskfarm @@ -1,4 +1,4 @@ -#!/usr/bin/python -E +#!/usr/bin/env python3 # # ICHEC Taskfarm utility # @@ -82,12 +82,12 @@ cores_per_node = len(set([ line[1] for line in lines if line[0]=='processor']))/ # A print function to control verbosity def verbose_print(str): if not 'TASKFARM_SILENT' in os.environ: - print str + print(str) sys.stdout.flush() # Check comand line arguments if len(sys.argv) != 2: - print >> sys.stderr, 'Usage: %s ' % sys.argv[0] + sys.stderr.write('Usage: %s ' % sys.argv[0]) sys.exit(1) taskfile = sys.argv[1] @@ -101,7 +101,7 @@ if 'TASKFARM_PPN' in os.environ: try: ppn = int(os.environ['TASKFARM_PPN']) except: - print >> sys.stderr, 'Error: $TASKFARM_PPN must be an integer value.' + sys.stderr.write('Error: $TASKFARM_PPN must be an integer value.') sys.exit(1) else: ppn = cores_per_node * smt @@ -142,13 +142,13 @@ else: # Error if an invalid process count is requested if smt > 1 and ppn > cores_per_node * threads_per_core: - print >> sys.stderr, 'Error: $TASKFARM_PPN must not exceed %d processes per node when $TASKFARM_SMT is set.' %(cores_per_node * threads_per_core,) + sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node when $TASKFARM_SMT is set.' %(cores_per_node * threads_per_core,)) sys.exit(1) elif smt == 1 and ppn > cores_per_node: - print >> sys.stderr, 'Error: $TASKFARM_PPN must not exceed %d processes per node.' %(cores_per_node,) + sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node.' %(cores_per_node,)) sys.exit(1) elif ppn < 1: - print >> sys.stderr, 'Error: $TASKFARM_PPN must request one or more processes per node.' + sys.stderr.write('Error: $TASKFARM_PPN must request one or more processes per node.') sys.exit(1) # Generate a list of unique nodes @@ -161,7 +161,7 @@ cpath=os.environ['PWD'] try: nodef = open(os.environ['PBS_NODEFILE']) except KeyError: - print >> sys.stderr, 'Error opening PBS_NODEFILE. Exiting.' + sys.stderr.write('Error opening PBS_NODEFILE. Exiting.') sys.exit(2) id = 0 cpt=cores_per_node/ppn @@ -207,7 +207,7 @@ tasknum = 0 try: taskf = open(taskfile) except: - print >> sys.stderr, 'Error opening task file. Exiting.' + sys.stderr.write('Error opening task file. Exiting.') sys.exit(2) l1 = [ processLine(line.strip()) for line in taskf if len(line.strip()) > 0] @@ -290,7 +290,7 @@ def wait(): # exit = status >> 8 id = taskinfo[pid]['id'] if exit != 0: - print >> sys.stderr, "'%s' killed by sig %d" % (taskinfo[pid]['task'], signal) + sys.stderr.write("'%s' killed by sig %d" % (taskinfo[pid]['task'], signal)) if not keep: popen_tmp = os.unlink(taskinfo[pid]['script']) del taskinfo[pid] @@ -317,7 +317,7 @@ for task in tasklist: f.write("#!/bin/bash\n"); f.write(task) f.close() - os.chmod(fp,0755) + os.chmod(fp,0o755) command = "%s -n %d -env I_MPI_PIN_PROCESSOR_LIST %s -host %s %s" % (launch,cores_per_task,cores,host,fp) popen_tmp = Popen([launch,'-env','I_MPI_PIN_PROCESSOR_LIST',cores,'-n', str(cores_per_task),'-host',host,fp]) pid = popen_tmp.pid From 6f671f538dc60509083b28c5bd608d079f377187 Mon Sep 17 00:00:00 2001 From: rajarshi Date: Mon, 11 Mar 2024 01:15:14 +0000 Subject: [PATCH 2/4] added newline to sys.stderr.write() --- taskfarm | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/taskfarm b/taskfarm index 656fc3a..5eb2646 100755 --- a/taskfarm +++ b/taskfarm @@ -80,14 +80,14 @@ threads_per_core = len(set([ line[1] for line in lines if line[0]=='physical id' cores_per_node = len(set([ line[1] for line in lines if line[0]=='processor']))/threads_per_core # A print function to control verbosity -def verbose_print(str): +def verbose_print(string): if not 'TASKFARM_SILENT' in os.environ: - print(str) + print(string) sys.stdout.flush() # Check comand line arguments if len(sys.argv) != 2: - sys.stderr.write('Usage: %s ' % sys.argv[0]) + sys.stderr.write('Usage: %s ' % sys.argv[0] + '\n') sys.exit(1) taskfile = sys.argv[1] @@ -101,7 +101,7 @@ if 'TASKFARM_PPN' in os.environ: try: ppn = int(os.environ['TASKFARM_PPN']) except: - sys.stderr.write('Error: $TASKFARM_PPN must be an integer value.') + sys.stderr.write('Error: $TASKFARM_PPN must be an integer value.\n') sys.exit(1) else: ppn = cores_per_node * smt @@ -142,13 +142,13 @@ else: # Error if an invalid process count is requested if smt > 1 and ppn > cores_per_node * threads_per_core: - sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node when $TASKFARM_SMT is set.' %(cores_per_node * threads_per_core,)) + sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node when $TASKFARM_SMT is set.' %(cores_per_node * threads_per_core,) + '\n') sys.exit(1) elif smt == 1 and ppn > cores_per_node: - sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node.' %(cores_per_node,)) + sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node.' %(cores_per_node,) + '\n') sys.exit(1) elif ppn < 1: - sys.stderr.write('Error: $TASKFARM_PPN must request one or more processes per node.') + sys.stderr.write('Error: $TASKFARM_PPN must request one or more processes per node.\n') sys.exit(1) # Generate a list of unique nodes @@ -161,7 +161,7 @@ cpath=os.environ['PWD'] try: nodef = open(os.environ['PBS_NODEFILE']) except KeyError: - sys.stderr.write('Error opening PBS_NODEFILE. Exiting.') + sys.stderr.write('Error opening PBS_NODEFILE. Exiting.\n') sys.exit(2) id = 0 cpt=cores_per_node/ppn @@ -207,7 +207,7 @@ tasknum = 0 try: taskf = open(taskfile) except: - sys.stderr.write('Error opening task file. Exiting.') + sys.stderr.write('Error opening task file. Exiting.\n') sys.exit(2) l1 = [ processLine(line.strip()) for line in taskf if len(line.strip()) > 0] @@ -290,7 +290,7 @@ def wait(): # exit = status >> 8 id = taskinfo[pid]['id'] if exit != 0: - sys.stderr.write("'%s' killed by sig %d" % (taskinfo[pid]['task'], signal)) + sys.stderr.write("'%s' killed by sig %d" % (taskinfo[pid]['task'], signal) + '\n') if not keep: popen_tmp = os.unlink(taskinfo[pid]['script']) del taskinfo[pid] From a7c40286ea6a088b7b22842ae12c76fb47febb20 Mon Sep 17 00:00:00 2001 From: rajarshi Date: Tue, 12 Mar 2024 18:27:31 +0000 Subject: [PATCH 3/4] changed --- taskfarm | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/taskfarm b/taskfarm index 5eb2646..289baba 100755 --- a/taskfarm +++ b/taskfarm @@ -156,12 +156,12 @@ work_nodes = [] node_ids = [] id_map = {} core_range={} -jid=os.environ['PBS_JOBID'] +jid=os.environ['SLURM_JOBID'] cpath=os.environ['PWD'] try: - nodef = open(os.environ['PBS_NODEFILE']) -except KeyError: - sys.stderr.write('Error opening PBS_NODEFILE. Exiting.\n') + nodef = os.environ['SLURM_JOB_NODELIST'] +except: + sys.stderr.write('Error opening SLURM_JOB_NODELIST. Exiting.\n') sys.exit(2) id = 0 cpt=cores_per_node/ppn From e3cbff4523d12d83dad199d1acb32058c54f47d9 Mon Sep 17 00:00:00 2001 From: rajarshi Date: Tue, 12 Mar 2024 18:27:44 +0000 Subject: [PATCH 4/4] renamed older version --- taskfarm_pbs | 329 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 329 insertions(+) create mode 100755 taskfarm_pbs diff --git a/taskfarm_pbs b/taskfarm_pbs new file mode 100755 index 0000000..5eb2646 --- /dev/null +++ b/taskfarm_pbs @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +# +# ICHEC Taskfarm utility +# +# Brief note on versions: +# +# The first taskfarm utility deployed on ICHEC systems was an MPI-based program. +# The module was named "taskfarm". Mostly deprecated but module kept on Stokes +# for legacy reasons. +# +# Since then a new, Python-based implementation was written and first deployed +# on Stokes. The module was named "taskfarm2" to differentiate it from the MPI +# implementation. However, its version numbers went from 1.0 to 1.2 (see below). +# +# Fionn inherits taskfarm2/1.2 from Stokes, with some minor modifications. But +# the name of the module reverts to taskfarm with an initial version number of +# 2.3 to reflect its provenance. +# +# Version 1.0: Initial release on Stokes. +# +# Version 1.1: Improvements +# - Full environment support +# - Automatic directory change +# - Error handling on task file open failure +# +# Version 1.2: Bug Fix +# - Resolved process management race condition +# Improvements +# - Core/tread counts as parameters +# - SMT mode for extra threads - %TASKFARM_TASKNUM% token in taskfile +# - Warn when large task counts are submitted +# +# Version 2.3: Improvements +# - Explicitly call system version of Python to avoid potential +# conflict, e.g. if user loads a different Python module in the PBS +# script prior to running taskfarm. For similar reasons the "-E" flag +# is used to ignore user specified PYTHONPATH and PYTHONHOME paths +# which could cause module import failure in this script. +# - Modified Popen() to NOT invoke a bash login shell via "bash -l". +# Otherwise startup files (e.g. ~/.bash_profile) are sourced. Each +# task should inherit its environment from the PBS script calling +# taskfarm; sourcing startup files might cause unexpected behaviour. +# Version 2.4: New features +# - force a kill of the farm if a certain file exists in the forder of the running task +# default name of the file is abbadon change it via TASKFARM_STOPFILE +# - force a kill of the farm if a certain file exists in the forder of the running task +# and contains a certain magic word by default no magic is set. +# use TASKFARM_STOPMAGIC to define a word +# - TASKFARM_SLEEP controls at what intervals one checks for the file and, if the case, +# for the magic. Default value is 5 seconds +# Version 2.5: New features +# - replace pdsh by mpirun wrapped tasks +# - added TASKFARM_GROUP specify how may tasks to group in a metatask +# - added TASKFARM_KEEP if the variable is set intermediate metatasks will not be +# deleted +# - added TASKFARM_MPI if variable is set assumes that mpi binaries are executed +# - added TASKFARM_MPI_LAUNCHER contains the launcher to be used to push the tasks +# default is mpirun, at time of writing intel mpirun path is added in the module +# specify a new one by using this variable. +# Version 2.6: Bug fix +# - Issue with ShellShock fix. +# Version 2.7: Bug fix +# - Remove empty lines from the task list. +# - Use I_MPI_PIN_PROCESSOR_LIST instead of numactl, in order to avoid clash between +# cores selected by the mpirun and the one required via numactl. +# +# + +import os +import sys +import signal +import time +import re + +from subprocess import Popen + +# Physical characteristics of the compute nodes +lines=[ line.strip().replace('\t','').split(':') for line in open('/proc/cpuinfo') ] +threads_per_core = len(set([ line[1] for line in lines if line[0]=='physical id'])) +cores_per_node = len(set([ line[1] for line in lines if line[0]=='processor']))/threads_per_core + +# A print function to control verbosity +def verbose_print(string): + if not 'TASKFARM_SILENT' in os.environ: + print(string) + sys.stdout.flush() + +# Check comand line arguments +if len(sys.argv) != 2: + sys.stderr.write('Usage: %s ' % sys.argv[0] + '\n') + sys.exit(1) +taskfile = sys.argv[1] + +# How many processes per node +if 'TASKFARM_SMT' in os.environ: + smt = threads_per_core +else: + smt = 1 + +if 'TASKFARM_PPN' in os.environ: + try: + ppn = int(os.environ['TASKFARM_PPN']) + except: + sys.stderr.write('Error: $TASKFARM_PPN must be an integer value.\n') + sys.exit(1) +else: + ppn = cores_per_node * smt + +if 'TASKFARM_MPI' in os.environ: + cores_per_task=cores_per_node/ppn +else: + cores_per_task=1 + +if 'TASKFARM_MPI_LAUNCHER' in os.environ: + launch=os.environ['TASKFARM_MPI_LAUNCHER'] +else: + launch='mpirun' + +if 'TASKFARM_SLEEP' in os.environ: + sleep = float(os.environ['TASKFARM_SLEEP']) +else: + sleep = 5.0 + +if 'TASKFARM_STOPFILE' in os.environ: + stopfile=os.environ['TASKFARM_STOPFILE'] +else: + stopfile="abbadon" +#if a stop magic is set one shall look in the running folder of the PID in the stopfile for it... +# if found kill the farm +if 'TASKFARM_STOPMAGIC' in os.environ: + stopMagic=os.environ['TASKFARM_STOPMAGIC'] +else: + stopMagic="" +if 'TASKFARM_GROUP' in os.environ: + groupTasks=int(os.environ['TASKFARM_GROUP']) +else: + groupTasks=1 +if 'TASKFARM_KEEP' in os.environ: + keep=True +else: + keep=False + +# Error if an invalid process count is requested +if smt > 1 and ppn > cores_per_node * threads_per_core: + sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node when $TASKFARM_SMT is set.' %(cores_per_node * threads_per_core,) + '\n') + sys.exit(1) +elif smt == 1 and ppn > cores_per_node: + sys.stderr.write('Error: $TASKFARM_PPN must not exceed %d processes per node.' %(cores_per_node,) + '\n') + sys.exit(1) +elif ppn < 1: + sys.stderr.write('Error: $TASKFARM_PPN must request one or more processes per node.\n') + sys.exit(1) + +# Generate a list of unique nodes +work_nodes = [] +node_ids = [] +id_map = {} +core_range={} +jid=os.environ['PBS_JOBID'] +cpath=os.environ['PWD'] +try: + nodef = open(os.environ['PBS_NODEFILE']) +except KeyError: + sys.stderr.write('Error opening PBS_NODEFILE. Exiting.\n') + sys.exit(2) +id = 0 +cpt=cores_per_node/ppn +for line in nodef: + node = line.strip() + if work_nodes.count(node) == 0: + for i in range(ppn): + tmp_id = id+i%cores_per_node + node_ids.append(tmp_id) + work_nodes.append(node) + id_map[tmp_id] = node + start=i%cores_per_node*cpt + end=start+cpt-1 + core_range[tmp_id]=str(start)+"-"+str(end) + id += 1 +nodef.close() +verbose_print('Taskfarm started with %d workers (%d per node).' % (len(work_nodes), ppn)) + +def extractPaths(taskline): + np=re.search("^cd (\w+)", taskline) + if np is None: + np=re.search("^pushd (\w+)", taskline) + if np is None: + return "" + return np.groups()[0] + +def processLine(line): + if groupTasks!=1: + return preservePath(line) + else: + return line + +def preservePath(line): + if line[-1]==';': + return line+" cd "+cpath + else: + return line + "; cd "+cpath + +# Generate a task list from file +tasklist = [] +extraPaths = [] +tasknum = 0 +try: + taskf = open(taskfile) +except: + sys.stderr.write('Error opening task file. Exiting.\n') + sys.exit(2) + +l1 = [ processLine(line.strip()) for line in taskf if len(line.strip()) > 0] +lines = [ l1[i].replace('%TASKFARM_TASKNUM%', str(i)) for i in range(len(l1)) ] +if groupTasks != 1: + tf=len(lines) + tasklist = filter(None,[' && '.join(lines[i*groupTasks:(i+1)*groupTasks]) for i in range(tf//groupTasks+1)]) +else: + tasklist = lines +tasknum=len(tasklist) +extraPaths = [ extractPaths(task) for task in tasklist ] +taskf.close() + +if groupTasks == 1 : + verbose_print('Taskfarm read %d tasks from file \'%s\'.' % (len(tasklist), taskfile)) +else: + verbose_print('Taskfarm read %d tasks from file \'%s\' grouped as %d metatasks.' % (tf,taskfile,len(tasklist))) + +if len(tasklist) % len(work_nodes) != 0: + verbose_print('Warning: Taskfarm input should ideally provide a multiple of %d tasks for %d workers.' % (len(work_nodes), len(work_nodes))) +if len(tasklist) / len(work_nodes) > 20: + verbose_print('Warning: There are %d tasks for %d workers. Taskfarm is not ideal for high-throughput workloads.' % (len(tasklist), len(work_nodes))) + verbose_print('Warning: Running many tasks of a very short duration with Taskfarm is quite inefficient.') + verbose_print('Info: You can aggregate tasks using export TASKFARM_GROUP=xxx') + verbose_print('Info: with xxx how many consecutive tasks to group in a metatask') + +# Build an environment +environ = '' +for param in os.environ: + if param not in ['PROFILEREAD', 'BASH_FUNC_module()'] : + environ = environ + 'export ' + param + '=\'' + os.environ[param] + '\'; ' + +# Record information about tasks +taskinfo = {} + +def EnforceTrailingSlash(path): + if path[-1] != '/': + return path + '/' + else: + return path + +def checkMagic(filepath): + with open(filepath,"r") as f: + for line in f: + if stopMagic in line: + return True + + return False + + +def checkUserExit(pid): + fpath=EnforceTrailingSlash(taskinfo[pid]['path'])+stopfile + if os.path.exists(fpath) and stopMagic=="": + verbose_print("exit because file "+fpath+" is present!!!!") + sys.exit(1) + + if os.path.exists(fpath) and checkMagic(fpath): + verbose_print("exit because file "+fpath+" contains magic " + stopMagic) + sys.exit(1) + +# Wait for processes to exit +# Handle only one returned process per call +def wait(): + finished = {} + stillSleepy=True + while stillSleepy: + for pid in taskinfo.keys(): + if taskinfo[pid]['process'].poll() != None: + finished[pid] = {'status':taskinfo[pid]['process'].poll()} + checkUserExit(pid) + + if len(finished) > 0: + pid = finished.keys()[0] + exit = signal = finished[pid]['status'] + stillSleepy=False + else: + time.sleep(sleep) +# pid, status = os.wait() +# signal = status & 0xFF +# exit = status >> 8 + id = taskinfo[pid]['id'] + if exit != 0: + sys.stderr.write("'%s' killed by sig %d" % (taskinfo[pid]['task'], signal) + '\n') + if not keep: + popen_tmp = os.unlink(taskinfo[pid]['script']) + del taskinfo[pid] + return id + +# Signal handler for SIGINT +def kill_all(signum, stack): + verbose_print('Taskfarm interrupted. Please check for orphaned processes.') + sys.exit(1) +signal.signal(signal.SIGINT, kill_all) + +# Wait for a node to become free then run a task +k=0 +for task in tasklist: + if len(node_ids) == 0: + node_ids.append(wait()) + id = node_ids[0] + host = id_map[id] + cores=core_range[id] + del node_ids[0] + task = environ + ' cd ' + os.environ['PWD'] + ' && ' + task + fp=cpath+"/task-"+host+"-id"+str(id)+"-"+jid + '.' + str(k) + f=open(fp,"w") + f.write("#!/bin/bash\n"); + f.write(task) + f.close() + os.chmod(fp,0o755) + command = "%s -n %d -env I_MPI_PIN_PROCESSOR_LIST %s -host %s %s" % (launch,cores_per_task,cores,host,fp) + popen_tmp = Popen([launch,'-env','I_MPI_PIN_PROCESSOR_LIST',cores,'-n', str(cores_per_task),'-host',host,fp]) + pid = popen_tmp.pid + taskinfo[pid] = {'id': id, 'task': command, 'process': popen_tmp, 'path':cpath+"/"+extraPaths[k],'script':fp} + k += 1 +# Once all tasks have been started wait for them to finish +while len(taskinfo) > 0: + wait() +verbose_print('Taskfarm completed all tasks.')