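import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.util.Pair; // assumed from the use of Pair.of/getFirst/getSecond
import org.springframework.stereotype.Component;

// McpClient, WorkflowContext, WorkflowResult and ToolExecutionResult are project types
// that are not shown in this listing.
@Component
public class ToolWorkflowEngine {

    private static final Logger logger = LoggerFactory.getLogger(ToolWorkflowEngine.class);

    @Data
    public static class WorkflowDefinition {
        private String name;
        private List<WorkflowStep> steps;
        private Map<String, Object> globalVariables;
        private Duration timeout;
    }

    // toBuilder = true is needed for the step.toBuilder() call in executeStepWithRetry.
    @Data
    @Builder(toBuilder = true)
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WorkflowStep {
        private String name;
        private String toolName;
        private Map<String, Object> arguments;
        private List<String> dependsOn;
        private String condition;
        private boolean parallel; // not consulted in this listing; steps in a level always run concurrently
        private int retryCount;
    }

    // Runs the whole workflow and maps any failure (including a timeout) to an unsuccessful result.
    public CompletableFuture<WorkflowResult> executeWorkflow(
            WorkflowDefinition workflow, McpClient client) {

        WorkflowContext context = new WorkflowContext(workflow.getGlobalVariables());

        CompletableFuture<Map<String, ToolExecutionResult>> execution =
                executeSteps(workflow.getSteps(), client, context);

        // Apply the timeout only when one is configured; a null Duration would throw here.
        // orTimeout requires Java 9+.
        if (workflow.getTimeout() != null) {
            execution = execution.orTimeout(
                    workflow.getTimeout().toMillis(), TimeUnit.MILLISECONDS);
        }

        return execution
                .thenApply(results -> WorkflowResult.builder()
                        .workflowName(workflow.getName())
                        .stepResults(results)
                        .context(context)
                        .success(true)
                        .build())
                .exceptionally(throwable -> WorkflowResult.builder()
                        .workflowName(workflow.getName())
                        .success(false)
                        .error(throwable.getMessage())
                        .build());
    }

    // Groups steps into dependency levels and runs the levels one after another.
    private CompletableFuture<Map<String, ToolExecutionResult>> executeSteps(
            List<WorkflowStep> steps, McpClient client, WorkflowContext context) {

        Map<String, Set<String>> dependencies = buildDependencyGraph(steps);
        List<List<WorkflowStep>> executionLevels = topologicalSort(steps, dependencies);

        Map<String, ToolExecutionResult> results = new ConcurrentHashMap<>();
        CompletableFuture<Map<String, ToolExecutionResult>> chain =
                CompletableFuture.completedFuture(results);

        // Chain the levels sequentially: a level starts only after the previous one completes.
        // A plain loop replaces the original Stream.reduce, whose combiner was never
        // invoked on a sequential stream.
        for (List<WorkflowStep> level : executionLevels) {
            chain = chain.thenCompose(currentResults ->
                    executeLevel(level, client, context, currentResults));
        }
        return chain;
    }

    // Runs every step of one level concurrently, skipping steps whose condition is false.
    private CompletableFuture<Map<String, ToolExecutionResult>> executeLevel(
            List<WorkflowStep> steps, McpClient client, WorkflowContext context,
            Map<String, ToolExecutionResult> previousResults) {

        List<CompletableFuture<Pair<String, ToolExecutionResult>>> futures = steps.stream()
                .filter(step -> evaluateCondition(step.getCondition(), context))
                .map(step -> executeStepWithRetry(step, client, context)
                        .thenApply(result -> Pair.of(step.getName(), result)))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    Map<String, ToolExecutionResult> levelResults = new HashMap<>(previousResults);
                    futures.forEach(future -> {
                        // join() does not block here: allOf has already completed.
                        Pair<String, ToolExecutionResult> pair = future.join();
                        levelResults.put(pair.getFirst(), pair.getSecond());
                        context.setStepResult(pair.getFirst(), pair.getSecond());
                    });
                    return levelResults;
                });
    }

    // Retries a failed step immediately (no backoff) until retryCount reaches zero,
    // then returns an error result instead of propagating the exception.
    // exceptionallyCompose requires Java 12+.
    private CompletableFuture<ToolExecutionResult> executeStepWithRetry(
            WorkflowStep step, McpClient client, WorkflowContext context) {

        return executeStepOnce(step, client, context)
                .exceptionallyCompose(throwable -> {
                    if (step.getRetryCount() > 0) {
                        logger.warn("Step {} failed, retrying", step.getName(), throwable);
                        WorkflowStep retryStep = step.toBuilder()
                                .retryCount(step.getRetryCount() - 1)
                                .build();
                        return executeStepWithRetry(retryStep, client, context);
                    } else {
                        return CompletableFuture.completedFuture(
                                createErrorResult("Step failed after retries: "
                                        + throwable.getMessage()));
                    }
                });
    }

    // buildDependencyGraph, topologicalSort, evaluateCondition, executeStepOnce and
    // createErrorResult are not shown in this listing.
}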
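The listing calls `buildDependencyGraph` and `topologicalSort` without showing them. As a rough illustration only, the level grouping could be done with Kahn's algorithm, as sketched below. The class `LevelSortSketch`, its `levels` method, and the choice to work on step names rather than `WorkflowStep` objects are assumptions made for this example and are not taken from the original code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original listing: groups step names into
// levels so that every step appears in a later level than all of its dependencies.
final class LevelSortSketch {

    // dependsOn maps each step name to the names it depends on (possibly an empty list).
    static List<List<String>> levels(Map<String, List<String>> dependsOn) {
        Map<String, Integer> remaining = new LinkedHashMap<>(); // unresolved dependency count
        Map<String, List<String>> dependents = new HashMap<>(); // reverse edges

        for (Map.Entry<String, List<String>> entry : dependsOn.entrySet()) {
            remaining.put(entry.getKey(), entry.getValue().size());
            for (String dep : entry.getValue()) {
                if (!dependsOn.containsKey(dep)) {
                    throw new IllegalArgumentException("Unknown dependency: " + dep);
                }
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(entry.getKey());
            }
        }

        // The first level holds every step with no dependencies.
        List<String> current = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
            if (entry.getValue() == 0) {
                current.add(entry.getKey());
            }
        }

        List<List<String>> result = new ArrayList<>();
        int placed = 0;
        while (!current.isEmpty()) {
            result.add(current);
            placed += current.size();
            List<String> next = new ArrayList<>();
            for (String done : current) {
                for (String child : dependents.getOrDefault(done, List.of())) {
                    // A step becomes ready once its last dependency has been placed.
                    if (remaining.merge(child, -1, Integer::sum) == 0) {
                        next.add(child);
                    }
                }
            }
            current = next;
        }

        // Any step left unplaced is part of a dependency cycle.
        if (placed != dependsOn.size()) {
            throw new IllegalStateException("Dependency cycle detected");
        }
        return result;
    }
}
```

For example, with `fetch` having no dependencies, `parse` and `validate` both depending on `fetch`, and `store` depending on `parse` and `validate`, the method returns three levels: `[fetch]`, `[parse, validate]`, `[store]`. Passing a `LinkedHashMap` keeps the order within each level deterministic. Rejecting cycles up front matters for the engine above, because a cyclic definition would otherwise leave steps that never become ready.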