Convert subprocess stdout stream into non-blocking iterator in Rust

In one of my programs I had to interact with another subprocess. This subprocess took data from stdin and wrote result to stdout. It wasn’t just simple reading and writing - it took constant data stream from stdin and somewhere in the middle writes something to stdout. I had to capture this something as soon as possible and take actions according this response.

The main would look like this:

    fn main() {
        let process = Process::new();

        loop {
            let data = get_some_data();
            process.push(data);

            for response in proc.responses() {
                // do something based on response
            }
        }
    }

Blocking iterator #

Firstly I just tried to read lines from process stdout. However I discovered that this BufReader is blocking. Code for the blocking stdout iterator is below. Because it is blocking we cannot put it into loop. Therefore I had to come up with a non-blocking version.


    use std::io::prelude::*;
    use std::process::{Command, Stdio};
    use std::io::BufReader;

    pub struct Process {
        process: std::process::Child,
    }

    impl Process {

        pub fn new() -> Process {
            let mut process =
                    Command::new("someprocess.sh")
                            .stdin(Stdio::piped())
                            .stdout(Stdio::inherit())
                            .spawn().unwrap();

            Process {process: process}

        }

        pub fn push(&mut self, buf: &[u8]) {
            let mut stdin = self.process.stdin
                                .as_mut().unwrap();

            stdin.write_all(buf);
        }

        pub fn responses(&mut self) -> ProcessIntoIterator {
            ProcessIntoIterator {
                subprocess: self,
            }
        }
    }

    pub struct ProcessIntoIterator<'a> {
        subprocess: &'a mut Process,
    }

    impl <'a>Iterator for ProcessIntoIterator<'a> {
        type Item = String;
        fn next(&mut self) -> Option<String> {
            let stdout = self.subprocess.process.stdout
                             .as_mut().unwrap();

            let reader = BufReader::new(stdout);
            let mut result: Option<String> = None;

            // blocks until subrocess finishes
            for line in reader.lines() {
                result = Some(line.unwrap());
            }

            if result.is_some() {
                result
            }
            else {
                None
            }
        }
    }

Non-blocking iterator #

This version has method called run() which starts a thread that reads data from stdout and forwards it to iterator if data is available. Notice that channel is used for sending data from stdout reader thread to iterator (main) thread. If data is not available iterator returns None and quits. Exactly what I needed.


    use std::io::prelude::*;
    use std::process::{Command, Stdio};
    use std::sync::mpsc;
    use std::thread;
    use std::io::BufReader;

    pub struct Process {
        process: std::process::Child,
        tx: mpsc::Sender<Option<String>>,
        rx: mpsc::Receiver<Option<String>>,
    }

    impl Process {
        pub fn new() -> Process {
            let mut process =
                    Command::new("someprocess.sh")
                            .stdin(Stdio::piped())
                            .stdout(Stdio::inherit())
                            .spawn().unwrap();

            let (tx, rx) = mpsc::channel();
            Process {process: process,
                      tx: tx,
                      rx: rx,
            }
        }

        pub fn run(&mut self) {
            let tx = self.tx.clone();
            let stdout = self.process.stdout
                             .take().unwrap();

            thread::spawn(move || {
                let reader = BufReader::new(stdout);

                for line in reader.lines() {
                    tx.send(Some(line.unwrap()));
                }
            });
        }

        pub fn push(&mut self, buf: &[u8]) {
            let mut stdin = self.process.stdin
                                .as_mut().unwrap();

            stdin.write_all(buf);
        }

        pub fn packets(&mut self) -> ProcessIntoIterator {
            ProcessIntoIterator {
                subprocess: self,
            }
        }
    }

    pub struct ProcessIntoIterator<'a> {
        subprocess: &'a mut Process,
    }

    impl <'a>Iterator for ProcessIntoIterator<'a> {
        type Item = String;
        fn next(&mut self) -> Option<String> {
            let data = self.subprocess.rx.try_recv();
            if data.is_ok() {
                data.unwrap()
            }
            else {
                None
            }
        }
    }

Thats it! It works as expected. With non-blocking version we also have to call process.run() to start the thread:

    fn main() {
        let process = Process::new();
        process.run();

        loop {
            let data = get_some_data();
            process.push(data);

            for response in proc.responses() {
                // do something based on response
            }
        }
    }
 
58
Kudos
 
58
Kudos

Now read this

Release Rust embedded firmware using Github Actions

Github Actions # Github Actions is a nice way to setup CI/CD pipelines for your Github projects. Let’s setup it for an embedded firmware project written in Rust. Continuous integration script would run every time when new code is pushed... Continue →