Celery task that runs more tasks


Python Problem Overview


I am using celerybeat to kick off a primary task that kicks off a number of secondary tasks. I have both tasks written already.

Is there a way to easily do this? Does Celery allow for tasks to be run from within tasks?

My example:

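from celery.task import task
from celery.task.sets import TaskSet
from django.contrib.auth.models import User  # assuming Django's built-in User model

@task
def compute(users=None):
    if users is None:
        users = User.objects.all()

    # Build one subtask per user without running it yet
    tasks = []
    for user in users:
        tasks.append(compute_for_user.subtask((user.id,)))

    job = TaskSet(tasks)
    job.apply_async()  # raises an IOError: Socket closed

@task
def compute_for_user(user_id):
    # do some stuff
    pass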

compute gets called from celerybeat, but causes an IOError when it tries to run apply_async. Any ideas?

Python Solutions


Solution 1 - Python

To answer your opening questions: As of version 2.0, Celery provides an easy way to start tasks from other tasks. What you are calling "secondary tasks" are what it calls "subtasks". See the documentation for Sets of tasks, Subtasks and Callbacks, which @Paperino was kind enough to link to.

In version 3.0, Celery switched to groups for this and other kinds of behavior.

Your code shows that you are already familiar with this interface. Your actual question seems to be, "Why am I getting a 'Socket closed' IOError when I try to run my set of subtasks?" I don't think anyone can answer that, because you have not provided enough information about your program. Your excerpt cannot be run as-is, so we cannot reproduce the problem ourselves. Please post the stack trace that accompanies the IOError, and with any luck, someone who can help you with the crash will come along.

Solution 2 - Python

You can use something like this (supported since Celery 3.0):

from celery import group

# One signature per user, dispatched together as a group
g = group(compute_for_user.s(user.id) for user in users)
g.apply_async()
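The snippet above builds one signature per user and dispatches them all at once. To show those semantics without a broker or a Celery install, here is a plain-Python model; it is not Celery's API. The hypothetical `Signature` class and `s` helper record a function and its arguments without calling it (as `compute_for_user.s(user.id)` does), and the hypothetical `run_group` stands in for `group(...).apply_async()` plus collecting the results, with a `ThreadPoolExecutor` playing the role of the worker pool. `compute_for_user` here is a toy stand-in for the real task.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, List, Tuple


@dataclass
class Signature:
    """Hypothetical stand-in for a Celery signature: a call described, not yet run."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def __call__(self) -> Any:
        return self.func(*self.args, **self.kwargs)


def s(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Signature:
    """Mimics the shape of `task.s(...)`: freeze the arguments, call nothing."""
    return Signature(func, args, kwargs)


def run_group(signatures: Iterable[Signature], max_workers: int = 4) -> List[Any]:
    """Run every signature concurrently; return results in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(sig) for sig in signatures]
        return [f.result() for f in futures]


def compute_for_user(user_id: int) -> int:
    # Toy stand-in for the real per-user work.
    return user_id * 10


def compute(user_ids: Iterable[int]) -> List[Any]:
    # The "primary task": build one signature per user, then fan out.
    return run_group(s(compute_for_user, uid) for uid in user_ids)


print(compute([1, 2, 3]))  # [10, 20, 30]
```

The model waits for the results only so the outcome is visible. In real Celery, `apply_async()` returns immediately with a result handle, and blocking on subtask results from inside another task is discouraged because it can starve or deadlock the worker pool.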

Solution 3 - Python

Since version 3.0, "TaskSet" is no longer the term: groups, chains and chords, implemented as special types of subtask, are the new approach. See http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks
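Chains and chords compose in a predictable way: a chain passes each task's return value as the first argument of the next task, and a chord runs a group in parallel and then calls a single callback with the list of all results. The plain-Python model below mimics those semantics with a thread pool; `run_chain`, `run_chord` and the `(function, args)` step tuples are hypothetical helpers for illustration, not Celery's API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence, Tuple

# A step is (function, extra_args): a hypothetical stand-in for a signature.
Step = Tuple[Callable[..., Any], Tuple[Any, ...]]


def run_chain(first: Step, *rest: Step) -> Any:
    """Run `first`, then feed each result in as the first argument of the next step."""
    func, args = first
    result = func(*args)
    for func, args in rest:
        result = func(result, *args)
    return result


def run_chord(header: Sequence[Step], callback: Callable[[List[Any]], Any]) -> Any:
    """Run the header steps in parallel, then pass the list of results to `callback`."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        results = [f.result() for f in futures]
    return callback(results)


def add(x: int, y: int) -> int:
    return x + y


def mul(x: int, y: int) -> int:
    return x * y


# Shaped like chain(add.s(2, 2), mul.s(10)): (2 + 2) * 10
print(run_chain((add, (2, 2)), (mul, (10,))))  # 40

# Shaped like a chord whose header is add(i, i) for i in 0..3 and whose callback sums the list
print(run_chord([(add, (i, i)) for i in range(4)], sum))  # 12
```

The point of the model is the data flow: in a chain each step receives exactly one upstream result, while a chord's callback receives all of the group's results at once, in header order.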

Solution 4 - Python

Regarding the IOError: the information here is not sufficient to say what caused it, but my guess is that you open a connection inside the task function, so every call to the task builds a new connection. If the task is called a thousand times, there will be a thousand connections. That can exhaust the sockets the system will give you, and the IOError is the result.
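The usual remedy for this pattern is to create the connection once per worker process and reuse it, instead of opening a new one on every call. Here is a minimal sketch, assuming a hypothetical `FakeConnection` class in place of whatever client the task actually opens (database, HTTP or broker): the per-call version opens one connection per invocation, while the cached version opens exactly one.

```python
import functools


class FakeConnection:
    """Hypothetical stand-in for a real client; counts how many were ever opened."""
    opened = 0

    def __init__(self) -> None:
        FakeConnection.opened += 1

    def send(self, payload: str) -> str:
        return f"sent {payload}"


def task_per_call(payload: str) -> str:
    # Anti-pattern: a new connection (for a real client, a new socket) per invocation.
    conn = FakeConnection()
    return conn.send(payload)


@functools.lru_cache(maxsize=None)
def get_connection() -> FakeConnection:
    # Created lazily on first use, then reused for the life of the process.
    return FakeConnection()


def task_reusing(payload: str) -> str:
    return get_connection().send(payload)


FakeConnection.opened = 0
for i in range(1000):
    task_per_call(str(i))
print("per call:", FakeConnection.opened)  # per call: 1000

FakeConnection.opened = 0
for i in range(1000):
    task_reusing(str(i))
print("reused:", FakeConnection.opened)  # reused: 1
```

With Celery's prefork pool, the connection should be created after the worker forks (lazily, as here, or from a `worker_process_init` signal handler), so that each child process gets its own connection rather than sharing one inherited from the parent.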

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Question: Mantas Vidutis
Solution 1 - Python: Jeremy W. Sherman
Solution 2 - Python: Abhilash Joseph
Solution 3 - Python: michel.iamit
Solution 4 - Python: Zheng Liu