Convert subprocess stdout stream into non-blocking iterator in Rust

In one of my programs I had to interact with another subprocess. This subprocess took data from stdin and wrote result to stdout. It wasn’t just simple reading and writing - it took constant data stream from stdin and somewhere in the middle writes something to stdout. I had to capture this something as soon as possible and take actions according this response.

The main would look like this:

    fn main() {
        let process = Process::new();

        loop {
            let data = get_some_data();
            process.push(data);

            for response in proc.responses() {
                // do something based on response
            }
        }
    }

 Blocking iterator

Firstly I just tried to read lines from process stdout. However I discovered that this BufReader is blocking. Code for the blocking stdout iterator is below. Because it is blocking we cannot put it into loop. Therefore I had to come up with a non-blocking version.


    use std::io::prelude::*;
    use std::process::{Command, Stdio};
    use std::io::BufReader;

    pub struct Process {
        process: std::process::Child,
    }

    impl Process {

        pub fn new() -> Process {
            let mut process =
                    Command::new("someprocess.sh")
                            .stdin(Stdio::piped())
                            .stdout(Stdio::inherit())
                            .spawn().unwrap();

            Process {process: process}

        }

        pub fn push(&mut self, buf: &[u8]) {
            let mut stdin = self.process.stdin
                                .as_mut().unwrap();

            stdin.write_all(buf);
        }

        pub fn responses(&mut self) -> ProcessIntoIterator {
            ProcessIntoIterator {
                subprocess: self,
            }
        }
    }

    pub struct ProcessIntoIterator<'a> {
        subprocess: &'a mut Process,
    }

    impl <'a>Iterator for ProcessIntoIterator<'a> {
        type Item = String;
        fn next(&mut self) -> Option<String> {
            let stdout = self.subprocess.process.stdout
                             .as_mut().unwrap();

            let reader = BufReader::new(stdout);
            let mut result: Option<String> = None;

            // blocks until subrocess finishes
            for line in reader.lines() {
                result = Some(line.unwrap());
            }

            if result.is_some() {
                result
            }
            else {
                None
            }
        }
    }

 Non-blocking iterator

This version has method called run() which starts a thread that reads data from stdout and forwards it to iterator if data is available. Notice that channel is used for sending data from stdout reader thread to iterator (main) thread. If data is not available iterator returns None and quits. Exactly what I needed.


    use std::io::prelude::*;
    use std::process::{Command, Stdio};
    use std::sync::mpsc;
    use std::thread;
    use std::io::BufReader;

    pub struct Process {
        process: std::process::Child,
        tx: mpsc::Sender<Option<String>>,
        rx: mpsc::Receiver<Option<String>>,
    }

    impl Process {
        pub fn new() -> Process {
            let mut process =
                    Command::new("someprocess.sh")
                            .stdin(Stdio::piped())
                            .stdout(Stdio::inherit())
                            .spawn().unwrap();

            let (tx, rx) = mpsc::channel();
            Process {process: process,
                      tx: tx,
                      rx: rx,
            }
        }

        pub fn run(&mut self) {
            let tx = self.tx.clone();
            let stdout = self.process.stdout
                             .take().unwrap();

            thread::spawn(move || {
                let reader = BufReader::new(stdout);

                for line in reader.lines() {
                    tx.send(Some(line.unwrap()));
                }
            });
        }

        pub fn push(&mut self, buf: &[u8]) {
            let mut stdin = self.process.stdin
                                .as_mut().unwrap();

            stdin.write_all(buf);
        }

        pub fn packets(&mut self) -> ProcessIntoIterator {
            ProcessIntoIterator {
                subprocess: self,
            }
        }
    }

    pub struct ProcessIntoIterator<'a> {
        subprocess: &'a mut Process,
    }

    impl <'a>Iterator for ProcessIntoIterator<'a> {
        type Item = String;
        fn next(&mut self) -> Option<String> {
            let data = self.subprocess.rx.try_recv();
            if data.is_ok() {
                data.unwrap()
            }
            else {
                None
            }
        }
    }

Thats it! It works as expected. With non-blocking version we also have to call process.run() to start the thread:

    fn main() {
        let process = Process::new();
        process.run();

        loop {
            let data = get_some_data();
            process.push(data);

            for response in proc.responses() {
                // do something based on response
            }
        }
    }
 
34
Kudos
 
34
Kudos

Now read this

Pipe SDR IQ data through FM demodulator for FSK9600 AX25 reception

Problem I thought that RTL-SDR and its command line tools are so common in these days that software for decoding everything and especially simple FSK9600 definitely exists. I was kind of right…except there are some corner cases. I looked... Continue →