-
Notifications
You must be signed in to change notification settings - Fork 1
/
reencrypt_backend.py
executable file
·656 lines (511 loc) · 21 KB
/
reencrypt_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
#!/usr/bin/env python
##
## PrivateOn-DeployReencrypt -- Because privacy matters
##
## Author: Mikko Rautiainen <info@tietosuojakone.fi>
##
## Copyright (C) 2016-2017 PrivateOn / Tietosuojakone Oy, Helsinki, Finland
## Released under the GNU Lesser General Public License
##
##
## This module does the heavy-lifting for the DeployReencrypt application
##
import decimal
import logging
import inspect
import random
import re
import string
import subprocess
import sys
import time
import os
## constants
# Log level handed to logging.basicConfig() by the backend functions;
# do_reecnrypt() also compares against it to decide whether to emit its debug line
DEBUG_LEVEL = logging.DEBUG
# File that logging.basicConfig() is pointed at
LOG_FILE = '/tmp/reencrypt.log'
# File that receives stderr of the cryptsetup-reencrypt process;
# check_out_file() and poll_progress() read it back
OUT_FILE = '/tmp/reencrypt.out'
# Temporary key file that add_key() writes the new password into for luksAddKey
KEY_FILE = '/tmp/keyfile'
# Paths of the external programs used to build the shell command lines
CRYPTSETUP = '/sbin/cryptsetup'
CRYPTSETUP_REENCRYPT = '/sbin/cryptsetup-reencrypt'
# Prepended to commands by add_sudo() when the effective user is not root
SUDO = '/usr/bin/sudo'
## functions
def run_command(command, description, target):
    """Run a shell command without capturing its output.

    The command string is first passed through add_sudo() and then executed
    with shell=True via subprocess.check_call().

    Args:
        command: shell command line to execute.
        description: short name of the operation, used in error messages.
        target: what the operation acts on, used in error messages.

    Returns:
        None on success, otherwise an "Error: ..." string describing the
        failure (non-zero exit status, OS error or any other exception).
    """
    command = add_sudo(command)
    try:
        subprocess.check_call(command, shell=True)
    except subprocess.CalledProcessError as err:
        return "Error: %s failed for %s with process error: returned non-zero exit status %s" % (description, target, str(err.returncode))
    except OSError as err:
        return "Error: %s failed for %s with OS error: %s" % (description, target, err.strerror)
    except Exception as err:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate
        return "Error: %s failed for %s with generic error: %s" % (description, target, err)
    return None
def run_command_with_output(command, description, target):
    """Run a shell command and return its combined stdout/stderr.

    The command string is first passed through add_sudo() and then executed
    with shell=True via subprocess.check_output(); stderr is merged into
    stdout.

    Args:
        command: shell command line to execute.
        description: short name of the operation, used in error messages.
        target: what the operation acts on, used in error messages.

    Returns:
        A pair (output, error_message). On success output is whatever
        check_output() returned and error_message is None. On failure
        output is an empty list and error_message is an "Error: ..." string;
        for a non-zero exit status the message contains the captured output.
    """
    command = add_sudo(command)
    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
    except subprocess.CalledProcessError as err:
        return [], "Error: %s failed for %s with process error: %s" % (description, target, err.output)
    except OSError as err:
        return [], "Error: %s failed for %s with OS error: %s" % (description, target, err.strerror)
    except Exception as err:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate
        return [], "Error: %s failed for %s with generic error: %s" % (description, target, err)
    return output, None
def run_command_with_stderr(command, description, target):
    """Run a shell command, reporting its output only when it fails.

    The command string is first passed through add_sudo() and then executed
    with shell=True via subprocess.check_output() with stderr merged into
    stdout. The captured output is discarded on success.

    Args:
        command: shell command line to execute.
        description: short name of the operation, used in error messages.
        target: what the operation acts on, used in error messages.

    Returns:
        None on success, otherwise an "Error: ..." string; for a non-zero
        exit status the message contains the captured output.
    """
    command = add_sudo(command)
    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
    except subprocess.CalledProcessError as err:
        return "Error: %s failed for %s with process error: %s" % (description, target, err.output)
    except OSError as err:
        return "Error: %s failed for %s with OS error: %s" % (description, target, err.strerror)
    except Exception as err:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate
        return "Error: %s failed for %s with generic error: %s" % (description, target, err)
    return None
def popen_command(command, description, target):
    """Start a shell command in the background without waiting for it.

    The command string is first passed through add_sudo() and then started
    with subprocess.Popen(shell=True); the child's stdout is sent to
    os.devnull. The function returns as soon as the child is spawned, so
    the child's exit status is never inspected here.

    Args:
        command: shell command line to start.
        description: short name of the operation, used in error messages.
        target: what the operation acts on, used in error messages.

    Returns:
        None if the process was spawned, otherwise an "Error: ..." string.
    """
    command = add_sudo(command)
    try:
        # the child inherits its own copy of the descriptor, so closing ours
        # right after the spawn (also on failure) is safe
        with open(os.devnull, 'w') as devnull:
            subprocess.Popen(command, shell=True, stdout=devnull)
    except OSError as err:
        return "Error: %s failed for %s with OS error: %s" % (description, target, err.strerror)
    except Exception as err:
        # Popen() never raises CalledProcessError, so only the generic
        # fallback is needed besides OSError
        return "Error: %s failed for %s with generic error: %s" % (description, target, err)
    return None
def _find_unquoted_pipe(command):
    """Return the index of the first '|' outside shell quotes, or -1 if none."""
    quote_char = None
    escaped = False
    for index, char in enumerate(command):
        if escaped:
            # character following a backslash is taken literally
            escaped = False
        elif quote_char == "'":
            # nothing is special inside single quotes except the closing quote
            if char == "'":
                quote_char = None
        elif char == '\\':
            escaped = True
        elif quote_char == '"':
            if char == '"':
                quote_char = None
        elif char in ('"', "'"):
            quote_char = char
        elif char == '|':
            return index
    return -1


def add_sudo(command):
    """Return the command line with SUDO inserted when not running as root.

    If the effective user id is 0 the command is returned unchanged.
    Otherwise, when the command contains a pipe, SUDO is placed in front of
    the part after the first pipe (so that "printf ... | tool" elevates the
    tool, not printf); without a pipe SUDO is placed in front of the whole
    command.

    Only a '|' that is outside shell quotes counts as a pipe, so a quoted
    argument containing '|' (e.g. a password) is no longer split in half.

    Args:
        command: shell command line.

    Returns:
        The possibly modified shell command line.
    """
    # already root: nothing to add
    if os.geteuid() == 0:
        return command

    pipe_at = _find_unquoted_pipe(command)
    if pipe_at < 0:
        return SUDO + ' ' + command
    return command[:pipe_at] + ' | ' + SUDO + ' ' + command[pipe_at + 1:]
def check_out_file():
    """Check OUT_FILE for error text written by the re-encrypt process.

    Waits one second, and a further three seconds if OUT_FILE does not
    exist yet. The file is then read line by line: lines containing
    'this is experimental code' are skipped, reading stops at the first
    line containing 'Progress', and every other line is treated as part of
    an error message.

    Returns:
        None if no error text was found (including an empty file),
        otherwise an error string. All error lines are joined with ", "
        after the "Error: Running reencrypt process: " prefix.
    """
    # wait for process to start
    time.sleep(1)
    # if OUT_FILE is missing, wait another 3 seconds
    if not os.path.isfile(OUT_FILE):
        time.sleep(3)
    # if OUT_FILE is still missing, give error message
    if not os.path.isfile(OUT_FILE):
        return "Error: The reencrypt process failed to start: The cause of error is unknown."

    error_message = None
    try:
        with open(OUT_FILE, 'r') as out_file:
            for line in out_file:
                # skip warning text row
                if re.search('this is experimental code', line):
                    continue
                # everything OK once a Progress line shows up
                if re.search('Progress', line):
                    break
                # everything else is an error message
                if not error_message:
                    error_message = "Error: Running reencrypt process: " + line.strip()
                else:
                    # append (not overwrite) so earlier lines and the prefix are kept
                    error_message += ", " + line.strip()
    except (IOError, OSError) as err:
        # open()/read raise IOError on Python 2 and OSError on Python 3
        return "Error: Can't read file %s: %s" % (OUT_FILE, err.strerror)
    except Exception:
        return "Error: Can't read file %s: Generic failure" % (OUT_FILE)
    return error_message
def test_password(part, password, slot=None):
    """Check a password against a LUKS volume without opening it.

    Runs "cryptsetup luksOpen --test-passphrase" on the volume with the
    password piped in on stdin, optionally restricted to one key slot.

    The password is emitted with printf '%s' and shell-quoted, so
    characters such as ', % and backslash reach cryptsetup unchanged
    instead of being interpreted by the shell or by printf.

    Args:
        part: the LUKS device, as passed to cryptsetup.
        password: the password to test; must not be empty.
        slot: optional key slot number passed to cryptsetup with -S.

    Returns:
        None if the password was accepted, otherwise an error string.
    """
    # shlex.quote on Python 3, pipes.quote on Python 2
    try:
        from shlex import quote as shell_quote
    except ImportError:
        from pipes import quote as shell_quote

    # check that password is not empty
    if not password:
        error_message = "Error: The password can not be an empty string"
        logging.error(error_message)
        return error_message

    # run cryptsetup luksOpen --test-passphrase
    command = "printf '%s' " + shell_quote(password) + ' | ' + CRYPTSETUP + ' luksOpen --test-passphrase '
    command = command + part
    if slot is not None:
        command = command + " -S " + str(slot)
    description = 'LUKS volume pasword check'
    return run_command_with_stderr(command, description, part)
def test_other_slots(part, password, key_slot):
    """Tell whether a password that opens key_slot also opens another slot.

    Args:
        part: the LUKS device, as passed to test_password().
        password: the password to test.
        key_slot: the slot the password is expected to match.

    Returns:
        True if the password matches key_slot and at least one of the other
        slots 0-7; False otherwise (including when it does not match
        key_slot at all).
    """
    # the password has to open the given slot before the others are probed
    if test_password(part, password, key_slot) is not None:
        return False

    own_slot = int(key_slot)
    candidates = (number for number in range(8) if number != own_slot)
    return any(test_password(part, password, number) is None for number in candidates)
def read_luks_header(part):
    """Dump and parse the LUKS header of a device.

    Runs "cryptsetup luksDump" and extracts cipher name, cipher mode, hash,
    key size (the "MK bits" line), UUID and the state of key slots 0-7.
    Fields that are not found keep the value 'UNKNOWN'.

    Args:
        part: the LUKS device, as passed to cryptsetup.

    Returns:
        A triple (luks_dict, luks_details, error_message). luks_dict maps
        'cipher_name', 'cipher_mode', 'hash', 'key_size', 'uuid' and
        'key_slot_0' .. 'key_slot_7' to strings, luks_details is the list
        of raw output lines. If luksDump produced no output the result is
        ([], [], error_message).
    """
    # logging
    own_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", own_name)

    # run luksDump
    dump_command = ' '.join([CRYPTSETUP, 'luksDump', part])
    output, error_message = run_command_with_output(dump_command, 'LUKS header dump', part)
    if not output:
        logging.error(error_message)
        return [], [], error_message

    # every known field starts out as UNKNOWN
    luks_dict = dict.fromkeys(('cipher_name', 'cipher_mode', 'hash', 'key_size', 'uuid'), 'UNKNOWN')
    for number in range(8):
        luks_dict['key_slot_%d' % number] = 'UNKNOWN'

    # "<label>:<value>" lines and the dictionary key each one fills
    header_fields = (
        (r"Cipher name:(.*)", 'cipher_name'),
        (r"Cipher mode:(.*)", 'cipher_mode'),
        (r"Hash spec:(.*)", 'hash'),
        (r"MK bits:(.*)", 'key_size'),
        (r"UUID:(.*)", 'uuid'),
    )

    luks_details = []
    for row in output.splitlines():
        # keep the raw line for the details list
        luks_details.append(row)

        # plain header fields
        for pattern, key in header_fields:
            found = re.match(pattern, row)
            if found:
                luks_dict[key] = found.group(1).strip()

        # key slot state
        slot = re.match(r"Key Slot ([0-7]):", row)
        if slot:
            if re.search('ENABLED', row):
                state = 'ENABLED'
            elif re.search('DISABLED', row):
                state = 'DISABLED'
            else:
                state = 'UNKNOWN'
            luks_dict['key_slot_' + slot.group(1)] = state

    logging.info("Completed reencrypt_backend.%s", own_name)
    logging.shutdown()
    return luks_dict, luks_details, error_message
def read_master_key(part, password):
    """Read the master key of a LUKS volume.

    Opens the volume under a random mapping name with "cryptsetup
    luksOpen", reads the key from the matching line of
    "dmsetup table --showkeys" (sixth space-separated field) and closes the
    mapping again with "cryptsetup luksClose".

    Args:
        part: the LUKS device, as passed to cryptsetup.
        password: a password for the volume; must not be empty.

    Returns:
        A pair (master_key, error_message). master_key is 'UNKNOWN' when it
        could not be determined; error_message is None on success. The
        empty-password case also returns this pair, matching every other
        path.
    """
    # shlex.quote on Python 3, pipes.quote on Python 2
    try:
        from shlex import quote as shell_quote
    except ImportError:
        from pipes import quote as shell_quote

    # logging
    function_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", function_name)

    # check that password is not empty
    if not password:
        error_message = "Error: The password can not be an empty string"
        logging.error(error_message)
        return 'UNKNOWN', error_message

    # make random target name (ascii_lowercase is locale-independent)
    map_name = 'target_' + ''.join(random.choice(string.ascii_lowercase) for _ in range(5))

    # run cryptsetup luksOpen; printf '%s' plus quoting passes the password verbatim
    command = "printf '%s' " + shell_quote(password) + ' | ' + CRYPTSETUP + ' luksOpen '
    command = command + part + ' ' + map_name
    description = 'open LUKS volume'
    error_message = run_command_with_stderr(command, description, part)
    if error_message:
        logging.error(error_message)
        return 'UNKNOWN', error_message

    # shut logging down while the key is handled; intended to keep the
    # master key out of the log
    logging.shutdown()

    # run dmsetup table
    command = 'dmsetup table --showkeys'
    description = 'read volume table'
    output, error_message1 = run_command_with_output(command, description, part)
    # note: continue even if no output or error_message1 not empty,
    # the mapping opened above still has to be closed

    # turn logging back on
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)

    # parse output; on failure output is an empty list and there is nothing to parse
    master_key = 'UNKNOWN'
    if output:
        for line in output.splitlines():
            # the line for our mapping starts with the mapping name
            if line.startswith(map_name):
                fields = line.split(' ')
                if len(fields) >= 6:
                    master_key = fields[5]

    # run cryptsetup luksClose
    command = CRYPTSETUP + ' luksClose ' + map_name
    description = 'close LUKS volume'
    error_message2 = run_command_with_stderr(command, description, part)

    if error_message1:
        logging.error(error_message1)
        return master_key, error_message1
    if error_message2:
        logging.error(error_message2)
        return master_key, error_message2

    logging.info("Completed reencrypt_backend.%s", function_name)
    logging.shutdown()
    return master_key, error_message1
def add_key(part, password_current, password_new, key_slot):
    """Add a new password to a key slot of a LUKS volume.

    The new password is written to the temporary file KEY_FILE and
    "cryptsetup luksAddKey" is run with the current password piped in on
    stdin. KEY_FILE is removed again afterwards, whether or not the command
    succeeded.

    Args:
        part: the LUKS device, as passed to cryptsetup.
        password_current: an existing password of the volume.
        password_new: the password to add.
        key_slot: number of the key slot to fill.

    Returns:
        None on success, otherwise an error string.
    """
    # shlex.quote on Python 3, pipes.quote on Python 2
    try:
        from shlex import quote as shell_quote
    except ImportError:
        from pipes import quote as shell_quote

    # logging
    function_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", function_name)

    # check that password is not empty
    if (not password_current) or (not password_new):
        error_message = "Error: The password can not be an empty string"
        logging.error(error_message)
        return error_message

    try:
        try:
            # delete stale keyfile (or symlink), just in case
            if os.path.lexists(KEY_FILE):
                os.remove(KEY_FILE)
            # O_EXCL refuses to reuse or follow anything created at this
            # path in the meantime; 0600 keeps the password private
            key_fd = os.open(KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
            with os.fdopen(key_fd, 'w') as key_file:
                key_file.write(password_new)
        except (IOError, OSError) as err:
            error_message = "Error: Can't write file %s: %s" % (KEY_FILE, err.strerror)
            logging.error(error_message)
            return error_message

        # run cryptsetup luksAddKey; printf '%s' plus quoting passes the password verbatim
        command = "printf '%s' " + shell_quote(password_current) + ' | ' + CRYPTSETUP + ' luksAddKey '
        command = command + part + ' --key-slot ' + str(key_slot) + ' ' + KEY_FILE
        description = 'Adding LUKS key'
        error_message = run_command_with_stderr(command, description, part)
    finally:
        # delete tmp keyfile; best effort, it may already be gone
        try:
            os.remove(KEY_FILE)
        except OSError:
            pass

    if error_message:
        logging.error(error_message)
        return error_message

    logging.info("Completed reencrypt_backend.%s", function_name)
    logging.shutdown()
    return error_message
def delete_key(part, password_current, key_slot):
    """Delete a key slot of a LUKS volume.

    Refuses to delete the slot when the given password opens that slot and
    no other one. Otherwise runs "cryptsetup luksKillSlot" with the
    password piped in on stdin.

    Args:
        part: the LUKS device, as passed to cryptsetup.
        password_current: a password of the volume; must not be empty.
        key_slot: number of the key slot to delete.

    Returns:
        None on success, otherwise an error string. If luksKillSlot fails
        and the password opens no slot at all, the cryptsetup error is
        replaced by a message asking for a different password; in every
        other failure case the cryptsetup error is returned.
    """
    # shlex.quote on Python 3, pipes.quote on Python 2
    try:
        from shlex import quote as shell_quote
    except ImportError:
        from pipes import quote as shell_quote

    # logging
    function_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", function_name)

    # check that password is not empty
    if not password_current:
        error_message = "Error: The password can not be an empty string"
        logging.error(error_message)
        return error_message

    # if password matches the slot and same password is not in use on another slot, give error
    if test_password(part, password_current, key_slot) is None:
        if not test_other_slots(part, password_current, key_slot):
            error_message = "The password that you entered last matches this slot."
            error_message = error_message + " \nPlease press \"Delete key\" again and enter a different password."
            logging.error("Error: User tried to delete slot where this is only matching password")
            return error_message

    # run cryptsetup luksKillSlot; printf '%s' plus quoting passes the password verbatim
    command = "printf '%s' " + shell_quote(password_current) + ' | ' + CRYPTSETUP + ' luksKillSlot '
    command = command + part + ' ' + str(key_slot)
    description = 'Deleting LUKS key'
    error_message = run_command_with_stderr(command, description, part)
    if error_message:
        logging.error(error_message)
        # check if error because of wrong password: test against all slots.
        # test_other_slots() cannot be used here, it reports False whenever
        # the password does not open key_slot itself, which is the normal
        # situation when deleting a slot with another slot's password.
        if test_password(part, password_current) is not None:
            error_message = "The entered password doesn't match any of the slots."
            error_message = error_message + " \nPlease press \"Delete key\" again and enter a different password."
            logging.error(error_message)
        return error_message

    logging.info("Completed reencrypt_backend.%s", function_name)
    logging.shutdown()
    return error_message
def preflight_check(part, password):
    """Verify that a volume is ready to be re-encrypted.

    The password must be accepted by test_password() and exactly one key
    slot must be reported as ENABLED by read_luks_header().

    Args:
        part: the LUKS device.
        password: the password to verify.

    Returns:
        None if both checks pass, otherwise an error string.
    """
    # logging
    own_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", own_name)

    # the password has to be non-empty and valid
    error_message = test_password(part, password)
    if error_message:
        logging.error(error_message)
        return error_message

    # read the header to find out how many slots are in use
    luks_dict, luks_details, error_message = read_luks_header(part)
    if error_message:
        return error_message

    enabled = sum(1 for number in range(8) if luks_dict['key_slot_' + str(number)] == 'ENABLED')

    # exactly one slot in use is the only acceptable state
    if enabled != 1:
        if enabled >= 2:
            surplus = enabled - 1
            # singular wording when a single slot has to go
            unit = " key slot." if enabled == 2 else " key slots."
            error_message = "This application can only re-encrypt if one key slot is in use."
            error_message = error_message + " Please delete %d" % surplus + unit
        else:
            error_message = "Error: No key slots detected"
        logging.error(error_message)
        return error_message

    logging.info("Completed reencrypt_backend.%s", own_name)
    logging.shutdown()
    return error_message
def do_reecnrypt(part, password):
    """Start re-encryption of a LUKS volume in the background.

    Launches cryptsetup-reencrypt through popen_command() with the password
    piped in on stdin and stderr redirected to OUT_FILE, then uses
    check_out_file() to see whether the process reported an error right
    after starting. The function does not wait for the re-encryption to
    finish.

    The debug log line shows the command with the password masked, so the
    plaintext password is not written to the log file.

    Args:
        part: the LUKS device, as passed to cryptsetup-reencrypt.
        password: the password of the volume.

    Returns:
        None if the process started without reporting an error, otherwise
        an error string.
    """
    # shlex.quote on Python 3, pipes.quote on Python 2
    try:
        from shlex import quote as shell_quote
    except ImportError:
        from pipes import quote as shell_quote

    # logging
    function_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", function_name)

    # run cryptsetup-reencrypt; printf '%s' plus quoting passes the password verbatim
    command_tail = ' | ' + CRYPTSETUP_REENCRYPT + ' ' + part + ' 2>' + OUT_FILE
    command = "printf '%s' " + shell_quote(password) + command_tail
    if DEBUG_LEVEL == logging.DEBUG:
        # log a masked copy of the command, never the real password
        masked_command = "printf '%s' '********'" + command_tail
        logging.debug("Debug: Re-encrypt command = %s", add_sudo(masked_command))
    description = 'Re-encryption'
    error_message = popen_command(command, description, part)
    if error_message:
        logging.error(error_message)
        return error_message

    # check if cryptsetup_reencrypt has written an error message to out file
    error_message = check_out_file()
    if error_message:
        logging.error(error_message)
        return error_message

    logging.info("Completed reencrypt_backend.%s", function_name)
    logging.shutdown()
    return error_message
def poll_progress():
    """Read the current re-encryption progress from OUT_FILE.

    Takes the last line of OUT_FILE, keeps what follows the last
    clear-line escape sequence and parses a text of the form
    "Progress: 100.0%, ETA 00:00, 484 MiB written, speed 77.2 MiB/s".

    Returns:
        A pair (progress_dict, error_message). progress_dict has the keys
        'status' ('UNKNOWN', 'running' or 'completed'), 'percent' (0 or a
        decimal.Decimal), 'eta', 'written' and 'speed'. error_message is
        None unless OUT_FILE is missing, unreadable or empty. A percentage
        that cannot be parsed is logged as a warning and left at 0.
    """
    error_message = None

    # logging
    function_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", function_name)

    # initiate progress_dict
    progress_dict = {
        'status': 'UNKNOWN',
        'percent': 0,
        'eta': None,
        'written': 'UNKNOWN',
        'speed': None,
    }

    # if OUT_FILE is missing, give error message
    if not os.path.isfile(OUT_FILE):
        error_message = "Error: The output file for the re-encrypt process is missing"
        logging.error(error_message)
        return progress_dict, error_message

    # read OUT_FILE's last line
    last_line = None
    try:
        with open(OUT_FILE, 'r') as out_file:
            for line in out_file:
                last_line = line
    except (IOError, OSError) as err:
        # open()/read raise IOError on Python 2 and OSError on Python 3
        error_message = "Error: Can't read file %s: %s" % (OUT_FILE, err.strerror)
        logging.error(error_message)
        return progress_dict, error_message
    except Exception:
        error_message = "Error: Can't read file %s: Generic failure" % (OUT_FILE)
        logging.error(error_message)
        return progress_dict, error_message

    if not last_line:
        error_message = "Warning: Failed to read output of re-encrypt process"
        logging.warning(error_message)
        return progress_dict, error_message

    # get last entry after clearline EL2 character
    pieces = last_line.rsplit('\x1b[2K', 1)
    if len(pieces) < 2:
        last_progress = last_line
    else:
        last_progress = pieces[1]

    # parse last_progress
    # example line = "Progress: 100.0%, ETA 00:00, 484 MiB written, speed 77.2 MiB/s"
    for component in last_progress.split(', '):
        component = component.strip()

        # parse Progress
        if re.search('Progress', component):
            result = re.match(r"Progress: (.*)\%", component)
            if result:
                try:
                    progress_dict['percent'] = decimal.Decimal(result.group(1).strip())
                except decimal.InvalidOperation:
                    # the file is read while it is being written, so the
                    # number may be incomplete; keep the default of 0
                    logging.warning("Warning: Can't parse progress percentage: %s", result.group(1))

        # parse ETA
        if re.search('^ETA', component):
            result = re.match(r"ETA (.*)", component)
            if result:
                progress_dict['eta'] = result.group(1).strip()
                progress_dict['status'] = 'running'

        # parse written
        if re.search('written', component):
            result = re.match(r"(.*) written", component)
            if result:
                progress_dict['written'] = result.group(1).strip()
                progress_dict['status'] = 'running'

        # parse speed
        if re.search('^speed', component):
            result = re.match(r"speed (.*)", component)
            if result:
                progress_dict['speed'] = result.group(1).strip()
                progress_dict['status'] = 'running'

    # decide if the process is "running" or "completed"
    if progress_dict['percent'] == 100:
        progress_dict['status'] = 'completed'

    logging.info("Completed reencrypt_backend.%s", function_name)
    logging.shutdown()
    return progress_dict, error_message
def do_reboot():
    """Reboot the machine by running /sbin/reboot.

    Returns:
        None if the command succeeded, otherwise an error string.
    """
    # logging
    own_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", own_name)

    # run reboot
    error_message = run_command_with_stderr('/sbin/reboot', 'Reboot', 'system')
    if not error_message:
        logging.info("Completed reencrypt_backend.%s", own_name)
        logging.shutdown()
    else:
        logging.error(error_message)
    return error_message
def do_poweroff():
    """Power the machine off by running /sbin/poweroff.

    Returns:
        None if the command succeeded, otherwise an error string.
    """
    # logging
    own_name = inspect.stack()[0][3]
    logging.basicConfig(filename=LOG_FILE, level=DEBUG_LEVEL)
    logging.info("Starting reencrypt_backend.%s", own_name)

    # run poweroff
    error_message = run_command_with_stderr('/sbin/poweroff', 'Poweroff', 'system')
    if not error_message:
        logging.info("Completed reencrypt_backend.%s", own_name)
        logging.shutdown()
    else:
        logging.error(error_message)
    return error_message