diff --git a/tests/conftest.py b/tests/conftest.py index 65d68268..86135ecd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -336,6 +336,7 @@ def schema_simp(connection_test, prefix): schema(schema_simple.B) schema(schema_simple.L) schema(schema_simple.D) + schema(schema_simple.N) schema(schema_simple.E) schema(schema_simple.F) schema(schema_simple.F) diff --git a/tests/schema_simple.py b/tests/schema_simple.py index 9e3113c9..c563abea 100644 --- a/tests/schema_simple.py +++ b/tests/schema_simple.py @@ -92,7 +92,33 @@ def _make_tuples(self, key): # make reference to a random tuple from L random.seed(str(key)) lookup = list(L().fetch("KEY")) - self.insert(dict(key, id_d=i, **random.choice(lookup)) for i in range(4)) + self.insert(dict(key, id_d=i, **random.choice(lookup)) + for i in range(4)) + + +class N(dj.Computed): + definition = """ + # test for three part make function + -> A + id_d :int + --- + -> L + """ + + def make_fetch(self, key): + # make reference to a random tuple from L + lookup = list(L().fetch("KEY")) + inputs = key, lookup + return inputs + + def make_compute(self, inputs): + key, lookup = inputs + random.seed(str(key)) + values = [dict(key, id_d=i, **random.choice(lookup)) for i in range(4)] + return values + + def make_insert(self, values): + self.insert(values) class E(dj.Computed): diff --git a/tests/test_autopopulate.py b/tests/test_autopopulate.py index d1f0726e..d0e9dad7 100644 --- a/tests/test_autopopulate.py +++ b/tests/test_autopopulate.py @@ -1,16 +1,15 @@ import pytest from datajoint import DataJointError import datajoint as dj -import pymysql -from . import schema -def test_populate(trial, subject, experiment, ephys, channel): +def test_populate(schema_any, trial, subject, experiment, ephys, channel): # test simple populate assert subject, "root tables are empty" assert not experiment, "table already filled?" experiment.populate() - assert len(experiment) == len(subject) * experiment.fake_experiments_per_subject + assert len(experiment) == len(subject) * \ + experiment.fake_experiments_per_subject # test restricted populate assert not trial, "table already filled?" @@ -31,7 +30,7 @@ def test_populate(trial, subject, experiment, ephys, channel): assert channel -def test_populate_with_success_count(subject, experiment, trial): +def test_populate_with_success_count(schema_any, subject, experiment, trial): # test simple populate assert subject, "root tables are empty" assert not experiment, "table already filled?" @@ -62,10 +61,11 @@ def test_populate_exclude_error_and_ignore_jobs(schema_any, subject, experiment) schema_any.jobs.error(experiment.table_name, key, "") experiment.populate(reserve_jobs=True) - assert len(experiment.key_source & experiment) == len(experiment.key_source) - 2 + assert len(experiment.key_source & experiment) == len( + experiment.key_source) - 2 -def test_allow_direct_insert(subject, experiment): +def test_allow_direct_insert(schema_any, subject, experiment): assert subject, "root tables are empty" key = subject.fetch("KEY", limit=1)[0] key["experiment_id"] = 1000 @@ -74,14 +74,15 @@ def test_allow_direct_insert(subject, experiment): @pytest.mark.parametrize("processes", [None, 2]) -def test_multi_processing(subject, experiment, processes): +def test_multi_processing(schema_any, subject, experiment, processes): assert subject, "root tables are empty" assert not experiment, "table already filled?" experiment.populate(processes=None) - assert len(experiment) == len(subject) * experiment.fake_experiments_per_subject + assert len(experiment) == len(subject) * \ + experiment.fake_experiments_per_subject -def test_allow_insert(subject, experiment): +def test_allow_insert(schema_any, subject, experiment): assert subject, "root tables are empty" key = subject.fetch("KEY")[0] key["experiment_id"] = 1001