Skip to content

Commit ff7f24e

Browse files
authored
Improvements and features (#260)
1 parent 6ba04d6 commit ff7f24e

18 files changed

+508
-16
lines changed

examples/helloworld/greetings_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
This file contains a Simple Worker that can be used in any workflow.
33
For detailed information https://github.com/conductor-sdk/conductor-python/blob/main/README.md#step-2-write-worker
4-
""""
4+
"""
55
from conductor.client.worker.worker_task import worker_task
66

77

File renamed without changes.
File renamed without changes.
File renamed without changes.

examples/orkes/http_poll.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
import uuid

from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_poll_task import HttpPollTask, HttpPollInput


def main():
    """Register and run a workflow demonstrating the HTTP_POLL task.

    The HTTP_POLL task repeatedly calls the given URI (exponential backoff,
    starting at 1000 ms) until the JavaScript termination condition evaluates
    to true, then the workflow completes.
    """
    workflow_executor = OrkesClients().get_workflow_executor()
    # Unique name so repeated runs of the example do not collide.
    workflow = ConductorWorkflow(executor=workflow_executor,
                                 name='http_poll_example_' + str(uuid.uuid4()))
    http_poll = HttpPollTask(task_ref_name='http_poll_ref',
                             http_input=HttpPollInput(
                                 uri='https://orkes-api-tester.orkesconductor.com/api',
                                 polling_strategy='EXPONENTIAL_BACKOFF',
                                 polling_interval=1000,
                                 termination_condition='(function(){ return $.output.response.body.randomInt < 10;})();'),
                             )
    workflow >> http_poll

    # execute the workflow to get the results
    result = workflow.execute(workflow_input={}, wait_for_seconds=10)
    print(f'result: {result.output}')


if __name__ == '__main__':
    main()

examples/orkes/multiagent_chat.py

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
import time
import uuid
from typing import List

from conductor.client.ai.orchestrator import AIOrchestrator
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.do_while_task import LoopTask
from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete, ChatMessage
from conductor.client.workflow.task.set_variable_task import SetVariableTask
from conductor.client.workflow.task.simple_task import SimpleTask
from conductor.client.workflow.task.switch_task import SwitchTask
from conductor.client.workflow.task.timeout_policy import TimeoutPolicy


def main():
    """Run a moderated multi-agent chat as a Conductor workflow.

    A moderator LLM summarizes the conversation and picks which of two
    persona agents speaks next; a SWITCH task routes to that agent, whose
    reply is appended to the conversation history held in a workflow
    variable. The loop runs a fixed number of iterations while this driver
    polls the workflow and prints each new message.
    """
    # LLM backend for agent 1.
    # NOTE(review): the original assigned 'openai_v1'/'gpt-4' and then
    # immediately overwrote both values; kept as a commented-out alternative.
    # agent1_provider = 'openai_v1'
    # agent1_model = 'gpt-4'
    agent1_provider = 'mistral'
    agent1_model = 'mistral-large-latest'

    # LLM backend for agent 2.
    agent2_provider = 'anthropic_cloud'
    agent2_model = 'claude-3-sonnet-20240229'
    # anthropic_model = 'claude-3-opus-20240229'

    # LLM backend for the moderator.
    moderator_provider = 'cohere_saas'
    moderator_model = 'command-r'

    # Unused alternative backend, kept for reference.
    # mistral = 'mistral'
    # mistral_model = 'mistral-large-latest'

    # Reads CONDUCTOR_SERVER_URL / CONDUCTOR_AUTH_KEY / CONDUCTOR_AUTH_SECRET.
    api_config = Configuration()

    clients = OrkesClients(configuration=api_config)
    workflow_executor = clients.get_workflow_executor()
    workflow_client = clients.get_workflow_client()

    # Prompt template names and bodies. ${...} placeholders are filled by
    # Conductor at task run time; the moderator must answer in strict JSON
    # with "result" and "user" keys so the SWITCH task can route on "user".
    moderator = 'moderator'
    moderator_text = """You are very good at moderating the debates and discussions. In this discussion, there are 2 panelists, ${ua1} and ${ua2}.
As a moderator, you summarize the discussion so far, pick one of the panelist ${ua1} or ${ua2} and ask them a relevant question to continue the discussion.
You are also an expert in formatting the results into structured json format. You only output a valid JSON as a response.
You answer in RFC8259 compliant
JSON format ONLY with two fields result and user. You can effectively manage a hot discussion while keeping it
quite civil and also at the same time continue the discussion forward encouraging participants and their views.
Your answer MUST be in a JSON dictionary with keys "result" and "user". Before answer, check the output for correctness of the JSON format.
The values MUST not have new lines or special characters that are not escaped. The JSON must be RFC8259 compliant.

You produce the output in the following JSON keys:

{
"result": ACTUAL_MESSAGE
"user": USER_WHO_SOULD_RESPOND_NEXT --> One of ${ua1} or ${ua2}
}

"result" should summarize the conversation so far and add the last message in the conversation.
"user" should be the one who should respond next.
You be fair in giving chance to all participants, alternating between ${ua1} and ${ua2}.
the last person to talk was ${last_user}
Do not repeat what you have said before and do not summarize the discussion each time,
just use first person voice to ask questions to move discussion forward.
Do not use filler sentences like 'in this discussion....'
JSON:

"""

    agent1 = 'agent_1'
    agent1_text = """
You are ${ua1} and you reason and think like ${ua1}. Your language reflects your persona.
You are very good at analysis of the content and coming up with insights and questions on the subject and the context.
You are in a panel with other participants discussing a specific event/topic as set in the context.
You avoid any repetitive argument, discussion that you have already talked about.
Here is the context on the conversation, add a follow up with your insights and questions to the conversation:
Do not mention that you are an AI model.
${context}

You answer in a very clear way, do not add any preamble to the response:
"""

    agent2 = 'agent_2'
    agent2_text = """
You are ${ua2} and you reason and think like ${ua2}. Your language reflects your persona.
You are very good at continuing the conversation with more insightful question.
You are in a panel with other participants discussing a specific event/topic as set in the context.
You bring in your contrarian views to the conversation and always challenge the norms.
You avoid any repetitive argument, discussion that you have already talked about.
Your responses are times extreme and a bit hyperbolic.
When given the history of conversation, you ask a meaningful followup question that continues to conversation
and dives deeper into the topic.
Do not mention that you are an AI model.
Here is the context on the conversation:
${context}

You answer in a very clear way, do not add any preamble to the response:
"""

    # Register each prompt template and bind it to its provider/model.
    orchestrator = AIOrchestrator(api_configuration=api_config)

    orchestrator.add_prompt_template(moderator, moderator_text, 'moderator instructions')
    orchestrator.associate_prompt_template(moderator, moderator_provider, [moderator_model])

    orchestrator.add_prompt_template(agent1, agent1_text, 'agent1 instructions')
    orchestrator.associate_prompt_template(agent1, agent1_provider, [agent1_model])

    orchestrator.add_prompt_template(agent2, agent2_text, 'agent2 instructions')
    orchestrator.associate_prompt_template(agent2, agent2_provider, [agent2_model])

    # Worker task (GET_DOCUMENT, implemented elsewhere) that fetches the
    # article used as the discussion context.
    get_context = SimpleTask(task_reference_name='get_document', task_def_name='GET_DOCUMENT')
    get_context.input_parameter('url', '${workflow.input.url}')

    wf_input = {'ua1': 'donald trump', 'ua2': 'joe biden', 'last_user': '${workflow.variables.last_user}',
                'url': 'https://www.foxnews.com/media/billionaire-mark-cuban-dodges-question-asking-pays-fair-share-taxes-pay-owe'}

    # Variables shared by both agent prompt templates.
    template_vars = {
        'context': get_context.output('result'),
        'ua1': '${workflow.input.ua1}',
        'ua2': '${workflow.input.ua2}',
    }

    max_tokens = 500
    # Moderator gets a larger budget (2000) since it also summarizes.
    moderator_task = LlmChatComplete(task_ref_name='moderator_ref',
                                     max_tokens=2000,
                                     llm_provider=moderator_provider, model=moderator_model,
                                     instructions_template=moderator,
                                     messages='${workflow.variables.history}',
                                     template_variables={
                                         'ua1': '${workflow.input.ua1}',
                                         'ua2': '${workflow.input.ua2}',
                                         'last_user': '${workflow.variables.last_user}'
                                     })

    agent1_task = LlmChatComplete(task_ref_name='agent1_ref',
                                  max_tokens=max_tokens,
                                  llm_provider=agent1_provider, model=agent1_model,
                                  instructions_template=agent1,
                                  messages=[ChatMessage(role='user', message=moderator_task.output('result'))],
                                  template_variables=template_vars)

    # Append moderator + agent 1 messages to the shared history variable.
    set_variable1 = (SetVariableTask(task_ref_name='task_ref_name1')
                     .input_parameter('history',
                                      [
                                          ChatMessage(role='assistant', message=moderator_task.output('result')),
                                          ChatMessage(role='user',
                                                      message='[' + '${workflow.input.ua1}] ' + f'{agent1_task.output("result")}')
                                      ])
                     .input_parameter('_merge', True)
                     .input_parameter('last_user', "${workflow.input.ua1}"))

    agent2_task = LlmChatComplete(task_ref_name='agent2_ref',
                                  max_tokens=max_tokens,
                                  llm_provider=agent2_provider, model=agent2_model,
                                  instructions_template=agent2,
                                  messages=[ChatMessage(role='user', message=moderator_task.output('result'))],
                                  template_variables=template_vars)

    # Append moderator + agent 2 messages to the shared history variable.
    set_variable2 = (SetVariableTask(task_ref_name='task_ref_name2')
                     .input_parameter('history', [
                         ChatMessage(role='assistant', message=moderator_task.output('result')),
                         ChatMessage(role='user', message='[' + '${workflow.input.ua2}] ' + f'{agent2_task.output("result")}')
                     ])
                     .input_parameter('_merge', True)
                     .input_parameter('last_user', "${workflow.input.ua2}"))

    # Seed the conversation history with the fetched document.
    init = SetVariableTask(task_ref_name='init_ref')
    init.input_parameter('history',
                         [ChatMessage(role='user',
                                      message="""analyze the following context:
BEGIN
${get_document.output.result}
END """)]
                         )
    init.input_parameter('last_user', '')

    wf = ConductorWorkflow(name='multiparty_chat_tmp', version=1, executor=workflow_executor)

    # Route to whichever agent the moderator named; defaults to agent 1.
    script = """
(function(){
if ($.user == $.ua1) return 'ua1';
if ($.user == $.ua2) return 'ua2';
return 'ua1';
})();
"""
    next_up = SwitchTask(task_ref_name='next_up_ref', case_expression=script, use_javascript=True)
    next_up.switch_case('ua1', [agent1_task, set_variable1])
    next_up.switch_case('ua2', [agent2_task, set_variable2])
    next_up.input_parameter('user', moderator_task.output('user'))
    next_up.input_parameter('ua1', '${workflow.input.ua1}')
    next_up.input_parameter('ua2', '${workflow.input.ua2}')

    # Six rounds of moderator -> chosen agent.
    loop_tasks = [moderator_task, next_up]
    chat_loop = LoopTask(task_ref_name='loop', iterations=6, tasks=loop_tasks)
    wf >> get_context >> init >> chat_loop

    wf.timeout_seconds(1200).timeout_policy(timeout_policy=TimeoutPolicy.TIME_OUT_WORKFLOW)
    wf.register(overwrite=True)

    result = wf.execute(wait_until_task_ref=agent1_task.task_reference_name, wait_for_seconds=1,
                        workflow_input=wf_input)

    result = workflow_client.get_workflow_status(result.workflow_id, include_output=True, include_variables=True)
    print(f'started workflow {api_config.ui_host}/{result.workflow_id}')
    # Poll until the workflow reaches a terminal state, printing the newest
    # message from the conversation history each time.
    while result.is_running():
        time.sleep(10)  # wait for 10 seconds LLMs are slow!
        result = workflow_client.get_workflow_status(result.workflow_id, include_output=True, include_variables=True)
        op = result.variables['history']
        if len(op) > 1:
            print('=======================================')
            print(f'{op[len(op) - 1]["message"]}')
            print('\n')


if __name__ == '__main__':
    main()

examples/shell_worker.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
import subprocess
from typing import List

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.worker.worker_task import worker_task


# @worker_task(task_definition_name='shell')
def execute_shell(command: str, args: List[str]) -> str:
    """Run *command* with *args* and return its stdout as a string.

    Uses an argument list (shell=False), so no shell injection is possible.
    Note the return value is str() of the raw bytes, e.g. "b'out\\n'".
    """
    full_command = [command]
    full_command = full_command + args
    result = subprocess.run(full_command, stdout=subprocess.PIPE)

    return str(result.stdout)


# BUG FIX: this worker was also named `execute_shell`, silently shadowing the
# function above; renamed to match its task definition name.
@worker_task(task_definition_name='task_with_retries2')
def task_with_retries2() -> str:
    """Trivial worker used to exercise task retry configuration."""
    return "hello"


def main():
    # defaults to reading the configuration using following env variables
    # CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api
    # CONDUCTOR_AUTH_KEY : API Authentication Key
    # CONDUCTOR_AUTH_SECRET: API Auth Secret
    api_config = Configuration()

    # Start polling for work on all @worker_task-decorated functions,
    # then block until the worker processes exit.
    task_handler = TaskHandler(configuration=api_config)
    task_handler.start_processes()

    task_handler.join_processes()


if __name__ == '__main__':
    main()

examples/untrusted_host.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
import urllib3

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.http.api_client import ApiClient
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_task_client import OrkesTaskClient
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from greetings_workflow import greetings_workflow
import requests


def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    """Register (overwriting any existing version) and return the greetings workflow."""
    workflow = greetings_workflow(workflow_executor=workflow_executor)
    workflow.register(True)
    return workflow


@worker_task(task_definition_name='hello')
def hello(name: str) -> str:
    """Worker for the 'hello' task: greet *name*."""
    print(f'executing.... {name}')
    return f'Hello {name}'


def main():
    """Demonstrate connecting to a Conductor server whose TLS cert is untrusted.

    WARNING: disables certificate verification — for local/dev servers only.
    """
    urllib3.disable_warnings()

    # points to http://localhost:8080/api by default
    api_config = Configuration()
    # Route SDK HTTP traffic through a session with TLS verification off.
    api_config.http_connection = requests.Session()
    api_config.http_connection.verify = False

    # Clients constructed to show they pick up the relaxed session;
    # not otherwise used in this example.
    metadata_client = OrkesMetadataClient(api_config)
    task_client = OrkesTaskClient(api_config)
    workflow_client = OrkesWorkflowClient(api_config)

    task_handler = TaskHandler(configuration=api_config)
    task_handler.start_processes()

    # task_handler.stop_processes()


if __name__ == '__main__':
    main()

src/conductor/client/http/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,4 @@
5252
from conductor.client.http.models.integration import Integration
5353
from conductor.client.http.models.integration_api import IntegrationApi
5454
from conductor.client.http.models.state_change_event import StateChangeEvent, StateChangeConfig, StateChangeEventType
55+
from conductor.client.http.models.workflow_task import CacheConfig

src/conductor/client/http/models/workflow_status.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
import six
55

# Status sets backing the WorkflowStatus.is_* predicates below.
# NOTE(review): PAUSED appears in both successful_status and running_status,
# so a paused workflow reports is_successful() and is_running() — looks
# deliberate (paused is resumable and not failed) but worth confirming.
terminal_status = ('COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED')
successful_status = ('PAUSED', 'COMPLETED')
running_status = ('RUNNING', 'PAUSED')
69

710
class WorkflowStatus(object):
811
"""NOTE: This class is auto generated by the swagger code generator program.
@@ -189,6 +192,21 @@ def to_dict(self):
189192

190193
return result
191194

195+
def is_completed(self) -> bool:
196+
"""Checks if the workflow has completed
197+
:return: True if the workflow status is COMPLETED, FAILED or TERMINATED
198+
"""
199+
return self._status in terminal_status
200+
201+
def is_successful(self) -> bool:
202+
"""Checks if the workflow has completed in successful state (ie COMPLETED)
203+
:return: True if the workflow status is COMPLETED
204+
"""
205+
return self._status in successful_status
206+
207+
def is_running(self) -> bool:
208+
return self.status in running_status
209+
192210
def to_str(self):
193211
"""Returns the string representation of the model"""
194212
return pprint.pformat(self.to_dict())

0 commit comments

Comments
 (0)