Source code for custom_jobs

# -*- coding: utf-8 -*-

'''
Small library of custom :class:`~client_types.EngineExecutionJob` subclasses.

Provides jobs for map/reduce patterns, cross-validation folding, and lists manipulations.
'''

from __future__ import absolute_import
from soma_workflow.client_types import Job, EngineExecutionJob, BarrierJob
from six.moves import range
from six.moves import zip


[docs]class MapJob(EngineExecutionJob): ''' Map job: converts lists into series of single items. Typically an input named ``inputs`` is a list of items. The job will separate items and output each of them as an output parameter. The i-th item will be output as ``output_<i>`` by default. The inputs / outputs names can be customized using the named parameters ``input_names`` and ``output_names``. Several lists can be split in the same job. The job will also output a ``lengths`` parameter which will contain the input lists lengths. This lengths can typically be input in reduce jobs to perform the reverse operation (see :class:`ReduceJob`). * ``input_names`` is a list of input parameters names, each being a list to be split. The default is ``['inputs']`` * ``output_names`` is a list of patterns used to build output parameters names. Each item is a string containing a substitution pattern ``"%d"`` which will be replaced with a number. The default is ``['output_%d']``. Each pattern will be used to replace items from the corresponding input in the same order. Thus ``input_names`` and ``output_names`` should be the same length. * all other parameters given in ``param_dict`` are passed to the output dictionary of the job, so that the job acts as a :class:`~soma_workflow.client_types.BarrierJob` for parameters which are not "mapped". ''' def __init__(self, command=[], referenced_input_files=None, referenced_output_files=None, name='map', param_dict=None, **kwargs): if param_dict is None: param_dict = {} if 'input_names' not in param_dict: param_dict['input_names'] = ['inputs'] if 'output_names' not in param_dict: param_dict['output_names'] = ['output_%d'] for inp in param_dict['input_names']: if inp not in param_dict: param_dict[inp] = [] super(MapJob, self).__init__( command=[], referenced_input_files=referenced_input_files, referenced_output_files=referenced_output_files, name=name, param_dict=param_dict, has_outputs=True) @classmethod def engine_execution(cls, self): input_names = self.param_dict.get('input_names', ['inputs']) output_names = self.param_dict.get('output_names', ['output_%d']) out_dict = dict(self.param_dict) for name in ['input_names', 'output_names'] + input_names \ + output_names: if name in out_dict: del out_dict[name] lengths = [] for inp, out in zip(input_names, output_names): inputs = self.param_dict[inp] for i, item in enumerate(inputs): out_dict[out % i] = item lengths.append(len(inputs)) out_dict['lengths'] = lengths return out_dict
[docs]class ReduceJob(EngineExecutionJob): ''' Reduce job: converts series of inputs into lists. Typically a series of inputs named ``input_0`` .. ``input_<n>`` will be output as a single list named ``outputs``. Several input series can be handled by the job, and input names can be customized. * The numbers of inputs for each series is given as the ``lengths`` input parameter. It is typically linked from the output of a :class:`MapJob`. * Input parameters names patterns are given as the ``input_names`` parameter. It is a list of patterns, each containing a ``%d``pattern for the input number. The defaut value is ``['input_%d']``. * Output parameters names are given as the ``output_names`` parameter. The default is ``['outputs']``. * all other parameters given in ``param_dict`` are passed to the output dictionary of the job, so that the job acts as a :class:`~soma_workflow.client_types.BarrierJob` for parameters which are not "reduced". ''' def __init__(self, command=[], referenced_input_files=None, referenced_output_files=None, name='reduce', param_dict=None, **kwargs): if param_dict is None: param_dict = {} if 'input_names' not in param_dict: param_dict['input_names'] = ['input_%d'] if 'output_names' not in param_dict: param_dict['output_names'] = ['outputs'] if 'lengths' not in param_dict: param_dict['lengths'] = [0] super(ReduceJob, self).__init__( command=[], referenced_input_files=referenced_input_files, referenced_output_files=referenced_output_files, name=name, param_dict=param_dict, has_outputs=True) self.resize_inputs() def resize_inputs(self): for param, l in zip(self.param_dict['input_names'], self.param_dict['lengths']): for i in range(l): p = param % i if p not in self.param_dict: self.param_dict[p] = '' i = l while param % i in self.param_dict: del self.param_dict[param % i] i += 1 @classmethod def engine_execution(cls, self): input_names = self.param_dict.get('input_names', ['input_%d']) output_names = self.param_dict.get('output_names', ['outputs']) lengths = self.param_dict['lengths'] out_dict = dict(self.param_dict) for name in ['input_names', 'output_names', 'lengths'] + input_names \ + output_names: if name in out_dict: del out_dict[name] for inp, out, length in zip(input_names, output_names, lengths): out_list = [self.param_dict[inp % i] for i in range(length)] out_dict[out] = out_list return out_dict
[docs]class LeaveOneOutJob(EngineExecutionJob): ''' Removes an element from an input list, outputs it on a single separate output. The input list should be specified as the ``inputs`` parameter, and the item index as ``index``. The output parameters ``train`` and ``test`` will be assigned the modified list and extracted element, respectively. ''' def __init__(self, command=[], referenced_input_files=None, referenced_output_files=None, name='leave_one_out', param_dict=None, **kwargs): if param_dict is None: param_dict = {} if 'inputs' not in param_dict: param_dict['inputs'] = [] if 'index' not in param_dict: param_dict['index'] = 0 super(LeaveOneOutJob, self).__init__( command=[], referenced_input_files=referenced_input_files, referenced_output_files=referenced_output_files, name=name, param_dict=param_dict, has_outputs=True) @classmethod def engine_execution(cls, self): inputs = self.param_dict['inputs'] index = self.param_dict['index'] output_item = inputs[index] output_list = inputs[:index] + inputs[index + 1:] out_dict = { 'train': output_list, 'test': output_item, } return out_dict
[docs]class CrossValidationFoldJob(EngineExecutionJob): ''' Separates an input list into folds, one (larger) for training, one (smaller) for testing. The input list ``inputs`` is separated into folds. The number of folds should be specified as the ``nfolds`` parameter, the fold number as ``fold``. Outputs are ``train` and ``test`` parameters. ''' def __init__(self, command=[], referenced_input_files=None, referenced_output_files=None, name='cross_validation', param_dict=None, **kwargs): if param_dict is None: param_dict = {} if 'inputs' not in param_dict: param_dict['inputs'] = [] if 'nfolds' not in param_dict: param_dict['nfolds'] = 2 if 'fold' not in param_dict: param_dict['fold'] = 0 super(CrossValidationFoldJob, self).__init__( command=[], referenced_input_files=referenced_input_files, referenced_output_files=referenced_output_files, name=name, param_dict=param_dict, has_outputs=True) @classmethod def engine_execution(cls, self): inputs = self.param_dict['inputs'] fold = self.param_dict['fold'] nfolds = self.param_dict['nfolds'] nitems = len(inputs) fold_size = nitems // nfolds nsupp = nitems % nfolds begin = fold_size * fold begin += min(begin, nsupp) end = fold_size * (fold + 1) end += min(end, nsupp) train = inputs[:begin] + inputs[end:] test = inputs[begin:end] out_dict = { 'train': train, 'test': test, } return out_dict
[docs]class ListCatJob(EngineExecutionJob): ''' Concatenates several lists into a single list The input lists should be specified as the ``inputs`` parameter (a list of lists, thus). The output parameter ``outputs`` will be assigned the concatenated list. ''' def __init__(self, command=[], referenced_input_files=None, referenced_output_files=None, name='list_cat', param_dict=None, **kwargs): if param_dict is None: param_dict = {} if 'inputs' not in param_dict: param_dict['inputs'] = [] super(ListCatJob, self).__init__( command=[], referenced_input_files=referenced_input_files, referenced_output_files=referenced_output_files, name=name, param_dict=param_dict, has_outputs=True) @classmethod def engine_execution(cls, self): inputs = self.param_dict['inputs'] outputs = [] for in_list in inputs: outputs += in_list out_dict = { 'outputs': outputs, } return out_dict
[docs]class StrCatJob(EngineExecutionJob): ''' Concatenates inputs into a string Inputs listed in ``input_names`` are concatenated into an output string. Inputs may be strings or lists of strings. Listes are also concatenated into a string. The output parameter is given as the ``output_name`` parameter, if given, and defaults to `` output`` otherwise. ``input_names`` is optional and defaults to ``inputs`` (thus by default the job expects a single list). ''' def __init__(self, command=[], referenced_input_files=None, referenced_output_files=None, name='strcat', param_dict=None, **kwargs): if param_dict is None: param_dict = {} if 'input_names' not in param_dict: param_dict['input_names'] = ['inputs'] if 'output_name' not in param_dict: param_dict['output_name'] = 'output' output_name = param_dict['output_name'] if output_name not in param_dict: param_dict[output_name] = '' for iname in param_dict['input_names']: if iname not in param_dict: param_dict[iname] = '' super(StrCatJob, self).__init__( command=[], referenced_input_files=referenced_input_files, referenced_output_files=referenced_output_files, name=name, param_dict=param_dict, has_outputs=True) @classmethod def engine_execution(cls, self): input_names = self.param_dict['input_names'] output_name = self.param_dict['output_name'] outputs = [] for name in input_names: value = self.param_dict[name] if isinstance(value, list): outputs.append(''.join(value)) else: outputs.append(value) output = ''.join(outputs) out_dict = { output_name: output, } return out_dict