mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
feat: Support ARRAY transform.method
This commit is contained in:
parent
812f9a2c64
commit
f1f6b0ce2d
1 changed files with 140 additions and 54 deletions
|
|
@ -358,9 +358,9 @@ const AggregatorFunctions = {
|
|||
|
||||
var AggregatorFunctionDiv = {
|
||||
sum: false,
|
||||
count: false,
|
||||
min: false,
|
||||
max: false,
|
||||
count: false,
|
||||
avg: true
|
||||
};
|
||||
|
||||
|
|
@ -378,6 +378,8 @@ export function getCube(rows, keyColumns, groupColumns, aggrColumns) {
|
|||
const keyColumnName = keyColumns.map(c => c.name).join('.')
|
||||
const groupNameSet = new Set()
|
||||
const keyNameSet = new Set()
|
||||
const selectorNameWithIndex = {} /** { selectorName: index } */
|
||||
let indexCounter = 0
|
||||
|
||||
for (let i = 0; i < rows.length; i++) {
|
||||
const row = rows[i];
|
||||
|
|
@ -414,11 +416,16 @@ export function getCube(rows, keyColumns, groupColumns, aggrColumns) {
|
|||
const aggrColumn = aggrColumns[a]
|
||||
const aggrName = aggrColumn.name
|
||||
|
||||
// update selectors
|
||||
let selector = undefined
|
||||
// update groupNameSet
|
||||
if (!mergedGroupName) {
|
||||
selector = aggrName
|
||||
groupNameSet.add(selector)
|
||||
groupNameSet.add(aggrName) /** aggr column name will be used as group name if group is empty */
|
||||
}
|
||||
|
||||
// update selectorNameWithIndex
|
||||
const selector = getSelectorName(mergedGroupName, aggrColumns.length, aggrName)
|
||||
if (typeof selectorNameWithIndex[selector] === 'undefined' /** value might be 0 */) {
|
||||
selectorNameWithIndex[selector] = indexCounter
|
||||
indexCounter = indexCounter + 1
|
||||
}
|
||||
|
||||
// add aggregator to entry
|
||||
|
|
@ -450,31 +457,125 @@ export function getCube(rows, keyColumns, groupColumns, aggrColumns) {
|
|||
cube: cube,
|
||||
schema: schema,
|
||||
keyColumnName: keyColumnName,
|
||||
keyNames: Object.keys(cube), /** keys should be sorted, so we use Object.keys here */
|
||||
keyNames: Object.keys(cube), /** keys should be sorted */
|
||||
groupNameSet: groupNameSet,
|
||||
selectorNameWithIndex: selectorNameWithIndex,
|
||||
}
|
||||
}
|
||||
|
||||
export function getNameWidIndex(arr) {
|
||||
return arr.reduce((acc, name, index) => {
|
||||
acc[name] = index
|
||||
/**
 * Build the selector name for one (group, aggregator column) pair.
 *
 *  - no group name                      -> the aggregator column name
 *  - group name, one aggregator column  -> the group name
 *  - group name, several aggr columns   -> `<group> / <aggregator column>`
 *
 * @param {string|undefined} mergedGroupName group name; any falsy value means "no group"
 * @param {number} aggrColumnLength number of aggregator columns
 * @param {string} aggrColumnName name of the aggregator column
 * @returns {string} selector name
 */
export function getSelectorName(mergedGroupName, aggrColumnLength, aggrColumnName) {
  /** without a group the aggregator column name is the selector */
  if (!mergedGroupName) { return aggrColumnName }

  /** several aggregator columns share this group, so suffix the column name to keep selectors distinct */
  if (aggrColumnLength > 1) { return `${mergedGroupName} / ${aggrColumnName}` }

  return mergedGroupName
}
|
||||
|
||||
/**
 * Read the final value of one aggregator column from a cube entry.
 *
 * For `Aggregator.AVG` the result is `value / count`, computed here from the
 * two fields stored on the entry. Every other aggregator, COUNT included,
 * is read straight from `value`.
 *
 * @param {Object} obj cube entry; `obj[aggrColumnName]` is read for `value` and `count`
 * @param {*} aggregator aggregator type, compared against `Aggregator.AVG`
 * @param {string} aggrColumnName name of the aggregator column to read
 * @returns {*} the resolved value, or null when it is `undefined` or cannot be read
 */
export function getCubeValue(obj, aggregator, aggrColumnName) {
  try {
    const needsDivision = (aggregator === Aggregator.AVG)
    const stored = obj[aggrColumnName]
    const resolved = needsDivision ? stored.value / stored.count : stored.value

    return (typeof resolved === 'undefined') ? null : resolved
  } catch (error) {
    /** best effort: a missing entry or column is reported as null instead of throwing */
    return null
  }
}
|
||||
|
||||
/**
 * Map every selector name to its position in the given list.
 *
 * @param {Array<string>} sortedSelectors selector names, in the order they should be indexed
 * @returns {Object} `{ selectorName: index }`; if a name occurs more than once the last position wins
 */
export function sortSelectorNameWithIndex(sortedSelectors) {
  const indexBySelector = {}

  let position = 0
  while (position < sortedSelectors.length) {
    indexBySelector[sortedSelectors[position]] = position
    position += 1
  }

  return indexBySelector
}
|
||||
|
||||
/**
 * Convert a cube into array-shaped rows, one `{ key, value }` object per key name.
 *
 * Selector names are sorted first and re-indexed by their sorted position, so
 * `value[i]` of every row belongs to `sortedSelectors[i]`.
 *
 * @param {Object} cube cube object; entries are looked up by key name
 * @param {Object} schema only `schema.key` is read here (whether a key column exists)
 * @param {Array<Object>} aggregatorColumns aggregator columns, forwarded to `getArrayRow`
 * @param {string} keyColumnName not used by this function; kept for a signature parallel to `getObjectRowsFromCube`
 * @param {Array<string>} keyNames key names to emit rows for (ignored when `schema.key` is falsy)
 * @param {Set<string>} groupNameSet group names, forwarded to `getArrayRow`
 * @param {Object} selectorNameWithIndex `{ selectorName: index }`; only its keys are used
 * @returns {{transformed: Array<{key: string, value: Array}>, sortedSelectors: Array<string>, sortedSelectorNameWithIndex: Object}}
 */
export function getArrayRowsFromCube(cube, schema, aggregatorColumns,
                                     keyColumnName, keyNames, groupNameSet,
                                     selectorNameWithIndex) {
  const emptyKey = 'root'

  /** without a key column the whole cube is handled as one entry stored under `emptyKey` */
  const hasKey = Boolean(schema.key)
  const rowKeys = hasKey ? keyNames : [ emptyKey, ]
  const entries = hasKey ? cube : { [emptyKey]: cube, }

  const sortedSelectors = Object.keys(selectorNameWithIndex).sort()
  const sortedSelectorNameWithIndex = sortSelectorNameWithIndex(sortedSelectors)

  const rows = rowKeys.map(key => {
    const { rowValue, } = getArrayRow(schema, aggregatorColumns, entries[key],
      groupNameSet, sortedSelectorNameWithIndex, sortedSelectors.length)

    return { key: key, value: rowValue, }
  })

  return { transformed: rows, sortedSelectors: sortedSelectors, sortedSelectorNameWithIndex: sortedSelectorNameWithIndex, }
}
|
||||
|
||||
/**
 * Build one array-shaped row for a single cube entry.
 *
 * Each value is written at the index registered for its selector name in
 * `selectorNameWithIndex`. Slots that no selector writes to are left unset.
 *
 * @param {Object} schema only `schema.group` is read here (whether group columns exist)
 * @param {Array<Object>} aggrColumns aggregator columns; `name` and `aggr` are read from each
 * @param {Object} obj cube entry for one key; holds aggregator columns directly when
 *   there is no group, otherwise it is indexed by group name first
 * @param {Set<string>} groupNameSet group names to visit when `schema.group` is truthy
 * @param {Object} selectorNameWithIndex `{ selectorName: index }`
 * @param {number} selectorLength length of the row array to allocate
 * @returns {{rowValue: Array}} the populated row
 */
export function getArrayRow(schema, aggrColumns, obj, groupNameSet, selectorNameWithIndex, selectorLength) {
  const cells = new Array(selectorLength)

  /** resolve one value from `source` and store it in the slot of its selector */
  const fillCell = (source, groupName, column) => {
    const columnName = column.name
    const cellValue = getCubeValue(source, column.aggr, columnName)
    const selectorName = getSelectorName(groupName, aggrColumns.length, columnName)
    cells[selectorNameWithIndex[selectorName]] = cellValue
  }

  for (const column of aggrColumns) {
    if (!schema.group) {
      /** when group is empty the entry holds the aggregator columns directly */
      fillCell(obj, undefined, column)
      continue
    }

    for (const groupName of groupNameSet) {
      fillCell(obj[groupName], groupName, column)
    }
  }

  return { rowValue: cells, }
}
|
||||
|
||||
export function getObjectRowsFromCube(cube, schema, aggregatorColumns,
|
||||
keyColumnName, keyNames, groupNameSet) {
|
||||
keyColumnName, keyNames, groupNameSet,
|
||||
selectorNameWithIndex) {
|
||||
|
||||
if (!schema.key) {
|
||||
keyNames = [ 'root', ]
|
||||
cube = { root: cube, }
|
||||
}
|
||||
|
||||
const selectorSet = new Set()
|
||||
|
||||
const rows = keyNames.reduce((acc, key) => {
|
||||
const obj = cube[key]
|
||||
const row = getObjectRow(schema, aggregatorColumns, obj, groupNameSet, selectorSet)
|
||||
const row = getObjectRow(schema, aggregatorColumns, obj, groupNameSet, selectorNameWithIndex)
|
||||
|
||||
if (schema.key) { row[keyColumnName] = key }
|
||||
acc.push(row)
|
||||
|
|
@ -482,11 +583,10 @@ export function getObjectRowsFromCube(cube, schema, aggregatorColumns,
|
|||
return acc
|
||||
}, [])
|
||||
|
||||
return { transformed: rows, selectorSet: selectorSet, }
|
||||
return { transformed: rows, }
|
||||
}
|
||||
|
||||
|
||||
export function getObjectRow(schema, aggrColumns, obj, groupNameSet, selectorSet) {
|
||||
export function getObjectRow(schema, aggrColumns, obj, groupNameSet, selectorNameWithIndex) {
|
||||
const row = {}
|
||||
|
||||
/** when group is empty */
|
||||
|
|
@ -495,21 +595,8 @@ export function getObjectRow(schema, aggrColumns, obj, groupNameSet, selectorSet
|
|||
const aggrColumn = aggrColumns[i]
|
||||
const aggrName = aggrColumn.name
|
||||
|
||||
const selector = aggrColumn.name
|
||||
selectorSet.add(selector)
|
||||
|
||||
let value = null
|
||||
try {
|
||||
if (aggrColumn.aggr === Aggregator.AVG) {
|
||||
value = aggrColumn[aggrName].value / aggrColumn[aggrName].count
|
||||
} else if (aggrColumn.aggr === Aggregator.COUNT) {
|
||||
value = aggrColumn[aggrName].count
|
||||
} else {
|
||||
value = aggrColumn[aggrName].value
|
||||
}
|
||||
if (typeof value === 'undefined') { value = null }
|
||||
} catch (error) { /** iognore */ }
|
||||
|
||||
const value = getCubeValue(obj, aggrColumn.aggr, aggrName)
|
||||
const selector = getSelectorName(undefined, aggrColumns.length, aggrName)
|
||||
row[selector] = value
|
||||
}
|
||||
|
||||
|
|
@ -523,26 +610,12 @@ export function getObjectRow(schema, aggrColumns, obj, groupNameSet, selectorSet
|
|||
|
||||
for (let groupName of groupNameSet) {
|
||||
const grouped = obj[groupName]
|
||||
/** do not add aggrColumn name if group is specified && aggrColumns.length === 1 */
|
||||
const selector = (aggrColumns.length > 1) ?
|
||||
`${groupName} / ${aggrName}` : groupName
|
||||
selectorSet.add(selector)
|
||||
|
||||
if (grouped) {
|
||||
let value = null
|
||||
|
||||
/** if AVG or COUNT, calculate it now, previously we can't because we are accumulating */
|
||||
if (grouped.aggr === Aggregator.AVG) {
|
||||
value = grouped[aggrName].value / grouped[aggrName].count
|
||||
} else if (grouped.aggr === Aggregator.COUNT) {
|
||||
value = grouped[aggrName].count
|
||||
} else {
|
||||
value = grouped[aggrName].value
|
||||
}
|
||||
|
||||
const value = getCubeValue(grouped, aggrColumn.aggr, aggrName)
|
||||
const selector = getSelectorName(groupName, aggrColumns.length, aggrName)
|
||||
row[selector] = value
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -569,20 +642,33 @@ export function getTransformer(conf, rows, keyColumns, groupColumns, aggregatorC
|
|||
transformer = () => { return rows; }
|
||||
} else if (method === TransformMethod.CUBE) {
|
||||
transformer = () => {
|
||||
const { cube, schema, keyColumnName, keyNames, groupNameSet, } =
|
||||
const { cube, schema, keyColumnName, keyNames, groupNameSet, selectorNameWithIndex, } =
|
||||
getCube(rows, keyColumns, groupColumns, aggregatorColumns)
|
||||
|
||||
return { rows: [], cube, keyColumnName, keyNames, groupNameSet, }
|
||||
return { rows: [], cube, keyColumnName, keyNames, groupNameSet, selectorNameWithIndex, }
|
||||
}
|
||||
} else if (method === TransformMethod.OBJECT) {
|
||||
transformer = () => {
|
||||
const { cube, schema, keyColumnName, keyNames, groupNameSet, } =
|
||||
const { cube, schema, keyColumnName, keyNames, groupNameSet, selectorNameWithIndex, } =
|
||||
getCube(rows, keyColumns, groupColumns, aggregatorColumns)
|
||||
|
||||
const { transformed, selectorSet, } =
|
||||
getObjectRowsFromCube(cube, schema, aggregatorColumns, keyColumnName, keyNames, groupNameSet)
|
||||
const { transformed, } = getObjectRowsFromCube(cube, schema, aggregatorColumns,
|
||||
keyColumnName, keyNames, groupNameSet, selectorNameWithIndex)
|
||||
|
||||
return { rows: transformed, cube, keyColumnName, keyNames, selectorSet, }
|
||||
return {
|
||||
rows: transformed, cube, keyColumnName, keyNames,
|
||||
selectors: Object.keys(selectorNameWithIndex).sort(), /** to sort selectors */ }
|
||||
}
|
||||
} else if (method === TransformMethod.ARRAY) {
|
||||
transformer = () => {
|
||||
const { cube, schema, keyColumnName, keyNames, groupNameSet, selectorNameWithIndex, } =
|
||||
getCube(rows, keyColumns, groupColumns, aggregatorColumns)
|
||||
|
||||
const { transformed, sortedSelectors, sortedSelectorNameWithIndex } = getArrayRowsFromCube(
|
||||
cube, schema, aggregatorColumns, keyColumnName, keyNames, groupNameSet, selectorNameWithIndex)
|
||||
|
||||
return { rows: transformed, cube, keyColumnName, keyNames,
|
||||
selectors: sortedSelectors, selectorNameWithIndex: sortedSelectorNameWithIndex, }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue