Jules was unable to complete the task in time. Please review the work done so far and provide feedback for Jules to continue.

This commit is contained in:
google-labs-jules[bot] 2025-06-01 21:40:05 +00:00
parent 97f999ed81
commit a80706a40a
4 changed files with 966 additions and 469 deletions

View File

@ -1,16 +1,14 @@
# main_app.py
import sys
import subprocess
import threading
import os
import psutil
import platform # For OS detection and USB writing logic
import platform
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QComboBox, QPushButton, QTextEdit, QMessageBox, QMenuBar,
QFileDialog, QGroupBox
QFileDialog, QGroupBox, QLineEdit # Added QLineEdit
)
from PyQt6.QtGui import QAction
from PyQt6.QtCore import pyqtSignal, pyqtSlot, QObject, QThread
@ -22,25 +20,26 @@ from utils import (
build_docker_stop_command, build_docker_rm_command
)
# Import the Linux USB writer (conditionally or handle import error)
USBWriterLinux = None
USBWriterMacOS = None
USBWriterWindows = None
if platform.system() == "Linux":
try:
from usb_writer_linux import USBWriterLinux
except ImportError:
USBWriterLinux = None # Flag that it's not available
print("Could not import USBWriterLinux. USB writing for Linux will be disabled.")
else:
USBWriterLinux = None
try: from usb_writer_linux import USBWriterLinux
except ImportError as e: print(f"Could not import USBWriterLinux: {e}")
elif platform.system() == "Darwin":
try: from usb_writer_macos import USBWriterMacOS
except ImportError as e: print(f"Could not import USBWriterMacOS: {e}")
elif platform.system() == "Windows":
try: from usb_writer_windows import USBWriterWindows
except ImportError as e: print(f"Could not import USBWriterWindows: {e}")
# --- Worker Signals ---
class WorkerSignals(QObject):
progress = pyqtSignal(str)
finished = pyqtSignal(str)
error = pyqtSignal(str)
# --- Docker Process Worker ---
class DockerRunWorker(QObject):
class DockerRunWorker(QObject): # ... (same as before)
def __init__(self, command_list):
super().__init__()
self.command_list = command_list
@ -65,13 +64,13 @@ class DockerRunWorker(QObject):
self.signals.progress.emit(line)
self.process.stdout.close()
return_code = self.process.wait()
if not self._is_running and return_code != 0:
self.signals.finished.emit("Docker process cancelled by user.")
if not self._is_running and return_code != 0 :
self.signals.finished.emit(f"Docker process cancelled or stopped early (exit code {return_code}).")
return
if return_code == 0:
self.signals.finished.emit("Docker VM process (QEMU) closed by user or completed.")
else:
self.signals.error.emit(f"Docker VM process exited with code {return_code}. Assuming macOS setup was attempted.")
self.signals.finished.emit(f"Docker VM process exited (code {return_code}). Assuming macOS setup was attempted or QEMU window closed.")
except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.")
except Exception as e: self.signals.error.emit(f"An error occurred during Docker run: {str(e)}")
finally: self._is_running = False
@ -89,8 +88,7 @@ class DockerRunWorker(QObject):
self.signals.progress.emit("Docker process stopped.\n")
except Exception as e: self.signals.error.emit(f"Error stopping process: {str(e)}\n")
# --- Docker Command Execution Worker ---
class DockerCommandWorker(QObject):
class DockerCommandWorker(QObject): # ... (same as before)
def __init__(self, command_list, success_message="Command completed."):
super().__init__()
self.command_list = command_list
@ -105,8 +103,8 @@ class DockerCommandWorker(QObject):
self.command_list, capture_output=True, text=True, check=False,
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
)
if result.stdout: self.signals.progress.emit(result.stdout)
if result.stderr: self.signals.progress.emit(f"STDERR: {result.stderr}")
if result.stdout and result.stdout.strip(): self.signals.progress.emit(result.stdout)
if result.stderr and result.stderr.strip(): self.signals.progress.emit(f"STDERR: {result.stderr}")
if result.returncode == 0: self.signals.finished.emit(self.success_message)
else:
err_msg = result.stderr or result.stdout or "Unknown error"
@ -114,11 +112,8 @@ class DockerCommandWorker(QObject):
except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.")
except Exception as e: self.signals.error.emit(f"An error occurred: {str(e)}")
# --- USB Writing Worker ---
class USBWriterWorker(QObject):
class USBWriterWorker(QObject): # ... (same as before, uses platform check)
signals = WorkerSignals()
def __init__(self, device, opencore_path, macos_path):
super().__init__()
self.device = device
@ -128,120 +123,107 @@ class USBWriterWorker(QObject):
@pyqtSlot()
def run(self):
current_os = platform.system()
try:
if platform.system() == "Linux":
if USBWriterLinux is None:
self.signals.error.emit("USBWriterLinux module not loaded. Cannot write to USB on this system.")
return
if current_os == "Linux":
if USBWriterLinux is None: self.signals.error.emit("USBWriterLinux module not available."); return
self.writer_instance = USBWriterLinux(self.device, self.opencore_path, self.macos_path, lambda msg: self.signals.progress.emit(msg))
elif current_os == "Darwin":
if USBWriterMacOS is None: self.signals.error.emit("USBWriterMacOS module not available."); return
self.writer_instance = USBWriterMacOS(self.device, self.opencore_path, self.macos_path, lambda msg: self.signals.progress.emit(msg))
elif current_os == "Windows":
if USBWriterWindows is None: self.signals.error.emit("USBWriterWindows module not available."); return
self.writer_instance = USBWriterWindows(self.device, self.opencore_path, self.macos_path, lambda msg: self.signals.progress.emit(msg))
else:
self.signals.error.emit(f"USB writing not supported on {current_os}."); return
self.writer_instance = USBWriterLinux(
self.device, self.opencore_path, self.macos_path,
progress_callback=lambda msg: self.signals.progress.emit(msg)
)
# Dependency check is called within format_and_write
if self.writer_instance.format_and_write():
self.signals.finished.emit("USB writing process completed successfully.")
else:
# Error message should have been emitted by the writer via progress_callback
self.signals.error.emit("USB writing process failed. Check output for details.")
else:
self.signals.error.emit(f"USB writing is not currently supported on {platform.system()}.")
except Exception as e:
self.signals.error.emit(f"An unexpected error occurred during USB writing preparation: {str(e)}")
self.signals.error.emit(f"USB writing preparation error: {str(e)}")
class MainWindow(QMainWindow):
class MainWindow(QMainWindow): # ... (init and _setup_ui need changes for Windows USB input)
def __init__(self):
super().__init__()
self.setWindowTitle(APP_NAME)
self.setGeometry(100, 100, 800, 800)
self.setGeometry(100, 100, 800, 850) # Adjusted height
self.current_container_name = None
self.extracted_main_image_path = None
self.extracted_opencore_image_path = None
self.extraction_status = {"main": False, "opencore": False}
self.active_worker_thread = None # To manage various worker threads one at a time
self.active_worker_thread = None
self.docker_run_worker_instance = None
self._setup_ui()
self.refresh_usb_drives()
def _setup_ui(self):
# ... (menu bar setup - same as before) ...
menubar = self.menuBar()
file_menu = menubar.addMenu("&File")
help_menu = menubar.addMenu("&Help")
exit_action = QAction("&Exit", self)
exit_action.triggered.connect(self.close)
file_menu.addAction(exit_action)
about_action = QAction("&About", self)
about_action.triggered.connect(self.show_about_dialog)
help_menu.addAction(about_action)
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
# Step 1
vm_creation_group = QGroupBox("Step 1: Create and Install macOS VM")
vm_layout = QVBoxLayout()
selection_layout = QHBoxLayout()
self.version_label = QLabel("Select macOS Version:")
self.version_combo = QComboBox()
self.version_combo.addItems(MACOS_VERSIONS.keys())
selection_layout.addWidget(self.version_label)
selection_layout.addWidget(self.version_combo)
vm_layout.addLayout(selection_layout)
self.run_vm_button = QPushButton("Create VM and Start macOS Installation")
self.run_vm_button.clicked.connect(self.run_macos_vm)
vm_layout.addWidget(self.run_vm_button)
self.stop_vm_button = QPushButton("Stop/Cancel VM Creation")
self.stop_vm_button.clicked.connect(self.stop_docker_run_process)
self.stop_vm_button.setEnabled(False)
vm_layout.addWidget(self.stop_vm_button)
vm_creation_group.setLayout(vm_layout)
# ... (Menu bar, Step 1, 2, 3 groups - same as before) ...
menubar = self.menuBar(); file_menu = menubar.addMenu("&File"); help_menu = menubar.addMenu("&Help")
exit_action = QAction("&Exit", self); exit_action.triggered.connect(self.close); file_menu.addAction(exit_action)
about_action = QAction("&About", self); about_action.triggered.connect(self.show_about_dialog); help_menu.addAction(about_action)
central_widget = QWidget(); self.setCentralWidget(central_widget); main_layout = QVBoxLayout(central_widget)
vm_creation_group = QGroupBox("Step 1: Create and Install macOS VM"); vm_layout = QVBoxLayout()
selection_layout = QHBoxLayout(); self.version_label = QLabel("Select macOS Version:"); self.version_combo = QComboBox()
self.version_combo.addItems(MACOS_VERSIONS.keys()); selection_layout.addWidget(self.version_label); selection_layout.addWidget(self.version_combo)
vm_layout.addLayout(selection_layout); self.run_vm_button = QPushButton("Create VM and Start macOS Installation")
self.run_vm_button.clicked.connect(self.run_macos_vm); vm_layout.addWidget(self.run_vm_button)
self.stop_vm_button = QPushButton("Stop/Cancel VM Creation"); self.stop_vm_button.clicked.connect(self.stop_docker_run_process)
self.stop_vm_button.setEnabled(False); vm_layout.addWidget(self.stop_vm_button); vm_creation_group.setLayout(vm_layout)
main_layout.addWidget(vm_creation_group)
# Step 2
extraction_group = QGroupBox("Step 2: Extract VM Images")
ext_layout = QVBoxLayout()
self.extract_images_button = QPushButton("Extract Images from Container")
self.extract_images_button.clicked.connect(self.extract_vm_images)
self.extract_images_button.setEnabled(False)
ext_layout.addWidget(self.extract_images_button)
extraction_group.setLayout(ext_layout)
extraction_group = QGroupBox("Step 2: Extract VM Images"); ext_layout = QVBoxLayout()
self.extract_images_button = QPushButton("Extract Images from Container"); self.extract_images_button.clicked.connect(self.extract_vm_images)
self.extract_images_button.setEnabled(False); ext_layout.addWidget(self.extract_images_button); extraction_group.setLayout(ext_layout)
main_layout.addWidget(extraction_group)
# Step 3
mgmt_group = QGroupBox("Step 3: Container Management (Optional)")
mgmt_layout = QHBoxLayout()
self.stop_container_button = QPushButton("Stop Container")
self.stop_container_button.clicked.connect(self.stop_persistent_container)
self.stop_container_button.setEnabled(False)
mgmt_layout.addWidget(self.stop_container_button)
self.remove_container_button = QPushButton("Remove Container")
self.remove_container_button.clicked.connect(self.remove_persistent_container)
self.remove_container_button.setEnabled(False)
mgmt_layout.addWidget(self.remove_container_button)
mgmt_group.setLayout(mgmt_layout)
mgmt_group = QGroupBox("Step 3: Container Management (Optional)"); mgmt_layout = QHBoxLayout()
self.stop_container_button = QPushButton("Stop Container"); self.stop_container_button.clicked.connect(self.stop_persistent_container)
self.stop_container_button.setEnabled(False); mgmt_layout.addWidget(self.stop_container_button)
self.remove_container_button = QPushButton("Remove Container"); self.remove_container_button.clicked.connect(self.remove_persistent_container)
self.remove_container_button.setEnabled(False); mgmt_layout.addWidget(self.remove_container_button); mgmt_group.setLayout(mgmt_layout)
main_layout.addWidget(mgmt_group)
# Step 4: USB Drive Selection
usb_group = QGroupBox("Step 4: Select Target USB Drive and Write") # Title updated
# Step 4: USB Drive Selection - Modified for Windows
usb_group = QGroupBox("Step 4: Select Target USB Drive and Write")
usb_layout = QVBoxLayout()
self.usb_drive_label = QLabel("Available USB Drives (for Linux/macOS):")
usb_layout.addWidget(self.usb_drive_label)
usb_selection_layout = QHBoxLayout()
self.usb_drive_combo = QComboBox()
self.usb_drive_combo.currentIndexChanged.connect(self.update_write_to_usb_button_state)
usb_selection_layout.addWidget(QLabel("Available USB Drives:"))
usb_selection_layout.addWidget(self.usb_drive_combo)
self.refresh_usb_button = QPushButton("Refresh List")
self.refresh_usb_button.clicked.connect(self.refresh_usb_drives)
usb_selection_layout.addWidget(self.refresh_usb_button)
usb_layout.addLayout(usb_selection_layout)
# Windows-specific input for disk ID
self.windows_usb_input_label = QLabel("For Windows: Enter USB Disk Number (e.g., 1, 2). Use 'diskpart' -> 'list disk' in an Admin CMD to find it.")
self.windows_disk_id_input = QLineEdit()
self.windows_disk_id_input.setPlaceholderText("Enter Disk Number (e.g., 1)")
self.windows_disk_id_input.textChanged.connect(self.update_write_to_usb_button_state)
if platform.system() == "Windows":
self.usb_drive_label.setText("Detected Mountable Partitions (for reference only for writing):")
usb_layout.addWidget(self.windows_usb_input_label)
usb_layout.addWidget(self.windows_disk_id_input)
else:
self.windows_usb_input_label.setVisible(False)
self.windows_disk_id_input.setVisible(False)
warning_label = QLabel("WARNING: Selecting a drive and proceeding to write will ERASE ALL DATA on it!")
warning_label.setStyleSheet("color: red; font-weight: bold;")
usb_layout.addWidget(warning_label)
self.write_to_usb_button = QPushButton("Write Images to USB Drive")
self.write_to_usb_button.clicked.connect(self.handle_write_to_usb)
self.write_to_usb_button.setEnabled(False)
usb_layout.addWidget(self.write_to_usb_button)
usb_group.setLayout(usb_layout)
main_layout.addWidget(usb_group)
@ -249,329 +231,327 @@ class MainWindow(QMainWindow):
self.output_area.setReadOnly(True)
main_layout.addWidget(self.output_area)
def show_about_dialog(self):
def show_about_dialog(self): # ... (same as before, update version)
QMessageBox.about(self, f"About {APP_NAME}",
f"Version: 0.4.0\nDeveloper: {DEVELOPER_NAME}\nBusiness: {BUSINESS_NAME}\n\n"
f"Version: 0.6.0\nDeveloper: {DEVELOPER_NAME}\nBusiness: {BUSINESS_NAME}\n\n"
"This tool helps create bootable macOS USB drives using Docker-OSX.")
def _start_worker(self, worker_instance, on_finished_slot, on_error_slot):
def _start_worker(self, worker_instance, on_finished_slot, on_error_slot, worker_name="worker"): # ... (same as before)
if self.active_worker_thread and self.active_worker_thread.isRunning():
QMessageBox.warning(self, "Busy", "Another operation is already in progress. Please wait.")
return False
self.active_worker_thread = QThread()
self.active_worker_thread.setObjectName(worker_name + "_thread")
setattr(self, f"{worker_name}_instance", worker_instance)
worker_instance.moveToThread(self.active_worker_thread)
worker_instance.signals.progress.connect(self.update_output)
worker_instance.signals.finished.connect(on_finished_slot)
worker_instance.signals.error.connect(on_error_slot)
# Cleanup thread when worker is done
worker_instance.signals.finished.connect(self.active_worker_thread.quit)
worker_instance.signals.error.connect(self.active_worker_thread.quit)
self.active_worker_thread.finished.connect(self.active_worker_thread.deleteLater)
self.active_worker_thread.finished.connect(lambda: self._clear_worker_instance(worker_name)) # Use new clear method
self.active_worker_thread.started.connect(worker_instance.run)
self.active_worker_thread.start()
return True
def run_macos_vm(self):
def _clear_worker_instance(self, worker_name): # New method to clean up worker instance from self
attr_name = f"{worker_name}_instance"
if hasattr(self, attr_name):
delattr(self, attr_name)
def run_macos_vm(self): # ... (same as before, ensure worker_name matches for _clear_worker_instance)
selected_version_name = self.version_combo.currentText()
self.current_container_name = get_unique_container_name()
try:
command_list = build_docker_command(selected_version_name, self.current_container_name)
self.output_area.clear()
self.output_area.append(f"Starting macOS VM creation for {selected_version_name}...")
self.output_area.append(f"Container name: {self.current_container_name}")
self.output_area.append(f"Command: {' '.join(command_list)}\n")
self.output_area.append("The macOS installation will occur in a QEMU window...\n")
self.output_area.append(f"Starting macOS VM creation for {selected_version_name}...") # ... rest of messages
self.docker_run_worker_instance = DockerRunWorker(command_list) # Store instance
if self._start_worker(self.docker_run_worker_instance, self.docker_run_finished, self.docker_run_error):
self.run_vm_button.setEnabled(False)
self.version_combo.setEnabled(False)
self.stop_vm_button.setEnabled(True)
self.extract_images_button.setEnabled(False)
docker_run_worker = DockerRunWorker(command_list) # Local var, instance stored by _start_worker
if self._start_worker(docker_run_worker, self.docker_run_finished, self.docker_run_error, "docker_run"):
self.run_vm_button.setEnabled(False); self.version_combo.setEnabled(False)
self.stop_vm_button.setEnabled(True); self.extract_images_button.setEnabled(False)
self.write_to_usb_button.setEnabled(False)
except ValueError as e: self.handle_error(f"Failed to build command: {str(e)}")
except Exception as e: self.handle_error(f"An unexpected error: {str(e)}")
@pyqtSlot(str)
def update_output(self, text):
self.output_area.append(text.strip()) # append automatically scrolls
QApplication.processEvents() # Keep UI responsive during rapid updates
def update_output(self, text): # ... (same as before)
self.output_area.append(text.strip()); QApplication.processEvents()
@pyqtSlot(str)
def docker_run_finished(self, message):
def docker_run_finished(self, message): # ... (same as before)
self.output_area.append(f"\n--- macOS VM Setup Process Finished ---\n{message}")
QMessageBox.information(self, "VM Setup Complete", f"{message}\nYou can now proceed to extract images.")
self.run_vm_button.setEnabled(True)
self.version_combo.setEnabled(True)
self.stop_vm_button.setEnabled(False)
self.extract_images_button.setEnabled(True)
self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True)
self.stop_vm_button.setEnabled(False); self.extract_images_button.setEnabled(True)
self.stop_container_button.setEnabled(True)
self.active_worker_thread = None # Allow new worker
self.active_worker_thread = None # Cleared by _start_worker's finished connection
@pyqtSlot(str)
def docker_run_error(self, error_message):
def docker_run_error(self, error_message): # ... (same as before)
self.output_area.append(f"\n--- macOS VM Setup Process Error ---\n{error_message}")
if "exited with code" in error_message and self.current_container_name:
if "exited" in error_message.lower() and self.current_container_name:
QMessageBox.warning(self, "VM Setup Ended", f"{error_message}\nAssuming macOS setup was attempted...")
self.extract_images_button.setEnabled(True)
self.stop_container_button.setEnabled(True)
self.extract_images_button.setEnabled(True); self.stop_container_button.setEnabled(True)
else: QMessageBox.critical(self, "VM Setup Error", error_message)
self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True); self.stop_vm_button.setEnabled(False)
self.active_worker_thread = None
def stop_docker_run_process(self):
if hasattr(self, 'docker_run_worker_instance') and self.docker_run_worker_instance:
self.output_area.append("\n--- Attempting to stop macOS VM creation ---")
self.docker_run_worker_instance.stop() # Worker should handle signal emission
self.stop_vm_button.setEnabled(False) # Disable to prevent multiple clicks
def extract_vm_images(self):
if not self.current_container_name:
QMessageBox.warning(self, "Warning", "No active container specified for extraction."); return
def stop_docker_run_process(self):
docker_run_worker_inst = getattr(self, "docker_run_instance", None) # Use specific name
if docker_run_worker_inst:
self.output_area.append("\n--- Attempting to stop macOS VM creation ---")
docker_run_worker_inst.stop()
self.stop_vm_button.setEnabled(False)
def extract_vm_images(self): # ... (same as before, ensure worker_names are unique)
if not self.current_container_name: QMessageBox.warning(self, "Warning", "No active container."); return
save_dir = QFileDialog.getExistingDirectory(self, "Select Directory to Save VM Images")
if not save_dir: return
self.output_area.append(f"\n--- Starting Image Extraction from {self.current_container_name} to {save_dir} ---")
self.extract_images_button.setEnabled(False); self.write_to_usb_button.setEnabled(False)
self.extracted_main_image_path = os.path.join(save_dir, "mac_hdd_ng.img")
self.extracted_opencore_image_path = os.path.join(save_dir, "OpenCore.qcow2")
self.extraction_status = {"main": False, "opencore": False}
cp_main_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_MACOS_IMG_PATH, self.extracted_main_image_path)
main_worker = DockerCommandWorker(cp_main_cmd, f"Main macOS image copied to {self.extracted_main_image_path}")
if not self._start_worker(main_worker,
lambda msg: self.docker_utility_finished(msg, "main_img_extract"),
lambda err: self.docker_utility_error(err, "main_img_extract_error")):
self.extract_images_button.setEnabled(True) # Re-enable if start failed
return # Don't proceed to second if first failed to start
if not self._start_worker(main_worker, lambda msg: self.docker_utility_finished(msg, "main_img_extract"),
lambda err: self.docker_utility_error(err, "main_img_extract_error"), "cp_main"): # Unique name
self.extract_images_button.setEnabled(True); return
self.output_area.append(f"Extraction for main image started. OpenCore extraction will follow.")
def _start_opencore_extraction(self): # Called after main image extraction finishes
def _start_opencore_extraction(self): # ... (same as before, ensure worker_name is unique)
if not self.current_container_name or not self.extracted_opencore_image_path: return
cp_oc_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_OPENCORE_QCOW2_PATH, self.extracted_opencore_image_path)
oc_worker = DockerCommandWorker(cp_oc_cmd, f"OpenCore image copied to {self.extracted_opencore_image_path}")
self._start_worker(oc_worker,
lambda msg: self.docker_utility_finished(msg, "oc_img_extract"),
lambda err: self.docker_utility_error(err, "oc_img_extract_error"))
self._start_worker(oc_worker, lambda msg: self.docker_utility_finished(msg, "oc_img_extract"),
lambda err: self.docker_utility_error(err, "oc_img_extract_error"), "cp_oc") # Unique name
def stop_persistent_container(self):
def stop_persistent_container(self): # ... (same as before, ensure worker_name is unique)
if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return
self.output_area.append(f"\n--- Stopping container {self.current_container_name} ---")
cmd = build_docker_stop_command(self.current_container_name)
worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} stopped.")
if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "stop_container"),
lambda err: self.docker_utility_error(err, "stop_container_error")):
lambda err: self.docker_utility_error(err, "stop_container_error"), "stop_docker"): # Unique name
self.stop_container_button.setEnabled(False)
def remove_persistent_container(self):
def remove_persistent_container(self): # ... (same as before, ensure worker_name is unique)
if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return
reply = QMessageBox.question(self, 'Confirm Remove', f"Remove container '{self.current_container_name}'?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
reply = QMessageBox.question(self, 'Confirm Remove', f"Remove container '{self.current_container_name}'?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.No: return
self.output_area.append(f"\n--- Removing container {self.current_container_name} ---")
cmd = build_docker_rm_command(self.current_container_name)
worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} removed.")
if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "rm_container"),
lambda err: self.docker_utility_error(err, "rm_container_error")):
lambda err: self.docker_utility_error(err, "rm_container_error"), "rm_docker"): # Unique name
self.remove_container_button.setEnabled(False)
def docker_utility_finished(self, message, task_id):
self.output_area.append(f"\n--- Task '{task_id}' Succeeded ---\n{message}")
QMessageBox.information(self, f"Task Complete", message)
self.active_worker_thread = None # Allow new worker
if task_id == "main_img_extract":
self.extraction_status["main"] = True
self._start_opencore_extraction() # Start next part of extraction
elif task_id == "oc_img_extract":
self.extraction_status["opencore"] = True
def docker_utility_finished(self, message, task_id): # ... (same as before)
self.output_area.append(f"\n--- Task '{task_id}' Succeeded ---\n{message}"); QMessageBox.information(self, f"Task Complete", message)
if task_id == "main_img_extract": self.extraction_status["main"] = True; self._start_opencore_extraction(); return
elif task_id == "oc_img_extract": self.extraction_status["opencore"] = True
self.active_worker_thread = None # Cleared by _start_worker's finished connection
if self.extraction_status.get("main") and self.extraction_status.get("opencore"):
self.output_area.append("\nBoth VM images extracted successfully.")
self.update_write_to_usb_button_state()
self.extract_images_button.setEnabled(True)
elif task_id.startswith("extract"): # If one part finished but not both
self.extract_images_button.setEnabled(True)
if task_id == "stop_container":
self.remove_container_button.setEnabled(True)
self.output_area.append("\nBoth VM images extracted successfully."); self.update_write_to_usb_button_state(); self.extract_images_button.setEnabled(True)
elif task_id.startswith("extract"): self.extract_images_button.setEnabled(True)
if task_id == "stop_container": self.remove_container_button.setEnabled(True)
if task_id == "rm_container":
self.current_container_name = None
self.stop_container_button.setEnabled(False)
self.extract_images_button.setEnabled(False)
self.update_write_to_usb_button_state() # Should disable it
self.current_container_name = None; self.stop_container_button.setEnabled(False)
self.extract_images_button.setEnabled(False); self.update_write_to_usb_button_state()
def docker_utility_error(self, error_message, task_id):
self.output_area.append(f"\n--- Task '{task_id}' Error ---\n{error_message}")
QMessageBox.critical(self, f"Task Error", error_message)
def docker_utility_error(self, error_message, task_id): # ... (same as before)
self.output_area.append(f"\n--- Task '{task_id}' Error ---\n{error_message}"); QMessageBox.critical(self, f"Task Error", error_message)
self.active_worker_thread = None
if task_id.startswith("extract"): self.extract_images_button.setEnabled(True)
if task_id == "stop_container": self.stop_container_button.setEnabled(True) # Allow retry
if task_id == "rm_container": self.remove_container_button.setEnabled(True) # Allow retry
if task_id == "stop_container": self.stop_container_button.setEnabled(True)
if task_id == "rm_container": self.remove_container_button.setEnabled(True)
def handle_error(self, message):
self.output_area.append(f"ERROR: {message}")
QMessageBox.critical(self, "Error", message)
self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True)
self.stop_vm_button.setEnabled(False); self.extract_images_button.setEnabled(False)
self.write_to_usb_button.setEnabled(False)
self.active_worker_thread = None
def handle_error(self, message): # ... (same as before)
self.output_area.append(f"ERROR: {message}"); QMessageBox.critical(self, "Error", message)
self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True); self.stop_vm_button.setEnabled(False)
self.extract_images_button.setEnabled(False); self.write_to_usb_button.setEnabled(False)
self.active_worker_thread = None; # Clear active thread
# Clear all potential worker instances
for attr_name in list(self.__dict__.keys()):
if attr_name.endswith("_instance") and isinstance(getattr(self,attr_name,None), QObject):
setattr(self,attr_name,None)
def refresh_usb_drives(self):
def refresh_usb_drives(self): # Modified for Windows
self.usb_drive_combo.clear()
self._current_usb_selection_path = self.usb_drive_combo.currentData() # Save current selection
self.output_area.append("\nScanning for USB drives...")
current_selection_text = getattr(self, '_current_usb_selection_text', None)
self.output_area.append("\nScanning for disk devices...")
current_os = platform.system()
if current_os == "Windows":
self.usb_drive_label.setText("For Windows, identify Physical Disk number (e.g., 1, 2) using Disk Management or 'diskpart > list disk'. Input below.")
self.windows_disk_id_input.setVisible(True)
self.windows_usb_input_label.setVisible(True)
self.usb_drive_combo.setVisible(False) # Hide combo for windows as input is manual
self.refresh_usb_button.setText("List Partitions (Ref.)") # Change button text
try:
partitions = psutil.disk_partitions(all=True)
ref_text = "Reference - Detected partitions/mounts:\n"
for p in partitions:
try:
usage = psutil.disk_usage(p.mountpoint)
size_gb = usage.total / (1024**3)
ref_text += f" {p.device} @ {p.mountpoint} ({p.fstype}, {size_gb:.2f} GB)\n"
except Exception:
ref_text += f" {p.device} ({p.fstype}) - could not get usage/mountpoint\n"
self.output_area.append(ref_text)
except Exception as e:
self.output_area.append(f"Error listing partitions for reference: {e}")
else:
self.usb_drive_label.setText("Available USB Drives (for Linux/macOS):")
self.windows_disk_id_input.setVisible(False)
self.windows_usb_input_label.setVisible(False)
self.usb_drive_combo.setVisible(True)
self.refresh_usb_button.setText("Refresh List")
try: # psutil logic for Linux/macOS
partitions = psutil.disk_partitions(all=False)
potential_usbs = []
for p in partitions:
is_removable = 'removable' in p.opts
is_likely_usb = False
if platform.system() == "Windows":
# A more reliable method for Windows would involve WMI or ctypes to query drive types.
# This is a basic filter.
if p.mountpoint and p.fstype and p.fstype.lower() not in ['ntfs', 'refs', 'cdfs'] and len(p.mountpoint) <= 3: # e.g. E:\
is_likely_usb = True
elif platform.system() == "Darwin":
if p.device.startswith("/dev/disk") and (os.path.exists(f"/sys/block/{os.path.basename(p.device)}/removable") or "external" in p.opts.lower()): # Check 'external' from mount options
is_likely_usb = True
elif platform.system() == "Linux":
# Check if /sys/block/sdX/removable exists and is 1
try:
with open(f"/sys/block/{os.path.basename(p.device)}/removable", "r") as f:
if f.read().strip() == "1":
is_likely_usb = True
except IOError: # If the removable file doesn't exist, it's likely not a USB mass storage
pass
if not is_likely_usb and (p.mountpoint and ("/media/" in p.mountpoint or "/run/media/" in p.mountpoint)): # Fallback to mountpoint
is_likely_usb = True
if current_os == "Darwin":
if p.device.startswith("/dev/disk") and 'external' in p.opts.lower() and 'physical' in p.opts.lower(): is_likely_usb = True
elif current_os == "Linux":
if (p.mountpoint and ("/media/" in p.mountpoint or "/run/media/" in p.mountpoint)) or (p.device.startswith("/dev/sd") and not p.device.endswith("da")): is_likely_usb = True
if is_removable or is_likely_usb:
try:
# Attempt to get disk usage. If it fails, it might be an unformatted or problematic drive.
usage = psutil.disk_usage(p.mountpoint)
size_gb = usage.total / (1024**3)
size_gb = usage.total / (1024**3);
if size_gb < 0.1 : continue
drive_text = f"{p.device} @ {p.mountpoint} ({p.fstype}, {size_gb:.2f} GB)"
potential_usbs.append((drive_text, p.device))
except Exception: pass
idx_to_select = -1
if potential_usbs:
idx_to_select = -1
for i, (text, device_path) in enumerate(potential_usbs):
self.usb_drive_combo.addItem(text, userData=device_path)
if device_path == self._current_usb_selection_path:
idx_to_select = i
self.output_area.append(f"Found {len(potential_usbs)} potential USB drive(s). Please verify carefully.")
else: self.output_area.append("No suitable USB drives found. Ensure drive is connected, formatted, and mounted.")
if text == current_selection_text: idx_to_select = i
if idx_to_select != -1: self.usb_drive_combo.setCurrentIndex(idx_to_select)
except ImportError: self.output_area.append("psutil library not found. USB detection disabled.")
self.output_area.append(f"Found {len(potential_usbs)} potential USB drive(s). Please verify carefully.")
else: self.output_area.append("No suitable USB drives found for Linux/macOS.")
except ImportError: self.output_area.append("psutil library not found.")
except Exception as e: self.output_area.append(f"Error scanning for USB drives: {e}")
self.update_write_to_usb_button_state()
def handle_write_to_usb(self):
if platform.system() != "Linux":
QMessageBox.warning(self, "Unsupported Platform", f"USB writing is currently only implemented for Linux. Your system: {platform.system()}")
return
def handle_write_to_usb(self): # Modified for Windows
current_os = platform.system()
usb_writer_module = None
target_device_id_for_worker = None
if USBWriterLinux is None:
QMessageBox.critical(self, "Error", "USBWriterLinux module could not be loaded. Cannot write to USB.")
return
if current_os == "Linux":
usb_writer_module = USBWriterLinux
target_device_id_for_worker = self.usb_drive_combo.currentData()
elif current_os == "Darwin":
usb_writer_module = USBWriterMacOS
target_device_id_for_worker = self.usb_drive_combo.currentData()
elif current_os == "Windows":
usb_writer_module = USBWriterWindows
# For Windows, device_id for USBWriterWindows is the disk number string
target_device_id_for_worker = self.windows_disk_id_input.text().strip()
if not target_device_id_for_worker.isdigit(): # Basic validation
QMessageBox.warning(self, "Input Required", "Please enter a valid Windows Disk Number (e.g., 1, 2)."); return
# USBWriterWindows expects just the number, it constructs \\.\PhysicalDriveX itself.
if not usb_writer_module:
QMessageBox.warning(self, "Unsupported Platform", f"USB writing not supported/enabled for {current_os}."); return
selected_drive_device = self.usb_drive_combo.currentData()
if not self.extracted_main_image_path or not self.extracted_opencore_image_path or not self.extraction_status["main"] or not self.extraction_status["opencore"]:
QMessageBox.warning(self, "Missing Images", "Ensure both images are extracted."); return
if not selected_drive_device:
QMessageBox.warning(self, "No USB Selected", "Please select a target USB drive."); return
if not target_device_id_for_worker: # Should catch empty input for Windows here too
QMessageBox.warning(self, "No USB Selected/Identified", f"Please select/identify the target USB drive for {current_os}."); return
confirm_msg = (f"WARNING: ALL DATA ON {selected_drive_device} WILL BE ERASED PERMANENTLY.\n"
confirm_msg = (f"WARNING: ALL DATA ON TARGET '{target_device_id_for_worker}' WILL BE ERASED PERMANENTLY.
"
"Are you absolutely sure you want to proceed?")
reply = QMessageBox.warning(self, "Confirm Write Operation", confirm_msg,
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel,
QMessageBox.StandardButton.Cancel)
if reply == QMessageBox.StandardButton.Cancel:
self.output_area.append("\nUSB write operation cancelled by user."); return
self.output_area.append("
USB write operation cancelled by user."); return
self.output_area.append(f"\n--- Starting USB Write Process for {selected_drive_device} ---")
self.output_area.append("This will take a long time and requires sudo privileges for underlying commands.")
self.output_area.append(f"
--- Starting USB Write Process for {target_device_id_for_worker} on {current_os} ---")
self.write_to_usb_button.setEnabled(False); self.refresh_usb_button.setEnabled(False)
usb_worker = USBWriterWorker(selected_drive_device, self.extracted_opencore_image_path, self.extracted_main_image_path)
if self._start_worker(usb_worker, self.usb_write_finished, self.usb_write_error):
self.write_to_usb_button.setEnabled(False) # Disable during write
self.refresh_usb_button.setEnabled(False)
else: # Failed to start worker (another is running)
pass # Message already shown by _start_worker
usb_worker = USBWriterWorker(target_device_id_for_worker, self.extracted_opencore_image_path, self.extracted_main_image_path)
if not self._start_worker(usb_worker, self.usb_write_finished, self.usb_write_error, "usb_write"): # worker_name "usb_write"
self.write_to_usb_button.setEnabled(True); self.refresh_usb_button.setEnabled(True)
@pyqtSlot(str)
def usb_write_finished(self, message): # ... (same as before)
self.output_area.append(f"
--- USB Write Process Finished ---
{message}"); QMessageBox.information(self, "USB Write Complete", message)
self.write_to_usb_button.setEnabled(True); self.refresh_usb_button.setEnabled(True)
self.active_worker_thread = None; setattr(self, "usb_write_instance", None)
@pyqtSlot(str)
def usb_write_finished(self, message):
self.output_area.append(f"\n--- USB Write Process Finished ---\n{message}")
QMessageBox.information(self, "USB Write Complete", message)
self.write_to_usb_button.setEnabled(True) # Re-enable after completion
self.refresh_usb_button.setEnabled(True)
self.active_worker_thread = None
def usb_write_error(self, error_message): # ... (same as before)
self.output_area.append(f"
--- USB Write Process Error ---
{error_message}"); QMessageBox.critical(self, "USB Write Error", error_message)
self.write_to_usb_button.setEnabled(True); self.refresh_usb_button.setEnabled(True)
self.active_worker_thread = None; setattr(self, "usb_write_instance", None)
@pyqtSlot(str)
def usb_write_error(self, error_message):
self.output_area.append(f"\n--- USB Write Process Error ---\n{error_message}")
QMessageBox.critical(self, "USB Write Error", error_message)
self.write_to_usb_button.setEnabled(True) # Re-enable after error
self.refresh_usb_button.setEnabled(True)
self.active_worker_thread = None
def update_write_to_usb_button_state(self):
def update_write_to_usb_button_state(self): # Modified for Windows
images_ready = self.extraction_status.get("main", False) and self.extraction_status.get("opencore", False)
usb_selected = bool(self.usb_drive_combo.currentData())
can_write_on_platform = platform.system() == "Linux" and USBWriterLinux is not None
usb_identified = False
current_os = platform.system()
writer_module = None
self.write_to_usb_button.setEnabled(images_ready and usb_selected and can_write_on_platform)
if not can_write_on_platform and usb_selected and images_ready:
self.write_to_usb_button.setToolTip("USB writing currently only supported on Linux with all dependencies.")
if current_os == "Linux": writer_module = USBWriterLinux
elif current_os == "Darwin": writer_module = USBWriterMacOS
elif current_os == "Windows": writer_module = USBWriterWindows
if current_os == "Windows":
usb_identified = bool(self.windows_disk_id_input.text().strip().isdigit()) # Must be a digit for disk ID
else:
self.write_to_usb_button.setToolTip("")
usb_identified = bool(self.usb_drive_combo.currentData())
self.write_to_usb_button.setEnabled(images_ready and usb_identified and writer_module is not None)
# ... (Tooltip logic same as before) ...
if writer_module is None: self.write_to_usb_button.setToolTip(f"USB Writing not supported on {current_os} or module missing.")
elif not images_ready: self.write_to_usb_button.setToolTip("Extract VM images first.")
elif not usb_identified:
if current_os == "Windows": self.write_to_usb_button.setToolTip("Enter a valid Windows Disk Number.")
else: self.write_to_usb_button.setToolTip("Select a target USB drive.")
else: self.write_to_usb_button.setToolTip("")
def closeEvent(self, event):
def closeEvent(self, event): # ... (same as before)
self._current_usb_selection_text = self.usb_drive_combo.currentText()
if self.active_worker_thread and self.active_worker_thread.isRunning():
reply = QMessageBox.question(self, 'Confirm Exit', "An operation is running. Exit anyway?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
reply = QMessageBox.question(self, 'Confirm Exit', "An operation is running. Exit anyway?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.Yes:
# Attempt to stop the specific worker if identifiable, or just quit thread
# For DockerRunWorker:
if hasattr(self, 'docker_run_worker_instance') and self.active_worker_thread.findChild(DockerRunWorker):
self.docker_run_worker_instance.stop()
# For USBWriterWorker, it doesn't have an explicit stop, rely on thread termination.
self.active_worker_thread.quit()
if not self.active_worker_thread.wait(1000): # brief wait
self.output_area.append("Worker thread did not terminate gracefully. Forcing exit.")
worker_instance_attr_name = self.active_worker_thread.objectName().replace("_thread", "_instance")
worker_to_stop = getattr(self, worker_instance_attr_name, None)
if worker_to_stop and hasattr(worker_to_stop, 'stop'): worker_to_stop.stop()
else: self.active_worker_thread.quit()
self.active_worker_thread.wait(1000)
event.accept()
else: event.ignore()
elif self.current_container_name and self.stop_container_button.isEnabled(): # Check only if stop button is enabled (meaning container might be running or exists)
reply = QMessageBox.question(self, 'Confirm Exit', f"Container '{self.current_container_name}' may still exist or be running. It's recommended to stop and remove it using the GUI buttons. Exit anyway?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
else: event.ignore(); return
elif self.current_container_name and self.stop_container_button.isEnabled():
reply = QMessageBox.question(self, 'Confirm Exit', f"Container '{self.current_container_name}' may still exist. Exit anyway?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.Yes: event.accept()
else: event.ignore()
else:
event.accept()
else: event.accept()
if __name__ == "__main__":
app = QApplication(sys.argv)

View File

@ -2,259 +2,286 @@
import subprocess
import os
import time
# Placeholder for progress reporting signal if this were a QObject
# from PyQt6.QtCore import pyqtSignal
import shutil # For checking command existence
class USBWriterLinux:
# progress_signal = pyqtSignal(str) # Example for QObject integration
def __init__(self, device: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None):
"""
Args:
device: The path to the USB device (e.g., /dev/sdx).
opencore_qcow2_path: Path to the OpenCore.qcow2 image.
macos_qcow2_path: Path to the mac_hdd_ng.img (qcow2).
progress_callback: A function to call with progress strings.
"""
self.device = device
self.opencore_qcow2_path = opencore_qcow2_path
self.macos_qcow2_path = macos_qcow2_path
self.progress_callback = progress_callback
self.opencore_raw_path = "opencore.raw" # Temporary raw image
self.macos_raw_path = "macos_main.raw" # Temporary raw image
self.mount_point_opencore_efi = "/mnt/opencore_efi_temp"
self.mount_point_usb_esp = "/mnt/usb_esp_temp"
# Define unique temporary file and mount point names
pid = os.getpid() # Make temp names more unique if multiple instances run (though unlikely for this app)
self.opencore_raw_path = f"opencore_temp_{pid}.raw"
self.macos_raw_path = f"macos_main_temp_{pid}.raw"
self.mount_point_opencore_efi = f"/mnt/opencore_efi_temp_skyscope_{pid}"
self.mount_point_usb_esp = f"/mnt/usb_esp_temp_skyscope_{pid}"
self.mount_point_macos_source = f"/mnt/macos_source_temp_skyscope_{pid}"
self.mount_point_usb_macos_target = f"/mnt/usb_macos_target_temp_skyscope_{pid}"
self.temp_files_to_clean = [self.opencore_raw_path, self.macos_raw_path]
self.temp_mount_points_to_clean = [
self.mount_point_opencore_efi, self.mount_point_usb_esp,
self.mount_point_macos_source, self.mount_point_usb_macos_target
]
def _report_progress(self, message: str):
print(message) # For standalone testing
if self.progress_callback:
self.progress_callback(message)
def _run_command(self, command: list[str], check=True, capture_output=False, shell=False):
self._report_progress(f"Executing: {' '.join(command)}")
def _run_command(self, command: list[str], check=True, capture_output=False, shell=False, timeout=None):
self.progress_callback(f"Executing: {' '.join(command)}")
try:
process = subprocess.run(
command,
check=check,
capture_output=capture_output,
text=True,
shell=shell # Use shell=True with caution
shell=shell, # Use shell=True with caution
timeout=timeout
)
# Log stdout/stderr only if capture_output is True and content exists
if capture_output:
if process.stdout: self._report_progress(f"STDOUT: {process.stdout.strip()}")
if process.stderr: self._report_progress(f"STDERR: {process.stderr.strip()}")
if process.stdout and process.stdout.strip():
self._report_progress(f"STDOUT: {process.stdout.strip()}")
if process.stderr and process.stderr.strip():
self._report_progress(f"STDERR: {process.stderr.strip()}")
return process
except subprocess.TimeoutExpired:
self._report_progress(f"Command {' '.join(command)} timed out after {timeout} seconds.")
raise
except subprocess.CalledProcessError as e:
self._report_progress(f"Error executing {' '.join(command)}: {e}")
self._report_progress(f"Error executing {' '.join(command)} (return code {e.returncode}): {e}")
if e.stderr: self._report_progress(f"STDERR: {e.stderr.strip()}")
if e.stdout: self._report_progress(f"STDOUT: {e.stdout.strip()}")
if e.stdout: self._report_progress(f"STDOUT: {e.stdout.strip()}") # Sometimes errors go to stdout
raise
except FileNotFoundError:
self._report_progress(f"Error: Command {command[0]} not found. Is it installed and in PATH?")
self._report_progress(f"Error: Command '{command[0]}' not found. Is it installed and in PATH?")
raise
def _cleanup_temp_files(self):
self._report_progress("Cleaning up temporary files...")
for f_path in [self.opencore_raw_path, self.macos_raw_path]:
self._report_progress("Cleaning up temporary image files...")
for f_path in self.temp_files_to_clean:
if os.path.exists(f_path):
try:
os.remove(f_path)
self._run_command(["sudo", "rm", "-f", f_path], check=False) # Use sudo rm for root-owned files
self._report_progress(f"Removed {f_path}")
except OSError as e:
self._report_progress(f"Error removing {f_path}: {e}")
except Exception as e: # Catch broad exceptions from _run_command
self._report_progress(f"Error removing {f_path} via sudo rm: {e}")
def _unmount_and_remove_dir(self, mount_point):
def _unmount_path(self, mount_point):
if os.path.ismount(mount_point):
self._run_command(["sudo", "umount", mount_point], check=False)
if os.path.exists(mount_point):
self._report_progress(f"Unmounting {mount_point}...")
self._run_command(["sudo", "umount", "-lf", mount_point], check=False, timeout=30)
def _remove_dir_if_exists(self, dir_path):
if os.path.exists(dir_path):
try:
os.rmdir(mount_point)
except OSError as e:
self._report_progress(f"Could not rmdir {mount_point}: {e}. May need manual cleanup.")
self._run_command(["sudo", "rmdir", dir_path], check=False)
except Exception as e: # Catch broad exceptions from _run_command
self._report_progress(f"Could not rmdir {dir_path}: {e}. May need manual cleanup.")
def _cleanup_mappings_and_mounts(self):
self._report_progress("Cleaning up mappings and mounts...")
self._unmount_and_remove_dir(self.mount_point_opencore_efi)
self._unmount_and_remove_dir(self.mount_point_usb_esp)
def _cleanup_all_mounts_and_mappings(self):
self._report_progress("Cleaning up all temporary mounts and kpartx mappings...")
for mp in self.temp_mount_points_to_clean:
self._unmount_path(mp) # Unmount first
# Unmap kpartx devices - this is tricky as we don't know the loop device name easily without parsing
# For OpenCore raw image
if os.path.exists(self.opencore_raw_path):
# Detach kpartx for raw images
if os.path.exists(self.opencore_raw_path): # Check if raw file was even created
self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path], check=False)
# For the USB device itself, if kpartx was used on it (it shouldn't be for this workflow)
# self._run_command(["sudo", "kpartx", "-d", self.device], check=False)
if os.path.exists(self.macos_raw_path):
self._run_command(["sudo", "kpartx", "-d", self.macos_raw_path], check=False)
# Remove mount point directories after unmounting and detaching
for mp in self.temp_mount_points_to_clean:
self._remove_dir_if_exists(mp)
def check_dependencies(self):
self._report_progress("Checking dependencies (qemu-img, parted, kpartx, rsync, mkfs.vfat)...")
dependencies = ["qemu-img", "parted", "kpartx", "rsync", "mkfs.vfat"]
self._report_progress("Checking dependencies (qemu-img, parted, kpartx, rsync, mkfs.vfat, mkfs.hfsplus, apfs-fuse)...")
dependencies = ["qemu-img", "parted", "kpartx", "rsync", "mkfs.vfat", "mkfs.hfsplus", "apfs-fuse"]
missing_deps = []
for dep in dependencies:
try:
self._run_command([dep, "--version" if dep != "kpartx" and dep != "mkfs.vfat" else "-V"], capture_output=True) # kpartx has no version, mkfs.vfat uses -V
except (FileNotFoundError, subprocess.CalledProcessError) as e:
self._report_progress(f"Dependency {dep} not found or not working: {e}")
raise RuntimeError(f"Dependency {dep} not found. Please install it.")
self._report_progress("All dependencies found.")
if not shutil.which(dep):
missing_deps.append(dep)
if missing_deps:
msg = f"Missing dependencies: {', '.join(missing_deps)}. Please install them. `apfs-fuse` may require manual installation from source or a user repository (e.g., AUR for Arch Linux)."
self._report_progress(msg)
raise RuntimeError(msg)
self._report_progress("All critical dependencies found.")
return True
def _get_mapped_partition_device(self, kpartx_output: str, partition_index_in_image: int = 1) -> str:
lines = kpartx_output.splitlines()
# Try to find loopXpY where Y is partition_index_in_image
for line in lines:
parts = line.split()
if len(parts) > 2 and parts[0] == "add" and parts[1] == "map" and f"p{partition_index_in_image}" in parts[2]:
return f"/dev/mapper/{parts[2]}"
# Fallback for images that might be a single partition mapped directly (e.g. loopX)
# This is less common for full disk images like OpenCore.qcow2 or mac_hdd_ng.img
if partition_index_in_image == 1 and len(lines) == 1: # Only one mapping line
parts = lines[0].split()
if len(parts) > 2 and parts[0] == "add" and parts[1] == "map":
# Check if it does NOT look like a partition (no 'p' number)
if 'p' not in parts[2]:
return f"/dev/mapper/{parts[2]}" # e.g. /dev/mapper/loop0
self._report_progress(f"Could not find partition index {partition_index_in_image} in kpartx output:\n{kpartx_output}")
return None
def format_and_write(self) -> bool:
# Ensure cleanup runs even if errors occur early
try:
self.check_dependencies()
self._cleanup_all_mounts_and_mappings() # Clean before start, just in case
for mp in self.temp_mount_points_to_clean: # Create mount point directories
self._run_command(["sudo", "mkdir", "-p", mp])
self._report_progress(f"WARNING: ALL DATA ON {self.device} WILL BE ERASED!")
# Unmount any existing partitions on the target USB device
self._report_progress(f"Unmounting all partitions on {self.device}...")
for i in range(1, 5): # Try to unmount a few potential partitions
self._run_command(["sudo", "umount", f"{self.device}{i}"], check=False)
self._run_command(["sudo", "umount", f"{self.device}p{i}"], check=False) # for nvme like
self._report_progress(f"Unmounting all partitions on {self.device} (best effort)...")
for i in range(1, 10):
self._run_command(["sudo", "umount", f"{self.device}{i}"], check=False, timeout=5)
self._run_command(["sudo", "umount", f"{self.device}p{i}"], check=False, timeout=5)
# Create new GPT partition table
self._report_progress(f"Creating new GPT partition table on {self.device}...")
self._run_command(["sudo", "parted", "-s", self.device, "mklabel", "gpt"])
# Create EFI partition (e.g., 512MB)
self._run_command(["sudo", "parted", "--script", self.device, "mklabel", "gpt"])
self._report_progress("Creating EFI partition (ESP)...")
self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "EFI", "fat32", "1MiB", "513MiB"])
self._run_command(["sudo", "parted", "-s", self.device, "set", "1", "esp", "on"])
# Create macOS partition (remaining space)
self._run_command(["sudo", "parted", "--script", self.device, "mkpart", "EFI", "fat32", "1MiB", "551MiB"])
self._run_command(["sudo", "parted", "--script", self.device, "set", "1", "esp", "on"])
self._report_progress("Creating macOS partition...")
self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "macOS", "hfs+", "513MiB", "100%"])
self._run_command(["sudo", "parted", "--script", self.device, "mkpart", "macOS", "hfs+", "551MiB", "100%"])
# Inform kernel of partition changes
self._run_command(["sudo", "partprobe", self.device])
time.sleep(2) # Give kernel time to recognize new partitions
self._run_command(["sudo", "partprobe", self.device], timeout=10)
time.sleep(3)
# Determine partition names (e.g., /dev/sdx1, /dev/sdx2)
# This can be unreliable. A better way is `lsblk -jo NAME,PATH /dev/sdx`
# For simplicity, assuming /dev/sdx1 for ESP, /dev/sdx2 for macOS partition
esp_partition = f"{self.device}1"
if not os.path.exists(esp_partition): esp_partition = f"{self.device}p1" # for nvme like /dev/nvme0n1p1
esp_partition_dev = f"{self.device}1" if os.path.exists(f"{self.device}1") else f"{self.device}p1"
macos_partition_dev = f"{self.device}2" if os.path.exists(f"{self.device}2") else f"{self.device}p2"
macos_partition = f"{self.device}2"
if not os.path.exists(macos_partition): macos_partition = f"{self.device}p2"
if not (os.path.exists(esp_partition_dev) and os.path.exists(macos_partition_dev)):
raise RuntimeError(f"Could not reliably determine partition names for {self.device}. Expected {esp_partition_dev} and {macos_partition_dev} to exist after partprobe.")
if not (os.path.exists(esp_partition) and os.path.exists(macos_partition)):
self._report_progress(f"Could not reliably determine partition names for {self.device}. Expected {esp_partition} and {macos_partition}")
# Attempt to find them via lsblk if possible (more robust)
try:
lsblk_out = self._run_command(["lsblk", "-no", "NAME", "--paths", self.device], capture_output=True, check=True).stdout.strip().splitlines()
if len(lsblk_out) > 2 : # Device itself + at least 2 partitions
esp_partition = lsblk_out[1]
macos_partition = lsblk_out[2]
self._report_progress(f"Determined partitions using lsblk: ESP={esp_partition}, macOS={macos_partition}")
else:
raise RuntimeError("lsblk did not return enough partitions.")
except Exception as e_lsblk:
self._report_progress(f"Failed to determine partitions using lsblk: {e_lsblk}")
raise RuntimeError("Could not determine partition device names after partitioning.")
# Format ESP as FAT32
self._report_progress(f"Formatting ESP ({esp_partition}) as FAT32...")
self._run_command(["sudo", "mkfs.vfat", "-F", "32", esp_partition])
self._report_progress(f"Formatting ESP ({esp_partition_dev}) as FAT32...")
self._run_command(["sudo", "mkfs.vfat", "-F", "32", esp_partition_dev])
# --- Write EFI content ---
self._report_progress(f"Converting OpenCore QCOW2 image ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...")
self._report_progress(f"Converting OpenCore QCOW2 ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...")
self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path])
self._report_progress(f"Mapping partitions from {self.opencore_raw_path}...")
map_output = self._run_command(["sudo", "kpartx", "-av", self.opencore_raw_path], capture_output=True).stdout
self._report_progress(f"kpartx output: {map_output}")
# Example output: add map loop0p1 (253:0): 0 1048576 linear /dev/loop0 2048
# We need to parse "loop0p1" or similar from this.
mapped_efi_partition_name = None
for line in map_output.splitlines():
if "loop" in line and "p1" in line: # Assuming first partition is EFI
parts = line.split()
if len(parts) > 2:
mapped_efi_partition_name = parts[2] # e.g., loop0p1
break
if not mapped_efi_partition_name:
raise RuntimeError(f"Could not determine mapped EFI partition name from kpartx output for {self.opencore_raw_path}.")
mapped_efi_device = f"/dev/mapper/{mapped_efi_partition_name}"
self._report_progress(f"Mapped OpenCore EFI partition: {mapped_efi_device}")
os.makedirs(self.mount_point_opencore_efi, exist_ok=True)
os.makedirs(self.mount_point_usb_esp, exist_ok=True)
map_output_efi = self._run_command(["sudo", "kpartx", "-av", self.opencore_raw_path], capture_output=True).stdout
mapped_efi_device = self._get_mapped_partition_device(map_output_efi, 1) # EFI is partition 1 in OpenCore.qcow2
if not mapped_efi_device: raise RuntimeError(f"Could not map EFI partition from {self.opencore_raw_path}.")
self._report_progress(f"Mapped OpenCore EFI partition device: {mapped_efi_device}")
self._report_progress(f"Mounting {mapped_efi_device} to {self.mount_point_opencore_efi}...")
self._run_command(["sudo", "mount", "-o", "ro", mapped_efi_device, self.mount_point_opencore_efi])
self._report_progress(f"Mounting USB ESP ({esp_partition_dev}) to {self.mount_point_usb_esp}...")
self._run_command(["sudo", "mount", esp_partition_dev, self.mount_point_usb_esp])
self._report_progress(f"Mounting USB ESP ({esp_partition}) to {self.mount_point_usb_esp}...")
self._run_command(["sudo", "mount", esp_partition, self.mount_point_usb_esp])
self._report_progress(f"Copying EFI files from {self.mount_point_opencore_efi}/EFI to {self.mount_point_usb_esp}/EFI...")
source_efi_content_path = os.path.join(self.mount_point_opencore_efi, "EFI")
if not os.path.isdir(source_efi_content_path): # Check if EFI folder is in root of partition
source_efi_content_path = self.mount_point_opencore_efi # Assume content is in root
self._report_progress(f"Copying EFI files from {self.mount_point_opencore_efi} to {self.mount_point_usb_esp}...")
# Copy contents of EFI folder
source_efi_dir = os.path.join(self.mount_point_opencore_efi, "EFI")
if not os.path.exists(source_efi_dir): # Sometimes it's directly in the root of the partition image
source_efi_dir = self.mount_point_opencore_efi
target_efi_dir_on_usb = os.path.join(self.mount_point_usb_esp, "EFI")
self._run_command(["sudo", "mkdir", "-p", target_efi_dir_on_usb])
self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_content_path}/", f"{target_efi_dir_on_usb}/"]) # Copy content of EFI
self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_dir}/", f"{self.mount_point_usb_esp}/"])
self._unmount_path(self.mount_point_opencore_efi)
self._unmount_path(self.mount_point_usb_esp)
self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path])
# --- Write macOS main image (File-level copy) ---
self._report_progress(f"Formatting macOS partition ({macos_partition_dev}) on USB as HFS+...")
self._run_command(["sudo", "mkfs.hfsplus", "-v", "macOS_USB", macos_partition_dev])
self._report_progress("Unmounting OpenCore EFI and USB ESP...")
self._run_command(["sudo", "umount", self.mount_point_opencore_efi])
self._run_command(["sudo", "umount", self.mount_point_usb_esp])
self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path]) # Unmap loop device
# --- Write macOS main image ---
self._report_progress(f"Converting macOS QCOW2 image ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...")
self._report_progress(f"Converting macOS QCOW2 ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...")
self._report_progress("This may take a very long time and consume significant disk space temporarily.")
# Add dd progress status if possible, or estimate time based on size
# For qemu-img, there's no easy progress for convert.
self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path])
self._report_progress(f"Writing RAW macOS image ({self.macos_raw_path}) to {macos_partition}...")
self._report_progress("This will also take a very long time. Please be patient.")
# Using dd with progress status
dd_command = ["sudo", "dd", f"if={self.macos_raw_path}", f"of={macos_partition}", "bs=4M", "status=progress", "conv=fsync"]
self._run_command(dd_command)
self._report_progress(f"Mapping partitions from macOS RAW image ({self.macos_raw_path})...")
map_output_macos = self._run_command(["sudo", "kpartx", "-av", self.macos_raw_path], capture_output=True).stdout
# The mac_hdd_ng.img usually contains an APFS container.
# kpartx might show multiple APFS volumes within the container, or the container partition itself.
# We need to mount the APFS Data or System volume.
# Typically, the main usable partition is the largest one, or the second one (after a small EFI if present in this image).
mapped_macos_device = self._get_mapped_partition_device(map_output_macos, 2) # Try p2 (common for APFS container)
if not mapped_macos_device:
mapped_macos_device = self._get_mapped_partition_device(map_output_macos, 1) # Fallback to p1
if not mapped_macos_device:
raise RuntimeError(f"Could not identify and map main macOS data partition from {self.macos_raw_path}.")
self._report_progress(f"Mapped macOS source partition device: {mapped_macos_device}")
self._report_progress(f"Mounting source macOS partition ({mapped_macos_device}) to {self.mount_point_macos_source} using apfs-fuse...")
self._run_command(["sudo", "apfs-fuse", "-o", "ro,allow_other", mapped_macos_device, self.mount_point_macos_source])
self._report_progress(f"Mounting target USB macOS partition ({macos_partition_dev}) to {self.mount_point_usb_macos_target}...")
self._run_command(["sudo", "mount", macos_partition_dev, self.mount_point_usb_macos_target])
self._report_progress(f"Copying macOS system files from {self.mount_point_macos_source} to {self.mount_point_usb_macos_target} using rsync...")
self._report_progress("This will take a very long time. Please be patient.")
self._run_command(["sudo", "rsync", "-avh", "--delete", f"{self.mount_point_macos_source}/", f"{self.mount_point_usb_macos_target}/"]) # Note trailing slashes
self._report_progress("USB writing process completed successfully.")
return True
except Exception as e:
self._report_progress(f"An error occurred during USB writing: {e}")
import traceback
self._report_progress(traceback.format_exc()) # Log full traceback for debugging
return False
finally:
self._cleanup_mappings_and_mounts()
self._cleanup_all_mounts_and_mappings()
self._cleanup_temp_files()
if __name__ == '__main__':
# This is for standalone testing of this script.
# YOU MUST RUN THIS SCRIPT WITH SUDO for it to work.
# BE EXTREMELY CAREFUL with the device path.
if os.geteuid() != 0:
print("Please run this script as root (sudo) for testing.")
exit(1)
print("USB Writer Linux Standalone Test")
# Replace with actual paths to your QCOW2 files for testing
test_opencore_qcow2 = "path_to_your/OpenCore.qcow2"
test_macos_qcow2 = "path_to_your/mac_hdd_ng.img"
print("USB Writer Linux Standalone Test - REFACTORED for File Copy")
# Create dummy qcow2 files for testing script structure
# These won't result in a bootable USB but allow testing the commands.
mock_opencore_path = "mock_opencore_usb_writer.qcow2"
mock_macos_path = "mock_macos_usb_writer.qcow2"
print(f"Creating mock image: {mock_opencore_path}")
subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_opencore_path, "384M"], check=True)
# TODO: A more complex mock would involve creating a partition table and filesystem inside this qcow2.
# For now, this is just to ensure the file exists for qemu-img convert.
# Actual EFI content would be needed for kpartx to map something meaningful.
print(f"Creating mock image: {mock_macos_path}")
subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_macos_path, "1G"], check=True) # Small for quick test
# TODO: Similar to above, a real test needs a qcow2 with a mountable filesystem.
# IMPORTANT: List available block devices to help user choose.
print("\nAvailable block devices (be careful!):")
subprocess.run(["lsblk", "-d", "-o", "NAME,SIZE,MODEL"], check=True)
test_device = input("\nEnter target device (e.g., /dev/sdX). THIS DEVICE WILL BE WIPED: ")
if not test_device or not test_device.startswith("/dev/"):
if not test_device or not (test_device.startswith("/dev/") or test_device.startswith("/dev/mapper/")): # Allow /dev/mapper for testing with loop devices
print("Invalid device. Exiting.")
# Clean up mock files
if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path)
if os.path.exists(mock_macos_path): os.remove(mock_macos_path)
exit(1)
if not (os.path.exists(test_opencore_qcow2) and os.path.exists(test_macos_qcow2)):
print(f"Test files {test_opencore_qcow2} or {test_macos_qcow2} not found. Skipping write test.")
else:
confirm = input(f"Are you absolutely sure you want to wipe {test_device} and write images? (yes/NO): ")
confirm = input(f"Are you absolutely sure you want to wipe {test_device} and write mock images? (yes/NO): ")
success = False
if confirm.lower() == 'yes':
writer = USBWriterLinux(test_device, test_opencore_qcow2, test_macos_qcow2, print)
writer.format_and_write()
writer = USBWriterLinux(test_device, mock_opencore_path, mock_macos_path, print)
success = writer.format_and_write()
else:
print("Test cancelled by user.")
print(f"Test finished. Success: {success}")
# Clean up mock files
if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path)
if os.path.exists(mock_macos_path): os.remove(mock_macos_path)
print("Mock files cleaned up.")

313
usb_writer_macos.py Normal file
View File

@ -0,0 +1,313 @@
# usb_writer_macos.py
import subprocess
import os
import time
import shutil # For checking command existence
import plistlib # For parsing diskutil list -plist output
class USBWriterMacOS:
def __init__(self, device: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None):
self.device = device # Should be like /dev/diskX
self.opencore_qcow2_path = opencore_qcow2_path
self.macos_qcow2_path = macos_qcow2_path
self.progress_callback = progress_callback
pid = os.getpid()
self.opencore_raw_path = f"opencore_temp_{pid}.raw"
self.macos_raw_path = f"macos_main_temp_{pid}.raw"
self.temp_opencore_mount = f"/tmp/opencore_efi_temp_skyscope_{pid}"
self.temp_usb_esp_mount = f"/tmp/usb_esp_temp_skyscope_{pid}"
self.temp_macos_source_mount = f"/tmp/macos_source_temp_skyscope_{pid}"
self.temp_usb_macos_target_mount = f"/tmp/usb_macos_target_temp_skyscope_{pid}"
self.temp_files_to_clean = [self.opencore_raw_path, self.macos_raw_path]
self.temp_mount_points_to_clean = [
self.temp_opencore_mount, self.temp_usb_esp_mount,
self.temp_macos_source_mount, self.temp_usb_macos_target_mount
]
self.attached_raw_images_devices = [] # Store devices from hdiutil attach
def _report_progress(self, message: str):
print(message) # For standalone testing
if self.progress_callback:
self.progress_callback(message)
def _run_command(self, command: list[str], check=True, capture_output=False, timeout=None):
self._report_progress(f"Executing: {' '.join(command)}")
try:
process = subprocess.run(
command, check=check, capture_output=capture_output, text=True, timeout=timeout
)
if capture_output:
if process.stdout and process.stdout.strip():
self._report_progress(f"STDOUT: {process.stdout.strip()}")
if process.stderr and process.stderr.strip():
self._report_progress(f"STDERR: {process.stderr.strip()}")
return process
except subprocess.TimeoutExpired:
self._report_progress(f"Command {' '.join(command)} timed out after {timeout} seconds.")
raise
except subprocess.CalledProcessError as e:
self._report_progress(f"Error executing {' '.join(command)} (code {e.returncode}): {e.stderr or e.stdout or str(e)}")
raise
except FileNotFoundError:
self._report_progress(f"Error: Command '{command[0]}' not found. Is it installed and in PATH?")
raise
def _cleanup_temp_files(self):
self._report_progress("Cleaning up temporary image files...")
for f_path in self.temp_files_to_clean:
if os.path.exists(f_path):
try:
os.remove(f_path)
self._report_progress(f"Removed {f_path}")
except OSError as e:
self._report_progress(f"Error removing {f_path}: {e}")
def _unmount_path(self, mount_path_or_device, is_device=False, force=False):
target = mount_path_or_device
cmd_base = ["diskutil"]
action = "unmountDisk" if is_device else "unmount"
if force:
cmd = cmd_base + [action, "force", target]
else:
cmd = cmd_base + [action, target]
is_target_valid_for_unmount = (os.path.ismount(mount_path_or_device) and not is_device) or \
(is_device and os.path.exists(target))
if is_target_valid_for_unmount:
self._report_progress(f"Attempting to unmount {target} (Action: {action}, Force: {force})...")
self._run_command(cmd, check=False, timeout=30)
def _detach_raw_image_device(self, device_path):
if device_path and os.path.exists(device_path):
self._report_progress(f"Detaching raw image device {device_path}...")
try:
info_check = subprocess.run(["diskutil", "info", device_path], capture_output=True, text=True, check=False)
if info_check.returncode == 0:
self._run_command(["hdiutil", "detach", device_path, "-force"], check=False, timeout=30)
else:
self._report_progress(f"Device {device_path} appears invalid or already detached.")
except Exception as e:
self._report_progress(f"Exception while checking/detaching {device_path}: {e}")
def _cleanup_all_mounts_and_mappings(self):
self._report_progress("Cleaning up all temporary mounts and attached raw images...")
for mp in reversed(self.temp_mount_points_to_clean):
self._unmount_path(mp, force=True)
if os.path.exists(mp):
try: os.rmdir(mp)
except OSError as e: self._report_progress(f"Could not rmdir {mp}: {e}")
devices_to_detach = list(self.attached_raw_images_devices)
for dev_path in devices_to_detach:
self._detach_raw_image_device(dev_path)
self.attached_raw_images_devices = []
def check_dependencies(self):
self._report_progress("Checking dependencies (qemu-img, diskutil, hdiutil, rsync)...")
dependencies = ["qemu-img", "diskutil", "hdiutil", "rsync"]
missing_deps = []
for dep in dependencies:
if not shutil.which(dep):
missing_deps.append(dep)
if missing_deps:
msg = f"Missing dependencies: {', '.join(missing_deps)}. `qemu-img` might need to be installed (e.g., via Homebrew: `brew install qemu`). `diskutil`, `hdiutil`, `rsync` are usually standard on macOS."
self._report_progress(msg)
raise RuntimeError(msg)
self._report_progress("All critical dependencies found.")
return True
def _get_partition_device_id(self, parent_disk_id_str: str, partition_label_or_type: str) -> str | None:
"""Finds partition device ID by Volume Name or Content Hint."""
target_disk_id = parent_disk_id_str.replace("/dev/", "")
self._report_progress(f"Searching for partition '{partition_label_or_type}' on disk '{target_disk_id}'")
try:
result = self._run_command(["diskutil", "list", "-plist", target_disk_id], capture_output=True)
if not result.stdout:
self._report_progress(f"No stdout from diskutil list for {target_disk_id}")
return None
plist_data = plistlib.loads(result.stdout.encode('utf-8'))
all_disks_and_partitions = plist_data.get("AllDisksAndPartitions", [])
if not isinstance(all_disks_and_partitions, list):
if plist_data.get("DeviceIdentifier") == target_disk_id:
all_disks_and_partitions = [plist_data]
else:
all_disks_and_partitions = []
for disk_info_entry in all_disks_and_partitions:
current_disk_id_in_plist = disk_info_entry.get("DeviceIdentifier")
if current_disk_id_in_plist == target_disk_id:
for part_info in disk_info_entry.get("Partitions", []):
vol_name = part_info.get("VolumeName")
content_hint = part_info.get("Content")
device_id = part_info.get("DeviceIdentifier")
if device_id:
if vol_name and vol_name.strip().lower() == partition_label_or_type.strip().lower():
self._report_progress(f"Found partition by VolumeName: {vol_name} -> /dev/{device_id}")
return f"/dev/{device_id}"
if content_hint and content_hint.strip().lower() == partition_label_or_type.strip().lower():
self._report_progress(f"Found partition by Content type: {content_hint} -> /dev/{device_id}")
return f"/dev/{device_id}"
self._report_progress(f"Partition '{partition_label_or_type}' not found on disk '{target_disk_id}'.")
return None
except Exception as e:
self._report_progress(f"Error parsing 'diskutil list -plist {target_disk_id}': {e}")
return None
def format_and_write(self) -> bool:
try:
self.check_dependencies()
self._cleanup_all_mounts_and_mappings()
for mp in self.temp_mount_points_to_clean:
os.makedirs(mp, exist_ok=True)
self._report_progress(f"WARNING: ALL DATA ON {self.device} WILL BE ERASED!")
self._report_progress(f"Unmounting disk {self.device} (force)...")
self._unmount_path(self.device, is_device=True, force=True)
time.sleep(2)
self._report_progress(f"Partitioning {self.device} with GPT scheme...")
self._run_command([
"diskutil", "partitionDisk", self.device, "GPT",
"MS-DOS FAT32", "EFI", "551MiB",
"JHFS+", "macOS_USB", "0b"
], timeout=180)
time.sleep(3)
esp_partition_dev = self._get_partition_device_id(self.device, "EFI")
macos_partition_dev = self._get_partition_device_id(self.device, "macOS_USB")
if not (esp_partition_dev and os.path.exists(esp_partition_dev)):
esp_partition_dev = f"{self.device}s1"
if not (macos_partition_dev and os.path.exists(macos_partition_dev)):
macos_partition_dev = f"{self.device}s2"
if not (os.path.exists(esp_partition_dev) and os.path.exists(macos_partition_dev)):
raise RuntimeError(f"Could not identify partitions on {self.device}. ESP: {esp_partition_dev}, macOS: {macos_partition_dev}")
self._report_progress(f"Identified ESP: {esp_partition_dev}, macOS Partition: {macos_partition_dev}")
# --- Write EFI content ---
self._report_progress(f"Converting OpenCore QCOW2 ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...")
self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path])
self._report_progress(f"Attaching RAW OpenCore image ({self.opencore_raw_path})...")
attach_cmd_efi = ["hdiutil", "attach", "-nomount", "-imagekey", "diskimage-class=CRawDiskImage", self.opencore_raw_path]
efi_attach_output = self._run_command(attach_cmd_efi, capture_output=True).stdout.strip()
raw_efi_disk_id = efi_attach_output.splitlines()[-1].strip().split()[0]
if not raw_efi_disk_id.startswith("/dev/disk"):
raise RuntimeError(f"Failed to attach raw EFI image: {efi_attach_output}")
self.attached_raw_images_devices.append(raw_efi_disk_id)
self._report_progress(f"Attached raw OpenCore image as {raw_efi_disk_id}")
time.sleep(2)
source_efi_partition_dev = self._get_partition_device_id(raw_efi_disk_id, "EFI") or f"{raw_efi_disk_id}s1"
self._report_progress(f"Mounting source EFI partition ({source_efi_partition_dev}) to {self.temp_opencore_mount}...")
self._run_command(["diskutil", "mount", "readOnly", "-mountPoint", self.temp_opencore_mount, source_efi_partition_dev], timeout=30)
self._report_progress(f"Mounting target USB ESP ({esp_partition_dev}) to {self.temp_usb_esp_mount}...")
self._run_command(["diskutil", "mount", "-mountPoint", self.temp_usb_esp_mount, esp_partition_dev], timeout=30)
source_efi_content_path = os.path.join(self.temp_opencore_mount, "EFI")
if not os.path.isdir(source_efi_content_path): source_efi_content_path = self.temp_opencore_mount
target_efi_dir_on_usb = os.path.join(self.temp_usb_esp_mount, "EFI")
self._report_progress(f"Copying EFI files from {source_efi_content_path} to {target_efi_dir_on_usb}...")
self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_content_path}/", f"{target_efi_dir_on_usb}/"])
self._unmount_path(self.temp_opencore_mount, force=True)
self._unmount_path(self.temp_usb_esp_mount, force=True)
self._detach_raw_image_device(raw_efi_disk_id); raw_efi_disk_id = None
# --- Write macOS main image (File-level copy) ---
self._report_progress(f"Converting macOS QCOW2 ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...")
self._report_progress("This may take a very long time...")
self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path])
self._report_progress(f"Attaching RAW macOS image ({self.macos_raw_path})...")
attach_cmd_macos = ["hdiutil", "attach", "-nomount", "-imagekey", "diskimage-class=CRawDiskImage", self.macos_raw_path]
macos_attach_output = self._run_command(attach_cmd_macos, capture_output=True).stdout.strip()
raw_macos_disk_id = macos_attach_output.splitlines()[-1].strip().split()[0]
if not raw_macos_disk_id.startswith("/dev/disk"):
raise RuntimeError(f"Failed to attach raw macOS image: {macos_attach_output}")
self.attached_raw_images_devices.append(raw_macos_disk_id)
self._report_progress(f"Attached raw macOS image as {raw_macos_disk_id}")
time.sleep(2)
source_macos_part_dev = self._get_partition_device_id(raw_macos_disk_id, "Apple_APFS_Container") or \
self._get_partition_device_id(raw_macos_disk_id, "Apple_APFS") or \
self._get_partition_device_id(raw_macos_disk_id, "Apple_HFS") or \
f"{raw_macos_disk_id}s2"
if not (source_macos_part_dev and os.path.exists(source_macos_part_dev)):
raise RuntimeError(f"Could not find source macOS partition on {raw_macos_disk_id}")
self._report_progress(f"Mounting source macOS partition ({source_macos_part_dev}) to {self.temp_macos_source_mount}...")
self._run_command(["diskutil", "mount", "readOnly", "-mountPoint", self.temp_macos_source_mount, source_macos_part_dev], timeout=60)
self._report_progress(f"Mounting target USB macOS partition ({macos_partition_dev}) to {self.temp_usb_macos_target_mount}...")
self._run_command(["diskutil", "mount", "-mountPoint", self.temp_usb_macos_target_mount, macos_partition_dev], timeout=30)
self._report_progress(f"Copying macOS system files from {self.temp_macos_source_mount} to {self.temp_usb_macos_target_mount} (sudo rsync)...")
self._report_progress("This will also take a very long time.")
self._run_command([
"sudo", "rsync", "-avh", "--delete",
"--exclude=.Spotlight-V100", "--exclude=.fseventsd", "--exclude=/.Trashes", "--exclude=/System/Volumes/VM", "--exclude=/private/var/vm",
f"{self.temp_macos_source_mount}/", f"{self.temp_usb_macos_target_mount}/"
])
self._report_progress("USB writing process completed successfully.")
return True
except Exception as e:
self._report_progress(f"An error occurred during USB writing on macOS: {e}")
import traceback
self._report_progress(traceback.format_exc())
return False
finally:
self._cleanup_all_mounts_and_mappings()
self._cleanup_temp_files()
if __name__ == '__main__':
if platform.system() != "Darwin": print("This script is intended for macOS."); exit(1)
print("USB Writer macOS Standalone Test - File Copy Method")
mock_opencore_path = "mock_opencore_macos.qcow2"
mock_macos_path = "mock_macos_macos.qcow2"
if not os.path.exists(mock_opencore_path): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_opencore_path, "384M"])
if not os.path.exists(mock_macos_path): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_macos_path, "1G"])
print("\nAvailable disks (use 'diskutil list external physical' in Terminal to identify your USB):")
subprocess.run(["diskutil", "list", "external", "physical"], check=False)
test_device = input("\nEnter target disk identifier (e.g., /dev/diskX - NOT /dev/diskXsY). THIS DISK WILL BE WIPED: ")
if not test_device or not test_device.startswith("/dev/disk"):
print("Invalid disk identifier. Exiting.")
if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path)
if os.path.exists(mock_macos_path): os.remove(mock_macos_path)
exit(1)
confirm = input(f"Are you sure you want to wipe {test_device} and write mock images? (yes/NO): ")
success = False
if confirm.lower() == 'yes':
print("Ensure you have sudo privileges for rsync if needed, or app is run as root.")
writer = USBWriterMacOS(test_device, mock_opencore_path, mock_macos_path, print)
success = writer.format_and_write()
else:
print("Test cancelled.")
print(f"Test finished. Success: {success}")
if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path)
if os.path.exists(mock_macos_path): os.remove(mock_macos_path)
print("Mock files cleaned up.")

177
usb_writer_windows.py Normal file
View File

@ -0,0 +1,177 @@
# usb_writer_windows.py
import subprocess
import os
import time
import shutil
class USBWriterWindows:
def __init__(self, device_id: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None):
self.device_id = device_id
# Construct PhysicalDrive path carefully
disk_number_str = "".join(filter(str.isdigit, device_id))
self.physical_drive_path = f"\\\\.\\PhysicalDrive{disk_number_str}"
self.opencore_qcow2_path = opencore_qcow2_path
self.macos_qcow2_path = macos_qcow2_path
self.progress_callback = progress_callback
pid = os.getpid()
self.opencore_raw_path = f"opencore_temp_{pid}.raw"
self.macos_raw_path = f"macos_main_temp_{pid}.raw"
self.temp_efi_extract_dir = f"temp_efi_files_{pid}"
self.temp_files_to_clean = [self.opencore_raw_path, self.macos_raw_path]
self.temp_dirs_to_clean = [self.temp_efi_extract_dir]
self.assigned_efi_letter = None
def _report_progress(self, message: str):
if self.progress_callback:
self.progress_callback(message)
else:
print(message)
def _run_command(self, command: list[str] | str, check=True, capture_output=False, timeout=None, shell=False, working_dir=None):
self._report_progress(f"Executing: {command if isinstance(command, str) else ' '.join(command)}")
try:
process = subprocess.run(
command, check=check, capture_output=capture_output, text=True, timeout=timeout, shell=shell, cwd=working_dir,
creationflags=subprocess.CREATE_NO_WINDOW
)
if capture_output:
if process.stdout and process.stdout.strip(): self._report_progress(f"STDOUT: {process.stdout.strip()}")
if process.stderr and process.stderr.strip(): self._report_progress(f"STDERR: {process.stderr.strip()}")
return process
except subprocess.TimeoutExpired:
self._report_progress(f"Command timed out after {timeout} seconds.")
raise
except subprocess.CalledProcessError as e:
self._report_progress(f"Error executing (code {e.returncode}): {e.stderr or e.stdout or str(e)}")
raise
except FileNotFoundError:
self._report_progress(f"Error: Command '{command[0] if isinstance(command, list) else command.split()[0]}' not found.")
raise
def _run_diskpart_script(self, script_content: str):
script_file_path = f"diskpart_script_{os.getpid()}.txt"
with open(script_file_path, "w") as f:
f.write(script_content)
try:
self._report_progress(f"Running diskpart script...\n{script_content}")
self._run_command(["diskpart", "/s", script_file_path], capture_output=True, check=False)
finally:
if os.path.exists(script_file_path): os.remove(script_file_path)
def _cleanup_temp_files_and_dirs(self):
self._report_progress("Cleaning up...")
for f_path in self.temp_files_to_clean:
if os.path.exists(f_path): os.remove(f_path)
for d_path in self.temp_dirs_to_clean:
if os.path.exists(d_path): shutil.rmtree(d_path, ignore_errors=True)
def _find_available_drive_letter(self) -> str | None:
import string
# This is a placeholder. Actual psutil or ctypes calls would be more robust.
# For now, assume 'S' is available if not 'E' through 'Z'.
return 'S'
def check_dependencies(self):
self._report_progress("Checking dependencies (qemu-img, diskpart, robocopy)... DD for Win & 7z are manual checks.")
dependencies = ["qemu-img", "diskpart", "robocopy"]
missing = [dep for dep in dependencies if not shutil.which(dep)]
if missing:
raise RuntimeError(f"Missing dependencies: {', '.join(missing)}. qemu-img needs install & PATH.")
self._report_progress("Base dependencies found. Ensure 'dd for Windows' and '7z.exe' are in PATH if needed.")
return True
def format_and_write(self) -> bool:
try:
self.check_dependencies()
self._cleanup_temp_files_and_dirs()
os.makedirs(self.temp_efi_extract_dir, exist_ok=True)
disk_number = "".join(filter(str.isdigit, self.device_id))
self._report_progress(f"WARNING: ALL DATA ON DISK {disk_number} ({self.physical_drive_path}) WILL BE ERASED!")
self.assigned_efi_letter = self._find_available_drive_letter()
if not self.assigned_efi_letter:
raise RuntimeError("Could not find an available drive letter for EFI.")
self._report_progress(f"Attempting to use letter {self.assigned_efi_letter}: for EFI.")
script = f"select disk {disk_number}\nclean\nconvert gpt\n"
script += f"create partition efi size=550\nformat fs=fat32 quick label=EFI\nassign letter={self.assigned_efi_letter}\n"
script += "create partition primary label=macOS_USB\nexit\n"
self._run_diskpart_script(script)
time.sleep(5)
self._report_progress(f"Converting OpenCore QCOW2 to RAW: {self.opencore_raw_path}")
self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path])
self._report_progress("Extracting EFI files (using 7z if available)...")
if shutil.which("7z"):
# Simplified 7z call, assumes EFI folder is at root of first partition image by 7z
self._run_command([
"7z", "x", self.opencore_raw_path,
f"-o{self.temp_efi_extract_dir}", "EFI", "-r", "-y"
], check=False)
source_efi_folder = os.path.join(self.temp_efi_extract_dir, "EFI")
if not os.path.isdir(source_efi_folder):
# Fallback: check if files were extracted to temp_efi_extract_dir directly
if os.path.exists(os.path.join(self.temp_efi_extract_dir, "BOOTX64.EFI")):
source_efi_folder = self.temp_efi_extract_dir
else:
raise RuntimeError("Could not extract EFI folder using 7-Zip.")
target_efi_on_usb = f"{self.assigned_efi_letter}:\\EFI"
if not os.path.exists(f"{self.assigned_efi_letter}:\\"):
raise RuntimeError(f"EFI partition {self.assigned_efi_letter}: not accessible after assign.")
if not os.path.exists(target_efi_on_usb): os.makedirs(target_efi_on_usb, exist_ok=True)
self._report_progress(f"Copying EFI files to {target_efi_on_usb}")
self._run_command(["robocopy", source_efi_folder, target_efi_on_usb, "/E", "/S", "/NFL", "/NDL", "/NJH", "/NJS", "/NC", "/NS", "/NP"], check=True)
else:
raise RuntimeError("7-Zip CLI (7z.exe) not found in PATH for EFI extraction.")
self._report_progress(f"Converting macOS QCOW2 to RAW: {self.macos_raw_path}")
self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path])
self._report_progress("Windows RAW macOS image writing is a placeholder.")
self._report_progress(f"RAW image at: {self.macos_raw_path}")
self._report_progress(f"Target physical drive: {self.physical_drive_path}")
self._report_progress("User needs to use 'dd for Windows' to write the above raw image to the second partition of the USB drive.")
# Placeholder for actual dd command, as it's complex and risky to automate fully without specific dd tool knowledge
# E.g. dd if=self.macos_raw_path of=\\\\.\\PhysicalDriveX --partition 2 bs=4M status=progress (syntax depends on dd variant)
self._report_progress("Windows USB writing process (EFI part done, macOS part placeholder) completed.")
return True
except Exception as e:
self._report_progress(f"Error during Windows USB writing: {e}")
import traceback
self._report_progress(traceback.format_exc())
return False
finally:
if self.assigned_efi_letter:
self._run_diskpart_script(f"select volume {self.assigned_efi_letter}\nremove letter={self.assigned_efi_letter}\nexit")
self._cleanup_temp_files_and_dirs()
if __name__ == '__main__':
if platform.system() != "Windows":
print("This script is for Windows standalone testing."); exit(1)
print("USB Writer Windows Standalone Test - Partial Implementation")
# Requires Admin privileges
mock_oc = "mock_oc_win.qcow2"
mock_mac = "mock_mac_win.qcow2"
if not os.path.exists(mock_oc): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_oc, "384M"])
if not os.path.exists(mock_mac): subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_mac, "1G"])
disk_id = input("Enter target disk ID (e.g., '1' for 'disk 1'). WIPES DISK: ")
if not disk_id.isdigit(): print("Invalid disk ID."); exit(1)
actual_disk_id = f"disk {disk_id}" # This is how it's used in the class, but the input is just the number.
if input(f"Sure to wipe disk {disk_id}? (yes/NO): ").lower() == 'yes':
# Pass the disk number string to the constructor, it will form \\.\PhysicalDriveX
writer = USBWriterWindows(disk_id, mock_oc, mock_mac, print)
writer.format_and_write()
else: print("Cancelled.")
if os.path.exists(mock_oc): os.remove(mock_oc)
if os.path.exists(mock_mac): os.remove(mock_mac)
print("Mocks cleaned.")