1
+ from __future__ import annotations
2
+
1
3
import argparse
2
4
import enum
3
5
import functools
15
17
from collections import defaultdict , namedtuple
16
18
from itertools import product
17
19
from pathlib import Path
18
- from typing import Iterator , Optional , Union
20
+ from typing import BinaryIO , Callable , Iterator , Optional , Union
19
21
20
- from dissect .target import Target , exceptions
22
+ from dissect .target import Target
21
23
from dissect .target .filesystem import Filesystem
22
24
from dissect .target .filesystems import ntfs
23
25
from dissect .target .helpers import fsutil
@@ -146,46 +148,48 @@ def misc_osx_user_homes(target: Target) -> Iterator[fsutil.TargetPath]:
146
148
def from_user_home (target : Target , path : str ) -> Iterator [str ]:
147
149
try :
148
150
for user_details in target .user_details .all_with_home ():
149
- yield normalize_path ( target , user_details .home_path .joinpath (path ), lower_case = False )
151
+ yield user_details .home_path .joinpath (path ). as_posix ( )
150
152
except Exception as e :
151
153
log .warning ("Error occurred when requesting all user homes" )
152
154
log .debug ("" , exc_info = e )
153
155
154
156
misc_user_homes = MISC_MAPPING .get (target .os , misc_unix_user_homes )
155
157
for user_dir in misc_user_homes (target ):
156
- yield str ( user_dir .joinpath (path ))
158
+ yield user_dir .joinpath (path ). as_posix ( )
157
159
158
160
159
- def iter_ntfs_filesystems (target : Target ) -> Iterator [tuple [ntfs .NtfsFilesystem , str , str ]]:
161
+ def iter_ntfs_filesystems (target : Target ) -> Iterator [tuple [ntfs .NtfsFilesystem , Optional [ str ], str , str ]]:
160
162
mount_lookup = defaultdict (list )
161
163
for mount , fs in target .fs .mounts .items ():
162
164
mount_lookup [fs ].append (mount )
163
165
164
- sysvol = target .fs .mounts ["sysvol" ]
165
166
for fs in target .filesystems :
166
- if fs in mount_lookup :
167
- mountpoints = ", " .join (mount_lookup [fs ])
168
- else :
169
- mountpoints = "No mounts"
170
-
171
167
# The attr check is needed to correctly collect fake NTFS filesystems
172
168
# where the MFT etc. are added to a VirtualFilesystem. This happens for
173
169
# instance when the target is an acquired tar target.
174
170
if not isinstance (fs , ntfs .NtfsFilesystem ) and not hasattr (fs , "ntfs" ):
175
- log .warning ("Skipping %s (%s) - not an NTFS filesystem" , fs , mountpoints )
171
+ log .warning ("Skipping %s - not an NTFS filesystem" , fs )
176
172
continue
177
173
178
- if fs == sysvol :
179
- name = "sysvol"
180
- elif fs in mount_lookup :
181
- name = mount_lookup [fs ][0 ]
174
+ if fs in mount_lookup :
175
+ mountpoints = mount_lookup [fs ]
176
+
177
+ for main_mountpoint in mountpoints :
178
+ if main_mountpoint != "sysvol" :
179
+ break
180
+
181
+ name = main_mountpoint
182
+ mountpoints = ", " .join (mountpoints )
182
183
else :
184
+ main_mountpoint = None
183
185
name = f"vol-{ fs .ntfs .serial :x} "
186
+ mountpoints = "No mounts"
187
+ log .warning ("Unmounted NTFS filesystem found %s (%s)" , fs , name )
184
188
185
- yield fs , name , mountpoints
189
+ yield fs , main_mountpoint , name , mountpoints
186
190
187
191
188
- def iter_esxi_filesystems (target : Target ) -> Iterator [tuple [str , str , Filesystem ]]:
192
+ def iter_esxi_filesystems (target : Target ) -> Iterator [tuple [Filesystem , str , str , Optional [ str ] ]]:
189
193
for mount , fs in target .fs .mounts .items ():
190
194
if not mount .startswith ("/vmfs/volumes/" ):
191
195
continue
@@ -197,11 +201,11 @@ def iter_esxi_filesystems(target: Target) -> Iterator[tuple[str, str, Filesystem
197
201
elif fs .__type__ == "vmfs" :
198
202
name = fs .vmfs .label
199
203
200
- yield uuid , name , fs
204
+ yield fs , mount , uuid , name
201
205
202
206
203
- def register_module (* args , ** kwargs ):
204
- def wrapper (module_cls ) :
207
+ def register_module (* args , ** kwargs ) -> Callable [[ type [ Module ]], type [ Module ]] :
208
+ def wrapper (module_cls : type [ Module ]) -> type [ Module ] :
205
209
name = module_cls .__name__
206
210
207
211
if name in MODULES :
@@ -225,8 +229,8 @@ def wrapper(module_cls):
225
229
return wrapper
226
230
227
231
228
- def module_arg (* args , ** kwargs ):
229
- def wrapper (module_cls ) :
232
+ def module_arg (* args , ** kwargs ) -> Callable [[ type [ Module ]], type [ Module ]] :
233
+ def wrapper (module_cls : type [ Module ]) -> type [ Module ] :
230
234
if not hasattr (module_cls , "__cli_args__" ):
231
235
module_cls .__cli_args__ = []
232
236
module_cls .__cli_args__ .append ((args , kwargs ))
@@ -235,7 +239,7 @@ def wrapper(module_cls):
235
239
return wrapper
236
240
237
241
238
- def local_module (cls ) :
242
+ def local_module (cls : type [ object ]) -> object :
239
243
"""A decorator that sets property `__local__` on a module class to mark it for local target only"""
240
244
cls .__local__ = True
241
245
return cls
@@ -305,80 +309,44 @@ class NTFS(Module):
305
309
306
310
@classmethod
307
311
def _run (cls , target : Target , cli_args : argparse .Namespace , collector : Collector ) -> None :
308
- for fs , name , mountpoints in iter_ntfs_filesystems (target ):
309
- log .info ("Acquiring %s (%s)" , fs , mountpoints )
312
+ for fs , main_mountpoint , name , mountpoints in iter_ntfs_filesystems (target ):
313
+ log .info ("Acquiring from %s as %s (%s)" , fs , name , mountpoints )
314
+
315
+ for filename in ("$MFT" , "$Boot" , "$Secure:$SDS" ):
316
+ if main_mountpoint is not None :
317
+ path = fsutil .join (main_mountpoint , filename )
318
+ collector .collect_path (path )
310
319
311
- collector .collect_file (fs .path ("$MFT" ), outpath = name + "/$MFT" )
312
- collector .collect_file (fs .path ("$Boot" ), outpath = name + "/$Boot" )
320
+ else :
321
+ # In case the NTFS filesystem is not mounted, which should not occur but
322
+ # iter_ntfs_filesystems allows for the possibility, we fall back to raw file
323
+ # collection.
324
+ collector .collect_file_raw (filename , fs , name )
313
325
314
326
cls .collect_usnjrnl (collector , fs , name )
315
- cls .collect_ntfs_secure (collector , fs , name )
316
327
317
328
@classmethod
318
329
def collect_usnjrnl (cls , collector : Collector , fs : Filesystem , name : str ) -> None :
319
- try :
320
- usnjrnl_path = fs .path ("$Extend/$Usnjrnl:$J" )
321
- entry = usnjrnl_path .get ()
322
- journal = entry .open ()
323
-
330
+ def usnjrnl_accessor (journal : BinaryIO ) -> tuple [BinaryIO , int ]:
324
331
# If the filesystem is a virtual NTFS filesystem, journal will be
325
332
# plain BinaryIO, not a RunlistStream.
326
333
if isinstance (journal , RunlistStream ):
327
334
i = 0
328
335
while journal .runlist [i ][0 ] is None :
329
336
journal .seek (journal .runlist [i ][1 ] * journal .block_size , io .SEEK_CUR )
330
337
i += 1
338
+ size = journal .size - journal .tell ()
339
+ else :
340
+ size = journal .size
331
341
332
- # Use the same method to construct the output path as is used in
333
- # collector.collect_file()
334
- outpath = collector ._output_path (f"{ name } /$Extend/$Usnjrnl:$J" )
335
-
336
- collector .output .write (
337
- outpath ,
338
- journal ,
339
- size = journal .size - journal .tell (),
340
- entry = entry ,
341
- )
342
- collector .report .add_file_collected (cls .__name__ , usnjrnl_path )
343
- result = "OK"
344
- except exceptions .FileNotFoundError :
345
- collector .report .add_file_missing (cls .__name__ , usnjrnl_path )
346
- result = "File not found"
347
- except Exception as err :
348
- log .debug ("Failed to acquire UsnJrnl" , exc_info = True )
349
- collector .report .add_file_failed (cls .__name__ , usnjrnl_path )
350
- result = repr (err )
351
-
352
- log .info ("- Collecting file $Extend/$Usnjrnl:$J: %s" , result )
353
-
354
- @classmethod
355
- def collect_ntfs_secure (cls , collector : Collector , fs : Filesystem , name : str ) -> None :
356
- try :
357
- secure_path = fs .path ("$Secure:$SDS" )
358
- entry = secure_path .get ()
359
- sds = entry .open ()
360
-
361
- # Use the same method to construct the output path as is used in
362
- # collector.collect_file()
363
- outpath = collector ._output_path (f"{ name } /$Secure:$SDS" )
364
-
365
- collector .output .write (
366
- outpath ,
367
- sds ,
368
- size = sds .size ,
369
- entry = entry ,
370
- )
371
- collector .report .add_file_collected (cls .__name__ , secure_path )
372
- result = "OK"
373
- except FileNotFoundError :
374
- collector .report .add_file_missing (cls .__name__ , secure_path )
375
- result = "File not found"
376
- except Exception as err :
377
- log .debug ("Failed to acquire SDS" , exc_info = True )
378
- collector .report .add_file_failed (cls .__name__ , secure_path )
379
- result = repr (err )
342
+ return (journal , size )
380
343
381
- log .info ("- Collecting file $Secure:$SDS: %s" , result )
344
+ collector .collect_file_raw (
345
+ "$Extend/$Usnjrnl:$J" ,
346
+ fs ,
347
+ name ,
348
+ file_accessor = usnjrnl_accessor ,
349
+ )
382
350
383
351
384
352
@register_module ("-r" , "--registry" )
@@ -719,13 +687,20 @@ def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector
719
687
patterns .extend (["$Recycle.Bin/$R*" , "$Recycle.Bin/*/$R*" , "RECYCLE*/D*" ])
720
688
721
689
with collector .file_filter (large_files_filter ):
722
- for fs , name , mountpoints in iter_ntfs_filesystems (target ):
723
- log .info ("Acquiring recycle bin from %s (%s)" , fs , mountpoints )
690
+ for fs , main_mountpoint , name , mountpoints in iter_ntfs_filesystems (target ):
691
+ log .info ("Acquiring recycle bin from %s as %s (%s)" , fs , name , mountpoints )
724
692
725
693
for pattern in patterns :
726
- for entry in fs .path ().glob (pattern ):
727
- if entry .is_file ():
728
- collector .collect_file (entry , outpath = fsutil .join (name , str (entry )))
694
+ if main_mountpoint is not None :
695
+ pattern = fsutil .join (main_mountpoint , pattern )
696
+ collector .collect_glob (pattern )
697
+ else :
698
+ # In case the NTFS filesystem is not mounted, which should not occur but
699
+ # iter_ntfs_filesystems allows for the possibility, we fall back to raw file
700
+ # collection.
701
+ for entry in fs .path ().glob (pattern ):
702
+ if entry .is_file ():
703
+ collector .collect_file_raw (fs , entry , name )
729
704
730
705
731
706
@register_module ("--drivers" )
@@ -1291,8 +1266,9 @@ class Boot(Module):
1291
1266
1292
1267
1293
1268
def private_key_filter (path : fsutil .TargetPath ) -> bool :
1294
- with path .open ("rt" ) as file :
1295
- return "PRIVATE KEY" in file .readline ()
1269
+ if path .is_file () and not path .is_symlink ():
1270
+ with path .open ("rt" ) as file :
1271
+ return "PRIVATE KEY" in file .readline ()
1296
1272
1297
1273
1298
1274
@register_module ("--home" )
@@ -1438,21 +1414,24 @@ def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector
1438
1414
"bootbank" : "BOOTBANK1" ,
1439
1415
"altbootbank" : "BOOTBANK2" ,
1440
1416
}
1441
- boot_fs = []
1417
+ boot_fs = {}
1442
1418
1443
1419
for boot_dir , boot_vol in boot_dirs .items ():
1444
1420
dir_path = target .fs .path (boot_dir )
1445
1421
if dir_path .is_symlink () and dir_path .exists ():
1446
1422
dst = dir_path .readlink ()
1447
- boot_fs .append ((dst .name , boot_vol , dst .get ().top .fs ))
1423
+ fs = dst .get ().top .fs
1424
+ boot_fs [fs ] = boot_vol
1448
1425
1449
- for uuid , name , fs in boot_fs :
1450
- log .info ("Acquiring /vmfs/volumes/%s (%s)" , uuid , name )
1451
- base = f"fs/{ uuid } :{ name } "
1452
- for path in fs .path ("/" ).rglob ("*" ):
1453
- if not path .is_file ():
1454
- continue
1455
- collector .collect_file (path , outpath = path , base = base )
1426
+ for fs , mountpoint , uuid , _ in iter_esxi_filesystems (target ):
1427
+ if fs in boot_fs :
1428
+ name = boot_fs [fs ]
1429
+ log .info ("Acquiring %s (%s)" , mountpoint , name )
1430
+ mountpoint_len = len (mountpoint )
1431
+ base = f"fs/{ uuid } :{ name } "
1432
+ for path in target .fs .path (mountpoint ).rglob ("*" ):
1433
+ outpath = path .as_posix ()[mountpoint_len :]
1434
+ collector .collect_path (path , outpath = outpath , base = base )
1456
1435
1457
1436
1458
1437
@register_module ("--esxi" )
@@ -1475,16 +1454,16 @@ class VMFS(Module):
1475
1454
1476
1455
@classmethod
1477
1456
def _run (cls , target : Target , cli_args : argparse .Namespace , collector : Collector ) -> None :
1478
- for uuid , name , fs in iter_esxi_filesystems (target ):
1457
+ for fs , mountpoint , uuid , name in iter_esxi_filesystems (target ):
1479
1458
if not fs .__type__ == "vmfs" :
1480
1459
continue
1481
1460
1482
- log .info ("Acquiring /vmfs/volumes/%s (%s)" , uuid , name )
1461
+ log .info ("Acquiring %s (%s)" , mountpoint , name )
1462
+ mountpoint_len = len (mountpoint )
1483
1463
base = f"fs/{ uuid } :{ name } "
1484
- for path in fs .path ("/" ).glob ("*.sf" ):
1485
- if not path .is_file ():
1486
- continue
1487
- collector .collect_file (path , outpath = path , base = base )
1464
+ for path in target .fs .path (mountpoint ).glob ("*.sf" ):
1465
+ outpath = path .as_posix ()[mountpoint_len :]
1466
+ collector .collect_path (path , outpath = outpath , base = base )
1488
1467
1489
1468
1490
1469
@register_module ("--activities-cache" )
@@ -1685,7 +1664,7 @@ def acquire_target(target: Target, args: argparse.Namespace, output_ts: Optional
1685
1664
if log_file :
1686
1665
files .append (log_file )
1687
1666
if target .path .name == "local" :
1688
- skip_list .add (normalize_path (target , log_file , resolve = True ))
1667
+ skip_list .add (normalize_path (target , log_file , resolve_parents = True , preserve_case = False ))
1689
1668
1690
1669
print_disks_overview (target )
1691
1670
print_volumes_overview (target )
@@ -1775,7 +1754,7 @@ def acquire_target(target: Target, args: argparse.Namespace, output_ts: Optional
1775
1754
log .info ("Logging to file %s" , log_path )
1776
1755
files = [log_file_handler .baseFilename ]
1777
1756
if target .path .name == "local" :
1778
- skip_list = {normalize_path (target , log_path , resolve = True )}
1757
+ skip_list = {normalize_path (target , log_path , resolve_parents = True , preserve_case = False )}
1779
1758
1780
1759
output_path = args .output or args .output_file
1781
1760
if output_path .is_dir ():
@@ -1791,7 +1770,7 @@ def acquire_target(target: Target, args: argparse.Namespace, output_ts: Optional
1791
1770
)
1792
1771
files .append (output .path )
1793
1772
if target .path .name == "local" :
1794
- skip_list .add (normalize_path (target , output .path , resolve = True ))
1773
+ skip_list .add (normalize_path (target , output .path , resolve_parents = True , preserve_case = False ))
1795
1774
1796
1775
log .info ("Writing output to %s" , output .path )
1797
1776
if skip_list :
0 commit comments