Jules was unable to complete the task in time. Please review the work done so far and provide feedback for Jules to continue.

This commit is contained in:
google-labs-jules[bot] 2025-06-06 01:38:32 +00:00
parent e81120e8e9
commit e9ffd575ca
19 changed files with 617 additions and 712 deletions

View File

@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>ACPI</key>
<dict>
<key>Add</key> <array/>
<key>Delete</key> <array/>
<key>Patch</key> <array/>
<key>Quirks</key>
<dict>
<key>FadtEnableReset</key> <false/>
<key>NormalizeHeaders</key> <false/>
<key>RebaseRegions</key> <false/>
<key>ResetHwSig</key> <false/>
<key>ResetLogoStatus</key> <true/>
<key>SyncTableIds</key> <false/>
</dict>
</dict>
<key>Booter</key>
<dict>
<key>MmioWhitelist</key> <array/>
<key>Patch</key> <array/>
<key>Quirks</key>
<dict>
<key>AllowRelocationBlock</key> <false/>
<key>AvoidRuntimeDefrag</key> <true/>
<key>DevirtualiseMmio</key> <false/> <!-- Change to true for Alder Lake B660/Z690 if needed -->
<key>DisableSingleUser</key> <false/>
<key>DisableVariableWrite</key> <false/>
<key>DiscardHibernateMap</key> <false/>
<key>EnableSafeModeSlide</key> <true/>
<key>EnableWriteUnprotector</key> <false/> <!-- Keep false, OpenRuntime handles this -->
<key>ForceBooterSignature</key> <false/>
<key>ForceExitBootServices</key> <false/>
<key>ProtectMemoryRegions</key> <false/>
<key>ProtectSecureBoot</key> <false/>
<key>ProtectUefiServices</key> <false/>
<key>ProvideCustomSlide</key> <true/>
<key>ProvideMaxSlide</key> <integer>0</integer>
<key>RebuildAppleMemoryMap</key> <false/> <!-- Change to true for Alder Lake if needed -->
<key>ResizeAppleGpuBars</key> <integer>-1</integer>
<key>SetupVirtualMap</key> <true/>
<key>SignalAppleOS</key> <false/>
<key>SyncRuntimePermissions</key> <false/> <!-- Change to true for Alder Lake if needed -->
</dict>
</dict>
<key>DeviceProperties</key> <dict><key>Add</key><dict/><key>Delete</key><dict/></dict>
<key>Kernel</key>
<dict>
<key>Add</key> <array>
<!-- Lilu -->
<dict><key>Arch</key><string>Any</string><key>BundlePath</key><string>Lilu.kext</string><key>Comment</key><string>Patch engine</string><key>Enabled</key><true/><key>ExecutablePath</key><string>Contents/MacOS/Lilu</string><key>MaxKernel</key><string></string><key>MinKernel</key><string></string><key>PlistPath</key><string>Contents/Info.plist</string></dict>
<!-- VirtualSMC -->
<dict><key>Arch</key><string>Any</string><key>BundlePath</key><string>VirtualSMC.kext</string><key>Comment</key><string>SMC emulator</string><key>Enabled</key><true/><key>ExecutablePath</key><string>Contents/MacOS/VirtualSMC</string><key>MaxKernel</key><string></string><key>MinKernel</key><string></string><key>PlistPath</key><string>Contents/Info.plist</string></dict>
<!-- WhateverGreen -->
<dict><key>Arch</key><string>Any</string><key>BundlePath</key><string>WhateverGreen.kext</string><key>Comment</key><string>Video patches</string><key>Enabled</key><true/><key>ExecutablePath</key><string>Contents/MacOS/WhateverGreen</string><key>MaxKernel</key><string></string><key>MinKernel</key><string></string><key>PlistPath</key><string>Contents/Info.plist</string></dict>
<!-- AppleALC -->
<dict><key>Arch</key><string>Any</string><key>BundlePath</key><string>AppleALC.kext</string><key>Comment</key><string>Audio patches</string><key>Enabled</key><false/><key>ExecutablePath</key><string>Contents/MacOS/AppleALC</string><key>MaxKernel</key><string></string><key>MinKernel</key><string></string><key>PlistPath</key><string>Contents/Info.plist</string></dict>
<!-- Ethernet Kexts (disabled by default, enabled by plist_modifier) -->
<dict><key>Arch</key><string>Any</string><key>BundlePath</key><string>IntelMausi.kext</string><key>Comment</key><string>Intel Ethernet</string><key>Enabled</key><false/><key>ExecutablePath</key><string>Contents/MacOS/IntelMausi</string><key>MaxKernel</key><string></string><key>MinKernel</key><string></string><key>PlistPath</key><string>Contents/Info.plist</string></dict>
<dict><key>Arch</key><string>Any</string><key>BundlePath</key><string>RealtekRTL8111.kext</string><key>Comment</key><string>Realtek RTL8111</string><key>Enabled</key><false/><key>ExecutablePath</key><string>Contents/MacOS/RealtekRTL8111</string><key>MaxKernel</key><string></string><key>MinKernel</key><string></string><key>PlistPath</key><string>Contents/Info.plist</string></dict>
<dict><key>Arch</key><string>Any</string><key>BundlePath</key><string>LucyRTL8125Ethernet.kext</string><key>Comment</key><string>Realtek RTL8125</string><key>Enabled</key><false/><key>ExecutablePath</key><string>Contents/MacOS/LucyRTL8125Ethernet</string><key>MaxKernel</key><string></string><key>MinKernel</key><string></string><key>PlistPath</key><string>Contents/Info.plist</string></dict>
</array>
<key>Block</key> <array/> <key>Emulate</key> <dict/> <key>Force</key> <array/> <key>Patch</key> <array/>
<key>Quirks</key>
<dict>
<key>AppleCpuPmCfgLock</key> <false/> <key>AppleXcpmCfgLock</key> <true/> <key>AppleXcpmExtraMsrs</key> <false/>
<key>AppleXcpmForceBoost</key> <false/> <key>CustomPciSerialDevice</key> <false/> <key>CustomSMBIOSGuid</key> <false/>
<key>DisableIoMapper</key> <true/> <key>DisableLinkeditJettison</key> <true/> <key>DisableRtcChecksum</key> <false/>
<key>ExtendBTFeatureFlags</key> <false/> <key>ExternalDiskIcons</key> <false/> <key>ForceAquantiaEthernet</key> <false/>
<key>ForceSecureBootScheme</key> <false/> <key>IncreasePciBarSize</key> <false/> <key>LapicKernelPanic</key> <false/>
<key>LegacyCommpage</key> <false/> <key>PanicNoKextDump</key> <true/> <key>PowerTimeoutKernelPanic</key> <true/>
<key>ProvideCurrentCpuInfo</key> <false/> <key>SetApfsTrimTimeout</key> <integer>-1</integer>
<key>ThirdPartyDrives</key> <false/> <key>XhciPortLimit</key> <false/>
</dict>
<key>Scheme</key> <dict><key>CustomKernel</key><false/><key>FuzzyMatch</key><true/><key>KernelArch</key><string>Auto</string><key>KernelCache</key><string>Auto</string></dict>
</dict>
<key>Misc</key> <dict> <key>BlessOverride</key><array/><key>Boot</key><dict><key>ConsoleAttributes</key><integer>0</integer><key>HibernateMode</key><string>None</string><key>HibernateSkipsPicker</key><false/><key>HideAuxiliary</key><false/><key>LauncherOption</key><string>Disabled</string><key>LauncherPath</key><string>Default</string><key>PickerAttributes</key><integer>17</integer><key>PickerAudioAssist</key><false/><key>PickerMode</key><string>External</string><key>PickerVariant</key><string>Auto</string><key>PollAppleHotKeys</key><true/><key>ShowPicker</key><true/><key>TakeoffDelay</key><integer>0</integer><key>Timeout</key><integer>5</integer></dict><key>Debug</key><dict><key>AppleDebug</key><false/><key>ApplePanic</key><false/><key>DisableWatchDog</key><true/><key>DisplayDelay</key><integer>0</integer><key>DisplayLevel</key><integer>2147483650</integer><key>LogModules</key><string>*</string><key>SysReport</key><false/><key>Target</key><integer>3</integer></dict><key>Entries</key><array/><key>Security</key><dict><key>AllowSetDefault</key><true/><key>ApECID</key><integer>0</integer><key>AuthRestart</key><false/><key>BlacklistAppleUpdate</key><true/><key>DmgLoading</key><string>Signed</string><key>EnablePassword</key><false/><key>ExposeSensitiveData</key><integer>6</integer><key>HaltLevel</key><integer>2147483648</integer><key>PasswordHash</key><data></data><key>PasswordSalt</key><data></data><key>ScanPolicy</key><integer>0</integer><key>SecureBootModel</key><string>Disabled</string><key>Vault</key><string>Optional</string></dict><key>Serial</key><dict><key>Init</key><false/><key>Override</key><false/></dict><key>Tools</key><array/></dict>
<key>NVRAM</key> <dict><key>Add</key><dict><key>4D1EDE05-38C7-4A6A-9CC6-4BCCA8B38C14</key><dict><key>DefaultBackgroundColor</key><data>AAAAAA==</data><key>UIScale</key><data>AQ==</data></dict><key>7C436110-AB2A-4BBB-A880-FE41995C9F82</key><dict><key>SystemAudioVolume</key><data>Rg==</data><key>boot-args</key><string>-v keepsyms=1 debug=0x100</string><key>csr-active-config</key><data>AAAAAA==</data><key>prev-lang:kbd</key><data>ZW4tVVM6MA==</data><key>run-efi-updater</key><string>No</string></dict></dict><key>Delete</key><dict><key>4D1EDE05-38C7-4A6A-9CC6-4BCCA8B38C14</key><array><string>UIScale</string><string>DefaultBackgroundColor</string></array><key>7C436110-AB2A-4BBB-A880-FE41995C9F82</key><array><string>boot-args</string></dict></dict><key>LegacySchema</key><dict/><key>WriteFlash</key><true/></dict>
<key>PlatformInfo</key> <dict><key>Automatic</key><true/><key>CustomMemory</key><false/><key>Generic</key><dict><key>AdviseFeatures</key><false/><key>MLB</key><string>PLEASE_REPLACE_MLB</string><key>MaxBIOSVersion</key><false/><key>ProcessorType</key><integer>0</integer><key>ROM</key><data>AAAAAA==</data><key>SpoofVendor</key><true/><key>SystemMemoryStatus</key><string>Auto</string><key>SystemProductName</key><string>iMacPro1,1</string><key>SystemSerialNumber</key><string>PLEASE_REPLACE_SERIAL</string><key>SystemUUID</key><string>PLEASE_REPLACE_UUID</string></dict><key>UpdateDataHub</key><true/><key>UpdateNVRAM</key><true/><key>UpdateSMBIOS</key><true/><key>UpdateSMBIOSMode</key><string>Create</string><key>UseRawUuidEncoding</key><false/></dict>
<key>UEFI</key> <dict><key>APFS</key><dict><key>EnableJumpstart</key><true/><key>GlobalConnect</key><false/><key>HideVerbose</key><true/><key>JumpstartHotPlug</key><false/><key>MinDate</key><integer>0</integer><key>MinVersion</key><integer>0</integer></dict><key>AppleInput</key><dict><key>AppleEvent</key><string>Builtin</string><key>CustomDelays</key><false/><key>GraphicsInputMirroring</key><true/><key>KeyInitialDelay</key><integer>50</integer><key>KeySubsequentDelay</key><integer>5</integer><key>PointerSpeedDiv</key><integer>1</integer><key>PointerSpeedMul</key><integer>1</integer></dict><key>Audio</key><dict><key>AudioCodec</key><integer>0</integer><key>AudioDevice</key><string></string><key>AudioOutMask</key><integer>-1</integer><key>AudioSupport</key><false/><key>DisconnectHda</key><false/><key>MaximumGain</key><integer>-15</integer><key>MinimumAssistGain</key><integer>-30</integer><key>MinimumAudibleGain</key><integer>-55</integer><key>PlayChime</key><string>Auto</string><key>ResetTrafficClass</key><false/><key>SetupDelay</key><integer>0</integer></dict><key>ConnectDrivers</key><true/><key>Drivers</key><array><string>HfsPlus.efi</string><string>OpenRuntime.efi</string><string>OpenCanopy.efi</string></array><key>Input</key><dict><key>KeyFiltering</key><false/><key>KeyForgetThreshold</key><integer>5</integer><key>KeySupport</key><true/><key>KeySupportMode</key><string>Auto</string><key>KeySwap</key><false/><key>PointerSupport</key><false/><key>PointerSupportMode</key><string>ASUS</string><key>TimerResolution</key><integer>50000</integer></dict><key>Output</key><dict><key>ClearScreenOnModeSwitch</key><false/><key>ConsoleMode</key><string></string><key>DirectGopRendering</key><false/><key>ForceResolution</key><false/><key>GopPassThrough</key><string>Disabled</string><key>IgnoreTextInGraphics</key><false/><key>ProvideConsoleGop</key><true/><key>ReconnectGraphicsOnConnect</key><false/><key>ReconnectOnResChange</key><false/><key>ReplaceTabWithSpace</key><false/><key>Resolution</key><string>Max</string><key>SanitiseClearScreen</key><false/><key>TextRenderer</key><string>BuiltinGraphics</string><key>UIScale</key><integer>-1</integer><key>UgaPassThrough</key><false/></dict><key>ProtocolOverrides</key><dict/><key>Quirks</key><dict><key>ActivateHpetSupport</key><false/><key>DisableSecurityPolicy</key><false/><key>EnableVectorAcceleration</key><true/><key>EnableVmx</key><false/><key>ExitBootServicesDelay</key><integer>0</integer><key>ForceOcWriteFlash</key><false/><key>ForgeUefiSupport</key><false/><key>IgnoreInvalidFlexRatio</key><false/><key>ReleaseUsbOwnership</key><false/><key>ReloadOptionRoms</key><false/><key>RequestBootVarRouting</key><true/><key>ResizeGpuBars</key><integer>-1</integer><key>TscSyncTimeout</key><integer>0</integer><key>UnblockFsConnect</key><false/></dict><key>ReservedMemory</key><array/></dict>
</dict>
</plist>

View File

@ -5,25 +5,28 @@ import os
import psutil
import platform
import ctypes
import json # For parsing PowerShell JSON output
import json
import re
import traceback # For better error logging
import shutil # For shutil.which
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QComboBox, QPushButton, QTextEdit, QMessageBox, QMenuBar,
QFileDialog, QGroupBox, QLineEdit, QProgressBar
QFileDialog, QGroupBox, QLineEdit, QProgressBar, QCheckBox
)
from PyQt6.QtGui import QAction
from PyQt6.QtCore import pyqtSignal, pyqtSlot, QObject, QThread, QTimer, Qt # Added QTimer
from PyQt6.QtGui import QAction, QIcon
from PyQt6.QtCore import pyqtSignal, pyqtSlot, QObject, QThread, QTimer, Qt
# ... (Worker classes and other imports remain the same) ...
from constants import APP_NAME, DEVELOPER_NAME, BUSINESS_NAME, MACOS_VERSIONS, DOCKER_IMAGE_BASE
from utils import (
build_docker_command, get_unique_container_name,
build_docker_cp_command, CONTAINER_MACOS_IMG_PATH, CONTAINER_OPENCORE_QCOW2_PATH,
build_docker_stop_command, build_docker_rm_command
)
from constants import APP_NAME, DEVELOPER_NAME, BUSINESS_NAME, MACOS_VERSIONS
# DOCKER_IMAGE_BASE and Docker-related utils are no longer primary for this flow.
# utils.py might be refactored or parts removed later.
# Platform specific USB writers
USBWriterLinux = None
USBWriterMacOS = None
USBWriterWindows = None
USBWriterLinux = None; USBWriterMacOS = None; USBWriterWindows = None
if platform.system() == "Linux":
try: from usb_writer_linux import USBWriterLinux
except ImportError as e: print(f"Could not import USBWriterLinux: {e}")
@ -34,77 +37,102 @@ elif platform.system() == "Windows":
try: from usb_writer_windows import USBWriterWindows
except ImportError as e: print(f"Could not import USBWriterWindows: {e}")
class WorkerSignals(QObject): progress = pyqtSignal(str); finished = pyqtSignal(str); error = pyqtSignal(str)
GIBMACOS_SCRIPT_PATH = os.path.join(os.path.dirname(__file__), "scripts", "gibMacOS", "gibMacOS.py")
if not os.path.exists(GIBMACOS_SCRIPT_PATH):
GIBMACOS_SCRIPT_PATH = os.path.join(os.path.dirname(__file__), "gibMacOS.py")
class DockerPullWorker(QObject): # ... ( 그대로 )
class WorkerSignals(QObject):
progress = pyqtSignal(str)
finished = pyqtSignal(str)
error = pyqtSignal(str)
progress_value = pyqtSignal(int)
class GibMacOSWorker(QObject):
signals = WorkerSignals()
def __init__(self, image_name: str): super().__init__(); self.image_name = image_name
def __init__(self, version_key: str, download_path: str, catalog_key: str = "publicrelease"):
super().__init__()
self.version_key = version_key
self.download_path = download_path
self.catalog_key = catalog_key
self.process = None
self._is_running = True
@pyqtSlot()
def run(self):
try:
command = ["docker", "pull", self.image_name]; self.signals.progress.emit(f"Pulling Docker image: {self.image_name}...\n")
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0)
if process.stdout:
for line in iter(process.stdout.readline, ''): self.signals.progress.emit(line)
process.stdout.close()
return_code = process.wait()
if return_code == 0: self.signals.finished.emit(f"Image '{self.image_name}' pulled successfully or already exists.")
else: self.signals.error.emit(f"Failed to pull image '{self.image_name}' (exit code {return_code}).")
except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.")
except Exception as e: self.signals.error.emit(f"An error occurred during docker pull: {str(e)}")
script_to_run = ""
if os.path.exists(GIBMACOS_SCRIPT_PATH):
script_to_run = GIBMACOS_SCRIPT_PATH
elif shutil.which("gibMacOS.py"): # Check if it's in PATH
script_to_run = "gibMacOS.py"
elif os.path.exists(os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "gibMacOS.py")): # Check alongside main_app.py
script_to_run = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "gibMacOS.py")
else:
self.signals.error.emit(f"gibMacOS.py not found at expected locations or in PATH.")
return
version_for_gib = MACOS_VERSIONS.get(self.version_key, self.version_key)
os.makedirs(self.download_path, exist_ok=True)
command = [sys.executable, script_to_run, "-n", "-c", self.catalog_key, "-v", version_for_gib, "-d", self.download_path]
self.signals.progress.emit(f"Downloading macOS '{self.version_key}' (as '{version_for_gib}') installer assets...\nCommand: {' '.join(command)}\nOutput will be in: {self.download_path}\n")
self.process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True, bufsize=1, universal_newlines=True,
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
)
class DockerRunWorker(QObject): # ... ( 그대로 )
signals = WorkerSignals()
def __init__(self, command_list): super().__init__(); self.command_list = command_list; self.process = None; self._is_running = True
@pyqtSlot()
def run(self):
try:
self.signals.progress.emit(f"Executing: {' '.join(self.command_list)}\n")
self.process = subprocess.Popen(self.command_list, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0)
if self.process.stdout:
for line in iter(self.process.stdout.readline, ''):
if not self._is_running: self.signals.progress.emit("Docker process stopping at user request.\n"); break
self.signals.progress.emit(line)
if not self._is_running:
self.signals.progress.emit("macOS download process stopping at user request.\n")
break
line_strip = line.strip()
self.signals.progress.emit(line_strip)
progress_match = re.search(r"(\d+)%", line_strip)
if progress_match:
try: self.signals.progress_value.emit(int(progress_match.group(1)))
except ValueError: pass
self.process.stdout.close()
return_code = self.process.wait()
if not self._is_running and return_code != 0 : self.signals.finished.emit(f"Docker process cancelled or stopped early (exit code {return_code})."); return
if return_code == 0: self.signals.finished.emit("Docker VM process (QEMU) closed by user or completed.")
else: self.signals.finished.emit(f"Docker VM process exited (code {return_code}). Assuming macOS setup was attempted or QEMU window closed.")
except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.")
except Exception as e: self.signals.error.emit(f"An error occurred during Docker run: {str(e)}")
finally: self._is_running = False
if not self._is_running and return_code != 0:
self.signals.finished.emit(f"macOS download cancelled or stopped early (exit code {return_code}).")
return
if return_code == 0:
self.signals.finished.emit(f"macOS '{self.version_key}' installer assets downloaded to '{self.download_path}'.")
else:
self.signals.error.emit(f"Failed to download macOS '{self.version_key}' (gibMacOS exit code {return_code}). Check logs.")
except FileNotFoundError:
self.signals.error.emit(f"Error: Python or gibMacOS.py script not found. Ensure Python is in PATH and gibMacOS script is correctly located.")
except Exception as e:
self.signals.error.emit(f"An error occurred during macOS download: {str(e)}\n{traceback.format_exc()}")
finally:
self._is_running = False
def stop(self):
self._is_running = False
if self.process and self.process.poll() is None:
self.signals.progress.emit("Attempting to stop Docker process...\n")
try: self.process.terminate(); self.process.wait(timeout=5)
except subprocess.TimeoutExpired: self.signals.progress.emit("Process did not terminate gracefully, killing.\n"); self.process.kill()
self.signals.progress.emit("Docker process stopped.\n")
elif self.process and self.process.poll() is not None: self.signals.progress.emit("Docker process already stopped.\n")
self.signals.progress.emit("Attempting to stop macOS download (may not be effective for active downloads)...\n")
try:
self.process.terminate(); self.process.wait(timeout=2)
except subprocess.TimeoutExpired: self.process.kill()
self.signals.progress.emit("macOS download process termination requested.\n")
class DockerCommandWorker(QObject): # ... ( 그대로 )
signals = WorkerSignals()
def __init__(self, command_list, success_message="Command completed."): super().__init__(); self.command_list = command_list; self.signals = WorkerSignals(); self.success_message = success_message
@pyqtSlot()
def run(self):
try:
self.signals.progress.emit(f"Executing: {' '.join(self.command_list)}\n"); result = subprocess.run(self.command_list, capture_output=True, text=True, check=False, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0)
if result.stdout and result.stdout.strip(): self.signals.progress.emit(result.stdout)
if result.stderr and result.stderr.strip(): self.signals.progress.emit(f"STDERR: {result.stderr}")
if result.returncode == 0: self.signals.finished.emit(self.success_message)
else: self.signals.error.emit(f"Command failed (code {result.returncode}): {result.stderr or result.stdout or 'Unknown error'}".strip())
except FileNotFoundError: self.signals.error.emit("Error: Docker command not found.")
except Exception as e: self.signals.error.emit(f"An error occurred: {str(e)}")
class USBWriterWorker(QObject):
signals = WorkerSignals()
def __init__(self, device, opencore_path, macos_path, enhance_plist: bool, target_macos_version: str): # Added new args
def __init__(self, device: str, macos_download_path: str,
enhance_plist: bool, target_macos_version: str):
super().__init__()
self.device = device
self.opencore_path = opencore_path
self.macos_path = macos_path
self.enhance_plist = enhance_plist # Store
self.target_macos_version = target_macos_version # Store
self.macos_download_path = macos_download_path
self.enhance_plist = enhance_plist
self.target_macos_version = target_macos_version
self.writer_instance = None
@pyqtSlot()
@ -119,10 +147,13 @@ class USBWriterWorker(QObject):
if writer_cls is None:
self.signals.error.emit(f"{current_os} USB writer module not available or OS not supported."); return
# Pass new args to platform writer constructor
# Platform writers' __init__ will need to be updated for macos_download_path
# This assumes usb_writer_*.py __init__ signatures are now:
# __init__(self, device, macos_download_path, progress_callback, enhance_plist_enabled, target_macos_version)
self.writer_instance = writer_cls(
self.device, self.opencore_path, self.macos_path,
progress_callback=lambda msg: self.signals.progress.emit(msg), # Ensure progress_callback is named if it's a kwarg in writers
device=self.device,
macos_download_path=self.macos_download_path,
progress_callback=lambda msg: self.signals.progress.emit(msg),
enhance_plist_enabled=self.enhance_plist,
target_macos_version=self.target_macos_version
)
@ -132,31 +163,27 @@ class USBWriterWorker(QObject):
else:
self.signals.error.emit("USB writing process failed. Check output for details.")
except Exception as e:
self.signals.error.emit(f"USB writing preparation error: {str(e)}")
self.signals.error.emit(f"USB writing preparation error: {str(e)}\n{traceback.format_exc()}")
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle(APP_NAME)
self.setGeometry(100, 100, 800, 900) # Adjusted height for progress bar in status bar
self.setGeometry(100, 100, 800, 700) # Adjusted height
self.current_container_name = None; self.extracted_main_image_path = None; self.extracted_opencore_image_path = None
self.extraction_status = {"main": False, "opencore": False}; self.active_worker_thread = None
self.docker_run_worker_instance = None; self.docker_pull_worker_instance = None # Specific worker instances
self._current_usb_selection_text = None
self.active_worker_thread = None
self.macos_download_path = None
self.current_worker_instance = None
self.spinner_chars = ["|", "/", "-", "\\"]
self.spinner_index = 0
self.spinner_timer = QTimer(self)
self.spinner_timer.timeout.connect(self._update_spinner_status)
self.base_status_message = "Ready." # Default status message
self._setup_ui() # Call before using self.statusBar
self.status_bar = self.statusBar() # Initialize status bar early
self.status_bar.addPermanentWidget(self.progressBar) # Add progress bar to status bar
self.status_bar.showMessage(self.base_status_message, 5000) # Initial ready message
self.spinner_chars = ["|", "/", "-", "\\"]; self.spinner_index = 0
self.spinner_timer = QTimer(self); self.spinner_timer.timeout.connect(self._update_spinner_status)
self.base_status_message = "Ready."
self._setup_ui()
self.status_bar = self.statusBar()
# self.status_bar.addPermanentWidget(self.progress_bar) # Progress bar now in main layout
self.status_bar.showMessage(self.base_status_message, 5000)
self.refresh_usb_drives()
def _setup_ui(self):
@ -165,385 +192,229 @@ class MainWindow(QMainWindow):
about_action = QAction("&About", self); about_action.triggered.connect(self.show_about_dialog); help_menu.addAction(about_action)
central_widget = QWidget(); self.setCentralWidget(central_widget); main_layout = QVBoxLayout(central_widget)
# Steps 1, 2, 3 remain the same UI structure
vm_creation_group = QGroupBox("Step 1: Create and Install macOS VM"); vm_layout = QVBoxLayout()
# Step 1: Download macOS
download_group = QGroupBox("Step 1: Download macOS Installer Assets")
download_layout = QVBoxLayout()
selection_layout = QHBoxLayout(); self.version_label = QLabel("Select macOS Version:"); self.version_combo = QComboBox()
self.version_combo.addItems(MACOS_VERSIONS.keys()); selection_layout.addWidget(self.version_label); selection_layout.addWidget(self.version_combo)
vm_layout.addLayout(selection_layout); self.run_vm_button = QPushButton("Create VM and Start macOS Installation")
self.run_vm_button.clicked.connect(self.initiate_vm_creation_flow); vm_layout.addWidget(self.run_vm_button)
self.stop_vm_button = QPushButton("Stop/Cancel Current Docker Operation"); self.stop_vm_button.clicked.connect(self.stop_current_docker_operation)
self.stop_vm_button.setEnabled(False); vm_layout.addWidget(self.stop_vm_button); vm_creation_group.setLayout(vm_layout)
main_layout.addWidget(vm_creation_group)
extraction_group = QGroupBox("Step 2: Extract VM Images"); ext_layout = QVBoxLayout()
self.extract_images_button = QPushButton("Extract Images from Container"); self.extract_images_button.clicked.connect(self.extract_vm_images)
self.extract_images_button.setEnabled(False); ext_layout.addWidget(self.extract_images_button); extraction_group.setLayout(ext_layout)
main_layout.addWidget(extraction_group)
mgmt_group = QGroupBox("Step 3: Container Management (Optional)"); mgmt_layout = QHBoxLayout()
self.stop_container_button = QPushButton("Stop Container"); self.stop_container_button.clicked.connect(self.stop_persistent_container)
self.stop_container_button.setEnabled(False); mgmt_layout.addWidget(self.stop_container_button)
self.remove_container_button = QPushButton("Remove Container"); self.remove_container_button.clicked.connect(self.remove_persistent_container)
self.remove_container_button.setEnabled(False); mgmt_layout.addWidget(self.remove_container_button); mgmt_group.setLayout(mgmt_layout)
main_layout.addWidget(mgmt_group)
download_layout.addLayout(selection_layout)
# Step 4: USB Drive Selection - UI now adapts to Windows
usb_group = QGroupBox("Step 4: Select Target USB Drive and Write")
self.download_macos_button = QPushButton("Download macOS Installer Assets")
self.download_macos_button.clicked.connect(self.start_macos_download_flow)
download_layout.addWidget(self.download_macos_button)
self.cancel_operation_button = QPushButton("Cancel Current Operation")
self.cancel_operation_button.clicked.connect(self.stop_current_operation)
self.cancel_operation_button.setEnabled(False)
download_layout.addWidget(self.cancel_operation_button)
download_group.setLayout(download_layout)
main_layout.addWidget(download_group)
# Step 2: USB Drive Selection & Writing
usb_group = QGroupBox("Step 2: Create Bootable USB Installer")
self.usb_layout = QVBoxLayout()
self.usb_drive_label = QLabel("Available USB Drives:")
self.usb_layout.addWidget(self.usb_drive_label)
usb_selection_layout = QHBoxLayout()
self.usb_drive_combo = QComboBox()
self.usb_drive_combo.currentIndexChanged.connect(self.update_write_to_usb_button_state)
usb_selection_layout.addWidget(self.usb_drive_combo)
self.refresh_usb_button = QPushButton("Refresh List")
self.refresh_usb_button.clicked.connect(self.refresh_usb_drives)
usb_selection_layout.addWidget(self.refresh_usb_button)
self.usb_layout.addLayout(usb_selection_layout)
# Windows-specific input for disk ID - initially hidden and managed by refresh_usb_drives
self.windows_usb_guidance_label = QLabel("For Windows: Detected USB Disks (select from dropdown).")
self.windows_usb_input_label = QLabel("Manual Fallback: Enter USB Disk Number (e.g., 1, 2):")
self.windows_disk_id_input = QLineEdit()
self.windows_disk_id_input.setPlaceholderText("Enter Disk Number if dropdown empty")
self.windows_disk_id_input.textChanged.connect(self.update_write_to_usb_button_state)
self.usb_layout.addWidget(self.windows_usb_guidance_label)
self.usb_layout.addWidget(self.windows_usb_input_label)
self.usb_layout.addWidget(self.windows_disk_id_input)
# Visibility will be toggled in refresh_usb_drives based on OS
self.usb_drive_label = QLabel("Available USB Drives:"); self.usb_layout.addWidget(self.usb_drive_label)
usb_selection_layout = QHBoxLayout(); self.usb_drive_combo = QComboBox(); self.usb_drive_combo.currentIndexChanged.connect(self.update_all_button_states)
usb_selection_layout.addWidget(self.usb_drive_combo); self.refresh_usb_button = QPushButton("Refresh List"); self.refresh_usb_button.clicked.connect(self.refresh_usb_drives)
usb_selection_layout.addWidget(self.refresh_usb_button); self.usb_layout.addLayout(usb_selection_layout)
self.windows_usb_guidance_label = QLabel("For Windows: Select USB disk from dropdown (WMI). Manual input below if empty/unreliable.")
self.windows_disk_id_input = QLineEdit(); self.windows_disk_id_input.setPlaceholderText("Disk No. (e.g., 1)"); self.windows_disk_id_input.textChanged.connect(self.update_all_button_states)
if platform.system() == "Windows": self.usb_layout.addWidget(self.windows_usb_guidance_label); self.usb_layout.addWidget(self.windows_disk_id_input); self.windows_usb_guidance_label.setVisible(True); self.windows_disk_id_input.setVisible(True)
else: self.windows_usb_guidance_label.setVisible(False); self.windows_disk_id_input.setVisible(False)
self.enhance_plist_checkbox = QCheckBox("Try to auto-enhance config.plist for this system's hardware (Experimental, Linux Host Only for detection)")
self.enhance_plist_checkbox.setChecked(False) # Off by default
self.enhance_plist_checkbox.setToolTip(
"If checked, attempts to modify the OpenCore config.plist based on detected host hardware (Linux only for detection part).\n"
"This might improve compatibility for iGPU, audio, Ethernet. Use with caution."
)
self.usb_layout.addWidget(self.enhance_plist_checkbox)
warning_label = QLabel("WARNING: Selecting a drive and proceeding to write will ERASE ALL DATA on it!")
warning_label.setStyleSheet("color: red; font-weight: bold;")
self.usb_layout.addWidget(warning_label)
self.write_to_usb_button = QPushButton("Write Images to USB Drive")
self.write_to_usb_button.clicked.connect(self.handle_write_to_usb)
self.write_to_usb_button.setEnabled(False)
self.usb_layout.addWidget(self.write_to_usb_button)
usb_group.setLayout(self.usb_layout)
main_layout.addWidget(usb_group)
self.enhance_plist_checkbox.setChecked(False); self.usb_layout.addWidget(self.enhance_plist_checkbox)
warning_label = QLabel("WARNING: USB drive will be ERASED!"); warning_label.setStyleSheet("color: red; font-weight: bold;"); self.usb_layout.addWidget(warning_label)
self.write_to_usb_button = QPushButton("Create macOS Installer USB"); self.write_to_usb_button.clicked.connect(self.handle_write_to_usb)
self.write_to_usb_button.setEnabled(False); self.usb_layout.addWidget(self.write_to_usb_button); usb_group.setLayout(self.usb_layout); main_layout.addWidget(usb_group)
self.progress_bar = QProgressBar(self); self.progress_bar.setRange(0, 0); self.progress_bar.setVisible(False); main_layout.addWidget(self.progress_bar)
self.output_area = QTextEdit(); self.output_area.setReadOnly(True); main_layout.addWidget(self.output_area)
self.update_all_button_states()
# Status Bar and Progress Bar
self.statusBar = self.statusBar()
self.progressBar = QProgressBar(self)
self.progressBar.setRange(0, 0) # Indeterminate
self.progressBar.setVisible(False)
self.statusBar.addPermanentWidget(self.progressBar) # Corrected addPermanentWidget call
def show_about_dialog(self): QMessageBox.about(self, f"About {APP_NAME}", f"Version: 1.0.0 (Installer Flow)\nDeveloper: {DEVELOPER_NAME}\nBusiness: {BUSINESS_NAME}\n\nThis tool helps create bootable macOS USB drives using gibMacOS and OpenCore.")
def _set_ui_busy(self, is_busy: bool, status_message: str = "Processing..."): # Default busy message
"""Manages UI element states and progress indicators, including spinner."""
self.general_interactive_widgets = [
self.run_vm_button, self.version_combo, self.extract_images_button,
self.stop_container_button, self.remove_container_button,
self.usb_drive_combo, self.refresh_usb_button, self.write_to_usb_button,
self.windows_disk_id_input, self.enhance_plist_checkbox
]
if is_busy:
self.base_status_message = status_message # Store the core message for spinner
for widget in self.general_interactive_widgets:
widget.setEnabled(False)
# self.stop_vm_button is handled by _start_worker
self.progressBar.setVisible(True)
if not self.spinner_timer.isActive(): # Start spinner if not already active
self.spinner_index = 0
self.spinner_timer.start(150)
self._update_spinner_status() # Show initial spinner message
def _set_ui_busy(self, busy_status: bool, message: str = "Processing..."):
self.progress_bar.setVisible(busy_status)
if busy_status:
self.base_status_message = message
if not self.spinner_timer.isActive(): self.spinner_timer.start(150)
self._update_spinner_status()
self.progress_bar.setRange(0,0)
else:
self.spinner_timer.stop()
self.progressBar.setVisible(False)
self.statusBar.showMessage(status_message or "Ready.", 7000) # Show final message longer
self.update_all_button_states() # Centralized button state update
self.status_bar.showMessage(message or "Ready.", 7000)
self.update_all_button_states()
def _update_spinner_status(self):
"""Updates the status bar message with a spinner."""
if self.spinner_timer.isActive() and self.active_worker_thread and self.active_worker_thread.isRunning():
if self.spinner_timer.isActive():
char = self.spinner_chars[self.spinner_index % len(self.spinner_chars)]
# Check if current worker is providing determinate progress
worker_name = self.active_worker_thread.objectName().replace("_thread", "")
worker_provides_progress = getattr(self, f"{worker_name}_provides_progress", False)
if worker_provides_progress and self.progressBar.maximum() == 100 and self.progressBar.value() > 0 : # Determinate
# For determinate, status bar shows base message, progress bar shows percentage
self.statusBar.showMessage(f"{char} {self.base_status_message} ({self.progressBar.value()}%)")
else: # Indeterminate
if self.progressBar.maximum() != 0: self.progressBar.setRange(0,0) # Ensure indeterminate
self.statusBar.showMessage(f"{char} {self.base_status_message}")
active_worker_provides_progress = False
if self.active_worker_thread and self.active_worker_thread.isRunning():
active_worker_provides_progress = getattr(self.active_worker_thread, "provides_progress", False)
if active_worker_provides_progress and self.progress_bar.maximum() == 100: # Determinate
self.status_bar.showMessage(f"{char} {self.base_status_message} ({self.progress_bar.value()}%)")
else:
if self.progress_bar.maximum() != 0: self.progress_bar.setRange(0,0)
self.status_bar.showMessage(f"{char} {self.base_status_message}")
self.spinner_index = (self.spinner_index + 1) % len(self.spinner_chars)
elif not (self.active_worker_thread and self.active_worker_thread.isRunning()): # If timer is somehow active but no worker
self.spinner_timer.stop()
# self.statusBar.showMessage(self.base_status_message or "Ready.", 5000) # Show last base message or ready
elif not (self.active_worker_thread and self.active_worker_thread.isRunning()):
self.spinner_timer.stop()
def update_all_button_states(self): # Renamed from update_button_states_after_operation
"""Centralized method to update button states based on app's current state."""
is_worker_running = self.active_worker_thread and self.active_worker_thread.isRunning()
def update_all_button_states(self):
is_worker_active = self.active_worker_thread is not None and self.active_worker_thread.isRunning()
self.run_vm_button.setEnabled(not is_worker_running)
self.version_combo.setEnabled(not is_worker_running)
self.download_macos_button.setEnabled(not is_worker_active)
self.version_combo.setEnabled(not is_worker_active)
self.cancel_operation_button.setEnabled(is_worker_active and self.current_worker_instance is not None)
pull_worker_active = getattr(self, "docker_pull_instance", None) is not None
run_worker_active = getattr(self, "docker_run_instance", None) is not None
self.stop_vm_button.setEnabled(is_worker_running and (pull_worker_active or run_worker_active))
self.refresh_usb_button.setEnabled(not is_worker_active)
self.usb_drive_combo.setEnabled(not is_worker_active)
if platform.system() == "Windows": self.windows_disk_id_input.setEnabled(not is_worker_active)
self.enhance_plist_checkbox.setEnabled(not is_worker_active)
can_extract = self.current_container_name is not None and not is_worker_running
self.extract_images_button.setEnabled(can_extract)
# Write to USB button logic
macos_assets_ready = bool(self.macos_download_path and os.path.isdir(self.macos_download_path))
usb_identified = False
current_os = platform.system(); writer_module = None
if current_os == "Linux": writer_module = USBWriterLinux; usb_identified = bool(self.usb_drive_combo.currentData())
elif current_os == "Darwin": writer_module = USBWriterMacOS; usb_identified = bool(self.usb_drive_combo.currentData())
elif current_os == "Windows":
writer_module = USBWriterWindows
usb_identified = bool(self.usb_drive_combo.currentData()) or bool(self.windows_disk_id_input.text().strip())
can_manage_container = self.current_container_name is not None and not is_worker_running
self.stop_container_button.setEnabled(can_manage_container)
# Remove button is enabled if container exists and no worker is running (simplification)
# A more accurate state for remove_container_button would be if the container is actually stopped.
# This is typically handled by the finished slot of the stop_container worker.
# For now, this is a general enablement if not busy.
self.remove_container_button.setEnabled(can_manage_container)
self.write_to_usb_button.setEnabled(not is_worker_active and macos_assets_ready and usb_identified and writer_module is not None)
tooltip = ""
if writer_module is None: tooltip = f"USB Writing not supported on {current_os} or module missing."
elif not macos_assets_ready: tooltip = "Download macOS installer assets first (Step 1)."
elif not usb_identified: tooltip = "Select or identify a target USB drive."
else: tooltip = ""
self.write_to_usb_button.setToolTip(tooltip)
self.refresh_usb_button.setEnabled(not is_worker_running)
self.update_write_to_usb_button_state() # This handles its own complex logic
def show_about_dialog(self):
QMessageBox.about(self, f"About {APP_NAME}", f"Version: 0.8.2\nDeveloper: {DEVELOPER_NAME}\nBusiness: {BUSINESS_NAME}\n\nThis tool helps create bootable macOS USB drives using Docker-OSX.")
def _start_worker(self, worker_instance, on_finished_slot, on_error_slot, worker_name="worker", busy_message="Processing...", provides_progress=False): # Added provides_progress
def _start_worker(self, worker_instance, on_finished_slot, on_error_slot, worker_name="worker", provides_progress=False):
if self.active_worker_thread and self.active_worker_thread.isRunning():
QMessageBox.warning(self, "Busy", "Another operation is in progress."); return False
self._set_ui_busy(True, busy_message) # This now also starts the spinner
self._set_ui_busy(True, f"Starting {worker_name.replace('_', ' ')}...")
self.current_worker_instance = worker_instance
# Set progress bar type based on worker capability
if provides_progress:
self.progress_bar.setRange(0, 100) # Determinate
self.progress_bar.setValue(0)
else:
self.progress_bar.setRange(0, 0) # Indeterminate
# Store if this worker provides progress for spinner logic
setattr(self, f"{worker_name}_provides_progress", provides_progress)
if worker_name in ["docker_pull", "docker_run"]:
self.stop_vm_button.setEnabled(True)
else:
self.stop_vm_button.setEnabled(False)
self.active_worker_thread = QThread(); self.active_worker_thread.setObjectName(worker_name + "_thread"); setattr(self, f"{worker_name}_instance", worker_instance)
worker_instance.moveToThread(self.active_worker_thread)
worker_instance.signals.progress.connect(self.update_output)
if provides_progress: # Connect progress_value only if worker provides it
self.progress_bar.setRange(0,100)
worker_instance.signals.progress_value.connect(self.update_progress_bar_value)
worker_instance.signals.finished.connect(lambda message, wn=worker_name, slot=on_finished_slot: self._handle_worker_finished(message, wn, slot))
worker_instance.signals.error.connect(lambda error_message, wn=worker_name, slot=on_error_slot: self._handle_worker_error(error_message, wn, slot))
else:
self.progress_bar.setRange(0,0)
self.active_worker_thread = QThread(); self.active_worker_thread.setObjectName(worker_name + "_thread")
setattr(self.active_worker_thread, "provides_progress", provides_progress)
worker_instance.moveToThread(self.active_worker_thread)
worker_instance.signals.progress.connect(self.update_output)
worker_instance.signals.finished.connect(lambda msg, wn=worker_name, slot=on_finished_slot: self._handle_worker_finished(msg, wn, slot))
worker_instance.signals.error.connect(lambda err, wn=worker_name, slot=on_error_slot: self._handle_worker_error(err, wn, slot))
self.active_worker_thread.finished.connect(self.active_worker_thread.deleteLater)
self.active_worker_thread.started.connect(worker_instance.run); self.active_worker_thread.start(); return True
self.active_worker_thread.started.connect(worker_instance.run)
self.active_worker_thread.start()
return True
@pyqtSlot(int)
def update_progress_bar_value(self, value):
if self.progress_bar.minimum() == 0 and self.progress_bar.maximum() == 0: # If it was indeterminate
self.progress_bar.setRange(0,100) # Switch to determinate
if self.progress_bar.maximum() == 0: self.progress_bar.setRange(0,100)
self.progress_bar.setValue(value)
# Spinner will update with percentage from progress_bar.value()
# Spinner update will happen on its timer, it can check progress_bar.value()
def _handle_worker_finished(self, message, worker_name, specific_finished_slot):
final_status_message = f"{worker_name.replace('_', ' ').capitalize()} completed."
self._clear_worker_instance(worker_name)
final_msg = f"{worker_name.replace('_', ' ').capitalize()} completed."
self.current_worker_instance = None # Clear current worker
self.active_worker_thread = None
if specific_finished_slot: specific_finished_slot(message)
self._set_ui_busy(False, final_status_message)
self._set_ui_busy(False, final_msg)
def _handle_worker_error(self, error_message, worker_name, specific_error_slot):
final_status_message = f"{worker_name.replace('_', ' ').capitalize()} failed."
self._clear_worker_instance(worker_name)
final_msg = f"{worker_name.replace('_', ' ').capitalize()} failed."
self.current_worker_instance = None # Clear current worker
self.active_worker_thread = None
if specific_error_slot: specific_error_slot(error_message)
self._set_ui_busy(False, final_status_message)
self._set_ui_busy(False, final_msg)
def _clear_worker_instance(self, worker_name):
attr_name = f"{worker_name}_instance"
if hasattr(self, attr_name): delattr(self, attr_name)
def start_macos_download_flow(self):
self.output_area.clear(); selected_version_name = self.version_combo.currentText()
gibmacos_version_arg = MACOS_VERSIONS.get(selected_version_name, selected_version_name)
chosen_path = QFileDialog.getExistingDirectory(self, "Select Directory to Download macOS Installer Assets")
if not chosen_path: self.output_area.append("Download directory selection cancelled."); return
self.macos_download_path = chosen_path
worker = GibMacOSWorker(gibmacos_version_arg, self.macos_download_path)
if not self._start_worker(worker, self.macos_download_finished, self.macos_download_error,
"macos_download",
f"Downloading macOS {selected_version_name} assets...",
provides_progress=True): # Assuming GibMacOSWorker will emit progress_value
self._set_ui_busy(False, "Failed to start macOS download operation.")
def initiate_vm_creation_flow(self):
self.output_area.clear(); selected_version_name = self.version_combo.currentText(); image_tag = MACOS_VERSIONS.get(selected_version_name)
if not image_tag: self.handle_error(f"Invalid macOS version: {selected_version_name}"); return
full_image_name = f"{DOCKER_IMAGE_BASE}:{image_tag}"
pull_worker = DockerPullWorker(full_image_name)
self._start_worker(pull_worker,
self.docker_pull_finished,
self.docker_pull_error,
"docker_pull", # worker_name
f"Pulling image {full_image_name}...", # busy_message
provides_progress=False) # Docker pull progress is complex to parse reliably for a percentage
@pyqtSlot(str)
def docker_pull_finished(self, message): # Specific handler
self.output_area.append(f"Step 1.2: Proceeding to run Docker container for macOS installation...")
self.run_macos_vm()
def macos_download_finished(self, message):
QMessageBox.information(self, "Download Complete", message)
# self.macos_download_path is set. UI update handled by generic handler.
@pyqtSlot(str)
def docker_pull_error(self, error_message): # Specific handler
QMessageBox.critical(self, "Docker Pull Error", error_message)
def macos_download_error(self, error_message):
QMessageBox.critical(self, "Download Error", error_message)
self.macos_download_path = None
# UI reset by generic handler.
def run_macos_vm(self):
selected_version_name = self.version_combo.currentText(); self.current_container_name = get_unique_container_name()
try:
command_list = build_docker_command(selected_version_name, self.current_container_name)
run_worker = DockerRunWorker(command_list)
self._start_worker(run_worker,
self.docker_run_finished,
self.docker_run_error,
"docker_run",
f"Starting container {self.current_container_name}...",
provides_progress=False) # Docker run output is also streamed, not easily percentage
except ValueError as e: self.handle_error(f"Failed to build command: {str(e)}")
except Exception as e: self.handle_error(f"An unexpected error: {str(e)}")
@pyqtSlot(str)
def update_output(self, text): self.output_area.append(text.strip()); QApplication.processEvents()
@pyqtSlot(str)
def docker_run_finished(self, message): # Specific handler
QMessageBox.information(self, "VM Setup Complete", f"{message}\nYou can now proceed to extract images.")
@pyqtSlot(str)
def docker_run_error(self, error_message): # Specific handler
if "exited" in error_message.lower() and self.current_container_name:
QMessageBox.warning(self, "VM Setup Ended", f"{error_message}\nAssuming macOS setup was attempted...")
def stop_current_operation(self):
if self.current_worker_instance and hasattr(self.current_worker_instance, 'stop'):
self.output_area.append(f"
--- Attempting to stop {self.active_worker_thread.objectName().replace('_thread','')} ---")
self.current_worker_instance.stop()
else:
QMessageBox.critical(self, "VM Setup Error", error_message)
self.output_area.append("
--- No active stoppable operation or stop method not implemented for current worker. ---")
def stop_current_docker_operation(self):
pull_worker = getattr(self, "docker_pull_instance", None); run_worker = getattr(self, "docker_run_instance", None)
if pull_worker: self.output_area.append("\n--- Docker pull cannot be directly stopped by this button. Close app to abort. ---")
elif run_worker: self.output_area.append("\n--- Attempting to stop macOS VM creation (docker run) ---"); run_worker.stop()
else: self.output_area.append("\n--- No stoppable Docker operation active. ---")
def extract_vm_images(self):
if not self.current_container_name: QMessageBox.warning(self, "Warning", "No active container."); return
save_dir = QFileDialog.getExistingDirectory(self, "Select Directory to Save VM Images");
if not save_dir: return
self.output_area.append(f"\n--- Starting Image Extraction from {self.current_container_name} to {save_dir} ---"); self.extract_images_button.setEnabled(False); self.write_to_usb_button.setEnabled(False)
self.extracted_main_image_path = os.path.join(save_dir, "mac_hdd_ng.img"); self.extracted_opencore_image_path = os.path.join(save_dir, "OpenCore.qcow2"); self.extraction_status = {"main": False, "opencore": False}
cp_main_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_MACOS_IMG_PATH, self.extracted_main_image_path); main_worker = DockerCommandWorker(cp_main_cmd, f"Main macOS image copied to {self.extracted_main_image_path}")
if not self._start_worker(main_worker, lambda msg: self.docker_utility_finished(msg, "main_img_extract"), lambda err: self.docker_utility_error(err, "main_img_extract_error"), "cp_main_worker"): self.extract_images_button.setEnabled(True); return
self.output_area.append(f"Extraction for main image started. OpenCore extraction will follow.")
def _start_opencore_extraction(self):
if not self.current_container_name or not self.extracted_opencore_image_path: return
cp_oc_cmd = build_docker_cp_command(self.current_container_name, CONTAINER_OPENCORE_QCOW2_PATH, self.extracted_opencore_image_path); oc_worker = DockerCommandWorker(cp_oc_cmd, f"OpenCore image copied to {self.extracted_opencore_image_path}")
self._start_worker(oc_worker, lambda msg: self.docker_utility_finished(msg, "oc_img_extract"), lambda err: self.docker_utility_error(err, "oc_img_extract_error"), "cp_oc_worker")
def stop_persistent_container(self):
if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return
cmd = build_docker_stop_command(self.current_container_name); worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} stopped.")
if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "stop_container"), lambda err: self.docker_utility_error(err, "stop_container_error"), "stop_worker"): self.stop_container_button.setEnabled(False)
def remove_persistent_container(self):
if not self.current_container_name: QMessageBox.warning(self, "Warning", "No container name."); return
reply = QMessageBox.question(self, 'Confirm Remove', f"Remove container '{self.current_container_name}'?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.No: return
cmd = build_docker_rm_command(self.current_container_name); worker = DockerCommandWorker(cmd, f"Container {self.current_container_name} removed.")
if self._start_worker(worker, lambda msg: self.docker_utility_finished(msg, "rm_container"), lambda err: self.docker_utility_error(err, "rm_container_error"), "rm_worker"): self.remove_container_button.setEnabled(False)
def docker_utility_finished(self, message, task_id): # Specific handler
QMessageBox.information(self, f"Task Complete", message) # Show specific popup
# Core logic based on task_id
if task_id == "main_img_extract":
self.extraction_status["main"] = True
# _handle_worker_finished (generic) has already reset active_worker_thread.
self._start_opencore_extraction() # Start the next part of the sequence
return # Return here as active_worker_thread will be managed by _start_opencore_extraction
elif task_id == "oc_img_extract":
self.extraction_status["opencore"] = True
elif task_id == "rm_container": # Specific logic for after rm
self.current_container_name = None
# For other utility tasks (like stop_container), or after oc_img_extract,
# or after rm_container specific logic, the generic handler _handle_worker_finished
# (which called this) will then call _set_ui_busy(False) -> update_button_states_after_operation.
# So, no explicit call to self.update_button_states_after_operation() is needed here
# unless a state relevant to it changed *within this specific handler*.
# In case of rm_container, current_container_name changes, so a UI update is good.
if task_id == "rm_container" or (task_id == "oc_img_extract" and self.extraction_status.get("main")):
self.update_button_states_after_operation()
def docker_utility_error(self, error_message, task_id): # Specific handler
QMessageBox.critical(self, f"Task Error: {task_id}", error_message)
# UI state reset by generic _handle_worker_error -> _set_ui_busy(False) -> update_button_states_after_operation
# Task-specific error UI updates if needed can be added here, but usually generic reset is enough.
def handle_error(self, message): # General error handler for non-worker related setup issues
def handle_error(self, message):
self.output_area.append(f"ERROR: {message}"); QMessageBox.critical(self, "Error", message)
self.run_vm_button.setEnabled(True); self.version_combo.setEnabled(True); self.stop_vm_button.setEnabled(False); self.extract_images_button.setEnabled(False); self.write_to_usb_button.setEnabled(False)
self.active_worker_thread = None;
for worker_name_suffix in ["pull", "run", "cp_main_worker", "cp_oc_worker", "stop_worker", "rm_worker", "usb_write_worker"]: self._clear_worker_instance(worker_name_suffix)
self._set_ui_busy(False, "Error occurred.")
def check_admin_privileges(self) -> bool:
def check_admin_privileges(self) -> bool: # ... (same)
try:
if platform.system() == "Windows": return ctypes.windll.shell32.IsUserAnAdmin() != 0
else: return os.geteuid() == 0
except Exception as e: self.output_area.append(f"Could not check admin privileges: {e}"); return False
def refresh_usb_drives(self): # Modified for Windows WMI
self.usb_drive_combo.clear()
self._current_usb_selection_text = self.usb_drive_combo.currentText() # Store to reselect if possible
self.output_area.append("\nScanning for disk devices...")
current_os = platform.system()
self.windows_usb_guidance_label.setVisible(current_os == "Windows")
self.windows_usb_input_label.setVisible(False) # Hide manual input by default
self.windows_disk_id_input.setVisible(False) # Hide manual input by default
self.usb_drive_combo.setVisible(True) # Always visible, populated differently
if current_os == "Windows":
self.usb_drive_label.setText("Available USB Disks (Windows - WMI):")
self.refresh_usb_button.setText("Refresh USB List")
def refresh_usb_drives(self): # ... (same logic as before)
self.usb_drive_combo.clear(); current_selection_text = getattr(self, '_current_usb_selection_text', None)
self.output_area.append("
Scanning for disk devices...")
if platform.system() == "Windows":
self.usb_drive_label.setText("Available USB Disks (Windows - via WMI/PowerShell):")
self.windows_usb_guidance_label.setVisible(True); self.windows_disk_id_input.setVisible(False);
powershell_command = "Get-WmiObject Win32_DiskDrive | Where-Object {$_.InterfaceType -eq 'USB'} | Select-Object DeviceID, Index, Model, @{Name='SizeGB';Expression={[math]::Round($_.Size / 1GB, 2)}} | ConvertTo-Json"
try:
process = subprocess.run(["powershell", "-Command", powershell_command], capture_output=True, text=True, check=True, creationflags=subprocess.CREATE_NO_WINDOW)
disks_data = json.loads(process.stdout)
if not isinstance(disks_data, list): disks_data = [disks_data] # Ensure it's a list
if disks_data:
for disk in disks_data:
disks_data = json.loads(process.stdout); disks_json = disks_data if isinstance(disks_data, list) else [disks_data] if disks_data else []
if disks_json:
for disk in disks_json:
if disk.get('DeviceID') is None or disk.get('Index') is None: continue
disk_text = f"Disk {disk['Index']}: {disk.get('Model','N/A')} ({disk.get('SizeGB','N/A')} GB) - {disk['DeviceID']}"
self.usb_drive_combo.addItem(disk_text, userData=str(disk['Index']))
self.output_area.append(f"Found {len(disks_data)} USB disk(s) via WMI. Select from dropdown.")
if self._current_usb_selection_text:
self.output_area.append(f"Found {len(disks_json)} USB disk(s) via WMI.");
if current_selection_text:
for i in range(self.usb_drive_combo.count()):
if self.usb_drive_combo.itemText(i) == self._current_usb_selection_text: self.usb_drive_combo.setCurrentIndex(i); break
else:
self.output_area.append("No USB disks found via WMI/PowerShell. Manual input field shown as fallback.")
self.windows_usb_input_label.setVisible(True); self.windows_disk_id_input.setVisible(True) # Show manual input as fallback
except Exception as e:
self.output_area.append(f"Error querying WMI for USB disks: {e}. Manual input field shown.")
self.windows_usb_input_label.setVisible(True); self.windows_disk_id_input.setVisible(True)
else: # Linux / macOS
if self.usb_drive_combo.itemText(i) == current_selection_text: self.usb_drive_combo.setCurrentIndex(i); break
else: self.output_area.append("No USB disks found via WMI/PowerShell. Manual input field shown as fallback."); self.windows_disk_id_input.setVisible(True)
except Exception as e: self.output_area.append(f"Error scanning Windows USBs with PowerShell: {e}"); self.windows_disk_id_input.setVisible(True)
else:
self.usb_drive_label.setText("Available USB Drives (for Linux/macOS):")
self.refresh_usb_button.setText("Refresh List")
self.windows_usb_guidance_label.setVisible(False); self.windows_disk_id_input.setVisible(False)
try:
partitions = psutil.disk_partitions(all=False); potential_usbs = []
for p in partitions:
is_removable = 'removable' in p.opts; is_likely_usb = False
if current_os == "Darwin" and p.device.startswith("/dev/disk") and 'external' in p.opts.lower() and 'physical' in p.opts.lower(): is_likely_usb = True
elif current_os == "Linux" and ((p.mountpoint and ("/media/" in p.mountpoint or "/run/media/" in p.mountpoint)) or (p.device.startswith("/dev/sd") and not p.device.endswith("da"))): is_likely_usb = True
if platform.system() == "Darwin" and p.device.startswith("/dev/disk") and 'external' in p.opts.lower() and 'physical' in p.opts.lower(): is_likely_usb = True
elif platform.system() == "Linux" and ((p.mountpoint and ("/media/" in p.mountpoint or "/run/media/" in p.mountpoint)) or (p.device.startswith("/dev/sd") and not p.device.endswith("da"))): is_likely_usb = True
if is_removable or is_likely_usb:
try: usage = psutil.disk_usage(p.mountpoint); size_gb = usage.total / (1024**3)
except Exception: continue
@ -553,108 +424,67 @@ class MainWindow(QMainWindow):
if potential_usbs:
idx_to_select = -1
for i, (text, device_path) in enumerate(potential_usbs): self.usb_drive_combo.addItem(text, userData=device_path);
if text == self._current_usb_selection_text: idx_to_select = i
if text == current_selection_text: idx_to_select = i
if idx_to_select != -1: self.usb_drive_combo.setCurrentIndex(idx_to_select)
self.output_area.append(f"Found {len(potential_usbs)} potential USB drive(s). Please verify carefully.")
else: self.output_area.append("No suitable USB drives found for Linux/macOS.")
except ImportError: self.output_area.append("psutil library not found.")
except Exception as e: self.output_area.append(f"Error scanning for USB drives: {e}")
self.update_all_button_states()
self.update_write_to_usb_button_state()
def handle_write_to_usb(self): # Modified for Windows WMI
if not self.check_admin_privileges():
QMessageBox.warning(self, "Privileges Required", "This operation requires Administrator/root privileges."); return
def handle_write_to_usb(self):
if not self.check_admin_privileges(): QMessageBox.warning(self, "Privileges Required", "This operation requires Administrator/root privileges."); return
if not self.macos_download_path or not os.path.isdir(self.macos_download_path): QMessageBox.warning(self, "Missing macOS Assets", "Download macOS installer assets first."); return
current_os = platform.system(); usb_writer_module = None; target_device_id_for_worker = None
enhance_plist_enabled = self.enhance_plist_checkbox.isChecked() # Get state
target_macos_ver = self.version_combo.currentText() # Get macOS version
if current_os == "Windows": target_device_id_for_worker = self.usb_drive_combo.currentData() or self.windows_disk_id_input.text().strip(); usb_writer_module = USBWriterWindows
else: target_device_id_for_worker = self.usb_drive_combo.currentData(); usb_writer_module = USBWriterLinux if current_os == "Linux" else USBWriterMacOS if current_os == "Darwin" else None
if not usb_writer_module: QMessageBox.warning(self, "Unsupported Platform", f"USB writing not supported for {current_os}."); return
if not target_device_id_for_worker: QMessageBox.warning(self, "No USB Selected/Identified", f"Please select/identify target USB."); return
if current_os == "Windows" and target_device_id_for_worker.isdigit(): target_device_id_for_worker = f"disk {target_device_id_for_worker}"
if current_os == "Windows":
target_device_id_for_worker = self.usb_drive_combo.currentData() # Disk Index from WMI
if not target_device_id_for_worker:
if self.windows_disk_id_input.isVisible():
target_device_id_for_worker = self.windows_disk_id_input.text().strip()
if not target_device_id_for_worker: QMessageBox.warning(self, "Input Required", "Please select a USB disk or enter its Disk Number."); return
if not target_device_id_for_worker.isdigit(): QMessageBox.warning(self, "Input Invalid", "Windows Disk Number must be a digit."); return
else:
QMessageBox.warning(self, "USB Error", "No USB disk selected for Windows."); return
usb_writer_module = USBWriterWindows
else: # Linux/macOS
target_device_id_for_worker = self.usb_drive_combo.currentData()
if current_os == "Linux": usb_writer_module = USBWriterLinux
elif current_os == "Darwin": usb_writer_module = USBWriterMacOS
enhance_plist_state = self.enhance_plist_checkbox.isChecked()
target_macos_name = self.version_combo.currentText()
reply = QMessageBox.warning(self, "Confirm Write Operation", f"WARNING: ALL DATA ON TARGET '{target_device_id_for_worker}' WILL BE ERASED.
Proceed?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel, QMessageBox.StandardButton.Cancel)
if reply == QMessageBox.StandardButton.Cancel: self.output_area.append("
USB write cancelled."); return
if not usb_writer_module: QMessageBox.warning(self, "Unsupported Platform", f"USB writing not supported/enabled for {current_os}."); return
if not (self.extracted_main_image_path and self.extracted_opencore_image_path and self.extraction_status["main"] and self.extraction_status["opencore"]):
QMessageBox.warning(self, "Missing Images", "Ensure both images are extracted."); return
if not target_device_id_for_worker: QMessageBox.warning(self, "No USB Selected/Identified", f"Please select/identify target USB for {current_os}."); return
confirm_msg = (f"WARNING: ALL DATA ON TARGET '{target_device_id_for_worker}' WILL BE ERASED PERMANENTLY.\n"
f"Enhance config.plist: {'Yes' if enhance_plist_enabled else 'No'}.\nProceed?")
reply = QMessageBox.warning(self, "Confirm Write Operation", confirm_msg, QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel, QMessageBox.StandardButton.Cancel)
if reply == QMessageBox.StandardButton.Cancel: self.output_area.append("\nUSB write cancelled."); return
self.output_area.append(f"\n--- Starting USB Write for {target_device_id_for_worker} on {current_os} ---")
if enhance_plist_enabled: self.output_area.append("Attempting config.plist enhancement...")
usb_worker = USBWriterWorker(
target_device_id_for_worker,
self.extracted_opencore_image_path,
self.extracted_main_image_path,
enhance_plist_enabled,
target_macos_ver
# USBWriterWorker now needs different args
# The platform specific writers (USBWriterLinux etc) will need to be updated to accept macos_download_path
# and use it to find BaseSystem.dmg, EFI/OC etc. instead of opencore_qcow2_path, macos_qcow2_path
usb_worker_adapted = USBWriterWorker(
device=target_device_id_for_worker,
macos_download_path=self.macos_download_path,
enhance_plist=enhance_plist_state,
target_macos_version=target_macos_name
)
self._start_worker(usb_worker,
self.usb_write_finished,
self.usb_write_error,
"usb_write_worker",
f"Writing to USB {target_device_id_for_worker}...")
if not self._start_worker(usb_worker_adapted, self.usb_write_finished, self.usb_write_error, "usb_write_worker",
busy_message=f"Creating USB for {target_device_id_for_worker}...",
provides_progress=False): # USB writing can be long, but progress parsing is per-platform script.
self._set_ui_busy(False, "Failed to start USB write operation.")
@pyqtSlot(str)
def usb_write_finished(self, message): # Specific handler
QMessageBox.information(self, "USB Write Complete", message)
# UI state reset by generic _handle_worker_finished -> _set_ui_busy(False)
def usb_write_finished(self, message): QMessageBox.information(self, "USB Write Complete", message)
@pyqtSlot(str)
def usb_write_error(self, error_message): # Specific handler
QMessageBox.critical(self, "USB Write Error", error_message)
# UI state reset by generic _handle_worker_error -> _set_ui_busy(False)
def usb_write_error(self, error_message): QMessageBox.critical(self, "USB Write Error", error_message)
def update_write_to_usb_button_state(self):
images_ready = self.extraction_status.get("main", False) and self.extraction_status.get("opencore", False); usb_identified = False; current_os = platform.system(); writer_module = None
if current_os == "Linux": writer_module = USBWriterLinux; usb_identified = bool(self.usb_drive_combo.currentData())
elif current_os == "Darwin": writer_module = USBWriterMacOS; usb_identified = bool(self.usb_drive_combo.currentData())
elif current_os == "Windows":
writer_module = USBWriterWindows
usb_identified = bool(self.usb_drive_combo.currentData()) or bool(self.windows_disk_id_input.text().strip().isdigit() and self.windows_disk_id_input.isVisible())
self.write_to_usb_button.setEnabled(images_ready and usb_identified and writer_module is not None)
tooltip = ""
if writer_module is None: tooltip = f"USB Writing not supported on {current_os} or module missing."
elif not images_ready: tooltip = "Extract VM images first."
elif not usb_identified: tooltip = "Select a USB disk from dropdown (or enter Disk Number if dropdown empty on Windows)."
else: tooltip = ""
self.write_to_usb_button.setToolTip(tooltip)
def closeEvent(self, event):
def closeEvent(self, event): # ... (same logic)
self._current_usb_selection_text = self.usb_drive_combo.currentText()
if self.active_worker_thread and self.active_worker_thread.isRunning():
reply = QMessageBox.question(self, 'Confirm Exit', "An operation is running. Exit anyway?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.Yes:
worker_instance_attr_name = self.active_worker_thread.objectName().replace("_thread", "_instance")
worker_to_stop = getattr(self, worker_instance_attr_name, None)
if worker_to_stop and hasattr(worker_to_stop, 'stop'): worker_to_stop.stop()
if self.current_worker_instance and hasattr(self.current_worker_instance, 'stop'): self.current_worker_instance.stop()
else: self.active_worker_thread.quit()
self.active_worker_thread.wait(1000); event.accept()
else: event.ignore(); return
elif self.current_container_name and self.stop_container_button.isEnabled():
reply = QMessageBox.question(self, 'Confirm Exit', f"Container '{self.current_container_name}' may still exist. Exit anyway?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No)
if reply == QMessageBox.StandardButton.Yes: event.accept()
else: event.ignore()
else: event.accept()
if __name__ == "__main__":
import traceback # Ensure traceback is available for GibMacOSWorker
import shutil # Ensure shutil is available for GibMacOSWorker path check
app = QApplication(sys.argv)
window = MainWindow()
window.show()

View File

@ -1,311 +1,302 @@
# usb_writer_linux.py
# usb_writer_linux.py (Significant Refactoring for Installer Creation)
import subprocess
import os
import time
import shutil # For checking command existence
import shutil
import glob
import re
import plistlib # For plist_modifier call, and potentially for InstallInfo.plist
try:
from plist_modifier import enhance_config_plist
except ImportError:
enhance_config_plist = None
print("Warning: plist_modifier.py not found. Plist enhancement feature will be disabled for USBWriterLinux.")
# Assume a basic OpenCore EFI template directory exists relative to this script
OC_TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "EFI_template_installer")
class USBWriterLinux:
def __init__(self, device: str, opencore_qcow2_path: str, macos_qcow2_path: str,
progress_callback=None, enhance_plist_enabled: bool = False, target_macos_version: str = ""): # New args
def __init__(self, device: str, macos_download_path: str,
progress_callback=None, enhance_plist_enabled: bool = False,
target_macos_version: str = ""):
self.device = device
self.opencore_qcow2_path = opencore_qcow2_path
self.macos_qcow2_path = macos_qcow2_path
self.macos_download_path = macos_download_path
self.progress_callback = progress_callback
self.enhance_plist_enabled = enhance_plist_enabled # Store
self.target_macos_version = target_macos_version # Store
self.enhance_plist_enabled = enhance_plist_enabled
self.target_macos_version = target_macos_version # String name like "Sonoma"
pid = os.getpid()
self.temp_basesystem_hfs_path = f"temp_basesystem_{pid}.hfs"
self.temp_efi_build_dir = f"temp_efi_build_{pid}"
self.temp_shared_support_extract_dir = f"temp_shared_support_extract_{pid}"
# Define unique temporary file and mount point names
pid = os.getpid() # Make temp names more unique if multiple instances run (though unlikely for this app)
self.opencore_raw_path = f"opencore_temp_{pid}.raw"
self.macos_raw_path = f"macos_main_temp_{pid}.raw"
self.mount_point_opencore_efi = f"/mnt/opencore_efi_temp_skyscope_{pid}"
self.mount_point_usb_esp = f"/mnt/usb_esp_temp_skyscope_{pid}"
self.mount_point_macos_source = f"/mnt/macos_source_temp_skyscope_{pid}"
self.mount_point_usb_macos_target = f"/mnt/usb_macos_target_temp_skyscope_{pid}"
self.temp_files_to_clean = [self.opencore_raw_path, self.macos_raw_path]
self.temp_mount_points_to_clean = [
self.mount_point_opencore_efi, self.mount_point_usb_esp,
self.mount_point_macos_source, self.mount_point_usb_macos_target
self.temp_files_to_clean = [self.temp_basesystem_hfs_path]
self.temp_dirs_to_clean = [
self.temp_efi_build_dir, self.mount_point_usb_esp,
self.mount_point_usb_macos_target, self.temp_shared_support_extract_dir
]
def _report_progress(self, message: str):
print(message) # For standalone testing
if self.progress_callback:
self.progress_callback(message)
if self.progress_callback: self.progress_callback(message)
else: print(message)
def _run_command(self, command: list[str], check=True, capture_output=False, shell=False, timeout=None):
self.progress_callback(f"Executing: {' '.join(command)}")
def _run_command(self, command: list[str] | str, check=True, capture_output=False, timeout=None, shell=False, working_dir=None):
self._report_progress(f"Executing: {command if isinstance(command, str) else ' '.join(command)}")
try:
process = subprocess.run(
command,
check=check,
capture_output=capture_output,
text=True,
shell=shell, # Use shell=True with caution
timeout=timeout
command, check=check, capture_output=capture_output, text=True, timeout=timeout,
shell=shell, cwd=working_dir,
creationflags=0 # No CREATE_NO_WINDOW on Linux
)
# Log stdout/stderr only if capture_output is True and content exists
if capture_output:
if process.stdout and process.stdout.strip():
self._report_progress(f"STDOUT: {process.stdout.strip()}")
if process.stderr and process.stderr.strip():
self._report_progress(f"STDERR: {process.stderr.strip()}")
if capture_output: # Log only if content exists
if process.stdout and process.stdout.strip(): self._report_progress(f"STDOUT: {process.stdout.strip()}")
if process.stderr and process.stderr.strip(): self._report_progress(f"STDERR: {process.stderr.strip()}")
return process
except subprocess.TimeoutExpired:
self._report_progress(f"Command {' '.join(command)} timed out after {timeout} seconds.")
raise
except subprocess.CalledProcessError as e:
self._report_progress(f"Error executing {' '.join(command)} (return code {e.returncode}): {e}")
if e.stderr: self._report_progress(f"STDERR: {e.stderr.strip()}")
if e.stdout: self._report_progress(f"STDOUT: {e.stdout.strip()}") # Sometimes errors go to stdout
raise
except FileNotFoundError:
self._report_progress(f"Error: Command '{command[0]}' not found. Is it installed and in PATH?")
raise
except subprocess.TimeoutExpired: self._report_progress(f"Command timed out after {timeout} seconds."); raise
except subprocess.CalledProcessError as e: self._report_progress(f"Error executing (code {e.returncode}): {e.stderr or e.stdout or str(e)}"); raise
except FileNotFoundError: self._report_progress(f"Error: Command '{command[0] if isinstance(command, list) else command.split()[0]}' not found."); raise
def _cleanup_temp_files_and_dirs(self):
self._report_progress("Cleaning up temporary files and directories...")
for mp in self.temp_dirs_to_clean: # Unmount first
if os.path.ismount(mp):
self._run_command(["sudo", "umount", "-lf", mp], check=False, timeout=15)
def _cleanup_temp_files(self):
self._report_progress("Cleaning up temporary image files...")
for f_path in self.temp_files_to_clean:
if os.path.exists(f_path):
try:
self._run_command(["sudo", "rm", "-f", f_path], check=False) # Use sudo rm for root-owned files
self._report_progress(f"Removed {f_path}")
except Exception as e: # Catch broad exceptions from _run_command
self._report_progress(f"Error removing {f_path} via sudo rm: {e}")
def _unmount_path(self, mount_point):
if os.path.ismount(mount_point):
self._report_progress(f"Unmounting {mount_point}...")
self._run_command(["sudo", "umount", "-lf", mount_point], check=False, timeout=30)
def _remove_dir_if_exists(self, dir_path):
if os.path.exists(dir_path):
try:
self._run_command(["sudo", "rmdir", dir_path], check=False)
except Exception as e: # Catch broad exceptions from _run_command
self._report_progress(f"Could not rmdir {dir_path}: {e}. May need manual cleanup.")
def _cleanup_all_mounts_and_mappings(self):
self._report_progress("Cleaning up all temporary mounts and kpartx mappings...")
for mp in self.temp_mount_points_to_clean:
self._unmount_path(mp) # Unmount first
# Detach kpartx for raw images
if os.path.exists(self.opencore_raw_path): # Check if raw file was even created
self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path], check=False)
if os.path.exists(self.macos_raw_path):
self._run_command(["sudo", "kpartx", "-d", self.macos_raw_path], check=False)
# Remove mount point directories after unmounting and detaching
for mp in self.temp_mount_points_to_clean:
self._remove_dir_if_exists(mp)
try: self._run_command(["sudo", "rm", "-f", f_path], check=False)
except Exception as e: self._report_progress(f"Error removing temp file {f_path}: {e}")
for d_path in self.temp_dirs_to_clean:
if os.path.exists(d_path):
try: self._run_command(["sudo", "rm", "-rf", d_path], check=False)
except Exception as e: self._report_progress(f"Error removing temp dir {d_path}: {e}")
def check_dependencies(self):
self._report_progress("Checking dependencies (qemu-img, parted, kpartx, rsync, mkfs.vfat, mkfs.hfsplus, apfs-fuse)...")
dependencies = ["qemu-img", "parted", "kpartx", "rsync", "mkfs.vfat", "mkfs.hfsplus", "apfs-fuse"]
missing_deps = []
for dep in dependencies:
if not shutil.which(dep):
missing_deps.append(dep)
self._report_progress("Checking dependencies (sgdisk, mkfs.vfat, mkfs.hfsplus, 7z, rsync, dd)...")
dependencies = ["sgdisk", "mkfs.vfat", "mkfs.hfsplus", "7z", "rsync", "dd"]
missing_deps = [dep for dep in dependencies if not shutil.which(dep)]
if missing_deps:
msg = f"Missing dependencies: {', '.join(missing_deps)}. Please install them. `apfs-fuse` may require manual installation from source or a user repository (e.g., AUR for Arch Linux)."
self._report_progress(msg)
raise RuntimeError(msg)
self._report_progress("All critical dependencies found.")
msg = f"Missing dependencies: {', '.join(missing_deps)}. Please install them (e.g., hfsprogs, p7zip-full)."
self._report_progress(msg); raise RuntimeError(msg)
self._report_progress("All critical dependencies for Linux USB installer creation found.")
return True
def _get_mapped_partition_device(self, kpartx_output: str, partition_index_in_image: int = 1) -> str:
lines = kpartx_output.splitlines()
# Try to find loopXpY where Y is partition_index_in_image
for line in lines:
parts = line.split()
if len(parts) > 2 and parts[0] == "add" and parts[1] == "map" and f"p{partition_index_in_image}" in parts[2]:
return f"/dev/mapper/{parts[2]}"
# Fallback for images that might be a single partition mapped directly (e.g. loopX)
# This is less common for full disk images like OpenCore.qcow2 or mac_hdd_ng.img
if partition_index_in_image == 1 and len(lines) == 1: # Only one mapping line
parts = lines[0].split()
if len(parts) > 2 and parts[0] == "add" and parts[1] == "map":
# Check if it does NOT look like a partition (no 'p' number)
if 'p' not in parts[2]:
return f"/dev/mapper/{parts[2]}" # e.g. /dev/mapper/loop0
self._report_progress(f"Could not find partition index {partition_index_in_image} in kpartx output:\n{kpartx_output}")
def _find_source_file(self, patterns: list[str], description: str) -> str | None:
"""Finds the first existing file matching a list of glob patterns within self.macos_download_path."""
self._report_progress(f"Searching for {description} in {self.macos_download_path}...")
for pattern in patterns:
# Using iglob for efficiency if many files, but glob is fine for fewer expected matches
found_files = glob.glob(os.path.join(self.macos_download_path, "**", pattern), recursive=True)
if found_files:
# Prefer files not inside .app bundles if multiple are found, unless it's the app itself.
# This is a simple heuristic.
non_app_files = [f for f in found_files if ".app/" not in f]
target_file = non_app_files[0] if non_app_files else found_files[0]
self._report_progress(f"Found {description} at: {target_file}")
return target_file
self._report_progress(f"Warning: {description} not found with patterns: {patterns}")
return None
def _extract_hfs_from_dmg(self, dmg_path: str, output_hfs_path: str) -> bool:
"""Extracts the primary HFS+ partition image (e.g., '4.hfs') from a DMG."""
# Assumes BaseSystem.dmg or similar that contains a HFS+ partition image.
temp_extract_dir = f"temp_hfs_extract_{os.getpid()}"
os.makedirs(temp_extract_dir, exist_ok=True)
try:
self._report_progress(f"Extracting HFS+ partition image from {dmg_path}...")
# 7z e -tdmg <dmg_path> *.hfs -o<output_dir_for_hfs> (usually 4.hfs or similar)
self._run_command(["7z", "e", "-tdmg", dmg_path, "*.hfs", f"-o{temp_extract_dir}"], check=True)
hfs_files = glob.glob(os.path.join(temp_extract_dir, "*.hfs"))
if not hfs_files: raise RuntimeError(f"No .hfs file found after extracting {dmg_path}")
final_hfs_file = max(hfs_files, key=os.path.getsize) # Assume largest is the one
self._report_progress(f"Found HFS+ partition image: {final_hfs_file}. Moving to {output_hfs_path}")
shutil.move(final_hfs_file, output_hfs_path)
return True
except Exception as e:
self._report_progress(f"Error during HFS extraction from DMG: {e}")
return False
finally:
if os.path.exists(temp_extract_dir): shutil.rmtree(temp_extract_dir, ignore_errors=True)
def format_and_write(self) -> bool:
# Ensure cleanup runs even if errors occur early
try:
self.check_dependencies()
self._cleanup_all_mounts_and_mappings() # Clean before start, just in case
for mp in self.temp_mount_points_to_clean: # Create mount point directories
self._cleanup_temp_files_and_dirs()
for mp in [self.mount_point_usb_esp, self.mount_point_usb_macos_target, self.temp_efi_build_dir]:
self._run_command(["sudo", "mkdir", "-p", mp])
self._report_progress(f"WARNING: ALL DATA ON {self.device} WILL BE ERASED!")
self._report_progress(f"Unmounting all partitions on {self.device} (best effort)...")
for i in range(1, 10):
self._run_command(["sudo", "umount", f"{self.device}{i}"], check=False, timeout=5)
self._run_command(["sudo", "umount", f"{self.device}p{i}"], check=False, timeout=5)
for i in range(1, 10): self._run_command(["sudo", "umount", "-lf", f"{self.device}{i}"], check=False, timeout=5); self._run_command(["sudo", "umount", "-lf", f"{self.device}p{i}"], check=False, timeout=5)
self._report_progress(f"Creating new GPT partition table on {self.device}...")
self._run_command(["sudo", "parted", "--script", self.device, "mklabel", "gpt"])
self._report_progress("Creating EFI partition (ESP)...")
self._run_command(["sudo", "parted", "--script", self.device, "mkpart", "EFI", "fat32", "1MiB", "551MiB"])
self._run_command(["sudo", "parted", "--script", self.device, "set", "1", "esp", "on"])
self._report_progress("Creating macOS partition...")
self._run_command(["sudo", "parted", "--script", self.device, "mkpart", "macOS", "hfs+", "551MiB", "100%"])
self._report_progress(f"Partitioning {self.device} with GPT (sgdisk)...")
self._run_command(["sudo", "sgdisk", "--zap-all", self.device])
self._run_command(["sudo", "sgdisk", "-n", "1:0:+550M", "-t", "1:ef00", "-c", "1:EFI", self.device])
self._run_command(["sudo", "sgdisk", "-n", "2:0:0", "-t", "2:af00", "-c", "2:Install macOS", self.device])
self._run_command(["sudo", "partprobe", self.device], timeout=10); time.sleep(3)
self._run_command(["sudo", "partprobe", self.device], timeout=10)
time.sleep(3)
esp_partition_dev = f"{self.device}1" if os.path.exists(f"{self.device}1") else f"{self.device}p1"
macos_partition_dev = f"{self.device}2" if os.path.exists(f"{self.device}2") else f"{self.device}p2"
if not (os.path.exists(esp_partition_dev) and os.path.exists(macos_partition_dev)):
raise RuntimeError(f"Could not reliably determine partition names for {self.device}. Expected {esp_partition_dev} and {macos_partition_dev} to exist after partprobe.")
esp_partition_dev = next((f"{self.device}{i}" for i in ["1", "p1"] if os.path.exists(f"{self.device}{i}")), None)
macos_partition_dev = next((f"{self.device}{i}" for i in ["2", "p2"] if os.path.exists(f"{self.device}{i}")), None)
if not (esp_partition_dev and macos_partition_dev): raise RuntimeError(f"Could not reliably determine partition names for {self.device}.")
self._report_progress(f"Formatting ESP ({esp_partition_dev}) as FAT32...")
self._run_command(["sudo", "mkfs.vfat", "-F", "32", esp_partition_dev])
self._run_command(["sudo", "mkfs.vfat", "-F", "32", "-n", "EFI", esp_partition_dev])
self._report_progress(f"Formatting macOS Install partition ({macos_partition_dev}) as HFS+...")
self._run_command(["sudo", "mkfs.hfsplus", "-v", f"Install macOS {self.target_macos_version}", macos_partition_dev])
# --- Write EFI content ---
self._report_progress(f"Converting OpenCore QCOW2 ({self.opencore_qcow2_path}) to RAW ({self.opencore_raw_path})...")
self._run_command(["qemu-img", "convert", "-O", "raw", self.opencore_qcow2_path, self.opencore_raw_path])
# --- Prepare macOS Installer Content ---
basesystem_dmg_path = self._find_source_file(["BaseSystem.dmg", "InstallAssistant.pkg", "SharedSupport.dmg"], "BaseSystem.dmg or InstallAssistant.pkg or SharedSupport.dmg")
if not basesystem_dmg_path: raise RuntimeError("Essential macOS installer DMG/PKG not found in download path.")
map_output_efi = self._run_command(["sudo", "kpartx", "-av", self.opencore_raw_path], capture_output=True).stdout
mapped_efi_device = self._get_mapped_partition_device(map_output_efi, 1) # EFI is partition 1 in OpenCore.qcow2
if not mapped_efi_device: raise RuntimeError(f"Could not map EFI partition from {self.opencore_raw_path}.")
self._report_progress(f"Mapped OpenCore EFI partition device: {mapped_efi_device}")
if basesystem_dmg_path.endswith(".pkg") or "SharedSupport.dmg" in os.path.basename(basesystem_dmg_path) :
# If we found InstallAssistant.pkg or SharedSupport.dmg, we need to extract BaseSystem.hfs from it.
self._report_progress(f"Extracting bootable HFS+ image from {basesystem_dmg_path}...")
if not self._extract_hfs_from_dmg(basesystem_dmg_path, self.temp_basesystem_hfs_path):
raise RuntimeError("Failed to extract HFS+ image from installer assets.")
elif basesystem_dmg_path.endswith("BaseSystem.dmg"): # If it's BaseSystem.dmg directly
self._report_progress(f"Extracting bootable HFS+ image from {basesystem_dmg_path}...")
if not self._extract_hfs_from_dmg(basesystem_dmg_path, self.temp_basesystem_hfs_path):
raise RuntimeError("Failed to extract HFS+ image from BaseSystem.dmg.")
else:
raise RuntimeError(f"Unsupported file type for BaseSystem extraction: {basesystem_dmg_path}")
self._report_progress(f"Mounting {mapped_efi_device} to {self.mount_point_opencore_efi}...")
self._run_command(["sudo", "mount", "-o", "ro", mapped_efi_device, self.mount_point_opencore_efi])
if self.enhance_plist_enabled:
try:
from plist_modifier import enhance_config_plist # Import here
if enhance_config_plist:
config_plist_on_source_efi = os.path.join(self.mount_point_opencore_efi, "EFI", "OC", "config.plist")
if os.path.exists(config_plist_on_source_efi):
self._report_progress("Attempting to enhance config.plist...")
if enhance_config_plist(config_plist_on_source_efi, self.target_macos_version, self._report_progress):
self._report_progress("config.plist enhancement successful.")
else:
self._report_progress("config.plist enhancement failed or had issues. Continuing with original/partially modified plist.")
else:
self._report_progress(f"Warning: config.plist not found at {config_plist_on_source_efi}. Cannot enhance.")
else:
self._report_progress("Warning: enhance_config_plist function not available. Skipping enhancement.")
except ImportError:
self._report_progress("Warning: plist_modifier.py module not found. Skipping config.plist enhancement.")
except Exception as e:
self._report_progress(f"Error during config.plist enhancement attempt: {e}. Continuing with original plist.")
self._report_progress(f"Writing BaseSystem HFS+ image to {macos_partition_dev} using dd...")
self._run_command(["sudo", "dd", f"if={self.temp_basesystem_hfs_path}", f"of={macos_partition_dev}", "bs=4M", "status=progress", "oflag=sync"])
self._report_progress(f"Mounting USB ESP ({esp_partition_dev}) to {self.mount_point_usb_esp}...")
self._run_command(["sudo", "mount", esp_partition_dev, self.mount_point_usb_esp])
self._report_progress(f"Copying EFI files from {self.mount_point_opencore_efi}/EFI to {self.mount_point_usb_esp}/EFI...")
source_efi_content_path = os.path.join(self.mount_point_opencore_efi, "EFI")
if not os.path.isdir(source_efi_content_path): # Check if EFI folder is in root of partition
source_efi_content_path = self.mount_point_opencore_efi # Assume content is in root
target_efi_dir_on_usb = os.path.join(self.mount_point_usb_esp, "EFI")
self._run_command(["sudo", "mkdir", "-p", target_efi_dir_on_usb])
self._run_command(["sudo", "rsync", "-avh", "--delete", f"{source_efi_content_path}/", f"{target_efi_dir_on_usb}/"]) # Copy content of EFI
self._unmount_path(self.mount_point_opencore_efi)
self._unmount_path(self.mount_point_usb_esp)
self._run_command(["sudo", "kpartx", "-d", self.opencore_raw_path])
# --- Write macOS main image (File-level copy) ---
self._report_progress(f"Formatting macOS partition ({macos_partition_dev}) on USB as HFS+...")
self._run_command(["sudo", "mkfs.hfsplus", "-v", "macOS_USB", macos_partition_dev])
self._report_progress(f"Converting macOS QCOW2 ({self.macos_qcow2_path}) to RAW ({self.macos_raw_path})...")
self._report_progress("This may take a very long time and consume significant disk space temporarily.")
self._run_command(["qemu-img", "convert", "-O", "raw", self.macos_qcow2_path, self.macos_raw_path])
self._report_progress(f"Mapping partitions from macOS RAW image ({self.macos_raw_path})...")
map_output_macos = self._run_command(["sudo", "kpartx", "-av", self.macos_raw_path], capture_output=True).stdout
# The mac_hdd_ng.img usually contains an APFS container.
# kpartx might show multiple APFS volumes within the container, or the container partition itself.
# We need to mount the APFS Data or System volume.
# Typically, the main usable partition is the largest one, or the second one (after a small EFI if present in this image).
mapped_macos_device = self._get_mapped_partition_device(map_output_macos, 2) # Try p2 (common for APFS container)
if not mapped_macos_device:
mapped_macos_device = self._get_mapped_partition_device(map_output_macos, 1) # Fallback to p1
if not mapped_macos_device:
raise RuntimeError(f"Could not identify and map main macOS data partition from {self.macos_raw_path}.")
self._report_progress(f"Mapped macOS source partition device: {mapped_macos_device}")
self._report_progress(f"Mounting source macOS partition ({mapped_macos_device}) to {self.mount_point_macos_source} using apfs-fuse...")
self._run_command(["sudo", "apfs-fuse", "-o", "ro,allow_other", mapped_macos_device, self.mount_point_macos_source])
self._report_progress(f"Mounting target USB macOS partition ({macos_partition_dev}) to {self.mount_point_usb_macos_target}...")
self._report_progress("Mounting macOS Install partition on USB...")
self._run_command(["sudo", "mount", macos_partition_dev, self.mount_point_usb_macos_target])
self._report_progress(f"Copying macOS system files from {self.mount_point_macos_source} to {self.mount_point_usb_macos_target} using rsync...")
self._report_progress("This will take a very long time. Please be patient.")
self._run_command(["sudo", "rsync", "-avh", "--delete", f"{self.mount_point_macos_source}/", f"{self.mount_point_usb_macos_target}/"]) # Note trailing slashes
# Copy BaseSystem.dmg & .chunklist to /System/Library/CoreServices/
core_services_path_usb = os.path.join(self.mount_point_usb_macos_target, "System", "Library", "CoreServices")
self._run_command(["sudo", "mkdir", "-p", core_services_path_usb])
self._report_progress("USB writing process completed successfully.")
# Find original BaseSystem.dmg and chunklist in download path to copy them
actual_bs_dmg = self._find_source_file(["BaseSystem.dmg"], "original BaseSystem.dmg for copying")
if actual_bs_dmg:
self._report_progress(f"Copying {actual_bs_dmg} to {core_services_path_usb}/BaseSystem.dmg")
self._run_command(["sudo", "cp", actual_bs_dmg, os.path.join(core_services_path_usb, "BaseSystem.dmg")])
bs_chunklist = actual_bs_dmg.replace(".dmg", ".chunklist")
if os.path.exists(bs_chunklist):
self._report_progress(f"Copying {bs_chunklist} to {core_services_path_usb}/BaseSystem.chunklist")
self._run_command(["sudo", "cp", bs_chunklist, os.path.join(core_services_path_usb, "BaseSystem.chunklist")])
else: self._report_progress(f"Warning: BaseSystem.chunklist not found at {bs_chunklist}")
else: self._report_progress("Warning: Could not find original BaseSystem.dmg in download path to copy to CoreServices.")
# Copy InstallInfo.plist
install_info_src = self._find_source_file(["InstallInfo.plist"], "InstallInfo.plist")
if install_info_src:
self._report_progress(f"Copying {install_info_src} to {self.mount_point_usb_macos_target}/InstallInfo.plist")
self._run_command(["sudo", "cp", install_info_src, os.path.join(self.mount_point_usb_macos_target, "InstallInfo.plist")])
else: self._report_progress("Warning: InstallInfo.plist not found in download path.")
# Copy Packages (placeholder - needs more specific logic based on gibMacOS output structure)
self._report_progress("Placeholder: Copying macOS installation packages to USB (e.g., /System/Installation/Packages)...")
# Example: sudo rsync -a /path/to/downloaded_packages_dir/ /mnt/usb_macos_target/System/Installation/Packages/
# This needs to correctly identify the source Packages directory from gibMacOS output.
# For now, we'll skip actual copying of packages folder, as its location and content can vary.
# A proper implementation would require inspecting the gibMacOS download structure.
# Create the directory though:
self._run_command(["sudo", "mkdir", "-p", os.path.join(self.mount_point_usb_macos_target, "System", "Installation", "Packages")])
# --- OpenCore EFI Setup ---
self._report_progress("Setting up OpenCore EFI on ESP...")
if not os.path.isdir(OC_TEMPLATE_DIR):
self._report_progress(f"FATAL: OpenCore template directory not found at {OC_TEMPLATE_DIR}. Cannot proceed."); return False
self._report_progress(f"Copying OpenCore EFI template from {OC_TEMPLATE_DIR} to {self.temp_efi_build_dir}")
self._run_command(["sudo", "cp", "-a", f"{OC_TEMPLATE_DIR}/.", self.temp_efi_build_dir]) # Copy contents
temp_config_plist_path = os.path.join(self.temp_efi_build_dir, "EFI", "OC", "config.plist") # Assume template is named config.plist
if not os.path.exists(temp_config_plist_path) and os.path.exists(os.path.join(self.temp_efi_build_dir, "EFI", "OC", "config-template.plist")):
# If template is config-template.plist, rename it for enhancement
shutil.move(os.path.join(self.temp_efi_build_dir, "EFI", "OC", "config-template.plist"), temp_config_plist_path)
if self.enhance_plist_enabled and enhance_config_plist and os.path.exists(temp_config_plist_path):
self._report_progress("Attempting to enhance config.plist...")
if enhance_config_plist(temp_config_plist_path, self.target_macos_version, self._report_progress):
self._report_progress("config.plist enhancement successful.")
else: self._report_progress("config.plist enhancement failed or had issues. Continuing with (potentially original template) plist.")
self._run_command(["sudo", "mount", esp_partition_dev, self.mount_point_usb_esp])
self._report_progress(f"Copying final EFI folder to USB ESP ({self.mount_point_usb_esp})...")
self._run_command(["sudo", "rsync", "-avh", "--delete", f"{self.temp_efi_build_dir}/EFI/", f"{self.mount_point_usb_esp}/EFI/"])
self._report_progress("USB Installer creation process completed successfully.")
return True
except Exception as e:
self._report_progress(f"An error occurred during USB writing: {e}")
import traceback
self._report_progress(traceback.format_exc()) # Log full traceback for debugging
import traceback; self._report_progress(traceback.format_exc())
return False
finally:
self._cleanup_all_mounts_and_mappings()
self._cleanup_temp_files()
self._cleanup_temp_files_and_dirs()
if __name__ == '__main__':
if os.geteuid() != 0:
print("Please run this script as root (sudo) for testing.")
exit(1)
if os.geteuid() != 0: print("Please run this script as root (sudo) for testing."); exit(1)
print("USB Writer Linux Standalone Test - Installer Method")
print("USB Writer Linux Standalone Test - REFACTORED for File Copy")
mock_download_dir = f"temp_macos_download_test_{os.getpid()}"
os.makedirs(mock_download_dir, exist_ok=True)
# Create dummy qcow2 files for testing script structure
# These won't result in a bootable USB but allow testing the commands.
mock_opencore_path = "mock_opencore_usb_writer.qcow2"
mock_macos_path = "mock_macos_usb_writer.qcow2"
# Create a dummy placeholder for what gibMacOS might download
# This is highly simplified. A real gibMacOS download has a complex structure.
# For this test, we'll simulate having BaseSystem.dmg and InstallInfo.plist
mock_install_data_path = os.path.join(mock_download_dir, "macOS_Install_Data") # Simplified path
os.makedirs(mock_install_data_path, exist_ok=True)
dummy_bs_dmg_path = os.path.join(mock_install_data_path, "BaseSystem.dmg")
dummy_installinfo_path = os.path.join(mock_download_dir, "InstallInfo.plist") # Often at root of a specific product download
print(f"Creating mock image: {mock_opencore_path}")
subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_opencore_path, "384M"], check=True)
# TODO: A more complex mock would involve creating a partition table and filesystem inside this qcow2.
# For now, this is just to ensure the file exists for qemu-img convert.
# Actual EFI content would be needed for kpartx to map something meaningful.
if not os.path.exists(dummy_bs_dmg_path):
# Create a tiny dummy file for 7z to "extract" from.
# To make _extract_hfs_from_dmg work, it needs a real DMG with a HFS part.
# This is hard to mock simply. For now, it will likely fail extraction.
# A better mock would be a small, actual DMG with a tiny HFS file.
print(f"Creating dummy BaseSystem.dmg at {dummy_bs_dmg_path} (will likely fail HFS extraction in test without a real DMG structure)")
with open(dummy_bs_dmg_path, "wb") as f: f.write(os.urandom(1024*10)) # 10KB dummy
if not os.path.exists(dummy_installinfo_path):
with open(dummy_installinfo_path, "w") as f: f.write("<plist><dict><key>DummyInstallInfo</key><true/></dict></plist>")
print(f"Creating mock image: {mock_macos_path}")
subprocess.run(["qemu-img", "create", "-f", "qcow2", mock_macos_path, "1G"], check=True) # Small for quick test
# TODO: Similar to above, a real test needs a qcow2 with a mountable filesystem.
# Create dummy EFI template
if not os.path.exists(OC_TEMPLATE_DIR): os.makedirs(OC_TEMPLATE_DIR)
if not os.path.exists(os.path.join(OC_TEMPLATE_DIR, "EFI", "OC")): os.makedirs(os.path.join(OC_TEMPLATE_DIR, "EFI", "OC"))
dummy_config_template_path = os.path.join(OC_TEMPLATE_DIR, "EFI", "OC", "config.plist") # Name it config.plist directly
if not os.path.exists(dummy_config_template_path):
with open(dummy_config_template_path, "w") as f: f.write("<plist><dict><key>TestTemplate</key><true/></dict></plist>")
print("\nAvailable block devices (be careful!):")
subprocess.run(["lsblk", "-d", "-o", "NAME,SIZE,MODEL"], check=True)
test_device = input("\nEnter target device (e.g., /dev/sdX). THIS DEVICE WILL BE WIPED: ")
if not test_device or not (test_device.startswith("/dev/") or test_device.startswith("/dev/mapper/")): # Allow /dev/mapper for testing with loop devices
if not test_device or not test_device.startswith("/dev/"):
print("Invalid device. Exiting.")
# Clean up mock files
if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path)
if os.path.exists(mock_macos_path): os.remove(mock_macos_path)
exit(1)
confirm = input(f"Are you absolutely sure you want to wipe {test_device} and write mock images? (yes/NO): ")
success = False
if confirm.lower() == 'yes':
writer = USBWriterLinux(test_device, mock_opencore_path, mock_macos_path, print)
success = writer.format_and_write()
else:
print("Test cancelled by user.")
confirm = input(f"Are you absolutely sure you want to wipe {test_device} and create installer? (yes/NO): ")
success = False
if confirm.lower() == 'yes':
writer = USBWriterLinux(
device=test_device,
macos_download_path=mock_download_dir,
progress_callback=print,
enhance_plist_enabled=True,
target_macos_version="Sonoma"
)
success = writer.format_and_write()
else: print("Test cancelled by user.")
print(f"Test finished. Success: {success}")
print(f"Test finished. Success: {success}")
# Clean up mock files
if os.path.exists(mock_opencore_path): os.remove(mock_opencore_path)
if os.path.exists(mock_macos_path): os.remove(mock_macos_path)
print("Mock files cleaned up.")
# Cleanup
if os.path.exists(mock_download_dir): shutil.rmtree(mock_download_dir, ignore_errors=True)
# if os.path.exists(OC_TEMPLATE_DIR) and "EFI_template_installer" in OC_TEMPLATE_DIR :
# shutil.rmtree(OC_TEMPLATE_DIR, ignore_errors=True) # Avoid deleting if it's a real shared template
print("Mock download dir cleaned up.")
print(f"Note: {OC_TEMPLATE_DIR} and its contents might persist if not created by this test run specifically.")