

. Pipes


.. about pipes

A pipe 
======
      Used for one-way communication of a stream of bytes.

To create a pipe, use the "pipe" system call.
   It takes an array of two integers.
   It fills in the array with 2 file descriptors to be used for low-level I/O:
   element 0 is the read end, element 1 is the write end.

Creating a pipe
=============== 
   int pfd[2]; 
   pipe(pfd);
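
The call above ignores the return value. A minimal sketch with error checking might look like this; the helper name make_pipe is an assumption made for illustration, not part of the original notes.

```c
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: creates a pipe and reports its two descriptors.
   Returns 0 on success, -1 if pipe() failed. */
int make_pipe(int pfd[2])
{
    if (pipe(pfd) == -1) {
        perror("pipe");
        return -1;
    }
    /* pfd[0] is the read end, pfd[1] is the write end. */
    printf("read end = %d, write end = %d\n", pfd[0], pfd[1]);
    return 0;
}
```

The descriptor numbers printed depend on which files the process already has open.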

I/O with a pipe
===============
The two file descriptors can be used for I/O 
   write(pfd[1], buf, size); 
   read (pfd[0], buf, size);
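
As a small illustration of the two calls above, the sketch below pushes a short string through a pipe inside a single process. The function name pipe_roundtrip is hypothetical, and the message is assumed to be short enough to fit in the pipe's buffer (otherwise the write would block, since nobody is reading yet).

```c
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: writes msg (including its terminating '\0')
   into a fresh pipe and reads it back into out.
   Returns the number of bytes read, or -1 on error. */
ssize_t pipe_roundtrip(const char *msg, char *out, size_t outsize)
{
    int pfd[2];
    size_t len = strlen(msg) + 1;
    ssize_t n;

    if (len > outsize || pipe(pfd) == -1)
        return -1;

    if (write(pfd[1], msg, len) != (ssize_t)len) {
        close(pfd[0]);
        close(pfd[1]);
        return -1;
    }
    n = read(pfd[0], out, outsize);   /* bytes come out in the order written */

    close(pfd[0]);
    close(pfd[1]);
    return n;
}
```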




A pipe opened before the fork becomes shared between the processes. This gives two read ends and two write ends. The read end of the pipe will not be closed until both read ends are closed, and the write end will not be closed until both write ends are closed. Either process can write into the pipe, and either can read from it. Which process will get what is not known.

--------------------------------------------------------
[Figure: pipe before and after fork()]

For predictable behaviour, one of the processes must close its read end, and the other must close its write end. Then it will become a simple pipeline again.

   if (pid == 0) { /* child */
      close(pfd[1]);
      …
   } else {        /* parent */
      close(pfd[0]);
      …
   }

POSIX guarantees that a write of up to PIPE_BUF bytes (at least 512) goes into the pipe as one atomic unit. A write blocks if the pipe is full.
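
To see how much a pipe actually holds before a write would block, one possible experiment is to switch the write end to non-blocking mode and count bytes until the kernel refuses more. This is only a sketch: the function name pipe_capacity is made up for illustration, and the number it returns is system-dependent.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: fills a fresh pipe one byte at a time using
   non-blocking writes. Returns the number of bytes accepted before the
   pipe reported "full", or -1 on error. */
long pipe_capacity(void)
{
    int pfd[2];
    char byte = 'x';
    long total = 0;
    int flags;

    if (pipe(pfd) == -1)
        return -1;

    /* With O_NONBLOCK set, a write to a full pipe fails with EAGAIN
       instead of blocking. */
    flags = fcntl(pfd[1], F_GETFL);
    if (flags == -1 || fcntl(pfd[1], F_SETFL, flags | O_NONBLOCK) == -1) {
        close(pfd[0]);
        close(pfd[1]);
        return -1;
    }

    for (;;) {
        ssize_t n = write(pfd[1], &byte, 1);
        if (n == 1) {
            total++;
        } else if (n == -1 && errno == EINTR) {
            continue;                       /* interrupted: try again */
        } else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            break;                          /* pipe is full */
        } else {
            total = -1;                     /* unexpected error */
            break;
        }
    }

    close(pfd[0]);
    close(pfd[1]);
    return total;
}
```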

.. code: 1 pipe




#include <stdio.h>
#include <stdlib.h>   /* exit */
#include <string.h>   /* strcpy, strlen */
#include <unistd.h>   /* pipe, fork, read, write, close */

#define SIZE 1024

int main(int argc, char **argv)
{
   int pfd[2];
   int nread;
   int pid;
   char buf[SIZE];

   if (pipe(pfd) == -1) {
      perror("pipe failed");
      exit(1);
   }
   if ((pid = fork()) < 0) {
      perror("fork failed");
      exit(2);
   }
   if (pid == 0) {
      /* child: reads until the parent closes its write end (EOF) */
      close(pfd[1]);
      while ((nread = read(pfd[0], buf, SIZE)) > 0)
         printf("child read %s\n", buf);
      close(pfd[0]);
   } else {
      /* parent: sends one string, including its terminating '\0' */
      close(pfd[0]);
      strcpy(buf, "Hello child");
      write(pfd[1], buf, strlen(buf)+1);
      close(pfd[1]);
   }
   exit(0);
}

A pipeline works because the two processes know the file descriptor of each end of the pipe. Each process has a stdin (0), a stdout (1) and a stderr (2). The pipe's file descriptors will depend on which other files have been opened, but could be 3 and 4, say.

When the processes have ceased communication, the parent closes its write end. This means that the child gets eof on its next read, and it can close its read end.
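
The end-of-file behaviour described above can be observed even within one process. In the sketch below (the name drain_until_eof is an assumption for illustration), the data is written first, the write end is closed, and the reader then loops until read() returns 0. The message is assumed to be short enough to fit in the pipe's buffer.

```c
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: writes msg into a fresh pipe, closes the write
   end, then reads in small chunks until read() returns 0 (end-of-file).
   Returns the total number of bytes read, or -1 on error. */
long drain_until_eof(const char *msg)
{
    int pfd[2];
    char chunk[4];               /* deliberately small: forces several reads */
    size_t len = strlen(msg);
    long total = 0;
    ssize_t n;

    if (pipe(pfd) == -1)
        return -1;

    if (write(pfd[1], msg, len) != (ssize_t)len) {
        close(pfd[0]);
        close(pfd[1]);
        return -1;
    }
    close(pfd[1]);               /* last write end closed: reader will see EOF */

    /* Buffered data is still delivered; only then does read() return 0. */
    while ((n = read(pfd[0], chunk, sizeof chunk)) > 0)
        total += n;

    close(pfd[0]);
    return n == 0 ? total : -1;
}
```

If the write end were left open, the final read() would block instead of returning 0, which is why closing unused ends matters.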

.. code: 2 pipes

Two processes wish to communicate with each other, in a 2-way manner. 
Two pipes can be used for this: 


             The two processes issue read/write requests 
	   P1: 				             P2: 
	 read(pipe2) 			      read(pipe1) 
	 write(pipe1) 			     write(pipe2) 
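
A minimal sketch of this two-pipe arrangement follows, with the parent as P1 and a forked child as P2. The function name ask_child_to_double and the "double the number" task are assumptions chosen only to give the exchange something to do. Note that one side has to write before it reads; if both processes started with read(), each would wait for the other forever.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: P1 (parent) sends value over pipe1; P2 (child)
   doubles it and sends the result back over pipe2.
   Returns the reply, or -1 on error. */
int ask_child_to_double(int value)
{
    int pipe1[2];   /* P1 writes, P2 reads */
    int pipe2[2];   /* P2 writes, P1 reads */
    int reply = -1;
    pid_t pid;

    if (pipe(pipe1) == -1)
        return -1;
    if (pipe(pipe2) == -1) {
        close(pipe1[0]); close(pipe1[1]);
        return -1;
    }

    pid = fork();
    if (pid < 0) {
        close(pipe1[0]); close(pipe1[1]);
        close(pipe2[0]); close(pipe2[1]);
        return -1;
    }

    if (pid == 0) {              /* P2: read(pipe1), then write(pipe2) */
        int n;
        close(pipe1[1]);
        close(pipe2[0]);
        if (read(pipe1[0], &n, sizeof n) != (ssize_t)sizeof n)
            _exit(1);
        n *= 2;
        if (write(pipe2[1], &n, sizeof n) != (ssize_t)sizeof n)
            _exit(1);
        _exit(0);
    }

    /* P1: write(pipe1) first, then read(pipe2) */
    close(pipe1[0]);
    close(pipe2[1]);
    if (write(pipe1[1], &value, sizeof value) == (ssize_t)sizeof value &&
        read(pipe2[0], &reply, sizeof reply) != (ssize_t)sizeof reply)
        reply = -1;
    close(pipe1[1]);
    close(pipe2[0]);
    waitpid(pid, NULL, 0);
    return reply;
}
```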


EXAMPLE 1

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/wait.h>

struct procStats
{
   int pid;
   int pnum;
};

int main()
{
   int pCount = 10;
   bool isParent = true;
   int i = 0;
   int pip[2];
   procStats stats;

   pipe(pip);   // create a pipe

   while (i < pCount && isParent)
   {
      isParent = fork();   // parent keeps forking
      i++;
   }

   if (isParent)   // parent code
   {
      printf("parent process reporting pid: %X \n", getpid());
      for (i = 1; i <= pCount; i++)
      {
         read(pip[0], &stats, sizeof(stats));
         printf("child %d reporting pid: %X \n", stats.pnum, stats.pid);
      }
      for (i = 1; i <= pCount; i++)
         wait(NULL);
   }
   else   // child
   {
      stats.pid = getpid();
      stats.pnum = i;
      write(pip[1], &stats, sizeof(stats));   // sizeof(stats) is 8 here - WHY?
   }
}

-------------------------------------------
$ a.out
parent process reporting pid: 3E47
child 1 reporting pid: 3E48
child 2 reporting pid: 3E49
child 3 reporting pid: 3E4A
child 5 reporting pid: 3E4C
child 4 reporting pid: 3E4B
child 6 reporting pid: 3E4D
child 8 reporting pid: 3E4F
child 7 reporting pid: 3E4E
child 10 reporting pid: 3E51
child 9 reporting pid: 3E50

A process may want to both 'write to' and 'read from' a child. In this case it creates two pipes. One of these is used by the parent for writing and by the child for reading. The other is used by the child for writing and the parent for reading.

"dup" is used to duplicate a file descriptor to use with fork/exec.

A fifo (named pipe) can be used for unrelated processes.

Common variations on this method of IPC are:

A pipeline may consist of three or more processes.
1. The parent can fork twice to give two children.
2. The parent can fork once and the child can fork once, giving a parent, child and grandchild.
3. The parent can create two pipes before any forking. After a fork there will then be a total of 8 ends open (2 processes * 2 ends * 2 pipes). Some ends should be closed so that only one read end and one write end of each pipe remain.

#include <iostream>
#include <unistd.h>     // pipe, fork, read, write, close
#include <sys/wait.h>
using namespace std;

int main (void)
{
   int returnPid = 1;
   int pip1[2], pip2[2];
   int i = 0;

   pipe(pip1);
   pipe(pip2);

   // parent forks two kids; each kid leaves the loop with its own i
   while (i < 2 && returnPid != 0)
   {
      returnPid = fork();
      ++i;
   }

   //CHILD
   if (returnPid == 0)
   {
      if (i == 1)
      { //CHILD 1
         write(pip1[1], &i, sizeof(i));
      }
      else if (i == 2)
      { //CHILD 2
         write(pip2[1], &i, sizeof(i));
      }
   } // end if
   //PARENT
   else
   {
      int x = 0;
      read(pip1[0], &x, sizeof(x));
      cout << x << " kid 1 heard from" << endl;
      read(pip2[0], &x, sizeof(x));
      cout << x << " kid 2 heard from" << endl;
      close(pip1[0]);
      close(pip1[1]);
      close(pip2[0]);
      close(pip2[1]);
      wait(NULL);
      wait(NULL);
   }
   return 0;
} // end main

1 kid 1 heard from
2 kid 2 heard from
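
As a sketch of the earlier remark that "dup" is used together with fork/exec, the function below runs the echo command in a child whose standard output has been redirected into a pipe. It uses dup2(), the variant of dup() that lets the caller pick the target descriptor. The function name capture_echo is an assumption for illustration, and the sketch assumes an echo program can be found on the PATH.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: runs "echo <word>" in a child whose stdout is
   the write end of a pipe, and copies what it printed into out as a
   '\0'-terminated string. Returns bytes captured, or -1 on error. */
ssize_t capture_echo(const char *word, char *out, size_t outsize)
{
    int pfd[2];
    ssize_t n, total = 0;
    pid_t pid;

    if (outsize == 0 || pipe(pfd) == -1)
        return -1;

    pid = fork();
    if (pid < 0) {
        close(pfd[0]);
        close(pfd[1]);
        return -1;
    }

    if (pid == 0) {                       /* child */
        close(pfd[0]);
        dup2(pfd[1], STDOUT_FILENO);      /* fd 1 now refers to the pipe */
        close(pfd[1]);                    /* original descriptor not needed */
        execlp("echo", "echo", word, (char *)NULL);
        _exit(127);                       /* reached only if exec failed */
    }

    /* parent: must close its write end, or read() never sees EOF */
    close(pfd[1]);
    while ((size_t)total < outsize - 1 &&
           (n = read(pfd[0], out + total, outsize - 1 - total)) > 0)
        total += n;
    out[total] = '\0';

    close(pfd[0]);
    waitpid(pid, NULL, 0);
    return total;
}
```

The exec'd program needs no knowledge of the pipe; it simply writes to descriptor 1 as usual.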

.. code: 2 kids each with 2 pipes

jupiter|/export/home/wyatt/private/355/Pipes$ cat simpParChildPipe.cpp
// wyatt fork & pipe
#include <iostream>
#include <iomanip>
#include <fstream>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <cstdlib>      // srand, rand
using namespace std;

int main (void)
{
 const int NK = 2;      // number of kids to spawn
 int returnPid;         // result of fork
 int i;                 // loop index & kid # indicator (0 through NK-1)

 // structure for the messages passed between parent & kids
 struct info{ //
    unsigned int index;
    unsigned int pid;
 };
 struct info msgp, msgp2, msgk, msgk2;

 // file descriptors for pipe
 // number of kids x 2 pipes per kid & each pipe has 2 ends
 int pfd0[2], pfd1[2], pfd2[2], pfd3[2]; 

   // PARENT ONLY CODE  - NO KIDS YET
   // set up 2 pipes per kid
   pipe(pfd0);    pipe(pfd1);     pipe(pfd2);    pipe(pfd3);

   // PARENT FORKS all the kids here & stays in loop
   for (i = 0; i < NK; i++)
   {
     returnPid = fork();
     if (returnPid == 0)
       break; // KIDS break out of loop here
   }

   //======= PARENT ONLY CODE ===================================
   // code has exited from loop after NK iterations, so returnPid non-zero
   if (returnPid != 0) {
      // sleep a bit
      srand(getpid());
      sleep(rand()%3);

      // set up msg
      msgp.index = 99;   msgp.pid = getpid();

      // parent closes unused pipe-ends
      // kid 0
      close(pfd0[0]); // close read end k0
      close(pfd1[1]); // close write end k0
      // kid 1
      close(pfd2[0]); // close read end k1
      close(pfd3[1]); // close write end k1

      // write to kid pipes
      write(pfd0[1], &msgp, sizeof(msgp)); // write to kid0 pipe
      write(pfd2[1], &msgp, sizeof(msgp)); // write to kid1 pipe

      // parent reads from each kid & reports
      read(pfd1[0], &msgp2, sizeof(msgp2)); // read pipe from kid0
      cerr << "Parent got message: kid# " << msgp2.index 
           << "  pid=" << msgp2.pid << endl;
      read(pfd3[0], &msgp2, sizeof(msgp2)); // read pipe from kid1
      cerr << "Parent got message: kid# " << msgp2.index 
           << "  pid=" << msgp2.pid << endl;

      // parent waits for all kids to end
      for(int j = 0; j < NK; j++)
         wait(NULL);
   }
   //======= END PARENT CODE ================================

   //======= KID'S CODE =====================================
   // 'i' will be 0 through NK-1
   else
   {
      // set up message to send to parent
      msgk.index = i;
      msgk.pid = getpid();

      if (i==0) // kid 0
      {
        // kid0 closes unused pipe-ends
        close(pfd0[1]); // close write end for k0
        close(pfd1[0]); // close read end for k0

        sleep(i*2);

        // kid0 reads from  parent
        read (pfd0[0],&msgk2, sizeof(msgk2));

        // print out the info gotten in message from parent
        cerr << "Kid0 got message: id# " << msgk2.index 
             << "  pid=" << msgk2.pid << endl;

        // write to parent
        write(pfd1[1], &msgk, sizeof(msgk));

      }
      if (i==1) // kid 1
      {
        // kid1 closes unused pipe-ends
        close(pfd2[1]); // close write end for k1
        close(pfd3[0]); // close read end for k1

        sleep(i*2);

        // kid1 reads from  parent
        read (pfd2[0],&msgk2, sizeof(msgk2));

        // print out the info gotten in message from parent
        cerr << "Kid1 got message: id# " << msgk2.index 
             << "  pid=" << msgk2.pid << endl;

        // write to parent
        write(pfd3[1], &msgk, sizeof(msgk));

      }
   }
   //======= END KID CODE =====================================

 return 0;
} // end main function
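
The program above names each pipe individually (pfd0 through pfd3), which gets unwieldy as the number of kids grows. One possible generalisation, sketched here in C, keeps the pipes in arrays indexed by kid number. The names poll_kids, to_kid and from_kid, the value of NK, and the "reply with base + k" task are all assumptions made for illustration.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define NK 3   /* number of kids (hypothetical value) */

/* Hypothetical helper: the parent sends `base` to each kid; kid k
   replies with base + k. Returns the sum of the replies, or -1. */
int poll_kids(int base)
{
    int to_kid[NK][2];     /* parent writes [k][1], kid k reads [k][0] */
    int from_kid[NK][2];   /* kid k writes [k][1], parent reads [k][0] */
    int k, j, forked = 0, ok = 1, sum = 0;
    pid_t pid;

    for (k = 0; k < NK; k++)
        if (pipe(to_kid[k]) == -1 || pipe(from_kid[k]) == -1)
            return -1;     /* sketch: earlier descriptors are not cleaned up */

    for (k = 0; k < NK; k++) {
        pid = fork();
        if (pid < 0) { ok = 0; break; }
        if (pid == 0) {                        /* kid k */
            int n;
            for (j = 0; j < NK; j++) {         /* close every unused end */
                close(to_kid[j][1]);
                close(from_kid[j][0]);
                if (j != k) { close(to_kid[j][0]); close(from_kid[j][1]); }
            }
            if (read(to_kid[k][0], &n, sizeof n) != (ssize_t)sizeof n)
                _exit(1);
            n += k;
            if (write(from_kid[k][1], &n, sizeof n) != (ssize_t)sizeof n)
                _exit(1);
            _exit(0);
        }
        forked++;
    }

    for (k = 0; k < NK; k++) {                 /* parent's unused ends */
        close(to_kid[k][0]);
        close(from_kid[k][1]);
    }
    for (k = 0; k < NK; k++) {
        int reply;
        if (ok &&
            write(to_kid[k][1], &base, sizeof base) == (ssize_t)sizeof base &&
            read(from_kid[k][0], &reply, sizeof reply) == (ssize_t)sizeof reply)
            sum += reply;
        else
            ok = 0;
        close(to_kid[k][1]);   /* a kid still waiting now sees EOF */
        close(from_kid[k][0]);
    }
    for (k = 0; k < forked; k++)
        wait(NULL);
    return ok ? sum : -1;
}
```

Unlike the program above, each kid here also closes the ends belonging to the other kids' pipes, so that no stray write end keeps a pipe open.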