Workflow#
Interface to underlying gRPC workflow.
- class ansys.dpf.core.workflow.Workflow(workflow=None, server=None)#
Represents a workflow.
A workflow is a black box containing operators and exposing only the operator inputs and outputs needed to compute a given algorithm.
- Parameters
server (ansys.dpf.core.server, optional) – Server with the channel connected to the remote or local instance. The default is None, in which case an attempt is made to use the global server.
workflow (workflow_message_pb2.Workflow) –
Examples
Create a generic workflow computing the minimum of displacement by chaining the 'U' and 'min_max_fc' operators.
>>> from ansys.dpf import core as dpf
>>> disp_op = dpf.operators.result.displacement()
>>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
>>> workflow = dpf.Workflow()
>>> workflow.add_operators([disp_op, max_fc_op])
>>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)
>>> workflow.set_output_name("min", max_fc_op.outputs.field_min)
>>> workflow.set_output_name("max", max_fc_op.outputs.field_max)

>>> from ansys.dpf.core import examples
>>> data_src = dpf.DataSources(examples.multishells_rst)
>>> workflow.connect("data_sources", data_src)
>>> min = workflow.get_output("min", dpf.types.field)
>>> max = workflow.get_output("max", dpf.types.field)
- connect(pin_name, inpt, pin_out=0)#
Connect an input on the workflow using a pin name.
- Parameters
pin_name (str) – Name of the pin to connect. This name must first be exposed with wf.set_input_name.
inpt (str, int, double, bool, list of int, list of doubles, Field, FieldsContainer, Scoping, ScopingsContainer, MeshedRegion, MeshesContainer, DataSources, Operator) – Object to connect to.
pin_out (int, optional) – If the input is an operator, the output pin of the input operator. The default is 0.
Examples
Create a generic workflow computing the minimum of displacement by chaining the 'U' and 'min_max_fc' operators.
>>> from ansys.dpf import core as dpf
>>> disp_op = dpf.operators.result.displacement()
>>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
>>> workflow = dpf.Workflow()
>>> workflow.add_operators([disp_op, max_fc_op])
>>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)
>>> workflow.set_output_name("min", max_fc_op.outputs.field_min)
>>> workflow.set_output_name("max", max_fc_op.outputs.field_max)

>>> from ansys.dpf.core import examples
>>> data_src = dpf.DataSources(examples.multishells_rst)
>>> workflow.connect("data_sources", data_src)
>>> min = workflow.get_output("min", dpf.types.field)
>>> max = workflow.get_output("max", dpf.types.field)
- get_output(pin_name, output_type)#
Retrieve the output of the workflow on the pin with the given name. A progress bar following the workflow state is printed.
- Parameters
pin_name (str) – Name of the pin to retrieve. This name must first be exposed with wf.set_output_name.
output_type (core.types enum) – Type of the requested output.
- set_input_name(name, *args)#
Set the name of the input pin of the workflow to expose it for future connection.
- Parameters
name (str) – Name under which the input pin is exposed. Use this name later with wf.connect.
*args (core.Operator, core.Input, int) – Either an operator followed by its input pin number, or the input to name.
Examples
>>> from ansys.dpf import core as dpf
>>> workflow = dpf.Workflow()
>>> disp_op = dpf.operators.result.displacement()
>>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
>>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)

>>> from ansys.dpf.core import examples
>>> data_src = dpf.DataSources(examples.multishells_rst)
>>> workflow.connect("data_sources", data_src)
- set_output_name(name, *args)#
Set the name of the output pin of the workflow to expose it for future connection.
- Parameters
name (str) – Name under which the output pin is exposed. Use this name later with wf.get_output.
*args (core.Operator, core.Output, int) – Either an operator followed by its output pin number, or the output to name.
Examples
>>> from ansys.dpf import core as dpf
>>> from ansys.dpf.core import examples
>>> workflow = dpf.Workflow()
>>> model = dpf.Model(examples.simple_bar)
>>> disp_op = model.results.displacement()
>>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
>>> workflow.set_output_name("contour", disp_op.outputs.fields_container)
>>> fc = workflow.get_output("contour", dpf.types.fields_container)
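The naming contract described above (expose a pin under a name, then refer to it by that name in connect or get_output) can be sketched in plain Python without a DPF server. ToyWorkflow and everything in it is hypothetical and is not the ansys.dpf.core class; in particular, raising KeyError for a name that was never exposed is an assumption of this sketch, not documented DPF behavior.

```python
class ToyWorkflow:
    """Hypothetical stand-in for a workflow; not the ansys.dpf.core class."""

    def __init__(self):
        self._inputs = {}   # exposed input name -> connected value (None until connected)
        self._outputs = {}  # exposed output name -> function computing that output

    def set_input_name(self, name):
        # Expose an input pin under a name so that connect() can find it later.
        self._inputs[name] = None

    def set_output_name(self, name, compute):
        # Expose an output pin; `compute` maps the connected inputs to a value.
        self._outputs[name] = compute

    def connect(self, pin_name, value):
        if pin_name not in self._inputs:
            raise KeyError(f"input pin {pin_name!r} was never exposed")
        self._inputs[pin_name] = value

    def get_output(self, pin_name):
        if pin_name not in self._outputs:
            raise KeyError(f"output pin {pin_name!r} was never exposed")
        # The toy computes a value only when an output is requested.
        return self._outputs[pin_name](self._inputs)


wf = ToyWorkflow()
wf.set_input_name("values")
wf.set_output_name("min", lambda pins: min(pins["values"]))
wf.set_output_name("max", lambda pins: max(pins["values"]))

wf.connect("values", [3.0, -1.5, 7.25])
lowest = wf.get_output("min")
highest = wf.get_output("max")
```

Callers only ever see the exposed names; the functions behind them stay hidden, which mirrors the black-box description of a workflow.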
- add_operators(operators)#
Add operators to the list of operators of the workflow.
- Parameters
operators (dpf.core.Operator, list of dpf.core.Operator) – Operators to add to the list.
Examples
>>> from ansys.dpf import core as dpf
>>> workflow = dpf.Workflow()
>>> disp_op = dpf.Operator("U")
>>> max_op = dpf.Operator("min_max")
>>> workflow.add_operators([disp_op, max_op])
- add_operator(operator)#
Add an operator to the list of operators of the workflow.
- Parameters
operator (dpf.core.Operator) – Operator to add to the list.
Examples
>>> from ansys.dpf import core as dpf
>>> workflow = dpf.Workflow()
>>> disp_op = dpf.Operator("U")
>>> workflow.add_operator(disp_op)
- record(identifier=None, transfer_ownership=True)#
Add the workflow to DPF’s internal registry with an ID returned by this method.
The workflow can be recovered by dpf.core.Workflow.get_recorded_workflow(id).
- Parameters
identifier (str, optional) – Name given to the workflow.
transfer_ownership (bool) – Whether to transfer the ownership. The default is True. If the ownership is not transferred, the workflow is removed from the internal registry as soon as the workflow has been recovered by its ID.
Examples
>>> from ansys.dpf import core as dpf
>>> workflow = dpf.Workflow()
>>> disp_op = dpf.Operator("U")
>>> workflow.add_operator(disp_op)
>>> # ...
>>> id = workflow.record()
>>> workflow_copy = dpf.Workflow.get_recorded_workflow(id)
- static get_recorded_workflow(id, server=None)#
Retrieve a workflow registered with workflow.record().
- Parameters
id (int) – ID returned by workflow.record().
- Returns
workflow – Workflow registered in DPF’s registry (server side).
- Return type
core.Workflow
Examples
>>> from ansys.dpf import core as dpf
>>> workflow = dpf.Workflow()
>>> disp_op = dpf.Operator("U")
>>> workflow.add_operator(disp_op)
>>> # ...
>>> id = workflow.record()
>>> workflow_copy = dpf.Workflow.get_recorded_workflow(id)
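The effect of transfer_ownership on the record / get_recorded_workflow pair can be illustrated with a small in-memory registry. This is only a sketch of the behavior described above: ToyRegistry, its method names, and the integer ID scheme are hypothetical, and what DPF does on a second recovery of a non-owned workflow is not stated in this page.

```python
import itertools


class ToyRegistry:
    """Hypothetical in-memory registry; not DPF's server-side implementation."""

    def __init__(self):
        self._entries = {}                  # id -> (object, owned_by_registry)
        self._next_id = itertools.count(1)  # assumed ID scheme: 1, 2, 3, ...

    def record(self, obj, transfer_ownership=True):
        new_id = next(self._next_id)
        self._entries[new_id] = (obj, transfer_ownership)
        return new_id

    def get_recorded(self, recorded_id):
        obj, owned = self._entries[recorded_id]
        if not owned:
            # Ownership was not transferred: drop the entry after the first recovery.
            del self._entries[recorded_id]
        return obj


registry = ToyRegistry()
kept_id = registry.record("workflow A")  # default: ownership transferred
once_id = registry.record("workflow B", transfer_ownership=False)

first_a = registry.get_recorded(kept_id)
second_a = registry.get_recorded(kept_id)  # still registered
first_b = registry.get_recorded(once_id)   # recovered once, then removed
still_registered = once_id in registry._entries
```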
- property info#
Dictionary with the operator names and the exposed input and output names.
- Returns
info – Dictionary with "operator_names", "input_names", and "output_names" keys.
- Return type
dictionary str->list str
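As a rough illustration of the documented shape (three keys, each mapping to a list of strings), the helper below formats such a dictionary into one line. summarize_info is a hypothetical helper, and example_info is written by hand for this sketch rather than returned by a server.

```python
def summarize_info(info):
    """Return a one-line summary of a workflow info dictionary."""
    parts = []
    for key in ("operator_names", "input_names", "output_names"):
        names = info.get(key, [])
        parts.append(f"{key}: {', '.join(names) if names else '(none)'}")
    return " | ".join(parts)


# Hand-written dictionary in the documented shape (not produced by a server).
example_info = {
    "operator_names": ["U", "min_max_fc"],
    "input_names": ["data_sources"],
    "output_names": ["min", "max"],
}
summary = summarize_info(example_info)
```

With a live workflow, the same helper could presumably be called as summarize_info(workflow.info).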
- property operator_names#
List of the names of operators added in the workflow.
- Returns
names
- Return type
list str
- property input_names#
List of the input names exposed in the workflow with set_input_name.
- Returns
names
- Return type
list str
- property output_names#
List of the output names exposed in the workflow with set_output_name.
- Returns
names
- Return type
list str
- connect_with(left_workflow, output_input_names=None)#
Chain 2 workflows together so that they become one workflow.
The resulting workflow contains all the operators, inputs, and outputs exposed in both workflows.
- Parameters
left_workflow (core.Workflow) – Second workflow, whose outputs are chained with this workflow’s inputs.
output_input_names (str tuple, str dict, optional) – Output name of the left_workflow to be chained with the input name of this workflow. The default is None, in which case the outputs of the left_workflow with the same names as the inputs of this workflow are chained.
Examples
+----------------------------------------------------------------------------------------------+
| INPUT:                                                                                       |
|                                                                                              |
| output_input_names = ("output", "field")                                                     |
|                     _____________                                  ____________               |
| "data_sources" -> |left_workflow| -> "stuff"          "field" -> |    this    | -> "contour" |
| "time_scoping" -> |             |              "mesh_scoping" -> |            |              |
|                   |_____________| -> "output"                    |____________|              |
| OUTPUT:                                                                                      |
|                     ____                                                                      |
| "data_sources" -> |this| -> "stuff"                                                          |
| "time_scoping" -> |    | -> "contour"                                                        |
| "mesh_scoping" -> |____| -> "output"                                                         |
+----------------------------------------------------------------------------------------------+
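The bookkeeping shown in the diagram can be reproduced with a short pure-Python function that works on pin names only. chain_pin_names is a hypothetical helper, not part of ansys.dpf.core. It assumes, as the diagram suggests, that an input of this workflow disappears once an output of left_workflow feeds it, while every output of both workflows stays exposed.

```python
def chain_pin_names(left_inputs, left_outputs, this_inputs, this_outputs,
                    output_input_names=None):
    """Return (inputs, outputs) exposed after chaining, as sorted lists."""
    if output_input_names is None:
        # Default: pair each left output with a same-named input of this workflow.
        pairs = {name: name for name in left_outputs if name in this_inputs}
    elif isinstance(output_input_names, dict):
        pairs = dict(output_input_names)
    else:
        out_name, in_name = output_input_names  # a single (output, input) tuple
        pairs = {out_name: in_name}

    for out_name, in_name in pairs.items():
        if out_name not in left_outputs:
            raise KeyError(f"{out_name!r} is not an output of the left workflow")
        if in_name not in this_inputs:
            raise KeyError(f"{in_name!r} is not an input of this workflow")

    fed_inputs = set(pairs.values())
    # Inputs of this workflow that are now fed by the left workflow disappear.
    inputs = set(left_inputs) | (set(this_inputs) - fed_inputs)
    # Outputs of both workflows stay exposed, as drawn in the diagram.
    outputs = set(left_outputs) | set(this_outputs)
    return sorted(inputs), sorted(outputs)


inputs, outputs = chain_pin_names(
    left_inputs=["data_sources", "time_scoping"],
    left_outputs=["stuff", "output"],
    this_inputs=["field", "mesh_scoping"],
    this_outputs=["contour"],
    output_input_names=("output", "field"),
)
```

Here inputs and outputs hold the same three input names and three output names as the OUTPUT half of the diagram, in alphabetical order.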
- create_on_other_server(*args, **kwargs)#
Create a new instance of a workflow on another server. The new Workflow has the same operators, exposed inputs and output pins as this workflow. Connections between operators and between data and operators are kept (except for exposed pins).
- Parameters
server (server.DPFServer, optional) – Server with channel connected to the remote or local instance. When None, attempts to use the global server.
ip (str, optional) – IP address on which the new instance should be created (always put a port in args as well)
port (str, int, optional) –
address (str, optional) – address on which the new instance should be created (“ip:port”)
- Return type
Workflow
Examples
Create a generic Workflow computing the minimum of displacement by chaining the 'U' and 'min_max_fc' operators.
>>> from ansys.dpf import core as dpf
>>> disp_op = dpf.operators.result.displacement()
>>> max_fc_op = dpf.operators.min_max.min_max_fc(disp_op)
>>> workflow = dpf.Workflow()
>>> workflow.add_operators([disp_op, max_fc_op])
>>> workflow.set_input_name("data_sources", disp_op.inputs.data_sources)
>>> workflow.set_output_name("min", max_fc_op.outputs.field_min)
>>> workflow.set_output_name("max", max_fc_op.outputs.field_max)
>>> #other_server = dpf.start_local_server(as_global=False)
>>> #new_workflow = workflow.create_on_other_server(server=other_server)
>>> #assert 'data_sources' in new_workflow.input_names