Django bulk_create with ignore rows that cause IntegrityError?

Django, Postgresql, Bulk Load

Django Problem Overview


I am using bulk_create to load thousands of rows into a PostgreSQL DB. Unfortunately some of the rows cause an IntegrityError and stop the bulk_create process. Is there a way to tell Django to ignore such rows and save as much of the batch as possible?

Django Solutions


Solution 1 - Django

This is possible as of Django 2.2.

Django 2.2 adds an ignore_conflicts option to the bulk_create method. From the documentation:

> On databases that support it (all except PostgreSQL < 9.5 and Oracle), setting the ignore_conflicts parameter to True tells the database to ignore failure to insert any rows that fail constraints such as duplicate unique values. Enabling this parameter disables setting the primary key on each model instance (if the database normally supports it).

Example:

Entry.objects.bulk_create([
    Entry(headline='This is a test'),
    Entry(headline='This is only a test'),
], ignore_conflicts=True)
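
For intuition about what ignore_conflicts does at the SQL level: on PostgreSQL, Django appends ON CONFLICT DO NOTHING to the INSERT, and on SQLite it issues INSERT OR IGNORE. The following is a minimal sketch of that behaviour using only the standard-library sqlite3 module, so it runs without a Django project. The entry table and the insert_ignoring_conflicts helper are made up for illustration and are not part of Django.

```python
import sqlite3


def insert_ignoring_conflicts(conn, headlines):
    """Insert all headlines, silently skipping rows that violate UNIQUE."""
    before = conn.execute("SELECT COUNT(*) FROM entry").fetchone()[0]
    conn.executemany(
        "INSERT OR IGNORE INTO entry (headline) VALUES (?)",
        [(h,) for h in headlines],
    )
    conn.commit()
    after = conn.execute("SELECT COUNT(*) FROM entry").fetchone()[0]
    return after - before  # number of rows actually stored


# Hypothetical table standing in for the Entry model
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entry (id INTEGER PRIMARY KEY, headline TEXT UNIQUE)")

inserted = insert_ignoring_conflicts(
    conn, ["This is a test", "This is only a test", "This is a test"]
)
```

The third headline duplicates the first, so it is dropped without raising and the other rows are kept.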

Solution 2 - Django

One quick-and-dirty workaround for this that doesn't involve manual SQL and temporary tables is to just attempt to bulk insert the data. If it fails, revert to serial insertion.

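from django.db import IntegrityError, transaction

objs = [Event(...), Event(...), Event(...)]  # unsaved Event instances; fields elided

try:
    # atomic() rolls the failed batch back cleanly, even inside an outer transaction
    with transaction.atomic():
        Event.objects.bulk_create(objs)

except IntegrityError:
    # Fall back to inserting one row at a time, skipping the offending rows
    for obj in objs:
        try:
            with transaction.atomic():
                obj.save()
        except IntegrityError:
            continue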

If you have lots and lots of errors this may not be so efficient (you'll spend more time serially inserting than doing so in bulk), but I'm working through a high-cardinality dataset with few duplicates so this solves most of my problems.
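
The same try-bulk-then-fall-back pattern can be tried out without Django. This is a rough sketch using the standard-library sqlite3 module; the event table and the bulk_then_serial helper are assumptions made for the example, not anything from the answer above.

```python
import sqlite3


def bulk_then_serial(conn, rows):
    """Try one bulk insert; on a constraint failure, retry row by row."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.executemany("INSERT INTO event (name) VALUES (?)", rows)
        return len(rows)
    except sqlite3.IntegrityError:
        saved = 0
        for row in rows:
            try:
                with conn:
                    conn.execute("INSERT INTO event (name) VALUES (?)", row)
                saved += 1
            except sqlite3.IntegrityError:
                continue  # skip the offending row
        return saved


# Hypothetical table with a UNIQUE constraint
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event (name TEXT UNIQUE)")

saved = bulk_then_serial(conn, [("a",), ("b",), ("a",), ("c",)])
```

The bulk attempt fails on the duplicate "a" and is rolled back as a whole, then the serial pass stores the three distinct rows.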

Solution 3 - Django

(Note: I don't use Django, so there may be more suitable framework-specific answers)

It is not possible for Django to do this by simply ignoring INSERT failures because PostgreSQL aborts the whole transaction on the first error.

Django would need one of these approaches:

  1. INSERT each row in a separate transaction and ignore errors (very slow);
  2. Create a SAVEPOINT before each insert (can have scaling problems);
  3. Use a procedure or query to insert only if the row doesn't already exist (complicated and slow); or
  4. Bulk-insert or (better) COPY the data into a TEMPORARY table, then merge that into the main table server-side.

The upsert-like approach (3) seems like a good idea, but upsert and insert-if-not-exists are surprisingly complicated.

Personally, I'd take (4): I'd bulk-insert into a new separate table, probably UNLOGGED or TEMPORARY, then I'd run some manual SQL to:

LOCK TABLE realtable IN EXCLUSIVE MODE;

INSERT INTO realtable 
SELECT * FROM temptable WHERE NOT EXISTS (
    SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);

The LOCK TABLE ... IN EXCLUSIVE MODE stops a concurrent insert from creating a row that would conflict with, and so fail, the insert done by the above statement. It does not prevent concurrent SELECTs, only SELECT ... FOR UPDATE, INSERT, UPDATE and DELETE, so reads from the table carry on as normal.

If you can't afford to block concurrent writes for too long you could instead use a writable CTE to copy ranges of rows from temptable into realtable, retrying each block if it failed.
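
The staging-table merge can be sketched end to end as follows. This uses SQLite through the standard-library sqlite3 module purely so the example is runnable; SQLite has no LOCK TABLE statement, so that step is omitted here, and on PostgreSQL it would still be needed. The table layout and the merge_from_staging helper are assumptions for the example.

```python
import sqlite3


def merge_from_staging(conn, rows):
    """Load rows into a temp table, then copy over only ids not yet present."""
    conn.execute("CREATE TEMP TABLE IF NOT EXISTS temptable (id INTEGER, name TEXT)")
    conn.execute("DELETE FROM temptable")  # start from an empty staging table
    conn.executemany("INSERT INTO temptable (id, name) VALUES (?, ?)", rows)

    before = conn.execute("SELECT COUNT(*) FROM realtable").fetchone()[0]
    conn.execute(
        """
        INSERT INTO realtable (id, name)
        SELECT id, name FROM temptable
        WHERE NOT EXISTS (
            SELECT 1 FROM realtable WHERE realtable.id = temptable.id
        )
        """
    )
    conn.commit()
    after = conn.execute("SELECT COUNT(*) FROM realtable").fetchone()[0]
    return after - before  # number of rows merged


# Hypothetical target table with one pre-existing row
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE realtable (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO realtable VALUES (1, 'existing')")

merged = merge_from_staging(conn, [(1, "dup"), (2, "new"), (3, "newer")])
```

Note that the NOT EXISTS filter only compares against realtable. Duplicate ids inside the staging data itself would still collide and need to be removed first.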

Solution 4 - Django

Or 5. Divide and conquer

I didn't test or benchmark this thoroughly, but it performs pretty well for me. YMMV, depending in particular on how many errors you expect to get in a bulk operation.

def psql_copy(records):
    count = len(records)
    if count < 1:
        return True
    try:
        pg.copy_bin_values(records)
        return True
    except IntegrityError:
        if count == 1:
            # found culprit!
            msg = "Integrity error copying record:\n%r"
            logger.error(msg % records[0], exc_info=True)
            return False
    finally:
        connection.commit()
        
    # There was an integrity error but we had more than one record.
    # Divide and conquer.
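    mid = count // 2  # integer division; "/" yields a float on Python 3
    # Run both halves; a bare "and" would skip the second half when the first fails
    left_ok = psql_copy(records[:mid])
    right_ok = psql_copy(records[mid:])
    return left_ok and right_ok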
    # or just return False
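
To see the bisection logic in isolation, here is a database-free sketch. The insert_batch callable is a hypothetical stand-in for a real all-or-nothing bulk insert (such as the copy call above), and fake_insert simulates one that rejects duplicate keys. Unlike the function above, this version returns the rejected records rather than a boolean.

```python
def insert_salvaging(records, insert_batch):
    """Insert records in bulk, bisecting failed batches to isolate bad rows.

    insert_batch is a hypothetical callable that inserts a list atomically
    and raises ValueError if any record in it is rejected.
    Returns the list of rejected records.
    """
    if not records:
        return []
    try:
        insert_batch(records)
        return []
    except ValueError:
        if len(records) == 1:
            return list(records)  # found the culprit
    mid = len(records) // 2
    # Recurse into both halves unconditionally so good rows are never skipped
    left = insert_salvaging(records[:mid], insert_batch)
    right = insert_salvaging(records[mid:], insert_batch)
    return left + right


stored = set()


def fake_insert(batch):
    # Stand-in for a real bulk insert: all-or-nothing, rejects duplicate keys
    if len(set(batch)) != len(batch) or stored & set(batch):
        raise ValueError("duplicate key")
    stored.update(batch)


rejected = insert_salvaging([1, 2, 3, 2, 4, 1, 5], fake_insert)
```

With few bad rows, most of the data still goes in through large batches and only the neighbourhoods of the bad rows get split down to single records.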

Solution 5 - Django

As of Django 1.11 there is still no built-in way to do this. I found a better option than raw SQL: django-query-builder, which has an upsert method.

from querybuilder.query import Query
q = Query().from_table(YourModel)
# replace with your real objects
rows = [YourModel() for i in range(10)] 
q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])

Note: the library supports only PostgreSQL.

Here is a gist that I use for bulk insert that supports ignoring IntegrityErrors and returns the records inserted.

Solution 6 - Django

Late answer for pre-Django 2.2 projects:

I ran into this situation recently and worked around it with a second list used to check uniqueness.

In my case the model has a unique_together constraint, and bulk_create was raising IntegrityError because the list passed to it contained duplicate entries.

So I decided to build a checklist alongside the list of objects for bulk_create. Here is the sample code. The unique keys are owner and brand; in this example owner is a user object and brand is a string:

create_list = []
create_list_check = set()  # (owner id, brand) pairs already queued; set lookups are O(1)
for brand in brands:
    key = (owner.id, brand)
    if key not in create_list_check:
        create_list_check.add(key)
        create_list.append(ProductBrand(owner=owner, name=brand))

if create_list:
    ProductBrand.objects.bulk_create(create_list)
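
The de-duplication step can be factored into a small reusable helper. This is a sketch in plain Python; dedupe_by_key and the sample tuples are made up for illustration, with each tuple standing in for an (owner id, brand) pair.

```python
def dedupe_by_key(items, key):
    """Keep the first item for each key, preserving input order."""
    seen = set()
    unique = []
    for item in items:
        k = key(item)
        if k not in seen:
            seen.add(k)
            unique.append(item)
    return unique


# Hypothetical (owner id, brand) pairs with one repeat
rows = [(1, "acme"), (1, "globex"), (1, "acme"), (2, "acme")]
unique_rows = dedupe_by_key(rows, key=lambda r: (r[0], r[1]))
```

This only removes duplicates within the batch. Rows that already exist in the database would still raise IntegrityError and need one of the other approaches.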

Solution 7 - Django

This works for me. I run this function in a thread. My CSV file contains 120,907 rows.

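import os

import pandas as pd
from django.conf import settings

# Product (the model) and logger are assumed to be defined elsewhere in the project.
def products_create():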
	full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
	filename = os.listdir(full_path)[0]
	logger.debug(filename)
	logger.debug(Product.objects.count())
	if Product.objects.exists():
		logger.debug("Products Data Erasing")
		Product.objects.all().delete()
		logger.debug("Products Erasing Done")

	csvfile = os.path.join(full_path,filename)
	csv_df = pd.read_csv(csvfile,sep=',')
	csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
	row_iter = csv_df.iterrows()
	logger.debug(row_iter)

	logger.debug("New Products Creating")

	for index, row in row_iter:
		Product.objects.create(part_number = row[0],
							   part_description = row[1],
							   mrp = row[2],
							   hsn_code = row[3],
							   gst = row[4],
							   )

	# products_list = [
	# 			Product(
	# 				part_number = row[0] ,
	# 				part_description = row[1],
	# 				mrp = row[2],
	# 				hsn_code = row[3],
	# 				gst = row[4],
	# 			)
	# 			for index, row in row_iter

	# 		]
	# logger.debug(products_list)

	# Product.objects.bulk_create(products_list)
	logger.debug("Products uploading done")

Attributions

All content for this solution is sourced from the original question on Stack Overflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

| Content Type | Original Author | Original Content on Stack Overflow |
| --- | --- | --- |
| Question | Meitham | View Question on Stack Overflow |
| Solution 1 - Django | Cesar Canassa | View Answer on Stack Overflow |
| Solution 2 - Django | Ivan | View Answer on Stack Overflow |
| Solution 3 - Django | Craig Ringer | View Answer on Stack Overflow |
| Solution 4 - Django | Aryeh Leib Taurog | View Answer on Stack Overflow |
| Solution 5 - Django | Noortheen Raja | View Answer on Stack Overflow |
| Solution 6 - Django | Sencer H. | View Answer on Stack Overflow |
| Solution 7 - Django | Gaurav Nagar | View Answer on Stack Overflow |