diff --git a/constants.py b/constants.py new file mode 100644 index 0000000..a8b2114 --- /dev/null +++ b/constants.py @@ -0,0 +1,55 @@ +# constants.py + +APP_NAME = "Skyscope macOS on PC USB Creator Tool" +DEVELOPER_NAME = "Miss Casey Jay Topojani" +BUSINESS_NAME = "Skyscope Sentinel Intelligence" + +MACOS_VERSIONS = { + "Sonoma": "sonoma", + "Ventura": "ventura", + "Monterey": "monterey", + "Big Sur": "big-sur", + "Catalina": "catalina" +} + +# Docker image base name +DOCKER_IMAGE_BASE = "sickcodes/docker-osx" + +# Default Docker command parameters (some will be overridden) +DEFAULT_DOCKER_PARAMS = { + "--device": "/dev/kvm", + "-p": "50922:10022", # For SSH access to the container + "-v": "/tmp/.X11-unix:/tmp/.X11-unix", # For GUI display + "-e": "DISPLAY=${DISPLAY:-:0.0}", + "-e GENERATE_UNIQUE": "true", # Crucial for unique OpenCore + # Sonoma-specific, will need to be conditional or use a base plist + # that works for all, or fetch the correct one per version. + # For now, let's use a generic one if possible, or the Sonoma one as a placeholder. + # The original issue used a Sonoma-specific one. + "-e CPU": "'Haswell-noTSX'", + "-e CPUID_FLAGS": "'kvm=on,vendor=GenuineIntel,+invtsc,vmware-cpuid-freq=on'", + "-e MASTER_PLIST_URL": "'https://raw.githubusercontent.com/sickcodes/osx-serial-generator/master/config-custom-sonoma.plist'" +} + +# Parameters that might change per macOS version or user setting +VERSION_SPECIFIC_PARAMS = { + "Sonoma": { + "-e SHORTNAME": "sonoma", + "-e MASTER_PLIST_URL": "'https://raw.githubusercontent.com/sickcodes/osx-serial-generator/master/config-custom-sonoma.plist'" + }, + "Ventura": { + "-e SHORTNAME": "ventura", + "-e MASTER_PLIST_URL": "'https://raw.githubusercontent.com/sickcodes/osx-serial-generator/master/config-custom.plist'" # Needs verification if different for Ventura + }, + "Monterey": { + "-e SHORTNAME": "monterey", + "-e MASTER_PLIST_URL": "'https://raw.githubusercontent.com/sickcodes/osx-serial-generator/master/config-custom.plist'" # Needs verification + }, + "Big Sur": { + "-e SHORTNAME": "big-sur", + # Big Sur might not use/need MASTER_PLIST_URL in the same way or has a different default + }, + "Catalina": { + # Catalina might not use/need MASTER_PLIST_URL + } +} diff --git a/main_app.py b/main_app.py new file mode 100644 index 0000000..53bdd01 --- /dev/null +++ b/main_app.py @@ -0,0 +1,580 @@ +# main_app.py + +import sys +import subprocess +import threading +import os +import psutil +import platform # For OS detection and USB writing logic + +from PyQt6.QtWidgets import ( + QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, + QLabel, QComboBox, QPushButton, QTextEdit, QMessageBox, QMenuBar, + QFileDialog, QGroupBox +) +from PyQt6.QtGui import QAction +from PyQt6.QtCore import pyqtSignal, pyqtSlot, QObject, QThread + +from constants import APP_NAME, DEVELOPER_NAME, BUSINESS_NAME, MACOS_VERSIONS +from utils import ( + build_docker_command, get_unique_container_name, + build_docker_cp_command, CONTAINER_MACOS_IMG_PATH, CONTAINER_OPENCORE_QCOW2_PATH, + build_docker_stop_command, build_docker_rm_command +) + +# Import the Linux USB writer (conditionally or handle import error) +if platform.system() == "Linux": + try: + from usb_writer_linux import USBWriterLinux + except ImportError: + USBWriterLinux = None # Flag that it's not available + print("Could not import USBWriterLinux. USB writing for Linux will be disabled.") +else: + USBWriterLinux = None + + +# --- Worker Signals --- +class WorkerSignals(QObject): + progress = pyqtSignal(str) + finished = pyqtSignal(str) + error = pyqtSignal(str) + +# --- Docker Process Worker --- +class DockerRunWorker(QObject): + def __init__(self, command_list): + super().__init__() + self.command_list = command_list + self.signals = WorkerSignals() + self.process = None + self._is_running = True + + @pyqtSlot() + def run(self): + try: + self.signals.progress.emit(f"Executing: {' '.join(self.command_list)}\n") + self.process = subprocess.Popen( + self.command_list, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + text=True, bufsize=1, universal_newlines=True, + creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 + ) + if self.process.stdout: + for line in iter(self.process.stdout.readline, ''): + if not self._is_running: + self.signals.progress.emit("Docker process stopping at user request.\n") + break + self.signals.progress.emit(line) + self.process.stdout.close() + return_code = self.process.wait() + if not self._is_running and return_code != 0: + self.signals.finished.emit("Docker process cancelled by user.") + return + if return_code == 0: + self.signals.finished.emit("Docker VM process (QEMU) closed by user or completed.") + else: + self.signals.error.emit(f"Docker VM process exited with code {return_code}. Assuming macOS setup was attempted.") + except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.") + except Exception as e: self.signals.error.emit(f"An error occurred during Docker run: {str(e)}") + finally: self._is_running = False + + def stop(self): + self._is_running = False + if self.process and self.process.poll() is None: + self.signals.progress.emit("Attempting to stop Docker process...\n") + try: + self.process.terminate() + try: self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.signals.progress.emit("Process did not terminate gracefully, killing.\n") + self.process.kill() + self.signals.progress.emit("Docker process stopped.\n") + except Exception as e: self.signals.error.emit(f"Error stopping process: {str(e)}\n") + +# --- Docker Command Execution Worker --- +class DockerCommandWorker(QObject): + def __init__(self, command_list, success_message="Command completed."): + super().__init__() + self.command_list = command_list + self.signals = WorkerSignals() + self.success_message = success_message + + @pyqtSlot() + def run(self): + try: + self.signals.progress.emit(f"Executing: {' '.join(self.command_list)}\n") + result = subprocess.run( + self.command_list, capture_output=True, text=True, check=False, + creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 + ) + if result.stdout: self.signals.progress.emit(result.stdout) + if result.stderr: self.signals.progress.emit(f"STDERR: {result.stderr}") + if result.returncode == 0: self.signals.finished.emit(self.success_message) + else: + err_msg = result.stderr or result.stdout or "Unknown error" + self.signals.error.emit(f"Command failed with code {result.returncode}: {err_msg.strip()}") + except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.") + except Exception as e: self.signals.error.emit(f"An error occurred: {str(e)}") + + +# --- USB Writing Worker --- +class USBWriterWorker(QObject): + signals = WorkerSignals() + + def __init__(self, device, opencore_path, macos_path): + super().__init__() + self.device = device + self.opencore_path = opencore_path + self.macos_path = macos_path + self.writer_instance = None + + @pyqtSlot() + def run(self): + try: + if platform.system() == "Linux": + if USBWriterLinux is None: + self.signals.error.emit("USBWriterLinux module not loaded. Cannot write to USB on this system.") + return + + self.writer_instance = USBWriterLinux( + self.device, self.opencore_path, self.macos_path, + progress_callback=lambda msg: self.signals.progress.emit(msg) + ) + # Dependency check is called within format_and_write + if self.writer_instance.format_and_write(): + self.signals.finished.emit("USB writing process completed successfully.") + else: + # Error message should have been emitted by the writer via progress_callback + self.signals.error.emit("USB writing process failed. Check output for details.") + else: + self.signals.error.emit(f"USB writing is not currently supported on {platform.system()}.") + except Exception as e: + self.signals.error.emit(f"An unexpected error occurred during USB writing preparation: {str(e)}") + + +class MainWindow(QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle(APP_NAME) + self.setGeometry(100, 100, 800, 800) + self.current_container_name = None + self.extracted_main_image_path = None + self.extracted_opencore_image_path = None + self.extraction_status = {"main": False, "opencore": False} + self.active_worker_thread = None # To manage various worker threads one at a time + self._setup_ui() + self.refresh_usb_drives() + + def _setup_ui(self): + # ... (menu bar setup - same as before) ... + menubar = self.menuBar() + file_menu = menubar.addMenu("&File") + help_menu = menubar.addMenu("&Help") + exit_action = QAction("&Exit", self) + exit_action.triggered.connect(self.close) + file_menu.addAction(exit_action) + about_action = QAction("&About", self) + about_action.triggered.connect(self.show_about_dialog) + help_menu.addAction(about_action) + + central_widget = QWidget() + self.setCentralWidget(central_widget) + main_layout = QVBoxLayout(central_widget) + + # Step 1 + vm_creation_group = QGroupBox("Step 1: Create and Install macOS VM") + vm_layout = QVBoxLayout() + selection_layout = QHBoxLayout() + self.version_label = QLabel("Select macOS Version:") + self.version_combo = QComboBox() + self.version_combo.addItems(MACOS_VERSIONS.keys()) + selection_layout.addWidget(self.version_label) + selection_layout.addWidget(self.version_combo) + vm_layout.addLayout(selection_layout) + self.run_vm_button = QPushButton("Create VM and Start macOS Installation") + self.run_vm_button.clicked.connect(self.run_macos_vm) + vm_layout.addWidget(self.run_vm_button) + self.stop_vm_button = QPushButton("Stop/Cancel VM Creation") + self.stop_vm_button.clicked.connect(self.stop_docker_run_process) + self.stop_vm_button.setEnabled(False) + vm_layout.addWidget(self.stop_vm_button) + vm_creation_group.setLayout(vm_layout) + main_layout.addWidget(vm_creation_group) + + # Step 2 + extraction_group = QGroupBox("Step 2: Extract VM Images") + ext_layout = QVBoxLayout() + self.extract_images_button = QPushButton("Extract Images from Container") + self.extract_images_button.clicked.connect(self.extract_vm_images) + self.extract_images_button.setEnabled(False) + ext_layout.addWidget(self.extract_images_button) + extraction_group.setLayout(ext_layout) + main_layout.addWidget(extraction_group) + + # Step 3 + mgmt_group = QGroupBox("Step 3: Container Management (Optional)") + mgmt_layout = QHBoxLayout() + self.stop_container_button = QPushButton("Stop Container") + self.stop_container_button.clicked.connect(self.stop_persistent_container) + self.stop_container_button.setEnabled(False) + mgmt_layout.addWidget(self.stop_container_button) + self.remove_container_button = QPushButton("Remove Container") + self.remove_container_button.clicked.connect(self.remove_persistent_container) + self.remove_container_button.setEnabled(False) + mgmt_layout.addWidget(self.remove_container_button) + mgmt_group.setLayout(mgmt_layout) + main_layout.addWidget(mgmt_group) + + # Step 4: USB Drive Selection + usb_group = QGroupBox("Step 4: Select Target USB Drive and Write") # Title updated + usb_layout = QVBoxLayout() + usb_selection_layout = QHBoxLayout() + self.usb_drive_combo = QComboBox() + self.usb_drive_combo.currentIndexChanged.connect(self.update_write_to_usb_button_state) + usb_selection_layout.addWidget(QLabel("Available USB Drives:")) + usb_selection_layout.addWidget(self.usb_drive_combo) + self.refresh_usb_button = QPushButton("Refresh List") + self.refresh_usb_button.clicked.connect(self.refresh_usb_drives) + usb_selection_layout.addWidget(self.refresh_usb_button) + usb_layout.addLayout(usb_selection_layout) + warning_label = QLabel("WARNING: Selecting a drive and proceeding to write will ERASE ALL DATA on it!") + warning_label.setStyleSheet("color: red; font-weight: bold;") + usb_layout.addWidget(warning_label) + self.write_to_usb_button = QPushButton("Write Images to USB Drive") + self.write_to_usb_button.clicked.connect(self.handle_write_to_usb) + self.write_to_usb_button.setEnabled(False) + usb_layout.addWidget(self.write_to_usb_button) + usb_group.setLayout(usb_layout) + main_layout.addWidget(usb_group) + + self.output_area = QTextEdit() + self.output_area.setReadOnly(True) + main_layout.addWidget(self.output_area) + + def show_about_dialog(self): + QMessageBox.about(self, f"About {APP_NAME}", + f"Version: 0.4.0\nDeveloper: {DEVELOPER_NAME}\nBusiness: {BUSINESS_NAME}\n\n" + "This tool helps create bootable macOS USB drives using Docker-OSX.") + + def _start_worker(self, worker_instance, on_finished_slot, on_error_slot): + if self.active_worker_thread and self.active_worker_thread.isRunning(): + QMessageBox.warning(self, "Busy", "Another operation is already in progress. Please wait.") + return False + + self.active_worker_thread = QThread() + worker_instance.moveToThread(self.active_worker_thread) + + worker_instance.signals.progress.connect(self.update_output) + worker_instance.signals.finished.connect(on_finished_slot) + worker_instance.signals.error.connect(on_error_slot) + + # Cleanup thread when worker is done + worker_instance.signals.finished.connect(self.active_worker_thread.quit) + worker_instance.signals.error.connect(self.active_worker_thread.quit) + self.active_worker_thread.finished.connect(self.active_worker_thread.deleteLater) + + self.active_worker_thread.started.connect(worker_instance.run) + self.active_worker_thread.start() + return True + + def run_macos_vm(self): + selected_version_name = self.version_combo.currentText() + self.current_container_name = get_unique_container_name() + try: + command_list = build_docker_command(selected_version_name, self.current_container_name) + self.output_area.clear() + self.output_area.append(f"Starting macOS VM creation for {selected_version_name}...") + self.output_area.append(f"Container name: {self.current_container_name}") + self.output_area.append(f"Command: {' '.join(command_list)}\n") + self.output_area.append("The macOS installation will occur in a QEMU window...\n") + + self.docker_run_worker_instance = DockerRunWorker(command_list) # Store instance + if self._start_worker(self.docker_run_worker_instance, self.docker_run_finished, self.docker_run_error): + self.run_vm_button.setEnabled(False) + self.version_combo.setEnabled(False) + self.stop_vm_button.setEnabled(True) + self.extract_images_button.setEnabled(False) + self.write_to_usb_button.setEnabled(False) + except ValueError as e: self.handle_error(f"Failed to build command: {str(e)}") + except Exception as e: self.handle_error(f"An unexpected error: {str(e)}") + + @pyqtSlot(str) + def update_output(self, text): + self.output_area.append(text.strip()) # append automatically scrolls + QApplication.processEvents() # Keep UI responsive during rapid updates + + @pyqtSlot(str) + def docker_run_finished(self, message): + self.output_area.append(f"\n--- macOS VM Setup Process Finished ---\n{message}") + QMessageBox.information(self, "VM Setup Complete", f"{message}\nYou can now proceed to extract images.") + self.run_vm_button.setEnabled(True) + self.version_combo.setEnabled(True) + self.stop_vm_button.setEnabled(False) + self.extract_images_button.setEnabled(True) + self.stop_container_button.setEnabled(True) + self.active_worker_thread = None # Allow new worker + + @pyqtSlot(str) + def docker_run_error(self, error_message): + self.output_area.append(f"\n--- macOS VM Setup Process Error ---\n{error_message}") + if "exited with code" in error_message and self.current_container_name: + QMessageBox.warning(self, "VM Setup Ended", f"{error_message}\nAssuming macOS setup was attempted...") + self.extract_images_button.setEnabled(True) + self.stop_container_button.setEnabled(True) + else: QMessageBox.critical(self, "VM Setup Error", error_message) + self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True); self.stop_vm_button.setEnabled(False) + self.active_worker_thread = None + + def stop_docker_run_process(self): + if hasattr(self, 'docker_run_worker_instance') and self.docker_run_worker_instance: + self.output_area.append("\n--- Attempting to stop macOS VM creation ---") + self.docker_run_worker_instance.stop() # Worker should handle signal emission + self.stop_vm_button.setEnabled(False) # Disable to prevent multiple clicks + + def extract_vm_images(self): + if not self.current_container_name: + QMessageBox.warning(self, "Warning", "No active container specified for extraction."); return + save_dir = QFileDialog.getExistingDirectory(self, "Select Directory to Save VM Images") + if not save_dir: return + + self.output_area.append(f"\n--- Starting Image Extraction from {self.current_container_name} to {save_dir} ---") + self.extract_images_button.setEnabled(False); self.write_to_usb_button.setEnabled(False) + + self.extracted_main_image_path = os.path.join(save_dir, "mac_hdd_ng.img") + self.extracted_opencore_image_path = os.path.join(save_dir, "OpenCore.qcow2") + self.extraction_status = {"main": False, "opencore": False} + + cp_main_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_MACOS_IMG_PATH, self.extracted_main_image_path) + main_worker = DockerCommandWorker(cp_main_cmd, f"Main macOS image copied to {self.extracted_main_image_path}") + if not self._start_worker(main_worker, + lambda msg: self.docker_utility_finished(msg, "main_img_extract"), + lambda err: self.docker_utility_error(err, "main_img_extract_error")): + self.extract_images_button.setEnabled(True) # Re-enable if start failed + return # Don't proceed to second if first failed to start + + self.output_area.append(f"Extraction for main image started. OpenCore extraction will follow.") + + + def _start_opencore_extraction(self): # Called after main image extraction finishes + if not self.current_container_name or not self.extracted_opencore_image_path: return + + cp_oc_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_OPENCORE_QCOW2_PATH, self.extracted_opencore_image_path) + oc_worker = DockerCommandWorker(cp_oc_cmd, f"OpenCore image copied to {self.extracted_opencore_image_path}") + self._start_worker(oc_worker, + lambda msg: self.docker_utility_finished(msg, "oc_img_extract"), + lambda err: self.docker_utility_error(err, "oc_img_extract_error")) + + + def stop_persistent_container(self): + if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return + self.output_area.append(f"\n--- Stopping container {self.current_container_name} ---") + cmd = build_docker_stop_command(self.current_container_name) + worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} stopped.") + if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "stop_container"), + lambda err: self.docker_utility_error(err, "stop_container_error")): + self.stop_container_button.setEnabled(False) + + + def remove_persistent_container(self): + if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return + reply = QMessageBox.question(self, 'Confirm Remove', f"Remove container '{self.current_container_name}'?", + QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) + if reply == QMessageBox.StandardButton.No: return + self.output_area.append(f"\n--- Removing container {self.current_container_name} ---") + cmd = build_docker_rm_command(self.current_container_name) + worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} removed.") + if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "rm_container"), + lambda err: self.docker_utility_error(err, "rm_container_error")): + self.remove_container_button.setEnabled(False) + + + def docker_utility_finished(self, message, task_id): + self.output_area.append(f"\n--- Task '{task_id}' Succeeded ---\n{message}") + QMessageBox.information(self, f"Task Complete", message) + self.active_worker_thread = None # Allow new worker + + if task_id == "main_img_extract": + self.extraction_status["main"] = True + self._start_opencore_extraction() # Start next part of extraction + elif task_id == "oc_img_extract": + self.extraction_status["opencore"] = True + + if self.extraction_status.get("main") and self.extraction_status.get("opencore"): + self.output_area.append("\nBoth VM images extracted successfully.") + self.update_write_to_usb_button_state() + self.extract_images_button.setEnabled(True) + elif task_id.startswith("extract"): # If one part finished but not both + self.extract_images_button.setEnabled(True) + + if task_id == "stop_container": + self.remove_container_button.setEnabled(True) + if task_id == "rm_container": + self.current_container_name = None + self.stop_container_button.setEnabled(False) + self.extract_images_button.setEnabled(False) + self.update_write_to_usb_button_state() # Should disable it + + + def docker_utility_error(self, error_message, task_id): + self.output_area.append(f"\n--- Task '{task_id}' Error ---\n{error_message}") + QMessageBox.critical(self, f"Task Error", error_message) + self.active_worker_thread = None + if task_id.startswith("extract"): self.extract_images_button.setEnabled(True) + if task_id == "stop_container": self.stop_container_button.setEnabled(True) # Allow retry + if task_id == "rm_container": self.remove_container_button.setEnabled(True) # Allow retry + + + def handle_error(self, message): + self.output_area.append(f"ERROR: {message}") + QMessageBox.critical(self, "Error", message) + self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True) + self.stop_vm_button.setEnabled(False); self.extract_images_button.setEnabled(False) + self.write_to_usb_button.setEnabled(False) + self.active_worker_thread = None + + def refresh_usb_drives(self): + self.usb_drive_combo.clear() + self._current_usb_selection_path = self.usb_drive_combo.currentData() # Save current selection + self.output_area.append("\nScanning for USB drives...") + try: + partitions = psutil.disk_partitions(all=False) + potential_usbs = [] + for p in partitions: + is_removable = 'removable' in p.opts + is_likely_usb = False + + if platform.system() == "Windows": + # A more reliable method for Windows would involve WMI or ctypes to query drive types. + # This is a basic filter. + if p.mountpoint and p.fstype and p.fstype.lower() not in ['ntfs', 'refs', 'cdfs'] and len(p.mountpoint) <= 3: # e.g. E:\ + is_likely_usb = True + elif platform.system() == "Darwin": + if p.device.startswith("/dev/disk") and (os.path.exists(f"/sys/block/{os.path.basename(p.device)}/removable") or "external" in p.opts.lower()): # Check 'external' from mount options + is_likely_usb = True + elif platform.system() == "Linux": + # Check if /sys/block/sdX/removable exists and is 1 + try: + with open(f"/sys/block/{os.path.basename(p.device)}/removable", "r") as f: + if f.read().strip() == "1": + is_likely_usb = True + except IOError: # If the removable file doesn't exist, it's likely not a USB mass storage + pass + if not is_likely_usb and (p.mountpoint and ("/media/" in p.mountpoint or "/run/media/" in p.mountpoint)): # Fallback to mountpoint + is_likely_usb = True + + if is_removable or is_likely_usb: + try: + # Attempt to get disk usage. If it fails, it might be an unformatted or problematic drive. + usage = psutil.disk_usage(p.mountpoint) + size_gb = usage.total / (1024**3) + if size_gb < 0.1 : continue + drive_text = f"{p.device} @ {p.mountpoint} ({p.fstype}, {size_gb:.2f} GB)" + potential_usbs.append((drive_text, p.device)) + except Exception: pass + + idx_to_select = -1 + if potential_usbs: + for i, (text, device_path) in enumerate(potential_usbs): + self.usb_drive_combo.addItem(text, userData=device_path) + if device_path == self._current_usb_selection_path: + idx_to_select = i + self.output_area.append(f"Found {len(potential_usbs)} potential USB drive(s). Please verify carefully.") + else: self.output_area.append("No suitable USB drives found. Ensure drive is connected, formatted, and mounted.") + + if idx_to_select != -1: self.usb_drive_combo.setCurrentIndex(idx_to_select) + + except ImportError: self.output_area.append("psutil library not found. USB detection disabled.") + except Exception as e: self.output_area.append(f"Error scanning for USB drives: {e}") + self.update_write_to_usb_button_state() + + + def handle_write_to_usb(self): + if platform.system() != "Linux": + QMessageBox.warning(self, "Unsupported Platform", f"USB writing is currently only implemented for Linux. Your system: {platform.system()}") + return + + if USBWriterLinux is None: + QMessageBox.critical(self, "Error", "USBWriterLinux module could not be loaded. Cannot write to USB.") + return + + selected_drive_device = self.usb_drive_combo.currentData() + if not self.extracted_main_image_path or not self.extracted_opencore_image_path or not self.extraction_status["main"] or not self.extraction_status["opencore"]: + QMessageBox.warning(self, "Missing Images", "Ensure both images are extracted."); return + if not selected_drive_device: + QMessageBox.warning(self, "No USB Selected", "Please select a target USB drive."); return + + confirm_msg = (f"WARNING: ALL DATA ON {selected_drive_device} WILL BE ERASED PERMANENTLY.\n" + "Are you absolutely sure you want to proceed?") + reply = QMessageBox.warning(self, "Confirm Write Operation", confirm_msg, + QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel, + QMessageBox.StandardButton.Cancel) + if reply == QMessageBox.StandardButton.Cancel: + self.output_area.append("\nUSB write operation cancelled by user."); return + + self.output_area.append(f"\n--- Starting USB Write Process for {selected_drive_device} ---") + self.output_area.append("This will take a long time and requires sudo privileges for underlying commands.") + + usb_worker = USBWriterWorker(selected_drive_device, self.extracted_opencore_image_path, self.extracted_main_image_path) + if self._start_worker(usb_worker, self.usb_write_finished, self.usb_write_error): + self.write_to_usb_button.setEnabled(False) # Disable during write + self.refresh_usb_button.setEnabled(False) + else: # Failed to start worker (another is running) + pass # Message already shown by _start_worker + + + @pyqtSlot(str) + def usb_write_finished(self, message): + self.output_area.append(f"\n--- USB Write Process Finished ---\n{message}") + QMessageBox.information(self, "USB Write Complete", message) + self.write_to_usb_button.setEnabled(True) # Re-enable after completion + self.refresh_usb_button.setEnabled(True) + self.active_worker_thread = None + + @pyqtSlot(str) + def usb_write_error(self, error_message): + self.output_area.append(f"\n--- USB Write Process Error ---\n{error_message}") + QMessageBox.critical(self, "USB Write Error", error_message) + self.write_to_usb_button.setEnabled(True) # Re-enable after error + self.refresh_usb_button.setEnabled(True) + self.active_worker_thread = None + + def update_write_to_usb_button_state(self): + images_ready = self.extraction_status.get("main", False) and self.extraction_status.get("opencore", False) + usb_selected = bool(self.usb_drive_combo.currentData()) + can_write_on_platform = platform.system() == "Linux" and USBWriterLinux is not None + + self.write_to_usb_button.setEnabled(images_ready and usb_selected and can_write_on_platform) + if not can_write_on_platform and usb_selected and images_ready: + self.write_to_usb_button.setToolTip("USB writing currently only supported on Linux with all dependencies.") + else: + self.write_to_usb_button.setToolTip("") + + + def closeEvent(self, event): + if self.active_worker_thread and self.active_worker_thread.isRunning(): + reply = QMessageBox.question(self, 'Confirm Exit', "An operation is running. Exit anyway?", + QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) + if reply == QMessageBox.StandardButton.Yes: + # Attempt to stop the specific worker if identifiable, or just quit thread + # For DockerRunWorker: + if hasattr(self, 'docker_run_worker_instance') and self.active_worker_thread.findChild(DockerRunWorker): + self.docker_run_worker_instance.stop() + # For USBWriterWorker, it doesn't have an explicit stop, rely on thread termination. + + self.active_worker_thread.quit() + if not self.active_worker_thread.wait(1000): # brief wait + self.output_area.append("Worker thread did not terminate gracefully. Forcing exit.") + event.accept() + else: event.ignore() + elif self.current_container_name and self.stop_container_button.isEnabled(): # Check only if stop button is enabled (meaning container might be running or exists) + reply = QMessageBox.question(self, 'Confirm Exit', f"Container '{self.current_container_name}' may still exist or be running. It's recommended to stop and remove it using the GUI buttons. Exit anyway?", + QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) + if reply == QMessageBox.StandardButton.Yes: event.accept() + else: event.ignore() + else: + event.accept() + + +if __name__ == "__main__": + app = QApplication(sys.argv) + window = MainWindow() + window.show() + sys.exit(app.exec()) diff --git a/usb_writer_linux.py b/usb_writer_linux.py new file mode 100644 index 0000000..7442a0b --- /dev/null +++ b/usb_writer_linux.py @@ -0,0 +1,260 @@ +# usb_writer_linux.py +import subprocess +import os +import time + +# Placeholder for progress reporting signal if this were a QObject +# from PyQt6.QtCore import pyqtSignal + +class USBWriterLinux: + # progress_signal = pyqtSignal(str) # Example for QObject integration + + def __init__(self, device: str, opencore_qcow2_path: str, macos_qcow2_path: str, progress_callback=None): + """ + Args: + device: The path to the USB device (e.g., /dev/sdx). + opencore_qcow2_path: Path to the OpenCore.qcow2 image. + macos_qcow2_path: Path to the mac_hdd_ng.img (qcow2). + progress_callback: A function to call with progress strings. + """ + self.device = device + self.opencore_qcow2_path = opencore_qcow2_path + self.macos_qcow2_path = macos_qcow2_path + self.progress_callback = progress_callback + + self.opencore_raw_path = "opencore.raw" # Temporary raw image + self.macos_raw_path = "macos_main.raw" # Temporary raw image + self.mount_point_opencore_efi = "/mnt/opencore_efi_temp" + self.mount_point_usb_esp = "/mnt/usb_esp_temp" + + + def _report_progress(self, message: str): + print(message) # For standalone testing + if self.progress_callback: + self.progress_callback(message) + + def _run_command(self, command: list[str], check=True, capture_output=False, shell=False): + self._report_progress(f"Executing: {' '.join(command)}") + try: + process = subprocess.run( + command, + check=check, + capture_output=capture_output, + text=True, + shell=shell # Use shell=True with caution + ) + if capture_output: + if process.stdout: self._report_progress(f"STDOUT: {process.stdout.strip()}") + if process.stderr: self._report_progress(f"STDERR: {process.stderr.strip()}") + return process + except subprocess.CalledProcessError as e: + self._report_progress(f"Error executing {' '.join(command)}: {e}") + if e.stderr: self._report_progress(f"STDERR: {e.stderr.strip()}") + if e.stdout: self._report_progress(f"STDOUT: {e.stdout.strip()}") + raise + except FileNotFoundError: + self._report_progress(f"Error: Command {command[0]} not found. Is it installed and in PATH?") + raise + + def _cleanup_temp_files(self): + self._report_progress("Cleaning up temporary files...") + for f_path in [self.opencore_raw_path, self.macos_raw_path]: + if os.path.exists(f_path): + try: + os.remove(f_path) + self._report_progress(f"Removed {f_path}") + except OSError as e: + self._report_progress(f"Error removing {f_path}: {e}") + + def _unmount_and_remove_dir(self, mount_point): + if os.path.ismount(mount_point): + self._run_command(["sudo", "umount", mount_point], check=False) + if os.path.exists(mount_point): + try: + os.rmdir(mount_point) + except OSError as e: + self._report_progress(f"Could not rmdir {mount_point}: {e}. May need manual cleanup.") + + + def _cleanup_mappings_and_mounts(self): + self._report_progress("Cleaning up mappings and mounts...") + self._unmount_and_remove_dir(self.mount_point_opencore_efi) + self._unmount_and_remove_dir(self.mount_point_usb_esp) + + # Unmap kpartx devices - this is tricky as we don't know the loop device name easily without parsing + # For OpenCore raw image + if os.path.exists(self.opencore_raw_path): + self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path], check=False) + # For the USB device itself, if kpartx was used on it (it shouldn't be for this workflow) + # self._run_command(["sudo", "kpartx", "-d", self.device], check=False) + + + def check_dependencies(self): + self._report_progress("Checking dependencies (qemu-img, parted, kpartx, rsync, mkfs.vfat)...") + dependencies = ["qemu-img", "parted", "kpartx", "rsync", "mkfs.vfat"] + for dep in dependencies: + try: + self._run_command([dep, "--version" if dep != "kpartx" and dep != "mkfs.vfat" else "-V"], capture_output=True) # kpartx has no version, mkfs.vfat uses -V + except (FileNotFoundError, subprocess.CalledProcessError) as e: + self._report_progress(f"Dependency {dep} not found or not working: {e}") + raise RuntimeError(f"Dependency {dep} not found. Please install it.") + self._report_progress("All dependencies found.") + return True + + def format_and_write(self) -> bool: + try: + self.check_dependencies() + + self._report_progress(f"WARNING: ALL DATA ON {self.device} WILL BE ERASED!") + # Unmount any existing partitions on the target USB device + self._report_progress(f"Unmounting all partitions on {self.device}...") + for i in range(1, 5): # Try to unmount a few potential partitions + self._run_command(["sudo", "umount", f"{self.device}{i}"], check=False) + self._run_command(["sudo", "umount", f"{self.device}p{i}"], check=False) # for nvme like + + # Create new GPT partition table + self._report_progress(f"Creating new GPT partition table on {self.device}...") + self._run_command(["sudo", "parted", "-s", self.device, "mklabel", "gpt"]) + + # Create EFI partition (e.g., 512MB) + self._report_progress("Creating EFI partition (ESP)...") + self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "EFI", "fat32", "1MiB", "513MiB"]) + self._run_command(["sudo", "parted", "-s", self.device, "set", "1", "esp", "on"]) + + # Create macOS partition (remaining space) + self._report_progress("Creating macOS partition...") + self._run_command(["sudo", "parted", "-s", self.device, "mkpart", "macOS", "hfs+", "513MiB", "100%"]) + + # Inform kernel of partition changes + self._run_command(["sudo", "partprobe", self.device]) + time.sleep(2) # Give kernel time to recognize new partitions + + # Determine partition names (e.g., /dev/sdx1, /dev/sdx2) + # This can be unreliable. A better way is `lsblk -jo NAME,PATH /dev/sdx` + # For simplicity, assuming /dev/sdx1 for ESP, /dev/sdx2 for macOS partition + esp_partition = f"{self.device}1" + if not os.path.exists(esp_partition): esp_partition = f"{self.device}p1" # for nvme like /dev/nvme0n1p1 + + macos_partition = f"{self.device}2" + if not os.path.exists(macos_partition): macos_partition = f"{self.device}p2" + + if not (os.path.exists(esp_partition) and os.path.exists(macos_partition)): + self._report_progress(f"Could not reliably determine partition names for {self.device}. Expected {esp_partition} and {macos_partition}") + # Attempt to find them via lsblk if possible (more robust) + try: + lsblk_out = self._run_command(["lsblk", "-no", "NAME", "--paths", self.device], capture_output=True, check=True).stdout.strip().splitlines() + if len(lsblk_out) > 2 : # Device itself + at least 2 partitions + esp_partition = lsblk_out[1] + macos_partition = lsblk_out[2] + self._report_progress(f"Determined partitions using lsblk: ESP={esp_partition}, macOS={macos_partition}") + else: + raise RuntimeError("lsblk did not return enough partitions.") + except Exception as e_lsblk: + self._report_progress(f"Failed to determine partitions using lsblk: {e_lsblk}") + raise RuntimeError("Could not determine partition device names after partitioning.") + + + # Format ESP as FAT32 + self._report_progress(f"Formatting ESP ({esp_partition}) as FAT32...") + self._run_command(["sudo", "mkfs.vfat", "-F", "32", esp_partition]) + + # --- Write EFI content --- + self._report_progress(f"Converting OpenCore QCOW2 image ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...") + self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path]) + + self._report_progress(f"Mapping partitions from {self.opencore_raw_path}...") + map_output = self._run_command(["sudo", "kpartx", "-av", self.opencore_raw_path], capture_output=True).stdout + self._report_progress(f"kpartx output: {map_output}") + # Example output: add map loop0p1 (253:0): 0 1048576 linear /dev/loop0 2048 + # We need to parse "loop0p1" or similar from this. + mapped_efi_partition_name = None + for line in map_output.splitlines(): + if "loop" in line and "p1" in line: # Assuming first partition is EFI + parts = line.split() + if len(parts) > 2: + mapped_efi_partition_name = parts[2] # e.g., loop0p1 + break + + if not mapped_efi_partition_name: + raise RuntimeError(f"Could not determine mapped EFI partition name from kpartx output for {self.opencore_raw_path}.") + + mapped_efi_device = f"/dev/mapper/{mapped_efi_partition_name}" + self._report_progress(f"Mapped OpenCore EFI partition: {mapped_efi_device}") + + os.makedirs(self.mount_point_opencore_efi, exist_ok=True) + os.makedirs(self.mount_point_usb_esp, exist_ok=True) + + self._report_progress(f"Mounting {mapped_efi_device} to {self.mount_point_opencore_efi}...") + self._run_command(["sudo", "mount", "-o", "ro", mapped_efi_device, self.mount_point_opencore_efi]) + + self._report_progress(f"Mounting USB ESP ({esp_partition}) to {self.mount_point_usb_esp}...") + self._run_command(["sudo", "mount", esp_partition, self.mount_point_usb_esp]) + + self._report_progress(f"Copying EFI files from {self.mount_point_opencore_efi} to {self.mount_point_usb_esp}...") + # Copy contents of EFI folder + source_efi_dir = os.path.join(self.mount_point_opencore_efi, "EFI") + if not os.path.exists(source_efi_dir): # Sometimes it's directly in the root of the partition image + source_efi_dir = self.mount_point_opencore_efi + + self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_dir}/", f"{self.mount_point_usb_esp}/"]) + + + self._report_progress("Unmounting OpenCore EFI and USB ESP...") + self._run_command(["sudo", "umount", self.mount_point_opencore_efi]) + self._run_command(["sudo", "umount", self.mount_point_usb_esp]) + self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path]) # Unmap loop device + + # --- Write macOS main image --- + self._report_progress(f"Converting macOS QCOW2 image ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...") + self._report_progress("This may take a very long time and consume significant disk space temporarily.") + # Add dd progress status if possible, or estimate time based on size + # For qemu-img, there's no easy progress for convert. + self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path]) + + self._report_progress(f"Writing RAW macOS image ({self.macos_raw_path}) to {macos_partition}...") + self._report_progress("This will also take a very long time. Please be patient.") + # Using dd with progress status + dd_command = ["sudo", "dd", f"if={self.macos_raw_path}", f"of={macos_partition}", "bs=4M", "status=progress", "conv=fsync"] + self._run_command(dd_command) + + self._report_progress("USB writing process completed successfully.") + return True + + except Exception as e: + self._report_progress(f"An error occurred during USB writing: {e}") + return False + finally: + self._cleanup_mappings_and_mounts() + self._cleanup_temp_files() + +if __name__ == '__main__': + # This is for standalone testing of this script. + # YOU MUST RUN THIS SCRIPT WITH SUDO for it to work. + # BE EXTREMELY CAREFUL with the device path. + if os.geteuid() != 0: + print("Please run this script as root (sudo) for testing.") + exit(1) + + print("USB Writer Linux Standalone Test") + # Replace with actual paths to your QCOW2 files for testing + test_opencore_qcow2 = "path_to_your/OpenCore.qcow2" + test_macos_qcow2 = "path_to_your/mac_hdd_ng.img" + + # IMPORTANT: List available block devices to help user choose. + print("\nAvailable block devices (be careful!):") + subprocess.run(["lsblk", "-d", "-o", "NAME,SIZE,MODEL"], check=True) + + test_device = input("\nEnter target device (e.g., /dev/sdX). THIS DEVICE WILL BE WIPED: ") + if not test_device or not test_device.startswith("/dev/"): + print("Invalid device. Exiting.") + exit(1) + + if not (os.path.exists(test_opencore_qcow2) and os.path.exists(test_macos_qcow2)): + print(f"Test files {test_opencore_qcow2} or {test_macos_qcow2} not found. Skipping write test.") + else: + confirm = input(f"Are you absolutely sure you want to wipe {test_device} and write images? (yes/NO): ") + if confirm.lower() == 'yes': + writer = USBWriterLinux(test_device, test_opencore_qcow2, test_macos_qcow2, print) + writer.format_and_write() + else: + print("Test cancelled by user.") diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..6395aab --- /dev/null +++ b/utils.py @@ -0,0 +1,126 @@ +# utils.py + +import time +import uuid +from constants import ( + DOCKER_IMAGE_BASE, + DEFAULT_DOCKER_PARAMS, + VERSION_SPECIFIC_PARAMS, + MACOS_VERSIONS +) + +# Path to the generated images inside the Docker container +CONTAINER_MACOS_IMG_PATH = "/home/arch/OSX-KVM/mac_hdd_ng.img" +# The OpenCore.qcow2 path can vary if BOOTDISK env var is used. +# The default generated one by the scripts (if not overridden by BOOTDISK) is: +CONTAINER_OPENCORE_QCOW2_PATH = "/home/arch/OSX-KVM/OpenCore/OpenCore.qcow2" + + +def get_unique_container_name() -> str: + """Generates a unique Docker container name.""" + return f"skyscope-osx-vm-{uuid.uuid4().hex[:8]}" + +def build_docker_command(macos_version_name: str, container_name: str) -> list[str]: + """ + Builds the docker run command arguments as a list. + + Args: + macos_version_name: The display name of the macOS version (e.g., "Sonoma"). + container_name: The unique name for the Docker container. + + Returns: + A list of strings representing the docker command and its arguments. + """ + if macos_version_name not in MACOS_VERSIONS: + raise ValueError(f"Unsupported macOS version: {macos_version_name}") + + image_tag = MACOS_VERSIONS[macos_version_name] + full_image_name = f"{DOCKER_IMAGE_BASE}:{image_tag}" + + # Removed --rm: we need the container to persist for file extraction + final_command_args = ["docker", "run", "-it", "--name", container_name] + + # Base parameters for the docker command + run_params = DEFAULT_DOCKER_PARAMS.copy() + + # Override/extend with version-specific parameters + if macos_version_name in VERSION_SPECIFIC_PARAMS: + version_specific = VERSION_SPECIFIC_PARAMS[macos_version_name] + + # More robustly handle environment variables (-e) + # Collect all -e keys from defaults and version-specific + default_env_vars = {k.split(" ", 1)[1].split("=")[0]: v for k, v in DEFAULT_DOCKER_PARAMS.items() if k.startswith("-e ")} + version_env_vars = {k.split(" ", 1)[1].split("=")[0]: v for k, v in version_specific.items() if k.startswith("-e ")} + + merged_env_vars = {**default_env_vars, **version_env_vars} + + # Remove all old -e params from run_params before adding merged ones + keys_to_remove_from_run_params = [k_param for k_param in run_params if k_param.startswith("-e ")] + for k_rem in keys_to_remove_from_run_params: + del run_params[k_rem] + + # Add merged env vars back with the "-e VAR_NAME" format for keys + for env_name, env_val_str in merged_env_vars.items(): + run_params[f"-e {env_name}"] = env_val_str + + # Add other non -e version-specific params + for k, v in version_specific.items(): + if not k.startswith("-e "): + run_params[k] = v + + # Construct the command list + for key, value in run_params.items(): + if key.startswith("-e "): + # Key is like "-e VARNAME", value is the actual value string like "'data'" or "GENERATE_UNIQUE='true'" + env_var_name_from_key = key.split(" ", 1)[1] # e.g. GENERATE_UNIQUE or CPU + + # If value string itself contains '=', it's likely the full 'VAR=val' form + if isinstance(value, str) and '=' in value and value.strip("'").upper().startswith(env_var_name_from_key.upper()): + # e.g. value is "GENERATE_UNIQUE='true'" + final_env_val = value.strip("'") + else: + # e.g. value is "'true'" for key "-e GENERATE_UNIQUE" + final_env_val = f"{env_var_name_from_key}={value.strip("'")}" + final_command_args.extend(["-e", final_env_val]) + else: # for --device, -p, -v + final_command_args.extend([key, value.strip("'")]) # Strip quotes for safety + + final_command_args.append(full_image_name) + + return final_command_args + +def build_docker_cp_command(container_name_or_id: str, container_path: str, host_path: str) -> list[str]: + """Builds the 'docker cp' command.""" + return ["docker", "cp", f"{container_name_or_id}:{container_path}", host_path] + +def build_docker_stop_command(container_name_or_id: str) -> list[str]: + """Builds the 'docker stop' command.""" + return ["docker", "stop", container_name_or_id] + +def build_docker_rm_command(container_name_or_id: str) -> list[str]: + """Builds the 'docker rm' command.""" + return ["docker", "rm", container_name_or_id] + + +if __name__ == '__main__': + # Test the functions + container_name = get_unique_container_name() + print(f"Generated container name: {container_name}") + + for version_name_key in MACOS_VERSIONS.keys(): + print(f"Command for {version_name_key}:") + cmd_list = build_docker_command(version_name_key, container_name) + print(" ".join(cmd_list)) + print("-" * 20) + + test_container_id = container_name # or an actual ID + print(f"CP Main Image: {' '.join(build_docker_cp_command(test_container_id, CONTAINER_MACOS_IMG_PATH, './mac_hdd_ng.img'))}") + print(f"CP OpenCore Image: {' '.join(build_docker_cp_command(test_container_id, CONTAINER_OPENCORE_QCOW2_PATH, './OpenCore.qcow2'))}") + print(f"Stop Command: {' '.join(build_docker_stop_command(test_container_id))}") + print(f"Remove Command: {' '.join(build_docker_rm_command(test_container_id))}") + + # Test with a non-existent version + try: + build_docker_command("NonExistentVersion", container_name) + except ValueError as e: + print(e)