feat(tree): prune condition-failing nodes from scan/workflow tree display (#1077)
Some checks are pending
Tests / lint (3.11) (push) Waiting to run
release-please / release-please (push) Waiting to run
Tests / coverage (3.11) (push) Blocked by required conditions
Tests / unit (3.11) (push) Waiting to run
Tests / unit (3.8) (push) Waiting to run
Tests / integration (ubuntu-latest, 3.11) (push) Waiting to run
Tests / template (ubuntu-latest, 3.11) (push) Waiting to run

## Summary

- Adds `prune_runner_tree(tree, opts, inputs)` to `secator/tree.py` that
walks the runner tree bottom-up and removes nodes whose `if` conditions
evaluate to `False` against the current opts/targets
- Wires it into `log_start()` in `_base.py` so the \"Scan built:\" /
\"Workflow built:\" message only shows tasks and workflows that will
actually run
- Execution paths in `workflow.py` and `scan.py` are untouched — this is
display-only

## Test Plan

- [ ] Run `secator test unit` — 302 tests pass, 0 failures
- [ ] Run `secator test lint` — clean
- [ ] Run a scan or workflow that has conditional tasks with options
that exclude some tasks — verify the excluded tasks do not appear in the
\"built:\" tree
- [ ] Run the same scan/workflow without the excluding options — verify
the full tree is shown

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Scan/workflow tree displays are pruned before rendering, hiding nodes
whose conditions evaluate to false for the current options and inputs;
CLI tree output reflects this behavior and preserves nodes on evaluation
errors.

* **Documentation**
* Added a formal design and implementation plan for the pruned tree
display and wiring steps.

* **Tests**
* Added unit tests covering pruning behavior, condition evaluation edge
cases, group handling, and input-driven pruning.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Olivier Cervello 2026-05-12 23:19:41 +02:00 committed by GitHub
parent b30e21355a
commit f76c343251
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 248 additions and 66 deletions

View file

@ -15,8 +15,8 @@ from secator.click import CLICK_LIST
from secator.definitions import ADDONS_ENABLED, AVAILABLE_DRIVERS, AVAILABLE_EXPORTERS
from secator.runners import Scan, Task, Workflow
from secator.template import get_config_options
from secator.tree import build_runner_tree
from secator.utils import (deduplicate, expand_input, get_command_category)
from secator.tree import build_runner_tree, prune_runner_tree
from secator.utils import deduplicate, expand_input, get_command_category
from secator.loader import get_configs_by_type
from secator.completion import complete_profiles, complete_workspaces, complete_drivers, complete_exporters
@ -78,6 +78,7 @@ def decorate_command_options(opts):
Returns:
function: Decorator.
"""
def decorator(f):
reversed_opts = OrderedDict(list(opts.items())[::-1])
# Pre-pass in original order to assign each short opt to its first claimant.
@ -121,7 +122,7 @@ def decorate_command_options(opts):
long += f'/--no-{opt_name}'
short += f'/-n{short_opt}' if short_opt else f'/-n{opt_name}'
if applies_to:
applies_to_str = ", ".join(f'[bold yellow3]{_}[/]' for _ in applies_to)
applies_to_str = ', '.join(f'[bold yellow3]{_}[/]' for _ in applies_to)
conf['help'] += rf' \[[dim]{applies_to_str}[/]]'
if default_from:
conf['help'] += rf' \[[dim]default from: [dim yellow3]{default_from}[/][/]]'
@ -139,6 +140,7 @@ def decorate_command_options(opts):
args.append(internal_name)
f = click.option(*args, **conf)(f)
return f
return decorator
@ -153,30 +155,22 @@ def register_runner(cli_endpoint, config):
'no_args_is_help': True,
'context_settings': {
'ignore_unknown_options': False,
'allow_extra_args': False
}
'allow_extra_args': False,
},
}
if cli_endpoint.name == 'scan':
runner_cls = Scan
short_help = config.description or ''
short_help += f' [dim]alias: {config.alias}' if config.alias else ''
command_opts.update({
'name': name,
'short_help': short_help,
'no_args_is_help': False
})
command_opts.update({'name': name, 'short_help': short_help, 'no_args_is_help': False})
input_types = config.input_types
elif cli_endpoint.name == 'workflow':
runner_cls = Workflow
short_help = config.description or ''
short_help = f'{short_help:<55} [dim](alias)[/][bold cyan] {config.alias}' if config.alias else ''
command_opts.update({
'name': name,
'short_help': short_help,
'no_args_is_help': False
})
command_opts.update({'name': name, 'short_help': short_help, 'no_args_is_help': False})
input_types = config.input_types
elif cli_endpoint.name == 'task':
@ -184,14 +178,10 @@ def register_runner(cli_endpoint, config):
task_cls = Task.get_task_class(config.name)
task_category = get_command_category(task_cls)
short_help = f'[magenta]{task_category:<25}[/] {task_cls.__doc__}'
command_opts.update({
'name': name,
'short_help': short_help,
'no_args_is_help': False
})
command_opts.update({'name': name, 'short_help': short_help, 'no_args_is_help': False})
input_types = task_cls.input_types
else:
raise ValueError(f"Unrecognized runner endpoint name {cli_endpoint.name}")
raise ValueError(f'Unrecognized runner endpoint name {cli_endpoint.name}')
input_types_str = '|'.join(input_types) if input_types else 'targets'
default_inputs = None if config.default_inputs == {} else config.default_inputs
input_required = default_inputs is None
@ -200,7 +190,7 @@ def register_runner(cli_endpoint, config):
config,
exec_opts=CLI_EXEC_OPTS,
output_opts=CLI_OUTPUT_OPTS,
type_mapping=CLI_TYPE_MAPPING
type_mapping=CLI_TYPE_MAPPING,
)
# TODO: maybe allow this in the future
@ -259,7 +249,23 @@ def register_runner(cli_endpoint, config):
# Show runner tree
if tree:
tree_opts = dict(opts)
profiles_str = tree_opts.get('profiles') or ''
profile_names = [p.strip() for p in profiles_str.split(',') if p.strip()]
for dp in CONFIG.profiles.defaults or []:
if dp not in profile_names:
profile_names.append(dp)
if profile_names:
for profile in get_configs_by_type('profile'):
if profile.name not in profile_names:
continue
for k, v in profile.opts.items():
if profile.enforce or not tree_opts.get(k):
tree_opts[k] = v
tree = build_runner_tree(config)
raw_inputs = tree_opts.get('inputs')
tree_inputs = [raw_inputs] if isinstance(raw_inputs, str) else (raw_inputs or [])
prune_runner_tree(tree, tree_opts, tree_inputs)
console.print(tree.render_tree())
sys.exit(0)
@ -283,6 +289,7 @@ def register_runner(cli_endpoint, config):
console.print(f'[bold red]Missing "{driver}" addon: please run `secator install addons {driver}`[/].')
sys.exit(1)
from secator.utils import import_dynamic
driver_hooks = import_dynamic(f'secator.hooks.{driver}', 'HOOKS')
if driver_hooks is None:
console.print(f'[bold red]Missing "secator.hooks.{driver}.HOOKS".[/]')
@ -298,6 +305,7 @@ def register_runner(cli_endpoint, config):
if 'api' in context['drivers']:
try:
from secator.hooks.api import get_workspace_name
workspace_name = get_workspace_name(context.get('workspace_id'))
context['workspace_name'] = workspace_name
except Exception as e:
@ -305,16 +313,16 @@ def register_runner(cli_endpoint, config):
sys.exit(1)
if enable_pyinstrument or enable_memray:
if not ADDONS_ENABLED["trace"]:
console.print(
'[bold red]Missing "trace" addon: please run `secator install addons trace`[/].'
)
if not ADDONS_ENABLED['trace']:
console.print('[bold red]Missing "trace" addon: please run `secator install addons trace`[/].')
sys.exit(1)
import memray
output_file = f'trace_memray_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}.bin'
contextmanager = memray.Tracker(output_file)
from secator.utils import deep_merge_dicts
hooks = deep_merge_dicts(*hooks)
# Enable sync or not
@ -322,6 +330,7 @@ def register_runner(cli_endpoint, config):
sync = True
else:
from secator.celery import is_celery_worker_alive
worker_alive = is_celery_worker_alive()
if not worker_alive and not sync:
sync = True
@ -329,55 +338,51 @@ def register_runner(cli_endpoint, config):
sync = False
broker_protocol = CONFIG.celery.broker_url.split('://')[0]
backend_protocol = CONFIG.celery.result_backend.split('://')[0]
if CONFIG.celery.broker_url and \
(broker_protocol == 'redis' or backend_protocol == 'redis') \
and not ADDONS_ENABLED['redis']:
redis_required = broker_protocol == 'redis' or backend_protocol == 'redis'
if CONFIG.celery.broker_url and redis_required and not ADDONS_ENABLED['redis']:
Console().print('[bold red]Missing `redis` addon: please run `secator install addons redis`[/].')
sys.exit(1)
from secator.utils import debug
debug('Run options', obj=opts, sub='cli')
# Set run options
opts.update({
'print_cmd': True,
'print_item': True,
'print_line': True,
'print_progress': True,
'print_profiles': True,
'print_start': True,
'print_target': True,
'print_end': True,
'print_remote_info': not sync,
'piped_input': ctx.obj['piped_input'],
'piped_output': ctx.obj['piped_output'],
'caller': 'cli',
'sync': sync,
'quiet': quiet
})
opts.update(
{
'print_cmd': True,
'print_item': True,
'print_line': True,
'print_progress': True,
'print_profiles': True,
'print_start': True,
'print_target': True,
'print_end': True,
'print_remote_info': not sync,
'piped_input': ctx.obj['piped_input'],
'piped_output': ctx.obj['piped_output'],
'caller': 'cli',
'sync': sync,
'quiet': quiet,
}
)
# Start runner
with contextmanager:
if enable_memray:
process = psutil.Process()
console.print(
f"[bold yellow3]Initial RAM Usage: {process.memory_info().rss / 1024 ** 2} MB[/]"
)
console.print(f'[bold yellow3]Initial RAM Usage: {process.memory_info().rss / 1024**2} MB[/]')
item_count = 0
runner = runner_cls(
config, inputs, run_opts=opts, hooks=hooks, context=context
)
runner = runner_cls(config, inputs, run_opts=opts, hooks=hooks, context=context)
for item in runner:
del item
item_count += 1
if process and item_count % 100 == 0:
console.print(
f"[bold yellow3]RAM Usage: {process.memory_info().rss / 1024 ** 2} MB[/]"
)
console.print(f'[bold yellow3]RAM Usage: {process.memory_info().rss / 1024**2} MB[/]')
if enable_memray:
console.print(f"[bold green]Memray output file: {output_file}[/]")
os.system(f"memray flamegraph {output_file}")
console.print(f'[bold green]Memray output file: {output_file}[/]')
os.system(f'memray flamegraph {output_file}')
generate_cli_subcommand(cli_endpoint, func, **command_opts)
generate_rich_click_opt_groups(cli_endpoint, name, input_types, options)
@ -410,19 +415,13 @@ def generate_rich_click_opt_groups(cli_endpoint, name, input_types, options):
},
]
for prefix in prefixes:
prefix_opts = [
opt for opt, conf in options.items()
if conf['prefix'] == prefix
]
prefix_opts = [opt for opt, conf in options.items() if conf['prefix'] == prefix]
if prefix not in ['Execution', 'Output']:
prefix_opts = sorted(prefix_opts)
opt_names = [f'--{opt_name}' for opt_name in prefix_opts]
if prefix == 'Output':
opt_names.append('--help')
opt_group.append({
'name': prefix + ' options',
'options': opt_names
})
opt_group.append({'name': prefix + ' options', 'options': opt_names})
aliases = [cli_endpoint.name, *cli_endpoint.aliases]
for alias in aliases:
endpoint_name = f'secator {alias} {name}'

View file

@ -20,7 +20,7 @@ from secator.report import Report
from secator.rich import console, console_stdout
from secator.runners._helpers import get_task_folder_id, run_extractors
from secator.utils import debug, import_dynamic, should_update, autodetect_type, sanitize_folder_name
from secator.tree import build_runner_tree
from secator.tree import build_runner_tree, prune_runner_tree
from secator.loader import get_configs_by_type
@ -1019,7 +1019,9 @@ class Runner:
if self.has_parent:
return
if self.config.type != 'task':
tree = textwrap.indent(build_runner_tree(self.config).render_tree(), ' ')
tree = build_runner_tree(self.config)
prune_runner_tree(tree, self.run_opts, self.inputs)
tree = textwrap.indent(tree.render_tree(), ' ')
info = Info(message=f'{self.config.type.capitalize()} built:\n{tree}', _source=self.unique_name)
self._print(info, rich=True)
remote_str = 'started' if self.sync else 'started in worker'

View file

@ -152,6 +152,53 @@ def build_runner_tree(config: DotMap, condition: Optional[str] = None, parent: O
return tree
def prune_runner_tree(tree: RunnerTree, opts: dict, inputs: list = None) -> RunnerTree:
"""Remove nodes whose conditions evaluate to False against opts/inputs.
Walks bottom-up so child removals don't corrupt parent iteration.
On eval error the node is kept (err on the side of showing more).
When entering a workflow node, strips its name prefix from opts so that
scan-level options like domain_recon_passive are visible as opts.passive
inside that workflow's tasks (mirroring build_celery_workflow behaviour).
"""
safe_globals = {'__builtins__': {'len': len}}
def strip_prefix(parent_opts, workflow_name):
prefix = workflow_name.split('/')[0] + '_'
result = dict(parent_opts)
for k, v in parent_opts.items():
if k.startswith(prefix):
result[k[len(prefix):]] = v
return result
def prune_node(node: TaskNode, current_opts: dict):
child_opts = strip_prefix(current_opts, node.name) if node.type == 'workflow' else current_opts
for child in list(node.children):
prune_node(child, child_opts)
if node.type == 'group' and not node.children:
node.remove()
return
if node.condition:
local_ns = {'opts': DotMap(current_opts), 'targets': inputs or []}
try:
if not eval(node.condition, safe_globals, local_ns):
node.remove()
except Exception:
pass
for root in list(tree.root_nodes):
prune_node(root, opts)
if root.condition:
local_ns = {'opts': DotMap(opts), 'targets': inputs or []}
try:
if not eval(root.condition, safe_globals, local_ns):
tree.root_nodes.remove(root)
except Exception:
pass
return tree
def walk_runner_tree(tree: RunnerTree, visit_func):
"""
Walk the RunnerTree and visit each node.

134
tests/unit/test_tree.py Normal file
View file

@ -0,0 +1,134 @@
import unittest
from secator.tree import TaskNode, RunnerTree, prune_runner_tree
def make_tree(*nodes):
"""Helper: build a RunnerTree with given root TaskNodes."""
tree = RunnerTree('test', 'workflow')
for node in nodes:
tree.add_root_node(node)
return tree
def make_node(name, condition=None, children=None):
node = TaskNode(name, 'task', name, condition=condition)
for child in (children or []):
child.parent = node
node.add_child(child)
return node
class TestPruneRunnerTree(unittest.TestCase):
def test_no_conditions_unchanged(self):
"""Nodes without conditions are never removed."""
tree = make_tree(make_node('nmap'), make_node('httpx'))
prune_runner_tree(tree, {})
names = [n.name for n in tree.root_nodes]
self.assertEqual(names, ['nmap', 'httpx'])
def test_false_condition_removes_node(self):
"""Node with a False condition is removed from the tree."""
tree = make_tree(
make_node('nmap'),
make_node('httpx', condition='opts.run_httpx'),
)
prune_runner_tree(tree, {'run_httpx': False})
names = [n.name for n in tree.root_nodes]
self.assertEqual(names, ['nmap'])
def test_true_condition_keeps_node(self):
"""Node with a True condition is kept."""
tree = make_tree(
make_node('nmap', condition='opts.run_nmap'),
)
prune_runner_tree(tree, {'run_nmap': True})
self.assertEqual(len(tree.root_nodes), 1)
def test_bad_condition_keeps_node(self):
"""On eval error the node is kept (err on the side of showing more)."""
tree = make_tree(make_node('nmap', condition='this is not valid python!!!'))
prune_runner_tree(tree, {})
self.assertEqual(len(tree.root_nodes), 1)
def test_child_false_condition_removes_only_child(self):
"""A child with a false condition is removed; parent stays."""
child = make_node('httpx', condition='opts.run_httpx')
parent = make_node('discovery')
parent.add_child(child)
child.parent = parent
tree = make_tree(parent)
prune_runner_tree(tree, {'run_httpx': False})
self.assertEqual(len(tree.root_nodes), 1)
self.assertEqual(tree.root_nodes[0].children, [])
def test_targets_available_in_condition(self):
"""Conditions can reference `targets`."""
tree = make_tree(make_node('nmap', condition='len(targets) > 0'))
prune_runner_tree(tree, {}, inputs=['192.168.1.1'])
self.assertEqual(len(tree.root_nodes), 1)
def test_targets_empty_removes_node(self):
tree = make_tree(make_node('nmap', condition='len(targets) > 0'))
prune_runner_tree(tree, {}, inputs=[])
self.assertEqual(len(tree.root_nodes), 0)
def test_empty_group_is_removed(self):
"""A group whose only child is pruned is itself removed."""
child = TaskNode('httpx', 'task', 'httpx', condition='opts.run_httpx')
group = TaskNode('_group1', 'group', '_group1')
child.parent = group
group.add_child(child)
root = TaskNode('discovery', 'workflow', 'discovery')
group.parent = root
root.add_child(group)
tree = make_tree(root)
prune_runner_tree(tree, {'run_httpx': False})
self.assertEqual(root.children, [])
def test_group_with_surviving_child_is_kept(self):
"""A group that still has children after pruning is not removed."""
child1 = TaskNode('httpx', 'task', 'httpx', condition='opts.run_httpx')
child2 = TaskNode('nmap', 'task', 'nmap')
group = TaskNode('_group1', 'group', '_group1')
for c in [child1, child2]:
c.parent = group
group.add_child(c)
root = TaskNode('discovery', 'workflow', 'discovery')
group.parent = root
root.add_child(group)
tree = make_tree(root)
prune_runner_tree(tree, {'run_httpx': False})
self.assertEqual(len(root.children), 1)
self.assertEqual(root.children[0].name, '_group1')
self.assertEqual(len(root.children[0].children), 1)
self.assertEqual(root.children[0].children[0].name, 'nmap')
def test_scan_workflow_prefix_stripping(self):
"""Scan-level opts like domain_recon_passive are visible as opts.passive inside domain_recon tasks."""
task = TaskNode('httpx', 'task', 'domain_recon.httpx', condition='not opts.passive')
task.parent = None # will be set below
wf = TaskNode('domain_recon', 'workflow', 'domain_recon')
task.parent = wf
wf.add_child(task)
scan = TaskNode('domain', 'scan', 'domain')
wf.parent = scan
scan.add_child(wf)
tree = make_tree(scan)
# --domain-recon-passive sets domain_recon_passive=True in scan-level opts
prune_runner_tree(tree, {'domain_recon_passive': True})
# httpx should be pruned because not opts.passive == not True == False
self.assertEqual(wf.children, [])
def test_scan_workflow_prefix_stripping_keeps_when_false(self):
"""Task is kept when the prefixed opt is False (passive not set for that workflow)."""
task = TaskNode('httpx', 'task', 'domain_recon.httpx', condition='not opts.passive')
wf = TaskNode('domain_recon', 'workflow', 'domain_recon')
task.parent = wf
wf.add_child(task)
scan = TaskNode('domain', 'scan', 'domain')
wf.parent = scan
scan.add_child(wf)
tree = make_tree(scan)
prune_runner_tree(tree, {'domain_recon_passive': False})
self.assertEqual(len(wf.children), 1)