Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 30 additions & 91 deletions acquire/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import functools
import io
import itertools
import json
import logging
import os
import platform
Expand All @@ -22,10 +21,9 @@
from dissect.target.filesystem import Filesystem
from dissect.target.filesystems import ntfs
from dissect.target.helpers import fsutil
from dissect.target.loaders.remote import RemoteStreamConnection
from dissect.target.loaders.targetd import TargetdLoader
from dissect.target.plugins.apps.webserver import iis
from dissect.target.plugins.os.windows.log import evt, evtx
from dissect.target.tools.utils import args_to_uri
from dissect.util.stream import RunlistStream

from acquire.collector import Collector, get_full_formatted_report, get_report_summary
Expand Down Expand Up @@ -92,7 +90,6 @@

CLI_ARGS_MODULE = "cli-args"


log = logging.getLogger("acquire")
log.propagate = 0
log_file_handler = None
Expand Down Expand Up @@ -1658,45 +1655,6 @@ def print_acquire_warning(target: Target) -> None:
log.warning("========================================== WARNING ==========================================")


def modargs2json(args: argparse.Namespace) -> dict:
json_opts = {}
for module in MODULES.values():
cli_arg = module.__cli_args__[-1:][0][1]
if opt := cli_arg.get("dest"):
json_opts[opt] = getattr(args, opt)
return json_opts


def acquire_target(target: Target, *args, **kwargs) -> list[str]:
if isinstance(target._loader, TargetdLoader):
files = acquire_target_targetd(target, *args, **kwargs)
else:
files = acquire_target_regular(target, *args, **kwargs)
return files


def acquire_target_targetd(target: Target, args: argparse.Namespace, output_ts: Optional[str] = None) -> list[str]:
files = []
# debug logs contain references to flow objects and will give errors
logging.getLogger().setLevel(logging.CRITICAL)
if not len(target.hostname()):
log.error("Unable to initialize targetd.")
return files
json_opts = modargs2json(args)
json_opts["profile"] = args.profile
json_opts["file"] = args.file
json_opts["directory"] = args.directory
json_opts["glob"] = args.glob
m = {"targetd-meta": "acquire", "args": json_opts}
json_str = json.dumps(m)
targetd = target._loader.instance.client
targetd.send_message(json_str.encode("utf-8"))
targetd.sync()
for stream in targetd.streams:
files.append(stream.out_file)
return files


def _add_modules_for_profile(choice: str, operating_system: str, profile: dict, msg: str) -> Optional[dict]:
modules_selected = dict()

Expand All @@ -1712,7 +1670,7 @@ def _add_modules_for_profile(choice: str, operating_system: str, profile: dict,
return modules_selected


def acquire_target_regular(target: Target, args: argparse.Namespace, output_ts: Optional[str] = None) -> list[str]:
def acquire_target(target: Target, args: argparse.Namespace, output_ts: Optional[str] = None) -> list[str]:
acquire_gui = GUI()
files = []
output_ts = output_ts or get_utc_now_str()
Expand Down Expand Up @@ -2092,7 +2050,7 @@ class VolatileProfile:

def main() -> None:
parser = create_argument_parser(PROFILES, VOLATILE, MODULES)
args = parse_acquire_args(parser, config=CONFIG)
args, rest = parse_acquire_args(parser, config=CONFIG)

# start GUI if requested through CLI / config
flavour = None
Expand Down Expand Up @@ -2144,70 +2102,51 @@ def main() -> None:
log.exception(err)
parser.exit(1)

if args.targetd:
from targetd.tools.targetd import start_client

# set @auto hostname to real hostname
if args.targetd_hostname == "@auto":
args.targetd_hostname = f"/host/{Target.open('local').hostname}"

config = {
"function": args.targetd_func,
"topics": [args.targetd_hostname, args.targetd_groupname, args.targetd_globalname],
"link": args.targetd_link,
"address": args.targetd_ip,
"port": args.targetd_port,
"cacert_str": args.targetd_cacert,
"service": args.targetd_func == "agent",
"cacert": None,
}
start_client(args, presets=config)
return

if args.upload:
try:
upload_files(args.upload, args.upload_plugin, args.no_proxy)
except Exception:
log.exception("Failed to upload files")
return

RemoteStreamConnection.configure(args.cagent_key, args.cagent_certificate)

target_path = args.target

if target_path == "local":
target_query = {}
if args.force_fallback:
target_query.update({"force-directory-fs": 1})
target_paths = []
for target_path in args.targets:
target_path = args_to_uri([target_path], args.loader, rest)[0] if args.loader else target_path
if target_path == "local":
target_query = {}
if args.force_fallback:
target_query.update({"force-directory-fs": 1})

if args.fallback:
target_query.update({"fallback-to-directory-fs": 1})
if args.fallback:
target_query.update({"fallback-to-directory-fs": 1})

target_query = urllib.parse.urlencode(target_query)
target_path = f"{target_path}?{target_query}"

log.info("Loading target %s", target_path)
target_query = urllib.parse.urlencode(target_query)
target_path = f"{target_path}?{target_query}"
target_paths.append(target_path)

try:
target = Target.open(target_path)
log.info(target)
target_name = "Unknown" # just in case open_all already fails
for target in Target.open_all(target_paths):
target_name = "Unknown" # overwrite previous target name
target_name = target.name
log.info("Loading target %s", target_name)
log.info(target)
if target.os == "esxi" and target.name == "local":
# Loader found that we are running on an esxi host
# Perform operations to "enhance" memory
with esxi_memory_context_manager():
acquire_children_and_targets(target, args)
else:
acquire_children_and_targets(target, args)
except Exception:
if not is_user_admin():
log.error("Failed to load target, try re-running as administrator/root.")
log.error("Failed to load target: %s, try re-running as administrator/root", target_name)
acquire_gui.message("This application must be run as administrator.")
acquire_gui.wait_for_quit()
parser.exit(1)
log.exception("Failed to load target")
log.exception("Failed to load target: %s", target_name)
raise

if target.os == "esxi" and target.name == "local":
# Loader found that we are running on an esxi host
# Perform operations to "enhance" memory
with esxi_memory_context_manager():
acquire_children_and_targets(target, args)
else:
acquire_children_and_targets(target, args)


def load_child(target: Target, child_path: Path) -> None:
log.info("")
Expand Down
52 changes: 15 additions & 37 deletions acquire/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,6 @@
from acquire.outputs import OUTPUTS
from acquire.uploaders.plugin_registry import UploaderRegistry

# Acquire Configuration for CAgent and TargetD
CAGENT_TARGETD_ATTRS = {
"cagent_key",
"cagent_certificate",
"targetd_func",
"targetd_cacert",
"targetd_ip",
"targetd_port",
"targetd_hostname",
"targetd_groupname",
"targetd_globalname",
"targetd_link",
}


class StrEnum(str, Enum):
"""Sortable and serializible string-based enum"""
Expand Down Expand Up @@ -78,13 +64,7 @@ def create_argument_parser(profiles: dict, volatile: dict, modules: dict) -> arg
fromfile_prefix_chars="@",
)

parser.add_argument(
"target",
metavar="TARGET",
default="local",
nargs="?",
help="target to load (default: local)",
)
parser.add_argument("targets", metavar="TARGETS", default=["local"], nargs="*", help="Targets to load")
# Create a mutually exclusive group, such that only one of the output options can be used
output_group = parser.add_mutually_exclusive_group()
output_group.add_argument("-o", "--output", default=Path("."), type=Path, help="output directory")
Expand All @@ -102,11 +82,6 @@ def create_argument_parser(profiles: dict, volatile: dict, modules: dict) -> arg
action="store_true",
help="compress output (if supported by the output type)",
)
parser.add_argument(
"--targetd",
action="store_true",
help="setup and install targetd agent",
)
parser.add_argument(
"--encrypt",
action="store_true",
Expand All @@ -123,6 +98,13 @@ def create_argument_parser(profiles: dict, volatile: dict, modules: dict) -> arg
parser.add_argument("--public-key", type=Path, help=argparse.SUPPRESS)
parser.add_argument("-l", "--log", type=Path, help="log directory location")
parser.add_argument("--no-log", action="store_true", help=argparse.SUPPRESS)
parser.add_argument(
"-L",
"--loader",
action="store",
default=None,
help="select a specific loader (i.e. vmx, raw)",
)
parser.add_argument("-p", "--profile", choices=profiles.keys(), help="collection profile")
parser.add_argument("--volatile-profile", choices=volatile.keys(), help="volatile profile")

Expand Down Expand Up @@ -178,7 +160,7 @@ def create_argument_parser(profiles: dict, volatile: dict, modules: dict) -> arg
def parse_acquire_args(
parser: argparse.ArgumentParser,
config: dict[str, Any],
) -> argparse.Namespace:
) -> tuple[argparse.Namespace, list[str]]:
"""Parse and set the acquire command line arguments.

The arguments are set to values supplied in ``config[arguments]``, when not
Expand All @@ -194,10 +176,10 @@ def parse_acquire_args(
Returns:
A command line arguments namespace
"""
command_line_args = parser.parse_args()
command_line_args, rest = parser.parse_known_args()
_merge_args_and_config(parser, command_line_args, config)

return command_line_args
return command_line_args, rest


def _merge_args_and_config(
Expand Down Expand Up @@ -264,10 +246,10 @@ def check_and_set_log_args(args: argparse.Namespace):
# Logging to a single file is allowed, even if the file does not yet
# exist, as it will be automatically created. However then the parent
# directory must exist.
if args.children:
# If children are acquired, logging can only happen to separate
if args.children or len(args.targets) > 1:
# If children or multiple targets are acquired, logging can only happen to separate
# files, so log_path needs to be a directory.
raise ValueError("Log path must be a directory when using --children")
raise ValueError("Log path must be a directory when using multiple targets or --children")
else:
raise ValueError(f"Log path doesn't exist: {log_path}")

Expand Down Expand Up @@ -312,7 +294,7 @@ def check_and_set_acquire_args(

if not args.upload:
# check output related configuration
if args.children and args.output_file:
if (args.children or len(args.targets) > 1) and args.output_file:
raise ValueError("--children can not be used with --output_file. Use --output instead")
elif args.output_file and (not args.output_file.parent.is_dir() or args.output_file.is_dir()):
raise ValueError("--output_file must be a path to a file in an existing directory")
Expand All @@ -328,10 +310,6 @@ def check_and_set_acquire_args(
raise ValueError("No public key available (embedded or argument)")
setattr(args, "public_key", public_key)

# set cagent/targetd related configuration
for attr in CAGENT_TARGETD_ATTRS:
setattr(args, attr, args.config.get(attr))

if not args.children and args.skip_parent:
raise ValueError("--skip-parent can only be set with --children")

Expand Down
7 changes: 6 additions & 1 deletion tests/test_acquire_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
def acquire_parser_args(config: List, argument_list: List) -> Namespace:
CONFIG["arguments"] = config
with patch("argparse._sys.argv", [""] + argument_list):
return parse_acquire_args(create_argument_parser(PROFILES, VOLATILE, MODULES), config=CONFIG)
return parse_acquire_args(create_argument_parser(PROFILES, VOLATILE, MODULES), config=CONFIG)[0]


@pytest.mark.parametrize("config, argument_list", [([], [])])
Expand All @@ -34,3 +34,8 @@ def test_one_config_default_argument(acquire_parser_args):
@pytest.mark.parametrize("config, argument_list", [(["-f", "test"], ["-f", "best"])])
def test_config_default_argument_override(acquire_parser_args):
assert acquire_parser_args.file == ["best"]


@pytest.mark.parametrize("config, argument_list", [([], ["target1", "target2"])])
def test_local_target_fallbactargets(acquire_parser_args):
assert acquire_parser_args.targets == ["target1", "target2"]
21 changes: 1 addition & 20 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def test_check_and_set_log_args(
def test_check_and_set_log_args_fail_log_to_file_with_children() -> None:
mock_path = get_mock_path(is_dir=False)
args = get_args(log=mock_path, children=True)
with pytest.raises(ValueError, match="Log path must be a directory when using --children"):
with pytest.raises(ValueError, match="Log path must be a directory when using multiple targets or --children"):
check_and_set_log_args(args)


Expand Down Expand Up @@ -141,11 +141,6 @@ def test_check_and_set_acquire_args_upload_auto_upload(arg_name: str) -> None:
args = get_args(**{arg_name: True, "config": config})
check_and_set_acquire_args(args, upload_plugins)

if arg_name == "upload":
assert "cagent_key" not in args
else:
assert args.cagent_key == cagent_key


@pytest.mark.parametrize(
"arg_name",
Expand Down Expand Up @@ -307,20 +302,6 @@ def test_check_and_set_acquire_args_encrypt_without_public_key_fail(public_key:
check_and_set_acquire_args(args, MagicMock())


def test_check_and_set_acquire_args_cagent() -> None:
cagent_key = "KEY"
cagent_certificate = "CERT"
config = {
"cagent_key": cagent_key,
"cagent_certificate": cagent_certificate,
}
args = get_args(config=config)
check_and_set_acquire_args(args, MagicMock())

assert args.cagent_key == cagent_key
assert args.cagent_certificate == cagent_certificate


@pytest.mark.parametrize(
"path, sysvol, resolve, lower_case, case_sensitive, os, result",
[
Expand Down