Best practice to query large number of ndb entities from datastore

Google App-Engine, App Engine-Ndb, Google Cloud-Datastore

Google App-Engine Problem Overview


I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard, it is just a histogram of items that pass a specific filter of the usage samples. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.

I have tried everything I can think of to split the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter which method I try (see below), the RPCs always seem to fall back to a waterfall of sequential next calls.

Note: the query and analysis code does work; it just runs too slowly because I can't get data from the datastore quickly enough.

Background

I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:

from google.appengine.ext import ndb

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

You can think of the samples as times when a user makes use of a capability of a given name (e.g. 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature, e.g. ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']. So the tags form a denormalized set of tokens that can be used to find samples of interest.

The analysis I am trying to do consists of taking a date range and asking how many times a feature or set of features (perhaps all features) was used per day (or per hour) per customer account (company, not per user).

So the input to the handler would be something like:

  • Start Date
  • End Date
  • Tag(s)

Output would be:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]
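The question leaves addCountForPeriod and getPeriodTimestamp undefined. Below is a minimal sketch of the kind of accumulator they imply, assuming daily or hourly buckets keyed by company account. The names get_period_start and PeriodCounter are hypothetical, and the code is plain Python with no datastore access; it only shows how per-sample counts could be folded into the output structure above.

```python
from collections import defaultdict
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)
DAY = timedelta(days=1)


def get_period_start(ts, period=DAY):
    """Truncate a datetime to the start of its hourly or daily bucket."""
    if period == HOUR:
        return ts.replace(minute=0, second=0, microsecond=0)
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


class PeriodCounter(object):
    """Hypothetical accumulator: counts keyed by (company account, period start)."""

    def __init__(self, period=DAY):
        self.period = period
        self.counts = defaultdict(lambda: defaultdict(int))

    def add(self, company_account, ts):
        self.counts[company_account][get_period_start(ts, self.period)] += 1

    def to_output(self):
        """Build the list-of-dicts output format, sorted for stable results."""
        return [
            {
                'company_account': company,
                'counts': [
                    {'timeperiod': start.isoformat(), 'count': count}
                    for start, count in sorted(periods.items())
                ],
            }
            for company, periods in sorted(self.counts.items())
        ]
```

Keeping the counts in nested dicts means each sample costs one dictionary update, so the aggregation itself should be negligible next to the datastore reads.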

Common Code for Queries

Here is the code common to all queries. The general structure of the handler is a simple webapp2 GET handler that sets up the query parameters, runs the query, processes the results, and creates the data to return.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags (from the request): ANDed equality filters, so a sample must carry every tag
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually served from ndb's in-context cache or memcache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
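Because tags is a repeated property, each Sample.tags == t filter matches when any element of the list equals t, so ANDing them selects only samples that carry every requested tag. A hypothetical in-memory equivalent (matches_all_tags is not part of ndb) can be handy for unit-testing the tag logic without touching the datastore:

```python
def matches_all_tags(sample_tags, required_tags):
    """In-memory mirror of ANDed equality filters on a repeated property.

    Each filter matches if ANY list element equals the value, so the
    conjunction matches only samples whose tags include EVERY required tag.
    """
    sample_set = set(sample_tags)
    return all(tag in sample_set for tag in required_tags)
```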

Methods Tried

I have tried a variety of methods to try to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:

A. Single Iteration

This is more of a simple base case to compare against the other methods. I just build the query and iterate over all the items letting ndb do what it does to pull them one after the other.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Large Fetch

The idea here was to see if I could do a single very large fetch.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Async fetches across time range

The idea here is to recognize that the samples are fairly well spaced across time so I can create a set of independent queries that split the overall time region into chunks and try to run each of these in parallel using async:

# split up timestamp space into 20 equal parts and async query each of them
ts_intervals   = 20
ts_delta       = (end_time - start_time) / ts_intervals
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range, including end_time
      cur_end_time = end_time
      chunk_q = q.filter(Sample.timestamp >= cur_start_time,
                         Sample.timestamp <= cur_end_time)
   else:                        # Half-open chunks so no sample is counted twice
      chunk_q = q.filter(Sample.timestamp >= cur_start_time,
                         Sample.timestamp < cur_end_time)

   f = chunk_q.fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)
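The partitioning step can be pulled out and checked on its own. This is a sketch under the assumption that chunks should be contiguous and non-overlapping; split_time_range is a hypothetical helper, and each (start, end, inclusive_end) tuple it returns would drive one q.filter(...) plus fetch_async call, using <= on the last chunk and < on the others.

```python
from datetime import datetime, timedelta


def split_time_range(start_time, end_time, n_intervals):
    """Split [start_time, end_time] into n contiguous (start, end, inclusive_end) chunks.

    Every chunk except the last is half-open [start, end); the last one is
    closed so that samples stamped exactly at end_time are not dropped.
    """
    if n_intervals < 1:
        raise ValueError('n_intervals must be >= 1')
    delta = (end_time - start_time) / n_intervals
    chunks = []
    cur_start = start_time
    for i in range(n_intervals):
        last = (i == n_intervals - 1)
        cur_end = end_time if last else cur_start + delta
        chunks.append((cur_start, cur_end, last))
        cur_start = cur_end
    return chunks
```

Equal time slices only give equal work if samples are spread evenly over time; skewed data would call for partitions based on entity counts instead.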

D. Async mapping

I tried this method because the documentation made it sound like ndb may exploit some parallelism automatically when using the Query.map_async method.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Outcome

I tested out one example query to collect overall response time and appstats traces. The results are:

A. Single Iteration

real: 15.645s

This one goes sequentially through fetching batches one after the other and then retrieves every session from memcache.

Method A appstats

B. Large Fetch

real: 12.12s

Effectively the same as option A but a bit faster for some reason.

Method B appstats

C. Async fetches across time range

real: 15.251s

Appears to provide more parallelism at the start but seems to get slowed down by a sequence of calls to next during iteration of the results. Also doesn't seem to be able to overlap the session memcache lookups with the pending queries.

Method C appstats

D. Async mapping

real: 13.752s

This one is the hardest for me to understand. It looks like there is a good deal of overlap, but everything seems to stretch out in a waterfall instead of running in parallel.

Method D appstats

Recommendations

Based upon all this, what am I missing? Am I just hitting a limit on App Engine, or is there a better way to pull down a large number of entities in parallel?

I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to app engine in parallel but this seems pretty brute force. I would really expect that app engine should be able to handle this use case so I am guessing there is something I am missing.

Update

In the end I found that option C was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.

After getting advice from several people, I found that the following items were key to understand and keep in mind:

  • Multiple queries can run in parallel
  • Only 10 RPCs can be in flight at once
  • Try to denormalize to the point that there are no secondary queries
  • This type of task is better left to MapReduce and task queues, not real-time queries
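To see why splitting a query into 20 pieces does not give 20-way parallelism if only 10 RPCs can be in flight, here is an analogy in plain Python asyncio, not ndb: a semaphore caps the number of concurrent stand-in "RPCs" at 10, so the remaining requests wait behind the first batch. The cap of 10 is the figure reported above, treated here as an assumption, and fake_rpc is purely hypothetical.

```python
import asyncio

MAX_IN_FLIGHT = 10  # the limit reported above; an assumption, not a documented constant


async def fake_rpc(i, state):
    """Stand-in for one datastore RPC; records how many are running at once."""
    state['running'] += 1
    state['peak'] = max(state['peak'], state['running'])
    await asyncio.sleep(0.01)  # simulated network latency
    state['running'] -= 1
    return i


async def run_all(n_requests):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    state = {'running': 0, 'peak': 0}

    async def bounded(i):
        async with sem:  # at most MAX_IN_FLIGHT calls inside this block at once
            return await fake_rpc(i, state)

    results = await asyncio.gather(*(bounded(i) for i in range(n_requests)))
    return results, state['peak']
```

With 25 requests, the total time is roughly three rounds of latency rather than one, which matches the staircase shape seen in the appstats traces.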

So what I did to make it faster:

  • I partitioned the query space from the beginning based upon time. (note: the more equal the partitions are in terms of entities returned, the better)
  • I denormalized the data further to remove the need for the secondary session query
  • I made use of ndb async operations and wait_any() to overlap the queries with the processing
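The overlap pattern (process whichever query finishes first, as wait_any() allows with ndb futures) can be sketched with Python's concurrent.futures.as_completed as a stand-in. fetch_chunk and the sample dictionaries are hypothetical; the samples already carry their company, mirroring the denormalization step, so no secondary session lookup happens during processing.

```python
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_chunk(chunk_id):
    """Hypothetical stand-in for one time-range query returning denormalized samples."""
    time.sleep(0.01 * (chunk_id % 3))  # chunks finish in varying order
    return [{'company': 'acme' if i % 2 else 'globex', 'chunk': chunk_id}
            for i in range(4)]


def count_by_company(n_chunks, max_in_flight=10):
    counts = Counter()
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        futures = [pool.submit(fetch_chunk, c) for c in range(n_chunks)]
        # Handle each chunk as soon as it arrives rather than in submission
        # order, overlapping processing with the fetches still in flight.
        for fut in as_completed(futures):
            for sample in fut.result():
                counts[sample['company']] += 1
    return counts
```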

I am still not getting the performance I would expect or like, but it is workable for now. I just wish there was a better way to pull large numbers of sequential entities into memory quickly in handlers.

Google App-Engine Solutions


Solution 1 - Google App-Engine

Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests of up to 10 minutes, with (I believe) the normal memory constraints (F1 instances, the default, have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.

Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls another URL every ~5s; that URL responds with true/false depending on whether the task queue task has completed yet. The task processes the data, which can take some tens of seconds, and saves the result to the datastore, either as the computed data or as a rendered web page. Once the initial page detects that the task has completed, the user is redirected to the results page, which fetches the now-computed results from the datastore.
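A minimal single-process sketch of that kick-off / poll / fetch flow, assuming a background thread and an in-memory dict as stand-ins for the task queue and the datastore. The function names are hypothetical and are not App Engine APIs; in a real app each function would sit behind its own URL handler.

```python
import threading
import time
import uuid

_jobs = {}  # job_id -> {'done': bool, 'result': ...}; stands in for the datastore
_lock = threading.Lock()


def start_report(compute):
    """Kick-off URL: record a pending job and run the work in the background
    (a thread here; a task queue task in the pattern described above)."""
    job_id = uuid.uuid4().hex
    with _lock:
        _jobs[job_id] = {'done': False, 'result': None}

    def worker():
        result = compute()
        with _lock:
            _jobs[job_id] = {'done': True, 'result': result}

    threading.Thread(target=worker, daemon=True).start()
    return job_id


def is_done(job_id):
    """Status URL, polled by the page every few seconds."""
    with _lock:
        return _jobs[job_id]['done']


def get_result(job_id):
    """Results page: read the stored output once the job has finished."""
    with _lock:
        return _jobs[job_id]['result']
```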

Solution 2 - Google App-Engine

The new experimental Data Processing feature (an AppEngine API for MapReduce) looks very suitable for solving this problem. It does automatic sharding to execute multiple parallel worker processes.

Solution 3 - Google App-Engine

I have a similar problem, and after working with Google support for a few weeks I can confirm there is no magic solution, at least as of December 2017.

tl;dr: One can expect throughput from 220 entities/second for standard SDK running on B1 instance up to 900 entities/second for a patched SDK running on a B8 instance.

The limitation is CPU-related, and changing the instance type directly impacts performance. This is confirmed by the similar results obtained on B4 and B4_1G instances, which have the same CPU and differ only in memory.

The best throughput I got for an Expando entity with about 30 fields is:

Standard GAE SDK

  • B1 instance: ~220 entities/second
  • B2 instance: ~250 entities/second
  • B4 instance: ~560 entities/second
  • B4_1G instance: ~560 entities/second
  • B8 instance: ~650 entities/second

Patched GAE SDK

  • B1 instance: ~420 entities/second
  • B8 instance: ~900 entities/second
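Rough arithmetic on what these rates mean for the 10,000+ entities in the question, assuming zero per-entity processing time: even the best patched figure needs about 11 seconds just to load the data, and the standard SDK on a B1 instance needs about 45 seconds. The rates are the ones listed above; seconds_to_process is just a helper for the division.

```python
def seconds_to_process(n_entities, entities_per_second):
    """Time to load n_entities at a given throughput, ignoring processing cost."""
    return n_entities / float(entities_per_second)


THROUGHPUT = {  # entities/second, from the measurements above
    'standard B1': 220, 'standard B8': 650,
    'patched B1': 420, 'patched B8': 900,
}

for name, rate in sorted(THROUGHPUT.items()):
    print('%-12s %5.1f s for 10,000 entities' % (name, seconds_to_process(10000, rate)))
```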

For the standard GAE SDK I tried various approaches, including multi-threading, but the best proved to be fetch_async with wait_any. The current NDB library already does a great job of using async and futures under the hood, so any attempt to push that further using threads only makes it worse.

I found two interesting approaches to optimize this:

Matt Faus explains the problem very well:

> GAE SDK provides an API for reading and writing objects derived from your classes to the datastore. This saves you the boring work of validating raw data returned from the datastore and repackaging it into an easy-to-use object. In particular, GAE uses protocol buffers to transmit raw data from the store to the frontend machine that needs it. The SDK is then responsible for decoding this format and returning a clean object to your code. This utility is great, but sometimes it does a bit more work than you would like. [...] Using our profiling tool, I discovered that fully 50% of the time spent fetching these entities was during the protobuf-to-python-object decoding phase. This means that the CPU on the frontend server was a bottleneck in these datastore reads!

GAE-data-access-web-request

Both approaches try to reduce the time spent doing protobuf to Python decoding by reducing the number of fields decoded.

I tried both approaches, but I only succeeded with Matt's. SDK internals have changed since Evan published his solution. I had to change Matt's published code a bit, but it was pretty easy; if there is interest, I can publish the final code.

For a regular Expando entity with about 30 fields, I used Matt's solution to decode only a couple of fields and obtained a significant improvement.
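The idea shared by both approaches, paying the decoding cost only for the fields you actually read, can be illustrated outside App Engine. This is a hypothetical plain-Python sketch with JSON standing in for the protocol buffer encoding; it is not Matt's or Evan's code and does not touch the SDK.

```python
import json


class LazyEntity(object):
    """Holds raw, still-encoded field values and decodes a field only on first access."""

    def __init__(self, raw_fields):
        self._raw = raw_fields  # field name -> encoded string (JSON as a protobuf stand-in)
        self._decoded = {}
        self.decode_calls = 0   # how many fields were actually decoded

    def get(self, name):
        if name not in self._decoded:
            self.decode_calls += 1
            self._decoded[name] = json.loads(self._raw[name])
        return self._decoded[name]
```

For a 30-field entity where the analysis reads two fields, only two decode calls happen per entity instead of thirty.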

In conclusion, one needs to plan accordingly and should not expect to process much more than a few hundred entities in a "real-time" GAE request.

Solution 4 - Google App-Engine

Large data operations on App Engine are best implemented using some sort of MapReduce operation.

Here's a video describing the process (it also covers BigQuery): https://developers.google.com/events/io/sessions/gooio2012/307/

It doesn't sound like you need BigQuery, but you probably want to use both the Map and Reduce portions of the pipeline.

The main difference between what you're doing and the MapReduce approach is that you're launching one instance and iterating through the queries, whereas with MapReduce you would have a separate instance running in parallel for each query. You will still need a reduce operation to "sum up" all the data and write the result somewhere, though.
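The shape of that computation in miniature, in plain Python rather than the App Engine MapReduce library API: each shard is mapped to partial counts independently (which is what lets shards run on separate instances), and a reduce step sums the partials. map_shard and reduce_counts are hypothetical names, and the sample dicts assume the company and day have already been denormalized onto each sample.

```python
from collections import Counter
from functools import reduce


def map_shard(samples):
    """Map step: one worker summarizes its own shard into partial counts."""
    return Counter((s['company'], s['day']) for s in samples)


def reduce_counts(partials):
    """Reduce step: sum the partial counts produced by every shard."""
    return reduce(lambda a, b: a + b, partials, Counter())
```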

The other problem you have is that you should use cursors to iterate. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

If the iterator is using a query offset, it'll be inefficient, since an offset issues the same query, skips past a number of results, and gives you the next set, while the cursor jumps straight to the next set.
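A toy cost model of that difference, assuming the cost of a page is the number of result rows the query walks. The cursor here is just a list position, a hypothetical stand-in for a real datastore cursor; the point is that offset paging re-walks every skipped row on each page.

```python
def page_with_offset(rows, page_size):
    """Each page re-runs the query and skips `offset` rows; skipped rows still cost a scan."""
    scanned, pages, offset = 0, [], 0
    while offset < len(rows):
        page = rows[offset:offset + page_size]
        scanned += offset + len(page)  # rows skipped + rows returned
        pages.append(page)
        offset += page_size
    return pages, scanned


def page_with_cursor(rows, page_size):
    """Each page resumes where the previous one stopped, so every row is scanned once."""
    scanned, pages, cursor = 0, [], 0
    while cursor < len(rows):
        page = rows[cursor:cursor + page_size]
        scanned += len(page)
        pages.append(page)
        cursor += len(page)
    return pages, scanned
```

For 10 rows in pages of 3, both return the same pages, but the offset version walks 28 rows against 10 for the cursor version, and the gap grows quadratically with the number of pages.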

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Allen | View Question on Stackoverflow
Solution 1 - Google App-Engine | mjibson | View Answer on Stackoverflow
Solution 2 - Google App-Engine | Martin Berends | View Answer on Stackoverflow
Solution 3 - Google App-Engine | FelixEnescu | View Answer on Stackoverflow
Solution 4 - Google App-Engine | dragonx | View Answer on Stackoverflow