Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/executors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ codex-app-server-protocol = { git = "https://github.com/openai/codex.git", packa
codex-mcp-types = { git = "https://github.com/openai/codex.git", package = "mcp-types", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }
sha2 = "0.10"
derivative = "2.2.0"

[target.'cfg(windows)'.dependencies]
winsplit = "0.1.0"
89 changes: 82 additions & 7 deletions crates/executors/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,53 @@
use std::path::PathBuf;

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use shlex::Quoter;
use thiserror::Error;
use ts_rs::TS;
use workspace_utils::shell::resolve_executable_path;

use crate::executors::ExecutorError;

#[derive(Debug, Error)]
pub enum CommandBuildError {
#[error("base command cannot be parsed: {0}")]
InvalidBase(String),
#[error("base command is empty after parsing")]
EmptyCommand,
#[error("failed to quote command: {0}")]
QuoteError(#[from] shlex::QuoteError),
}

#[derive(Debug, Clone)]
pub struct CommandParts {
program: String,
args: Vec<String>,
}

impl CommandParts {
pub fn new(program: String, args: Vec<String>) -> Self {
Self { program, args }
}

pub async fn into_resolved(self) -> Result<(PathBuf, Vec<String>), ExecutorError> {
let CommandParts { program, args } = self;
let executable = resolve_executable_path(&program)
.await
.ok_or(ExecutorError::ExecutableNotFound { program })?;
Ok((executable, args))
}

pub fn to_shell_string(&self) -> Result<String, CommandBuildError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this and associated logic now that the watchkill script has been removed

let quoter = Quoter::new().allow_nul(true);
let mut words: Vec<&str> = Vec::with_capacity(1 + self.args.len());
words.push(self.program.as_str());
for arg in &self.args {
words.push(arg.as_str());
}
quoter.join(words).map_err(CommandBuildError::QuoteError)
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema, Default)]
pub struct CmdOverrides {
Expand Down Expand Up @@ -60,15 +107,26 @@ impl CommandBuilder {
}
self
}
pub fn build_initial(&self) -> String {
let mut parts = vec![self.base.clone()];
if let Some(ref params) = self.params {
parts.extend(params.clone());
}
parts.join(" ")

pub fn build_initial(&self) -> Result<CommandParts, CommandBuildError> {
self.build(&[])
}

pub fn build_follow_up(
&self,
additional_args: &[String],
) -> Result<CommandParts, CommandBuildError> {
self.build(additional_args)
}

fn build(&self, additional_args: &[String]) -> Result<CommandParts, CommandBuildError> {
let mut parts = split_command_line(&self.simple_join(additional_args))?;

let program = parts.remove(0);
Ok(CommandParts::new(program, parts))
}

pub fn build_follow_up(&self, additional_args: &[String]) -> String {
fn simple_join(&self, additional_args: &[String]) -> String {
let mut parts = vec![self.base.clone()];
if let Some(ref params) = self.params {
parts.extend(params.clone());
Expand All @@ -78,6 +136,23 @@ impl CommandBuilder {
}
}

fn split_command_line(input: &str) -> Result<Vec<String>, CommandBuildError> {
#[cfg(windows)]
{
let parts = winsplit::split(input);
if parts.is_empty() {
Err(CommandBuildError::EmptyCommand)
} else {
Ok(parts)
}
}

#[cfg(not(windows))]
{
shlex::split(input).ok_or_else(|| CommandBuildError::InvalidBase(input.to_string()))
}
}

pub fn apply_overrides(builder: CommandBuilder, overrides: &CmdOverrides) -> CommandBuilder {
let builder = if let Some(ref base) = overrides.base_command_override {
builder.override_base(base.clone())
Expand Down
25 changes: 13 additions & 12 deletions crates/executors/src/executors/acp/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ use tokio_util::{
io::ReaderStream,
};
use tracing::error;
use workspace_utils::{shell::get_shell_command, stream_lines::LinesStreamExt};
use workspace_utils::stream_lines::LinesStreamExt;

use super::{AcpClient, SessionManager};
use crate::executors::{ExecutorError, SpawnedChild, acp::AcpEvent};
use crate::{
command::CommandParts,
executors::{ExecutorError, SpawnedChild, acp::AcpEvent},
};

/// Reusable harness for ACP-based conns (Gemini, Qwen, etc.)
pub struct AcpAgentHarness {
Expand Down Expand Up @@ -50,18 +53,17 @@ impl AcpAgentHarness {
&self,
current_dir: &Path,
prompt: String,
full_command: String,
command_parts: CommandParts,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
let (program_path, args) = command_parts.into_resolved().await?;
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(full_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1");

let mut child = command.group_spawn()?;
Expand All @@ -88,18 +90,17 @@ impl AcpAgentHarness {
current_dir: &Path,
prompt: String,
session_id: &str,
full_command: String,
command_parts: CommandParts,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
let (program_path, args) = command_parts.into_resolved().await?;
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(full_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1");

let mut child = command.group_spawn()?;
Expand Down
35 changes: 16 additions & 19 deletions crates/executors/src/executors/amp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, process::Command};
use ts_rs::TS;
use workspace_utils::{msg_store::MsgStore, shell::get_shell_command};
use workspace_utils::msg_store::MsgStore;

use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
Expand Down Expand Up @@ -45,20 +45,19 @@ impl Amp {
#[async_trait]
impl StandardCodingAgentExecutor for Amp {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let amp_command = self.build_command_builder().build_initial();
let command_parts = self.build_command_builder().build_initial()?;
let (executable_path, args) = command_parts.into_resolved().await?;

let combined_prompt = self.append_prompt.combine_prompt(prompt);

let mut command = Command::new(shell_cmd);
let mut command = Command::new(executable_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&amp_command);
.args(&args);

let mut child = command.group_spawn()?;

Expand All @@ -77,22 +76,20 @@ impl StandardCodingAgentExecutor for Amp {
prompt: &str,
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();

// 1) Fork the thread synchronously to obtain new thread id
let fork_cmd = self.build_command_builder().build_follow_up(&[
let builder = self.build_command_builder();
let fork_line = builder.build_follow_up(&[
"threads".to_string(),
"fork".to_string(),
session_id.to_string(),
]);
let fork_output = Command::new(shell_cmd)
])?;
let (fork_program, fork_args) = fork_line.into_resolved().await?;
let fork_output = Command::new(fork_program)
.kill_on_drop(true)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&fork_cmd)
.args(&fork_args)
.output()
.await?;
let stdout_str = String::from_utf8_lossy(&fork_output.stdout);
Expand All @@ -112,23 +109,23 @@ impl StandardCodingAgentExecutor for Amp {
tracing::debug!("AMP threads fork -> new thread id: {}", new_thread_id);

// 2) Continue using the new thread id
let continue_cmd = self.build_command_builder().build_follow_up(&[
let continue_line = builder.build_follow_up(&[
"threads".to_string(),
"continue".to_string(),
new_thread_id.clone(),
]);
])?;
let (continue_program, continue_args) = continue_line.into_resolved().await?;

let combined_prompt = self.append_prompt.combine_prompt(prompt);

let mut command = Command::new(shell_cmd);
let mut command = Command::new(continue_program);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&continue_cmd);
.args(&continue_args);

let mut child = command.group_spawn()?;

Expand Down
91 changes: 39 additions & 52 deletions crates/executors/src/executors/claude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,51 @@ impl ClaudeCode {

apply_overrides(builder, &self.cmd)
}
}

#[async_trait]
impl StandardCodingAgentExecutor for ClaudeCode {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
session_id: Option<&str>,
) -> Result<SpawnedChild, ExecutorError> {
let command_builder = self.build_command_builder().await;
let mut base_command = command_builder.build_initial();
let command_parts = if let Some(session_id) = session_id {
command_builder.build_follow_up(&[
"--fork-session".to_string(),
"--resume".to_string(),
session_id.to_string(),
])?
} else {
command_builder.build_initial()?
};

if self.plan.unwrap_or(false) {
base_command = create_watchkill_script(&base_command);
}
let plan_enabled = self.plan.unwrap_or(false);

if self.approvals.unwrap_or(false) || self.plan.unwrap_or(false) {
if self.approvals.unwrap_or(false) || plan_enabled {
write_python_hook(current_dir).await?
}

let combined_prompt = self.append_prompt.combine_prompt(prompt);

let mut command = Command::new(shell_cmd);
let mut command = if plan_enabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly think that this can be removed

let plan_script = create_watchkill_script(&command_parts.to_shell_string()?);
let (shell_cmd, shell_arg) = get_shell_command();
let mut cmd = Command::new(shell_cmd);
cmd.arg(shell_arg).arg(plan_script);
cmd
} else {
let (program_path, args) = command_parts.into_resolved().await?;
let mut cmd = Command::new(program_path);
cmd.args(&args);
cmd
};

command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&base_command);
.current_dir(current_dir);

let mut child = command.group_spawn()?;

Expand All @@ -164,51 +181,21 @@ impl StandardCodingAgentExecutor for ClaudeCode {

Ok(child.into())
}
}

#[async_trait]
impl StandardCodingAgentExecutor for ClaudeCode {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
self.spawn(current_dir, prompt, None).await
}

async fn spawn_follow_up(
&self,
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let command_builder = self.build_command_builder().await;
// Build follow-up command with --resume {session_id}
let mut base_command = command_builder.build_follow_up(&[
"--fork-session".to_string(),
"--resume".to_string(),
session_id.to_string(),
]);

if self.plan.unwrap_or(false) {
base_command = create_watchkill_script(&base_command);
}

if self.approvals.unwrap_or(false) || self.plan.unwrap_or(false) {
write_python_hook(current_dir).await?
}

let combined_prompt = self.append_prompt.combine_prompt(prompt);

let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&base_command);

let mut child = command.group_spawn()?;

// Feed the followup prompt in, then close the pipe
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(combined_prompt.as_bytes()).await?;
stdin.shutdown().await?;
}

Ok(child.into())
self.spawn(current_dir, prompt, Some(session_id)).await
}

fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &Path) {
Expand Down
Loading
Loading