From a80706a40a18482df0294e7742acdc7dc9f2ab59 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sun, 1 Jun 2025 21:40:05 +0000 Subject: [PATCH] Jules was unable to complete the task in time. Please review the work done so far and provide feedback for Jules to continue. --- main_app.py | 602 ++++++++++++++++++++---------------------- usb_writer_linux.py | 343 +++++++++++++----------- usb_writer_macos.py | 313 ++++++++++++++++++++++ usb_writer_windows.py | 177 +++++++++++++ 4 files changed, 966 insertions(+), 469 deletions(-) create mode 100644 usb_writer_macos.py create mode 100644 usb_writer_windows.py diff --git a/main_app.py b/main_app.py index 53bdd01..ad442b6 100644 --- a/main_app.py +++ b/main_app.py @@ -1,16 +1,14 @@ # main_app.py - import sys import subprocess -import threading import os import psutil -import platform # For OS detection and USB writing logic +import platform from PyQt6.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QComboBox, QPushButton, QTextEdit, QMessageBox, QMenuBar, - QFileDialog, QGroupBox + QFileDialog, QGroupBox, QLineEdit # Added QLineEdit ) from PyQt6.QtGui import QAction from PyQt6.QtCore import pyqtSignal, pyqtSlot, QObject, QThread @@ -22,25 +20,26 @@ from utils import ( build_docker_stop_command, build_docker_rm_command ) -# Import the Linux USB writer (conditionally or handle import error) +USBWriterLinux = None +USBWriterMacOS = None +USBWriterWindows = None + if platform.system() == "Linux": - try: - from usb_writer_linux import USBWriterLinux - except ImportError: - USBWriterLinux = None # Flag that it's not available - print("Could not import USBWriterLinux. USB writing for Linux will be disabled.") -else: - USBWriterLinux = None + try: from usb_writer_linux import USBWriterLinux + except ImportError as e: print(f"Could not import USBWriterLinux: {e}") +elif platform.system() == "Darwin": + try: from usb_writer_macos import USBWriterMacOS + except ImportError as e: print(f"Could not import USBWriterMacOS: {e}") +elif platform.system() == "Windows": + try: from usb_writer_windows import USBWriterWindows + except ImportError as e: print(f"Could not import USBWriterWindows: {e}") - -# --- Worker Signals --- class WorkerSignals(QObject): progress = pyqtSignal(str) finished = pyqtSignal(str) error = pyqtSignal(str) -# --- Docker Process Worker --- -class DockerRunWorker(QObject): +class DockerRunWorker(QObject): # ... (same as before) def __init__(self, command_list): super().__init__() self.command_list = command_list @@ -65,13 +64,13 @@ class DockerRunWorker(QObject): self.signals.progress.emit(line) self.process.stdout.close() return_code = self.process.wait() - if not self._is_running and return_code != 0: - self.signals.finished.emit("Docker process cancelled by user.") - return + if not self._is_running and return_code != 0 : + self.signals.finished.emit(f"Docker process cancelled or stopped early (exit code {return_code}).") + return if return_code == 0: self.signals.finished.emit("Docker VM process (QEMU) closed by user or completed.") else: - self.signals.error.emit(f"Docker VM process exited with code {return_code}. Assuming macOS setup was attempted.") + self.signals.finished.emit(f"Docker VM process exited (code {return_code}). Assuming macOS setup was attempted or QEMU window closed.") except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.") except Exception as e: self.signals.error.emit(f"An error occurred during Docker run: {str(e)}") finally: self._is_running = False @@ -89,8 +88,7 @@ class DockerRunWorker(QObject): self.signals.progress.emit("Docker process stopped.\n") except Exception as e: self.signals.error.emit(f"Error stopping process: {str(e)}\n") -# --- Docker Command Execution Worker --- -class DockerCommandWorker(QObject): +class DockerCommandWorker(QObject): # ... (same as before) def __init__(self, command_list, success_message="Command completed."): super().__init__() self.command_list = command_list @@ -105,8 +103,8 @@ class DockerCommandWorker(QObject): self.command_list, capture_output=True, text=True, check=False, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 ) - if result.stdout: self.signals.progress.emit(result.stdout) - if result.stderr: self.signals.progress.emit(f"STDERR: {result.stderr}") + if result.stdout and result.stdout.strip(): self.signals.progress.emit(result.stdout) + if result.stderr and result.stderr.strip(): self.signals.progress.emit(f"STDERR: {result.stderr}") if result.returncode == 0: self.signals.finished.emit(self.success_message) else: err_msg = result.stderr or result.stdout or "Unknown error" @@ -114,11 +112,8 @@ class DockerCommandWorker(QObject): except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.") except Exception as e: self.signals.error.emit(f"An error occurred: {str(e)}") - -# --- USB Writing Worker --- -class USBWriterWorker(QObject): +class USBWriterWorker(QObject): # ... (same as before, uses platform check) signals = WorkerSignals() - def __init__(self, device, opencore_path, macos_path): super().__init__() self.device = device @@ -128,120 +123,107 @@ class USBWriterWorker(QObject): @pyqtSlot() def run(self): + current_os = platform.system() try: - if platform.system() == "Linux": - if USBWriterLinux is None: - self.signals.error.emit("USBWriterLinux module not loaded. Cannot write to USB on this system.") - return - - self.writer_instance = USBWriterLinux( - self.device, self.opencore_path, self.macos_path, - progress_callback=lambda msg: self.signals.progress.emit(msg) - ) - # Dependency check is called within format_and_write - if self.writer_instance.format_and_write(): - self.signals.finished.emit("USB writing process completed successfully.") - else: - # Error message should have been emitted by the writer via progress_callback - self.signals.error.emit("USB writing process failed. Check output for details.") + if current_os == "Linux": + if USBWriterLinux is None: self.signals.error.emit("USBWriterLinux module not available."); return + self.writer_instance = USBWriterLinux(self.device, self.opencore_path, self.macos_path, lambda msg: self.signals.progress.emit(msg)) + elif current_os == "Darwin": + if USBWriterMacOS is None: self.signals.error.emit("USBWriterMacOS module not available."); return + self.writer_instance = USBWriterMacOS(self.device, self.opencore_path, self.macos_path, lambda msg: self.signals.progress.emit(msg)) + elif current_os == "Windows": + if USBWriterWindows is None: self.signals.error.emit("USBWriterWindows module not available."); return + self.writer_instance = USBWriterWindows(self.device, self.opencore_path, self.macos_path, lambda msg: self.signals.progress.emit(msg)) else: - self.signals.error.emit(f"USB writing is not currently supported on {platform.system()}.") + self.signals.error.emit(f"USB writing not supported on {current_os}."); return + + if self.writer_instance.format_and_write(): + self.signals.finished.emit("USB writing process completed successfully.") + else: + self.signals.error.emit("USB writing process failed. Check output for details.") except Exception as e: - self.signals.error.emit(f"An unexpected error occurred during USB writing preparation: {str(e)}") + self.signals.error.emit(f"USB writing preparation error: {str(e)}") -class MainWindow(QMainWindow): +class MainWindow(QMainWindow): # ... (init and _setup_ui need changes for Windows USB input) def __init__(self): super().__init__() self.setWindowTitle(APP_NAME) - self.setGeometry(100, 100, 800, 800) + self.setGeometry(100, 100, 800, 850) # Adjusted height self.current_container_name = None self.extracted_main_image_path = None self.extracted_opencore_image_path = None self.extraction_status = {"main": False, "opencore": False} - self.active_worker_thread = None # To manage various worker threads one at a time + self.active_worker_thread = None + self.docker_run_worker_instance = None self._setup_ui() self.refresh_usb_drives() def _setup_ui(self): - # ... (menu bar setup - same as before) ... - menubar = self.menuBar() - file_menu = menubar.addMenu("&File") - help_menu = menubar.addMenu("&Help") - exit_action = QAction("&Exit", self) - exit_action.triggered.connect(self.close) - file_menu.addAction(exit_action) - about_action = QAction("&About", self) - about_action.triggered.connect(self.show_about_dialog) - help_menu.addAction(about_action) - - central_widget = QWidget() - self.setCentralWidget(central_widget) - main_layout = QVBoxLayout(central_widget) - - # Step 1 - vm_creation_group = QGroupBox("Step 1: Create and Install macOS VM") - vm_layout = QVBoxLayout() - selection_layout = QHBoxLayout() - self.version_label = QLabel("Select macOS Version:") - self.version_combo = QComboBox() - self.version_combo.addItems(MACOS_VERSIONS.keys()) - selection_layout.addWidget(self.version_label) - selection_layout.addWidget(self.version_combo) - vm_layout.addLayout(selection_layout) - self.run_vm_button = QPushButton("Create VM and Start macOS Installation") - self.run_vm_button.clicked.connect(self.run_macos_vm) - vm_layout.addWidget(self.run_vm_button) - self.stop_vm_button = QPushButton("Stop/Cancel VM Creation") - self.stop_vm_button.clicked.connect(self.stop_docker_run_process) - self.stop_vm_button.setEnabled(False) - vm_layout.addWidget(self.stop_vm_button) - vm_creation_group.setLayout(vm_layout) + # ... (Menu bar, Step 1, 2, 3 groups - same as before) ... + menubar = self.menuBar(); file_menu = menubar.addMenu("&File"); help_menu = menubar.addMenu("&Help") + exit_action = QAction("&Exit", self); exit_action.triggered.connect(self.close); file_menu.addAction(exit_action) + about_action = QAction("&About", self); about_action.triggered.connect(self.show_about_dialog); help_menu.addAction(about_action) + central_widget = QWidget(); self.setCentralWidget(central_widget); main_layout = QVBoxLayout(central_widget) + vm_creation_group = QGroupBox("Step 1: Create and Install macOS VM"); vm_layout = QVBoxLayout() + selection_layout = QHBoxLayout(); self.version_label = QLabel("Select macOS Version:"); self.version_combo = QComboBox() + self.version_combo.addItems(MACOS_VERSIONS.keys()); selection_layout.addWidget(self.version_label); selection_layout.addWidget(self.version_combo) + vm_layout.addLayout(selection_layout); self.run_vm_button = QPushButton("Create VM and Start macOS Installation") + self.run_vm_button.clicked.connect(self.run_macos_vm); vm_layout.addWidget(self.run_vm_button) + self.stop_vm_button = QPushButton("Stop/Cancel VM Creation"); self.stop_vm_button.clicked.connect(self.stop_docker_run_process) + self.stop_vm_button.setEnabled(False); vm_layout.addWidget(self.stop_vm_button); vm_creation_group.setLayout(vm_layout) main_layout.addWidget(vm_creation_group) - - # Step 2 - extraction_group = QGroupBox("Step 2: Extract VM Images") - ext_layout = QVBoxLayout() - self.extract_images_button = QPushButton("Extract Images from Container") - self.extract_images_button.clicked.connect(self.extract_vm_images) - self.extract_images_button.setEnabled(False) - ext_layout.addWidget(self.extract_images_button) - extraction_group.setLayout(ext_layout) + extraction_group = QGroupBox("Step 2: Extract VM Images"); ext_layout = QVBoxLayout() + self.extract_images_button = QPushButton("Extract Images from Container"); self.extract_images_button.clicked.connect(self.extract_vm_images) + self.extract_images_button.setEnabled(False); ext_layout.addWidget(self.extract_images_button); extraction_group.setLayout(ext_layout) main_layout.addWidget(extraction_group) - - # Step 3 - mgmt_group = QGroupBox("Step 3: Container Management (Optional)") - mgmt_layout = QHBoxLayout() - self.stop_container_button = QPushButton("Stop Container") - self.stop_container_button.clicked.connect(self.stop_persistent_container) - self.stop_container_button.setEnabled(False) - mgmt_layout.addWidget(self.stop_container_button) - self.remove_container_button = QPushButton("Remove Container") - self.remove_container_button.clicked.connect(self.remove_persistent_container) - self.remove_container_button.setEnabled(False) - mgmt_layout.addWidget(self.remove_container_button) - mgmt_group.setLayout(mgmt_layout) + mgmt_group = QGroupBox("Step 3: Container Management (Optional)"); mgmt_layout = QHBoxLayout() + self.stop_container_button = QPushButton("Stop Container"); self.stop_container_button.clicked.connect(self.stop_persistent_container) + self.stop_container_button.setEnabled(False); mgmt_layout.addWidget(self.stop_container_button) + self.remove_container_button = QPushButton("Remove Container"); self.remove_container_button.clicked.connect(self.remove_persistent_container) + self.remove_container_button.setEnabled(False); mgmt_layout.addWidget(self.remove_container_button); mgmt_group.setLayout(mgmt_layout) main_layout.addWidget(mgmt_group) - # Step 4: USB Drive Selection - usb_group = QGroupBox("Step 4: Select Target USB Drive and Write") # Title updated + # Step 4: USB Drive Selection - Modified for Windows + usb_group = QGroupBox("Step 4: Select Target USB Drive and Write") usb_layout = QVBoxLayout() + + self.usb_drive_label = QLabel("Available USB Drives (for Linux/macOS):") + usb_layout.addWidget(self.usb_drive_label) + usb_selection_layout = QHBoxLayout() self.usb_drive_combo = QComboBox() self.usb_drive_combo.currentIndexChanged.connect(self.update_write_to_usb_button_state) - usb_selection_layout.addWidget(QLabel("Available USB Drives:")) usb_selection_layout.addWidget(self.usb_drive_combo) + self.refresh_usb_button = QPushButton("Refresh List") self.refresh_usb_button.clicked.connect(self.refresh_usb_drives) usb_selection_layout.addWidget(self.refresh_usb_button) usb_layout.addLayout(usb_selection_layout) + + # Windows-specific input for disk ID + self.windows_usb_input_label = QLabel("For Windows: Enter USB Disk Number (e.g., 1, 2). Use 'diskpart' -> 'list disk' in an Admin CMD to find it.") + self.windows_disk_id_input = QLineEdit() + self.windows_disk_id_input.setPlaceholderText("Enter Disk Number (e.g., 1)") + self.windows_disk_id_input.textChanged.connect(self.update_write_to_usb_button_state) + + if platform.system() == "Windows": + self.usb_drive_label.setText("Detected Mountable Partitions (for reference only for writing):") + usb_layout.addWidget(self.windows_usb_input_label) + usb_layout.addWidget(self.windows_disk_id_input) + else: + self.windows_usb_input_label.setVisible(False) + self.windows_disk_id_input.setVisible(False) + warning_label = QLabel("WARNING: Selecting a drive and proceeding to write will ERASE ALL DATA on it!") warning_label.setStyleSheet("color: red; font-weight: bold;") usb_layout.addWidget(warning_label) + self.write_to_usb_button = QPushButton("Write Images to USB Drive") self.write_to_usb_button.clicked.connect(self.handle_write_to_usb) self.write_to_usb_button.setEnabled(False) usb_layout.addWidget(self.write_to_usb_button) + usb_group.setLayout(usb_layout) main_layout.addWidget(usb_group) @@ -249,329 +231,327 @@ class MainWindow(QMainWindow): self.output_area.setReadOnly(True) main_layout.addWidget(self.output_area) - def show_about_dialog(self): + def show_about_dialog(self): # ... (same as before, update version) QMessageBox.about(self, f"About {APP_NAME}", - f"Version: 0.4.0\nDeveloper: {DEVELOPER_NAME}\nBusiness: {BUSINESS_NAME}\n\n" + f"Version: 0.6.0\nDeveloper: {DEVELOPER_NAME}\nBusiness: {BUSINESS_NAME}\n\n" "This tool helps create bootable macOS USB drives using Docker-OSX.") - def _start_worker(self, worker_instance, on_finished_slot, on_error_slot): + def _start_worker(self, worker_instance, on_finished_slot, on_error_slot, worker_name="worker"): # ... (same as before) if self.active_worker_thread and self.active_worker_thread.isRunning(): QMessageBox.warning(self, "Busy", "Another operation is already in progress. Please wait.") return False - self.active_worker_thread = QThread() + self.active_worker_thread.setObjectName(worker_name + "_thread") + setattr(self, f"{worker_name}_instance", worker_instance) worker_instance.moveToThread(self.active_worker_thread) - worker_instance.signals.progress.connect(self.update_output) worker_instance.signals.finished.connect(on_finished_slot) worker_instance.signals.error.connect(on_error_slot) - - # Cleanup thread when worker is done worker_instance.signals.finished.connect(self.active_worker_thread.quit) worker_instance.signals.error.connect(self.active_worker_thread.quit) self.active_worker_thread.finished.connect(self.active_worker_thread.deleteLater) - + self.active_worker_thread.finished.connect(lambda: self._clear_worker_instance(worker_name)) # Use new clear method self.active_worker_thread.started.connect(worker_instance.run) self.active_worker_thread.start() return True - def run_macos_vm(self): + def _clear_worker_instance(self, worker_name): # New method to clean up worker instance from self + attr_name = f"{worker_name}_instance" + if hasattr(self, attr_name): + delattr(self, attr_name) + + def run_macos_vm(self): # ... (same as before, ensure worker_name matches for _clear_worker_instance) selected_version_name = self.version_combo.currentText() self.current_container_name = get_unique_container_name() try: command_list = build_docker_command(selected_version_name, self.current_container_name) self.output_area.clear() - self.output_area.append(f"Starting macOS VM creation for {selected_version_name}...") - self.output_area.append(f"Container name: {self.current_container_name}") - self.output_area.append(f"Command: {' '.join(command_list)}\n") - self.output_area.append("The macOS installation will occur in a QEMU window...\n") + self.output_area.append(f"Starting macOS VM creation for {selected_version_name}...") # ... rest of messages - self.docker_run_worker_instance = DockerRunWorker(command_list) # Store instance - if self._start_worker(self.docker_run_worker_instance, self.docker_run_finished, self.docker_run_error): - self.run_vm_button.setEnabled(False) - self.version_combo.setEnabled(False) - self.stop_vm_button.setEnabled(True) - self.extract_images_button.setEnabled(False) + docker_run_worker = DockerRunWorker(command_list) # Local var, instance stored by _start_worker + if self._start_worker(docker_run_worker, self.docker_run_finished, self.docker_run_error, "docker_run"): + self.run_vm_button.setEnabled(False); self.version_combo.setEnabled(False) + self.stop_vm_button.setEnabled(True); self.extract_images_button.setEnabled(False) self.write_to_usb_button.setEnabled(False) except ValueError as e: self.handle_error(f"Failed to build command: {str(e)}") except Exception as e: self.handle_error(f"An unexpected error: {str(e)}") @pyqtSlot(str) - def update_output(self, text): - self.output_area.append(text.strip()) # append automatically scrolls - QApplication.processEvents() # Keep UI responsive during rapid updates + def update_output(self, text): # ... (same as before) + self.output_area.append(text.strip()); QApplication.processEvents() @pyqtSlot(str) - def docker_run_finished(self, message): + def docker_run_finished(self, message): # ... (same as before) self.output_area.append(f"\n--- macOS VM Setup Process Finished ---\n{message}") QMessageBox.information(self, "VM Setup Complete", f"{message}\nYou can now proceed to extract images.") - self.run_vm_button.setEnabled(True) - self.version_combo.setEnabled(True) - self.stop_vm_button.setEnabled(False) - self.extract_images_button.setEnabled(True) + self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True) + self.stop_vm_button.setEnabled(False); self.extract_images_button.setEnabled(True) self.stop_container_button.setEnabled(True) - self.active_worker_thread = None # Allow new worker + self.active_worker_thread = None # Cleared by _start_worker's finished connection + @pyqtSlot(str) - def docker_run_error(self, error_message): + def docker_run_error(self, error_message): # ... (same as before) self.output_area.append(f"\n--- macOS VM Setup Process Error ---\n{error_message}") - if "exited with code" in error_message and self.current_container_name: + if "exited" in error_message.lower() and self.current_container_name: QMessageBox.warning(self, "VM Setup Ended", f"{error_message}\nAssuming macOS setup was attempted...") - self.extract_images_button.setEnabled(True) - self.stop_container_button.setEnabled(True) + self.extract_images_button.setEnabled(True); self.stop_container_button.setEnabled(True) else: QMessageBox.critical(self, "VM Setup Error", error_message) self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True); self.stop_vm_button.setEnabled(False) self.active_worker_thread = None - def stop_docker_run_process(self): - if hasattr(self, 'docker_run_worker_instance') and self.docker_run_worker_instance: - self.output_area.append("\n--- Attempting to stop macOS VM creation ---") - self.docker_run_worker_instance.stop() # Worker should handle signal emission - self.stop_vm_button.setEnabled(False) # Disable to prevent multiple clicks - def extract_vm_images(self): - if not self.current_container_name: - QMessageBox.warning(self, "Warning", "No active container specified for extraction."); return + def stop_docker_run_process(self): + docker_run_worker_inst = getattr(self, "docker_run_instance", None) # Use specific name + if docker_run_worker_inst: + self.output_area.append("\n--- Attempting to stop macOS VM creation ---") + docker_run_worker_inst.stop() + self.stop_vm_button.setEnabled(False) + + def extract_vm_images(self): # ... (same as before, ensure worker_names are unique) + if not self.current_container_name: QMessageBox.warning(self, "Warning", "No active container."); return save_dir = QFileDialog.getExistingDirectory(self, "Select Directory to Save VM Images") if not save_dir: return - self.output_area.append(f"\n--- Starting Image Extraction from {self.current_container_name} to {save_dir} ---") self.extract_images_button.setEnabled(False); self.write_to_usb_button.setEnabled(False) - self.extracted_main_image_path = os.path.join(save_dir, "mac_hdd_ng.img") self.extracted_opencore_image_path = os.path.join(save_dir, "OpenCore.qcow2") self.extraction_status = {"main": False, "opencore": False} - cp_main_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_MACOS_IMG_PATH, self.extracted_main_image_path) main_worker = DockerCommandWorker(cp_main_cmd, f"Main macOS image copied to {self.extracted_main_image_path}") - if not self._start_worker(main_worker, - lambda msg: self.docker_utility_finished(msg, "main_img_extract"), - lambda err: self.docker_utility_error(err, "main_img_extract_error")): - self.extract_images_button.setEnabled(True) # Re-enable if start failed - return # Don't proceed to second if first failed to start - + if not self._start_worker(main_worker, lambda msg: self.docker_utility_finished(msg, "main_img_extract"), + lambda err: self.docker_utility_error(err, "main_img_extract_error"), "cp_main"): # Unique name + self.extract_images_button.setEnabled(True); return self.output_area.append(f"Extraction for main image started. OpenCore extraction will follow.") - def _start_opencore_extraction(self): # Called after main image extraction finishes + def _start_opencore_extraction(self): # ... (same as before, ensure worker_name is unique) if not self.current_container_name or not self.extracted_opencore_image_path: return - cp_oc_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_OPENCORE_QCOW2_PATH, self.extracted_opencore_image_path) oc_worker = DockerCommandWorker(cp_oc_cmd, f"OpenCore image copied to {self.extracted_opencore_image_path}") - self._start_worker(oc_worker, - lambda msg: self.docker_utility_finished(msg, "oc_img_extract"), - lambda err: self.docker_utility_error(err, "oc_img_extract_error")) + self._start_worker(oc_worker, lambda msg: self.docker_utility_finished(msg, "oc_img_extract"), + lambda err: self.docker_utility_error(err, "oc_img_extract_error"), "cp_oc") # Unique name - - def stop_persistent_container(self): + def stop_persistent_container(self): # ... (same as before, ensure worker_name is unique) if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return - self.output_area.append(f"\n--- Stopping container {self.current_container_name} ---") cmd = build_docker_stop_command(self.current_container_name) worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} stopped.") if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "stop_container"), - lambda err: self.docker_utility_error(err, "stop_container_error")): + lambda err: self.docker_utility_error(err, "stop_container_error"), "stop_docker"): # Unique name self.stop_container_button.setEnabled(False) - def remove_persistent_container(self): + def remove_persistent_container(self): # ... (same as before, ensure worker_name is unique) if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return - reply = QMessageBox.question(self, 'Confirm Remove', f"Remove container '{self.current_container_name}'?", - QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) + reply = QMessageBox.question(self, 'Confirm Remove', f"Remove container '{self.current_container_name}'?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) if reply == QMessageBox.StandardButton.No: return - self.output_area.append(f"\n--- Removing container {self.current_container_name} ---") cmd = build_docker_rm_command(self.current_container_name) worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} removed.") if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "rm_container"), - lambda err: self.docker_utility_error(err, "rm_container_error")): + lambda err: self.docker_utility_error(err, "rm_container_error"), "rm_docker"): # Unique name self.remove_container_button.setEnabled(False) - - def docker_utility_finished(self, message, task_id): - self.output_area.append(f"\n--- Task '{task_id}' Succeeded ---\n{message}") - QMessageBox.information(self, f"Task Complete", message) - self.active_worker_thread = None # Allow new worker - - if task_id == "main_img_extract": - self.extraction_status["main"] = True - self._start_opencore_extraction() # Start next part of extraction - elif task_id == "oc_img_extract": - self.extraction_status["opencore"] = True - + def docker_utility_finished(self, message, task_id): # ... (same as before) + self.output_area.append(f"\n--- Task '{task_id}' Succeeded ---\n{message}"); QMessageBox.information(self, f"Task Complete", message) + if task_id == "main_img_extract": self.extraction_status["main"] = True; self._start_opencore_extraction(); return + elif task_id == "oc_img_extract": self.extraction_status["opencore"] = True + self.active_worker_thread = None # Cleared by _start_worker's finished connection if self.extraction_status.get("main") and self.extraction_status.get("opencore"): - self.output_area.append("\nBoth VM images extracted successfully.") - self.update_write_to_usb_button_state() - self.extract_images_button.setEnabled(True) - elif task_id.startswith("extract"): # If one part finished but not both - self.extract_images_button.setEnabled(True) - - if task_id == "stop_container": - self.remove_container_button.setEnabled(True) + self.output_area.append("\nBoth VM images extracted successfully."); self.update_write_to_usb_button_state(); self.extract_images_button.setEnabled(True) + elif task_id.startswith("extract"): self.extract_images_button.setEnabled(True) + if task_id == "stop_container": self.remove_container_button.setEnabled(True) if task_id == "rm_container": - self.current_container_name = None - self.stop_container_button.setEnabled(False) - self.extract_images_button.setEnabled(False) - self.update_write_to_usb_button_state() # Should disable it + self.current_container_name = None; self.stop_container_button.setEnabled(False) + self.extract_images_button.setEnabled(False); self.update_write_to_usb_button_state() - def docker_utility_error(self, error_message, task_id): - self.output_area.append(f"\n--- Task '{task_id}' Error ---\n{error_message}") - QMessageBox.critical(self, f"Task Error", error_message) + def docker_utility_error(self, error_message, task_id): # ... (same as before) + self.output_area.append(f"\n--- Task '{task_id}' Error ---\n{error_message}"); QMessageBox.critical(self, f"Task Error", error_message) self.active_worker_thread = None if task_id.startswith("extract"): self.extract_images_button.setEnabled(True) - if task_id == "stop_container": self.stop_container_button.setEnabled(True) # Allow retry - if task_id == "rm_container": self.remove_container_button.setEnabled(True) # Allow retry + if task_id == "stop_container": self.stop_container_button.setEnabled(True) + if task_id == "rm_container": self.remove_container_button.setEnabled(True) - def handle_error(self, message): - self.output_area.append(f"ERROR: {message}") - QMessageBox.critical(self, "Error", message) - self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True) - self.stop_vm_button.setEnabled(False); self.extract_images_button.setEnabled(False) - self.write_to_usb_button.setEnabled(False) - self.active_worker_thread = None + def handle_error(self, message): # ... (same as before) + self.output_area.append(f"ERROR: {message}"); QMessageBox.critical(self, "Error", message) + self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True); self.stop_vm_button.setEnabled(False) + self.extract_images_button.setEnabled(False); self.write_to_usb_button.setEnabled(False) + self.active_worker_thread = None; # Clear active thread + # Clear all potential worker instances + for attr_name in list(self.__dict__.keys()): + if attr_name.endswith("_instance") and isinstance(getattr(self,attr_name,None), QObject): + setattr(self,attr_name,None) - def refresh_usb_drives(self): + + def refresh_usb_drives(self): # Modified for Windows self.usb_drive_combo.clear() - self._current_usb_selection_path = self.usb_drive_combo.currentData() # Save current selection - self.output_area.append("\nScanning for USB drives...") - try: - partitions = psutil.disk_partitions(all=False) - potential_usbs = [] - for p in partitions: - is_removable = 'removable' in p.opts - is_likely_usb = False + current_selection_text = getattr(self, '_current_usb_selection_text', None) + self.output_area.append("\nScanning for disk devices...") - if platform.system() == "Windows": - # A more reliable method for Windows would involve WMI or ctypes to query drive types. - # This is a basic filter. - if p.mountpoint and p.fstype and p.fstype.lower() not in ['ntfs', 'refs', 'cdfs'] and len(p.mountpoint) <= 3: # e.g. E:\ - is_likely_usb = True - elif platform.system() == "Darwin": - if p.device.startswith("/dev/disk") and (os.path.exists(f"/sys/block/{os.path.basename(p.device)}/removable") or "external" in p.opts.lower()): # Check 'external' from mount options - is_likely_usb = True - elif platform.system() == "Linux": - # Check if /sys/block/sdX/removable exists and is 1 + current_os = platform.system() + if current_os == "Windows": + self.usb_drive_label.setText("For Windows, identify Physical Disk number (e.g., 1, 2) using Disk Management or 'diskpart > list disk'. Input below.") + self.windows_disk_id_input.setVisible(True) + self.windows_usb_input_label.setVisible(True) + self.usb_drive_combo.setVisible(False) # Hide combo for windows as input is manual + self.refresh_usb_button.setText("List Partitions (Ref.)") # Change button text + try: + partitions = psutil.disk_partitions(all=True) + ref_text = "Reference - Detected partitions/mounts:\n" + for p in partitions: try: - with open(f"/sys/block/{os.path.basename(p.device)}/removable", "r") as f: - if f.read().strip() == "1": - is_likely_usb = True - except IOError: # If the removable file doesn't exist, it's likely not a USB mass storage - pass - if not is_likely_usb and (p.mountpoint and ("/media/" in p.mountpoint or "/run/media/" in p.mountpoint)): # Fallback to mountpoint - is_likely_usb = True - - if is_removable or is_likely_usb: - try: - # Attempt to get disk usage. If it fails, it might be an unformatted or problematic drive. usage = psutil.disk_usage(p.mountpoint) size_gb = usage.total / (1024**3) - if size_gb < 0.1 : continue - drive_text = f"{p.device} @ {p.mountpoint} ({p.fstype}, {size_gb:.2f} GB)" - potential_usbs.append((drive_text, p.device)) - except Exception: pass + ref_text += f" {p.device} @ {p.mountpoint} ({p.fstype}, {size_gb:.2f} GB)\n" + except Exception: + ref_text += f" {p.device} ({p.fstype}) - could not get usage/mountpoint\n" + self.output_area.append(ref_text) + except Exception as e: + self.output_area.append(f"Error listing partitions for reference: {e}") + else: + self.usb_drive_label.setText("Available USB Drives (for Linux/macOS):") + self.windows_disk_id_input.setVisible(False) + self.windows_usb_input_label.setVisible(False) + self.usb_drive_combo.setVisible(True) + self.refresh_usb_button.setText("Refresh List") + try: # psutil logic for Linux/macOS + partitions = psutil.disk_partitions(all=False) + potential_usbs = [] + for p in partitions: + is_removable = 'removable' in p.opts + is_likely_usb = False + if current_os == "Darwin": + if p.device.startswith("/dev/disk") and 'external' in p.opts.lower() and 'physical' in p.opts.lower(): is_likely_usb = True + elif current_os == "Linux": + if (p.mountpoint and ("/media/" in p.mountpoint or "/run/media/" in p.mountpoint)) or (p.device.startswith("/dev/sd") and not p.device.endswith("da")): is_likely_usb = True + if is_removable or is_likely_usb: + try: + usage = psutil.disk_usage(p.mountpoint) + size_gb = usage.total / (1024**3); + if size_gb < 0.1 : continue + drive_text = f"{p.device} @ {p.mountpoint} ({p.fstype}, {size_gb:.2f} GB)" + potential_usbs.append((drive_text, p.device)) + except Exception: pass - idx_to_select = -1 - if potential_usbs: - for i, (text, device_path) in enumerate(potential_usbs): - self.usb_drive_combo.addItem(text, userData=device_path) - if device_path == self._current_usb_selection_path: - idx_to_select = i - self.output_area.append(f"Found {len(potential_usbs)} potential USB drive(s). Please verify carefully.") - else: self.output_area.append("No suitable USB drives found. Ensure drive is connected, formatted, and mounted.") + if potential_usbs: + idx_to_select = -1 + for i, (text, device_path) in enumerate(potential_usbs): + self.usb_drive_combo.addItem(text, userData=device_path) + if text == current_selection_text: idx_to_select = i + if idx_to_select != -1: self.usb_drive_combo.setCurrentIndex(idx_to_select) + self.output_area.append(f"Found {len(potential_usbs)} potential USB drive(s). Please verify carefully.") + else: self.output_area.append("No suitable USB drives found for Linux/macOS.") + except ImportError: self.output_area.append("psutil library not found.") + except Exception as e: self.output_area.append(f"Error scanning for USB drives: {e}") - if idx_to_select != -1: self.usb_drive_combo.setCurrentIndex(idx_to_select) - - except ImportError: self.output_area.append("psutil library not found. USB detection disabled.") - except Exception as e: self.output_area.append(f"Error scanning for USB drives: {e}") self.update_write_to_usb_button_state() - def handle_write_to_usb(self): - if platform.system() != "Linux": - QMessageBox.warning(self, "Unsupported Platform", f"USB writing is currently only implemented for Linux. Your system: {platform.system()}") - return + def handle_write_to_usb(self): # Modified for Windows + current_os = platform.system() + usb_writer_module = None + target_device_id_for_worker = None - if USBWriterLinux is None: - QMessageBox.critical(self, "Error", "USBWriterLinux module could not be loaded. Cannot write to USB.") - return + if current_os == "Linux": + usb_writer_module = USBWriterLinux + target_device_id_for_worker = self.usb_drive_combo.currentData() + elif current_os == "Darwin": + usb_writer_module = USBWriterMacOS + target_device_id_for_worker = self.usb_drive_combo.currentData() + elif current_os == "Windows": + usb_writer_module = USBWriterWindows + # For Windows, device_id for USBWriterWindows is the disk number string + target_device_id_for_worker = self.windows_disk_id_input.text().strip() + if not target_device_id_for_worker.isdigit(): # Basic validation + QMessageBox.warning(self, "Input Required", "Please enter a valid Windows Disk Number (e.g., 1, 2)."); return + # USBWriterWindows expects just the number, it constructs \\.\PhysicalDriveX itself. + + if not usb_writer_module: + QMessageBox.warning(self, "Unsupported Platform", f"USB writing not supported/enabled for {current_os}."); return - selected_drive_device = self.usb_drive_combo.currentData() if not self.extracted_main_image_path or not self.extracted_opencore_image_path or not self.extraction_status["main"] or not self.extraction_status["opencore"]: QMessageBox.warning(self, "Missing Images", "Ensure both images are extracted."); return - if not selected_drive_device: - QMessageBox.warning(self, "No USB Selected", "Please select a target USB drive."); return + if not target_device_id_for_worker: # Should catch empty input for Windows here too + QMessageBox.warning(self, "No USB Selected/Identified", f"Please select/identify the target USB drive for {current_os}."); return - confirm_msg = (f"WARNING: ALL DATA ON {selected_drive_device} WILL BE ERASED PERMANENTLY.\n" + confirm_msg = (f"WARNING: ALL DATA ON TARGET '{target_device_id_for_worker}' WILL BE ERASED PERMANENTLY. +" "Are you absolutely sure you want to proceed?") reply = QMessageBox.warning(self, "Confirm Write Operation", confirm_msg, QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel, QMessageBox.StandardButton.Cancel) if reply == QMessageBox.StandardButton.Cancel: - self.output_area.append("\nUSB write operation cancelled by user."); return + self.output_area.append(" +USB write operation cancelled by user."); return - self.output_area.append(f"\n--- Starting USB Write Process for {selected_drive_device} ---") - self.output_area.append("This will take a long time and requires sudo privileges for underlying commands.") + self.output_area.append(f" +--- Starting USB Write Process for {target_device_id_for_worker} on {current_os} ---") + self.write_to_usb_button.setEnabled(False); self.refresh_usb_button.setEnabled(False) - usb_worker = USBWriterWorker(selected_drive_device, self.extracted_opencore_image_path, self.extracted_main_image_path) - if self._start_worker(usb_worker, self.usb_write_finished, self.usb_write_error): - self.write_to_usb_button.setEnabled(False) # Disable during write - self.refresh_usb_button.setEnabled(False) - else: # Failed to start worker (another is running) - pass # Message already shown by _start_worker + usb_worker = USBWriterWorker(target_device_id_for_worker, self.extracted_opencore_image_path, self.extracted_main_image_path) + if not self._start_worker(usb_worker, self.usb_write_finished, self.usb_write_error, "usb_write"): # worker_name "usb_write" + self.write_to_usb_button.setEnabled(True); self.refresh_usb_button.setEnabled(True) + + @pyqtSlot(str) + def usb_write_finished(self, message): # ... (same as before) + self.output_area.append(f" +--- USB Write Process Finished --- +{message}"); QMessageBox.information(self, "USB Write Complete", message) + self.write_to_usb_button.setEnabled(True); self.refresh_usb_button.setEnabled(True) + self.active_worker_thread = None; setattr(self, "usb_write_instance", None) @pyqtSlot(str) - def usb_write_finished(self, message): - self.output_area.append(f"\n--- USB Write Process Finished ---\n{message}") - QMessageBox.information(self, "USB Write Complete", message) - self.write_to_usb_button.setEnabled(True) # Re-enable after completion - self.refresh_usb_button.setEnabled(True) - self.active_worker_thread = None + def usb_write_error(self, error_message): # ... (same as before) + self.output_area.append(f" +--- USB Write Process Error --- +{error_message}"); QMessageBox.critical(self, "USB Write Error", error_message) + self.write_to_usb_button.setEnabled(True); self.refresh_usb_button.setEnabled(True) + self.active_worker_thread = None; setattr(self, "usb_write_instance", None) - @pyqtSlot(str) - def usb_write_error(self, error_message): - self.output_area.append(f"\n--- USB Write Process Error ---\n{error_message}") - QMessageBox.critical(self, "USB Write Error", error_message) - self.write_to_usb_button.setEnabled(True) # Re-enable after error - self.refresh_usb_button.setEnabled(True) - self.active_worker_thread = None - - def update_write_to_usb_button_state(self): + def update_write_to_usb_button_state(self): # Modified for Windows images_ready = self.extraction_status.get("main", False) and self.extraction_status.get("opencore", False) - usb_selected = bool(self.usb_drive_combo.currentData()) - can_write_on_platform = platform.system() == "Linux" and USBWriterLinux is not None + usb_identified = False + current_os = platform.system() + writer_module = None - self.write_to_usb_button.setEnabled(images_ready and usb_selected and can_write_on_platform) - if not can_write_on_platform and usb_selected and images_ready: - self.write_to_usb_button.setToolTip("USB writing currently only supported on Linux with all dependencies.") + if current_os == "Linux": writer_module = USBWriterLinux + elif current_os == "Darwin": writer_module = USBWriterMacOS + elif current_os == "Windows": writer_module = USBWriterWindows + + if current_os == "Windows": + usb_identified = bool(self.windows_disk_id_input.text().strip().isdigit()) # Must be a digit for disk ID else: - self.write_to_usb_button.setToolTip("") + usb_identified = bool(self.usb_drive_combo.currentData()) + + self.write_to_usb_button.setEnabled(images_ready and usb_identified and writer_module is not None) + # ... (Tooltip logic same as before) ... + if writer_module is None: self.write_to_usb_button.setToolTip(f"USB Writing not supported on {current_os} or module missing.") + elif not images_ready: self.write_to_usb_button.setToolTip("Extract VM images first.") + elif not usb_identified: + if current_os == "Windows": self.write_to_usb_button.setToolTip("Enter a valid Windows Disk Number.") + else: self.write_to_usb_button.setToolTip("Select a target USB drive.") + else: self.write_to_usb_button.setToolTip("") - def closeEvent(self, event): + def closeEvent(self, event): # ... (same as before) + self._current_usb_selection_text = self.usb_drive_combo.currentText() if self.active_worker_thread and self.active_worker_thread.isRunning(): - reply = QMessageBox.question(self, 'Confirm Exit', "An operation is running. Exit anyway?", - QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) + reply = QMessageBox.question(self, 'Confirm Exit', "An operation is running. Exit anyway?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) if reply == QMessageBox.StandardButton.Yes: - # Attempt to stop the specific worker if identifiable, or just quit thread - # For DockerRunWorker: - if hasattr(self, 'docker_run_worker_instance') and self.active_worker_thread.findChild(DockerRunWorker): - self.docker_run_worker_instance.stop() - # For USBWriterWorker, it doesn't have an explicit stop, rely on thread termination. - - self.active_worker_thread.quit() - if not self.active_worker_thread.wait(1000): # brief wait - self.output_area.append("Worker thread did not terminate gracefully. Forcing exit.") + worker_instance_attr_name = self.active_worker_thread.objectName().replace("_thread", "_instance") + worker_to_stop = getattr(self, worker_instance_attr_name, None) + if worker_to_stop and hasattr(worker_to_stop, 'stop'): worker_to_stop.stop() + else: self.active_worker_thread.quit() + self.active_worker_thread.wait(1000) event.accept() - else: event.ignore() - elif self.current_container_name and self.stop_container_button.isEnabled(): # Check only if stop button is enabled (meaning container might be running or exists) - reply = QMessageBox.question(self, 'Confirm Exit', f"Container '{self.current_container_name}' may still exist or be running. It's recommended to stop and remove it using the GUI buttons. Exit anyway?", - QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) + else: event.ignore(); return + elif self.current_container_name and self.stop_container_button.isEnabled(): + reply = QMessageBox.question(self, 'Confirm Exit', f"Container '{self.current_container_name}' may still exist. Exit anyway?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) if reply == QMessageBox.StandardButton.Yes: event.accept() else: event.ignore() - else: - event.accept() - + else: event.accept() if __name__ == "__main__": app = QApplication(sys.argv) diff --git a/usb_writer_linux.py b/usb_writer_linux.py index 7442a0b..6e8a236 100644 --- a/usb_writer_linux.py +++ b/usb_writer_linux.py @@ -2,259 +2,286 @@ import subprocess import os import time - -# Placeholder for progress reporting signal if this were a QObject -# from PyQt6.QtCore import pyqtSignal +import shutil # For checking command existence class USBWriterLinux: - # progress_signal = pyqtSignal(str) # Example for QObject integration - def __init__(self, device: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None): - """ - Args: - device: The path to the USB device (e.g., /dev/sdx). - opencore_qcow2_path: Path to the OpenCore.qcow2 image. - macos_qcow2_path: Path to the mac_hdd_ng.img (qcow2). - progress_callback: A function to call with progress strings. - """ self.device = device self.opencore_qcow2_path = opencore_qcow2_path self.macos_qcow2_path = macos_qcow2_path self.progress_callback = progress_callback - self.opencore_raw_path = "opencore.raw" # Temporary raw image - self.macos_raw_path = "macos_main.raw" # Temporary raw image - self.mount_point_opencore_efi = "/mnt/opencore_efi_temp" - self.mount_point_usb_esp = "/mnt/usb_esp_temp" + # Define unique temporary file and mount point names + pid = os.getpid() # Make temp names more unique if multiple instances run (though unlikely for this app) + self.opencore_raw_path = f"opencore_temp_{pid}.raw" + self.macos_raw_path = f"macos_main_temp_{pid}.raw" + self.mount_point_opencore_efi = f"/mnt/opencore_efi_temp_skyscope_{pid}" + self.mount_point_usb_esp = f"/mnt/usb_esp_temp_skyscope_{pid}" + self.mount_point_macos_source = f"/mnt/macos_source_temp_skyscope_{pid}" + self.mount_point_usb_macos_target = f"/mnt/usb_macos_target_temp_skyscope_{pid}" + self.temp_files_to_clean = [self.opencore_raw_path, self.macos_raw_path] + self.temp_mount_points_to_clean = [ + self.mount_point_opencore_efi, self.mount_point_usb_esp, + self.mount_point_macos_source, self.mount_point_usb_macos_target + ] def _report_progress(self, message: str): print(message) # For standalone testing if self.progress_callback: self.progress_callback(message) - def _run_command(self, command: list[str], check=True, capture_output=False, shell=False): - self._report_progress(f"Executing: {' '.join(command)}") + def _run_command(self, command: list[str], check=True, capture_output=False, shell=False, timeout=None): + self.progress_callback(f"Executing: {' '.join(command)}") try: process = subprocess.run( command, check=check, capture_output=capture_output, text=True, - shell=shell # Use shell=True with caution + shell=shell, # Use shell=True with caution + timeout=timeout ) + # Log stdout/stderr only if capture_output is True and content exists if capture_output: - if process.stdout: self._report_progress(f"STDOUT: {process.stdout.strip()}") - if process.stderr: self._report_progress(f"STDERR: {process.stderr.strip()}") + if process.stdout and process.stdout.strip(): + self._report_progress(f"STDOUT: {process.stdout.strip()}") + if process.stderr and process.stderr.strip(): + self._report_progress(f"STDERR: {process.stderr.strip()}") return process + except subprocess.TimeoutExpired: + self._report_progress(f"Command {' '.join(command)} timed out after {timeout} seconds.") + raise except subprocess.CalledProcessError as e: - self._report_progress(f"Error executing {' '.join(command)}: {e}") + self._report_progress(f"Error executing {' '.join(command)} (return code {e.returncode}): {e}") if e.stderr: self._report_progress(f"STDERR: {e.stderr.strip()}") - if e.stdout: self._report_progress(f"STDOUT: {e.stdout.strip()}") + if e.stdout: self._report_progress(f"STDOUT: {e.stdout.strip()}") # Sometimes errors go to stdout raise except FileNotFoundError: - self._report_progress(f"Error: Command {command[0]} not found. Is it installed and in PATH?") + self._report_progress(f"Error: Command '{command[0]}' not found. Is it installed and in PATH?") raise def _cleanup_temp_files(self): - self._report_progress("Cleaning up temporary files...") - for f_path in [self.opencore_raw_path, self.macos_raw_path]: + self._report_progress("Cleaning up temporary image files...") + for f_path in self.temp_files_to_clean: if os.path.exists(f_path): try: - os.remove(f_path) + self._run_command(["sudo", "rm", "-f", f_path], check=False) # Use sudo rm for root-owned files self._report_progress(f"Removed {f_path}") - except OSError as e: - self._report_progress(f"Error removing {f_path}: {e}") + except Exception as e: # Catch broad exceptions from _run_command + self._report_progress(f"Error removing {f_path} via sudo rm: {e}") - def _unmount_and_remove_dir(self, mount_point): + def _unmount_path(self, mount_point): if os.path.ismount(mount_point): - self._run_command(["sudo", "umount", mount_point], check=False) - if os.path.exists(mount_point): + self._report_progress(f"Unmounting {mount_point}...") + self._run_command(["sudo", "umount", "-lf", mount_point], check=False, timeout=30) + + def _remove_dir_if_exists(self, dir_path): + if os.path.exists(dir_path): try: - os.rmdir(mount_point) - except OSError as e: - self._report_progress(f"Could not rmdir {mount_point}: {e}. May need manual cleanup.") + self._run_command(["sudo", "rmdir", dir_path], check=False) + except Exception as e: # Catch broad exceptions from _run_command + self._report_progress(f"Could not rmdir {dir_path}: {e}. May need manual cleanup.") - def _cleanup_mappings_and_mounts(self): - self._report_progress("Cleaning up mappings and mounts...") - self._unmount_and_remove_dir(self.mount_point_opencore_efi) - self._unmount_and_remove_dir(self.mount_point_usb_esp) + def _cleanup_all_mounts_and_mappings(self): + self._report_progress("Cleaning up all temporary mounts and kpartx mappings...") + for mp in self.temp_mount_points_to_clean: + self._unmount_path(mp) # Unmount first - # Unmap kpartx devices - this is tricky as we don't know the loop device name easily without parsing - # For OpenCore raw image - if os.path.exists(self.opencore_raw_path): + # Detach kpartx for raw images + if os.path.exists(self.opencore_raw_path): # Check if raw file was even created self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path], check=False) - # For the USB device itself, if kpartx was used on it (it shouldn't be for this workflow) - # self._run_command(["sudo", "kpartx", "-d", self.device], check=False) + if os.path.exists(self.macos_raw_path): + self._run_command(["sudo", "kpartx", "-d", self.macos_raw_path], check=False) + + # Remove mount point directories after unmounting and detaching + for mp in self.temp_mount_points_to_clean: + self._remove_dir_if_exists(mp) def check_dependencies(self): - self._report_progress("Checking dependencies (qemu-img, parted, kpartx, rsync, mkfs.vfat)...") - dependencies = ["qemu-img", "parted", "kpartx", "rsync", "mkfs.vfat"] + self._report_progress("Checking dependencies (qemu-img, parted, kpartx, rsync, mkfs.vfat, mkfs.hfsplus, apfs-fuse)...") + dependencies = ["qemu-img", "parted", "kpartx", "rsync", "mkfs.vfat", "mkfs.hfsplus", "apfs-fuse"] + missing_deps = [] for dep in dependencies: - try: - self._run_command([dep, "--version" if dep != "kpartx" and dep != "mkfs.vfat" else "-V"], capture_output=True) # kpartx has no version, mkfs.vfat uses -V - except (FileNotFoundError, subprocess.CalledProcessError) as e: - self._report_progress(f"Dependency {dep} not found or not working: {e}") - raise RuntimeError(f"Dependency {dep} not found. Please install it.") - self._report_progress("All dependencies found.") + if not shutil.which(dep): + missing_deps.append(dep) + + if missing_deps: + msg = f"Missing dependencies: {', '.join(missing_deps)}. Please install them. `apfs-fuse` may require manual installation from source or a user repository (e.g., AUR for Arch Linux)." + self._report_progress(msg) + raise RuntimeError(msg) + + self._report_progress("All critical dependencies found.") return True + def _get_mapped_partition_device(self, kpartx_output: str, partition_index_in_image: int = 1) -> str: + lines = kpartx_output.splitlines() + # Try to find loopXpY where Y is partition_index_in_image + for line in lines: + parts = line.split() + if len(parts) > 2 and parts[0] == "add" and parts[1] == "map" and f"p{partition_index_in_image}" in parts[2]: + return f"/dev/mapper/{parts[2]}" + # Fallback for images that might be a single partition mapped directly (e.g. loopX) + # This is less common for full disk images like OpenCore.qcow2 or mac_hdd_ng.img + if partition_index_in_image == 1 and len(lines) == 1: # Only one mapping line + parts = lines[0].split() + if len(parts) > 2 and parts[0] == "add" and parts[1] == "map": + # Check if it does NOT look like a partition (no 'p' number) + if 'p' not in parts[2]: + return f"/dev/mapper/{parts[2]}" # e.g. /dev/mapper/loop0 + self._report_progress(f"Could not find partition index {partition_index_in_image} in kpartx output:\n{kpartx_output}") + return None + def format_and_write(self) -> bool: + # Ensure cleanup runs even if errors occur early try: self.check_dependencies() + self._cleanup_all_mounts_and_mappings() # Clean before start, just in case + + for mp in self.temp_mount_points_to_clean: # Create mount point directories + self._run_command(["sudo", "mkdir", "-p", mp]) self._report_progress(f"WARNING: ALL DATA ON {self.device} WILL BE ERASED!") - # Unmount any existing partitions on the target USB device - self._report_progress(f"Unmounting all partitions on {self.device}...") - for i in range(1, 5): # Try to unmount a few potential partitions - self._run_command(["sudo", "umount", f"{self.device}{i}"], check=False) - self._run_command(["sudo", "umount", f"{self.device}p{i}"], check=False) # for nvme like + self._report_progress(f"Unmounting all partitions on {self.device} (best effort)...") + for i in range(1, 10): + self._run_command(["sudo", "umount", f"{self.device}{i}"], check=False, timeout=5) + self._run_command(["sudo", "umount", f"{self.device}p{i}"], check=False, timeout=5) - # Create new GPT partition table self._report_progress(f"Creating new GPT partition table on {self.device}...") - self._run_command(["sudo", "parted", "-s", self.device, "mklabel", "gpt"]) - - # Create EFI partition (e.g., 512MB) + self._run_command(["sudo", "parted", "--script", self.device, "mklabel", "gpt"]) self._report_progress("Creating EFI partition (ESP)...") - self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "EFI", "fat32", "1MiB", "513MiB"]) - self._run_command(["sudo", "parted", "-s", self.device, "set", "1", "esp", "on"]) - - # Create macOS partition (remaining space) + self._run_command(["sudo", "parted", "--script", self.device, "mkpart", "EFI", "fat32", "1MiB", "551MiB"]) + self._run_command(["sudo", "parted", "--script", self.device, "set", "1", "esp", "on"]) self._report_progress("Creating macOS partition...") - self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "macOS", "hfs+", "513MiB", "100%"]) + self._run_command(["sudo", "parted", "--script", self.device, "mkpart", "macOS", "hfs+", "551MiB", "100%"]) - # Inform kernel of partition changes - self._run_command(["sudo", "partprobe", self.device]) - time.sleep(2) # Give kernel time to recognize new partitions + self._run_command(["sudo", "partprobe", self.device], timeout=10) + time.sleep(3) - # Determine partition names (e.g., /dev/sdx1, /dev/sdx2) - # This can be unreliable. A better way is `lsblk -jo NAME,PATH /dev/sdx` - # For simplicity, assuming /dev/sdx1 for ESP, /dev/sdx2 for macOS partition - esp_partition = f"{self.device}1" - if not os.path.exists(esp_partition): esp_partition = f"{self.device}p1" # for nvme like /dev/nvme0n1p1 + esp_partition_dev = f"{self.device}1" if os.path.exists(f"{self.device}1") else f"{self.device}p1" + macos_partition_dev = f"{self.device}2" if os.path.exists(f"{self.device}2") else f"{self.device}p2" - macos_partition = f"{self.device}2" - if not os.path.exists(macos_partition): macos_partition = f"{self.device}p2" + if not (os.path.exists(esp_partition_dev) and os.path.exists(macos_partition_dev)): + raise RuntimeError(f"Could not reliably determine partition names for {self.device}. Expected {esp_partition_dev} and {macos_partition_dev} to exist after partprobe.") - if not (os.path.exists(esp_partition) and os.path.exists(macos_partition)): - self._report_progress(f"Could not reliably determine partition names for {self.device}. Expected {esp_partition} and {macos_partition}") - # Attempt to find them via lsblk if possible (more robust) - try: - lsblk_out = self._run_command(["lsblk", "-no", "NAME", "--paths", self.device], capture_output=True, check=True).stdout.strip().splitlines() - if len(lsblk_out) > 2 : # Device itself + at least 2 partitions - esp_partition = lsblk_out[1] - macos_partition = lsblk_out[2] - self._report_progress(f"Determined partitions using lsblk: ESP={esp_partition}, macOS={macos_partition}") - else: - raise RuntimeError("lsblk did not return enough partitions.") - except Exception as e_lsblk: - self._report_progress(f"Failed to determine partitions using lsblk: {e_lsblk}") - raise RuntimeError("Could not determine partition device names after partitioning.") - - - # Format ESP as FAT32 - self._report_progress(f"Formatting ESP ({esp_partition}) as FAT32...") - self._run_command(["sudo", "mkfs.vfat", "-F", "32", esp_partition]) + self._report_progress(f"Formatting ESP ({esp_partition_dev}) as FAT32...") + self._run_command(["sudo", "mkfs.vfat", "-F", "32", esp_partition_dev]) # --- Write EFI content --- - self._report_progress(f"Converting OpenCore QCOW2 image ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...") + self._report_progress(f"Converting OpenCore QCOW2 ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...") self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path]) - self._report_progress(f"Mapping partitions from {self.opencore_raw_path}...") - map_output = self._run_command(["sudo", "kpartx", "-av", self.opencore_raw_path], capture_output=True).stdout - self._report_progress(f"kpartx output: {map_output}") - # Example output: add map loop0p1 (253:0): 0 1048576 linear /dev/loop0 2048 - # We need to parse "loop0p1" or similar from this. - mapped_efi_partition_name = None - for line in map_output.splitlines(): - if "loop" in line and "p1" in line: # Assuming first partition is EFI - parts = line.split() - if len(parts) > 2: - mapped_efi_partition_name = parts[2] # e.g., loop0p1 - break - - if not mapped_efi_partition_name: - raise RuntimeError(f"Could not determine mapped EFI partition name from kpartx output for {self.opencore_raw_path}.") - - mapped_efi_device = f"/dev/mapper/{mapped_efi_partition_name}" - self._report_progress(f"Mapped OpenCore EFI partition: {mapped_efi_device}") - - os.makedirs(self.mount_point_opencore_efi, exist_ok=True) - os.makedirs(self.mount_point_usb_esp, exist_ok=True) + map_output_efi = self._run_command(["sudo", "kpartx", "-av", self.opencore_raw_path], capture_output=True).stdout + mapped_efi_device = self._get_mapped_partition_device(map_output_efi, 1) # EFI is partition 1 in OpenCore.qcow2 + if not mapped_efi_device: raise RuntimeError(f"Could not map EFI partition from {self.opencore_raw_path}.") + self._report_progress(f"Mapped OpenCore EFI partition device: {mapped_efi_device}") self._report_progress(f"Mounting {mapped_efi_device} to {self.mount_point_opencore_efi}...") self._run_command(["sudo", "mount", "-o", "ro", mapped_efi_device, self.mount_point_opencore_efi]) + self._report_progress(f"Mounting USB ESP ({esp_partition_dev}) to {self.mount_point_usb_esp}...") + self._run_command(["sudo", "mount", esp_partition_dev, self.mount_point_usb_esp]) - self._report_progress(f"Mounting USB ESP ({esp_partition}) to {self.mount_point_usb_esp}...") - self._run_command(["sudo", "mount", esp_partition, self.mount_point_usb_esp]) + self._report_progress(f"Copying EFI files from {self.mount_point_opencore_efi}/EFI to {self.mount_point_usb_esp}/EFI...") + source_efi_content_path = os.path.join(self.mount_point_opencore_efi, "EFI") + if not os.path.isdir(source_efi_content_path): # Check if EFI folder is in root of partition + source_efi_content_path = self.mount_point_opencore_efi # Assume content is in root - self._report_progress(f"Copying EFI files from {self.mount_point_opencore_efi} to {self.mount_point_usb_esp}...") - # Copy contents of EFI folder - source_efi_dir = os.path.join(self.mount_point_opencore_efi, "EFI") - if not os.path.exists(source_efi_dir): # Sometimes it's directly in the root of the partition image - source_efi_dir = self.mount_point_opencore_efi + target_efi_dir_on_usb = os.path.join(self.mount_point_usb_esp, "EFI") + self._run_command(["sudo", "mkdir", "-p", target_efi_dir_on_usb]) + self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_content_path}/", f"{target_efi_dir_on_usb}/"]) # Copy content of EFI - self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_dir}/", f"{self.mount_point_usb_esp}/"]) + self._unmount_path(self.mount_point_opencore_efi) + self._unmount_path(self.mount_point_usb_esp) + self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path]) + # --- Write macOS main image (File-level copy) --- + self._report_progress(f"Formatting macOS partition ({macos_partition_dev}) on USB as HFS+...") + self._run_command(["sudo", "mkfs.hfsplus", "-v", "macOS_USB", macos_partition_dev]) - self._report_progress("Unmounting OpenCore EFI and USB ESP...") - self._run_command(["sudo", "umount", self.mount_point_opencore_efi]) - self._run_command(["sudo", "umount", self.mount_point_usb_esp]) - self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path]) # Unmap loop device - - # --- Write macOS main image --- - self._report_progress(f"Converting macOS QCOW2 image ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...") + self._report_progress(f"Converting macOS QCOW2 ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...") self._report_progress("This may take a very long time and consume significant disk space temporarily.") - # Add dd progress status if possible, or estimate time based on size - # For qemu-img, there's no easy progress for convert. self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path]) - self._report_progress(f"Writing RAW macOS image ({self.macos_raw_path}) to {macos_partition}...") - self._report_progress("This will also take a very long time. Please be patient.") - # Using dd with progress status - dd_command = ["sudo", "dd", f"if={self.macos_raw_path}", f"of={macos_partition}", "bs=4M", "status=progress", "conv=fsync"] - self._run_command(dd_command) + self._report_progress(f"Mapping partitions from macOS RAW image ({self.macos_raw_path})...") + map_output_macos = self._run_command(["sudo", "kpartx", "-av", self.macos_raw_path], capture_output=True).stdout + # The mac_hdd_ng.img usually contains an APFS container. + # kpartx might show multiple APFS volumes within the container, or the container partition itself. + # We need to mount the APFS Data or System volume. + # Typically, the main usable partition is the largest one, or the second one (after a small EFI if present in this image). + mapped_macos_device = self._get_mapped_partition_device(map_output_macos, 2) # Try p2 (common for APFS container) + if not mapped_macos_device: + mapped_macos_device = self._get_mapped_partition_device(map_output_macos, 1) # Fallback to p1 + if not mapped_macos_device: + raise RuntimeError(f"Could not identify and map main macOS data partition from {self.macos_raw_path}.") + self._report_progress(f"Mapped macOS source partition device: {mapped_macos_device}") + + self._report_progress(f"Mounting source macOS partition ({mapped_macos_device}) to {self.mount_point_macos_source} using apfs-fuse...") + self._run_command(["sudo", "apfs-fuse", "-o", "ro,allow_other", mapped_macos_device, self.mount_point_macos_source]) + + self._report_progress(f"Mounting target USB macOS partition ({macos_partition_dev}) to {self.mount_point_usb_macos_target}...") + self._run_command(["sudo", "mount", macos_partition_dev, self.mount_point_usb_macos_target]) + + self._report_progress(f"Copying macOS system files from {self.mount_point_macos_source} to {self.mount_point_usb_macos_target} using rsync...") + self._report_progress("This will take a very long time. Please be patient.") + self._run_command(["sudo", "rsync", "-avh", "--delete", f"{self.mount_point_macos_source}/", f"{self.mount_point_usb_macos_target}/"]) # Note trailing slashes self._report_progress("USB writing process completed successfully.") return True except Exception as e: self._report_progress(f"An error occurred during USB writing: {e}") + import traceback + self._report_progress(traceback.format_exc()) # Log full traceback for debugging return False finally: - self._cleanup_mappings_and_mounts() + self._cleanup_all_mounts_and_mappings() self._cleanup_temp_files() if __name__ == '__main__': - # This is for standalone testing of this script. - # YOU MUST RUN THIS SCRIPT WITH SUDO for it to work. - # BE EXTREMELY CAREFUL with the device path. if os.geteuid() != 0: print("Please run this script as root (sudo) for testing.") exit(1) - print("USB Writer Linux Standalone Test") - # Replace with actual paths to your QCOW2 files for testing - test_opencore_qcow2 = "path_to_your/OpenCore.qcow2" - test_macos_qcow2 = "path_to_your/mac_hdd_ng.img" + print("USB Writer Linux Standalone Test - REFACTORED for File Copy") + + # Create dummy qcow2 files for testing script structure + # These won't result in a bootable USB but allow testing the commands. + mock_opencore_path = "mock_opencore_usb_writer.qcow2" + mock_macos_path = "mock_macos_usb_writer.qcow2" + + print(f"Creating mock image: {mock_opencore_path}") + subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_opencore_path, "384M"], check=True) + # TODO: A more complex mock would involve creating a partition table and filesystem inside this qcow2. + # For now, this is just to ensure the file exists for qemu-img convert. + # Actual EFI content would be needed for kpartx to map something meaningful. + + print(f"Creating mock image: {mock_macos_path}") + subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_macos_path, "1G"], check=True) # Small for quick test + # TODO: Similar to above, a real test needs a qcow2 with a mountable filesystem. - # IMPORTANT: List available block devices to help user choose. print("\nAvailable block devices (be careful!):") subprocess.run(["lsblk", "-d", "-o", "NAME,SIZE,MODEL"], check=True) - test_device = input("\nEnter target device (e.g., /dev/sdX). THIS DEVICE WILL BE WIPED: ") - if not test_device or not test_device.startswith("/dev/"): + + if not test_device or not (test_device.startswith("/dev/") or test_device.startswith("/dev/mapper/")): # Allow /dev/mapper for testing with loop devices print("Invalid device. Exiting.") + # Clean up mock files + if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path) + if os.path.exists(mock_macos_path): os.remove(mock_macos_path) exit(1) - if not (os.path.exists(test_opencore_qcow2) and os.path.exists(test_macos_qcow2)): - print(f"Test files {test_opencore_qcow2} or {test_macos_qcow2} not found. Skipping write test.") + confirm = input(f"Are you absolutely sure you want to wipe {test_device} and write mock images? (yes/NO): ") + success = False + if confirm.lower() == 'yes': + writer = USBWriterLinux(test_device, mock_opencore_path, mock_macos_path, print) + success = writer.format_and_write() else: - confirm = input(f"Are you absolutely sure you want to wipe {test_device} and write images? (yes/NO): ") - if confirm.lower() == 'yes': - writer = USBWriterLinux(test_device, test_opencore_qcow2, test_macos_qcow2, print) - writer.format_and_write() - else: - print("Test cancelled by user.") + print("Test cancelled by user.") + + print(f"Test finished. Success: {success}") + # Clean up mock files + if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path) + if os.path.exists(mock_macos_path): os.remove(mock_macos_path) + print("Mock files cleaned up.") diff --git a/usb_writer_macos.py b/usb_writer_macos.py new file mode 100644 index 0000000..46aa992 --- /dev/null +++ b/usb_writer_macos.py @@ -0,0 +1,313 @@ +# usb_writer_macos.py +import subprocess +import os +import time +import shutil # For checking command existence +import plistlib # For parsing diskutil list -plist output + +class USBWriterMacOS: + def __init__(self, device: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None): + self.device = device # Should be like /dev/diskX + self.opencore_qcow2_path = opencore_qcow2_path + self.macos_qcow2_path = macos_qcow2_path + self.progress_callback = progress_callback + + pid = os.getpid() + self.opencore_raw_path = f"opencore_temp_{pid}.raw" + self.macos_raw_path = f"macos_main_temp_{pid}.raw" + self.temp_opencore_mount = f"/tmp/opencore_efi_temp_skyscope_{pid}" + self.temp_usb_esp_mount = f"/tmp/usb_esp_temp_skyscope_{pid}" + self.temp_macos_source_mount = f"/tmp/macos_source_temp_skyscope_{pid}" + self.temp_usb_macos_target_mount = f"/tmp/usb_macos_target_temp_skyscope_{pid}" + + self.temp_files_to_clean = [self.opencore_raw_path, self.macos_raw_path] + self.temp_mount_points_to_clean = [ + self.temp_opencore_mount, self.temp_usb_esp_mount, + self.temp_macos_source_mount, self.temp_usb_macos_target_mount + ] + self.attached_raw_images_devices = [] # Store devices from hdiutil attach + + def _report_progress(self, message: str): + print(message) # For standalone testing + if self.progress_callback: + self.progress_callback(message) + + def _run_command(self, command: list[str], check=True, capture_output=False, timeout=None): + self._report_progress(f"Executing: {' '.join(command)}") + try: + process = subprocess.run( + command, check=check, capture_output=capture_output, text=True, timeout=timeout + ) + if capture_output: + if process.stdout and process.stdout.strip(): + self._report_progress(f"STDOUT: {process.stdout.strip()}") + if process.stderr and process.stderr.strip(): + self._report_progress(f"STDERR: {process.stderr.strip()}") + return process + except subprocess.TimeoutExpired: + self._report_progress(f"Command {' '.join(command)} timed out after {timeout} seconds.") + raise + except subprocess.CalledProcessError as e: + self._report_progress(f"Error executing {' '.join(command)} (code {e.returncode}): {e.stderr or e.stdout or str(e)}") + raise + except FileNotFoundError: + self._report_progress(f"Error: Command '{command[0]}' not found. Is it installed and in PATH?") + raise + + def _cleanup_temp_files(self): + self._report_progress("Cleaning up temporary image files...") + for f_path in self.temp_files_to_clean: + if os.path.exists(f_path): + try: + os.remove(f_path) + self._report_progress(f"Removed {f_path}") + except OSError as e: + self._report_progress(f"Error removing {f_path}: {e}") + + def _unmount_path(self, mount_path_or_device, is_device=False, force=False): + target = mount_path_or_device + cmd_base = ["diskutil"] + action = "unmountDisk" if is_device else "unmount" + + if force: + cmd = cmd_base + [action, "force", target] + else: + cmd = cmd_base + [action, target] + + is_target_valid_for_unmount = (os.path.ismount(mount_path_or_device) and not is_device) or \ + (is_device and os.path.exists(target)) + + if is_target_valid_for_unmount: + self._report_progress(f"Attempting to unmount {target} (Action: {action}, Force: {force})...") + self._run_command(cmd, check=False, timeout=30) + + def _detach_raw_image_device(self, device_path): + if device_path and os.path.exists(device_path): + self._report_progress(f"Detaching raw image device {device_path}...") + try: + info_check = subprocess.run(["diskutil", "info", device_path], capture_output=True, text=True, check=False) + if info_check.returncode == 0: + self._run_command(["hdiutil", "detach", device_path, "-force"], check=False, timeout=30) + else: + self._report_progress(f"Device {device_path} appears invalid or already detached.") + except Exception as e: + self._report_progress(f"Exception while checking/detaching {device_path}: {e}") + + def _cleanup_all_mounts_and_mappings(self): + self._report_progress("Cleaning up all temporary mounts and attached raw images...") + for mp in reversed(self.temp_mount_points_to_clean): + self._unmount_path(mp, force=True) + if os.path.exists(mp): + try: os.rmdir(mp) + except OSError as e: self._report_progress(f"Could not rmdir {mp}: {e}") + + devices_to_detach = list(self.attached_raw_images_devices) + for dev_path in devices_to_detach: + self._detach_raw_image_device(dev_path) + self.attached_raw_images_devices = [] + + + def check_dependencies(self): + self._report_progress("Checking dependencies (qemu-img, diskutil, hdiutil, rsync)...") + dependencies = ["qemu-img", "diskutil", "hdiutil", "rsync"] + missing_deps = [] + for dep in dependencies: + if not shutil.which(dep): + missing_deps.append(dep) + + if missing_deps: + msg = f"Missing dependencies: {', '.join(missing_deps)}. `qemu-img` might need to be installed (e.g., via Homebrew: `brew install qemu`). `diskutil`, `hdiutil`, `rsync` are usually standard on macOS." + self._report_progress(msg) + raise RuntimeError(msg) + + self._report_progress("All critical dependencies found.") + return True + + def _get_partition_device_id(self, parent_disk_id_str: str, partition_label_or_type: str) -> str | None: + """Finds partition device ID by Volume Name or Content Hint.""" + target_disk_id = parent_disk_id_str.replace("/dev/", "") + self._report_progress(f"Searching for partition '{partition_label_or_type}' on disk '{target_disk_id}'") + try: + result = self._run_command(["diskutil", "list", "-plist", target_disk_id], capture_output=True) + if not result.stdout: + self._report_progress(f"No stdout from diskutil list for {target_disk_id}") + return None + + plist_data = plistlib.loads(result.stdout.encode('utf-8')) + + all_disks_and_partitions = plist_data.get("AllDisksAndPartitions", []) + if not isinstance(all_disks_and_partitions, list): + if plist_data.get("DeviceIdentifier") == target_disk_id: + all_disks_and_partitions = [plist_data] + else: + all_disks_and_partitions = [] + + for disk_info_entry in all_disks_and_partitions: + current_disk_id_in_plist = disk_info_entry.get("DeviceIdentifier") + if current_disk_id_in_plist == target_disk_id: + for part_info in disk_info_entry.get("Partitions", []): + vol_name = part_info.get("VolumeName") + content_hint = part_info.get("Content") + device_id = part_info.get("DeviceIdentifier") + + if device_id: + if vol_name and vol_name.strip().lower() == partition_label_or_type.strip().lower(): + self._report_progress(f"Found partition by VolumeName: {vol_name} -> /dev/{device_id}") + return f"/dev/{device_id}" + if content_hint and content_hint.strip().lower() == partition_label_or_type.strip().lower(): + self._report_progress(f"Found partition by Content type: {content_hint} -> /dev/{device_id}") + return f"/dev/{device_id}" + + self._report_progress(f"Partition '{partition_label_or_type}' not found on disk '{target_disk_id}'.") + return None + except Exception as e: + self._report_progress(f"Error parsing 'diskutil list -plist {target_disk_id}': {e}") + return None + + def format_and_write(self) -> bool: + try: + self.check_dependencies() + self._cleanup_all_mounts_and_mappings() + + for mp in self.temp_mount_points_to_clean: + os.makedirs(mp, exist_ok=True) + + self._report_progress(f"WARNING: ALL DATA ON {self.device} WILL BE ERASED!") + self._report_progress(f"Unmounting disk {self.device} (force)...") + self._unmount_path(self.device, is_device=True, force=True) + time.sleep(2) + + self._report_progress(f"Partitioning {self.device} with GPT scheme...") + self._run_command([ + "diskutil", "partitionDisk", self.device, "GPT", + "MS-DOS FAT32", "EFI", "551MiB", + "JHFS+", "macOS_USB", "0b" + ], timeout=180) + time.sleep(3) + + esp_partition_dev = self._get_partition_device_id(self.device, "EFI") + macos_partition_dev = self._get_partition_device_id(self.device, "macOS_USB") + + if not (esp_partition_dev and os.path.exists(esp_partition_dev)): + esp_partition_dev = f"{self.device}s1" + if not (macos_partition_dev and os.path.exists(macos_partition_dev)): + macos_partition_dev = f"{self.device}s2" + + if not (os.path.exists(esp_partition_dev) and os.path.exists(macos_partition_dev)): + raise RuntimeError(f"Could not identify partitions on {self.device}. ESP: {esp_partition_dev}, macOS: {macos_partition_dev}") + + self._report_progress(f"Identified ESP: {esp_partition_dev}, macOS Partition: {macos_partition_dev}") + + # --- Write EFI content --- + self._report_progress(f"Converting OpenCore QCOW2 ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...") + self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path]) + + self._report_progress(f"Attaching RAW OpenCore image ({self.opencore_raw_path})...") + attach_cmd_efi = ["hdiutil", "attach", "-nomount", "-imagekey", "diskimage-class=CRawDiskImage", self.opencore_raw_path] + efi_attach_output = self._run_command(attach_cmd_efi, capture_output=True).stdout.strip() + raw_efi_disk_id = efi_attach_output.splitlines()[-1].strip().split()[0] + if not raw_efi_disk_id.startswith("/dev/disk"): + raise RuntimeError(f"Failed to attach raw EFI image: {efi_attach_output}") + self.attached_raw_images_devices.append(raw_efi_disk_id) + self._report_progress(f"Attached raw OpenCore image as {raw_efi_disk_id}") + time.sleep(2) + + source_efi_partition_dev = self._get_partition_device_id(raw_efi_disk_id, "EFI") or f"{raw_efi_disk_id}s1" + + self._report_progress(f"Mounting source EFI partition ({source_efi_partition_dev}) to {self.temp_opencore_mount}...") + self._run_command(["diskutil", "mount", "readOnly", "-mountPoint", self.temp_opencore_mount, source_efi_partition_dev], timeout=30) + + self._report_progress(f"Mounting target USB ESP ({esp_partition_dev}) to {self.temp_usb_esp_mount}...") + self._run_command(["diskutil", "mount", "-mountPoint", self.temp_usb_esp_mount, esp_partition_dev], timeout=30) + + source_efi_content_path = os.path.join(self.temp_opencore_mount, "EFI") + if not os.path.isdir(source_efi_content_path): source_efi_content_path = self.temp_opencore_mount + + target_efi_dir_on_usb = os.path.join(self.temp_usb_esp_mount, "EFI") + self._report_progress(f"Copying EFI files from {source_efi_content_path} to {target_efi_dir_on_usb}...") + self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_content_path}/", f"{target_efi_dir_on_usb}/"]) + + self._unmount_path(self.temp_opencore_mount, force=True) + self._unmount_path(self.temp_usb_esp_mount, force=True) + self._detach_raw_image_device(raw_efi_disk_id); raw_efi_disk_id = None + + # --- Write macOS main image (File-level copy) --- + self._report_progress(f"Converting macOS QCOW2 ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...") + self._report_progress("This may take a very long time...") + self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path]) + + self._report_progress(f"Attaching RAW macOS image ({self.macos_raw_path})...") + attach_cmd_macos = ["hdiutil", "attach", "-nomount", "-imagekey", "diskimage-class=CRawDiskImage", self.macos_raw_path] + macos_attach_output = self._run_command(attach_cmd_macos, capture_output=True).stdout.strip() + raw_macos_disk_id = macos_attach_output.splitlines()[-1].strip().split()[0] + if not raw_macos_disk_id.startswith("/dev/disk"): + raise RuntimeError(f"Failed to attach raw macOS image: {macos_attach_output}") + self.attached_raw_images_devices.append(raw_macos_disk_id) + self._report_progress(f"Attached raw macOS image as {raw_macos_disk_id}") + time.sleep(2) + + source_macos_part_dev = self._get_partition_device_id(raw_macos_disk_id, "Apple_APFS_Container") or \ + self._get_partition_device_id(raw_macos_disk_id, "Apple_APFS") or \ + self._get_partition_device_id(raw_macos_disk_id, "Apple_HFS") or \ + f"{raw_macos_disk_id}s2" + if not (source_macos_part_dev and os.path.exists(source_macos_part_dev)): + raise RuntimeError(f"Could not find source macOS partition on {raw_macos_disk_id}") + + self._report_progress(f"Mounting source macOS partition ({source_macos_part_dev}) to {self.temp_macos_source_mount}...") + self._run_command(["diskutil", "mount", "readOnly", "-mountPoint", self.temp_macos_source_mount, source_macos_part_dev], timeout=60) + + self._report_progress(f"Mounting target USB macOS partition ({macos_partition_dev}) to {self.temp_usb_macos_target_mount}...") + self._run_command(["diskutil", "mount", "-mountPoint", self.temp_usb_macos_target_mount, macos_partition_dev], timeout=30) + + self._report_progress(f"Copying macOS system files from {self.temp_macos_source_mount} to {self.temp_usb_macos_target_mount} (sudo rsync)...") + self._report_progress("This will also take a very long time.") + self._run_command([ + "sudo", "rsync", "-avh", "--delete", + "--exclude=.Spotlight-V100", "--exclude=.fseventsd", "--exclude=/.Trashes", "--exclude=/System/Volumes/VM", "--exclude=/private/var/vm", + f"{self.temp_macos_source_mount}/", f"{self.temp_usb_macos_target_mount}/" + ]) + + self._report_progress("USB writing process completed successfully.") + return True + + except Exception as e: + self._report_progress(f"An error occurred during USB writing on macOS: {e}") + import traceback + self._report_progress(traceback.format_exc()) + return False + finally: + self._cleanup_all_mounts_and_mappings() + self._cleanup_temp_files() + +if __name__ == '__main__': + if platform.system() != "Darwin": print("This script is intended for macOS."); exit(1) + print("USB Writer macOS Standalone Test - File Copy Method") + + mock_opencore_path = "mock_opencore_macos.qcow2" + mock_macos_path = "mock_macos_macos.qcow2" + if not os.path.exists(mock_opencore_path): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_opencore_path, "384M"]) + if not os.path.exists(mock_macos_path): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_macos_path, "1G"]) + + print("\nAvailable disks (use 'diskutil list external physical' in Terminal to identify your USB):") + subprocess.run(["diskutil", "list", "external", "physical"], check=False) + test_device = input("\nEnter target disk identifier (e.g., /dev/diskX - NOT /dev/diskXsY). THIS DISK WILL BE WIPED: ") + + if not test_device or not test_device.startswith("/dev/disk"): + print("Invalid disk identifier. Exiting.") + if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path) + if os.path.exists(mock_macos_path): os.remove(mock_macos_path) + exit(1) + + confirm = input(f"Are you sure you want to wipe {test_device} and write mock images? (yes/NO): ") + success = False + if confirm.lower() == 'yes': + print("Ensure you have sudo privileges for rsync if needed, or app is run as root.") + writer = USBWriterMacOS(test_device, mock_opencore_path, mock_macos_path, print) + success = writer.format_and_write() + else: + print("Test cancelled.") + + print(f"Test finished. Success: {success}") + if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path) + if os.path.exists(mock_macos_path): os.remove(mock_macos_path) + print("Mock files cleaned up.") diff --git a/usb_writer_windows.py b/usb_writer_windows.py new file mode 100644 index 0000000..2864c98 --- /dev/null +++ b/usb_writer_windows.py @@ -0,0 +1,177 @@ +# usb_writer_windows.py +import subprocess +import os +import time +import shutil + +class USBWriterWindows: + def __init__(self, device_id: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None): + self.device_id = device_id + # Construct PhysicalDrive path carefully + disk_number_str = "".join(filter(str.isdigit, device_id)) + self.physical_drive_path = f"\\\\.\\PhysicalDrive{disk_number_str}" + self.opencore_qcow2_path = opencore_qcow2_path + self.macos_qcow2_path = macos_qcow2_path + self.progress_callback = progress_callback + + pid = os.getpid() + self.opencore_raw_path = f"opencore_temp_{pid}.raw" + self.macos_raw_path = f"macos_main_temp_{pid}.raw" + self.temp_efi_extract_dir = f"temp_efi_files_{pid}" + + self.temp_files_to_clean = [self.opencore_raw_path, self.macos_raw_path] + self.temp_dirs_to_clean = [self.temp_efi_extract_dir] + self.assigned_efi_letter = None + + def _report_progress(self, message: str): + if self.progress_callback: + self.progress_callback(message) + else: + print(message) + + def _run_command(self, command: list[str] | str, check=True, capture_output=False, timeout=None, shell=False, working_dir=None): + self._report_progress(f"Executing: {command if isinstance(command, str) else ' '.join(command)}") + try: + process = subprocess.run( + command, check=check, capture_output=capture_output, text=True, timeout=timeout, shell=shell, cwd=working_dir, + creationflags=subprocess.CREATE_NO_WINDOW + ) + if capture_output: + if process.stdout and process.stdout.strip(): self._report_progress(f"STDOUT: {process.stdout.strip()}") + if process.stderr and process.stderr.strip(): self._report_progress(f"STDERR: {process.stderr.strip()}") + return process + except subprocess.TimeoutExpired: + self._report_progress(f"Command timed out after {timeout} seconds.") + raise + except subprocess.CalledProcessError as e: + self._report_progress(f"Error executing (code {e.returncode}): {e.stderr or e.stdout or str(e)}") + raise + except FileNotFoundError: + self._report_progress(f"Error: Command '{command[0] if isinstance(command, list) else command.split()[0]}' not found.") + raise + + def _run_diskpart_script(self, script_content: str): + script_file_path = f"diskpart_script_{os.getpid()}.txt" + with open(script_file_path, "w") as f: + f.write(script_content) + try: + self._report_progress(f"Running diskpart script...\n{script_content}") + self._run_command(["diskpart", "/s", script_file_path], capture_output=True, check=False) + finally: + if os.path.exists(script_file_path): os.remove(script_file_path) + + def _cleanup_temp_files_and_dirs(self): + self._report_progress("Cleaning up...") + for f_path in self.temp_files_to_clean: + if os.path.exists(f_path): os.remove(f_path) + for d_path in self.temp_dirs_to_clean: + if os.path.exists(d_path): shutil.rmtree(d_path, ignore_errors=True) + + def _find_available_drive_letter(self) -> str | None: + import string + # This is a placeholder. Actual psutil or ctypes calls would be more robust. + # For now, assume 'S' is available if not 'E' through 'Z'. + return 'S' + + def check_dependencies(self): + self._report_progress("Checking dependencies (qemu-img, diskpart, robocopy)... DD for Win & 7z are manual checks.") + dependencies = ["qemu-img", "diskpart", "robocopy"] + missing = [dep for dep in dependencies if not shutil.which(dep)] + if missing: + raise RuntimeError(f"Missing dependencies: {', '.join(missing)}. qemu-img needs install & PATH.") + self._report_progress("Base dependencies found. Ensure 'dd for Windows' and '7z.exe' are in PATH if needed.") + return True + + def format_and_write(self) -> bool: + try: + self.check_dependencies() + self._cleanup_temp_files_and_dirs() + os.makedirs(self.temp_efi_extract_dir, exist_ok=True) + + disk_number = "".join(filter(str.isdigit, self.device_id)) + self._report_progress(f"WARNING: ALL DATA ON DISK {disk_number} ({self.physical_drive_path}) WILL BE ERASED!") + + self.assigned_efi_letter = self._find_available_drive_letter() + if not self.assigned_efi_letter: + raise RuntimeError("Could not find an available drive letter for EFI.") + self._report_progress(f"Attempting to use letter {self.assigned_efi_letter}: for EFI.") + + script = f"select disk {disk_number}\nclean\nconvert gpt\n" + script += f"create partition efi size=550\nformat fs=fat32 quick label=EFI\nassign letter={self.assigned_efi_letter}\n" + script += "create partition primary label=macOS_USB\nexit\n" + self._run_diskpart_script(script) + time.sleep(5) + + self._report_progress(f"Converting OpenCore QCOW2 to RAW: {self.opencore_raw_path}") + self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path]) + + self._report_progress("Extracting EFI files (using 7z if available)...") + if shutil.which("7z"): + # Simplified 7z call, assumes EFI folder is at root of first partition image by 7z + self._run_command([ + "7z", "x", self.opencore_raw_path, + f"-o{self.temp_efi_extract_dir}", "EFI", "-r", "-y" + ], check=False) + source_efi_folder = os.path.join(self.temp_efi_extract_dir, "EFI") + if not os.path.isdir(source_efi_folder): + # Fallback: check if files were extracted to temp_efi_extract_dir directly + if os.path.exists(os.path.join(self.temp_efi_extract_dir, "BOOTX64.EFI")): + source_efi_folder = self.temp_efi_extract_dir + else: + raise RuntimeError("Could not extract EFI folder using 7-Zip.") + + target_efi_on_usb = f"{self.assigned_efi_letter}:\\EFI" + if not os.path.exists(f"{self.assigned_efi_letter}:\\"): + raise RuntimeError(f"EFI partition {self.assigned_efi_letter}: not accessible after assign.") + if not os.path.exists(target_efi_on_usb): os.makedirs(target_efi_on_usb, exist_ok=True) + self._report_progress(f"Copying EFI files to {target_efi_on_usb}") + self._run_command(["robocopy", source_efi_folder, target_efi_on_usb, "/E", "/S", "/NFL", "/NDL", "/NJH", "/NJS", "/NC", "/NS", "/NP"], check=True) + else: + raise RuntimeError("7-Zip CLI (7z.exe) not found in PATH for EFI extraction.") + + self._report_progress(f"Converting macOS QCOW2 to RAW: {self.macos_raw_path}") + self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path]) + + self._report_progress("Windows RAW macOS image writing is a placeholder.") + self._report_progress(f"RAW image at: {self.macos_raw_path}") + self._report_progress(f"Target physical drive: {self.physical_drive_path}") + self._report_progress("User needs to use 'dd for Windows' to write the above raw image to the second partition of the USB drive.") + # Placeholder for actual dd command, as it's complex and risky to automate fully without specific dd tool knowledge + # E.g. dd if=self.macos_raw_path of=\\\\.\\PhysicalDriveX --partition 2 bs=4M status=progress (syntax depends on dd variant) + + self._report_progress("Windows USB writing process (EFI part done, macOS part placeholder) completed.") + return True + + except Exception as e: + self._report_progress(f"Error during Windows USB writing: {e}") + import traceback + self._report_progress(traceback.format_exc()) + return False + finally: + if self.assigned_efi_letter: + self._run_diskpart_script(f"select volume {self.assigned_efi_letter}\nremove letter={self.assigned_efi_letter}\nexit") + self._cleanup_temp_files_and_dirs() + +if __name__ == '__main__': + if platform.system() != "Windows": + print("This script is for Windows standalone testing."); exit(1) + print("USB Writer Windows Standalone Test - Partial Implementation") + # Requires Admin privileges + mock_oc = "mock_oc_win.qcow2" + mock_mac = "mock_mac_win.qcow2" + if not os.path.exists(mock_oc): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_oc, "384M"]) + if not os.path.exists(mock_mac): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_mac, "1G"]) + + disk_id = input("Enter target disk ID (e.g., '1' for 'disk 1'). WIPES DISK: ") + if not disk_id.isdigit(): print("Invalid disk ID."); exit(1) + actual_disk_id = f"disk {disk_id}" # This is how it's used in the class, but the input is just the number. + + if input(f"Sure to wipe disk {disk_id}? (yes/NO): ").lower() == 'yes': + # Pass the disk number string to the constructor, it will form \\.\PhysicalDriveX + writer = USBWriterWindows(disk_id, mock_oc, mock_mac, print) + writer.format_and_write() + else: print("Cancelled.") + + if os.path.exists(mock_oc): os.remove(mock_oc) + if os.path.exists(mock_mac): os.remove(mock_mac) + print("Mocks cleaned.")