diff --git a/Orange/projection/manifold.py b/Orange/projection/manifold.py index f8ce113e117..b5d906ac3a1 100644 --- a/Orange/projection/manifold.py +++ b/Orange/projection/manifold.py @@ -496,13 +496,7 @@ def fit(self, X: np.ndarray, Y: np.ndarray = None) -> openTSNE.TSNEEmbedding: return embedding - def __call__(self, data: Table) -> TSNEModel: - # Preprocess the data - convert discrete to continuous - data = self.preprocess(data) - - # Run tSNE optimization - embedding = self.fit(data.X, data.Y) - + def convert_embedding_to_model(self, data, embedding): # The results should be accessible in an Orange table, which doesn't # need the full embedding attributes and is cast into a regular array n = self.n_components @@ -518,6 +512,17 @@ def __call__(self, data: Table) -> TSNEModel: return model + def __call__(self, data: Table) -> TSNEModel: + # Preprocess the data - convert discrete to continuous + data = self.preprocess(data) + + # Run tSNE optimization + embedding = self.fit(data.X, data.Y) + + # Convert the t-SNE embedding object to a TSNEModel and prepare the + # embedding table with t-SNE meta variables + return self.convert_embedding_to_model(data, embedding) + @staticmethod def default_initialization(data, n_components=2, random_state=None): return openTSNE.initialization.pca( diff --git a/Orange/widgets/tests/base.py b/Orange/widgets/tests/base.py index dd39eef9ec0..5da8560d153 100644 --- a/Orange/widgets/tests/base.py +++ b/Orange/widgets/tests/base.py @@ -857,10 +857,15 @@ def _compare_selected_annotated_domains(self, selected, annotated): annotated_vars = annotated.domain.variables self.assertLessEqual(set(selected_vars), set(annotated_vars)) - def test_setup_graph(self): + def test_setup_graph(self, timeout=DEFAULT_TIMEOUT): """Plot should exist after data has been sent in order to be properly set/updated""" self.send_signal(self.widget.Inputs.data, self.data) + + if self.widget.isBlocking(): + spy = QSignalSpy(self.widget.blockingStateChanged) + 
self.assertTrue(spy.wait(timeout)) + self.assertIsNotNone(self.widget.graph.scatterplot_item) def test_default_attrs(self, timeout=DEFAULT_TIMEOUT): @@ -934,16 +939,21 @@ def test_plot_once(self, timeout=DEFAULT_TIMEOUT): table = Table("heart_disease") self.widget.setup_plot = Mock() self.widget.commit = Mock() + self.send_signal(self.widget.Inputs.data, table) + if self.widget.isBlocking(): + spy = QSignalSpy(self.widget.blockingStateChanged) + self.assertTrue(spy.wait(timeout)) + self.widget.setup_plot.assert_called_once() self.widget.commit.assert_called_once() + self.widget.commit.reset_mock() + self.send_signal(self.widget.Inputs.data_subset, table[::10]) if self.widget.isBlocking(): spy = QSignalSpy(self.widget.blockingStateChanged) self.assertTrue(spy.wait(timeout)) - self.widget.commit.reset_mock() - self.send_signal(self.widget.Inputs.data_subset, table[::10]) self.widget.setup_plot.assert_called_once() self.widget.commit.assert_called_once() @@ -985,25 +995,38 @@ def test_invalidated_embedding(self, timeout=DEFAULT_TIMEOUT): self.widget.graph.update_coordinates = Mock() self.widget.graph.update_point_props = Mock() self.send_signal(self.widget.Inputs.data, self.data) - self.widget.graph.update_coordinates.assert_called_once() - self.widget.graph.update_point_props.assert_called_once() - if self.widget.isBlocking(): spy = QSignalSpy(self.widget.blockingStateChanged) self.assertTrue(spy.wait(timeout)) + self.widget.graph.update_coordinates.assert_called() + self.widget.graph.update_point_props.assert_called() + self.widget.graph.update_coordinates.reset_mock() self.widget.graph.update_point_props.reset_mock() self.send_signal(self.widget.Inputs.data, self.data) + if self.widget.isBlocking(): + spy = QSignalSpy(self.widget.blockingStateChanged) + self.assertTrue(spy.wait(timeout)) + self.widget.graph.update_coordinates.assert_not_called() self.widget.graph.update_point_props.assert_called_once() - def test_saved_selection(self): + def 
test_saved_selection(self, timeout=DEFAULT_TIMEOUT): self.send_signal(self.widget.Inputs.data, self.data) + if self.widget.isBlocking(): + spy = QSignalSpy(self.widget.blockingStateChanged) + self.assertTrue(spy.wait(timeout)) + self.widget.graph.select_by_indices(list(range(0, len(self.data), 10))) settings = self.widget.settingsHandler.pack_data(self.widget) w = self.create_widget(self.widget.__class__, stored_settings=settings) + self.send_signal(self.widget.Inputs.data, self.data, widget=w) + if w.isBlocking(): + spy = QSignalSpy(w.blockingStateChanged) + self.assertTrue(spy.wait(timeout)) + self.assertEqual(np.sum(w.graph.selection), 15) np.testing.assert_equal(self.widget.graph.selection, w.graph.selection) diff --git a/Orange/widgets/unsupervised/owtsne.py b/Orange/widgets/unsupervised/owtsne.py index 50c9e8e621a..db185768e93 100644 --- a/Orange/widgets/unsupervised/owtsne.py +++ b/Orange/widgets/unsupervised/owtsne.py @@ -1,60 +1,317 @@ import warnings +from concurrent import futures +from functools import partial +from types import SimpleNamespace as namespace +from typing import Optional # pylint: disable=unused-import import numpy as np -from AnyQt.QtCore import Qt, QTimer +from AnyQt.QtCore import Qt, pyqtSlot as Slot, pyqtSignal as Signal, QObject from AnyQt.QtWidgets import QFormLayout from Orange.data import Table, Domain from Orange.preprocess import preprocess -from Orange.projection import PCA, TSNE -from Orange.projection.manifold import TSNEModel +from Orange.projection import PCA +from Orange.projection import manifold from Orange.widgets import gui from Orange.widgets.settings import Setting, SettingProvider +from Orange.widgets.utils.concurrent import FutureWatcher from Orange.widgets.utils.widgetpreview import WidgetPreview from Orange.widgets.visualize.owscatterplotgraph import OWScatterPlotBase from Orange.widgets.visualize.utils.widget import OWDataProjectionWidget from Orange.widgets.widget import Msg, Output +_STEP_SIZE = 25 
+_MAX_PCA_COMPONENTS = 50 +_DEFAULT_PCA_COMPONENTS = 20 + + +class TaskState(QObject): + status_changed = Signal(str) + _p_status_changed = Signal(str) + + progress_changed = Signal(float) + _p_progress_changed = Signal(float) + + partial_result_ready = Signal(object) + _p_partial_result_ready = Signal(object) + + def __init__(self, *args): + super().__init__(*args) + self.__future = None + self.watcher = FutureWatcher() + self.__interuption_requested = False + self.__progress = 0 + # Helpers to route the signal emits via this object's queue. + # This ensures 'atomic' disconnect from signals for targets/slots + # in the same thread. Requires that the event loop is running in this + # object's thread. + self._p_status_changed.connect( + self.status_changed, Qt.QueuedConnection) + self._p_progress_changed.connect( + self.progress_changed, Qt.QueuedConnection) + self._p_partial_result_ready.connect( + self.partial_result_ready, Qt.QueuedConnection) + + @property + def future(self): + # type: () -> Future + return self.__future + + def set_status(self, text): + self._p_status_changed.emit(text) + + def set_progress_value(self, value): + if round(value, 1) > round(self.__progress, 1): + # Only emit progress when it has changed sufficiently + self._p_progress_changed.emit(value) + self.__progress = value + + def set_partial_results(self, value): + self._p_partial_result_ready.emit(value) + + def is_interuption_requested(self): + return self.__interuption_requested + + def start(self, executor, func=None): + # type: (futures.Executor, Callable[[], Any]) -> Future + assert self.future is None + assert not self.__interuption_requested + self.__future = executor.submit(func) + self.watcher.setFuture(self.future) + return self.future + + def cancel(self): + assert not self.__interuption_requested + self.__interuption_requested = True + if self.future is not None: + rval = self.future.cancel() + else: + # not even scheduled yet + rval = True + return rval + + +class
InteruptRequested(BaseException): + pass + + +class Task(namespace): + """Completely determines the t-SNE task spec and intermediate results.""" + data = None # type: Optional[Table] + normalize = None # type: Optional[bool] + pca_components = None # type: Optional[int] + pca_projection = None # type: Optional[Table] + perplexity = None # type: Optional[float] + multiscale = None # type: Optional[bool] + exaggeration = None # type: Optional[float] + initialization = None # type: Optional[np.ndarray] + affinities = None # type: Optional[openTSNE.affinity.Affinities] + tsne_embedding = None # type: Optional[manifold.TSNEModel] + iterations_done = 0 # type: int + + # These attributes need not be set by the widget + tsne = None # type: Optional[manifold.TSNE] + + +def pca_preprocessing(data, n_components, normalize): + projector = PCA(n_components=n_components, random_state=0) + if normalize: + projector.preprocessors += (preprocess.Normalize(),) + + model = projector(data) + return model(data) + + +def prepare_tsne_obj(data, perplexity, multiscale, exaggeration): + # type: (Table, float, bool, float) -> manifold.TSNE + """Automatically determine the best parameters for the given data set.""" + # Compute perplexity settings for multiscale + n_samples = data.X.shape[0] + if multiscale: + perplexity = min((n_samples - 1) / 3, 50), min((n_samples - 1) / 3, 500) + else: + perplexity = perplexity + + # Determine whether to use settings for large data sets + if n_samples > 10_000: + neighbor_method, gradient_method = "approx", "fft" + else: + neighbor_method, gradient_method = "exact", "bh" + + # Larger data sets need a larger number of iterations + if n_samples > 100_000: + early_exagg_iter, n_iter = 500, 1000 + else: + early_exagg_iter, n_iter = 250, 750 + + return manifold.TSNE( + n_components=2, + perplexity=perplexity, + multiscale=multiscale, + early_exaggeration_iter=early_exagg_iter, + n_iter=n_iter, + exaggeration=exaggeration, + neighbors=neighbor_method, + 
negative_gradient_method=gradient_method, + theta=0.8, + random_state=0, + ) + class TSNERunner: - def __init__(self, tsne: TSNEModel, step_size=50, exaggeration=1): - self.embedding = tsne - self.iterations_done = 0 - self.step_size = step_size - self.exaggeration = exaggeration + @staticmethod + def compute_pca(task, state, **_): + # Perform PCA preprocessing + state.set_status("Computing PCA...") + pca_projection = pca_preprocessing( + task.data, task.pca_components, task.normalize + ) + # Apply t-SNE's preprocessors to the data + task.pca_projection = task.tsne.preprocess(pca_projection) + state.set_partial_results(("pca_projection", task)) + + @staticmethod + def compute_initialization(task, state, **_): + # Prepare initial positions for t-SNE + state.set_status("Preparing initialization...") + task.initialization = task.tsne.compute_initialization(task.pca_projection.X) + state.set_partial_results(("initialization", task)) + + @staticmethod + def compute_affinities(task, state, **_): + # Compute affinities + state.set_status("Finding nearest neighbors...") + task.affinities = task.tsne.compute_affinities(task.pca_projection.X) + state.set_partial_results(("affinities", task)) + + @staticmethod + def compute_tsne(task, state, progress_callback=None): + tsne = task.tsne + + state.set_status("Running optimization...") + + # If this is the first time we're computing t-SNE (otherwise we may just + # be resuming optimization), we have to assemble the tsne object + if task.tsne_embedding is None: + # Assemble a t-SNE embedding object and convert it to a TSNEModel + task.tsne_embedding = tsne.prepare_embedding( + task.affinities, task.initialization + ) + task.tsne_embedding = tsne.convert_embedding_to_model( + task.pca_projection, task.tsne_embedding + ) + state.set_partial_results(("tsne_embedding", task)) + + if state.is_interuption_requested(): + raise InteruptRequested() + + total_iterations_needed = tsne.early_exaggeration_iter + tsne.n_iter + # If optimization
has already been partially run, then the number of + # iterations left to do will be less than the total number of + # iterations. This is useful to have so the progress bar accurately + # reflects the amount of work left to be done when resuming optimization. + initial_iterations_done = task.iterations_done + actual_iterations_needed = total_iterations_needed - initial_iterations_done + + # Run early exaggeration phase + while task.iterations_done < tsne.early_exaggeration_iter: + step_size = min( + _STEP_SIZE, tsne.early_exaggeration_iter - task.iterations_done + ) + task.tsne_embedding.optimize( + step_size, + exaggeration=tsne.early_exaggeration, + momentum=0.5, + inplace=True, + ) + task.iterations_done += step_size + state.set_partial_results(("tsne_embedding", task)) + if progress_callback is not None: + progress_callback((task.iterations_done - initial_iterations_done) + / actual_iterations_needed) + + if state.is_interuption_requested(): + raise InteruptRequested() + + # Run regular optimization phase + while task.iterations_done < total_iterations_needed: + step_size = min(_STEP_SIZE, total_iterations_needed - task.iterations_done) + task.tsne_embedding.optimize( + step_size, + exaggeration=tsne.exaggeration, + momentum=0.8, + inplace=True, + ) + task.iterations_done += step_size + state.set_partial_results(("tsne_embedding", task)) + if progress_callback is not None: + progress_callback((task.iterations_done - initial_iterations_done) + / actual_iterations_needed) - # Larger data sets need a larger number of iterations - if self.n_samples > 100_000: - self.early_exagg_iter, self.n_iter = 500, 1000 - else: - self.early_exagg_iter, self.n_iter = 250, 750 + if state.is_interuption_requested(): + raise InteruptRequested() - @property - def n_samples(self): - return self.embedding.embedding_.shape[0] + @classmethod + def run(cls, task, state): + # type: (Task, TaskState) -> Task - def run_optimization(self): - total_iterations = self.early_exagg_iter + 
self.n_iter + # Assign weights to each job indicating how much time will be spent on each + weights = {"pca": 1, "init": 1, "aff": 23, "tsne": 75} - # Default values of early exaggeration phase - exaggeration, momentum = 12, 0.5 + # Prepare the tsne object and add it to the spec + task.tsne = prepare_tsne_obj( + task.data, task.perplexity, task.multiscale, task.exaggeration + ) - current_iter = self.iterations_done - while not current_iter >= total_iterations: - # Switch to normal regime if early exaggeration phase is over - if current_iter >= self.early_exagg_iter: - exaggeration, momentum = self.exaggeration, 0.8 + # Only execute jobs if needed + job_queue = [] + if task.pca_projection is None: + job_queue.append( + (partial(cls.compute_pca, task, state), weights["pca"]) + ) + if task.initialization is None: + job_queue.append( + (partial(cls.compute_initialization, task, state), weights["init"]) + ) + if task.affinities is None: + job_queue.append( + (partial(cls.compute_affinities, task, state), weights["aff"]) + ) - # Resume optimization for some number of steps - self.embedding.optimize( - self.step_size, inplace=True, exaggeration=exaggeration, - momentum=momentum, + total_iterations = task.tsne.early_exaggeration_iter + task.tsne.n_iter + if task.tsne_embedding is None or task.iterations_done < total_iterations: + job_queue.append( + (partial(cls.compute_tsne, task, state), weights["tsne"]) ) - current_iter += self.step_size + # Figure out the total weight of the jobs + total_weight = sum(j[1] for j in job_queue) + + try: + progress_done = 0 + for job, job_weight in job_queue: + job_weight /= total_weight + + def _progress_callback(val): + state.set_progress_value( + 100 * progress_done + 100 * val * job_weight + ) + + if state.is_interuption_requested(): + raise InteruptRequested() + + # Execute the job + job(progress_callback=_progress_callback) + # Update the progress bar according to the weights assigned to + # each job + progress_done += job_weight + 
state.set_progress_value(100 * progress_done) + + except InteruptRequested: + pass - yield self.embedding, current_iter / total_iterations + return task class OWtSNEGraph(OWScatterPlotBase): @@ -71,52 +328,42 @@ class OWtSNE(OWDataProjectionWidget): priority = 920 keywords = ["tsne"] - settings_version = 3 - max_iter = Setting(300) + settings_version = 4 perplexity = Setting(30) multiscale = Setting(False) exaggeration = Setting(1) - pca_components = Setting(20) + pca_components = Setting(_DEFAULT_PCA_COMPONENTS) normalize = Setting(True) GRAPH_CLASS = OWtSNEGraph graph = SettingProvider(OWtSNEGraph) embedding_variables_names = ("t-SNE-x", "t-SNE-y") - #: Runtime state - Running, Finished, Waiting, Paused = 1, 2, 3, 4 - class Outputs(OWDataProjectionWidget.Outputs): preprocessor = Output("Preprocessor", preprocess.Preprocess) + class Information(OWDataProjectionWidget.Information): + modified = Msg("The parameter settings have been changed. Press " + "\"Start\" to rerun with the new settings.") + class Error(OWDataProjectionWidget.Error): not_enough_rows = Msg("Input data needs at least 2 rows") constant_data = Msg("Input data is constant") no_attributes = Msg("Data has no attributes") out_of_memory = Msg("Out of memory") - optimization_error = Msg("Error during optimization\n{}") no_valid_data = Msg("No projection due to no valid data") def __init__(self): super().__init__() - self.pca_data = None - self.projection = None - self.tsne_runner = None - self.tsne_iterator = None - self.__update_loop = None - # timer for scheduling updates - self.__timer = QTimer(self, singleShot=True, interval=1, - timeout=self.__next_step) - self.__state = OWtSNE.Waiting - self.__in_next_step = False - self.__draw_similar_pairs = False - - def reset_needs_to_draw(): - self.needs_to_draw = True - - self.needs_to_draw = True - self.__timer_draw = QTimer(self, interval=2000, - timeout=reset_needs_to_draw) + self.pca_projection = None # type: Optional[Table] + self.initialization = None 
# type: Optional[np.ndarray] + self.affinities = None # type: Optional[openTSNE.affinity.Affinities] + self.tsne_embedding = None # type: Optional[manifold.TSNEModel] + self.iterations_done = 0 # type: int + + self.__executor = futures.ThreadPoolExecutor(max_workers=1) + self.__task = None # type: Optional[TaskState] + self.__invalidated = False def _add_controls(self): self._add_controls_start_box() @@ -133,26 +380,26 @@ def _add_controls_start_box(self): self.perplexity_spin = gui.spin( box, self, "perplexity", 1, 500, step=1, alignment=Qt.AlignRight, - callback=self._params_changed + callback=self._invalidate_affinities, ) + self.controls.perplexity.setDisabled(self.multiscale) form.addRow("Perplexity:", self.perplexity_spin) - self.perplexity_spin.setEnabled(not self.multiscale) form.addRow(gui.checkBox( box, self, "multiscale", label="Preserve global structure", - callback=self._multiscale_changed + callback=self._multiscale_changed, )) sbe = gui.hBox(self.controlArea, False, addToLayout=False) gui.hSlider( sbe, self, "exaggeration", minValue=1, maxValue=4, step=1, - callback=self._params_changed + callback=self._invalidate_tsne_embedding, ) form.addRow("Exaggeration:", sbe) sbp = gui.hBox(self.controlArea, False, addToLayout=False) gui.hSlider( - sbp, self, "pca_components", minValue=2, maxValue=50, step=1, - callback=self._invalidate_pca_projection + sbp, self, "pca_components", minValue=2, maxValue=_MAX_PCA_COMPONENTS, + step=1, callback=self._invalidate_pca_projection, ) form.addRow("PCA components:", sbp) @@ -165,19 +412,93 @@ def _add_controls_start_box(self): box.layout().addLayout(form) gui.separator(box, 10) - self.runbutton = gui.button(box, self, "Run", callback=self._toggle_run) + self.run_button = gui.button(box, self, "Start", callback=self._toggle_run) + + def _multiscale_changed(self): + self.controls.perplexity.setDisabled(self.multiscale) + self._invalidate_affinities() def _invalidate_pca_projection(self): - self.pca_data = None - 
self._params_changed() + self.pca_projection = None + self.initialization = None + self._invalidate_affinities() - def _params_changed(self): - self.__state = OWtSNE.Finished - self.__set_update_loop(None) + def _invalidate_affinities(self): + self.affinities = None + self._invalidate_tsne_embedding() - def _multiscale_changed(self): - self.perplexity_spin.setEnabled(not self.multiscale) - self._params_changed() + def _invalidate_tsne_embedding(self): + self.iterations_done = 0 + self.tsne_embedding = None + self._invalidate_output() + self._set_modified(True) + + def _invalidate_output(self): + self.__invalidated = True + if self.__task is not None: + self.__cancel_task(wait=False) + self.__set_state_ready() + self.run_button.setText("Start") + + def _set_modified(self, state): + """Mark the widget (GUI) as containing modified state.""" + if self.data is None: + # Does not apply when we have no data + state = False + self.Information.modified(shown=state) + + def cancel(self): + """Cancel any running jobs.""" + self.__cancel_task(wait=False) + self.__set_state_ready() + + @Slot(str) + def setStatusMessage(self, text): + super().setStatusMessage(text) + + @Slot(float) + def progressBarSet(self, value, *a, **kw): + super().progressBarSet(value, *a, **kw) + + def __set_state_ready(self): + self.progressBarFinished() + self.setBlocking(False) + self.setStatusMessage("") + + def __set_state_busy(self): + self.progressBarInit() + self.setBlocking(True) + + def __start_task(self, task, state): + # type: (Callable[[], Any], TaskState) -> None + assert self.__task is None + state.status_changed.connect(self.setStatusMessage) + state.progress_changed.connect(self.progressBarSet) + state.partial_result_ready.connect(self.__set_partial_results) + state.watcher.done.connect(self.__on_done) + state.start(self.__executor, task) + state.setParent(self) + self.__task = state + + def __cancel_task(self, wait=True): + """Cancel and dispose of the current task.""" + if self.__task is 
not None: + state, self.__task = self.__task, None + state.cancel() + state.partial_result_ready.disconnect(self.__set_partial_results) + state.status_changed.disconnect(self.setStatusMessage) + state.progress_changed.disconnect(self.progressBarSet) + state.watcher.done.disconnect(self.__on_done) + + if state.future is not None: + if wait: + futures.wait([state.future]) + state.deleteLater() + else: + w = FutureWatcher(state.future, parent=state) + w.done.connect(state.deleteLater) + else: + state.deleteLater() def check_data(self): def error(err): @@ -185,59 +506,62 @@ def error(err): self.data = None super().check_data() - if self.data is not None: - if len(self.data) < 2: - error(self.Error.not_enough_rows) - elif not self.data.domain.attributes: - error(self.Error.no_attributes) - elif not self.data.is_sparse(): - if np.all(~np.isfinite(self.data.X)): - error(self.Error.no_valid_data) - else: - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", "Degrees of freedom .*", RuntimeWarning) - if np.nan_to_num(np.nanstd(self.data.X, axis=0)).sum() \ - == 0: - error(self.Error.constant_data) + if self.data is None: + return + + if len(self.data) < 2: + error(self.Error.not_enough_rows) + + elif not self.data.domain.attributes: + error(self.Error.no_attributes) + + elif not self.data.is_sparse(): + if np.all(~np.isfinite(self.data.X)): + error(self.Error.no_valid_data) + else: + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", "Degrees of freedom .*", RuntimeWarning) + if np.nan_to_num(np.nanstd(self.data.X, axis=0)).sum() \ + == 0: + error(self.Error.constant_data) def get_embedding(self): - if self.data is None: + if self.tsne_embedding is None: self.valid_data = None return None - elif self.projection is None: - embedding = np.random.normal(size=(len(self.data), 2)) - else: - embedding = self.projection.embedding.X + + embedding = self.tsne_embedding.embedding.X self.valid_data = np.ones(len(embedding), dtype=bool) return 
embedding def _toggle_run(self): - if self.__state == OWtSNE.Running: - self.stop() + # If no data, there's nothing to do + if self.data is None: + return + + # Pause task + if self.__task is not None: + self.cancel() + self.run_button.setText("Resume") self.commit() - elif self.__state == OWtSNE.Paused: - self.resume() + # Resume task else: self.start() - - def start(self): - if not self.data or self.__state == OWtSNE.Running: - self.graph.update_coordinates() - elif self.__state in (OWtSNE.Finished, OWtSNE.Waiting): - self.__start() - - def stop(self): - self.__state = OWtSNE.Paused - self.__set_update_loop(None) - - def resume(self): - self.__set_update_loop(self.tsne_iterator) + self.run_button.setText("Stop") def set_data(self, data: Table): super().set_data(data) if data is not None: + n_attrs = len(data.domain.attributes) + self.controls.pca_components.setMaximum( + min(_MAX_PCA_COMPONENTS, n_attrs) + ) + self.controls.pca_components.setValue( + min(_DEFAULT_PCA_COMPONENTS, n_attrs) + ) + # PCA doesn't support normalization on sparse data, as this would # require centering and normalizing the matrix self.normalize_cbx.setDisabled(data.is_sparse()) @@ -249,164 +573,192 @@ def set_data(self, data: Table): else: self.normalize_cbx.setToolTip("") - def pca_preprocessing(self): - """Perform PCA preprocessing before passing off the data to t-SNE.""" - if self.pca_data is not None: - return - - projector = PCA(n_components=self.pca_components, random_state=0) - # If the normalization box is ticked, we'll add the `Normalize` - # preprocessor to PCA - if self.normalize: - projector.preprocessors += (preprocess.Normalize(),) - - model = projector(self.data) - self.pca_data = model(self.data) - - def __start(self): - self.pca_preprocessing() + self.start() - self.needs_to_draw = True + def start(self): + self.__invalidated = False + self._set_modified(False) - # We call PCA through fastTSNE because it involves scaling. 
Instead of - # worrying about this ourselves, we'll let the library worry for us. - initialization = TSNE.default_initialization( - self.pca_data.X, n_components=2, random_state=0) + # When the data is invalid, it is set to `None` and an error is set, + # therefore it would be erroneous to clear the error here + if self.data is not None: + self.Error.clear() + self.run_button.setText("Stop") - # Compute perplexity settings for multiscale - n_samples = self.pca_data.X.shape[0] - if self.multiscale: - perplexity = min((n_samples - 1) / 3, 50), min((n_samples - 1) / 3, 500) - else: - perplexity = self.perplexity + # Cancel current running task + self.__cancel_task(wait=False) - # Determine whether to use settings for large data sets - if n_samples > 10_000: - neighbor_method, gradient_method = "approx", "fft" - else: - neighbor_method, gradient_method = "exact", "bh" - - # Set number of iterations to 0 - these will be run subsequently - self.projection = TSNE( - n_components=2, perplexity=perplexity, multiscale=self.multiscale, - early_exaggeration_iter=0, n_iter=0, initialization=initialization, - exaggeration=self.exaggeration, neighbors=neighbor_method, - negative_gradient_method=gradient_method, random_state=0, - theta=0.8, - )(self.pca_data) - - self.tsne_runner = TSNERunner( - self.projection, step_size=20, exaggeration=self.exaggeration - ) - self.tsne_iterator = self.tsne_runner.run_optimization() - self.__set_update_loop(self.tsne_iterator) - self.progressBarInit(processEvents=None) - - def __set_update_loop(self, loop): - if self.__update_loop is not None: - if self.__state in (OWtSNE.Finished, OWtSNE.Waiting): - self.__update_loop.close() - self.__update_loop = None - self.progressBarFinished(processEvents=None) - - self.__update_loop = loop - - if loop is not None: - self.setBlocking(True) - self.progressBarInit(processEvents=None) - self.setStatusMessage("Running") - self.runbutton.setText("Stop") - self.__state = OWtSNE.Running - self.__timer.start() - 
self.__timer_draw.start() - else: - self.setBlocking(False) - self.setStatusMessage("") - if self.__state in (OWtSNE.Finished, OWtSNE.Waiting): - self.runbutton.setText("Start") - if self.__state == OWtSNE.Paused: - self.runbutton.setText("Resume") - self.__timer.stop() - self.__timer_draw.stop() - - def __next_step(self): - if self.__update_loop is None: + if self.data is None: + self.__set_state_ready() return - assert not self.__in_next_step - self.__in_next_step = True - - loop = self.__update_loop - self.Error.out_of_memory.clear() - self.Error.optimization_error.clear() - try: - projection, progress = next(self.__update_loop) - assert self.__update_loop is loop - except StopIteration: - self.__state = OWtSNE.Finished - self.__set_update_loop(None) - self.unconditional_commit() - except MemoryError: - self.Error.out_of_memory() - self.__state = OWtSNE.Finished - self.__set_update_loop(None) - except Exception as exc: - self.Error.optimization_error(str(exc)) - self.__state = OWtSNE.Finished - self.__set_update_loop(None) - else: - self.progressBarSet(100.0 * progress, processEvents=None) - self.projection = projection - if progress == 1 or self.needs_to_draw: + # Prepare the task spec with the partial results + task = Task( + data=self.data, + normalize=self.normalize, + pca_components=self.pca_components, + pca_projection=self.pca_projection, + perplexity=self.perplexity, + multiscale=self.multiscale, + exaggeration=self.exaggeration, + initialization=self.initialization, + affinities=self.affinities, + tsne_embedding=self.tsne_embedding, + iterations_done=self.iterations_done, + ) + state = TaskState(self) + + task = partial(TSNERunner.run, task, state) + self.__set_state_busy() + self.__start_task(task, state) + + def __ensure_task_same_for_pca(self, task: Task): + assert self.data is not None + assert task.data is self.data + assert task.normalize == self.normalize + assert task.pca_components == self.pca_components + assert isinstance(task.pca_projection, 
Table) and \ + len(task.pca_projection) == len(self.data) + + def __ensure_task_same_for_initialization(self, task: Task): + assert isinstance(task.initialization, np.ndarray) and \ + len(task.initialization) == len(self.data) + + def __ensure_task_same_for_affinities(self, task: Task): + assert task.perplexity == self.perplexity + assert task.multiscale == self.multiscale + + def __ensure_task_same_for_embedding(self, task: Task): + assert task.exaggeration == self.exaggeration + assert isinstance(task.tsne_embedding, manifold.TSNEModel) and \ + len(task.tsne_embedding.embedding) == len(self.data) + + @Slot(object) + def __set_partial_results(self, value): + # type: (Tuple[str, Task]) -> None + which, task = value + + if which == "pca_projection": + self.__ensure_task_same_for_pca(task) + self.pca_projection = task.pca_projection + elif which == "initialization": + self.__ensure_task_same_for_pca(task) + self.__ensure_task_same_for_initialization(task) + self.initialization = task.initialization + elif which == "affinities": + self.__ensure_task_same_for_pca(task) + self.__ensure_task_same_for_affinities(task) + self.affinities = task.affinities + elif which == "tsne_embedding": + self.__ensure_task_same_for_pca(task) + self.__ensure_task_same_for_initialization(task) + self.__ensure_task_same_for_affinities(task) + self.__ensure_task_same_for_embedding(task) + + prev_embedding, self.tsne_embedding = self.tsne_embedding, task.tsne_embedding + self.iterations_done = task.iterations_done + # If this is the first partial result we've gotten, we've got to + # setup the plot + if prev_embedding is None: + self.setup_plot() + # Otherwise, just update the point positions + else: self.graph.update_coordinates() self.graph.update_density() - self.needs_to_draw = False - # schedule next update - self.__timer.start() - - self.__in_next_step = False - - def setup_plot(self): - super().setup_plot() - self.start() + else: + raise RuntimeError( + "Unrecognized partial result 
called with `%s`" % which + ) - def commit(self): - super().commit() - self.send_preprocessor() + @Slot(object) + def __on_done(self, future): + assert future.done() + assert self.__task is not None + assert self.__task.future is future + assert self.__task.watcher.future() is future + self.__task, task = None, self.__task + task.deleteLater() + + result = future.result() + self.__set_results(result) + self.__set_state_ready() + + def __set_results(self, task): + # type: (Task) -> None + self.run_button.setText("Start") + # NOTE: All of these have already been set by __set_partial_results, + # we double check that they are aliases + if task.pca_projection is not None: + self.__ensure_task_same_for_pca(task) + assert task.pca_projection is self.pca_projection + if task.initialization is not None: + self.__ensure_task_same_for_initialization(task) + assert task.initialization is self.initialization + if task.affinities is not None: + assert task.affinities is self.affinities + if task.tsne_embedding is not None: + self.__ensure_task_same_for_embedding(task) + assert task.tsne_embedding is self.tsne_embedding + + self.commit() def _get_projection_data(self): if self.data is None: return None + data = self.data.transform( - Domain(self.data.domain.attributes, - self.data.domain.class_vars, - self.data.domain.metas + self._get_projection_variables())) + Domain( + self.data.domain.attributes, + self.data.domain.class_vars, + self.data.domain.metas + self._get_projection_variables() + ) + ) data.metas[:, -2:] = self.get_embedding() - if self.projection is not None: + if self.tsne_embedding is not None: data.domain = Domain( self.data.domain.attributes, self.data.domain.class_vars, - self.data.domain.metas + self.projection.domain.attributes) + self.data.domain.metas + self.tsne_embedding.domain.attributes, + ) return data + def commit(self): + super().commit() + self.send_preprocessor() + def send_preprocessor(self): - prep = None - if self.data is not None and 
self.projection is not None: - prep = preprocess.ApplyDomain(self.projection.domain, self.projection.name) - self.Outputs.preprocessor.send(prep) + preprocessor = None + if self.data is not None and self.tsne_embedding is not None: + preprocessor = preprocess.ApplyDomain( + self.tsne_embedding.domain, self.tsne_embedding.name + ) + self.Outputs.preprocessor.send(preprocessor) def clear(self): + """Clear widget state. Note that this doesn't clear the data.""" super().clear() - self.__state = OWtSNE.Waiting - self.__set_update_loop(None) - self.pca_data = None - self.projection = None + self.run_button.setText("Start") + self.__cancel_task(wait=False) + self.pca_projection = None + self.initialization = None + self.affinities = None + self.tsne_embedding = None + self.iterations_done = 0 + + def onDeleteWidget(self): + self.__cancel_task(wait=True) + self.__executor.shutdown(True) + self.clear() + self.data = None + super().onDeleteWidget() @classmethod def migrate_settings(cls, settings, version): if version < 3: if "selection_indices" in settings: settings["selection"] = settings["selection_indices"] + if version < 4: + settings.pop("max_iter", None) @classmethod def migrate_context(cls, context, version): @@ -419,7 +771,9 @@ def migrate_context(cls, context, version): if __name__ == "__main__": - data = Table("iris") + import sys + data = Table(sys.argv[1] if len(sys.argv) > 1 else "iris") WidgetPreview(OWtSNE).run( set_data=data, - set_subset_data=data[np.random.choice(len(data), 10)]) + set_subset_data=data[np.random.choice(len(data), 10)], + ) diff --git a/Orange/widgets/unsupervised/tests/test_owtsne.py b/Orange/widgets/unsupervised/tests/test_owtsne.py index bf270383777..8f8d3b8fac0 100644 --- a/Orange/widgets/unsupervised/tests/test_owtsne.py +++ b/Orange/widgets/unsupervised/tests/test_owtsne.py @@ -1,19 +1,31 @@ import unittest -from unittest.mock import patch +from unittest.mock import patch, Mock + import numpy as np from Orange.data import 
DiscreteVariable, ContinuousVariable, Domain, Table from Orange.preprocess import Preprocess, Normalize -from Orange.projection.manifold import TSNE +from Orange.projection import manifold from Orange.widgets.tests.base import ( WidgetTest, WidgetOutputsTestMixin, ProjectionWidgetTestMixin ) -from Orange.widgets.unsupervised import owtsne from Orange.widgets.unsupervised.owtsne import OWtSNE -class TestOWtSNE(WidgetTest, ProjectionWidgetTestMixin, - WidgetOutputsTestMixin): +class DummyTSNE(manifold.TSNE): + def fit(self, X, Y=None): + return np.ones((len(X), 2), float) + + +class DummyTSNEModel(manifold.TSNEModel): + def transform(self, X, **kwargs): + return np.ones((len(X), 2), float) + + def optimize(self, n_iter, **kwargs): + return self + + +class TestOWtSNE(WidgetTest, ProjectionWidgetTestMixin, WidgetOutputsTestMixin): @classmethod def setUpClass(cls): super().setUpClass() @@ -24,59 +36,49 @@ def setUpClass(cls): cls.signal_data = cls.data def setUp(self): - def fit(*args, **_): - return np.ones((len(args[1]), 2), float) - - def transform(*args, **_): - return np.ones((len(args[1]), 2), float) + # For almost all the tests, we won't need to verify t-SNE validity and + # the tests will run much faster if we dummy them out + self.tsne = patch("Orange.projection.manifold.TSNE", new=DummyTSNE) + self.tsne_model = patch("Orange.projection.manifold.TSNEModel", new=DummyTSNEModel) + self.tsne.start() + self.tsne_model.start() - def optimize(*_, **__): - return TSNE()() + self.widget = self.create_widget(OWtSNE, stored_settings={"multiscale": False}) - self._fit = owtsne.TSNE.fit - self._transform = owtsne.TSNEModel.transform - self._optimize = owtsne.TSNEModel.optimize - owtsne.TSNE.fit = fit - owtsne.TSNEModel.transform = transform - owtsne.TSNEModel.optimize = optimize - - self.widget = self.create_widget(OWtSNE, - stored_settings={"multiscale": False}) - - self.class_var = DiscreteVariable('Stage name', values=['STG1', 'STG2']) - self.attributes = 
[ContinuousVariable('GeneName' + str(i)) for i in range(5)] + self.class_var = DiscreteVariable("Stage name", values=["STG1", "STG2"]) + self.attributes = [ContinuousVariable("GeneName" + str(i)) for i in range(5)] self.domain = Domain(self.attributes, class_vars=self.class_var) self.empty_domain = Domain([], class_vars=self.class_var) - def tearDown(self): - self.restore_mocked_functions() - def restore_mocked_functions(self): - owtsne.TSNE.fit = self._fit - owtsne.TSNEModel.transform = self._transform - owtsne.TSNEModel.optimize = self._optimize + self.tsne.stop() + self.tsne_model.stop() def test_wrong_input(self): # no data self.data = None self.send_signal(self.widget.Inputs.data, self.data) + self.wait_until_stop_blocking() self.assertIsNone(self.widget.data) # <2 rows self.data = Table(self.domain, [[1, 2, 3, 4, 5, 'STG1']]) self.send_signal(self.widget.Inputs.data, self.data) + self.wait_until_stop_blocking() self.assertIsNone(self.widget.data) self.assertTrue(self.widget.Error.not_enough_rows.is_shown()) # no attributes self.data = Table(self.empty_domain, [['STG1']] * 2) self.send_signal(self.widget.Inputs.data, self.data) + self.wait_until_stop_blocking() self.assertIsNone(self.widget.data) self.assertTrue(self.widget.Error.no_attributes.is_shown()) # constant data self.data = Table(self.domain, [[1, 2, 3, 4, 5, 'STG1']] * 2) self.send_signal(self.widget.Inputs.data, self.data) + self.wait_until_stop_blocking() self.assertIsNone(self.widget.data) self.assertTrue(self.widget.Error.constant_data.is_shown()) @@ -84,6 +86,7 @@ def test_wrong_input(self): self.data = Table(self.domain, [[1, 2, 3, 4, 5, 'STG1'], [5, 4, 3, 2, 1, 'STG1']]) self.send_signal(self.widget.Inputs.data, self.data) + self.wait_until_stop_blocking() self.assertIsNotNone(self.widget.data) self.assertFalse(self.widget.Error.not_enough_rows.is_shown()) self.assertFalse(self.widget.Error.no_attributes.is_shown()) @@ -96,10 +99,12 @@ def test_input(self): [5, 5, 5, 5, 5, 'STG2']]) 
self.send_signal(self.widget.Inputs.data, self.data) + self.wait_until_stop_blocking() def test_attr_models(self): """Check possible values for 'Color', 'Shape', 'Size' and 'Label'""" self.send_signal(self.widget.Inputs.data, self.data) + self.wait_until_stop_blocking() controls = self.widget.controls for var in self.data.domain.class_vars + self.data.domain.metas: self.assertIn(var, controls.attr_color.model()) @@ -133,7 +138,7 @@ def test_output_preprocessor(self): self.assertEqual([a.name for a in transformed_data.domain.attributes], [m.name for m in output_data.domain.metas[:2]]) - def test_multiscale_changed(self): + def test_multiscale_changed_updates_ui(self): self.assertFalse(self.widget.controls.multiscale.isChecked()) self.assertTrue(self.widget.perplexity_spin.isEnabled()) self.widget.controls.multiscale.setChecked(True) @@ -150,6 +155,7 @@ def test_normalize_data(self): with patch("Orange.preprocess.preprocess.Normalize", wraps=Normalize) as normalize: self.send_signal(self.widget.Inputs.data, self.data) self.assertTrue(self.widget.controls.normalize.isEnabled()) + self.wait_until_stop_blocking() normalize.assert_called_once() # Disable checkbox @@ -158,6 +164,7 @@ def test_normalize_data(self): with patch("Orange.preprocess.preprocess.Normalize", wraps=Normalize) as normalize: self.send_signal(self.widget.Inputs.data, self.data) self.assertTrue(self.widget.controls.normalize.isEnabled()) + self.wait_until_stop_blocking() normalize.assert_not_called() # Normalization shouldn't work on sparse data @@ -168,6 +175,7 @@ def test_normalize_data(self): with patch("Orange.preprocess.preprocess.Normalize", wraps=Normalize) as normalize: self.send_signal(self.widget.Inputs.data, sparse_data) self.assertFalse(self.widget.controls.normalize.isEnabled()) + self.wait_until_stop_blocking() normalize.assert_not_called() @patch("Orange.projection.manifold.TSNEModel.optimize") @@ -182,7 +190,7 @@ def _check_exaggeration(call, exaggeration): # Set value to 1 
self.widget.controls.exaggeration.setValue(1) self.send_signal(self.widget.Inputs.data, self.data) - self.commit_and_wait() + self.wait_until_stop_blocking() _check_exaggeration(optimize, 1) # Reset and clear state @@ -192,9 +200,34 @@ def _check_exaggeration(call, exaggeration): # Change to 3 self.widget.controls.exaggeration.setValue(3) self.send_signal(self.widget.Inputs.data, self.data) - self.commit_and_wait() + self.wait_until_stop_blocking() _check_exaggeration(optimize, 3) + def test_plot_once(self): + """Test if data is plotted only once but committed on every input change""" + self.widget.setup_plot = Mock() + self.widget.commit = Mock() + + self.send_signal(self.widget.Inputs.data, self.data) + # TODO: The base widget immediately calls `setup_plot` and `commit` + # even though there's nothing to show yet. Unfortunately, fixing this + # would require changing `OWDataProjectionWidget` in some strange way, + # so as a temporary fix, we reset the mocks, so they reflect the calls + # when the result was available. + self.widget.setup_plot.reset_mock() + self.widget.commit.reset_mock() + self.wait_until_stop_blocking() + + self.widget.setup_plot.assert_called_once() + self.widget.commit.assert_called_once() + + self.widget.commit.reset_mock() + self.send_signal(self.widget.Inputs.data_subset, self.data[::10]) + self.wait_until_stop_blocking() + + self.widget.setup_plot.assert_called_once() + self.widget.commit.assert_called_once() + if __name__ == '__main__': unittest.main() diff --git a/Orange/widgets/visualize/utils/widget.py b/Orange/widgets/visualize/utils/widget.py index 61cfb0cf5aa..c626b7e9c1d 100644 --- a/Orange/widgets/visualize/utils/widget.py +++ b/Orange/widgets/visualize/utils/widget.py @@ -476,6 +476,7 @@ def handleNewSignals(self): self.setup_plot() else: self.graph.update_point_props() + self.commit() def get_subset_mask(self):