Running SciLuigi workflows with Slurm
In this example we show how to adapt the example workflow so that we can run it on a cluster using the Slurm Workload Manager (Slurm).
The workflow consists of two tasks. The first creates a file called foo.txt and writes foo in it. The second reads foo.txt and replaces every occurrence of foo with the name of the cluster node on which it is running.
To do this, we have to slightly change the workflow definition. In particular, we set up a runmode parameter which will allow us to specify from the command line whether we want the workflow to be run locally or on the cluster.
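The runmode check in the workflow below is essentially a lookup from a command-line string to a SciLuigi constant. The sketch below writes the same check with a dictionary, which keeps the list of valid values in one place. The constant values are placeholders standing in for sciluigi.RUNMODE_LOCAL, sciluigi.RUNMODE_HPC and sciluigi.RUNMODE_MPI, not SciLuigi's own definitions. resolve_runmode is a hypothetical helper name.

```python
# Hypothetical stand-ins for sciluigi.RUNMODE_LOCAL, RUNMODE_HPC and RUNMODE_MPI,
# used only so that this sketch runs without SciLuigi installed.
RUNMODE_LOCAL = 'runmode_local'
RUNMODE_HPC = 'runmode_hpc'
RUNMODE_MPI = 'runmode_mpi'

# Map each accepted command-line value to its runmode constant.
RUNMODES = {
    'local': RUNMODE_LOCAL,
    'hpc': RUNMODE_HPC,
    'mpi': RUNMODE_MPI,
}


def resolve_runmode(name):
    """Return the runmode constant for name, or raise ValueError for unknown values."""
    try:
        return RUNMODES[name]
    except KeyError:
        valid = ', '.join(RUNMODES)
        raise ValueError(f'Runmode {name!r} is none of: {valid}') from None


print(resolve_runmode('hpc'))  # prints: runmode_hpc
```

Inside MyWorkflow.workflow(), the if/elif chain could then become a single call on self.runmode.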
When we define the tasks, we have to pass an additional SlurmInfo object, which contains the specification of the resources we want to allocate to our tasks and other Slurm parameters.
```python
import luigi
import sciluigi


class MyWorkflow(sciluigi.WorkflowTask):
    # We define a runmode parameter to specify how to run the workflow
    runmode = luigi.Parameter()

    def workflow(self):
        # Translate the command-line value into a SciLuigi runmode constant
        if self.runmode == 'local':
            runmode = sciluigi.RUNMODE_LOCAL
        elif self.runmode == 'hpc':
            runmode = sciluigi.RUNMODE_HPC
        elif self.runmode == 'mpi':
            runmode = sciluigi.RUNMODE_MPI
        else:
            raise ValueError('Runmode is none of local, hpc, nor mpi. Please fix and try again!')

        # We construct our tasks as before, but we pass an additional
        # SlurmInfo object
        foowriter = self.new_task(
            'foowriter', MyFooWriter,
            slurminfo=sciluigi.SlurmInfo(
                runmode=runmode,
                project='myname',  # the Slurm account to charge (salloc -A / --account)
                partition='mypartition',
                cores='1',
                time='1:00:00',
                jobname='foowriter',
                threads='1'))
        fooreplacer = self.new_task(
            'fooreplacer', MyFooReplacer,
            slurminfo=sciluigi.SlurmInfo(
                runmode=runmode,
                project='myname',
                partition='mypartition',
                cores='1',
                time='1:00:00',
                jobname='fooreplacer',
                threads='1'))

        # Here we do the *magic*: Connecting outputs to inputs:
        fooreplacer.in_foo = foowriter.out_foo

        # Return the last task(s) in the workflow chain.
        return fooreplacer
```

SciLuigi uses the parameters we pass to SlurmInfo to construct a call to the Slurm command salloc, which requests the resources on the cluster. The SlurmInfo class in the SciLuigi source shows how each parameter translates to a salloc option. For the full list of salloc options, see the Slurm documentation or run salloc --help on your cluster.
Note that the SciLuigi parameter project corresponds to salloc's -A (--account) option, despite its name. This is the account to which the resources will be charged.
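To make the mapping concrete, the sketch below builds the salloc command line that roughly corresponds to the fooreplacer task. It is based on our reading of SciLuigi's SlurmInfo class, which wraps the command in salloc and srun. Check the version you have installed, since the exact options may differ. slurminfo_to_salloc_args is a hypothetical helper, not part of SciLuigi.

```python
import shlex


def slurminfo_to_salloc_args(project, partition, cores, time, jobname, threads, command):
    """Hypothetical helper: build the salloc/srun argument list for one command."""
    return [
        'salloc',
        '-A', project,      # --account: account charged for the resources
        '-p', partition,    # --partition: partition (queue) to submit to
        '-n', cores,        # --ntasks: number of tasks to allocate
        '-t', time,         # --time: wall-clock time limit
        '-J', jobname,      # --job-name
        'srun', '-n', '1',  # launch a single task inside the allocation
        '-c', threads,      # --cpus-per-task for that task
    ] + shlex.split(command)


args = slurminfo_to_salloc_args('myname', 'mypartition', '1', '1:00:00',
                                'fooreplacer', '1',
                                './replace_with_hostname.sh foo.txt bar.txt')
print(' '.join(args))
```

Printing the joined list gives a command you could paste into a shell on the login node to check that your account, partition and time limit are accepted before running the whole workflow.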
Now we can set up our tasks. MyFooWriter is unchanged, except that it subclasses sciluigi.SlurmTask instead of sciluigi.Task:
```python
class MyFooWriter(sciluigi.SlurmTask):
    def out_foo(self):
        # The output target: a file called foo.txt
        return sciluigi.TargetInfo(self, 'foo.txt')

    def run(self):
        # Write the string "foo" to the output file
        with self.out_foo().open('w') as foofile:
            foofile.write('foo\n')
```

Next, create a bash script called replace_with_hostname.sh in the same directory as the workflow script. Run the workflow from that directory, because the task calls the script with the relative path ./replace_with_hostname.sh. The script replaces the string foo with the hostname of the machine on which the job is running, and should contain the following lines:
```bash
#!/bin/bash
sed "s/foo/$(hostname)/g" "$1" > "$2"
```

This script uses sed to replace every occurrence of foo in the file given as the first command-line argument with the host name. It writes the modified text to the file given as the second argument. Make the script executable with chmod +x replace_with_hostname.sh so that the task can call it directly.
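If you would rather not depend on sed, the same transformation can be sketched in Python. This is an alternative we are suggesting, not part of the original example. replace_with_hostname is a hypothetical function name. To use it from the task, you would wrap it in a small script and call that script with self.ex() in place of the bash script.

```python
import socket


def replace_with_hostname(in_path, out_path, pattern='foo'):
    """Copy in_path to out_path, replacing every occurrence of pattern with the host name."""
    # socket.gethostname() should normally match the output of the `hostname` command
    hostname = socket.gethostname()
    with open(in_path) as infile:
        text = infile.read()
    with open(out_path, 'w') as outfile:
        outfile.write(text.replace(pattern, hostname))
```

Like the sed command with the g flag, str.replace substitutes every occurrence, not just the first one on each line.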
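We can now write the MyFooReplacer task. It calls the bash script we just created through self.ex(). In hpc mode, SciLuigi submits that command through salloc, so it runs on a compute node.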
```python
import os


class MyFooReplacer(sciluigi.SlurmTask):
    # Here we have one input, a "foo file":
    in_foo = None

    # ... and an output, a "bar file":
    def out_replaced(self):
        # As the path to the returned target(info), we
        # use the directory of the foo file, and save the modified
        # version to a file called bar.txt
        out_file = os.path.join(os.path.dirname(self.in_foo().path), 'bar.txt')
        return sciluigi.TargetInfo(self, out_file)

    def run(self):
        # Here, we use the built-in self.ex() method to execute our script,
        # passing the input and output paths as command-line arguments
        self.ex('./replace_with_hostname.sh {in_file} {out_file}'.format(
            in_file=self.in_foo().path,
            out_file=self.out_replaced().path))
```

At the end of the script, we need to add:
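```python
if __name__ == '__main__':
    sciluigi.run_local(main_task_cls=MyWorkflow)
```

Here, run_local refers to Luigi's local scheduler, not to the runmode. The runmode parameter decides whether the commands are sent to Slurm.

Finally, assuming we saved this code to a script called sciluigi_slurm_example.py, we can run it locally with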
```bash
python sciluigi_slurm_example.py --runmode local
```

or on the cluster with
```bash
python sciluigi_slurm_example.py --runmode hpc
```
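To see the difference between the two runmodes, you can inspect bar.txt after each run. The sketch below compares the name written into bar.txt with the name of the machine you run the check on. With --runmode local the two should match. With --runmode hpc, bar.txt should normally contain the name of the compute node allocated by Slurm. The function names are our own hypothetical helpers.

```python
import socket


def read_replaced_hostname(path='bar.txt'):
    """Return the host name written into bar.txt by the fooreplacer task."""
    with open(path) as f:
        return f.read().strip()


def ran_on_this_host(path='bar.txt'):
    """True if bar.txt names the machine this check is running on."""
    return read_replaced_hostname(path) == socket.gethostname()
```

For example, call ran_on_this_host() from the directory where you ran the workflow. Delete foo.txt and bar.txt before switching runmodes, since Luigi skips tasks whose outputs already exist.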