# usb_writer_linux.py import subprocess import os import time # Placeholder for progress reporting signal if this were a QObject # from PyQt6.QtCore import pyqtSignal class USBWriterLinux: # progress_signal = pyqtSignal(str) # Example for QObject integration def __init__(self, device: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None): """ Args: device: The path to the USB device (e.g., /dev/sdx). opencore_qcow2_path: Path to the OpenCore.qcow2 image. macos_qcow2_path: Path to the mac_hdd_ng.img (qcow2). progress_callback: A function to call with progress strings. """ self.device = device self.opencore_qcow2_path = opencore_qcow2_path self.macos_qcow2_path = macos_qcow2_path self.progress_callback = progress_callback self.opencore_raw_path = "opencore.raw" # Temporary raw image self.macos_raw_path = "macos_main.raw" # Temporary raw image self.mount_point_opencore_efi = "/mnt/opencore_efi_temp" self.mount_point_usb_esp = "/mnt/usb_esp_temp" def _report_progress(self, message: str): print(message) # For standalone testing if self.progress_callback: self.progress_callback(message) def _run_command(self, command: list[str], check=True, capture_output=False, shell=False): self._report_progress(f"Executing: {' '.join(command)}") try: process = subprocess.run( command, check=check, capture_output=capture_output, text=True, shell=shell # Use shell=True with caution ) if capture_output: if process.stdout: self._report_progress(f"STDOUT: {process.stdout.strip()}") if process.stderr: self._report_progress(f"STDERR: {process.stderr.strip()}") return process except subprocess.CalledProcessError as e: self._report_progress(f"Error executing {' '.join(command)}: {e}") if e.stderr: self._report_progress(f"STDERR: {e.stderr.strip()}") if e.stdout: self._report_progress(f"STDOUT: {e.stdout.strip()}") raise except FileNotFoundError: self._report_progress(f"Error: Command {command[0]} not found. Is it installed and in PATH?") raise def _cleanup_temp_files(self): self._report_progress("Cleaning up temporary files...") for f_path in [self.opencore_raw_path, self.macos_raw_path]: if os.path.exists(f_path): try: os.remove(f_path) self._report_progress(f"Removed {f_path}") except OSError as e: self._report_progress(f"Error removing {f_path}: {e}") def _unmount_and_remove_dir(self, mount_point): if os.path.ismount(mount_point): self._run_command(["sudo", "umount", mount_point], check=False) if os.path.exists(mount_point): try: os.rmdir(mount_point) except OSError as e: self._report_progress(f"Could not rmdir {mount_point}: {e}. May need manual cleanup.") def _cleanup_mappings_and_mounts(self): self._report_progress("Cleaning up mappings and mounts...") self._unmount_and_remove_dir(self.mount_point_opencore_efi) self._unmount_and_remove_dir(self.mount_point_usb_esp) # Unmap kpartx devices - this is tricky as we don't know the loop device name easily without parsing # For OpenCore raw image if os.path.exists(self.opencore_raw_path): self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path], check=False) # For the USB device itself, if kpartx was used on it (it shouldn't be for this workflow) # self._run_command(["sudo", "kpartx", "-d", self.device], check=False) def check_dependencies(self): self._report_progress("Checking dependencies (qemu-img, parted, kpartx, rsync, mkfs.vfat)...") dependencies = ["qemu-img", "parted", "kpartx", "rsync", "mkfs.vfat"] for dep in dependencies: try: self._run_command([dep, "--version" if dep != "kpartx" and dep != "mkfs.vfat" else "-V"], capture_output=True) # kpartx has no version, mkfs.vfat uses -V except (FileNotFoundError, subprocess.CalledProcessError) as e: self._report_progress(f"Dependency {dep} not found or not working: {e}") raise RuntimeError(f"Dependency {dep} not found. Please install it.") self._report_progress("All dependencies found.") return True def format_and_write(self) -> bool: try: self.check_dependencies() self._report_progress(f"WARNING: ALL DATA ON {self.device} WILL BE ERASED!") # Unmount any existing partitions on the target USB device self._report_progress(f"Unmounting all partitions on {self.device}...") for i in range(1, 5): # Try to unmount a few potential partitions self._run_command(["sudo", "umount", f"{self.device}{i}"], check=False) self._run_command(["sudo", "umount", f"{self.device}p{i}"], check=False) # for nvme like # Create new GPT partition table self._report_progress(f"Creating new GPT partition table on {self.device}...") self._run_command(["sudo", "parted", "-s", self.device, "mklabel", "gpt"]) # Create EFI partition (e.g., 512MB) self._report_progress("Creating EFI partition (ESP)...") self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "EFI", "fat32", "1MiB", "513MiB"]) self._run_command(["sudo", "parted", "-s", self.device, "set", "1", "esp", "on"]) # Create macOS partition (remaining space) self._report_progress("Creating macOS partition...") self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "macOS", "hfs+", "513MiB", "100%"]) # Inform kernel of partition changes self._run_command(["sudo", "partprobe", self.device]) time.sleep(2) # Give kernel time to recognize new partitions # Determine partition names (e.g., /dev/sdx1, /dev/sdx2) # This can be unreliable. A better way is `lsblk -jo NAME,PATH /dev/sdx` # For simplicity, assuming /dev/sdx1 for ESP, /dev/sdx2 for macOS partition esp_partition = f"{self.device}1" if not os.path.exists(esp_partition): esp_partition = f"{self.device}p1" # for nvme like /dev/nvme0n1p1 macos_partition = f"{self.device}2" if not os.path.exists(macos_partition): macos_partition = f"{self.device}p2" if not (os.path.exists(esp_partition) and os.path.exists(macos_partition)): self._report_progress(f"Could not reliably determine partition names for {self.device}. Expected {esp_partition} and {macos_partition}") # Attempt to find them via lsblk if possible (more robust) try: lsblk_out = self._run_command(["lsblk", "-no", "NAME", "--paths", self.device], capture_output=True, check=True).stdout.strip().splitlines() if len(lsblk_out) > 2 : # Device itself + at least 2 partitions esp_partition = lsblk_out[1] macos_partition = lsblk_out[2] self._report_progress(f"Determined partitions using lsblk: ESP={esp_partition}, macOS={macos_partition}") else: raise RuntimeError("lsblk did not return enough partitions.") except Exception as e_lsblk: self._report_progress(f"Failed to determine partitions using lsblk: {e_lsblk}") raise RuntimeError("Could not determine partition device names after partitioning.") # Format ESP as FAT32 self._report_progress(f"Formatting ESP ({esp_partition}) as FAT32...") self._run_command(["sudo", "mkfs.vfat", "-F", "32", esp_partition]) # --- Write EFI content --- self._report_progress(f"Converting OpenCore QCOW2 image ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...") self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path]) self._report_progress(f"Mapping partitions from {self.opencore_raw_path}...") map_output = self._run_command(["sudo", "kpartx", "-av", self.opencore_raw_path], capture_output=True).stdout self._report_progress(f"kpartx output: {map_output}") # Example output: add map loop0p1 (253:0): 0 1048576 linear /dev/loop0 2048 # We need to parse "loop0p1" or similar from this. mapped_efi_partition_name = None for line in map_output.splitlines(): if "loop" in line and "p1" in line: # Assuming first partition is EFI parts = line.split() if len(parts) > 2: mapped_efi_partition_name = parts[2] # e.g., loop0p1 break if not mapped_efi_partition_name: raise RuntimeError(f"Could not determine mapped EFI partition name from kpartx output for {self.opencore_raw_path}.") mapped_efi_device = f"/dev/mapper/{mapped_efi_partition_name}" self._report_progress(f"Mapped OpenCore EFI partition: {mapped_efi_device}") os.makedirs(self.mount_point_opencore_efi, exist_ok=True) os.makedirs(self.mount_point_usb_esp, exist_ok=True) self._report_progress(f"Mounting {mapped_efi_device} to {self.mount_point_opencore_efi}...") self._run_command(["sudo", "mount", "-o", "ro", mapped_efi_device, self.mount_point_opencore_efi]) self._report_progress(f"Mounting USB ESP ({esp_partition}) to {self.mount_point_usb_esp}...") self._run_command(["sudo", "mount", esp_partition, self.mount_point_usb_esp]) self._report_progress(f"Copying EFI files from {self.mount_point_opencore_efi} to {self.mount_point_usb_esp}...") # Copy contents of EFI folder source_efi_dir = os.path.join(self.mount_point_opencore_efi, "EFI") if not os.path.exists(source_efi_dir): # Sometimes it's directly in the root of the partition image source_efi_dir = self.mount_point_opencore_efi self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_dir}/", f"{self.mount_point_usb_esp}/"]) self._report_progress("Unmounting OpenCore EFI and USB ESP...") self._run_command(["sudo", "umount", self.mount_point_opencore_efi]) self._run_command(["sudo", "umount", self.mount_point_usb_esp]) self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path]) # Unmap loop device # --- Write macOS main image --- self._report_progress(f"Converting macOS QCOW2 image ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...") self._report_progress("This may take a very long time and consume significant disk space temporarily.") # Add dd progress status if possible, or estimate time based on size # For qemu-img, there's no easy progress for convert. self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path]) self._report_progress(f"Writing RAW macOS image ({self.macos_raw_path}) to {macos_partition}...") self._report_progress("This will also take a very long time. Please be patient.") # Using dd with progress status dd_command = ["sudo", "dd", f"if={self.macos_raw_path}", f"of={macos_partition}", "bs=4M", "status=progress", "conv=fsync"] self._run_command(dd_command) self._report_progress("USB writing process completed successfully.") return True except Exception as e: self._report_progress(f"An error occurred during USB writing: {e}") return False finally: self._cleanup_mappings_and_mounts() self._cleanup_temp_files() if __name__ == '__main__': # This is for standalone testing of this script. # YOU MUST RUN THIS SCRIPT WITH SUDO for it to work. # BE EXTREMELY CAREFUL with the device path. if os.geteuid() != 0: print("Please run this script as root (sudo) for testing.") exit(1) print("USB Writer Linux Standalone Test") # Replace with actual paths to your QCOW2 files for testing test_opencore_qcow2 = "path_to_your/OpenCore.qcow2" test_macos_qcow2 = "path_to_your/mac_hdd_ng.img" # IMPORTANT: List available block devices to help user choose. print("\nAvailable block devices (be careful!):") subprocess.run(["lsblk", "-d", "-o", "NAME,SIZE,MODEL"], check=True) test_device = input("\nEnter target device (e.g., /dev/sdX). THIS DEVICE WILL BE WIPED: ") if not test_device or not test_device.startswith("/dev/"): print("Invalid device. Exiting.") exit(1) if not (os.path.exists(test_opencore_qcow2) and os.path.exists(test_macos_qcow2)): print(f"Test files {test_opencore_qcow2} or {test_macos_qcow2} not found. Skipping write test.") else: confirm = input(f"Are you absolutely sure you want to wipe {test_device} and write images? (yes/NO): ") if confirm.lower() == 'yes': writer = USBWriterLinux(test_device, test_opencore_qcow2, test_macos_qcow2, print) writer.format_and_write() else: print("Test cancelled by user.")