fixed compound commands not working in pipelines

improved pipe assignment logic to prevent potential resource leaks
This commit is contained in:
2026-03-05 13:34:34 -05:00
parent 624677b961
commit cdc9e7e266
8 changed files with 353 additions and 178 deletions

View File

@@ -39,7 +39,7 @@ use crate::{
utils::RedirVecUtils,
},
prelude::*,
procio::{IoMode, IoStack},
procio::{IoMode, IoStack, PipeGenerator},
state::{
self, ShFunc, VarFlags, VarKind, read_logic, read_shopts, write_jobs, write_logic, write_vars,
},
@@ -319,22 +319,17 @@ impl Dispatcher {
};
let name = self.source_name.clone();
self.io_stack.append_to_frame(subsh.redirs);
let _guard = self.io_stack.pop_frame().redirect()?;
self.run_fork("anonymous_subshell", |s| {
if let Err(e) = s.set_assignments(assignments, AssignBehavior::Export) {
e.print_error();
return;
};
s.io_stack.append_to_frame(subsh.redirs);
let mut argv = match prepare_argv(argv) {
Ok(argv) => argv,
Err(e) => {
e.try_blame(blame).print_error();
return;
}
};
let subsh = argv.remove(0);
let subsh_body = subsh.0.to_string();
let subsh_raw = argv[0].span.as_str();
let subsh_body = subsh_raw[1..subsh_raw.len() - 1].to_string(); // Remove surrounding parentheses
if let Err(e) = exec_input(subsh_body, None, s.interactive, Some(name)) {
e.print_error();
@@ -385,7 +380,7 @@ impl Dispatcher {
func_body.body_mut().propagate_context(func_ctx);
func_body.body_mut().flags = func.flags;
if let Err(e) = self.exec_brc_grp(func_body.body().clone()) {
if let Err(e) = self.exec_pipeline(func_body.body().clone()) {
match e.kind() {
ShErrKind::FuncReturn(code) => {
state::set_status(*code);
@@ -409,11 +404,15 @@ impl Dispatcher {
}
fn exec_brc_grp(&mut self, brc_grp: Node) -> ShResult<()> {
let NdRule::BraceGrp { body } = brc_grp.class else {
unreachable!()
unreachable!("expected BraceGrp node, got {:?}", brc_grp.class)
};
if self.interactive {
log::debug!("Executing brace group, body: {:?}", body);
}
let fork_builtins = brc_grp.flags.contains(NdFlags::FORK_BUILTINS);
self.io_stack.append_to_frame(brc_grp.redirs);
if self.interactive {}
let guard = self.io_stack.pop_frame().redirect()?;
let brc_grp_logic = |s: &mut Self| -> ShResult<()> {
for node in body {
@@ -705,40 +704,14 @@ impl Dispatcher {
let NdRule::Pipeline { cmds } = pipeline.class else {
unreachable!()
};
self.job_stack.new_job();
let fork_builtin = cmds.len() > 1; // If there's more than one command, we need to fork builtins
let (mut in_redirs, mut out_redirs) = self.io_stack.pop_frame().redirs.split_by_channel();
// Zip the commands and their respective pipes into an iterator
let pipes_and_cmds = get_pipe_stack(cmds.len()).into_iter().zip(cmds);
if self.interactive {
log::debug!("Executing pipeline, cmds: {:#?}", cmds);
}
let is_bg = pipeline.flags.contains(NdFlags::BACKGROUND);
self.fg_job = !is_bg && self.interactive;
let mut tty_attached = false;
for ((rpipe, wpipe), mut cmd) in pipes_and_cmds {
if let Some(pipe) = rpipe {
self.io_stack.push_to_frame(pipe);
} else {
for redir in std::mem::take(&mut in_redirs) {
self.io_stack.push_to_frame(redir);
}
}
if let Some(pipe) = wpipe {
self.io_stack.push_to_frame(pipe);
if cmd.flags.contains(NdFlags::PIPE_ERR) {
let err_redir = Redir::new(IoMode::Fd { tgt_fd: STDERR_FILENO, src_fd: STDOUT_FILENO }, RedirType::Output);
self.io_stack.push_to_frame(err_redir);
}
} else {
for redir in std::mem::take(&mut out_redirs) {
self.io_stack.push_to_frame(redir);
}
}
if fork_builtin {
cmd.flags |= NdFlags::FORK_BUILTINS;
}
self.job_stack.new_job();
if cmds.len() == 1 {
self.fg_job = !is_bg && self.interactive;
let cmd = cmds.into_iter().next().unwrap();
self.dispatch_node(cmd)?;
// Give the pipeline terminal control as soon as the first child
@@ -746,13 +719,57 @@ impl Dispatcher {
// SIGTTOU when they try to modify terminal attributes.
// Only for interactive (top-level) pipelines — command substitution
// and other non-interactive contexts must not steal the terminal.
if !tty_attached
&& !is_bg
if !is_bg
&& self.interactive
&& let Some(pgid) = self.job_stack.curr_job_mut().unwrap().pgid()
{
attach_tty(pgid).ok();
tty_attached = true;
}
} else {
let (mut in_redirs, mut out_redirs) = self.io_stack.pop_frame().redirs.split_by_channel();
let mut pipes = PipeGenerator::new(cmds.len()).as_io_frames();
self.fg_job = !is_bg && self.interactive;
let mut tty_attached = false;
let last_cmd = cmds.len() - 1;
for (i, mut cmd) in cmds.into_iter().enumerate() {
let mut frame = pipes.next().ok_or_else(|| {
ShErr::at(
ShErrKind::InternalErr,
cmd.get_span(),
"failed to set up pipeline redirections".to_string(),
)
})?;
if i == 0 {
for redir in std::mem::take(&mut in_redirs) {
frame.push(redir);
}
} else if i == last_cmd {
for redir in std::mem::take(&mut out_redirs) {
frame.push(redir);
}
}
let _guard = frame.redirect()?;
cmd.flags |= NdFlags::FORK_BUILTINS; // multiple cmds means builtins must fork
self.dispatch_node(cmd)?;
// Give the pipeline terminal control as soon as the first child
// establishes the PGID, so later children (e.g. nvim) don't get
// SIGTTOU when they try to modify terminal attributes.
// Only for interactive (top-level) pipelines — command substitution
// and other non-interactive contexts must not steal the terminal.
if !tty_attached
&& !is_bg
&& self.interactive
&& let Some(pgid) = self.job_stack.curr_job_mut().unwrap().pgid()
{
attach_tty(pgid).ok();
tty_attached = true;
}
}
}
let job = self.job_stack.finalize_job().unwrap();
@@ -818,7 +835,15 @@ impl Dispatcher {
// Set up redirections here so we can attach the guard to propagated errors.
self.io_stack.append_to_frame(mem::take(&mut cmd.redirs));
let redir_guard = self.io_stack.pop_frame().redirect()?;
let frame = self.io_stack.pop_frame();
if self.interactive {
log::debug!(
"popped frame for builtin '{}', frame: {:#?}",
cmd_raw,
frame
);
}
let redir_guard = frame.redirect()?;
// Register ChildProc in current job
let job = self.job_stack.curr_job_mut().unwrap();

View File

@@ -87,7 +87,6 @@ impl ParsedSrc {
Err(error) => return Err(vec![error]),
}
}
log::debug!("Tokens: {:#?}", tokens);
let mut errors = vec![];
let mut nodes = vec![];
@@ -214,9 +213,7 @@ impl Node {
assign_node.walk_tree(f);
}
}
NdRule::Pipeline {
ref mut cmds,
} => {
NdRule::Pipeline { ref mut cmds } => {
for cmd_node in cmds {
cmd_node.walk_tree(f);
}
@@ -271,7 +268,7 @@ bitflags! {
const FORK_BUILTINS = 0b000010;
const NO_FORK = 0b000100;
const ARR_ASSIGN = 0b001000;
const PIPE_ERR = 0b010000;
const PIPE_ERR = 0b010000;
}
}
@@ -642,7 +639,7 @@ pub enum NdRule {
argv: Vec<Tk>,
},
Pipeline {
cmds: Vec<Node>
cmds: Vec<Node>,
},
Conjunction {
elements: Vec<ConjunctNode>,
@@ -778,16 +775,16 @@ impl ParseStream {
/// appearing at the bottom The check_pipelines parameter is used to prevent
/// left-recursion issues in self.parse_pipeln()
fn parse_block(&mut self, check_pipelines: bool) -> ShResult<Option<Node>> {
try_match!(self.parse_func_def()?);
try_match!(self.parse_brc_grp(false /* from_func_def */)?);
try_match!(self.parse_case()?);
try_match!(self.parse_loop()?);
try_match!(self.parse_for()?);
try_match!(self.parse_if()?);
try_match!(self.parse_test()?);
if check_pipelines {
try_match!(self.parse_pipeln()?);
} else {
try_match!(self.parse_func_def()?);
try_match!(self.parse_brc_grp(false /* from_func_def */)?);
try_match!(self.parse_case()?);
try_match!(self.parse_loop()?);
try_match!(self.parse_for()?);
try_match!(self.parse_if()?);
try_match!(self.parse_test()?);
try_match!(self.parse_cmd()?);
}
Ok(None)
@@ -1448,16 +1445,17 @@ impl ParseStream {
while let Some(mut cmd) = self.parse_block(false)? {
let is_punctuated = node_is_punctuated(&cmd.tokens);
node_tks.append(&mut cmd.tokens.clone());
if *self.next_tk_class() == TkRule::ErrPipe {
cmd.flags |= NdFlags::PIPE_ERR;
}
if *self.next_tk_class() == TkRule::ErrPipe {
cmd.flags |= NdFlags::PIPE_ERR;
}
cmds.push(cmd);
if *self.next_tk_class() == TkRule::Bg {
let tk = self.next_tk().unwrap();
node_tks.push(tk.clone());
flags |= NdFlags::BACKGROUND;
break;
} else if (!matches!(*self.next_tk_class(),TkRule::Pipe | TkRule::ErrPipe)) || is_punctuated {
} else if (!matches!(*self.next_tk_class(), TkRule::Pipe | TkRule::ErrPipe)) || is_punctuated
{
break;
} else if let Some(pipe) = self.next_tk() {
node_tks.push(pipe)
@@ -1470,9 +1468,7 @@ impl ParseStream {
} else {
Ok(Some(Node {
// TODO: implement pipe_err support
class: NdRule::Pipeline {
cmds,
},
class: NdRule::Pipeline { cmds },
flags,
redirs: vec![],
context: self.context.clone(),
@@ -1555,7 +1551,7 @@ impl ParseStream {
match tk.class {
TkRule::EOI
| TkRule::Pipe
| TkRule::ErrPipe
| TkRule::ErrPipe
| TkRule::And
| TkRule::BraceGrpEnd
| TkRule::Or
@@ -1867,9 +1863,7 @@ where
check_node(assign_node, filter, operation);
}
}
NdRule::Pipeline {
ref mut cmds,
} => {
NdRule::Pipeline { ref mut cmds } => {
for cmd_node in cmds {
check_node(cmd_node, filter, operation);
}