ToolJet/server/plugins/datasources/mongodb/index.ts
Midhun G S 21d63d2c99
Feature: Mongodb more operations support (#1717)
* feature-842 mongo support

* feature-842 fix

* json parse improvement

* Feature-842 date data type support

* feature-842 fix

* convert ejson to bson on mongo queries

* fix type on parse

* json parsing

* Typo error fix

* Insert Many Fix

Co-authored-by: Akshay Sasidharan <akshaysasidharan93@gmail.com>
2022-01-14 13:53:57 +05:30

198 lines
6.9 KiB
TypeScript

import { Injectable } from '@nestjs/common';
import { QueryResult } from 'src/modules/data_sources/query_result.type';
import { QueryError } from 'src/modules/data_sources/query.error';
import { QueryService } from 'src/modules/data_sources/query_service.interface';
import { ConnectionTestResult } from 'src/modules/data_sources/connection_test_result.type';
const { MongoClient } = require('mongodb');
const JSON5 = require('json5');
import { EJSON } from 'bson';
@Injectable()
export default class MongodbService implements QueryService {
async run(sourceOptions: any, queryOptions: any, dataSourceId: string): Promise<QueryResult> {
const { db, close } = await this.getConnection(sourceOptions);
let result = {};
const operation = queryOptions.operation;
try {
switch (operation) {
case 'list_collections':
result = await db.listCollections().toArray();
break;
case 'insert_one':
result = await db
.collection(queryOptions.collection)
.insertOne(this.parseEJSON(queryOptions.document), this.parseEJSON(queryOptions.options));
break;
case 'insert_many':
result = await db
.collection(queryOptions.collection)
.insertMany(this.parseEJSON(queryOptions.documents), this.parseEJSON(queryOptions.options));
break;
case 'find_one':
result = await db
.collection(queryOptions.collection)
.findOne(this.parseEJSON(queryOptions.filter), this.parseEJSON(queryOptions.options));
break;
case 'find_many':
result = await db
.collection(queryOptions.collection)
.find(this.parseEJSON(queryOptions.filter), this.parseEJSON(queryOptions.options))
.toArray();
break;
case 'count_total':
result = await db
.collection(queryOptions.collection)
.estimatedDocumentCount(this.parseEJSON(queryOptions.options));
result = { count: result };
break;
case 'count':
result = await db
.collection(queryOptions.collection)
.countDocuments(this.parseEJSON(queryOptions.filter), this.parseEJSON(queryOptions.options));
result = { count: result };
break;
case 'distinct':
result = await db
.collection(queryOptions.collection)
.distinct(queryOptions.field, this.parseEJSON(queryOptions.filter), this.parseEJSON(queryOptions.options));
break;
case 'update_one':
result = await db
.collection(queryOptions.collection)
.updateOne(
this.parseEJSON(queryOptions.filter),
this.parseEJSON(queryOptions.update),
this.parseEJSON(queryOptions.options)
);
break;
case 'update_many':
result = await db
.collection(queryOptions.collection)
.updateMany(
this.parseEJSON(queryOptions.filter),
this.parseEJSON(queryOptions.update),
this.parseEJSON(queryOptions.options)
);
break;
case 'replace_one':
result = await db
.collection(queryOptions.collection)
.replaceOne(
this.parseEJSON(queryOptions.filter),
this.parseEJSON(queryOptions.replacement),
this.parseEJSON(queryOptions.options)
);
break;
case 'find_one_replace':
result = await db
.collection(queryOptions.collection)
.findOneAndReplace(
this.parseEJSON(queryOptions.filter),
this.parseEJSON(queryOptions.replacement),
this.parseEJSON(queryOptions.options)
);
break;
case 'find_one_update':
result = await db
.collection(queryOptions.collection)
.findOneAndUpdate(
this.parseEJSON(queryOptions.filter),
this.parseEJSON(queryOptions.update),
this.parseEJSON(queryOptions.options)
);
break;
case 'find_one_delete':
result = await db
.collection(queryOptions.collection)
.findOneAndDelete(this.parseEJSON(queryOptions.filter), this.parseEJSON(queryOptions.options));
break;
case 'delete_one':
result = await db
.collection(queryOptions.collection)
.deleteOne(this.parseEJSON(queryOptions.filter), this.parseEJSON(queryOptions.options));
break;
case 'delete_many':
result = await db
.collection(queryOptions.collection)
.deleteMany(this.parseEJSON(queryOptions.filter), this.parseEJSON(queryOptions.options));
break;
case 'bulk_write':
result = await db
.collection(queryOptions.collection)
.bulkWrite(this.parseEJSON(queryOptions.operations), this.parseEJSON(queryOptions.options));
break;
case 'aggregate':
result = await db
.collection(queryOptions.collection)
.aggregate(this.parseEJSON(queryOptions.pipeline), this.parseEJSON(queryOptions.options))
.toArray();
break;
}
} catch (err) {
console.log(err);
throw new QueryError('Query could not be completed', err.message, {});
} finally {
await close();
}
return {
status: 'ok',
data: result,
};
}
parseEJSON(maybeEJSON?: string): any {
if (!maybeEJSON) return {};
return EJSON.parse(JSON.stringify(JSON5.parse(maybeEJSON)));
}
async testConnection(sourceOptions: object): Promise<ConnectionTestResult> {
const { db, close } = await this.getConnection(sourceOptions);
await db.listCollections().toArray();
await close();
return {
status: 'ok',
};
}
async getConnection(sourceOptions: any): Promise<any> {
let db = null,
client;
const connectionType = sourceOptions['connection_type'];
if (connectionType === 'manual') {
const database = sourceOptions.database;
const host = sourceOptions.host;
const port = sourceOptions.port;
const username = sourceOptions.username;
const password = sourceOptions.password;
const needsAuthentication = username !== '' && password !== '';
const uri = needsAuthentication
? `mongodb://${username}:${password}@${host}:${port}`
: `mongodb://${host}:${port}`;
client = new MongoClient(uri, {
directConnection: true,
});
await client.connect();
db = client.db(database);
} else {
const connectionString = sourceOptions['connection_string'];
client = new MongoClient(connectionString, { useNewUrlParser: true, useUnifiedTopology: true });
await client.connect();
db = client.db();
}
return {
db,
close: async () => {
await client?.close?.();
},
};
}
}