mirror of
https://github.com/ToolJet/ToolJet
synced 2026-05-21 16:08:35 +00:00
71 lines
2.6 KiB
Ruby
71 lines
2.6 KiB
Ruby
|
|
# frozen_string_literal: true
|
||
|
|
|
||
|
|
# API's to run queries with connection from the respective connection pool.
|
||
|
|
|
||
|
|
module DataSourceConnectionPool
|
||
|
|
include ::AvailableDataSource::ConnectionPooled
|
||
|
|
|
||
|
|
# The connection_closure should always return the properly initialized client/connection
|
||
|
|
# which can run queries on the respective datasource.
|
||
|
|
def get_connection_pool(data_source, connection_closure)
|
||
|
|
connection_pool_size = ENV.fetch("CONNECTION_POOL_SIZE", 5).to_i
|
||
|
|
connection_timeout = ENV.fetch("CONNECTION_TIMEOUT", 5).to_i
|
||
|
|
datasource_type = data_source.kind
|
||
|
|
if source_type_supported?(datasource_type)
|
||
|
|
res = $connection_pools.fetch_or_store(
|
||
|
|
data_source.id,
|
||
|
|
{ connection_pool: make_connection_pool(connection_pool_size, connection_timeout, connection_closure),
|
||
|
|
updated_at: data_source.updated_at }
|
||
|
|
)
|
||
|
|
verify_and_return_connection_pool(res, connection_closure, data_source)
|
||
|
|
else
|
||
|
|
raise AvailableDataSource::UnSupportedSource.new
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
# The connection_closure should always return the properly initialized client/connection
|
||
|
|
# which can run queries on the respective datasource.
|
||
|
|
def with_connection(data_source, connection_closure)
|
||
|
|
conn_pool = get_connection_pool(data_source, connection_closure)
|
||
|
|
# Checkout a connection from the connection pool and yield it to the calling function's block.
|
||
|
|
conn_pool.with do |conn|
|
||
|
|
if block_given?
|
||
|
|
yield(conn)
|
||
|
|
end
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
# Resets the connection pool of a particular source for the current process.
|
||
|
|
# If no datasource_type is passed, removes all the keys from the global concurrent_map.
|
||
|
|
def reset_connection_pool!(datasource_id = nil)
|
||
|
|
if datasource_id
|
||
|
|
$connection_pools.delete(datasource_id)
|
||
|
|
else
|
||
|
|
stored_data_source_keys = $connection_pools.keys
|
||
|
|
stored_data_source_keys.each do |key|
|
||
|
|
$connection_pools.delete(key)
|
||
|
|
end
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
private
|
||
|
|
|
||
|
|
def make_connection_pool(connection_pool_size, connection_timeout, connection_closure)
|
||
|
|
ConnectionPool.new(size: connection_pool_size,
|
||
|
|
timeout: connection_timeout) { connection_closure.call() }
|
||
|
|
end
|
||
|
|
|
||
|
|
def verify_and_return_connection_pool(cached_connections, connection_closure, data_source)
|
||
|
|
if connection_stale?(cached_connections, data_source)
|
||
|
|
reset_connection_pool!(data_source.id)
|
||
|
|
get_connection_pool(data_source, connection_closure)
|
||
|
|
else
|
||
|
|
cached_connections[:connection_pool]
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
def connection_stale?(cached_connections, data_source)
|
||
|
|
cached_connections[:updated_at] != data_source.reload.updated_at
|
||
|
|
end
|
||
|
|
end
|