import infractl
To run a Python program remotely:
await infractl.run(infractl.program('my_flow.py'))
To specify arguments for the program:
await infractl.run(infractl.program('my_flow.py'), parameters=['--help'])
To specify a function to execute:
await infractl.run(infractl.program('my_flow.py', name='main'))
To specify a function to execute and its parameters:
await infractl.run(infractl.program('my_flow.py', name='main'), parameters={'foo': 'bar'})
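To illustrate the two forms of `parameters`, here is a minimal sketch of a program that supports both modes. The `--foo` option, the `main(foo)` signature, and the helper `parse_args` are assumptions made for illustration; the sketch only mimics locally how a list maps to command-line arguments and a dictionary maps to named function arguments, and it does not call infractl.

```python
import argparse


def main(foo: str = 'default') -> str:
    """Entry point suited to name='main' with parameters={'foo': 'bar'}."""
    return f'foo={foo}'


def parse_args(argv: list) -> argparse.Namespace:
    """Command-line parsing suited to parameters=['--foo', 'bar'] (hypothetical option)."""
    parser = argparse.ArgumentParser(description='Example program')
    parser.add_argument('--foo', default='default')
    return parser.parse_args(argv)


# A dictionary of named arguments corresponds to keyword arguments of the function.
result_from_dict = main(**{'foo': 'bar'})

# A list of arguments corresponds to command-line arguments of the script.
result_from_list = main(foo=parse_args(['--foo', 'bar']).foo)
```

Both calls produce the same result here, which is the point: the shape of `parameters` follows the kind of entry point, not the values being passed.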
Currently, ICL uses Prefect to define the basic workflow building blocks: flows and tasks.
Create a Python file my_flow.py that defines a single flow my_flow:
from prefect import flow
@flow
def my_flow():
    print('Hello from my_flow')
Note this is a regular Python file, so it can be developed, tested, and executed locally.
The following code deploys and runs flow my_flow in the default infrastructure:
await infractl.run(infractl.program('my_flow.py'))
If the Python file contains multiple flows, you must choose the flow to deploy. Example:
await infractl.run(infractl.program('my_flow.py', name='my_flow'))
Currently, the default infrastructure is expected to be available locally and accessible via localtest.me, which resolves to 127.0.0.1.
To deploy a flow to a remote ICL cluster, define a custom infrastructure with the address of the remote cluster:
infrastructure = infractl.infrastructure(address='mycluster.example.com')
await infractl.deploy(infractl.program('my_flow.py'), infrastructure=infrastructure)
To run a flow with parameters:
await infractl.run(infractl.program('my_flow.py'), parameters={'url': 'https://example.com'})
Note that Prefect recommends using pydantic models to define flow parameters.
Infrastructure has the following parameters:
address - an optional address of the infrastructure.
If not specified, the default infrastructure is used. It is expected to be available locally at localtest.me, which resolves to 127.0.0.1.
See the instructions to install a local single node ICL cluster.
gpus - an optional request for GPUs in the infrastructure.
If not specified, then no GPUs will be available.
If specified as a number, then that number of GPUs of any type will be available.
If specified as a tuple, for example, ('gpu.intel.com/i915', 1), then the specified number of GPUs of the requested type will be available.
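The three accepted forms of `gpus` can be summarized with a small helper. This is only a reading aid under the rules listed above: `describe_gpu_request` is a hypothetical function, not part of infractl, and it says nothing about how infractl translates the value into a cluster resource request.

```python
from typing import Optional, Tuple, Union

# The documented shapes of the `gpus` value: nothing, a count, or (resource name, count).
GpuSpec = Optional[Union[int, Tuple[str, int]]]


def describe_gpu_request(gpus: GpuSpec) -> str:
    """Return a human-readable reading of a `gpus` value (illustration only)."""
    if gpus is None:
        return 'no GPUs'
    if isinstance(gpus, int):
        return f'{gpus} GPU(s) of any type'
    resource, count = gpus
    return f'{count} GPU(s) of type {resource}'
```

For example, `describe_gpu_request(('gpu.intel.com/i915', 1))` reads as one GPU of the `gpu.intel.com/i915` resource type, while `describe_gpu_request(2)` reads as two GPUs of any type.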
Runtime has the following parameters:
environment - environment variables to set in the runtime.
dependencies - Python dependencies to install in the runtime.
files - files that must be present in the runtime, see runtime files.
Example:
runtime = infractl.runtime(
    environment={'foo': 'bar'},
    dependencies={'pip': ['boto', 'botocore']},
)
await infractl.run(infractl.program('my_flow.py'), runtime=runtime)
Currently, dependencies accepts only requirements that can be installed with pip.
The value for pip is a list of pip requirements specifiers.
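Because the value for `pip` is a plain list of requirement specifiers, it can be built from the text of an existing requirements file. The sketch below is one possible way to do that; `pip_dependencies` is a hypothetical helper, not part of infractl, and it handles only simple specifier lines and comments, not pip options such as `-r` or `-e`.

```python
import re

# A comment starts at '#' when it is at the beginning of the line or preceded by whitespace.
_COMMENT = re.compile(r'(^|\s+)#.*$')


def pip_dependencies(requirements_text: str) -> dict:
    """Build a {'pip': [...]} mapping from the text of a requirements file."""
    specifiers = []
    for line in requirements_text.splitlines():
        line = _COMMENT.sub('', line).strip()
        if line:
            specifiers.append(line)
    return {'pip': specifiers}


requirements = """
# pinned and ranged versions are ordinary pip requirement specifiers
boto3>=1.28,<2
botocore==1.31.0  # exact pin
"""
dependencies = pip_dependencies(requirements)
```

The resulting dictionary has the same shape as the `dependencies` argument shown in the runtime example, so it could be passed there unchanged.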
Example:
runtime = infractl.runtime(
    # 'data.csv' from the local working directory will be copied to the runtime working directory
    files=['data.csv'],
)
await infractl.run(infractl.program('my_flow.py'), runtime=runtime)
When deploying a flow, you can specify additional parameters for the Prefect deployment. Below are several examples of deployment parameters for a Prefect flow:
To add tags to the Prefect flow:
await infractl.run(infractl.program('my_flow.py'), tags=['my_flow'])
To specify a schedule for a flow:
# This schedule will create flow runs for this deployment every day at midnight.
await infractl.run(infractl.program('my_flow.py'), cron='0 0 * * *')
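As a reminder of how a standard five-field cron expression such as `'0 0 * * *'` is read, the sketch below splits it into named fields. `cron_fields` is a hypothetical helper for illustration only; it does not validate field values and is unrelated to how Prefect or infractl parse the schedule.

```python
CRON_FIELDS = ('minute', 'hour', 'day_of_month', 'month', 'day_of_week')


def cron_fields(expression: str) -> dict:
    """Split a five-field cron expression into a field-name -> value mapping."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f'expected {len(CRON_FIELDS)} fields, got {len(parts)}')
    return dict(zip(CRON_FIELDS, parts))


# Minute 0 of hour 0, on every day of every month: once a day at midnight.
daily_at_midnight = cron_fields('0 0 * * *')
```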
To apply a JSON patch to the Prefect Kubernetes job:
# This customization adds a second container `busybox` to the pod.
# Make sure to terminate the second container when the main container completes!
customizations = [
    {
        'op': 'add',
        'path': '/spec/template/spec/containers/1',
        'value': {
            'name': 'busybox',
            'image': 'busybox',
            'command': ['/bin/echo', 'hello from busybox'],
        },
    },
]
await infractl.run(infractl.program('my_flow.py'), customizations=customizations)
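To see what such a patch does to the job manifest, the following sketch applies a single JSON Patch `add` operation to a dictionary. It is a simplified, stdlib-only illustration: `apply_add` is a hypothetical helper, the manifest is a made-up minimal job with a container named `main`, and JSON Pointer escapes (`~0`, `~1`) are not handled.

```python
import copy


def apply_add(document: dict, path: str, value) -> dict:
    """Apply one JSON Patch `add` operation and return a patched copy."""
    patched = copy.deepcopy(document)
    *parents, last = path.lstrip('/').split('/')
    target = patched
    for key in parents:
        # Walk down the document; list elements are addressed by integer index.
        target = target[int(key)] if isinstance(target, list) else target[key]
    if isinstance(target, list):
        # For arrays, `add` inserts before the given index ('-' appends).
        index = len(target) if last == '-' else int(last)
        if index > len(target):
            raise IndexError(f'index {index} is out of range for {path}')
        target.insert(index, value)
    else:
        target[last] = value
    return patched


# Hypothetical minimal job manifest with a single main container.
job = {'spec': {'template': {'spec': {'containers': [{'name': 'main', 'image': 'example/flow-image'}]}}}}

patched_job = apply_add(
    job,
    '/spec/template/spec/containers/1',
    {'name': 'busybox', 'image': 'busybox'},
)
container_names = [c['name'] for c in patched_job['spec']['template']['spec']['containers']]
```

The path ends in index `1`, so the new container is inserted as the second entry of the `containers` list, after the existing main container.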
There are 3 options to specify a Prefect flow name for the program:
With the name argument in infractl.run():
await infractl.run(program, name='renamed')
With the name argument in the flow decorator:
@flow(name='renamed')
def my_flow():
    ...
If neither argument is specified, Prefect derives the flow name from the function name.
For example, function my_flow will be deployed as flow my-flow.
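The fallback can be pictured as follows. This is a sketch that assumes the derivation simply replaces underscores with hyphens, which is consistent with the `my_flow` to `my-flow` example; `derive_flow_name` is a hypothetical helper, not a Prefect or infractl function.

```python
from typing import Optional


def derive_flow_name(function_name: str, name: Optional[str] = None) -> str:
    """Use an explicit name if given, otherwise derive one from the function name."""
    if name is not None:
        return name
    # Assumption: underscores in the function name become hyphens.
    return function_name.replace('_', '-')


derived = derive_flow_name('my_flow')
explicit = derive_flow_name('my_flow', name='renamed')
```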
infractl.run() has the following parameters:
parameters - a dictionary of named arguments if the program's entrypoint is a function, otherwise a list of arguments.
timeout - the time in seconds to wait for the program to complete, or None to wait forever (default).
detach - False to wait for the program to complete (default), True to return without waiting.
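The `timeout` and `detach` semantics described above can be mimicked locally with plain asyncio. In this sketch, `fake_program` and `run` are hypothetical stand-ins, not the infractl API, and raising `asyncio.TimeoutError` on expiry is an assumption: the behavior of infractl when the timeout elapses is not specified here.

```python
import asyncio
from typing import Optional


async def fake_program(duration: float) -> str:
    """Stand-in for a remote program; not part of infractl."""
    await asyncio.sleep(duration)
    return 'completed'


async def run(duration: float, timeout: Optional[float] = None, detach: bool = False):
    """Mimic the documented `timeout` and `detach` semantics locally."""
    task = asyncio.ensure_future(fake_program(duration))
    if detach:
        return task  # do not wait; the caller can check on the task later
    # timeout=None waits forever; otherwise asyncio.TimeoutError is raised on expiry.
    return await asyncio.wait_for(task, timeout=timeout)


async def demo() -> dict:
    outcome = {'waited': await run(0.01)}
    try:
        await run(0.5, timeout=0.01)
        outcome['timed_out'] = False
    except asyncio.TimeoutError:
        outcome['timed_out'] = True
    task = await run(0.01, detach=True)
    outcome['detached_done_immediately'] = task.done()
    outcome['detached_result'] = await task
    return outcome


outcome = asyncio.run(demo())
```

With `detach=True` the call returns before the program has finished, so the caller holds a handle it can await or inspect later, much like the `program` object in the log-streaming example.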
# To run the flow without waiting for a terminal state, use `detach=True`
program = await infractl.run(
    infractl.program('flows/flow7.py'),
    detach=True,
)
from io import StringIO
# Print logs to sys.stdout.
await program.stream_logs()
# Write logs to `file` instead.
file = StringIO()
await program.stream_logs(file=file)
# Read the captured logs.
file.seek(0)
logs = file.read()
# `poll_interval` sets how often, in seconds, logs are requested.
await program.stream_logs(poll_interval=60)  # 60 seconds
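The role of `poll_interval` and `file` can be pictured with a small polling loop. This is only a sketch of the general technique under stated assumptions: `FakeProgram`, `fetch_new_logs`, and `is_finished` are hypothetical names invented for the example, and the real `stream_logs` may be implemented differently.

```python
import asyncio
import sys
from io import StringIO
from typing import List, TextIO


class FakeProgram:
    """Stand-in for a running program with a growing log; not part of infractl."""

    def __init__(self, batches: List[List[str]]):
        self._batches = list(batches)

    def fetch_new_logs(self) -> List[str]:
        """Return log lines produced since the previous request."""
        return self._batches.pop(0) if self._batches else []

    def is_finished(self) -> bool:
        return not self._batches


async def stream_logs(program: FakeProgram, file: TextIO = sys.stdout, poll_interval: float = 1.0):
    """Poll for new log lines until the program finishes, writing them to `file`."""
    while True:
        for line in program.fetch_new_logs():
            file.write(line + '\n')
        if program.is_finished():
            break
        await asyncio.sleep(poll_interval)  # wait before the next request


buffer = StringIO()
fake = FakeProgram([['starting'], ['Hello from my_flow', 'done']])
asyncio.run(stream_logs(fake, file=buffer, poll_interval=0.01))
logs = buffer.getvalue()
```

A larger `poll_interval` means fewer requests at the cost of logs arriving in bigger, later batches.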
The infractl build API allows building custom Docker images and pushing them to a Docker registry. For example, to build a Docker image and push it to the private Docker registry in the provided infrastructure:
import infractl.docker
builder = infractl.docker.builder()
image = builder.build(path='docker/my-image', tag='my-image:0.0.1')