April 21, 2022
I recently realized that there is a huge dearth of information online
about how to use an external program as a filter within your own
program. The basic mechanisms of fork(), pipe(), etc. are
well described, but nobody explains how to
both send data to a program and receive data from it within a
single program. I propose to present a working example with about three
hundred lines of C code.
Let us first recall a few theoretical points. A pipe is basically a
communication channel that possesses two endpoints, represented as file
descriptors: a write end, through which data is sent, and a read end,
through which data is received. Pipes are generally used to communicate
some information between processes, but they can also be useful within a
single process, and even within a single thread. (We’ll see an example
of the latter case further on.) In the POSIX standard, the function
popen() uses pipes internally. The call popen("program", "w") creates a
pipe, makes its read end the standard input of the given program, and
gives you its write end; conversely, the call popen("program", "r")
makes the write end of the pipe the standard output of the given
program, and gives you the read end.
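As a minimal sketch of the two popen() modes just described (the helper names feed_command and first_line_of are invented for this illustration and are not part of the filter program presented later):

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <sys/wait.h>

// Hypothetical helper: sends `text` to the standard input of `cmd`.
// Returns the exit status of the command, or -1 on error.
int feed_command(const char *cmd, const char *text)
{
    FILE *fp = popen(cmd, "w");   // we hold the write end of the pipe
    if (!fp)
        return -1;
    if (fputs(text, fp) == EOF) {
        pclose(fp);
        return -1;
    }
    int status = pclose(fp);      // closes the pipe and waits for cmd
    if (status == -1 || !WIFEXITED(status))
        return -1;
    return WEXITSTATUS(status);
}

// Hypothetical helper: stores the first line printed by `cmd` in `buf`.
// Returns 0 on success, -1 on error or if the command printed nothing.
int first_line_of(const char *cmd, char *buf, size_t size)
{
    FILE *fp = popen(cmd, "r");   // we hold the read end of the pipe
    if (!fp)
        return -1;
    char *line = fgets(buf, (int)size, fp);
    int status = pclose(fp);
    return (line && status != -1) ? 0 : -1;
}
```

Each helper uses the pipe in one direction only; neither can both feed a command and collect its output.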
An important limitation of pipes is that they are unidirectional. You
can send data through one, or receive data through it, but you cannot do
both at the same time with a single pipe. (Some implementations support
bidirectional pipes, but I won’t get into this.) This type of operation
is however quite common in practice: you often want to send data to an
external program, let it massage it in some way, and get back the result
for further processing. In this situation, the external program works
like a filter. The easiest way to perform such filtering is to use a
temporary file, possibly in conjunction with popen(). We
could also choose a completely different communication channel that
allows bidirectional data exchange, such as sockets, but this wouldn’t
work with programs that haven’t been written with this use case in
mind.
We instead will use pipes—more precisely, two pipes: one for writing
to the standard input of the subprocess, the other for reading its
standard output. But let us first ask a basic question: how is it that
two programs connected through a pipe manage to synchronize with one
another, when their processing speed might be vastly different? The
answer is obviously that they wait on each other when needed. This is
done transparently by the kernel, such that the programs themselves need
not worry about it. Indeed, pipes possess an internal buffer whose size
varies depending on the operating system, and which can even change
dynamically at runtime if needed. (On Linux, you can look at
/proc/sys/fs/pipe-max-size to get an upper bound on the
size of this buffer.) If the program on the write end of the pipe
produces data too fast for the other one to absorb and fills up the pipe
buffer, the kernel blocks it—that is to say, puts it to sleep—until the
other one catches up. Conversely, if the program on the read end of the
pipe consumes data too quickly, the kernel blocks it until the other one
catches up.
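To make the buffer tangible, here is a small sketch that measures how much data a fresh pipe accepts before a writer would be put to sleep. The helper name pipe_capacity is made up for this illustration; it relies on a non-blocking write end, a mechanism discussed further on, so that a full buffer is reported through EAGAIN instead of blocking.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper: returns the number of bytes a fresh pipe accepts
// before a write would block, or -1 on error.
long pipe_capacity(void)
{
    int p[2];
    char chunk[512] = {0};
    long total = 0;
    int err = 0;

    if (pipe(p) < 0)
        return -1;
    // Non-blocking write end: a full buffer yields EAGAIN instead of a sleep
    int flags = fcntl(p[1], F_GETFL);
    if (flags < 0 || fcntl(p[1], F_SETFL, flags | O_NONBLOCK) < 0) {
        close(p[0]);
        close(p[1]);
        return -1;
    }
    for (;;) {
        ssize_t n = write(p[1], chunk, sizeof chunk);
        if (n > 0) {
            total += n;
            continue;
        }
        if (n < 0 && errno == EINTR)
            continue;
        err = (n < 0) ? errno : EIO;
        break;
    }
    close(p[0]);
    close(p[1]);
    return (err == EAGAIN || err == EWOULDBLOCK) ? total : -1;
}
```

The value returned depends on the system; on Linux with 4096-byte pages the default is 65536 bytes.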
These details can be ignored when dealing with a single pipe, but not in our case, where we want to both send data to a process and receive data from it. Intuitively, it is evident that our implementation will have to alternate reads and writes. For if we keep sending data to the external process without consuming from time to time the data it sends us back, we will at some point fill the buffer and be blocked by the kernel; conversely, the external program might be blocked by the kernel if it fills its output buffer. How to prevent this from happening?
There are several possible solutions, the most obvious being to spawn
a new process or a new thread to send data to the external process while
our parent process—or main thread—consumes what the external process
sends us. I however think it is preferable and simpler to use a
single-threaded model in conjunction with an event loop implemented with
poll() or equivalent.
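For readers who have never used poll(), the following sketch shows its basic shape on a single file descriptor. The helper wait_readable is hypothetical and much simpler than the event loop of the actual program, which watches two descriptors at once.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: waits up to timeout_ms milliseconds for fd to become
// readable. Returns 1 if data (or EOF) is available, 0 on timeout, -1 on error.
int wait_readable(int fd, int timeout_ms)
{
    struct pollfd pfd = {.fd = fd, .events = POLLIN};

    for (;;) {
        int ret = poll(&pfd, 1, timeout_ms);
        if (ret < 0) {
            if (errno == EINTR)
                continue;          // interrupted by a signal: retry
            return -1;
        }
        if (ret == 0)
            return 0;              // nothing happened before the timeout
        // POLLHUP means the writer went away; read() will then report EOF
        return (pfd.revents & (POLLIN | POLLHUP)) ? 1 : -1;
    }
}
```

The kernel fills in revents with what actually happened, which is how a single call can report on many descriptors at once.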
But let us get our hands dirty and examine a sample program that filters through an arbitrary shell command the data it receives on its standard input and prints the result on its standard output. The full source code is available here. You can use the program like so:
./filter "sort -R" < /usr/share/dict/words
For the sake of brevity, I omit the error recovery path, and instead abort the program at the first error, without cleaning up anything. The “happy” path, however, is cleaned up adequately when the program terminates.
Let us begin by defining three basic utilities:
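// Headers required by the code shown in this article
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdnoreturn.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define array_len(x) (sizeof(x) / sizeof((x)[0]))

noreturn void die(const char *s)
{
    int my_errno = errno;

    fputs(s, stderr);
    if (my_errno)
        fprintf(stderr, ": %s", strerror(my_errno));
    fputc('\n', stderr);
    fflush(stderr);
    exit(EXIT_FAILURE);
}

struct copy_state {
    int in_fd;
    int out_fd;
    char buf[4096];
    char *p;        // next byte of buf to write
    ssize_t nr;     // number of bytes of buf left to write
};

int copy(struct copy_state *s)
{
    ssize_t written;

    // Resume a write that previously stopped on EAGAIN
    if (s->nr > 0)
        goto write;
read:
    s->nr = read(s->in_fd, s->buf, sizeof s->buf);
    if (s->nr < 0) {
        if (errno == EINTR)
            goto read;
        if (errno == EAGAIN)
            return 0;
        die("read");
    }
    if (s->nr == 0)
        return EOF;
    s->p = s->buf;
write:
    written = write(s->out_fd, s->p, s->nr);
    if (written < 0) {
        if (errno == EINTR)
            goto write;
        if (errno == EAGAIN)
            return 0;
        die("write");
    }
    s->nr -= written;
    if (s->nr > 0) {
        s->p += written;
        goto write;
    }
    goto read;
}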
There is nothing special happening here. The macro array_len returns the
length of an array. The function die() exits with an error message that
describes the current errno, if any. (Note that errno is saved in a
variable at the beginning of the function; this is because the call to
fputs() below might overwrite it.)
The function copy() is a bit more involved, but has a simple purpose: it
just reads data from in_fd and copies it verbatim to out_fd, and does
this in a greedy way, consuming as much data as possible from in_fd. The
complexity arises from the need to maintain the current state correctly
and to handle boundary conditions, to wit:
- EOF must be recognized and dealt with
- read() and write() can be interrupted by signals, hence the checks for EINTR
- write() doesn’t necessarily write the buffer it is given in full, hence the need to retry several times
Of note are the checks for EAGAIN. On a pipe, this error condition is
only raised when the file descriptor has been made non-blocking. We will
see later on that making non-blocking the two ends of the pipes we
manage is necessary, hence the presence of these checks.
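To isolate the EINTR and partial-write handling from the state keeping, here is a sketch of the simpler blocking case. The helper write_all is hypothetical and assumes a blocking file descriptor; copy() cannot be written this way because it must be able to return on EAGAIN and resume later.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <unistd.h>

// Hypothetical helper for a *blocking* fd: writes all `len` bytes of `buf`.
// Returns 0 on success, -1 on error (errno is set by write()).
int write_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t n = write(fd, buf, len);
        if (n < 0) {
            if (errno == EINTR)
                continue;      // interrupted by a signal: retry
            return -1;
        }
        buf += n;              // partial write: skip what was accepted
        len -= (size_t)n;
    }
    return 0;
}
```

The fields p and nr of struct copy_state play the role of buf and len here, except that they survive across calls.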
We now come to the most interesting part of the program, where we
start a subprocess and set up two pipes to communicate with it. Only two
pipes are needed here because we are not interested in the standard
error stream of the child process. If you want to capture it, too, it is
necessary to create an extra pipe. If you just want to silence the error
output of the child program, you can do so by replacing its standard
error stream with an open file descriptor to /dev/null within the child
process.
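Silencing the error output could look like the sketch below. The helper silence_stderr is hypothetical and does not appear in the program that follows; it would be called in the child process, between fork() and the exec call.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper, meant to run in the child process before exec:
// points the standard error stream at /dev/null.
// Returns 0 on success, -1 on error.
int silence_stderr(void)
{
    int fd = open("/dev/null", O_WRONLY);

    if (fd < 0)
        return -1;
    if (fd != STDERR_FILENO) {
        if (dup2(fd, STDERR_FILENO) < 0) {
            close(fd);
            return -1;
        }
        close(fd);   // the duplicate installed on fd 2 is enough
    }
    return 0;
}
```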
// Two pipes, thus four fds, to communicate with the subprocess
enum {
    STDIN_READ,     // Read end of the subprocess stdin
    STDIN_WRITE,    // Write end of the subprocess stdin
    STDOUT_READ,    // Read end of the subprocess stdout
    STDOUT_WRITE,   // Write end of the subprocess stdout
};

noreturn void setup_subprocess(int pipes[static 4], const char *cmd)
{
    static const int8_t fd_map[] = {
        // Replace the standard input of the subprocess with
        // the read end of our STDIN pipe
        [STDIN_READ] = STDIN_FILENO,
        // Close the write end of our STDIN pipe
        [STDIN_WRITE] = -1,
        // Close the read end of our STDOUT pipe
        [STDOUT_READ] = -1,
        // Replace the stdout of the subprocess with the write
        // end of our STDOUT pipe
        [STDOUT_WRITE] = STDOUT_FILENO,
    };

    for (size_t i = 0; i < array_len(fd_map); i++) {
        if (fd_map[i] >= 0 && dup2(pipes[i], fd_map[i]) < 0)
            die("dup2");
        if (close(pipes[i]))
            die("close");
    }
    execl("/bin/sh", "/bin/sh", "-c", cmd, (char *)NULL);
    die("execl");
}

pid_t start_subprocess(struct pollfd fds[static 2], const char *cmd)
{
    static const int8_t fd_map[] = {
        // Close the read end of our STDIN pipe
        [STDIN_READ] = -1,
        // Save the write end of our STDIN pipe in fds[STDIN_FILENO]
        [STDIN_WRITE] = STDIN_FILENO,
        // Save the read end of our STDOUT pipe in fds[STDOUT_FILENO]
        [STDOUT_READ] = STDOUT_FILENO,
        // Close the write end of our STDOUT pipe
        [STDOUT_WRITE] = -1,
    };
    int pipes[4];
    pid_t pid;

    for (size_t i = 0; i < array_len(pipes); i += 2) {
        if (pipe(&pipes[i]) < 0)
            die("pipe");
    }
    switch (pid = fork()) {
    case -1:
        die("fork");
    case 0:
        setup_subprocess(pipes, cmd);
    }
    for (size_t i = 0; i < array_len(fd_map); i++) {
        if (fd_map[i] < 0) {
            if (close(pipes[i]))
                die("close");
        } else {
            fds[fd_map[i]].fd = pipes[i];
        }
    }
    return pid;
}
Let us start with the start_subprocess function. Its role is to run the
command cmd in a shell and to fill the struct pollfd[2] it is given as
argument with two file descriptors: the write end of a pipe connected to
the standard input of the subprocess, and the read end of a pipe
connected to the standard output of this subprocess, respectively. To
make it more obvious how file descriptors are set up, I introduced an
enum, as well as two tables that basically describe what should be done
with each file descriptor.
In start_subprocess, we first create two pipes, then start a child
process with fork(). At this point the execution of the program is
split: the child process enters the function setup_subprocess, from
which it never returns, while the parent pursues the execution of the
same function. The parent then closes the two pipe ends it doesn’t need
and copies the other ones into the pollfd array provided by the caller.
The function setup_subprocess, which is executed by the child process,
replaces the standard input and the standard output of the program to
start with the appropriate ends of the pipes its parent created. This is
accomplished with dup2(). The function then closes all the extra file
descriptors its parent created (including those it cloned with dup2())
and replaces the process image with execl(), which here invokes the
shell with a command. The execl() function, like the other functions of
the same family such as execve(), does not return if successful, hence
the die() statement that follows its invocation.
So far we’ve seen how to start a child process and set up pipes to
communicate with it. Let us now see how to send data to the subprocess
and receive data from it while eschewing the blocking issues we talked
about earlier on. Here is the main loop code. The pollfd structures
passed as arguments are both initialized: the one at index STDIN_FILENO
(0) holds the write end of the pipe connected to the standard input of
the subprocess, while the one at index STDOUT_FILENO (1) holds the read
end of the pipe connected to the standard output of the subprocess.
void main_loop(struct pollfd fds[static 2])
{
    struct copy_state in = {
        .in_fd = STDIN_FILENO,
        .out_fd = fds[STDIN_FILENO].fd,
    };
    struct copy_state out = {
        .in_fd = fds[STDOUT_FILENO].fd,
        .out_fd = STDOUT_FILENO,
    };
    int ret;

    for (size_t i = 0; i < 2; i++) {
        int flags = fcntl(fds[i].fd, F_GETFL);
        if (flags < 0)
            die("fcntl");
        if (fcntl(fds[i].fd, F_SETFL, flags | O_NONBLOCK) < 0)
            die("fcntl");
    }
poll:
    ret = poll(fds, 2, -1);
    if (ret <= 0) {
        if (ret == 0 || errno == EINTR || errno == EAGAIN)
            goto poll;
        die("poll");
    }
    if (fds[STDIN_FILENO].revents & POLLOUT) {
        if (copy(&in) == EOF) {
            close(fds[STDIN_FILENO].fd);
            fds[STDIN_FILENO].fd = -1;
        }
    } else if (fds[STDIN_FILENO].revents & (POLLHUP | POLLERR)) {
        errno = 0;
        die("subprocess closed its stdin too soon");
    }
    if (fds[STDOUT_FILENO].revents & POLLIN) {
        if (copy(&out) == EOF)
            goto fini;
    } else if (fds[STDOUT_FILENO].revents & (POLLHUP | POLLERR)) {
        goto fini;
    }
    goto poll;
fini:
    if (fds[STDIN_FILENO].fd >= 0) {
        errno = 0;
        die("subprocess didn't consume its stdin in full");
    }
}
The first thing we do here is to make non-blocking the two file
descriptors we’re going to watch, with the help of fcntl(). The neat
effect is that, instead of blocking our program when we try to write too
much data or attempt to read an empty stream, the kernel will inform us
that we should retry later on by setting errno to EAGAIN, which simply
means “try again later”. At this point, we could enter a busy loop,
alternating reads and writes until all the data is consumed, but it is
cleaner to use poll(), select() or an equivalent mechanism. I used
poll() here because it is the most recent mechanism that remains
portable and is specified in POSIX.
After a successful call to poll(), we check our two pipe ends to see
whether we can interact with them: if the standard input of the process
can accept more data, we copy our own standard input to it until it is
full; likewise, if the standard output of the process has pending data,
we copy as much of it as possible to our own standard output. A very
important thing to notice here is that, when we’re finished copying our
standard input to the standard input of the subprocess, we close our end
of the pipe. This is necessary for the subprocess to reach EOF. Without
it, the subprocess would block forever, waiting for data. We also set
the fd to -1 to make the kernel ignore it in subsequent calls to poll().
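The link between closing the write end and EOF can be observed in isolation. The sketch below uses a hypothetical function, eof_needs_close, and a pipe that stays within one process: while a write end is still open, an empty non-blocking pipe reports EAGAIN, and only once every write end is closed does read() return 0.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical demo: returns 1 if read() reports EAGAIN while the write end
// is open and EOF once it is closed, 0 otherwise, -1 on setup error.
int eof_needs_close(void)
{
    int p[2];
    char c;

    if (pipe(p) < 0)
        return -1;
    int flags = fcntl(p[0], F_GETFL);
    if (flags < 0 || fcntl(p[0], F_SETFL, flags | O_NONBLOCK) < 0) {
        close(p[0]);
        close(p[1]);
        return -1;
    }
    // Write end still open: an empty pipe is not EOF, just "try again later"
    ssize_t before = read(p[0], &c, 1);
    int would_block =
        before < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);

    close(p[1]);
    // Every write end is now closed: read() returns 0, that is, EOF
    ssize_t after = read(p[0], &c, 1);
    close(p[0]);
    return would_block && after == 0;
}
```

This is also why the parent and the child each close the pipe ends they don't use: a stray open write end anywhere would keep EOF from ever being delivered.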
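As a bonus, we also try to catch two possible error conditions, to wit:
- the subprocess closed its standard input before we could send it all our data
- the subprocess closed its standard output before consuming its standard input in full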
At this point we have fed the subprocess all the data available in
our standard input, and we have consumed the entirety of its standard
output. It remains to check whether the subprocess was successful,
viz. whether it exited normally by returning EXIT_SUCCESS from main or
by calling some variant of the exit() function with the same status
code. To achieve that, we have to wait for the subprocess to terminate
and examine how it died. This is done with the waitpid() function. Now
this function is blocking by default. Given that we’re running a single
thread and that we most likely have some other important work to do,
we’ll make it non-blocking by passing it the WNOHANG flag. Furthermore,
instead of calling it again and again in a busy loop, we’ll ask the
kernel to notify us when the subprocess we created might have died, and
only call waitpid() on these occasions. The kernel informs us of this
condition by sending us the signal SIGCHLD, which roughly means: “a
child process died” (by default it is also sent when a child is stopped
or resumed). Note the use of the indefinite: SIGCHLD tells us that a
subprocess died, but not which one, so we have to make sure that the one
that terminated is actually the one we are waiting for.
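The non-blocking use of waitpid() can be sketched on its own as follows. The helper try_reap and its return codes are invented for this illustration; the program below performs the same checks inline.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: checks on child `pid` without blocking.
// Returns 0 if it is still running, 1 if it exited normally (its exit
// status is stored in *code), 2 if it was killed by a signal, -1 on error.
int try_reap(pid_t pid, int *code)
{
    int status;
    pid_t ret = waitpid(pid, &status, WNOHANG);

    if (ret < 0)
        return -1;
    if (ret == 0)
        return 0;              // the child has not changed state yet
    if (WIFEXITED(status)) {
        *code = WEXITSTATUS(status);
        return 1;
    }
    return WIFSIGNALED(status) ? 2 : -1;
}
```

Passing a specific pid rather than -1 is what guarantees that the reported child is the one we started.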
To ask the kernel to notify us when one of our child processes dies,
we must install a signal handler, viz. a callback function that will be
invoked when this happens. Now signal handlers are extremely limited in
what they can do, because they can be invoked at arbitrary points in the
execution of the program. Only a few functions are safe to call, among
them the write() system call. This gave D. J. Bernstein the idea to
introduce what he calls a “self-pipe”. The basic idea is to create a
pipe, monitor its read end in the main event loop through poll() or
equivalent, and write a single byte to it from the signal handler
whenever a new signal is caught. The neat effect is that we receive
signals through a file descriptor, when our program is in a well-defined
state. (The Linux kernel has since introduced something similar with
signalfd().) Since event loops already monitor a set of file
descriptors, it is convenient to just add an extra one.
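For comparison, here is a rough sketch of the signalfd() alternative mentioned above. It is Linux-specific, is not used in the rest of this article, and the helper names open_sigchld_fd and read_signal are made up for the occasion.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <signal.h>
#include <sys/signalfd.h>   // Linux-specific
#include <unistd.h>

// Hypothetical helper (Linux only): blocks normal delivery of SIGCHLD and
// returns a non-blocking fd that becomes readable when the signal is
// pending. Returns -1 on error.
int open_sigchld_fd(void)
{
    sigset_t mask;

    sigemptyset(&mask);
    sigaddset(&mask, SIGCHLD);
    // The signal must be blocked, otherwise it would be handled the usual
    // way before it could be read from the fd
    if (sigprocmask(SIG_BLOCK, &mask, NULL) < 0)
        return -1;
    return signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC);
}

// Hypothetical helper: reads one pending signal from the fd.
// Returns the signal number, 0 if none is pending, -1 on error.
int read_signal(int sfd)
{
    struct signalfd_siginfo si;
    ssize_t n = read(sfd, &si, sizeof si);

    if (n == (ssize_t)sizeof si)
        return (int)si.ssi_signo;
    return (n < 0 && errno == EAGAIN) ? 0 : -1;
}
```

The descriptor can be handed to poll() exactly like the read end of a self-pipe, with no signal handler involved.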
The end of our filter program follows:
static int self_pipe[2];

void handle_sigchld(int sig)
{
    // write() may change errno; restore it so that the code we
    // interrupted doesn't see a spurious error
    int saved_errno = errno;

    (void)sig;
    while (write(self_pipe[1], "", 1) < 0 && errno == EINTR)
        ;
    errno = saved_errno;
}

void set_sigchld_handler(void)
{
    struct sigaction sa;

    if (pipe(self_pipe) < 0)
        die("pipe");
    for (size_t i = 0; i < array_len(self_pipe); i++) {
        int flags = fcntl(self_pipe[i], F_GETFL);
        if (flags < 0)
            die("fcntl");
        if (fcntl(self_pipe[i], F_SETFL, flags | O_NONBLOCK) < 0)
            die("fcntl");
    }
    memset(&sa, 0, sizeof sa);
    sa.sa_handler = handle_sigchld;
    if (sigemptyset(&sa.sa_mask))
        die("sigemptyset");
    sa.sa_flags = SA_RESTART;
    if (sigaction(SIGCHLD, &sa, NULL))
        die("sigaction");
}

noreturn void wait_for_process(pid_t pid)
{
    struct pollfd fd = {.fd = self_pipe[0], .events = POLLIN};
    int status;
    pid_t pid2;
    ssize_t nr;
    char dummy;

poll:
    status = poll(&fd, 1, 1000);
    if (status < 0) {
        if (errno == EINTR || errno == EAGAIN)
            goto poll;
        die("poll");
    }
    if (status == 0) {
        fputs("waiting...\n", stderr);
        goto poll;
    }
    nr = read(fd.fd, &dummy, 1);
    if (nr <= 0) {
        if (nr == 0 || errno == EINTR || errno == EAGAIN)
            goto poll;
        die("read");
    }
    pid2 = waitpid(pid, &status, WNOHANG);
    if (pid2 < 0)
        die("waitpid");
    if (pid2 != pid)
        goto poll;
    if (close(self_pipe[0]) || close(self_pipe[1]))
        die("close");
    if (WIFEXITED(status)) {
        if (WEXITSTATUS(status) == EXIT_SUCCESS)
            exit(EXIT_SUCCESS);
        errno = 0;
        die("subprocess returned a non-zero error code");
    }
    errno = 0;
    die("subprocess was killed by a signal");
}

int main(int argc, char **argv)
{
    struct pollfd fds[2] = {
        [STDIN_FILENO] = {.events = POLLOUT},
        [STDOUT_FILENO] = {.events = POLLIN},
    };
    pid_t pid;
    const char *cmd = argc > 1 ? argv[1] : "sort";

    set_sigchld_handler();
    pid = start_subprocess(fds, cmd);
    main_loop(fds);
    close(fds[STDOUT_FILENO].fd);
    wait_for_process(pid);
}
Note that, while the program waits for the subprocess it created to
exit, it prints the message waiting... every second. It is very unlikely
you will see this message in practice, because, when the program enters
the wait_for_process() function, the standard input and the standard
output of the subprocess it created are closed. Now a program typically
keeps these streams open for its whole lifetime, so the fact that they
are closed is a strong clue that it has already exited. For a case where
this is not true, try to run the following:
./filter "sort -R; exec 1<&-; sleep 10" < /usr/share/dict/words
The command exec 1<&- closes the standard output of the shell.