mirror of
https://github.com/freelabz/secator
synced 2026-05-24 10:08:23 +00:00
feat(tree): prune condition-failing nodes from scan/workflow tree display (#1077)
Some checks are pending
Tests / lint (3.11) (push) Waiting to run
release-please / release-please (push) Waiting to run
Tests / coverage (3.11) (push) Blocked by required conditions
Tests / unit (3.11) (push) Waiting to run
Tests / unit (3.8) (push) Waiting to run
Tests / integration (ubuntu-latest, 3.11) (push) Waiting to run
Tests / template (ubuntu-latest, 3.11) (push) Waiting to run
Some checks are pending
Tests / lint (3.11) (push) Waiting to run
release-please / release-please (push) Waiting to run
Tests / coverage (3.11) (push) Blocked by required conditions
Tests / unit (3.11) (push) Waiting to run
Tests / unit (3.8) (push) Waiting to run
Tests / integration (ubuntu-latest, 3.11) (push) Waiting to run
Tests / template (ubuntu-latest, 3.11) (push) Waiting to run
## Summary - Adds `prune_runner_tree(tree, opts, inputs)` to `secator/tree.py` that walks the runner tree bottom-up and removes nodes whose `if` conditions evaluate to `False` against the current opts/targets - Wires it into `log_start()` in `_base.py` so the \"Scan built:\" / \"Workflow built:\" message only shows tasks and workflows that will actually run - Execution paths in `workflow.py` and `scan.py` are untouched — this is display-only ## Test Plan - [ ] Run `secator test unit` — 302 tests pass, 0 failures - [ ] Run `secator test lint` — clean - [ ] Run a scan or workflow that has conditional tasks with options that exclude some tasks — verify the excluded tasks do not appear in the \"built:\" tree - [ ] Run the same scan/workflow without the excluding options — verify the full tree is shown <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Scan/workflow tree displays are pruned before rendering, hiding nodes whose conditions evaluate to false for the current options and inputs; CLI tree output reflects this behavior and preserves nodes on evaluation errors. * **Documentation** * Added a formal design and implementation plan for the pruned tree display and wiring steps. * **Tests** * Added unit tests covering pruning behavior, condition evaluation edge cases, group handling, and input-driven pruning. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
b30e21355a
commit
f76c343251
4 changed files with 248 additions and 66 deletions
|
|
@ -15,8 +15,8 @@ from secator.click import CLICK_LIST
|
|||
from secator.definitions import ADDONS_ENABLED, AVAILABLE_DRIVERS, AVAILABLE_EXPORTERS
|
||||
from secator.runners import Scan, Task, Workflow
|
||||
from secator.template import get_config_options
|
||||
from secator.tree import build_runner_tree
|
||||
from secator.utils import (deduplicate, expand_input, get_command_category)
|
||||
from secator.tree import build_runner_tree, prune_runner_tree
|
||||
from secator.utils import deduplicate, expand_input, get_command_category
|
||||
from secator.loader import get_configs_by_type
|
||||
from secator.completion import complete_profiles, complete_workspaces, complete_drivers, complete_exporters
|
||||
|
||||
|
|
@ -78,6 +78,7 @@ def decorate_command_options(opts):
|
|||
Returns:
|
||||
function: Decorator.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
reversed_opts = OrderedDict(list(opts.items())[::-1])
|
||||
# Pre-pass in original order to assign each short opt to its first claimant.
|
||||
|
|
@ -121,7 +122,7 @@ def decorate_command_options(opts):
|
|||
long += f'/--no-{opt_name}'
|
||||
short += f'/-n{short_opt}' if short_opt else f'/-n{opt_name}'
|
||||
if applies_to:
|
||||
applies_to_str = ", ".join(f'[bold yellow3]{_}[/]' for _ in applies_to)
|
||||
applies_to_str = ', '.join(f'[bold yellow3]{_}[/]' for _ in applies_to)
|
||||
conf['help'] += rf' \[[dim]{applies_to_str}[/]]'
|
||||
if default_from:
|
||||
conf['help'] += rf' \[[dim]default from: [dim yellow3]{default_from}[/][/]]'
|
||||
|
|
@ -139,6 +140,7 @@ def decorate_command_options(opts):
|
|||
args.append(internal_name)
|
||||
f = click.option(*args, **conf)(f)
|
||||
return f
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
|
|
@ -153,30 +155,22 @@ def register_runner(cli_endpoint, config):
|
|||
'no_args_is_help': True,
|
||||
'context_settings': {
|
||||
'ignore_unknown_options': False,
|
||||
'allow_extra_args': False
|
||||
}
|
||||
'allow_extra_args': False,
|
||||
},
|
||||
}
|
||||
|
||||
if cli_endpoint.name == 'scan':
|
||||
runner_cls = Scan
|
||||
short_help = config.description or ''
|
||||
short_help += f' [dim]alias: {config.alias}' if config.alias else ''
|
||||
command_opts.update({
|
||||
'name': name,
|
||||
'short_help': short_help,
|
||||
'no_args_is_help': False
|
||||
})
|
||||
command_opts.update({'name': name, 'short_help': short_help, 'no_args_is_help': False})
|
||||
input_types = config.input_types
|
||||
|
||||
elif cli_endpoint.name == 'workflow':
|
||||
runner_cls = Workflow
|
||||
short_help = config.description or ''
|
||||
short_help = f'{short_help:<55} [dim](alias)[/][bold cyan] {config.alias}' if config.alias else ''
|
||||
command_opts.update({
|
||||
'name': name,
|
||||
'short_help': short_help,
|
||||
'no_args_is_help': False
|
||||
})
|
||||
command_opts.update({'name': name, 'short_help': short_help, 'no_args_is_help': False})
|
||||
input_types = config.input_types
|
||||
|
||||
elif cli_endpoint.name == 'task':
|
||||
|
|
@ -184,14 +178,10 @@ def register_runner(cli_endpoint, config):
|
|||
task_cls = Task.get_task_class(config.name)
|
||||
task_category = get_command_category(task_cls)
|
||||
short_help = f'[magenta]{task_category:<25}[/] {task_cls.__doc__}'
|
||||
command_opts.update({
|
||||
'name': name,
|
||||
'short_help': short_help,
|
||||
'no_args_is_help': False
|
||||
})
|
||||
command_opts.update({'name': name, 'short_help': short_help, 'no_args_is_help': False})
|
||||
input_types = task_cls.input_types
|
||||
else:
|
||||
raise ValueError(f"Unrecognized runner endpoint name {cli_endpoint.name}")
|
||||
raise ValueError(f'Unrecognized runner endpoint name {cli_endpoint.name}')
|
||||
input_types_str = '|'.join(input_types) if input_types else 'targets'
|
||||
default_inputs = None if config.default_inputs == {} else config.default_inputs
|
||||
input_required = default_inputs is None
|
||||
|
|
@ -200,7 +190,7 @@ def register_runner(cli_endpoint, config):
|
|||
config,
|
||||
exec_opts=CLI_EXEC_OPTS,
|
||||
output_opts=CLI_OUTPUT_OPTS,
|
||||
type_mapping=CLI_TYPE_MAPPING
|
||||
type_mapping=CLI_TYPE_MAPPING,
|
||||
)
|
||||
|
||||
# TODO: maybe allow this in the future
|
||||
|
|
@ -259,7 +249,23 @@ def register_runner(cli_endpoint, config):
|
|||
|
||||
# Show runner tree
|
||||
if tree:
|
||||
tree_opts = dict(opts)
|
||||
profiles_str = tree_opts.get('profiles') or ''
|
||||
profile_names = [p.strip() for p in profiles_str.split(',') if p.strip()]
|
||||
for dp in CONFIG.profiles.defaults or []:
|
||||
if dp not in profile_names:
|
||||
profile_names.append(dp)
|
||||
if profile_names:
|
||||
for profile in get_configs_by_type('profile'):
|
||||
if profile.name not in profile_names:
|
||||
continue
|
||||
for k, v in profile.opts.items():
|
||||
if profile.enforce or not tree_opts.get(k):
|
||||
tree_opts[k] = v
|
||||
tree = build_runner_tree(config)
|
||||
raw_inputs = tree_opts.get('inputs')
|
||||
tree_inputs = [raw_inputs] if isinstance(raw_inputs, str) else (raw_inputs or [])
|
||||
prune_runner_tree(tree, tree_opts, tree_inputs)
|
||||
console.print(tree.render_tree())
|
||||
sys.exit(0)
|
||||
|
||||
|
|
@ -283,6 +289,7 @@ def register_runner(cli_endpoint, config):
|
|||
console.print(f'[bold red]Missing "{driver}" addon: please run `secator install addons {driver}`[/].')
|
||||
sys.exit(1)
|
||||
from secator.utils import import_dynamic
|
||||
|
||||
driver_hooks = import_dynamic(f'secator.hooks.{driver}', 'HOOKS')
|
||||
if driver_hooks is None:
|
||||
console.print(f'[bold red]Missing "secator.hooks.{driver}.HOOKS".[/]')
|
||||
|
|
@ -298,6 +305,7 @@ def register_runner(cli_endpoint, config):
|
|||
if 'api' in context['drivers']:
|
||||
try:
|
||||
from secator.hooks.api import get_workspace_name
|
||||
|
||||
workspace_name = get_workspace_name(context.get('workspace_id'))
|
||||
context['workspace_name'] = workspace_name
|
||||
except Exception as e:
|
||||
|
|
@ -305,16 +313,16 @@ def register_runner(cli_endpoint, config):
|
|||
sys.exit(1)
|
||||
|
||||
if enable_pyinstrument or enable_memray:
|
||||
if not ADDONS_ENABLED["trace"]:
|
||||
console.print(
|
||||
'[bold red]Missing "trace" addon: please run `secator install addons trace`[/].'
|
||||
)
|
||||
if not ADDONS_ENABLED['trace']:
|
||||
console.print('[bold red]Missing "trace" addon: please run `secator install addons trace`[/].')
|
||||
sys.exit(1)
|
||||
import memray
|
||||
|
||||
output_file = f'trace_memray_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}.bin'
|
||||
contextmanager = memray.Tracker(output_file)
|
||||
|
||||
from secator.utils import deep_merge_dicts
|
||||
|
||||
hooks = deep_merge_dicts(*hooks)
|
||||
|
||||
# Enable sync or not
|
||||
|
|
@ -322,6 +330,7 @@ def register_runner(cli_endpoint, config):
|
|||
sync = True
|
||||
else:
|
||||
from secator.celery import is_celery_worker_alive
|
||||
|
||||
worker_alive = is_celery_worker_alive()
|
||||
if not worker_alive and not sync:
|
||||
sync = True
|
||||
|
|
@ -329,55 +338,51 @@ def register_runner(cli_endpoint, config):
|
|||
sync = False
|
||||
broker_protocol = CONFIG.celery.broker_url.split('://')[0]
|
||||
backend_protocol = CONFIG.celery.result_backend.split('://')[0]
|
||||
if CONFIG.celery.broker_url and \
|
||||
(broker_protocol == 'redis' or backend_protocol == 'redis') \
|
||||
and not ADDONS_ENABLED['redis']:
|
||||
redis_required = broker_protocol == 'redis' or backend_protocol == 'redis'
|
||||
if CONFIG.celery.broker_url and redis_required and not ADDONS_ENABLED['redis']:
|
||||
Console().print('[bold red]Missing `redis` addon: please run `secator install addons redis`[/].')
|
||||
sys.exit(1)
|
||||
|
||||
from secator.utils import debug
|
||||
|
||||
debug('Run options', obj=opts, sub='cli')
|
||||
|
||||
# Set run options
|
||||
opts.update({
|
||||
'print_cmd': True,
|
||||
'print_item': True,
|
||||
'print_line': True,
|
||||
'print_progress': True,
|
||||
'print_profiles': True,
|
||||
'print_start': True,
|
||||
'print_target': True,
|
||||
'print_end': True,
|
||||
'print_remote_info': not sync,
|
||||
'piped_input': ctx.obj['piped_input'],
|
||||
'piped_output': ctx.obj['piped_output'],
|
||||
'caller': 'cli',
|
||||
'sync': sync,
|
||||
'quiet': quiet
|
||||
})
|
||||
opts.update(
|
||||
{
|
||||
'print_cmd': True,
|
||||
'print_item': True,
|
||||
'print_line': True,
|
||||
'print_progress': True,
|
||||
'print_profiles': True,
|
||||
'print_start': True,
|
||||
'print_target': True,
|
||||
'print_end': True,
|
||||
'print_remote_info': not sync,
|
||||
'piped_input': ctx.obj['piped_input'],
|
||||
'piped_output': ctx.obj['piped_output'],
|
||||
'caller': 'cli',
|
||||
'sync': sync,
|
||||
'quiet': quiet,
|
||||
}
|
||||
)
|
||||
|
||||
# Start runner
|
||||
with contextmanager:
|
||||
if enable_memray:
|
||||
process = psutil.Process()
|
||||
console.print(
|
||||
f"[bold yellow3]Initial RAM Usage: {process.memory_info().rss / 1024 ** 2} MB[/]"
|
||||
)
|
||||
console.print(f'[bold yellow3]Initial RAM Usage: {process.memory_info().rss / 1024**2} MB[/]')
|
||||
item_count = 0
|
||||
runner = runner_cls(
|
||||
config, inputs, run_opts=opts, hooks=hooks, context=context
|
||||
)
|
||||
runner = runner_cls(config, inputs, run_opts=opts, hooks=hooks, context=context)
|
||||
for item in runner:
|
||||
del item
|
||||
item_count += 1
|
||||
if process and item_count % 100 == 0:
|
||||
console.print(
|
||||
f"[bold yellow3]RAM Usage: {process.memory_info().rss / 1024 ** 2} MB[/]"
|
||||
)
|
||||
console.print(f'[bold yellow3]RAM Usage: {process.memory_info().rss / 1024**2} MB[/]')
|
||||
|
||||
if enable_memray:
|
||||
console.print(f"[bold green]Memray output file: {output_file}[/]")
|
||||
os.system(f"memray flamegraph {output_file}")
|
||||
console.print(f'[bold green]Memray output file: {output_file}[/]')
|
||||
os.system(f'memray flamegraph {output_file}')
|
||||
|
||||
generate_cli_subcommand(cli_endpoint, func, **command_opts)
|
||||
generate_rich_click_opt_groups(cli_endpoint, name, input_types, options)
|
||||
|
|
@ -410,19 +415,13 @@ def generate_rich_click_opt_groups(cli_endpoint, name, input_types, options):
|
|||
},
|
||||
]
|
||||
for prefix in prefixes:
|
||||
prefix_opts = [
|
||||
opt for opt, conf in options.items()
|
||||
if conf['prefix'] == prefix
|
||||
]
|
||||
prefix_opts = [opt for opt, conf in options.items() if conf['prefix'] == prefix]
|
||||
if prefix not in ['Execution', 'Output']:
|
||||
prefix_opts = sorted(prefix_opts)
|
||||
opt_names = [f'--{opt_name}' for opt_name in prefix_opts]
|
||||
if prefix == 'Output':
|
||||
opt_names.append('--help')
|
||||
opt_group.append({
|
||||
'name': prefix + ' options',
|
||||
'options': opt_names
|
||||
})
|
||||
opt_group.append({'name': prefix + ' options', 'options': opt_names})
|
||||
aliases = [cli_endpoint.name, *cli_endpoint.aliases]
|
||||
for alias in aliases:
|
||||
endpoint_name = f'secator {alias} {name}'
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ from secator.report import Report
|
|||
from secator.rich import console, console_stdout
|
||||
from secator.runners._helpers import get_task_folder_id, run_extractors
|
||||
from secator.utils import debug, import_dynamic, should_update, autodetect_type, sanitize_folder_name
|
||||
from secator.tree import build_runner_tree
|
||||
from secator.tree import build_runner_tree, prune_runner_tree
|
||||
from secator.loader import get_configs_by_type
|
||||
|
||||
|
||||
|
|
@ -1019,7 +1019,9 @@ class Runner:
|
|||
if self.has_parent:
|
||||
return
|
||||
if self.config.type != 'task':
|
||||
tree = textwrap.indent(build_runner_tree(self.config).render_tree(), ' ')
|
||||
tree = build_runner_tree(self.config)
|
||||
prune_runner_tree(tree, self.run_opts, self.inputs)
|
||||
tree = textwrap.indent(tree.render_tree(), ' ')
|
||||
info = Info(message=f'{self.config.type.capitalize()} built:\n{tree}', _source=self.unique_name)
|
||||
self._print(info, rich=True)
|
||||
remote_str = 'started' if self.sync else 'started in worker'
|
||||
|
|
|
|||
|
|
@ -152,6 +152,53 @@ def build_runner_tree(config: DotMap, condition: Optional[str] = None, parent: O
|
|||
return tree
|
||||
|
||||
|
||||
def prune_runner_tree(tree: RunnerTree, opts: dict, inputs: list = None) -> RunnerTree:
|
||||
"""Remove nodes whose conditions evaluate to False against opts/inputs.
|
||||
|
||||
Walks bottom-up so child removals don't corrupt parent iteration.
|
||||
On eval error the node is kept (err on the side of showing more).
|
||||
When entering a workflow node, strips its name prefix from opts so that
|
||||
scan-level options like domain_recon_passive are visible as opts.passive
|
||||
inside that workflow's tasks (mirroring build_celery_workflow behaviour).
|
||||
"""
|
||||
safe_globals = {'__builtins__': {'len': len}}
|
||||
|
||||
def strip_prefix(parent_opts, workflow_name):
|
||||
prefix = workflow_name.split('/')[0] + '_'
|
||||
result = dict(parent_opts)
|
||||
for k, v in parent_opts.items():
|
||||
if k.startswith(prefix):
|
||||
result[k[len(prefix):]] = v
|
||||
return result
|
||||
|
||||
def prune_node(node: TaskNode, current_opts: dict):
|
||||
child_opts = strip_prefix(current_opts, node.name) if node.type == 'workflow' else current_opts
|
||||
for child in list(node.children):
|
||||
prune_node(child, child_opts)
|
||||
if node.type == 'group' and not node.children:
|
||||
node.remove()
|
||||
return
|
||||
if node.condition:
|
||||
local_ns = {'opts': DotMap(current_opts), 'targets': inputs or []}
|
||||
try:
|
||||
if not eval(node.condition, safe_globals, local_ns):
|
||||
node.remove()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for root in list(tree.root_nodes):
|
||||
prune_node(root, opts)
|
||||
if root.condition:
|
||||
local_ns = {'opts': DotMap(opts), 'targets': inputs or []}
|
||||
try:
|
||||
if not eval(root.condition, safe_globals, local_ns):
|
||||
tree.root_nodes.remove(root)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return tree
|
||||
|
||||
|
||||
def walk_runner_tree(tree: RunnerTree, visit_func):
|
||||
"""
|
||||
Walk the RunnerTree and visit each node.
|
||||
|
|
|
|||
134
tests/unit/test_tree.py
Normal file
134
tests/unit/test_tree.py
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
import unittest
|
||||
from secator.tree import TaskNode, RunnerTree, prune_runner_tree
|
||||
|
||||
|
||||
def make_tree(*nodes):
|
||||
"""Helper: build a RunnerTree with given root TaskNodes."""
|
||||
tree = RunnerTree('test', 'workflow')
|
||||
for node in nodes:
|
||||
tree.add_root_node(node)
|
||||
return tree
|
||||
|
||||
|
||||
def make_node(name, condition=None, children=None):
|
||||
node = TaskNode(name, 'task', name, condition=condition)
|
||||
for child in (children or []):
|
||||
child.parent = node
|
||||
node.add_child(child)
|
||||
return node
|
||||
|
||||
|
||||
class TestPruneRunnerTree(unittest.TestCase):
|
||||
|
||||
def test_no_conditions_unchanged(self):
|
||||
"""Nodes without conditions are never removed."""
|
||||
tree = make_tree(make_node('nmap'), make_node('httpx'))
|
||||
prune_runner_tree(tree, {})
|
||||
names = [n.name for n in tree.root_nodes]
|
||||
self.assertEqual(names, ['nmap', 'httpx'])
|
||||
|
||||
def test_false_condition_removes_node(self):
|
||||
"""Node with a False condition is removed from the tree."""
|
||||
tree = make_tree(
|
||||
make_node('nmap'),
|
||||
make_node('httpx', condition='opts.run_httpx'),
|
||||
)
|
||||
prune_runner_tree(tree, {'run_httpx': False})
|
||||
names = [n.name for n in tree.root_nodes]
|
||||
self.assertEqual(names, ['nmap'])
|
||||
|
||||
def test_true_condition_keeps_node(self):
|
||||
"""Node with a True condition is kept."""
|
||||
tree = make_tree(
|
||||
make_node('nmap', condition='opts.run_nmap'),
|
||||
)
|
||||
prune_runner_tree(tree, {'run_nmap': True})
|
||||
self.assertEqual(len(tree.root_nodes), 1)
|
||||
|
||||
def test_bad_condition_keeps_node(self):
|
||||
"""On eval error the node is kept (err on the side of showing more)."""
|
||||
tree = make_tree(make_node('nmap', condition='this is not valid python!!!'))
|
||||
prune_runner_tree(tree, {})
|
||||
self.assertEqual(len(tree.root_nodes), 1)
|
||||
|
||||
def test_child_false_condition_removes_only_child(self):
|
||||
"""A child with a false condition is removed; parent stays."""
|
||||
child = make_node('httpx', condition='opts.run_httpx')
|
||||
parent = make_node('discovery')
|
||||
parent.add_child(child)
|
||||
child.parent = parent
|
||||
tree = make_tree(parent)
|
||||
prune_runner_tree(tree, {'run_httpx': False})
|
||||
self.assertEqual(len(tree.root_nodes), 1)
|
||||
self.assertEqual(tree.root_nodes[0].children, [])
|
||||
|
||||
def test_targets_available_in_condition(self):
|
||||
"""Conditions can reference `targets`."""
|
||||
tree = make_tree(make_node('nmap', condition='len(targets) > 0'))
|
||||
prune_runner_tree(tree, {}, inputs=['192.168.1.1'])
|
||||
self.assertEqual(len(tree.root_nodes), 1)
|
||||
|
||||
def test_targets_empty_removes_node(self):
|
||||
tree = make_tree(make_node('nmap', condition='len(targets) > 0'))
|
||||
prune_runner_tree(tree, {}, inputs=[])
|
||||
self.assertEqual(len(tree.root_nodes), 0)
|
||||
|
||||
def test_empty_group_is_removed(self):
|
||||
"""A group whose only child is pruned is itself removed."""
|
||||
child = TaskNode('httpx', 'task', 'httpx', condition='opts.run_httpx')
|
||||
group = TaskNode('_group1', 'group', '_group1')
|
||||
child.parent = group
|
||||
group.add_child(child)
|
||||
root = TaskNode('discovery', 'workflow', 'discovery')
|
||||
group.parent = root
|
||||
root.add_child(group)
|
||||
tree = make_tree(root)
|
||||
prune_runner_tree(tree, {'run_httpx': False})
|
||||
self.assertEqual(root.children, [])
|
||||
|
||||
def test_group_with_surviving_child_is_kept(self):
|
||||
"""A group that still has children after pruning is not removed."""
|
||||
child1 = TaskNode('httpx', 'task', 'httpx', condition='opts.run_httpx')
|
||||
child2 = TaskNode('nmap', 'task', 'nmap')
|
||||
group = TaskNode('_group1', 'group', '_group1')
|
||||
for c in [child1, child2]:
|
||||
c.parent = group
|
||||
group.add_child(c)
|
||||
root = TaskNode('discovery', 'workflow', 'discovery')
|
||||
group.parent = root
|
||||
root.add_child(group)
|
||||
tree = make_tree(root)
|
||||
prune_runner_tree(tree, {'run_httpx': False})
|
||||
self.assertEqual(len(root.children), 1)
|
||||
self.assertEqual(root.children[0].name, '_group1')
|
||||
self.assertEqual(len(root.children[0].children), 1)
|
||||
self.assertEqual(root.children[0].children[0].name, 'nmap')
|
||||
|
||||
def test_scan_workflow_prefix_stripping(self):
|
||||
"""Scan-level opts like domain_recon_passive are visible as opts.passive inside domain_recon tasks."""
|
||||
task = TaskNode('httpx', 'task', 'domain_recon.httpx', condition='not opts.passive')
|
||||
task.parent = None # will be set below
|
||||
wf = TaskNode('domain_recon', 'workflow', 'domain_recon')
|
||||
task.parent = wf
|
||||
wf.add_child(task)
|
||||
scan = TaskNode('domain', 'scan', 'domain')
|
||||
wf.parent = scan
|
||||
scan.add_child(wf)
|
||||
tree = make_tree(scan)
|
||||
# --domain-recon-passive sets domain_recon_passive=True in scan-level opts
|
||||
prune_runner_tree(tree, {'domain_recon_passive': True})
|
||||
# httpx should be pruned because not opts.passive == not True == False
|
||||
self.assertEqual(wf.children, [])
|
||||
|
||||
def test_scan_workflow_prefix_stripping_keeps_when_false(self):
|
||||
"""Task is kept when the prefixed opt is False (passive not set for that workflow)."""
|
||||
task = TaskNode('httpx', 'task', 'domain_recon.httpx', condition='not opts.passive')
|
||||
wf = TaskNode('domain_recon', 'workflow', 'domain_recon')
|
||||
task.parent = wf
|
||||
wf.add_child(task)
|
||||
scan = TaskNode('domain', 'scan', 'domain')
|
||||
wf.parent = scan
|
||||
scan.add_child(wf)
|
||||
tree = make_tree(scan)
|
||||
prune_runner_tree(tree, {'domain_recon_passive': False})
|
||||
self.assertEqual(len(wf.children), 1)
|
||||
Loading…
Reference in a new issue