.. _workflow-creation-api:

=====================
Workflow creation API
=====================

.. contents:: Workflow creation API
   :local:

Workflows are created using the soma_workflow.client API. This page presents
the documentation of the Workflow, Job, FileTransfer and SharedResourcePath
classes.

.. seealso:: :ref:`examples` for a quick start.

Workflow
========

.. autoclass:: client.Workflow
    :members:

Group
=====

.. autoclass:: client.Group
    :members:

Job
===

.. inheritance-diagram:: client_types.Job client_types.EngineExecutionJob client_types.BarrierJob custom_jobs.MapJob custom_jobs.ReduceJob custom_jobs.ListCatJob custom_jobs.LeaveOneOutJob custom_jobs.CrossValidationFoldJob custom_jobs.StrCatJob
    :parts: 1

.. autoclass:: client_types.Job
    :members:

.. autoclass:: client_types.EngineExecutionJob
    :members:

.. autoclass:: client_types.BarrierJob
    :members:

.. autoclass:: custom_jobs.MapJob
    :members:

.. autoclass:: custom_jobs.ReduceJob
    :members:

.. autoclass:: custom_jobs.ListCatJob
    :members:

.. autoclass:: custom_jobs.StrCatJob
    :members:

.. autoclass:: custom_jobs.LeaveOneOutJob
    :members:

.. autoclass:: custom_jobs.CrossValidationFoldJob
    :members:

FileTransfer
============

.. autoclass:: client.FileTransfer
    :members:

.. _shared-resource-path-api:

SharedResourcePath
==================

.. autoclass:: client.SharedResourcePath
    :members:

.. _temporary-path-api:

TemporaryPath
=============

.. autoclass:: client.TemporaryPath
    :members:

.. _dynamic_outputs:

Dynamic output parameters values in Soma-Workflow
=================================================

This is a new feature in Soma-Workflow 3.1. It allows jobs to produce "output
parameters", which will in turn be used as inputs in downstream jobs.

As jobs are basically commandlines, they normally cannot have outputs other
than files. Thus we manage output parameters by allowing a job to write an
additional file (in JSON format) containing a dictionary of output parameters
and values. Such a job is "marked" as producing outputs (Job variable
``has_outputs``), and before running it, Soma-Workflow sets the environment
variable ``SOMAWF_OUTPUT_PARAMS`` for it, with an output filename value. The
job should write its output parameters file at this location.

The output parameters file will be read by Soma-Workflow once the job has
successfully finished. Then, when running later jobs, their input values will
be changed accordingly. To allow this, we need a few things:

Using dynamic output parameters
-------------------------------

* Such jobs should declare a dictionary of input parameters (Job variable
  ``param_dict``).
* The workflow should define some "parameters links" between jobs in order to
  connect output parameters values from one job to the input parameters
  values of downstream jobs: Workflow variable ``param_links``.
* The input parameters dictionary of a job has to be used to re-build its
  commandline properly after the values have changed. To do so, the
  commandline arguments of such jobs can contain substitution strings in the
  python style ``"%(param)s"``, where ``param`` is a named parameter in the
  dictionary.

To put things together in an example::

    from soma_workflow.client import Job, Workflow, WorkflowController

    # workflow definition
    job1 = Job(
        command=['my_program', '%(in1)s'],
        name='job1:cp',
        param_dict={'in1': '/tmp/input_file'},
        has_outputs=True)
    job2 = Job(
        command=['cat', '%(in1)s'],
        name='job2:cat',
        param_dict={'in1': 'undefined'})
    workflow = Workflow(jobs=[job1, job2],
                        name='test_workflow',
                        param_links={job2: {'in1': [(job1, 'out1')]}})

    # running it the classical way
    # (or using soma_workflow_gui)
    wc = WorkflowController()
    wf_id = wc.submit_workflow(workflow)
    wc.wait_workflow(wf_id)

Here, ``job1`` will run a program, ``my_program``, which will do its job and
write the output parameters file. The ``param_dict`` in ``job1`` is not
necessary here, since its parameters values will not change according to
upstream operations, but if we want to allow job classes to be used in such
"dynamic" workflows, we should get used to naming all their parameters.

The ``job2`` input parameter ``in1`` will get its value from the ``job1``
output named ``out1``. This is specified via the workflow ``param_links``,
which is a dict associating to each destination job a set of parameter
values, each coming from another job's parameter. For instance::

    param_links = {
        dest_job1: {
            'dest_param1': [(src_job1, 'src_output_param1')],
            'dest_param2': [(src_job2, 'src_output_param2')],
        },
        dest_job2: {
            'dest_param3': [(src_job3, 'src_output_param3')],
            'dest_param4': [(src_job4, 'src_output_param4')],
        },
    }

Note that ``param_links`` in the workflow implicitly add job dependencies.
These new dependencies are automatically added to the "classical" job
dependencies, and may completely replace them if parameters links are
correctly specified and used all along the workflow.

Now the ``job1`` commandline program, if written in Python language, could
look like this::

    #!/usr/bin/env python

    import os
    import shutil
    import json
    import sys

    # we have to write the output parameters values in this file:
    output_param_file = os.environ.get('SOMAWF_OUTPUT_PARAMS')

    # we impose a "hard-coded" output path, just because our program
    # works that way.
    out_file = os.path.expanduser('~/bubulle.txt')
    in_file = sys.argv[1]  # input given as a parameter from the job

    # let's say this is the "real" job of my_program
    shutil.copy2(in_file, out_file)

    # write output parameters (only one, in our case)
    if output_param_file:
        params = {
            'out1': out_file,
        }
        json.dump(params, open(output_param_file, 'w'))

As you can see, in a "classical" workflow, ``job2`` would not have known
which file to print in its ``cat`` command. Using this "dynamic" parameters
structure, it can work.

Dynamic outputs and file transfers
----------------------------------

When a file name is an output of a job (the job decides where to write its
output file), and this output file should be transferred from the computing
resource to the client machine, then a FileTransfer object should be created
(as usual), but with a few additional things to set up:

* The FileTransfer object should be added both to the
  ``referenced_output_files`` parameter of jobs (as usual), and to the named
  parameters (``param_dict``) in order to tell how the FileTransfer object
  should be updated from the output JSON file of the job.
* The FileTransfer cannot know its paths (both client and server side) before
  the job runs.
* A client path has to be built after the job output tells which is the
  engine path. There is no absolutely correct way to build it: it is a
  heuristic, since we have no reference client directory in the general case.
* To help this, we may use an additional job named parameter in the
  ``param_dict``: ``output_directory``, which will be used as a basis for
  client paths building.
* Otherwise Soma-Workflow will try to guess a client directory from other job
  parameters. This will not work in all situations.

Ex::

    toutp = FileTransfer(False, 'job1_output')
    job1 = Job(
        command=['my_program', '%(in1)s'],
        name='job1:cp',
        param_dict={'in1': '/tmp/input_file',
                    # this is an output FileTransfer param matching the
                    # job output json dict
                    'out1': toutp,
                    # this is a hint to where to write the client output file
                    'output_directory': '/tmp'},
        has_outputs=True,
        referenced_output_files=[toutp])
    job2 = Job(
        command=['cat', '%(in1)s'],
        name='job2:cat',
        param_dict={'in1': toutp},
        referenced_input_files=[toutp])
    workflow = Workflow(jobs=[job1, job2],
                        name='test_out_wf',
                        param_links={job2: {'in1': [(job1, 'out1')]}})

.. _input_params_file:

Job input parameters as file
----------------------------

When job parameters are specified through a dictionary of named parameters,
it is also possible to pass the parameters set as a JSON file, rather than as
commandline arguments. This may prove useful when the parameters list is
long. In this situation the job must declare it using the
``use_input_params_file`` variable in Job. Then a temporary file will be
created before the job is run.

The parameters file is a JSON file containing a dictionary. The first level
of this dict currently only contains the key ``"parameters"``, but it may
also hold job configuration (see :ref:`jobs_with_conf`). The ``parameters``
sub-dict holds the input parameters of the job.

The input parameters file location is specified through an environment
variable: ``SOMAWF_INPUT_PARAMS``, and the job program should read it to get
its actual arguments. Ex::

    from soma_workflow.client import Job, Workflow, WorkflowController

    # workflow definition
    job1 = Job(
        command=['my_program', '%(in1)s'],
        name='job1:cp',
        param_dict={'in1': '/tmp/input_file'},
        has_outputs=True)
    job3 = Job(
        command=['my_cat_program'],  # no parameters on the commandline
        name='job3:cat',
        param_dict={'in1': 'undefined'},
        use_input_params_file=True)  # tell SWF to write the params file
    workflow2 = Workflow(jobs=[job1, job3],
                         name='test_workflow',
                         param_links={job3: {'in1': [(job1, 'out1')]}})

    # running it the classical way
    # (or using soma_workflow_gui)
    wc = WorkflowController()
    wf_id = wc.submit_workflow(workflow2)
    wc.wait_workflow(wf_id)

Here ``job3`` must be modified to make it able to read the parameters file.
In this example we call a program named ``my_cat_program``, which, if written
in Python language, could look like this::

    #!/usr/bin/env python

    import os
    import json

    # get the input params file location from the env variable
    param_file = os.environ.get('SOMAWF_INPUT_PARAMS')
    # read it
    params = json.load(open(param_file))
    # now get our specific parameter(s)
    in_file = params['parameters']['in1']

    # now the "real" job code just prints the file:
    print(open(in_file).read())

.. _params_link_functions:

Parameters link functions
-------------------------

To implement some classical schemas such as map/reduce or cross-validation
patterns, we need to transform parameter values which are lists into a series
of single values, each passed to a different job, for instance. To do this,
Soma-Workflow allows specifying link functions in ``param_links``.

Such functions are called when building a job's inputs before it runs (on
engine side). They are passed as an additional item in the link tuple, and
may be strings (function name, including the module if appropriate), or a
tuple containing a string (function name) and arguments passed to it. Ex::

    param_links = {
        dest_job1: {
            'dest_param1': [(src_job1, 'src_output_param1',
                             'link_module.link_function')],
            'dest_param2': [(src_job2, 'src_output_param2',
                             ('link_module.link_function', 3))],
        },
    }

Arguments, if used, are passed to the function as its first parameters, and
may be used to specify a counter or identifier for the linked job. Functions
are then called with 4 additional arguments: source parameter name, source
parameter value, destination parameter name, destination parameter current
value. The function should return the new destination parameter value. The
destination parameter current value is useful in some cases, for instance to
build a list from several source links (reduce pattern).

A few functions are directly available in Soma-Workflow, in the module
:mod:`param_link_functions`, to help implementing classical algorithmic
patterns: :func:`~param_link_functions.list_to_sequence`,
:func:`~param_link_functions.sequence_to_list`,
:func:`~param_link_functions.list_all_but_one`,
:func:`~param_link_functions.list_cv_train_fold`,
:func:`~param_link_functions.list_cv_test_fold`. Custom functions may be
used, as long as their module is specified, with the additional constraint
that the custom function modules should be installed on engine side on the
computing resource (a sketch of such a custom function is given after the
note below).

Note that these parameters links transformations are the reason why links
are lists and not single values: intuitively one may think that a job
parameter value is linked from a single job output value, but when dealing
with lists, the situation may differ: a list can be built from several
outputs.

.. note:: Alternative: Engine execution jobs

    An alternative to parameters link functions is also available in
    Soma-Workflow 3.1 and later: :ref:`engine_execution_job`. They are more
    convenient to use in some situations, and less in others, so a workflow
    developer may choose the implementation they prefer.

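To give an idea, a custom link function could look like the following minimal
sketch. The module and function names (``my_links``, ``pick_element``) are
hypothetical; the signature simply follows the calling convention described
above (the optional link arguments first, then the four source / destination
parameter arguments)::

    # hypothetical module my_links.py, to be installed on engine side

    def pick_element(index, src_param, src_value, dst_param, dst_value):
        """Return one element of a list output as the new destination value.

        ``index`` is the extra argument given in the link tuple; the four
        remaining arguments are the source parameter name and value, and the
        destination parameter name and current value.
        """
        return src_value[index]

Such a function would then be referenced in ``param_links`` as, for instance,
``(src_job, 'outputs', ('my_links.pick_element', 2))``.
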
A cross-validation pattern example::

    # let's say job1 inputs a list and outputs another list, 'outputs'
    job1 = Job(
        command=['my_program', '%(inputs)s'],
        name='prepare_inputs',
        param_dict={'inputs': ['file1', 'file2', 'file3', 'file4', 'file5']},
        has_outputs=True)

    # train_jobs are identical train jobs, each training one fold
    # (we use 3 folds). They output a single file, 'output'.
    # test_jobs are identical test jobs testing the test part of each fold.
    # They also output a single file, 'output'.
    nfolds = 3
    train_jobs = []
    test_jobs = []
    for fold in range(nfolds):
        job_train = Job(
            command=['train_model', '%(inputs)s'],
            name='train_fold%d' % fold,
            param_dict={'inputs': []},
            has_outputs=True)
        train_jobs.append(job_train)
        job_test = Job(
            command=['test_model', '%(inputs)s'],
            name='test_fold%d' % fold,
            param_dict={'inputs': []},
            has_outputs=True)
        test_jobs.append(job_test)

    # building the workflow
    jobs = [job1] + train_jobs + test_jobs
    dependencies = []
    links = {}
    for fold in range(nfolds):
        links[train_jobs[fold]] = {
            'inputs': [(job1, 'outputs',
                        ('list_cv_train_fold', fold, nfolds))]}
        links[test_jobs[fold]] = {
            'inputs': [(job1, 'outputs',
                        ('list_cv_test_fold', fold, nfolds))]}
    workflow = Workflow(jobs, dependencies, name='train CV',
                        param_links=links)

A leave-one-out pattern example::

    # let's say job1 inputs a list and outputs another list, 'outputs'
    job1 = Job(
        command=['my_program', '%(inputs)s'],
        name='prepare_inputs',
        param_dict={'inputs': ['file1', 'file2', 'file3', 'file4', 'file5']},
        has_outputs=True)

    # train_jobs are identical train jobs, each training one fold.
    # They output a single file, 'output'.
    # test_jobs are identical test jobs testing the single leftover file of
    # each fold. They also output a single file, 'output'.
    nfolds = len(job1.param_dict['inputs'])
    train_jobs = []
    test_jobs = []
    for fold in range(nfolds):
        job_train = Job(
            command=['train_model', '%(inputs)s'],
            name='train_fold%d' % fold,
            param_dict={'inputs': []},
            has_outputs=True)
        train_jobs.append(job_train)
        job_test = Job(
            command=['test_model', '%(input)s'],
            name='test_fold%d' % fold,
            param_dict={'input': 'undefined'},
            has_outputs=True)
        test_jobs.append(job_test)

    # building the workflow
    jobs = [job1] + train_jobs + test_jobs
    dependencies = []
    links = {}
    for fold in range(nfolds):
        links[train_jobs[fold]] = {
            'inputs': [(job1, 'outputs', ('list_all_but_one', fold))]}
        links[test_jobs[fold]] = {
            'inputs': [(job1, 'outputs', ('list_to_sequence', fold))]}
    workflow = Workflow(jobs, dependencies, name='train LOO',
                        param_links=links)

A map / reduce example::

    # let's say job1 inputs a list and outputs another list, 'outputs'
    job1 = Job(
        command=['my_program', '%(inputs)s'],
        name='prepare_inputs',
        param_dict={'inputs': ['file1', 'file2', 'file3', 'file4', 'file5']},
        has_outputs=True)

    # process_jobs are identical jobs, each processing one of the input
    # files. They output a single file, 'output'.
    nfolds = len(job1.param_dict['inputs'])
    process_jobs = []
    for fold in range(nfolds):
        job_proc = Job(
            command=['process_data', '%(input)s'],
            name='process_%d' % fold,
            param_dict={'input': 'undefined'},
            has_outputs=True)
        process_jobs.append(job_proc)

    # a reduce node takes outputs of all processing nodes and, for instance,
    # concatenates them
    reduce_job = Job(
        command=['cat_program', '%(inputs)s'],
        name='cat_results',
        param_dict={'inputs': [], 'outputs': 'file_output'})

    # building the workflow
    group_1 = Group(name='processing', elements=process_jobs)
    jobs = [job1] + process_jobs + [reduce_job]
    dependencies = []
    links = {}
    links[reduce_job] = {'inputs': []}
    for fold in range(nfolds):
        links[process_jobs[fold]] = {
            'inputs': [(job1, 'outputs', ('list_to_sequence', fold))]}
        links[reduce_job]['inputs'].append(
            (process_jobs[fold], 'output', ('sequence_to_list', fold)))
    workflow = Workflow(jobs, dependencies,
                        root_group=[job1, group_1, reduce_job],
                        name='map-reduce', param_links=links)

param_link_functions module
+++++++++++++++++++++++++++

.. automodule:: param_link_functions
    :members:

.. _jobs_with_conf:

Jobs with configuration
=======================

In Soma-Workflow 3.1, any job in a workflow is allowed to include some
runtime configuration data. This is meant especially to enable the use of
specific software which needs configuration, paths, or environment variables
on the computing resource. This configuration may of course be passed through
environment variables (using the ``env`` variable / parameter of
:class:`Job`), but also as a config dictionary which will be passed to the
job.

The job configuration is included in the job parameters as in the
:ref:`dynamic output / input arguments passing system <dynamic_outputs>`.
This allows two ways of passing the config dict to jobs:

* As a named job parameter, ``configuration_dict``. In this case this named
  parameter is reserved for configuration. It is used as another named
  parameter in the job::

      job = Job(command=['my_program', '%(configuration_dict)s'],
                name='job1',
                configuration={'conf_var1': 'value1'})

  The job will receive the configuration as a JSON string parameter.

* As a part of a parameters file. In this case the parameters file will be
  created before the job is run, even if the job does not use the outputs
  from another job, and the configuration will be held in a sub-dictionary of
  this input JSON file, under the key ``"configuration_dict"``, separate from
  the ``"parameters"`` key. Therefore in this shape, ``configuration_dict``
  is not a reserved parameter name::

      job = Job(command=['my_program'],
                name='job1',
                use_input_params_file=True,
                configuration={'conf_var1': 'value1'})

As in the dynamic parameters system, when the job is run, the environment
variable ``SOMAWF_INPUT_PARAMS`` will hold the path of the input parameters /
configuration file, as illustrated in the sketch below.

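As an illustration of the second form, the job program can read both its
parameters and its configuration from that file. The following is a minimal
sketch, assuming the JSON layout described above (a ``"parameters"`` sub-dict
and a ``"configuration_dict"`` sub-dict); ``conf_var1`` is just the
hypothetical key used in the examples::

    #!/usr/bin/env python

    import os
    import json

    # location of the input parameters / configuration file
    param_file = os.environ.get('SOMAWF_INPUT_PARAMS')
    with open(param_file) as f:
        params = json.load(f)

    # configuration sub-dict, filled from the Job 'configuration' value
    config = params.get('configuration_dict', {})
    conf_var1 = config.get('conf_var1')

    # regular input parameters, if any
    inputs = params.get('parameters', {})
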
.. _engine_execution_job:

Engine execution jobs
=====================

Soma-Workflow 3.1 introduced :class:`~client_types.EngineExecutionJob`. This
class is the base class for some special jobs which do not actually run as
jobs, but as python functions executed inside the engine process. They should
be lightweight in processing, since the engine server should not have a heavy
CPU load: it runs on the computing resource submitting machine (typically a
cluster front-end node), which is not designed to perform actual processing.
Such jobs are meant to perform simple parameters transformations, such as
transforming lists into series of single parameter values or the opposite
(map/reduce pattern), string transformations (getting a dirname for
instance), etc.

:class:`~client_types.BarrierJob` used to be the only "special job" class in
Soma-Workflow 2.10. It is now a subclass of
:class:`~client_types.EngineExecutionJob`, with the particularity of *not*
doing any processing, even on engine side.

A few engine execution job types have been defined and are available for
workflow developers. Other subclasses can be written, but they should be
installed and available on engine side to work.

* :class:`~client_types.BarrierJob`: just a dependency hub.
* :class:`~custom_jobs.MapJob`: *map* part of the map/reduce pattern:
  transforms lists into series of single value parameters.
* :class:`~custom_jobs.ReduceJob`: *reduce* part of the map/reduce pattern:
  transforms series of single value parameters into list parameters.
* :class:`~custom_jobs.ListCatJob`: concatenates several input lists.
* :class:`~custom_jobs.LeaveOneOutJob`: leave-one-out pattern.
* :class:`~custom_jobs.CrossValidationFoldJob`: cross-validation pattern.

Such jobs are alternatives to :ref:`params_link_functions` described above.
A small usage sketch of the simplest one, :class:`~client_types.BarrierJob`,
is given below.

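As a small usage sketch, a :class:`~client_types.BarrierJob` can act as a
dependency hub, replacing many pairwise dependencies between two groups of
jobs. The sketch below assumes two lists of regular jobs, ``jobs_a`` and
``jobs_b``, built elsewhere; only the barrier and the dependencies are
shown::

    from soma_workflow.client import Workflow
    from soma_workflow.client_types import BarrierJob

    # hub job: no command, no processing (not even on engine side), it only
    # gathers dependencies
    barrier = BarrierJob(name='wait for all jobs_a')

    # every job of jobs_a must finish before any job of jobs_b starts:
    # 2 * N dependencies through the barrier instead of N * N direct ones
    dependencies = [(job, barrier) for job in jobs_a]
    dependencies += [(barrier, job) for job in jobs_b]

    workflow = Workflow(jobs=jobs_a + jobs_b + [barrier],
                        dependencies=dependencies,
                        name='barrier example')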