Asynchronous exception handling in Python


Python Problem Overview


I have the following code, which uses asyncio and aiohttp to make asynchronous HTTP requests.

import sys
import asyncio
import aiohttp

@asyncio.coroutine
def get(url):
    try:
        print('GET %s' % url)
        resp = yield from aiohttp.request('GET', url)
    except Exception as e:
        raise Exception("%s has error '%s'" % (url, e))
    else:
        if resp.status >= 400:
            raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))

    return (yield from resp.text())

@asyncio.coroutine
def fill_data(run):
    url = 'http://www.google.com/%s' % run['name']
    run['data'] = yield from get(url)

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    task = asyncio.wait([fill_data(r) for r in runs])
    loop.run_until_complete(task)   
    return runs

try:
    get_runs()
except Exception as e:
    print(repr(e))
    sys.exit(1)

For some reason, exceptions raised inside the get function are not caught:

Future/Task exception was never retrieved
Traceback (most recent call last):
  File "site-packages/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "mwe.py", line 25, in fill_data
    run['data'] = yield from get(url)
  File "mwe.py", line 17, in get
    raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
Exception: http://www.google.com/two has error '404: Not Found'

So, what is the correct way to handle exceptions raised by coroutines?

Python Solutions


Solution 1 - Python

asyncio.wait doesn't actually consume the Futures passed to it. It just waits for them to complete and then returns the Future objects:

> coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
>
> Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

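Until you actually yield from/await the items in the done set (or call result() or exception() on them), their outcomes remain unconsumed. Since your program exits without consuming the futures, the exception stored in the failed Task is never retrieved, so it never propagates to your try/except and asyncio logs the "exception was never retrieved" message instead.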
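As a minimal sketch of consuming the futures returned by asyncio.wait, the example below retrieves each task's exception or result after waiting. It uses async/await rather than the @asyncio.coroutine style of the question (that decorator was removed in Python 3.11), and fetch is a hypothetical stand-in for the question's get that fails for one input without touching the network.

```python
import asyncio


async def fetch(name):
    # Hypothetical stand-in for the question's get(): fails for one input.
    await asyncio.sleep(0)
    if name == "two":
        raise ValueError("%s has error '404: Not Found'" % name)
    return "data for %s" % name


async def run_all(names):
    # Wrap the coroutines in Tasks; recent Python versions require this for wait().
    tasks = [asyncio.ensure_future(fetch(n)) for n in names]
    done, pending = await asyncio.wait(tasks)
    results, errors = [], []
    # With the default return_when=ALL_COMPLETED and no timeout, every task is done.
    for task in tasks:  # iterate the list (not the set) to keep input order
        exc = task.exception()  # retrieving the exception marks it as consumed
        if exc is not None:
            errors.append(exc)
        else:
            results.append(task.result())
    return results, errors


results, errors = asyncio.run(run_all(["one", "two"]))
print(results)
print(errors)
```

Because the exception is retrieved explicitly, no "exception was never retrieved" message should be logged, and the caller decides whether to re-raise it.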

For your use-case, it probably makes more sense to use asyncio.gather, which will actually consume each Future, and then return a single Future that aggregates all their results (or raises the first Exception thrown by a future in the input list).

def get_runs():
    runs = [ {'name': 'one'}, {'name': 'two'} ]
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*[fill_data(r) for r in runs])
    loop.run_until_complete(tasks)
    return runs

Output:

GET http://www.google.com/two
GET http://www.google.com/one
Exception("http://www.google.com/one has error '404: Not Found'",)

Note that asyncio.gather actually lets you customize its behavior when one of the futures raises an exception; the default behavior is to raise the first exception it hits, but it can also just return each exception object in the output list:

> asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
>
> Return a future aggregating results from the given coroutine objects or futures.
>
> All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
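To illustrate return_exceptions=True, here is a small sketch in async/await syntax. The fetch coroutine is a hypothetical stand-in for the question's get and simulates one failure without any network access.

```python
import asyncio


async def fetch(name):
    # Hypothetical stand-in for the question's get(): fails for one input.
    await asyncio.sleep(0)
    if name == "two":
        raise ValueError("%s has error '404: Not Found'" % name)
    return "data for %s" % name


async def gather_all(names):
    # Exceptions are returned in place of results, in the order of the inputs.
    return await asyncio.gather(*(fetch(n) for n in names), return_exceptions=True)


outcomes = asyncio.run(gather_all(["one", "two", "three"]))
# Separate failures from successes after the fact.
failed = [o for o in outcomes if isinstance(o, Exception)]
succeeded = [o for o in outcomes if not isinstance(o, Exception)]
print(succeeded)
print(failed)
```

This variant lets the remaining requests finish and be inspected even when one of them fails, at the cost of having to check each entry's type.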

Solution 2 - Python

To debug or "handle" exceptions in a callback:

A coroutine that returns a result or raises an exception:

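import asyncio
import traceback

@asyncio.coroutine
def async_something_entry_point(self):
    try:
        return self.real_stuff_which_throw_exceptions()
    except Exception:
        # some_identifier_here is a placeholder for a string identifying this task
        raise Exception(some_identifier_here + ' ' + traceback.format_exc())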

And callback:

def callback(self, future: asyncio.Future):
    # exception() returns None on success (it raises CancelledError if cancelled)
    exc = future.exception()
    if exc:
        # str() of a TimeoutError is empty, so log it by name
        if isinstance(exc, (TimeoutError, asyncio.TimeoutError)):
            self.logger('<Some id here> callback exception TimeoutError')
        else:
            self.logger("<Some id here> callback exception " + str(exc))
        # future.result() would re-raise exc, so stop here
        return

    # store your result where you want
    self.result.append(future.result())
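A callback of this kind is typically attached with add_done_callback. The sketch below is one possible way to wire it up, using async/await syntax; work, on_done, and the two module-level lists are hypothetical names introduced only for illustration. The callback checks for cancellation and for an exception before reading the result, since future.result() re-raises a stored exception.

```python
import asyncio

results, errors = [], []


def on_done(future):
    # Hypothetical callback: check for failure first, then read the result.
    if future.cancelled():
        errors.append("cancelled")
        return
    exc = future.exception()
    if exc is not None:
        errors.append(repr(exc))
        return
    results.append(future.result())


async def work(value):
    # Hypothetical coroutine that fails for negative input.
    await asyncio.sleep(0)
    if value < 0:
        raise ValueError("negative input")
    return value * 2


async def main():
    tasks = [asyncio.ensure_future(work(v)) for v in (1, -1)]
    for task in tasks:
        task.add_done_callback(on_done)
    # wait() only waits; it does not raise the tasks' exceptions itself.
    await asyncio.wait(tasks)
    # Done callbacks are scheduled on the loop; yield once more as a safeguard.
    await asyncio.sleep(0)


asyncio.run(main())
print(results)
print(errors)
```

Because the callback retrieves the exception, the failure ends up in errors rather than in an "exception was never retrieved" log message.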

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

| Content Type | Original Author | Original Content on Stackoverflow |
| --- | --- | --- |
| Question | Yury Bayda | View Question on Stackoverflow |
| Solution 1 - Python | dano | View Answer on Stackoverflow |
| Solution 2 - Python | Oleg Neumyvakin | View Answer on Stackoverflow |