Servlet-3 Async Context, how to do asynchronous writes?

Java, Comet, Servlet 3.0

Java Problem Overview


Problem Description

The Servlet 3.0 API allows you to detach a request/response context and respond to it later.

However, if I try to write a large amount of data with something like:

AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush();

It may actually block (and in trivial test cases it does block) on both Tomcat 7 and Jetty 8. The tutorials recommend creating a thread pool to handle such writes, which is generally counterproductive in a traditional 10K-connection (C10K) architecture.

However, if I have 10,000 open connections and a thread pool of, let's say, 10 threads, it takes only 1% of clients with slow or stalled connections (100 clients, ten times the pool size) to tie up every thread in the pool and either block the comet responses completely or slow them down significantly.

The usual approach is to get a "write-ready" notification or an I/O completion notification and then continue pushing the data.
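For reference, this is what a "write-ready" notification looks like at the plain java.nio level, outside any servlet container. The sketch is only an analogy under stated assumptions: a java.nio.channels.Pipe stands in for the client connection, a background thread plays a client that starts reading late, and the class name WriteReadyDemo is hypothetical. The writer puts the channel in non-blocking mode, so write() returns 0 instead of blocking when the buffer is full, and it then waits on a Selector for OP_WRITE before continuing.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicLong;

class WriteReadyDemo {
    static final class Result {
        final long received; // bytes the simulated client actually read
        final int waits;     // how often the writer waited for a write-ready event
        Result(long received, int waits) { this.received = received; this.waits = waits; }
    }

    static Result send(byte[] data) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();
        Pipe.SinkChannel sink = pipe.sink();
        sink.configureBlocking(false); // write() now returns 0 instead of blocking

        AtomicLong received = new AtomicLong();
        // Simulated client: reads until the writer closes its end of the pipe.
        Thread client = new Thread(() -> {
            ByteBuffer in = ByteBuffer.allocate(8192);
            try {
                int n;
                while ((n = pipe.source().read(in)) >= 0) {
                    received.addAndGet(n);
                    in.clear();
                }
            } catch (IOException e) {
                // a real server would treat this as a disconnect
            }
        });
        boolean clientStarted = false;

        int waits = 0;
        ByteBuffer out = ByteBuffer.wrap(data);
        try (Selector selector = Selector.open()) {
            SelectionKey key = sink.register(selector, 0);
            while (out.hasRemaining()) {
                if (sink.write(out) > 0) {
                    continue; // progress was made; try to write more
                }
                // Buffer is full. The client starts only now, so this path is always exercised.
                if (!clientStarted) {
                    client.start();
                    clientStarted = true;
                }
                key.interestOps(SelectionKey.OP_WRITE);
                selector.select();               // wait for the write-ready notification
                selector.selectedKeys().clear();
                key.interestOps(0);
                waits++;
            }
        }
        sink.close();                            // the client now sees end-of-stream
        if (!clientStarted) {
            client.start();                      // everything fit into the buffer
        }
        client.join();
        pipe.source().close();
        return new Result(received.get(), waits);
    }
}
```

In a real server a single selector thread can watch thousands of channels this way, which is the property the question asks the Servlet API to expose.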

How can this be done using the Servlet 3.0 API, i.e. how do I get either:

  • Asynchronous completion notification for an I/O operation, or
  • Non-blocking I/O with write-ready notification?

If this is not supported by the Servlet 3.0 API, are there any web-server-specific APIs (like Jetty Continuation or Tomcat CometEvent) that can handle such events truly asynchronously, without faking asynchronous I/O with a thread pool?

Does anybody know?

And if this is not possible, can you confirm it with a reference to the documentation?

Problem demonstration in sample code

I have attached code below that emulates an event stream.

Notes:

  • it relies on ServletOutputStream throwing an IOException to detect disconnected clients
  • it sends keep-alive messages to make sure clients are still there
  • I created a thread pool to "emulate" asynchronous operations.
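For reference, the event-stream (Server-Sent Events) wire format the sample emulates is line-based: an optional "id:" field, one "data:" line per line of payload, and a blank line that ends the event. Lines starting with ":" are comments that clients ignore, which is why ":keep-alive" works as a keep-alive. A minimal formatting sketch (the helper name SseFormat is hypothetical):

```java
class SseFormat {
    /** Formats one event: an "id:" line, one "data:" line per payload line, then a blank line. */
    static String event(long id, String data) {
        StringBuilder sb = new StringBuilder();
        sb.append("id: ").append(id).append('\n');
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // the blank line dispatches the event
        return sb.toString();
    }

    /** A comment line: clients ignore it, but writing it lets the server notice dead connections. */
    static String keepAlive() {
        return ":keep-alive\n\n";
    }
}
```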

In this example I explicitly defined a thread pool of size 1 to show the problem:

  • Start the application
  • Run curl http://localhost:8080/path/to/app in two terminals (once in each)
  • Now send the data with curl -d m=message http://localhost:8080/path/to/app
  • Both clients receive the data
  • Now suspend one of the clients (Ctrl+Z) and send the message once again: curl -d m=message http://localhost:8080/path/to/app
  • Observe that the other, non-suspended client either receives nothing or, after the message is transferred, stops receiving keep-alive messages, because the only pool thread is blocked.

I want to solve this problem without a thread pool, because with 1000-5000 open connections I can exhaust the thread pool very fast.

The sample code:


import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;


@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool = 
        new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purpose
    
    private final Thread timer = new Thread(new Runnable() {
        public void run()
        {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });


    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;
        RunJob(AsyncContext ac) 
        {
            this.ac = ac;
        }
        public void keepAlive()
        {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }
        String formatMessage(String msg)
        {
            StringBuilder sb = new StringBuilder();
            sb.append("id: ");
            sb.append(id);
            sb.append("\n"); // SSE fields are newline-terminated
            for(int i=0;i<100000;i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }
        public void run()
        {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);

            if(!sendMessage(message))
                return;
            
            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);
            
        }
        boolean sendMessage(String message) 
        {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            }
            catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };
    
    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException
    {
        super.init(config);
        timer.start();
    }
    @Override
    public void destroy()
    {
        for(;;){
            try {
                timer.interrupt();
                timer.join();
                break;
            }
            catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }


    protected synchronized void removeContext(RunJob ac)
    {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();

        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id!=0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive()
    {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException 
    {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");        
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }


}

The sample above uses threads to prevent blocking. However, if the number of blocked clients exceeds the size of the thread pool, it still blocks.

How could it be implemented without blocking?

Java Solutions


Solution 1 - Java

I've found the Servlet 3.0 asynchronous API tricky to implement correctly, and helpful documentation is sparse. After a lot of trial and error with many different approaches, I found a robust solution that I've been very happy with. Comparing my code with yours, I notice one major difference that may help with your particular problem: I write the data through ServletResponse.getWriter() rather than through a ServletOutputStream.

Here is my go-to asynchronous servlet class, adapted slightly for your some_big_data case:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {
        
        String some_big_data = getSomeBigData();

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Response object from context is null! (nothing to worry about.)"); // just means the context already timed out and the timeout listener was already called.
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
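  }

  /** Hypothetical placeholder: the answer does not show where the data comes from. */
  private String getSomeBigData() {
    return "some big data";
  }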

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}

Solution 2 - Java

During my research on this topic, this thread kept popping up, so I figured I'd mention it here:

Servlet 3.1 introduced async operations on ServletInputStream and ServletOutputStream. See ServletOutputStream.setWriteListener.

An example can be found at http://docs.oracle.com/javaee/7/tutorial/servlets013.htm
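The Servlet 3.1 pattern looks roughly like this: after request.startAsync(), register a WriteListener with ServletOutputStream.setWriteListener(); inside onWritePossible(), keep writing while out.isReady() returns true, and simply return when it returns false, because the container calls onWritePossible() again once the connection can accept more data. The sketch below shows only that control flow so it runs without a container: ReadySink and ChunkedWriter are hypothetical stand-ins, not servlet types, with ReadySink playing the role of ServletOutputStream's isReady()/write() pair. In a real servlet you would call AsyncContext.complete() once onWritePossible() reports that the queue is empty.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the isReady()/write() pair of a Servlet 3.1 ServletOutputStream. */
interface ReadySink {
    boolean isReady();
    void write(byte[] chunk) throws IOException;
}

/** Hypothetical helper whose onWritePossible() has the shape of a WriteListener callback. */
class ChunkedWriter {
    private final Queue<byte[]> pending = new ArrayDeque<>();
    private final ReadySink sink;

    ChunkedWriter(ReadySink sink) {
        this.sink = sink;
    }

    void enqueue(byte[] chunk) {
        pending.add(chunk);
    }

    /**
     * Writes queued chunks while the sink reports that it is ready, then returns.
     * true: the queue is empty (a servlet would call complete() now).
     * false: wait until the container calls onWritePossible() again; no thread is blocked.
     */
    boolean onWritePossible() throws IOException {
        while (!pending.isEmpty() && sink.isReady()) {
            sink.write(pending.poll());
        }
        return pending.isEmpty();
    }
}
```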

Solution 3 - Java

Solution 4 - Java

We can't really make the writes asynchronous. Realistically, we have to accept that when we write something to a client, we expect the write to complete promptly, and we treat it as an error if it doesn't. So if our goal is to stream data to the client as fast as possible and use the blocking/non-blocking state of the channel for flow control, we're out of luck. But if we're sending data at a low rate that a client should be able to handle, we can at least promptly disconnect clients that don't read quickly enough.

For example, in your application we send the keep-alives at a slow-ish rate (every few seconds) and expect clients to keep up with all the events they're sent. We push the data to the client, and if it can't keep up, we disconnect it promptly and cleanly. That's more limited than true asynchronous I/O, but it should meet your need (and, incidentally, mine).

The trick is that the output-writing methods, which are declared only to throw IOException, actually do a bit more than that: in the implementation, every call to something that can be interrupt()ed is wrapped in something like this (taken from Jetty 9):

catch (InterruptedException x) {
    throw (IOException) new InterruptedIOException().initCause(x);
}

(I also note that this doesn't happen in Jetty 8, where an InterruptedException is logged and the blocking loop is immediately retried. Presumably you need to make sure your servlet container is well-behaved before using this trick.)

That is, when a slow client causes a writing thread to block, we simply force the write to throw an IOException by calling interrupt() on the thread. Think about it: non-blocking code would consume a unit of time on one of our processing threads anyway, so using blocking writes that are simply aborted (after, say, one millisecond) is identical in principle. We're still just chewing up a short amount of time on the thread, only marginally less efficiently.
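The interrupt-to-IOException conversion can be reproduced in plain Java to see what the writing thread experiences. In this minimal sketch, BlockingBuffer is a hypothetical stand-in for a container's output buffer: write() waits while a slow client has not freed enough space, and an interrupt turns that wait into an InterruptedIOException, using the same idiom as the Jetty 9 snippet.

```java
import java.io.IOException;
import java.io.InterruptedIOException;

/** Hypothetical slow-client buffer: write() blocks until the reader frees enough space. */
class BlockingBuffer {
    private final int capacity;
    private int used;

    BlockingBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void write(int n) throws IOException {
        try {
            while (used + n > capacity) {
                wait(); // blocks while the "client" is not reading
            }
        } catch (InterruptedException x) {
            // Same conversion as the Jetty 9 snippet: the interrupt surfaces as an IOException.
            throw (IOException) new InterruptedIOException().initCause(x);
        }
        used += n;
    }

    /** Called when the client consumes n units; wakes any blocked writer. */
    synchronized void drain(int n) {
        used = Math.max(0, used - n);
        notifyAll();
    }
}
```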

I've modified your code so that a watchdog bounds the time spent in each write: it is started just before the write begins and cancelled if the write completes quickly, which it should.

A final note: in a well-implemented servlet container, forcing the I/O to throw ought to be safe. It would be nice if we could catch the InterruptedIOException and retry the write later; perhaps we'd like to give slow clients a subset of the events if they can't keep up with the full stream. As far as I can tell, this isn't entirely safe in Jetty: if a write throws, the internal state of the HttpResponse object might not be consistent enough to handle re-entering the write later. I expect it's unwise to push a servlet container this way unless there are specific docs I've missed that offer this guarantee. I think the idea is that a connection is meant to be shut down once an IOException happens.

Here's the code: a modified version of RunJob::run() as a crude but simple illustration (in reality we'd want to use the main timer thread here rather than spinning up one thread per write, which is silly).

public void run()
{
    String message = null;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id) {
            this.id = HugeStreamWithThreads.this.id;
            message = HugeStreamWithThreads.this.message;
        }
    }
    if(message == null)
        message = ":keep-alive\n\n";
    else
        message = formatMessage(message);
    
    final Thread curr = Thread.currentThread();
    Thread canceller = new Thread(new Runnable() {
        public void run()
        {
            try {
                Thread.sleep(2000);
                curr.interrupt();
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });
    canceller.start();

    try {
        if(!sendMessage(message))
            return;
    } finally {
        // Stop the canceller and wait for it to exit, so it cannot interrupt this thread later.
        canceller.interrupt();
        while (true) {
            try { canceller.join(); break; }
            catch (InterruptedException e) { }
        }
    }

    boolean once_again = false;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id)
            once_again = true;
    }
    if(once_again)
        pool.submit(this);

}
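The answer notes that spinning up one canceller thread per write is silly. A minimal sketch of the shared-timer variant, assuming a ScheduledExecutorService shared by all connections and an interruptible NIO channel standing in for the client connection (BoundedWriter is a hypothetical name): the watchdog is scheduled before the write and disarmed afterwards under a lock, so a late interrupt cannot leak into the next job. Whether a servlet container's output stream reacts to interrupts this way depends on the container, as discussed above for Jetty 8 versus Jetty 9.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class BoundedWriter {
    /**
     * Writes the whole buffer, but has a shared scheduler interrupt the calling thread
     * if the write takes longer than timeoutMs. On an interruptible NIO channel the
     * blocked write then fails with ClosedByInterruptException (an IOException).
     */
    static void write(ScheduledExecutorService watchdogs, WritableByteChannel ch,
                      ByteBuffer buf, long timeoutMs) throws IOException {
        final Thread writer = Thread.currentThread();
        final Object lock = new Object();
        final boolean[] armed = {true};
        ScheduledFuture<?> watchdog = watchdogs.schedule(() -> {
            synchronized (lock) {
                if (armed[0]) {
                    writer.interrupt();
                }
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
        } finally {
            synchronized (lock) {
                armed[0] = false;     // from here on the watchdog cannot interrupt us
            }
            watchdog.cancel(false);
            Thread.interrupted();     // clear an interrupt that raced with completion
        }
    }
}
```

One trade-off: the final Thread.interrupted() also clears an interrupt that came from elsewhere (for example an executor shutdown); a production version might detect that case and restore the flag.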

Solution 5 - Java

Is Spring an option for you? Spring-MVC 3.2 has a class called DeferredResult, which will gracefully handle your "10,000 open connections/10 server pool threads" scenario.

Example: http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/

Solution 6 - Java

I've had a quick look at your listing, so I may have missed some points. The advantage of a thread pool is sharing thread resources among several tasks over time. Maybe you can solve your problem by spacing out the keepAlive responses of different HTTP connections instead of sending all of them at the same time.
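A minimal sketch of that suggestion, assuming a shared ScheduledExecutorService and one keep-alive Runnable per connection (StaggeredKeepAlive is a hypothetical name): each connection gets its own initial delay, so n keep-alives are spread evenly across one period instead of firing together. This smooths the load on the pool, but on its own it does not stop a single blocked write from holding a thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class StaggeredKeepAlive {
    /** Initial delay for connection i of n, spreading n keep-alives evenly over one period. */
    static long initialDelayMs(int i, int n, long periodMs) {
        return (i * periodMs) / n;
    }

    /** Schedules each connection's keep-alive at its own offset instead of all at once. */
    static List<ScheduledFuture<?>> scheduleAll(ScheduledExecutorService timer,
                                                List<Runnable> keepAlives, long periodMs) {
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        int n = keepAlives.size();
        for (int i = 0; i < n; i++) {
            futures.add(timer.scheduleAtFixedRate(keepAlives.get(i),
                    initialDelayMs(i, n, periodMs), periodMs, TimeUnit.MILLISECONDS));
        }
        return futures;
    }
}
```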

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Artyom | View Question on Stackoverflow
Solution 1 - Java | herrtim | View Answer on Stackoverflow
Solution 2 - Java | Erich Eichinger | View Answer on Stackoverflow
Solution 3 - Java | ATilara | View Answer on Stackoverflow
Solution 4 - Java | Nicholas Wilson | View Answer on Stackoverflow
Solution 5 - Java | JJ Zabkar | View Answer on Stackoverflow
Solution 6 - Java | user2121502 | View Answer on Stackoverflow