From beae54e29b234494c5a1f16999fb173477fe6107 Mon Sep 17 00:00:00 2001 From: codefiles <11915375+codefiles@users.noreply.github.com> Date: Sat, 22 Mar 2025 15:02:14 -0400 Subject: [PATCH] Do not write passwords to /tmp --- archinstall/lib/general.py | 14 +++++++-- archinstall/lib/luks.py | 62 ++++++++++++++++++++------------------ 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index e9a6d6c13a..fee522eeeb 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -460,9 +460,19 @@ def _log_cmd(cmd: list[str]) -> None: pass -def run(cmd: list[str], input_data: bytes | None = None) -> None: +def run( + cmd: list[str], + input_data: bytes | None = None, +) -> subprocess.CompletedProcess[bytes]: _log_cmd(cmd) - subprocess.run(cmd, input=input_data, check=True) + + return subprocess.run( + cmd, + input=input_data, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True + ) def _pid_exists(pid: int) -> bool: diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 2cb407b4b6..a31f84ecc2 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -3,11 +3,12 @@ import shlex from dataclasses import dataclass from pathlib import Path +from subprocess import CalledProcessError from archinstall.lib.disk.utils import get_lsblk_info from .exceptions import DiskError, SysCallError -from .general import SysCommand, SysCommandWorker, generate_password +from .general import SysCommand, SysCommandWorker, generate_password, run from .output import debug, info @@ -58,16 +59,16 @@ def _password_bytes(self) -> bytes: else: return bytes(self.password, 'UTF-8') - def _get_key_file(self, key_file: Path | None = None) -> Path: - if key_file: - return key_file + def _get_passphrase_args( + self, + key_file: Path | None = None + ) -> tuple[list[str], bytes | None]: + key_file = key_file or self.key_file - if self.key_file: - return self.key_file + if key_file: + return ['--key-file', str(key_file)], None - default_key_file = Path(f'/tmp/{self.luks_dev_path.name}.disk_pw') - default_key_file.write_bytes(self._password_bytes()) - return default_key_file + return [], self._password_bytes() def encrypt( self, @@ -75,12 +76,12 @@ def encrypt( hash_type: str = 'sha512', iter_time: int = 10000, key_file: Path | None = None - ) -> Path: + ) -> Path | None: debug(f'Luks2 encrypting: {self.luks_dev_path}') - key_file = self._get_key_file(key_file) + key_file_arg, passphrase = self._get_passphrase_args(key_file) - cryptsetup_args = shlex.join([ + cmd = [ 'cryptsetup', '--batch-mode', '--verbose', @@ -89,19 +90,20 @@ def encrypt( '--hash', hash_type, '--key-size', str(key_size), '--iter-time', str(iter_time), - '--key-file', str(key_file), + *key_file_arg, '--use-urandom', - 'luksFormat', str(self.luks_dev_path), - ]) + 'luksFormat', str(self.luks_dev_path) + ] - debug(f'cryptsetup format: {cryptsetup_args}') + debug(f'cryptsetup format: {shlex.join(cmd)}') try: - result = SysCommand(cryptsetup_args).decode() - except SysCallError as err: - raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}') + result = run(cmd, input_data=passphrase) + except CalledProcessError as err: + output = err.stdout.decode().rstrip() + raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {output}') - debug(f'cryptsetup luksFormat output: {result}') + debug(f'cryptsetup luksFormat output: {result.stdout.decode().rstrip()}') self.key_file = key_file @@ -132,17 +134,19 @@ def unlock(self, key_file: Path | None = None) -> None: if not self.mapper_name: raise ValueError('mapper name missing') - key_file = self._get_key_file(key_file) + key_file_arg, passphrase = self._get_passphrase_args(key_file) + + cmd = [ + 'cryptsetup', 'open', + str(self.luks_dev_path), + str(self.mapper_name), + *key_file_arg, + '--type', 'luks2' + ] - result = SysCommand( - 'cryptsetup open ' - f'{self.luks_dev_path} ' - f'{self.mapper_name} ' - f'--key-file {key_file} ' - f'--type luks2' - ).decode() + result = run(cmd, input_data=passphrase) - debug(f'cryptsetup open output: {result}') + debug(f'cryptsetup open output: {result.stdout.decode().rstrip()}') if not self.mapper_dev or not self.mapper_dev.is_symlink(): raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')