Convert subprocess stdout stream into non-blocking iterator in Rust

In one of my programs I had to interact with another subprocess. This subprocess took data from stdin and wrote result to stdout. It wasn’t just simple reading and writing - it took constant data stream from stdin and somewhere in the middle writes something to stdout. I had to capture this something as soon as possible and take actions according this response.

The main would look like this:

    fn main() {
        let process = Process::new();

        loop {
            let data = get_some_data();
            process.push(data);

            for response in proc.responses() {
                // do something based on response
            }
        }
    }

 Blocking iterator

Firstly I just tried to read lines from process stdout. However I discovered that this BufReader is blocking. Code for the blocking stdout iterator is below. Because it is blocking we cannot put it into loop. Therefore I had to come up with a non-blocking version.


    use std::io::prelude::*;
    use std::process::{Command, Stdio};
    use std::io::BufReader;

    pub struct Process {
        process: std::process::Child,
    }

    impl Process {

        pub fn new() -> Process {
            let mut process =
                    Command::new("someprocess.sh")
                            .stdin(Stdio::piped())
                            .stdout(Stdio::inherit())
                            .spawn().unwrap();

            Process {process: process}

        }

        pub fn push(&mut self, buf: &[u8]) {
            let mut stdin = self.process.stdin
                                .as_mut().unwrap();

            stdin.write_all(buf);
        }

        pub fn responses(&mut self) -> ProcessIntoIterator {
            ProcessIntoIterator {
                subprocess: self,
            }
        }
    }

    pub struct ProcessIntoIterator<'a> {
        subprocess: &'a mut Process,
    }

    impl <'a>Iterator for ProcessIntoIterator<'a> {
        type Item = String;
        fn next(&mut self) -> Option<String> {
            let stdout = self.subprocess.process.stdout
                             .as_mut().unwrap();

            let reader = BufReader::new(stdout);
            let mut result: Option<String> = None;

            // blocks until subrocess finishes
            for line in reader.lines() {
                result = Some(line.unwrap());
            }

            if result.is_some() {
                result
            }
            else {
                None
            }
        }
    }

 Non-blocking iterator

This version has method called run() which starts a thread that reads data from stdout and forwards it to iterator if data is available. Notice that channel is used for sending data from stdout reader thread to iterator (main) thread. If data is not available iterator returns None and quits. Exactly what I needed.


    use std::io::prelude::*;
    use std::process::{Command, Stdio};
    use std::sync::mpsc;
    use std::thread;
    use std::io::BufReader;

    pub struct Process {
        process: std::process::Child,
        tx: mpsc::Sender<Option<String>>,
        rx: mpsc::Receiver<Option<String>>,
    }

    impl Process {
        pub fn new() -> Process {
            let mut process =
                    Command::new("someprocess.sh")
                            .stdin(Stdio::piped())
                            .stdout(Stdio::inherit())
                            .spawn().unwrap();

            let (tx, rx) = mpsc::channel();
            Process {process: process,
                      tx: tx,
                      rx: rx,
            }
        }

        pub fn run(&mut self) {
            let tx = self.tx.clone();
            let stdout = self.process.stdout
                             .take().unwrap();

            thread::spawn(move || {
                let reader = BufReader::new(stdout);

                for line in reader.lines() {
                    tx.send(Some(line.unwrap()));
                }
            });
        }

        pub fn push(&mut self, buf: &[u8]) {
            let mut stdin = self.process.stdin
                                .as_mut().unwrap();

            stdin.write_all(buf);
        }

        pub fn packets(&mut self) -> ProcessIntoIterator {
            ProcessIntoIterator {
                subprocess: self,
            }
        }
    }

    pub struct ProcessIntoIterator<'a> {
        subprocess: &'a mut Process,
    }

    impl <'a>Iterator for ProcessIntoIterator<'a> {
        type Item = String;
        fn next(&mut self) -> Option<String> {
            let data = self.subprocess.rx.try_recv();
            if data.is_ok() {
                data.unwrap()
            }
            else {
                None
            }
        }
    }

Thats it! It works as expected. With non-blocking version we also have to call process.run() to start the thread:

    fn main() {
        let process = Process::new();
        process.run();

        loop {
            let data = get_some_data();
            process.push(data);

            for response in proc.responses() {
                // do something based on response
            }
        }
    }
 
35
Kudos
 
35
Kudos

Now read this

Turning Losi 1/24 short course truck into autonomos folk racer [part 1]

This year’s Robotex is not far away. It takes place from 29 - 30 november. This time it will be special because I decided to participate. Actually I decided it last December after Robotex 2013 when I saw Folkrace competition. It is entry... Continue →