from __future__ import annotations from mainui.utils import parse_host_port, url_from_host_port class ServicesTabMixin: def _apply_service_url( self, *, host: str, port: int, client_attr: str, status_widget: str, log_fn, log_prefix: str, ping_fn, ) -> None: url = url_from_host_port(host, port).rstrip("/") client = getattr(self, client_attr, None) if client is None: return client.base_url = url if self.services is not None and hasattr(self.services, status_widget): getattr(self.services, status_widget).setText(f"Status: {url}") log_fn(f"{log_prefix}{url}") ping_fn() def _init_services_tab(self) -> None: if self.services is None: self.log("[SERVICES] No Services tab. Using config defaults.") self._ping_backend_service() return mri_host, mri_port = parse_host_port(self.hardware_api.base_url) i_host, i_port = parse_host_port(self.interp_api.base_url) a_host, a_port = parse_host_port(self.analysis.base_url) self.services.leMRIHost.setText(mri_host) self.services.sbMRIPort.setValue(mri_port) if hasattr(self.services, "leIHost"): self.services.leIHost.setText(i_host) if hasattr(self.services, "sbIPort"): self.services.sbIPort.setValue(i_port) self.services.leAHost.setText(a_host) self.services.sbAPort.setValue(a_port) self.services.btnApplyMRI.clicked.connect(self.apply_mri_service) if hasattr(self.services, "btnApplyInterp"): self.services.btnApplyInterp.clicked.connect(self.apply_interp_service) self.services.btnApplyAnalysis.clicked.connect(self.apply_analysis_service) self.apply_mri_service() self.apply_interp_service() self.apply_analysis_service() def apply_mri_service(self) -> None: if self.services is None: return self._apply_service_url( host=self.services.leMRIHost.text(), port=int(self.services.sbMRIPort.value()), client_attr="hardware_api", status_widget="lblMRIStatus", log_fn=self.log, log_prefix="[SERVICES] srv-hardware set: ", ping_fn=self._ping_backend_service, ) def apply_interp_service(self) -> None: if self.services is None: return if not (hasattr(self.services, "leIHost") and hasattr(self.services, "sbIPort")): return self._apply_service_url( host=self.services.leIHost.text(), port=int(self.services.sbIPort.value()), client_attr="interp_api", status_widget="lblIStatus", log_fn=self.log, log_prefix="[SERVICES] srv-interp set: ", ping_fn=self._ping_interp_service, ) def apply_analysis_service(self) -> None: if self.services is None: return self._apply_service_url( host=self.services.leAHost.text(), port=int(self.services.sbAPort.value()), client_attr="analysis", status_widget="lblAStatus", log_fn=self.spec_log, log_prefix="[SERVICES] Analysis backend set: ", ping_fn=self._ping_analysis_service, )