Asynchronous exception handling in Python
PythonPython 3.xPython AsyncioPython Problem Overview
I've the following code using asyncio
and aiohttp
to make asynchronous HTTP requests.
import sys
import asyncio
import aiohttp
@asyncio.coroutine
def get(url):
try:
print('GET %s' % url)
resp = yield from aiohttp.request('GET', url)
except Exception as e:
raise Exception("%s has error '%s'" % (url, e))
else:
if resp.status >= 400:
raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
return (yield from resp.text())
@asyncio.coroutine
def fill_data(run):
url = 'http://www.google.com/%s' % run['name']
run['data'] = yield from get(url)
def get_runs():
runs = [ {'name': 'one'}, {'name': 'two'} ]
loop = asyncio.get_event_loop()
task = asyncio.wait([fill_data(r) for r in runs])
loop.run_until_complete(task)
return runs
try:
get_runs()
except Exception as e:
print(repr(e))
sys.exit(1)
For some reason, exceptions raised inside the get
function are not caught:
Future/Task exception was never retrieved
Traceback (most recent call last):
File "site-packages/asyncio/tasks.py", line 236, in _step
result = coro.send(value)
File "mwe.py", line 25, in fill_data
run['data'] = yield from get(url)
File "mwe.py", line 17, in get
raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
Exception: http://www.google.com/two has error '404: Not Found'
So, what is correct way to handle exceptions raised by coroutines?
Python Solutions
Solution 1 - Python
asyncio.wait
doesn't actually consume the Futures
passed to it, it just waits for them to complete, and then returns the Future
objects:
> coroutine asyncio.wait(futures, *, loop=None, timeout=None, > return_when=ALL_COMPLETED)
> Wait for the Futures and coroutine objects
> given by the sequence futures to complete. Coroutines will be wrapped
> in Tasks. Returns two sets of Future
: (done, pending).
Until you actually yield from
/await
the items in the done
list, they'll remain unconsumed. Since your program exits without consuming the futures, you see the "exception was never retrieved" messages.
For your use-case, it probably makes more sense to use asyncio.gather
, which will actually consume each Future
, and then return a single Future
that aggregates all their results (or raises the first Exception
thrown by a future in the input list).
def get_runs():
runs = [ {'name': 'one'}, {'name': 'two'} ]
loop = asyncio.get_event_loop()
tasks = asyncio.gather(*[fill_data(r) for r in runs])
loop.run_until_complete(tasks)
return runs
Output:
GET http://www.google.com/two
GET http://www.google.com/one
Exception("http://www.google.com/one has error '404: Not Found'",)
Note that asyncio.gather
actually lets you customize its behavior when one of the futures raises an exception; the default behavior is to raise the first exception it hits, but it can also just return each exception object in the output list:
> asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
>
> Return a future aggregating results from the given coroutine objects
> or futures.
>
> All futures must share the same event loop. If all the tasks are done
> successfully, the returned future’s result is the list of results (in
> the order of the original sequence, not necessarily the order of
> results arrival). If return_exceptions
is True
, exceptions in the
> tasks are treated the same as successful results, and gathered in the
> result list; otherwise, the first raised exception will be immediately
> propagated to the returned future.
Solution 2 - Python
To debug or "handle" exceptions in callback:
Coroutine which return some result or raise exceptions:
@asyncio.coroutine
def async_something_entry_point(self):
try:
return self.real_stuff_which_throw_exceptions()
except:
raise Exception(some_identifier_here + ' ' + traceback.format_exc())
And callback:
def callback(self, future: asyncio.Future):
exc = future.exception()
if exc:
# Handle wonderful empty TimeoutError exception
if type(exc) == TimeoutError:
self.logger('<Some id here> callback exception TimeoutError')
else:
self.logger("<Some id here> callback exception " + str(exc))
# store your result where you want
self.result.append(
future.result()
)