diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index ce9b7e2881..bbdf537157 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -3,19 +3,23 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; +use Appwrite\Event\Mail; use Appwrite\Event\Realtime; -use Utopia\Migration\Destinations\CSV as DestinationCSV; +use Appwrite\Template\Template; use Exception; use Utopia\CLI\Console; use Utopia\Config\Config; use Utopia\Database\Database; use Utopia\Database\Document; +use Utopia\Locale\Locale; use Utopia\Database\Exception\Authorization; use Utopia\Database\Exception\Conflict; use Utopia\Database\Exception\Restricted; use Utopia\Database\Exception\Structure; +use Utopia\Database\Query; use Utopia\Migration\Destination; use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite; +use Utopia\Migration\Destinations\CSV as DestinationCSV; use Utopia\Migration\Exception as MigrationException; use Utopia\Migration\Source; use Utopia\Migration\Sources\Appwrite; @@ -27,6 +31,7 @@ use Utopia\Migration\Sources\Supabase; use Utopia\Migration\Transfer; use Utopia\Platform\Action; use Utopia\Queue\Message; +use Utopia\Storage\Compression\Compression; use Utopia\Storage\Device; use Utopia\System\System; @@ -37,6 +42,7 @@ class Migrations extends Action protected Database $dbForPlatform; protected Device $deviceForMigrations; + protected Device $deviceForFiles; protected Document $project; @@ -71,16 +77,28 @@ class Migrations extends Action ->inject('logError') ->inject('queueForRealtime') ->inject('deviceForMigrations') + ->inject('deviceForFiles') + ->inject('queueForMails') ->callback($this->action(...)); } /** * @throws Exception */ - public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime, Device $deviceForMigrations): void - { + public function action( + Message $message, + Document $project, + Database $dbForProject, + Database $dbForPlatform, + callable $logError, + Realtime $queueForRealtime, + Device $deviceForMigrations, + Device $deviceForFiles, + Mail $queueForMails, + ): void { $payload = $message->getPayload() ?? []; $this->deviceForMigrations = $deviceForMigrations; + $this->deviceForFiles = $deviceForFiles; if (empty($payload)) { throw new Exception('Missing payload'); @@ -105,7 +123,7 @@ class Migrations extends Action return; } - $this->processMigration($migration, $queueForRealtime); + $this->processMigration($migration, $queueForRealtime, $queueForMails); } /** @@ -175,6 +193,7 @@ class Migrations extends Action protected function processDestination(Document $migration, string $apiKey): Destination { $destination = $migration->getAttribute('destination'); + $options = $migration->getAttribute('options', []); return match ($destination) { DestinationAppwrite::getName() => new DestinationAppwrite( @@ -185,14 +204,32 @@ class Migrations extends Action Config::getParam('collections', [])['databases']['collections'], ), DestinationCSV::getName() => new DestinationCSV( - $this->deviceForMigrations, + $this->deviceForFiles, $migration->getAttribute('resourceId'), - $migration->getAttribute('options', [])['columns'] ?? [], + $options['bucketId'], + $options['filename'], + $options['columns'], + $options['delimiter'], + $options['enclosure'], + $options['escape'], + $options['header'], ), default => throw new \Exception('Invalid destination type'), }; } + /** + * Sanitize a filename to make it filesystem-safe + */ + protected function sanitizeFilename(string $filename): string + { + // Replace problematic characters with underscores + $sanitized = \preg_replace('/[:\/<>"|*?]/', '_', $filename); + $sanitized = \preg_replace('/[^\x20-\x7E]/', '_', $sanitized); + $sanitized = \trim($sanitized); + return empty($sanitized) ? 'export' : $sanitized; + } + /** * @throws Authorization * @throws Structure @@ -202,24 +239,18 @@ class Migrations extends Action */ protected function updateMigrationDocument(Document $migration, Document $project, Realtime $queueForRealtime): Document { - $errorMessages = []; - $clonedMigrationDocument = clone $migration; - - // we cannot use #sensitive because - // `errors` is nested which requires an override. - $errors = $clonedMigrationDocument->getAttribute('errors', []); + $messages = []; + $errors = $migration->getAttribute('errors', []); foreach ($errors as $error) { - $decoded = json_decode($error, true); - - if (is_array($decoded) && isset($decoded['trace'])) { + $decoded = \json_decode($error, true); + if (\is_array($decoded) && isset($decoded['trace'])) { unset($decoded['trace']); - $errorMessages[] = json_encode($decoded); + $messages[] = json_encode($decoded); } } - // set the errors back without trace - $clonedMigrationDocument->setAttribute('errors', $errorMessages); + $migration->setAttribute('errors', $messages); /** Trigger Realtime Events */ $queueForRealtime @@ -227,10 +258,14 @@ class Migrations extends Action ->setSubscribers(['console', $project->getId()]) ->setEvent('migrations.[migrationId].update') ->setParam('migrationId', $migration->getId()) - ->setPayload($clonedMigrationDocument->getArrayCopy(), ['options', 'credentials']) + ->setPayload($migration->getArrayCopy(), sensitive: ['options', 'credentials']) ->trigger(); - return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration); + return $this->dbForProject->updateDocument( + 'migrations', + $migration->getId(), + $migration + ); } /** @@ -285,11 +320,13 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function processMigration(Document $migration, Realtime $queueForRealtime): void - { - $project = $this->project; - $projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId()); - $tempAPIKey = $this->generateAPIKey($projectDocument); + protected function processMigration( + Document $migration, + Realtime $queueForRealtime, + Mail $queueForMails, + ): void { + $project = $this->dbForPlatform->getDocument('projects', $this->project->getId()); + $tempAPIKey = $this->generateAPIKey($project); $transfer = $source = $destination = null; @@ -299,17 +336,15 @@ class Migrations extends Action empty($migration->getAttribute('credentials', [])) ) { $credentials = $migration->getAttribute('credentials', []); - - $credentials['projectId'] = $credentials['projectId'] ?? $projectDocument->getId(); + $credentials['projectId'] = $credentials['projectId'] ?? $project->getId(); $credentials['endpoint'] = $credentials['endpoint'] ?? 'http://appwrite/v1'; $credentials['apiKey'] = $credentials['apiKey'] ?? $tempAPIKey; - $migration->setAttribute('credentials', $credentials); } $migration->setAttribute('stage', 'processing'); $migration->setAttribute('status', 'processing'); - $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); + $this->updateMigrationDocument($migration, $project, $queueForRealtime); $source = $this->processSource($migration); $destination = $this->processDestination($migration, $tempAPIKey); @@ -322,40 +357,44 @@ class Migrations extends Action /** Start Transfer */ if (empty($source->getErrors())) { $migration->setAttribute('stage', 'migrating'); - $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); + $this->updateMigrationDocument($migration, $project, $queueForRealtime); $transfer->run( $migration->getAttribute('resources'), - function () use ($migration, $transfer, $projectDocument, $queueForRealtime) { + function () use ($migration, $transfer, $project, $queueForRealtime) { $migration->setAttribute('resourceData', json_encode($transfer->getCache())); $migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters())); - $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); + $this->updateMigrationDocument($migration, $project, $queueForRealtime); }, $migration->getAttribute('resourceId'), $migration->getAttribute('resourceType') ); } - $destination->shutDown(); - $source->shutDown(); + // Debug logging for CSV exports before shutdown + if ($migration->getAttribute('destination') === DestinationCSV::getName()) { + $statusCounters = $transfer->getStatusCounters(); + Console::info('CSV export transfer completed. Status counters: ' . json_encode($statusCounters)); + Console::info('CSV export options: ' . json_encode($migration->getAttribute('options'))); + Console::info('CSV export errors: ' . json_encode($destination->getErrors())); + } + + $destination->shutdown(); + $source->shutdown(); $sourceErrors = $source->getErrors(); $destinationErrors = $destination->getErrors(); - if (! empty($sourceErrors) || ! empty($destinationErrors)) { + if (!empty($sourceErrors) || ! empty($destinationErrors)) { $migration->setAttribute('status', 'failed'); $migration->setAttribute('stage', 'finished'); - $errorMessages = []; - foreach ($sourceErrors as $error) { - $errorMessages[] = json_encode($error); - } - foreach ($destinationErrors as $error) { - $errorMessages[] = json_encode($error); + $errors = []; + foreach ([...$sourceErrors, ...$destinationErrors] as $error) { + $errors[] = \json_encode($error); } - $migration->setAttribute('errors', $errorMessages); - + $migration->setAttribute('errors', $errors); return; } @@ -382,57 +421,100 @@ class Migrations extends Action $sourceErrors = $source->getErrors(); $destinationErrors = $destination->getErrors(); - $errorMessages = []; - foreach ($sourceErrors as $error) { - $errorMessages[] = json_encode($error); - } - foreach ($destinationErrors as $error) { - $errorMessages[] = json_encode($error); + $errors = []; + foreach ([...$sourceErrors, ...$destinationErrors] as $error) { + $errors[] = \json_encode($error); } - $migration->setAttribute('errors', $errorMessages); + $migration->setAttribute('errors', $errors); } } finally { - $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); + $this->updateMigrationDocument($migration, $project, $queueForRealtime); if ($migration->getAttribute('status', '') === 'failed') { Console::error('Migration('.$migration->getSequence().':'.$migration->getId().') failed, Project('.$this->project->getSequence().':'.$this->project->getId().')'); - if ($destination) { - $destination->error(); + $sourceErrors = $source?->getErrors() ?? []; + $destinationErrors = $destination?->getErrors() ?? []; - foreach ($destination->getErrors() as $error) { - /** @var MigrationException $error */ - call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [ + foreach ([...$sourceErrors, ...$destinationErrors] as $error) { + /** @var MigrationException $error */ + if ($error->getCode() === 0 || $error->getCode() >= 500) { + ($this->logError)($error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [ 'migrationId' => $migration->getId(), 'source' => $migration->getAttribute('source') ?? '', 'destination' => $migration->getAttribute('destination') ?? '', 'resourceName' => $error->getResourceName(), - 'resourceGroup' => $error->getResourceGroup() + 'resourceGroup' => $error->getResourceGroup(), ]); } } - if ($source) { - $source->error(); - - foreach ($source->getErrors() as $error) { - /** @var MigrationException $error */ - call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [ - 'migrationId' => $migration->getId(), - 'source' => $migration->getAttribute('source') ?? '', - 'destination' => $migration->getAttribute('destination') ?? '', - 'resourceName' => $error->getResourceName(), - 'resourceGroup' => $error->getResourceGroup() - ]); - } - } + $source?->error(); + $destination?->error(); } if ($migration->getAttribute('status', '') === 'completed') { $destination?->success(); $source?->success(); + + if ($migration->getAttribute('destination') === DestinationCSV::getName()) { + $this->handleCSVExportComplete($project, $migration, $queueForMails); + } } } } + + protected function handleCSVExportComplete(Document $project, Document $migration, Mail $queueForMails): void + { + $options = $migration->getAttribute('options', []); + $bucketId = $options['bucketId'] ?? null; + $filename = $options['filename'] ?? 'export.csv'; + $userInternalId = $options['userInternalId'] ?? ''; + $resourceId = $migration->getAttribute('resourceId'); + + // Save file to bucket + $bucket = $this->dbForProject->getDocument('buckets', $bucketId); + if ($bucket->isEmpty()) { + throw new \Exception("Bucket not found: $bucketId"); + } + + $path = $this->deviceForFiles->getPath($bucketId . '/' . $this->sanitizeFilename($filename) . '.csv'); + $size = $this->deviceForFiles->getFileSize($path); + $mime = $this->deviceForFiles->getFileMimeType($path); + $hash = $this->deviceForFiles->getFileHash($path); + $algorithm = Compression::NONE; + $fileId = \md5($resourceId); + + $this->dbForProject->createDocument('bucket_' . $bucket->getSequence(), new Document([ + '$id' => $fileId, + '$permissions' => [], + 'bucketId' => $bucket->getId(), + 'bucketInternalId' => $bucket->getSequence(), + 'name' => $filename, + 'path' => $path, + 'signature' => $hash, + 'mimeType' => $mime, + 'sizeOriginal' => $size, + 'sizeActual' => $size, + 'algorithm' => $algorithm, + 'comment' => '', + 'chunksTotal' => 1, + 'chunksUploaded' => 1, + 'openSSLVersion' => null, + 'openSSLCipher' => null, + 'openSSLTag' => null, + 'openSSLIV' => null, + 'search' => \implode(' ', [$fileId, $filename]), + 'metadata' => ['content_type' => $mime] + ])); + + Console::info("Created file document in bucket: $fileId"); + + // No notification required, skip email sending + if (!($options['notify'] ?? false)) { + return; + } + + } }