Save export to bucket on complete

This commit is contained in:
Jake Barnby 2025-09-24 22:37:49 +12:00
parent c8993d7f71
commit 5e8951fbe0
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C

View file

@ -3,19 +3,23 @@
namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Mail;
use Appwrite\Event\Realtime;
use Utopia\Migration\Destinations\CSV as DestinationCSV;
use Appwrite\Template\Template;
use Exception;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Locale\Locale;
use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Query;
use Utopia\Migration\Destination;
use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite;
use Utopia\Migration\Destinations\CSV as DestinationCSV;
use Utopia\Migration\Exception as MigrationException;
use Utopia\Migration\Source;
use Utopia\Migration\Sources\Appwrite;
@ -27,6 +31,7 @@ use Utopia\Migration\Sources\Supabase;
use Utopia\Migration\Transfer;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Storage\Compression\Compression;
use Utopia\Storage\Device;
use Utopia\System\System;
@ -37,6 +42,7 @@ class Migrations extends Action
protected Database $dbForPlatform;
protected Device $deviceForMigrations;
protected Device $deviceForFiles;
protected Document $project;
@ -71,16 +77,28 @@ class Migrations extends Action
->inject('logError')
->inject('queueForRealtime')
->inject('deviceForMigrations')
->inject('deviceForFiles')
->inject('queueForMails')
->callback($this->action(...));
}
/**
* @throws Exception
*/
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime, Device $deviceForMigrations): void
{
public function action(
Message $message,
Document $project,
Database $dbForProject,
Database $dbForPlatform,
callable $logError,
Realtime $queueForRealtime,
Device $deviceForMigrations,
Device $deviceForFiles,
Mail $queueForMails,
): void {
$payload = $message->getPayload() ?? [];
$this->deviceForMigrations = $deviceForMigrations;
$this->deviceForFiles = $deviceForFiles;
if (empty($payload)) {
throw new Exception('Missing payload');
@ -105,7 +123,7 @@ class Migrations extends Action
return;
}
$this->processMigration($migration, $queueForRealtime);
$this->processMigration($migration, $queueForRealtime, $queueForMails);
}
/**
@ -175,6 +193,7 @@ class Migrations extends Action
protected function processDestination(Document $migration, string $apiKey): Destination
{
$destination = $migration->getAttribute('destination');
$options = $migration->getAttribute('options', []);
return match ($destination) {
DestinationAppwrite::getName() => new DestinationAppwrite(
@ -185,14 +204,32 @@ class Migrations extends Action
Config::getParam('collections', [])['databases']['collections'],
),
DestinationCSV::getName() => new DestinationCSV(
$this->deviceForMigrations,
$this->deviceForFiles,
$migration->getAttribute('resourceId'),
$migration->getAttribute('options', [])['columns'] ?? [],
$options['bucketId'],
$options['filename'],
$options['columns'],
$options['delimiter'],
$options['enclosure'],
$options['escape'],
$options['header'],
),
default => throw new \Exception('Invalid destination type'),
};
}
/**
* Sanitize a filename to make it filesystem-safe
*/
protected function sanitizeFilename(string $filename): string
{
// Replace problematic characters with underscores
$sanitized = \preg_replace('/[:\/<>"|*?]/', '_', $filename);
$sanitized = \preg_replace('/[^\x20-\x7E]/', '_', $sanitized);
$sanitized = \trim($sanitized);
return empty($sanitized) ? 'export' : $sanitized;
}
/**
* @throws Authorization
* @throws Structure
@ -202,24 +239,18 @@ class Migrations extends Action
*/
protected function updateMigrationDocument(Document $migration, Document $project, Realtime $queueForRealtime): Document
{
$errorMessages = [];
$clonedMigrationDocument = clone $migration;
// we cannot use #sensitive because
// `errors` is nested which requires an override.
$errors = $clonedMigrationDocument->getAttribute('errors', []);
$messages = [];
$errors = $migration->getAttribute('errors', []);
foreach ($errors as $error) {
$decoded = json_decode($error, true);
if (is_array($decoded) && isset($decoded['trace'])) {
$decoded = \json_decode($error, true);
if (\is_array($decoded) && isset($decoded['trace'])) {
unset($decoded['trace']);
$errorMessages[] = json_encode($decoded);
$messages[] = json_encode($decoded);
}
}
// set the errors back without trace
$clonedMigrationDocument->setAttribute('errors', $errorMessages);
$migration->setAttribute('errors', $messages);
/** Trigger Realtime Events */
$queueForRealtime
@ -227,10 +258,14 @@ class Migrations extends Action
->setSubscribers(['console', $project->getId()])
->setEvent('migrations.[migrationId].update')
->setParam('migrationId', $migration->getId())
->setPayload($clonedMigrationDocument->getArrayCopy(), ['options', 'credentials'])
->setPayload($migration->getArrayCopy(), sensitive: ['options', 'credentials'])
->trigger();
return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration);
return $this->dbForProject->updateDocument(
'migrations',
$migration->getId(),
$migration
);
}
/**
@ -285,11 +320,13 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function processMigration(Document $migration, Realtime $queueForRealtime): void
{
$project = $this->project;
$projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId());
$tempAPIKey = $this->generateAPIKey($projectDocument);
protected function processMigration(
Document $migration,
Realtime $queueForRealtime,
Mail $queueForMails,
): void {
$project = $this->dbForPlatform->getDocument('projects', $this->project->getId());
$tempAPIKey = $this->generateAPIKey($project);
$transfer = $source = $destination = null;
@ -299,17 +336,15 @@ class Migrations extends Action
empty($migration->getAttribute('credentials', []))
) {
$credentials = $migration->getAttribute('credentials', []);
$credentials['projectId'] = $credentials['projectId'] ?? $projectDocument->getId();
$credentials['projectId'] = $credentials['projectId'] ?? $project->getId();
$credentials['endpoint'] = $credentials['endpoint'] ?? 'http://appwrite/v1';
$credentials['apiKey'] = $credentials['apiKey'] ?? $tempAPIKey;
$migration->setAttribute('credentials', $credentials);
}
$migration->setAttribute('stage', 'processing');
$migration->setAttribute('status', 'processing');
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$this->updateMigrationDocument($migration, $project, $queueForRealtime);
$source = $this->processSource($migration);
$destination = $this->processDestination($migration, $tempAPIKey);
@ -322,40 +357,44 @@ class Migrations extends Action
/** Start Transfer */
if (empty($source->getErrors())) {
$migration->setAttribute('stage', 'migrating');
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$this->updateMigrationDocument($migration, $project, $queueForRealtime);
$transfer->run(
$migration->getAttribute('resources'),
function () use ($migration, $transfer, $projectDocument, $queueForRealtime) {
function () use ($migration, $transfer, $project, $queueForRealtime) {
$migration->setAttribute('resourceData', json_encode($transfer->getCache()));
$migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$this->updateMigrationDocument($migration, $project, $queueForRealtime);
},
$migration->getAttribute('resourceId'),
$migration->getAttribute('resourceType')
);
}
$destination->shutDown();
$source->shutDown();
// Debug logging for CSV exports before shutdown
if ($migration->getAttribute('destination') === DestinationCSV::getName()) {
$statusCounters = $transfer->getStatusCounters();
Console::info('CSV export transfer completed. Status counters: ' . json_encode($statusCounters));
Console::info('CSV export options: ' . json_encode($migration->getAttribute('options')));
Console::info('CSV export errors: ' . json_encode($destination->getErrors()));
}
$destination->shutdown();
$source->shutdown();
$sourceErrors = $source->getErrors();
$destinationErrors = $destination->getErrors();
if (! empty($sourceErrors) || ! empty($destinationErrors)) {
if (!empty($sourceErrors) || ! empty($destinationErrors)) {
$migration->setAttribute('status', 'failed');
$migration->setAttribute('stage', 'finished');
$errorMessages = [];
foreach ($sourceErrors as $error) {
$errorMessages[] = json_encode($error);
}
foreach ($destinationErrors as $error) {
$errorMessages[] = json_encode($error);
$errors = [];
foreach ([...$sourceErrors, ...$destinationErrors] as $error) {
$errors[] = \json_encode($error);
}
$migration->setAttribute('errors', $errorMessages);
$migration->setAttribute('errors', $errors);
return;
}
@ -382,57 +421,100 @@ class Migrations extends Action
$sourceErrors = $source->getErrors();
$destinationErrors = $destination->getErrors();
$errorMessages = [];
foreach ($sourceErrors as $error) {
$errorMessages[] = json_encode($error);
}
foreach ($destinationErrors as $error) {
$errorMessages[] = json_encode($error);
$errors = [];
foreach ([...$sourceErrors, ...$destinationErrors] as $error) {
$errors[] = \json_encode($error);
}
$migration->setAttribute('errors', $errorMessages);
$migration->setAttribute('errors', $errors);
}
} finally {
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$this->updateMigrationDocument($migration, $project, $queueForRealtime);
if ($migration->getAttribute('status', '') === 'failed') {
Console::error('Migration('.$migration->getSequence().':'.$migration->getId().') failed, Project('.$this->project->getSequence().':'.$this->project->getId().')');
if ($destination) {
$destination->error();
$sourceErrors = $source?->getErrors() ?? [];
$destinationErrors = $destination?->getErrors() ?? [];
foreach ($destination->getErrors() as $error) {
/** @var MigrationException $error */
call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [
foreach ([...$sourceErrors, ...$destinationErrors] as $error) {
/** @var MigrationException $error */
if ($error->getCode() === 0 || $error->getCode() >= 500) {
($this->logError)($error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [
'migrationId' => $migration->getId(),
'source' => $migration->getAttribute('source') ?? '',
'destination' => $migration->getAttribute('destination') ?? '',
'resourceName' => $error->getResourceName(),
'resourceGroup' => $error->getResourceGroup()
'resourceGroup' => $error->getResourceGroup(),
]);
}
}
if ($source) {
$source->error();
foreach ($source->getErrors() as $error) {
/** @var MigrationException $error */
call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [
'migrationId' => $migration->getId(),
'source' => $migration->getAttribute('source') ?? '',
'destination' => $migration->getAttribute('destination') ?? '',
'resourceName' => $error->getResourceName(),
'resourceGroup' => $error->getResourceGroup()
]);
}
}
$source?->error();
$destination?->error();
}
if ($migration->getAttribute('status', '') === 'completed') {
$destination?->success();
$source?->success();
if ($migration->getAttribute('destination') === DestinationCSV::getName()) {
$this->handleCSVExportComplete($project, $migration, $queueForMails);
}
}
}
}
protected function handleCSVExportComplete(Document $project, Document $migration, Mail $queueForMails): void
{
$options = $migration->getAttribute('options', []);
$bucketId = $options['bucketId'] ?? null;
$filename = $options['filename'] ?? 'export.csv';
$userInternalId = $options['userInternalId'] ?? '';
$resourceId = $migration->getAttribute('resourceId');
// Save file to bucket
$bucket = $this->dbForProject->getDocument('buckets', $bucketId);
if ($bucket->isEmpty()) {
throw new \Exception("Bucket not found: $bucketId");
}
$path = $this->deviceForFiles->getPath($bucketId . '/' . $this->sanitizeFilename($filename) . '.csv');
$size = $this->deviceForFiles->getFileSize($path);
$mime = $this->deviceForFiles->getFileMimeType($path);
$hash = $this->deviceForFiles->getFileHash($path);
$algorithm = Compression::NONE;
$fileId = \md5($resourceId);
$this->dbForProject->createDocument('bucket_' . $bucket->getSequence(), new Document([
'$id' => $fileId,
'$permissions' => [],
'bucketId' => $bucket->getId(),
'bucketInternalId' => $bucket->getSequence(),
'name' => $filename,
'path' => $path,
'signature' => $hash,
'mimeType' => $mime,
'sizeOriginal' => $size,
'sizeActual' => $size,
'algorithm' => $algorithm,
'comment' => '',
'chunksTotal' => 1,
'chunksUploaded' => 1,
'openSSLVersion' => null,
'openSSLCipher' => null,
'openSSLTag' => null,
'openSSLIV' => null,
'search' => \implode(' ', [$fileId, $filename]),
'metadata' => ['content_type' => $mime]
]));
Console::info("Created file document in bucket: $fileId");
// No notification required, skip email sending
if (!($options['notify'] ?? false)) {
return;
}
}
}