• Home
  • Sanskrit
  • About
  • Filtering Data through Pipes

    April 21, 2022

    I recently realized that there is a huge dearth of information online about how to use an external program as a filter within your own program. The basic mechanisms of fork(), pipe(), etc. are well-described, but nobody explains how to both send data to a program and receive data from it within a single program. I propose to present a working example with about three hundred lines of C code.

    Theory

    Let us first recall a few theoretical points. A pipe is basically a communication channel that possesses two endpoints, represented as file descriptors: a write end, through which data is sent, and a read end, through which data is received. Pipes are generally used to communicate some information between processes, but they can also be useful within a single process, and even within a single thread. (We’ll see an example of the latter case further on.) In the POSIX standard, the function popen uses pipes internally. The call popen("program", "w") creates a pipe, makes its read end the standard input of the given program, and gives you its write end; conversely, the call popen("program", "r") makes the write end of the pipe the standard output of the given program, and gives you the read end.

    An important limitation of pipes is that they are unidirectional. You can send data through one, or receive data through it, but you cannot do both at the same time with a single pipe. (Some implementations support bidirectional pipes, but I won’t get into this.) This type of operation is however quite common in practice: you often want to send data to an external program, let it massage it in some way, and get back the result for further processing. In this situation, the external program works like a filter. The easiest way to perform such filtering is to use a temporary file, possibly in conjunction with popen(). We could also choose a completely different communication channel that allows bidirectional data exchange, such as sockets, but this wouldn’t work with programs that haven’t been written with this use case in mind.

    We instead will use pipes—more precisely, two pipes: one for writing to the standard input of the subprocess, the other for reading its standard output. But let us first ask a basic question: how is it that two programs connected through a pipe manage to synchronize with one another, when their processing speed might be vastly different? The answer is obviously that they wait on each other when needed. This is done transparently by the kernel, such that the programs themselves need not worry about it. Indeed, pipes possess an internal buffer which size varies depending on the operating system, and which can even change dynamically at runtime if needed. (On Linux, you can look at /proc/sys/fs/pipe-max-size to get an upper bound on the size of this buffer.) If the program on the write end of the pipe produces data too fast for the other one to absorb and fills up the pipe buffer, the kernel blocks it—that is to say, puts it to sleep—until the other one catches up. Conversely, if the program on the read end of the pipe consumes data too quickly, the kernel blocks it until the other one catches up.

    These details can be ignored when dealing with a single pipe, but not in our case, where we want to both send data to a process and receive data from it. Intuitively, it is evident that our implementation will have to alternate reads and writes. For if we keep sending data to the external process without consuming from time to time the data it sends us back, we will at some point fill the buffer and be blocked by the kernel; conversely, the external program might be blocked by the kernel if it fills its output buffer. How to prevent this from happening?

    There are several possible solutions, the most obvious being to spawn a new process or a new thread to send data to the external process while our parent process—or main thread—consumes what the external process sends us. I however think it is preferable and simpler to use a single-threaded model in conjunction with an event loop implemented with poll() or equivalent.

    Sample Filter Program

    But let us get our hands dirty and examine a sample program that filters through an arbitrary shell command the data it receives on its standard input and prints the result on its standard output. The full source code is available here. You can use the program like so:

    ./filter "sort -R" < /usr/share/dict/words

    For the sake of brevity, I omit the error recovery path, and instead abort the program at the first error, without cleaning up anything. The “happy” path, however, is cleaned up adequately when the program terminates.

    Let us begin by defining three basic utilities:

    #define array_len(x) (sizeof(x) / sizeof(x)[0])
    
    noreturn void die(const char *s)
    {
        int my_errno = errno;
    
        fputs(s, stderr);
        if (my_errno)
            fprintf(stderr, ": %s", strerror(my_errno));
        fputc('\n', stderr);
        fflush(stderr);
        exit(EXIT_FAILURE);
    }
    
    struct copy_state {
        int in_fd;
        int out_fd;
        char buf[4096];
        char *p;
        ssize_t nr;
    };
    
    int copy(struct copy_state *s)
    {
        ssize_t written;
    
        if (s->nr > 0)
            goto write;
    read:
        s->nr = read(s->in_fd, s->buf, sizeof s->buf);
        if (s->nr < 0) {
            if (errno == EINTR)
                goto read;
            if (errno == EAGAIN)
                return 0;
            die("read");
        }
        if (s->nr == 0)
            return EOF;
        s->p = s->buf;
    write:
        written = write(s->out_fd, s->p, s->nr);
        if (written < 0) {
            if (errno == EINTR)
                goto write;
            if (errno == EAGAIN)
                return 0;
            die("write");
        }
        s->nr -= written;
        if (s->nr > 0) {
            s->p += written;
            goto write;
        }
        goto read;
    }

    There is nothing special happening here. The macro array_len returns the length of an array. The function die() exits with an error message that describes the current errno, if any. (Note that errno is saved in a variable at the beginning of the function; this is because the call to fputs() below might overwrite it.)

    The function copy() is a bit more involved, but has a simple purpose: it just reads data from in_fd and copies it verbatim to out_fd, and does this in a greedy way, consuming as much data as possible from in_fd. The complexity arises from the need to maintain the current state correctly and to handle boundary conditions, to wit:

    Of note are the checks for EAGAIN. This error condition is only raised on file descriptors that refer to sockets, except when the file descriptor has been made non-blocking. We will see later on that making non-blocking the two ends of the pipes we manage is necessary, hence the presence of these checks.

    We now come to the most interesting part of the program, where we start a subprocess and setup two pipes to communicate with it. Only two pipes are needed here because we are not interested in the standard error stream of the child process. If you want to capture it, too, it is necessary to create an extra pipe. If you just want to silence the error output of the child program, you can do so by replacing its standard error stream with an opened file descriptor to /dev/null within the child process.

    // Two pipes, thus four fds, to communicate with the subprocess
    enum {
        STDIN_READ,   // Read end of the subprocess stdin
        STDIN_WRITE,  // Write end of the subprocess stdin
        STDOUT_READ,  // Read end of the subprocess stdout
        STDOUT_WRITE, // Write end of the subprocess stdout
    };
    
    noreturn void setup_subprocess(int pipes[static 4], const char *cmd)
    {
        static const int8_t fd_map[] = {
            // Replace the standard input of the subprocess with
            // the read end of our STDIN pipe
            [STDIN_READ] = STDIN_FILENO,
            // Close the write end of our STDIN pipe
            [STDIN_WRITE] = -1,
            // Close the read end of our STDOUT pipe
            [STDOUT_READ] = -1,
            // Replace the stdout of the subprocess with the write
            // end of our STDOUT pipe
            [STDOUT_WRITE] = STDOUT_FILENO,
        };
    
        for (size_t i = 0; i < array_len(fd_map); i++) {
            if (fd_map[i] >= 0 && dup2(pipes[i], fd_map[i]) < 0)
                die("dup2");
            if (close(pipes[i]))
                die("close");
        }
        execl("/bin/sh", "/bin/sh", "-c", cmd, (char *)NULL);
        die("execl");
    }
    
    pid_t start_subprocess(struct pollfd fds[static 2], const char *cmd)
    {
        static const int8_t fd_map[] = {
            // Close the read end of our STDIN pipe
            [STDIN_READ] = -1,
            // Save the write end of our STDIN pipe in fds[STDIN_FILENO]
            [STDIN_WRITE] = STDIN_FILENO,
            // Save the read end of our STDOUT in fds[STDOUT_FILENO]
            [STDOUT_READ] = STDOUT_FILENO,
            // Close the write end of our STDOUT pipe
            [STDOUT_WRITE] = -1,
        };
        int pipes[4];
        pid_t pid;
    
        for (size_t i = 0; i < array_len(pipes); i += 2) {
            if (pipe(&pipes[i]) < 0)
                die("pipe");
        }
        switch (pid = fork()) {
        case -1:
            die("fork");
        case 0:
            setup_subprocess(pipes, cmd);
        }
        for (size_t i = 0; i < array_len(fd_map); i++) {
            if (fd_map[i] < 0) {
                if (close(pipes[i]))
                    die("close");
            } else {
                fds[fd_map[i]].fd = pipes[i];
            }
        }
        return pid;
    }

    Let us start with the start_subprocess function. Its role is to run the command cmd in a shell and to fill the struct pollfd[2] it is given as argument with two file descriptors: the write end of a pipe connected to the standard input of the subprocess, and the read end of a pipe connected to the standard output of this subprocess, respectively. To make it more obvious how file descriptors are set up, I introduced an enum, as well as two tables that basically describe what should be done with each file descriptor.

    In start_subprocess, we first create two pipes, then start a child process with fork(). At this point the execution of the program is split: the child process enters the function setup_subprocess, from which it never returns, while the parent pursues the execution of the same function. The parent then closes the two pipe ends it doesn’t need and copies the other ones into the struct provided by the caller.

    The function setup_subprocess, which is executed by the child process, replaces the standard input and the standard output of the program to start with the appropriate ends of the pipes its parent created. This is accomplished with dup2(). The function then closes all the extra files descriptors its parent created—which includes those it cloned with dup2— and replaces the process image with execl, which here invokes the shell with a command. The execl() function—as well as other functions from the same family, like execve()—does not return if successful, thus the die() statement that follows its invocation.

    So far we’ve seen how to start a child process and setup pipes to communicate with it. Let us now see how to send data to the subprocess and receive data from it while eschewing the blocking issues we talked about earlier on. Here is the main loop code. The pollfd structures passed as arguments are both initialized: the one at index STDIN_FILENO (0) holds the write end of the pipe connected to the standard input of the subprocess, while the one at index STDOUT_FILENO (1) holds the read end of the pipe connected the standard output of the subprocess.

    void main_loop(struct pollfd fds[static 2])
    {
        struct copy_state in = {
            .in_fd = STDIN_FILENO,
            .out_fd = fds[STDIN_FILENO].fd,
        };
        struct copy_state out = {
            .in_fd = fds[STDOUT_FILENO].fd,
            .out_fd = STDOUT_FILENO,
        };
        int ret;
    
        for (size_t i = 0; i < 2; i++) {
            int flags = fcntl(fds[i].fd, F_GETFL);
            if (flags < 0)
                die("fcntl");
            if (fcntl(fds[i].fd, F_SETFL, flags | O_NONBLOCK) < 0)
                die("fcntl");
        }
    poll:
        ret = poll(fds, 2, -1);
        if (ret <= 0) {
            if (ret == 0 || errno == EINTR || errno == EAGAIN)
                goto poll;
            die("poll");
        }
        if (fds[STDIN_FILENO].revents & POLLOUT) {
            if (copy(&in) == EOF) {
                close(fds[STDIN_FILENO].fd);
                fds[STDIN_FILENO].fd = -1;
            }
        } else if (fds[STDIN_FILENO].revents & (POLLHUP | POLLERR)) {
            errno = 0;
            die("subprocess closed its stdin too soon");
        }
        if (fds[STDOUT_FILENO].revents & POLLIN) {
            if (copy(&out) == EOF)
                goto fini;
        } else if (fds[STDOUT_FILENO].revents & (POLLHUP | POLLERR)) {
            goto fini;
        }
        goto poll;
    fini:
        if (fds[STDIN_FILENO].fd >= 0) {
            errno = 0;
            die("subprocess didn't consume its stdin in full");
        }
    }

    The first thing we do here is to make non-blocking the two file descriptors we’re going to watch, with the help of fcntl(). The neat effect is that, instead of interrupting our program when we try to write too much data or attempt to read an empty stream, the kernel will instead inform us that we should retry later on by setting errno to EAGAIN—which simply means “try again later”. At this point, we could enter a busy loop, alternating reads and writes until all the data is consumed, but it is cleaner to use poll(), select() or an equivalent mechanism. I used poll() here because it is the most recent mechanism that remains portable and is specified in POSIX.

    After a successful call to poll(), we check our two pipes ends to see whether we can interact with them: if the standard input of the process can accept more data, we copy our own standard input to it until it is full; likewise, if the standard output of the process has pending data, we copy it to our own standard output as much as possible A very important thing to notice here is that, when we’re finished copying our standard input to the standard input of the subprocess, we close our end of the pipe. This is necessary for the subprocess to reach EOF. Without it, the subprocess would block forever, waiting for data. We also set the fd to -1 to make the kernel ignore it in subsequent calls to poll().

    As a bonus, we also try to catch two possible error conditions, to wit:

    At this point we have fed the subprocess all the data available in our standard input, and we have consumed the entirety of its standard output. It remains to check whether the subprocess was successful viz. whether it exited normally by returning EXIT_SUCCESS from main or by calling some variant of the exit() function with the same error code. To achieve that, we have to wait for the subprocess to terminate and to examine how it died. This is done with the waitpid() function. Now this function is blocking per default. Given that we’re running a single thread and that we most likely have some other important work to do, we’ll make it non-blocking–by passing it the WNOHANG flag. Furthermore, instead of calling it again and again in a busy loop, we’ll ask the kernel to notify us when the subprocess we created might have died, and only call waitpid() on these occasions. The kernel informs us of this condition by sending us the signal SIGCHLD, which just means: “a child process died”. Note the use of the indefinite: SIGCHLD tells us that a subprocess died, but not which one, so we have to make sure that the one that stopped is actually the one we are waiting for.

    To ask the kernel to notify us when one of our child processes dies, we must install a signal handler viz. a callback function that will be invoked when this happens. Now signal handlers are extremely limited in what they can do, because they can be invoked at arbitrary points in the execution of the program. Only a few functions are safe to call, among which the write system call. This gave D. J. Bernstein the idea to introduce what he calls a “self-pipe”. The basic idea is to create a pipe, monitor its read end in the main event loop through poll() or equivalent, and write a single byte to it from the signal handler whenever a new signal is catched. The neat effect is that we receive signals through a file descriptor, when our program is in a well-defined state. (The Linux kernel has since introduced something similar with signalfd().) Since event loops already monitor a set of file descriptors, it is convenient to just add an extra one.

    The end of our filter program follows:

    static int self_pipe[2];
    
    void handle_sigchld(int sig)
    {
        while (write(self_pipe[1], "", 1) < 0 && errno == EINTR)
            ;
    }
    
    void set_sigchld_handler(void)
    {
        struct sigaction sa;
    
        if (pipe(self_pipe) < 0)
            die("pipe");
        for (size_t i = 0; i < array_len(self_pipe); i++) {
            int flags = fcntl(self_pipe[i], F_GETFL);
            if (flags < 0)
                die("fcntl");
            if (fcntl(self_pipe[i], F_SETFL, flags | O_NONBLOCK) < 0)
                die("fcntl");
        }
        memset(&sa, 0, sizeof sa);
        sa.sa_handler = handle_sigchld;
        if (sigemptyset(&sa.sa_mask))
            die("sigemptyset");
        sa.sa_flags = SA_RESTART;
        if (sigaction(SIGCHLD, &sa, NULL))
            die("sigaction");
    }
    
    noreturn void wait_for_process(pid_t pid)
    {
        struct pollfd fd = {.fd = self_pipe[0], .events = POLLIN};
        int status;
        pid_t pid2;
        ssize_t nr;
        char dummy;
    
    poll:
        status = poll(&fd, 1, 1000);
        if (status < 0) {
            if (errno == EINTR || errno == EAGAIN)
                goto poll;
            die("poll");
        }
        if (status == 0) {
            fputs("waiting...\n", stderr);
            goto poll;
        }
        nr = read(fd.fd, &dummy, 1);
        if (nr <= 0) {
            if (nr == 0 || errno == EINTR || errno == EAGAIN)
                goto poll;
            die("read");
        }
        pid2 = waitpid(pid, &status, WNOHANG);
        if (pid2 < 0)
            die("waitpid");
        if (pid2 != pid)
            goto poll;
        if (close(self_pipe[0]) || close(self_pipe[1]))
            die("close");
        if (WIFEXITED(status)) {
            if (WEXITSTATUS(status) == EXIT_SUCCESS)
                exit(EXIT_SUCCESS);
            errno = 0;
            die("subprocess returned a non-zero error code");
        }
        errno = 0;
        die("subprocess was killed by a signal");
    }
    
    int main(int argc, char **argv)
    {
        struct pollfd fds[2] = {
            [STDIN_FILENO] = {.events = POLLOUT},
            [STDOUT_FILENO] = {.events = POLLIN},
        };
        pid_t pid;
        int ret;
        const char *cmd = argc > 1 ? argv[1] : "sort";
    
        set_sigchld_handler();
        pid = start_subprocess(fds, cmd);
        main_loop(fds);
        close(fds[STDOUT_FILENO].fd);
        wait_for_process(pid);
    }

    Note that, while the program waits for the subprocess it created to exit, it prints every second the message waiting.... It is very unlikely you will see this message in practice, because, when the program enters the wait_for_process() function, the standard input and the standard output of the subprocess it created are closed. Now a program typically doesn’t close these streams for its whole lifetime, such that the fact that they are closed is a strong clue that it has already exited. For a case where this is not true, try to run the following:

    ./filter "sort -R; exec 1<&-; sleep 10" < /usr/share/dict/words

    The command exec 1<&- closes the standard output in Bash.