diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 8a630f8cf..a7799abc1 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -22,6 +22,7 @@ use Utopia\Database\Exception\Timeout as TimeoutException; use Utopia\Database\Exception\Transaction as TransactionException; use Utopia\Database\Exception\Type as TypeException; +use Utopia\Database\Operator; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Mongo\Client; @@ -1343,6 +1344,11 @@ public function castingAfter(Document $collection, Document $document): Document continue; } + // Operators are resolved by the database (aggregation pipeline); skip casting + if (Operator::isOperator($value)) { + continue; + } + if ($array) { if (is_string($value)) { $decoded = json_decode($value, true); @@ -1438,6 +1444,11 @@ public function castingBefore(Document $collection, Document $document): Documen continue; } + // Operators are resolved by the database (aggregation pipeline); skip casting + if (Operator::isOperator($value)) { + continue; + } + if ($array) { if (is_string($value)) { $decoded = json_decode($value, true); @@ -1647,10 +1658,16 @@ public function updateDocument(Document $collection, string $id, Document $docum unset($record['_id']); // Don't update _id $options = $this->getTransactionOptions(); - $updateQuery = [ - '$set' => $record, - ]; - $this->client->update($name, $filters, $updateQuery, $options); + + $pipeline = $this->buildOperatorPipeline($record); + if ($pipeline !== null) { + $this->updateWithPipeline($name, $filters, $pipeline, $options); + } else { + $updateQuery = [ + '$set' => $record, + ]; + $this->client->update($name, $filters, $updateQuery, $options); + } } catch (MongoException $e) { throw $this->processException($e); } @@ -1689,11 +1706,16 @@ public function updateDocuments(Document $collection, Document $updates, array $ $record = $updates->getArrayCopy(); $record = $this->replaceChars('$', '_', $record); - $updateQuery = [ - '$set' => $record, - ]; - try { + $pipeline = $this->buildOperatorPipeline($record); + if ($pipeline !== null) { + return $this->updateWithPipeline($name, $filters, $pipeline, $options, multi: true); + } + + $updateQuery = [ + '$set' => $record, + ]; + return $this->client->update( $name, $filters, @@ -1706,6 +1728,298 @@ public function updateDocuments(Document $collection, Document $updates, array $ } } + /** + * Build an aggregation pipeline update from a record that may contain Operator instances. + * + * Returns null when the record contains no operators, so the caller can fall back to a + * plain `$set` update. When operators are present, every regular value is wrapped in + * `$literal` (so it is never interpreted as an aggregation expression) and every operator + * is translated into the equivalent aggregation expression, all merged into a single + * `$set` stage. + * + * @param array $record + * @return array>|null + * @throws DatabaseException + */ + private function buildOperatorPipeline(array $record): ?array + { + $hasOperators = false; + foreach ($record as $value) { + if (Operator::isOperator($value)) { + $hasOperators = true; + break; + } + } + + if (!$hasOperators) { + return null; + } + + $set = []; + foreach ($record as $key => $value) { + if (Operator::isOperator($value)) { + $set[$key] = $this->getOperatorExpression($value, $key); + } else { + // Wrap literals so values are never parsed as aggregation expressions/field paths + $set[$key] = ['$literal' => $value]; + } + } + + return [['$set' => $set]]; + } + + /** + * Execute an aggregation pipeline update. + * + * The Mongo client's update() helper wraps the update document in toObject(), which would + * turn a pipeline (a list) into an object and break it. We therefore build the raw update + * command and send it through query(), letting BSON encode the pipeline as an array. + * + * @param string $collection + * @param array $filters + * @param array> $pipeline + * @param array $options + * @param bool $multi + * @return int Number of matched documents + * @throws MongoException + */ + private function updateWithPipeline(string $collection, array $filters, array $pipeline, array $options = [], bool $multi = false): int + { + $command = [ + 'update' => $collection, + 'updates' => [ + [ + 'q' => $this->client->toObject($filters), + 'u' => $pipeline, + 'multi' => $multi, + 'upsert' => false, + ], + ], + ]; + + if (isset($options['session'])) { + $command['session'] = $options['session']; + } + + $result = $this->client->query($command); + + return \is_int($result) ? $result : 0; + } + + /** + * Execute a batch of upsert operations, supporting aggregation-pipeline updates. + * + * Mirrors the Mongo client's upsert() helper but does not wrap each update in toObject(), + * so an update may be either a classic update document or an aggregation pipeline (list). + * + * @param string $collection + * @param array, update: array}> $operations + * @param array $options + * @return int + * @throws MongoException + */ + private function executeUpsert(string $collection, array $operations, array $options = []): int + { + $updates = []; + foreach ($operations as $op) { + $updates[] = [ + 'q' => $this->client->toObject($op['filter']), + 'u' => $op['update'], + 'upsert' => true, + 'multi' => false, + ]; + } + + $command = \array_merge( + [ + 'update' => $collection, + 'updates' => $updates, + ], + $options + ); + + $result = $this->client->query($command); + + return \is_int($result) ? $result : 0; + } + + /** + * Translate an Operator into a MongoDB aggregation expression for use inside a `$set` stage. + * + * @param Operator $operator + * @param string $field The (already escaped) field name the expression is assigned to + * @return mixed + * @throws DatabaseException + */ + private function getOperatorExpression(Operator $operator, string $field): mixed + { + $ref = '$' . $field; + $method = $operator->getMethod(); + $values = $operator->getValues(); + + switch ($method) { + // Numeric operators + case Operator::TYPE_INCREMENT: + $expr = ['$add' => [['$ifNull' => [$ref, 0]], $values[0] ?? 1]]; + if (isset($values[1])) { + $expr = ['$min' => [$expr, $values[1]]]; + } + return $expr; + + case Operator::TYPE_DECREMENT: + $expr = ['$subtract' => [['$ifNull' => [$ref, 0]], $values[0] ?? 1]]; + if (isset($values[1])) { + $expr = ['$max' => [$expr, $values[1]]]; + } + return $expr; + + case Operator::TYPE_MULTIPLY: + $expr = ['$multiply' => [['$ifNull' => [$ref, 0]], $values[0] ?? 1]]; + if (isset($values[1])) { + $expr = ['$min' => [$expr, $values[1]]]; + } + return $expr; + + case Operator::TYPE_DIVIDE: + $expr = ['$divide' => [['$ifNull' => [$ref, 0]], $values[0]]]; + if (isset($values[1])) { + $expr = ['$max' => [$expr, $values[1]]]; + } + return $expr; + + case Operator::TYPE_MODULO: + return ['$mod' => [['$ifNull' => [$ref, 0]], $values[0]]]; + + case Operator::TYPE_POWER: + $expr = ['$pow' => [['$ifNull' => [$ref, 0]], $values[0]]]; + if (isset($values[1])) { + $expr = ['$min' => [$expr, $values[1]]]; + } + return $expr; + + // String operators + case Operator::TYPE_STRING_CONCAT: + return ['$concat' => [['$ifNull' => [$ref, '']], ['$literal' => $values[0] ?? '']]]; + + case Operator::TYPE_STRING_REPLACE: + // An empty search is a no-op (matches SQL REPLACE semantics); MongoDB's + // $replaceAll would otherwise insert the replacement between every character. + if (($values[0] ?? '') === '') { + return ['$ifNull' => [$ref, '']]; + } + return ['$replaceAll' => [ + 'input' => ['$ifNull' => [$ref, '']], + 'find' => ['$literal' => $values[0]], + 'replacement' => ['$literal' => $values[1] ?? ''], + ]]; + + // Boolean operators + case Operator::TYPE_TOGGLE: + return ['$not' => [['$ifNull' => [$ref, false]]]]; + + // Array operators + case Operator::TYPE_ARRAY_APPEND: + return ['$concatArrays' => [['$ifNull' => [$ref, []]], ['$literal' => \array_values($values)]]]; + + case Operator::TYPE_ARRAY_PREPEND: + return ['$concatArrays' => [['$literal' => \array_values($values)], ['$ifNull' => [$ref, []]]]]; + + case Operator::TYPE_ARRAY_INSERT: + $index = (int)($values[0] ?? 0); + $value = $values[1] ?? null; + $size = ['$size' => '$$arr']; + $before = ['$cond' => [['$lte' => [$index, 0]], [], ['$slice' => ['$$arr', $index]]]]; + $after = ['$cond' => [['$gte' => [$index, $size]], [], ['$slice' => ['$$arr', ['$subtract' => [$index, $size]]]]]]; + return ['$let' => [ + 'vars' => ['arr' => ['$ifNull' => [$ref, []]]], + 'in' => ['$concatArrays' => [$before, ['$literal' => [$value]], $after]], + ]]; + + case Operator::TYPE_ARRAY_REMOVE: + return ['$filter' => [ + 'input' => ['$ifNull' => [$ref, []]], + 'cond' => ['$ne' => ['$$this', ['$literal' => $values[0] ?? null]]], + ]]; + + case Operator::TYPE_ARRAY_UNIQUE: + // Preserve first-occurrence order while removing duplicates + return ['$reduce' => [ + 'input' => ['$ifNull' => [$ref, []]], + 'initialValue' => [], + 'in' => ['$cond' => [ + ['$in' => ['$$this', '$$value']], + '$$value', + ['$concatArrays' => ['$$value', ['$$this']]], + ]], + ]]; + + case Operator::TYPE_ARRAY_INTERSECT: + // Keep elements present in the given set, preserving original order + return ['$filter' => [ + 'input' => ['$ifNull' => [$ref, []]], + 'cond' => ['$in' => ['$$this', ['$literal' => \array_values($values)]]], + ]]; + + case Operator::TYPE_ARRAY_DIFF: + // Remove elements present in the given set, preserving original order + return ['$filter' => [ + 'input' => ['$ifNull' => [$ref, []]], + 'cond' => ['$not' => [['$in' => ['$$this', ['$literal' => \array_values($values)]]]]], + ]]; + + case Operator::TYPE_ARRAY_FILTER: + return ['$filter' => [ + 'input' => ['$ifNull' => [$ref, []]], + 'cond' => $this->getArrayFilterCondition((string)($values[0] ?? ''), $values[1] ?? null), + ]]; + + // Date operators + case Operator::TYPE_DATE_ADD_DAYS: + return ['$dateAdd' => [ + 'startDate' => ['$ifNull' => [$ref, '$$NOW']], + 'unit' => 'day', + 'amount' => (int)($values[0] ?? 0), + ]]; + + case Operator::TYPE_DATE_SUB_DAYS: + return ['$dateSubtract' => [ + 'startDate' => ['$ifNull' => [$ref, '$$NOW']], + 'unit' => 'day', + 'amount' => (int)($values[0] ?? 0), + ]]; + + case Operator::TYPE_DATE_SET_NOW: + return '$$NOW'; + + default: + throw new DatabaseException("Unsupported operator: {$method}"); + } + } + + /** + * Build the aggregation condition expression used by the arrayFilter operator. + * + * @param string $condition + * @param mixed $compare + * @return array + */ + private function getArrayFilterCondition(string $condition, mixed $compare): array + { + $value = ['$literal' => $compare]; + + return match ($condition) { + 'equal' => ['$eq' => ['$$this', $value]], + 'notEqual' => ['$ne' => ['$$this', $value]], + 'greaterThan' => ['$gt' => ['$$this', $value]], + 'greaterThanEqual' => ['$gte' => ['$$this', $value]], + 'lessThan' => ['$lt' => ['$$this', $value]], + 'lessThanEqual' => ['$lte' => ['$$this', $value]], + 'isNull' => ['$eq' => ['$$this', null]], + 'isNotNull' => ['$ne' => ['$$this', null]], + default => ['$literal' => true], // unknown condition keeps every element + }; + } + /** * @param Document $collection * @param string $attribute @@ -1724,6 +2038,7 @@ public function upsertDocuments(Document $collection, string $attribute, array $ $attribute = $this->filter($attribute); $operations = []; + $hasPipeline = false; foreach ($changes as $change) { $document = $change->getNew(); $oldDocument = $change->getOld(); @@ -1776,20 +2091,41 @@ public function upsertDocuments(Document $collection, string $attribute, array $ $update['$unset'] = $unsetFields; } } else { - // Update all fields - $update = [ - '$set' => $record - ]; + $pipeline = $this->buildOperatorPipeline($record); - if (!empty($unsetFields)) { - $update['$unset'] = $unsetFields; - } + if ($pipeline !== null) { + // Operator-based upsert: resolve operators via an aggregation pipeline + // so they apply atomically, with $ifNull defaults on insert. + $set = $pipeline[0]['$set']; + + // Generate an _id only on insert; keep the existing one on update. + if (empty($document->getSequence())) { + $set['_id'] = ['$ifNull' => ['$_id', $this->client->createUuid()]]; + } - // Add UUID7 _id for new documents in upsert operations - if (empty($document->getSequence())) { - $update['$setOnInsert'] = [ - '_id' => $this->client->createUuid() + $update = [['$set' => $set]]; + + if (!empty($unsetFields)) { + $update[] = ['$unset' => \array_keys($unsetFields)]; + } + + $hasPipeline = true; + } else { + // Update all fields + $update = [ + '$set' => $record ]; + + if (!empty($unsetFields)) { + $update['$unset'] = $unsetFields; + } + + // Add UUID7 _id for new documents in upsert operations + if (empty($document->getSequence())) { + $update['$setOnInsert'] = [ + '_id' => $this->client->createUuid() + ]; + } } } @@ -1801,11 +2137,17 @@ public function upsertDocuments(Document $collection, string $attribute, array $ $options = $this->getTransactionOptions(); - $this->client->upsert( - $name, - $operations, - options: $options - ); + if ($hasPipeline) { + // The client's upsert() wraps each update in toObject(), which would corrupt a + // pipeline (a list). Send the raw command so BSON encodes pipelines as arrays. + $this->executeUpsert($name, $operations, $options); + } else { + $this->client->upsert( + $name, + $operations, + options: $options + ); + } } catch (MongoException $e) { throw $this->processException($e); } @@ -3477,7 +3819,7 @@ public function getSupportForSpatialIndexNull(): bool */ public function getSupportForOperators(): bool { - return false; + return true; } /**