Estaba tratando de migrar un MongoDB grande de ~ 600k documentos, así:
for await (const doc of db.collection('collection').find({ legacyProp: { $exists: true }, })) { // additional data fetching from separate collections here const newPropValue = await fetchNewPropValue(doc._id) await db.collection('collection').findOneAndUpdate({ _id: doc._id }, [{ $set: { newProp: newPropValue } }, { $unset: ['legacyProp'] }]) } }
Cuando finalizó el script de migración, los datos aún se estaban actualizando durante aproximadamente 30 minutos. Llegué a esta conclusión calculando el recuento de documentos que contienen la propiedad legacyProp
:
db.collection.countDocuments({ legacyProp: { $exists: true } })
que fue disminuyendo en las llamadas posteriores. Después de un tiempo, las actualizaciones se detuvieron y el recuento final de documentos que contenían accesorios heredados fue de alrededor de 300k, por lo que la actualización falló silenciosamente y provocó una pérdida de datos. Tengo curiosidad por saber qué sucedió exactamente y, lo que es más importante, ¿cómo se actualizan grandes colecciones de MongoDB sin pérdida de datos? Tenga en cuenta que hay una obtención de datos adicional involucrada antes de cada operación de actualización.
Mi primer intento sería construir la función de fetchNewPropValue()
en una canalización de agregación.
Eche un vistazo a los operadores de tuberías de agregación
Si esto no es posible, puede intentar colocar todos los newPropValue en una matriz y usarlos así. Las propiedades de 600k deberían caber fácilmente en su RAM.
const newPropValues = await fetchNewPropValue() // getting all new properties as array [{_id: ..., val: ...}, {_id: ..., val: ...}, ...] db.getCollection('collection').updateMany( { legacyProp: { $exists: true } }, [ { $set: { newProp: { $first: { $filter: { input: newPropValues, cond: { $eq: ["$_id", "$$this._id"] } } } } } }, { $set: { legacyProp: "$$REMOVE", newProp: "$$newProp.val" } } ] )
O puedes probar bulkWrite:
let bulkOperations = [] db.getCollection('collection').find({ legacyProp: { $exists: true } }).forEach(doc => { const newPropValue = await fetchNewPropValue(doc._id); bulkOperations.push({ updateOne: { filter: { _id: doc._id }, update: { $set: { newProp: newPropValue }, $unset: { legacyProp: "" } } } }); if (bulkOperations.length > 10000) { db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false }); bulkOperations = []; } }) if (bulkOperations.length > 0) db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false })