Why doesn't more Java code use PipedInputStream / PipedOutputStream?

JavaDesign PatternsConcurrencyPipe

Java Problem Overview


I've discovered this idiom recently, and I am wondering if there is something I am missing. I've never seen it used. Nearly all Java code I've worked with in the wild favors slurping data into a string or buffer, rather than something like this example (using HttpClient and XML APIs for example):

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

That code uses a Unix-piping style technique to prevent multiple copies of the XML data being kept in memory. It uses the HTTP Post output stream and the DOM Load/Save API to serialize an XML Document as the content of the HTTP request. As far as I can tell it minimizes the use of memory with very little extra code (just the few lines for Runnable, PipedInputStream, and PipedOutputStream).

So, what's wrong with this idiom? If there's nothing wrong with this idiom, why haven't I seen it?

EDIT: to clarify, PipedInputStream and PipedOutputStream replace the boilerplate buffer-by-buffer copy that shows up everywhere, and they also allow you to process incoming data concurrently with writing out the processed data. They don't use OS pipes.

Java Solutions


Solution 1 - Java

From the Javadocs:

>Typically, data is read from a PipedInputStream object by one thread and data is written to the corresponding PipedOutputStream by some other thread. Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread.

This may partially explain why it is not more commonly used.

I'd assume another reason is that many developers do not understand its purpose / benefit.

Solution 2 - Java

In your example you're creating two threads to do the work that could be done by one. And introducing I/O delays into the mix.

Do you have a better example? Or did I just answer your question.


To pull some of the comments (at least my view of them) into the main response:

  • Concurrency introduces complexity into an application. Instead of dealing with a single linear flow of data, you now have to be concerned about sequencing of independent data flows. In some cases, the added complexity may be justified, particularly if you can leverage multiple cores/CPUs to do CPU-intensive work.
  • If you are in a situation where you can benefit from concurrent operations, there's usually a better way to coordinate the flow of data between threads. For example, passing objects between threads using a concurrent queue, rather than wrapping the piped streams in object streams.
  • Where a piped stream may be a good solution is when you have multiple threads performing text processing, a la a Unix pipeline (eg: grep | sort).

In the specific example, the piped stream allows use of an existing RequestEntity implementation class provided by HttpClient. I believe that a better solution is to create a new implementation class, as below, because the example is ultimately a sequential operation that cannot benefit from the complexity and overhead of a concurrent implementation. While I show the RequestEntity as an anonymous class, reusability would indicate that it should be a first-class class.

post.setRequestEntity(new RequestEntity()
{
    public long getContentLength()
    {
        return 0-1;
    }

    public String getContentType()
    {
        return "text/xml";
    }

    public boolean isRepeatable()
    {
        return false;
    }

    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});

Solution 3 - Java

I too only discovered the PipedInputStream/PipedOutputStream classes recently.

I am developing an Eclipse plug-in that needs to execute commands on a remote server via SSH. I am using JSch and the Channel API reads from an input stream and writes to an output stream. But I need to feed commands through the input stream and read the responses from an output stream. Thats where PipedInput/OutputStream comes in.

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write to channelInputStream
// Read from channelInputStream

channel.disconnect();

Solution 4 - Java

Also, back to the original example: no, it does not exactly minimize memory usage either. DOM tree(s) get built, in-memory buffering done -- while that is better than full byte array replicas, it's not that much better. But buffering in this case will be slower; and an extra thread is also created -- you can not use PipedInput/OutputStream pair from within a single thread.

Sometimes PipedXxxStreams are useful, but the reason they are not used more is because quite often they are not the right solution. They are ok for inter-thread communication, and that's where I have used them for what that's worth. It's just that there aren't that many use cases for this, given how SOA pushes most such boundaries to be between services, instead of between threads.

Solution 5 - Java

Here's a use case where pipes make sense:

Suppose you have a third party lib, such as an xslt mapper or crypto lib that has an interface like this: doSomething(inputStream, outputStream). And you do not want to buffer the result before sending over the wire. Apache and other clients disallow direct access to the wire outputstream. Closest you can get is obtaining the outputstream - at an offset, after headers are written - in a request entity object. But since this is under the hood, it's still not enough to pass an inputstream and outputstream to the third party lib. Pipes are a good solution to this problem.

Incidentally, I wrote an inversion of Apache's HTTP Client API [PipedApacheClientOutputStream] which provides an OutputStream interface for HTTP POST using Apache Commons HTTP Client 4.3.4. This is an example where Piped Streams might make sense.

Solution 6 - Java

I tried using these classes a while back for something, I forget the details. But I did discover that their implementation is fatally flawed. I can't remember what it was but I have a sneaky memory that it may have been a race condition which meant that they occasionally deadlocked (And yes, of course I was using them in separately threads: they simply aren't usable in a single thread and weren't designed to be).

I might have a look at their source code andsee if I can see what the problem might have been.

Solution 7 - Java

java.io pipes have too much context switching (per byte read/write) and their java.nio counterpart requires you to have some NIO background and proper usage of channels and stuff, this is my own implementation of pipes using a blocking queue which for a single producer/consumer will perform fast and scale well:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

Solution 8 - Java

> So, what's wrong with this idiom? If > there's nothing wrong with this idiom, > why haven't I seen it? > > EDIT: to clarify, PipedInputStream and > PipedOutputStream replace the > boilerplate buffer-by-buffer copy that > shows up everywhere, and they also > allow you to process incoming data > concurrently with writing out the > processed data. They don't use OS > pipes.

You have stated what it does but haven't stated why you are doing this.

If you believe that this will either reduce resources used (cpu/memory) or improve performance then it won't do either. However it will make your code more complex.

Basically you have a solution without a problem for which it solves.

Solution 9 - Java

PipedInputStream and PipeOutputStream will sleep its thread for 1 second whenever they are blocking waiting for the other side to read or write out of the full or empty buffer. Do not use.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionSteven HuwigView Question on Stackoverflow
Solution 1 - Javamatt bView Answer on Stackoverflow
Solution 2 - JavakdgregoryView Answer on Stackoverflow
Solution 3 - JavaBrian MatthewsView Answer on Stackoverflow
Solution 4 - JavaStaxManView Answer on Stackoverflow
Solution 5 - JavaRobert ChristianView Answer on Stackoverflow
Solution 6 - JavaAdrian PronkView Answer on Stackoverflow
Solution 7 - JavaGuido MedinaView Answer on Stackoverflow
Solution 8 - JavaPeter LawreyView Answer on Stackoverflow
Solution 9 - JavaDWoldrichView Answer on Stackoverflow