
Commit a990446

chore(sdk): local task execution refactor + cleanup (kubeflow#10420)
1 parent ddb2f9a commit a990446

12 files changed (+198 −38 lines)

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -85,3 +85,6 @@ __pycache__
 # Coverage
 .coverage
 .coverage*
+
+# kfp local execution default directory
+local_outputs/

sdk/python/kfp/dsl/pipeline_task.py

Lines changed: 1 addition & 1 deletion
@@ -193,7 +193,7 @@ def _execute_locally(self, args: Dict[str, Any]) -> None:
             raise NotImplementedError(
                 'Local pipeline execution is not currently supported.')

-        self._outputs = task_dispatcher.run_single_component(
+        self._outputs = task_dispatcher.run_single_task(
             pipeline_spec=self.component_spec.to_pipeline_spec(),
             arguments=args,
         )
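
For context, this is the code path a lightweight component call takes once local execution has been initialized. A rough end-to-end sketch of that flow (the add component is illustrative, not part of this commit):

from kfp import dsl
from kfp import local

# initialize local execution; SubprocessRunner is one of the supported runners
local.init(runner=local.SubprocessRunner(use_venv=False))

@dsl.component
def add(a: int, b: int) -> int:
    return a + b

# invoking the component now routes through task_dispatcher.run_single_task()
task = add(a=1, b=2)
print(task.output)  # 3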

sdk/python/kfp/local/config.py

Lines changed: 9 additions & 0 deletions
@@ -17,6 +17,8 @@
 import os
 from typing import Union

+from kfp import local
+

 class LocalRunnerType(abc.ABC):
     """The ABC for user-facing Runner configurations.
@@ -85,6 +87,13 @@ def __init__(
         self.pipeline_root = pipeline_root
         self.raise_on_error = raise_on_error

+    @classmethod
+    def validate(cls):
+        if cls.instance is None:
+            raise RuntimeError(
+                f"Local environment not initialized. Please run '{local.__name__}.{init.__name__}()' before executing tasks locally."
+            )
+

 def init(
     # annotate with subclasses, not parent class, for more helpful ref docs
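
Moving the guard onto the config class lets any caller assert initialization without inspecting the singleton's internals. A minimal sketch of the failure mode (error text taken from the diff above):

from kfp.local import config

# before kfp.local.init() is called, the singleton instance is unset
try:
    config.LocalExecutionConfig.validate()
except RuntimeError as e:
    print(e)
    # Local environment not initialized. Please run 'kfp.local.init()'
    # before executing tasks locally.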

sdk/python/kfp/local/config_test.py

Lines changed: 15 additions & 0 deletions
@@ -59,6 +59,21 @@ def test_local_runner_config_is_singleton(self):
             local.SubprocessRunner(use_venv=False))
         self.assertFalse(instance.raise_on_error, False)

+    def test_validate_success(self):
+        config.LocalExecutionConfig(
+            pipeline_root='other/local/root',
+            runner=local.SubprocessRunner(use_venv=False),
+            raise_on_error=False,
+        )
+        config.LocalExecutionConfig.validate()
+
+    def test_validate_fail(self):
+        with self.assertRaisesRegex(
+                RuntimeError,
+                f"Local environment not initialized. Please run 'kfp\.local\.init\(\)' before executing tasks locally\."
+        ):
+            config.LocalExecutionConfig.validate()
+

 class TestInitCalls(unittest.TestCase):

sdk/python/kfp/local/executor_input_utils.py

Lines changed: 4 additions & 3 deletions
@@ -17,6 +17,7 @@
 from typing import Any, Dict

 from google.protobuf import json_format
+from google.protobuf import struct_pb2
 from kfp.compiler import pipeline_spec_builder
 from kfp.dsl import utils
 from kfp.pipeline_spec import pipeline_spec_pb2
@@ -60,7 +61,7 @@ def construct_executor_input(
                 for param_name in output_parameter_keys
             },
             artifacts={
-                artifact_name: make_artifact_list(
+                artifact_name: artifact_type_schema_to_artifact_list(
                     name=artifact_name,
                     artifact_type=artifact_spec.artifact_type,
                     task_root=task_root,
@@ -116,7 +117,7 @@ def construct_local_task_root(
     )


-def make_artifact_list(
+def artifact_type_schema_to_artifact_list(
     name: str,
     artifact_type: pipeline_spec_pb2.ArtifactTypeSchema,
     task_root: str,
@@ -128,7 +129,7 @@ def make_artifact_list(
             type=artifact_type,
             uri=os.path.join(task_root, name),
            # metadata always starts empty for output artifacts
-            metadata={},
+            metadata=struct_pb2.Struct(),
         )
     ])
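
For reference, RuntimeArtifact.metadata is declared as a google.protobuf.Struct, which is presumably why the empty value becomes an explicit struct_pb2.Struct() rather than a plain dict. A minimal sketch of constructing such an artifact (field values here are illustrative):

from google.protobuf import struct_pb2
from kfp.pipeline_spec import pipeline_spec_pb2

# the empty metadata value is an explicit Struct message, not {}
artifact = pipeline_spec_pb2.RuntimeArtifact(
    name='out_a',
    metadata=struct_pb2.Struct(),
)
print(artifact.name)  # out_a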

sdk/python/kfp/local/logging_utils.py

Lines changed: 14 additions & 0 deletions
@@ -20,6 +20,7 @@
 from typing import Any, Dict, Generator, List

 from kfp import dsl
+from kfp.local import status


 class Color:
@@ -139,3 +140,16 @@ def make_log_lines_for_outputs(outputs: Dict[str, Any]) -> List[str]:
         output_lines.append(f'{key_chars}{value}')

     return output_lines
+
+
+def format_task_name(task_name: str) -> str:
+    return color_text(f'{task_name!r}', Color.CYAN)
+
+
+def format_status(task_status: status.Status) -> str:
+    if task_status == status.Status.SUCCESS:
+        return color_text(task_status.name, Color.GREEN)
+    elif task_status == status.Status.FAILURE:
+        return color_text(task_status.name, Color.RED)
+    else:
+        raise ValueError(f'Got unknown status: {task_status}')
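
These helpers centralize the ANSI coloring the dispatcher previously did inline. A quick sketch of the rendered values (the escape codes match the assertions in the tests below):

from kfp.local import logging_utils
from kfp.local import status

# wraps the status name in '\x1b[92m' ... '\x1b[0m', i.e. green in a terminal
print(logging_utils.format_status(status.Status.SUCCESS))   # SUCCESS (green)
# wraps the quoted task name in '\x1b[96m' ... '\x1b[0m', i.e. cyan
print(logging_utils.format_task_name('my-task'))            # 'my-task' (cyan)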

sdk/python/kfp/local/logging_utils_test.py

Lines changed: 27 additions & 0 deletions
@@ -19,6 +19,7 @@

 from kfp import dsl
 from kfp.local import logging_utils
+from kfp.local import status


 class TestIndentedPrint(unittest.TestCase):
@@ -202,5 +203,31 @@ def test_mix_params_and_artifacts(self):
         self.assertListEqual(actual, expected)


+class TestFormatStatus(unittest.TestCase):
+
+    def test_success_status(self):
+        self.assertEqual(
+            logging_utils.format_status(status.Status.SUCCESS),
+            '\x1b[92mSUCCESS\x1b[0m')
+
+    def test_failure_status(self):
+        self.assertEqual(
+            logging_utils.format_status(status.Status.FAILURE),
+            '\x1b[91mFAILURE\x1b[0m')
+
+    def test_invalid_status(self):
+        with self.assertRaisesRegex(ValueError,
+                                    r'Got unknown status: INVALID_STATUS'):
+            logging_utils.format_status('INVALID_STATUS')
+
+
+class TestFormatTaskName(unittest.TestCase):
+
+    def test(self):
+        self.assertEqual(
+            logging_utils.format_task_name('my-task'),
+            '\x1b[96m\'my-task\'\x1b[0m')
+
+
 if __name__ == '__main__':
     unittest.main()

sdk/python/kfp/local/placeholder_utils_test.py

Lines changed: 8 additions & 2 deletions
@@ -51,6 +51,12 @@
         },
         'uri':
             '/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/out_a',
+        # include metadata on outputs since it allows us to
+        # test the placeholder
+        # "{{$.outputs.artifacts[''out_a''].metadata[''foo'']}}"
+        # for comprehensive testing, but in practice metadata
+        # will never be set on output artifacts since they
+        # haven't been created yet
         'metadata': {
             'foo': {
                 'bar': 'baz'
@@ -62,7 +68,8 @@
         'outputFile':
             '/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/executor_output.json'
     }
-    }, executor_input)
+    },
+    executor_input)

 EXECUTOR_INPUT_DICT = json_format.MessageToDict(executor_input)

@@ -96,7 +103,6 @@ def test(self):
 class TestResolveIndividualPlaceholder(parameterized.TestCase):

     # TODO: consider supporting JSON escape
-    # TODO: update when input artifact constants supported
     # TODO: update when output lists of artifacts are supported
     @parameterized.parameters([
         (

sdk/python/kfp/local/task_dispatcher.py

Lines changed: 45 additions & 31 deletions
@@ -13,7 +13,7 @@
 # limitations under the License.
 """Code for dispatching a local task execution."""
 import logging
-from typing import Any, Dict
+from typing import Any, Dict, Tuple

 from kfp import local
 from kfp.local import config
@@ -25,10 +25,11 @@
 from kfp.local import status
 from kfp.local import subprocess_task_handler
 from kfp.local import task_handler_interface
+from kfp.local import utils
 from kfp.pipeline_spec import pipeline_spec_pb2


-def run_single_component(
+def run_single_task(
     pipeline_spec: pipeline_spec_pb2.PipelineSpec,
     arguments: Dict[str, Any],
 ) -> Dict[str, Any]:
@@ -41,36 +42,59 @@ def run_single_component(
     Returns:
         A LocalTask instance.
     """
-    if config.LocalExecutionConfig.instance is None:
-        raise RuntimeError(
-            f"Local environment not initialized. Please run '{local.__name__}.{local.init.__name__}()' before executing tasks locally."
-        )
+    config.LocalExecutionConfig.validate()
+    component_name, component_spec = list(pipeline_spec.components.items())[0]
+    executor_spec = get_executor_spec(
+        pipeline_spec,
+        component_spec.executor_label,
+    )
+    executor_spec = utils.struct_to_executor_spec(executor_spec)
+
     # all global state should be accessed here
     # do not access local config state downstream
-    return _run_single_component_implementation(
-        pipeline_spec=pipeline_spec,
+    outputs, _ = _run_single_task_implementation(
+        pipeline_name=pipeline_spec.pipeline_info.name,
+        component_name=component_name,
+        component_spec=component_spec,
+        executor_spec=executor_spec,
         arguments=arguments,
         pipeline_root=config.LocalExecutionConfig.instance.pipeline_root,
         runner=config.LocalExecutionConfig.instance.runner,
        raise_on_error=config.LocalExecutionConfig.instance.raise_on_error,
     )
+    return outputs


-def _run_single_component_implementation(
+def get_executor_spec(
     pipeline_spec: pipeline_spec_pb2.PipelineSpec,
+    executor_label: str,
+) -> pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec:
+    return pipeline_spec.deployment_spec['executors'][executor_label]
+
+
+Outputs = Dict[str, Any]
+
+
+def _run_single_task_implementation(
+    pipeline_name: str,
+    component_name: str,
+    component_spec: pipeline_spec_pb2.ComponentSpec,
+    executor_spec: pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec,
     arguments: Dict[str, Any],
     pipeline_root: str,
     runner: config.LocalRunnerType,
     raise_on_error: bool,
-) -> Dict[str, Any]:
-    """The implementation of a single component runner."""
+) -> Tuple[Outputs, status.Status]:
+    """The implementation of a single component runner.

-    component_name, component_spec = list(pipeline_spec.components.items())[0]
+    Returns a tuple of (outputs, status). If status is FAILURE, outputs
+    is an empty dictionary.
+    """

-    pipeline_resource_name = executor_input_utils.get_local_pipeline_resource_name(
-        pipeline_spec.pipeline_info.name)
     task_resource_name = executor_input_utils.get_local_task_resource_name(
         component_name)
+    pipeline_resource_name = executor_input_utils.get_local_pipeline_resource_name(
+        pipeline_name)
     task_root = executor_input_utils.construct_local_task_root(
         pipeline_root=pipeline_root,
         pipeline_resource_name=pipeline_resource_name,
@@ -82,15 +106,9 @@ def _run_single_component_implementation(
         task_root=task_root,
     )

-    executor_spec = pipeline_spec.deployment_spec['executors'][
-        component_spec.executor_label]
-
-    container = executor_spec['container']
-    image = container['image']
-
-    command = list(container['command']) if 'command' in container else []
-    args = list(container['args']) if 'args' in container else []
-    full_command = command + args
+    container = executor_spec.container
+    image = container.image
+    full_command = list(container.command) + list(container.args)

     executor_input_dict = executor_input_utils.executor_input_to_dict(
         executor_input=executor_input,
@@ -115,10 +133,7 @@ def _run_single_component_implementation(
     TaskHandler = task_handler_map[runner_type]

     with logging_utils.local_logger_context():
-        task_name_for_logs = logging_utils.color_text(
-            f'{task_resource_name!r}',
-            logging_utils.Color.CYAN,
-        )
+        task_name_for_logs = logging_utils.format_task_name(task_resource_name)

         logging.info(f'Executing task {task_name_for_logs}')
         task_handler = TaskHandler(
@@ -137,7 +152,7 @@ def _run_single_component_implementation(

         if task_status == status.Status.SUCCESS:
             logging.info(
-                f'Task {task_name_for_logs} finished with status {logging_utils.color_text(task_status.value, logging_utils.Color.GREEN)}'
+                f'Task {task_name_for_logs} finished with status {logging_utils.format_status(task_status)}'
             )

             outputs = executor_output_utils.get_outputs_for_task(
@@ -148,14 +163,13 @@ def _run_single_component_implementation(
                 output_string = [
                     f'Task {task_name_for_logs} outputs:',
                     *logging_utils.make_log_lines_for_outputs(outputs),
-                    '\n',
                 ]
                 logging.info('\n'.join(output_string))
             else:
                 logging.info(f'Task {task_name_for_logs} has no outputs')

         elif task_status == status.Status.FAILURE:
-            msg = f'Task {task_name_for_logs} finished with status {logging_utils.color_text(task_status.value, logging_utils.Color.RED)}'
+            msg = f'Task {task_name_for_logs} finished with status {logging_utils.format_status(task_status)}'
             if raise_on_error:
                 raise RuntimeError(msg)
             else:
@@ -166,4 +180,4 @@ def _run_single_component_implementation(
             # for developers; user should never hit this
             raise ValueError(f'Got unknown status: {task_status}')

-    return outputs
+    return outputs, task_status
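
A side effect of working with a typed ExecutorSpec is that the 'command' in container / 'args' in container checks disappear: unset repeated fields on a proto message read back as empty sequences. A small standalone sketch of that behavior (the spec built here is illustrative, not taken from this commit):

from kfp.pipeline_spec import pipeline_spec_pb2

executor_spec = pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec()
executor_spec.container.image = 'python:3.9'
executor_spec.container.command[:] = ['echo', 'hello']

# 'args' was never set, but repeated proto fields default to an empty list,
# so the concatenation needs no existence checks
full_command = list(executor_spec.container.command) + list(executor_spec.container.args)
print(full_command)  # ['echo', 'hello']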

sdk/python/kfp/local/task_dispatcher_test.py

Lines changed: 1 addition & 1 deletion
@@ -271,7 +271,7 @@ def many_type_component(
            r'Wrote executor output file to',
            r'.*',
            r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m finished with status \x1b\[92mSUCCESS\x1b\[0m\n",
-            r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: 'hellohello'\n model: Model\( name='model',\n uri='[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model',\n metadata={'foo': 'bar'} \)\n\n",
+            r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: 'hellohello'\n model: Model\( name='model',\n uri='[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model',\n metadata={'foo': 'bar'} \)\n",
        ]

        self.assertRegex(

sdk/python/kfp/local/utils.py

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+# Copyright 2024 The Kubeflow Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Assorted utilities."""
+
+from google.protobuf import json_format
+from google.protobuf import struct_pb2
+from kfp.pipeline_spec import pipeline_spec_pb2
+
+
+def struct_to_executor_spec(
+    struct: struct_pb2.Struct,
+) -> pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec:
+    executor_spec = pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec()
+    json_format.ParseDict(json_format.MessageToDict(struct), executor_spec)
+    return executor_spec
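
PipelineSpec.deployment_spec is itself a protobuf Struct, so the executors it holds come back as untyped Struct values; this helper round-trips them through JSON to produce a typed ExecutorSpec. A hedged usage sketch (the echo component is illustrative of any compiled single-component spec):

from kfp import dsl
from kfp.local import utils

@dsl.component
def echo(msg: str) -> str:
    return msg

pipeline_spec = echo.pipeline_spec  # single-component PipelineSpec
component_spec = list(pipeline_spec.components.values())[0]
raw = pipeline_spec.deployment_spec['executors'][component_spec.executor_label]
executor_spec = utils.struct_to_executor_spec(raw)

# typed attribute access, as used by task_dispatcher above, instead of dict lookups
print(executor_spec.container.image)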
