Nesting and recursion errors #33

Open
supermorphDotTech opened this issue Jan 2, 2025 · 23 comments


@supermorphDotTech

Hi,

I am trying to write my own framework with UltraDict as a backbone. However, I cannot get it running with multiple processes. My own dictionary-like class subclasses UltraDict and runs fine in a single process. I made sure that, when a new process is spawned, the dictionary is rebuilt accordingly, manually setting the attributes that are not part of UltraDict.

I tried different approaches, one of which was to set up an UltraDict (not subclassed) and nest several dictionaries in it. However, the spawned processes do not ever get access to the shared memory.

app_dict = UltraDict(name="app_dict", create=True)
app_dict["some_data"] = {}

I tried setting up all nested dictionaries as UltraDicts, but that did not work either. My framework is quite complex, and I am not sure how to present the issue to you other than by pointing you to my repo:
morPy v1.0.0c

Notes on the framework:

  • Still in early development
  • Initialization in lib\mpy_init
  • Dictionaries in lib\mpy_dict
  • Multiprocessing in lib\mpy_mp

Thank you for taking care of this; I am looking forward to hearing from you. I am also open to meeting online to discuss this further.

Regards
Bastian Neuwirth
supermorph.tech

@supermorphDotTech
Author

supermorphDotTech commented Jan 2, 2025

I tried to boil my code down to a bare minimum. The code presented here never actually prints the expected result at the end, which suggests to me that inter-process data sharing does not work. It is, however, representative of how my framework uses UltraDict.

from multiprocessing import Process, current_process, active_children
from functools import partial
from math import sqrt

def u_dict_build(create: bool=False):
    """ Create a nested UltraDict """
    from UltraDict import UltraDict

    app_dict = UltraDict(
        name="app_dict",
        create=create,
        shared_lock=True
    )

    app_dict["nested_udict"] = UltraDict(
        name="app_dict[conf]",
        create=create,
        shared_lock=True
    )

    return app_dict

def parallel_task(app_dict):
    """ Task to be run in parallel with writes to app_dict """
    try:
        if not app_dict:
            raise RuntimeError("No connection to app_dict.")

        # get the current process instance
        process = current_process()

        i = 0
        app_dict["test_count"] = i
        app_dict["nested_udict"]["nested_dict"] = {}
        total = 10
        tmp_val = 0

        # Hold on until all processes are ready
        while not app_dict["run"]:
            pass

        while i < total:
            i += 1
            # Read and write app_dict and nested dictionaries
            app_dict["test_count"] += 1
            app_dict["nested_udict"]["nested_dict"].update({f'{i}' : process.name})
            if i != app_dict["test_count"]:
                raise RuntimeError("Counter mismatch!")
            while tmp_val < total:
                tmp_val = (sqrt(sqrt(i)*i) / i) + tmp_val**2

    except Exception as e:
        print(f'ERROR: {e}')

def spawn_wrapper(task):
    """ Wrapper for the task tuple / wrapped by the spawned process"""
    app_dict = u_dict_build() # Rebuild UltraDict with create=False

    task[1] = app_dict # Reassign app_dict to the process
    run = partial(*task,) # Wrap the task
    run()

if __name__ == '__main__':
    # Build the app_dict
    app_dict = u_dict_build(create=True)

    # If True, workers will start executing
    app_dict["run"] = False

    task = [parallel_task, app_dict]

    i = 0
    j = 10 # Processes to be started
    while i < j:
        p = Process(target=partial(spawn_wrapper, task))
        p.start()
        i += 1

    # Release workers for calculations
    app_dict["run"] = True

    print(f'\nJoining processes.\n')
    p.join()

    # Print the calculations in relation to the executing process.
    for step, proc in app_dict["nested_udict"]["nested_dict"]:
        print(f'{step} :: {proc}')

@supermorphDotTech
Author

Never mind, fixed it in my example. Here is the revised code.

import sys

from multiprocessing import Process, current_process, active_children
from functools import partial
from math import sqrt

def u_dict_build(create: bool=False):
    """ Create a nested UltraDict """
    from UltraDict import UltraDict

    app_dict = UltraDict(
        name="app_dict",
        create=create,
        shared_lock=True
    )

    app_dict["nested_udict"] = UltraDict(
        name="app_dict[conf]",
        create=create,
        shared_lock=True
    )

    return app_dict

def parallel_task(app_dict):
    """ Task to be run in parallel with writes to app_dict """
    try:
        if not app_dict:
            raise RuntimeError("No connection to app_dict.")

        # get the current process instance
        process = current_process()

        i = 0
        total = 10
        tmp_val = 0

        # Hold on until all processes are ready
        while not app_dict["run"]:
            pass

        while i < total:
            i += 1
            # Read and write app_dict and nested dictionaries
            app_dict["test_count"] += 1
            app_dict["nested_udict"].update({f'{app_dict["test_count"]}' : process.name})
            while tmp_val < total:
                tmp_val = (sqrt(sqrt(i)*i) / i) + tmp_val**2

    except Exception as e:
        print(f'Line: {sys.exc_info()[-1].tb_lineno}\n{e}\n')

def spawn_wrapper(task):
    """ Wrapper for the task tuple / wrapped by the spawned process"""
    app_dict = u_dict_build() # Rebuild UltraDict with create=False

    task[1] = app_dict # Reassign app_dict to the process
    run = partial(*task,) # Wrap the task
    run()

if __name__ == '__main__':
    # Build the app_dict
    app_dict = u_dict_build(create=True)

    # If True, workers will start executing
    app_dict["run"] = False

    # Some value in shared memory
    app_dict["test_count"] = 0

    task = [parallel_task, app_dict]

    i = 0
    j = 10 # Processes to be started
    processes = []
    while i < j:
        p = Process(target=partial(spawn_wrapper, task))
        p.start()
        processes.append(p)
        i += 1

    # Release workers for calculations
    app_dict["run"] = True

    print(f'\nJoining processes.\n')
    # Now join everyone
    for p in processes:
        p.join()

    # Print the calculations in relation to the executing process.
    for step, proc in app_dict["nested_udict"].items():
        print(f'{step} :: {proc}')

@ronny-rentner
Owner

Hi Bastian, happy to help. I'll check out your code later and get back to you.

Meanwhile, feel free to try and run the unit tests. There are also a couple of examples in the examples folder which should be working fine on Windows. Otherwise there's a different problem.

Check the GitHub actions which run under Windows: https://github.com/ronny-rentner/UltraDict/actions/runs/12079887451/job/33686427007

Also check out the nested example here on how to handle nested dicts: https://github.com/ronny-rentner/UltraDict/blob/main/examples/nested.py or https://github.com/ronny-rentner/UltraDict/blob/main/examples/mccoydj1_nested.py

@supermorphDotTech
Author

I created a dump of the errors raised during __init__() of my subclass. It reveals that the first spawned process already blows up during super().__init__() due to "maximum recursion exceeded".
Process-1.log

The exception branch of cl_mpy_dict_root.__init__() now looks like:

except Exception as e:
    msg = (f'CRITICAL {self._name}.__init__(): Failed to initialize UltraDict.\n'
        f'Line: {sys.exc_info()[-1].tb_lineno}\n{e}\n')
    ###################################
    # TODO remove textfile dump
    from multiprocessing import current_process
    import pathlib
    process = current_process()
    filename = process.name
    filepath = pathlib.Path(f'Z://{filename}.log')

    with open(filepath, 'a') as ap:
        ap.write(f'{msg}\n{6*"#"} END OF LOG {6*"#"}\n\n')
    ###################################

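    # Re-raise with the original exception attached; a sys.exit() placed
    # after this line would never be reached.
    raise RuntimeError(msg) from e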

@supermorphDotTech
Author

Also, thank you for getting back to me so soon. I will check out the examples.

Unfortunately, I am not yet good at working with GitHub, so I am not sure what to do with the GitHub actions which run under Windows: https://github.com/ronny-rentner/UltraDict/actions/runs/12079887451/job/33686427007

@ronny-rentner
Owner

ronny-rentner commented Jan 3, 2025

I'm not fully sure what your example is supposed to do. For nested dicts, UltraDict already has the recurse=True parameter that lets you have recursive UltraDicts transparently, so you don't need to handle this yourself.

When you have multiple processes writing to the UltraDict in parallel, you must use a lock to ensure they're not doing it in parallel.

This corrected code works for me on Linux:

import sys

from multiprocessing import Process, current_process, active_children
from functools import partial
from math import sqrt

def u_dict_build(create: bool=False):
    """ Create a nested UltraDict """
    from UltraDict import UltraDict

    app_dict = UltraDict(
        name="app_dict",
        shared_lock=True,
        recurse=True
    )

    app_dict["nested_udict"] = {}

    return app_dict

def parallel_task(app_dict):
    """ Task to be run in parallel with writes to app_dict """
    try:
        if not app_dict:
            raise RuntimeError("No connection to app_dict.")

        # get the current process instance
        process = current_process()

        i = 0
        total = 10
        tmp_val = 0

        # Hold on until all processes are ready
        while not app_dict["run"]:
            pass

        while i < total:
            i += 1
            # Read and write app_dict and nested dictionaries
            with app_dict.lock:
                app_dict["test_count"] += 1
                app_dict["nested_udict"][app_dict["test_count"]] = process.name
            while tmp_val < total:
                tmp_val = (sqrt(sqrt(i)*i) / i) + tmp_val**2

    except Exception as e:
        print(f'Line: {sys.exc_info()[-1].tb_lineno}\n{e}\n')

def spawn_wrapper(task):
    """ Wrapper for the task tuple / wrapped by the spawned process"""
    app_dict = u_dict_build() # Rebuild UltraDict with create=False

    task[1] = app_dict # Reassign app_dict to the process
    run = partial(*task,) # Wrap the task
    run()

if __name__ == '__main__':
    # Build the app_dict
    app_dict = u_dict_build(create=True)

    # If True, workers will start executing
    app_dict["run"] = False

    # Some value in shared memory
    app_dict["test_count"] = 0

    task = [parallel_task, app_dict]

    i = 0
    j = 10 # Processes to be started
    processes = []
    while i < j:
        p = Process(target=partial(spawn_wrapper, task))
        p.start()
        processes.append(p)
        i += 1

    # Release workers for calculations
    app_dict["run"] = True

    print(f'\nJoining processes.\n')
    # Now join everyone
    for p in processes:
        p.join()

    # Print the calculations in relation to the executing process.
    for step, proc in app_dict["nested_udict"].items():
        print(f'{step} :: {proc}')

Output:

python ./supermorph.py 

Joining processes.

69 :: Process-9
70 :: Process-9
71 :: Process-9
72 :: Process-9
73 :: Process-9
74 :: Process-2
75 :: Process-2
76 :: Process-2
77 :: Process-2
78 :: Process-2
79 :: Process-2
80 :: Process-2
81 :: Process-2
82 :: Process-3
83 :: Process-3
84 :: Process-10
85 :: Process-10
86 :: Process-10
87 :: Process-10
88 :: Process-2
89 :: Process-2
90 :: Process-9
91 :: Process-9
92 :: Process-9
93 :: Process-9
94 :: Process-10
95 :: Process-10
96 :: Process-10
97 :: Process-10
98 :: Process-10
99 :: Process-10
100 :: Process-9


@supermorphDotTech
Author

supermorphDotTech commented Jan 3, 2025

Well, as I wrote, my example works for me now, too. The issue arises in my framework morPy v1.0.0c. The example is supposed to replicate how UltraDict is used by the framework. And there, I receive recursion errors.

Regarding recurse, I played around with a couple of arguments for UltraDict.__init__() in order to get it to run. So far to no avail.
Regarding locks, the subclass cl_mpy_dict_root of morPy uses locks by enhancing UltraDict dict-like methods.

I have tried different approaches to this:

  1. Create only the root dict as an UltraDict and nest regular dicts
    > Played around with recurse_register to make sure, the right UltraDict is referenced
  2. Create all nested dicts as UltraDict, too (like in my example code)

For 1) I get a "maximum recursion" error.
For 2) the spawned processes just don't get any data.

What I will do next:
Extend my example step by step to resemble morPy until the error can be reproduced.

@supermorphDotTech
Author

> When you have multiple processes writing to the UltraDict in parallel, you must use a lock to ensure they're not doing it in parallel.

Shouldn't UltraDict handle any locks? In the source code I find:

def __setitem__(self, key, item):
    #log.debug("__setitem__ {}, {}", key, item)
    with self.lock:
        self.apply_update()

@ronny-rentner
Owner

It seems the recurse mode indeed has an issue. The problem is that each sub-process creates its own nested_udict instance and it seems inside UltraDict, this is not properly protected by locks.

When I modify the code like this, it at least counts correctly to 100. I've left in some commented debug statements that have helped me understand the problem:

import sys

from multiprocessing import Process, current_process, active_children
from functools import partial
from math import sqrt

def u_dict_build(create: bool=False):
    """ Create a nested UltraDict """
    from UltraDict import UltraDict

    app_dict = UltraDict(
        name="app_dict",
        shared_lock=True,
        recurse=True
    )

    with app_dict.lock:
        if "nested_udict" not in app_dict:
            app_dict["nested_udict"] = {}

    return app_dict

def parallel_task(app_dict):
    """ Task to be run in parallel with writes to app_dict """
    try:
        if not app_dict:
            raise RuntimeError("No connection to app_dict.")

        # get the current process instance
        process = current_process()

        i = 0
        total = 10
        tmp_val = 0

        # Hold on until all processes are ready
        while not app_dict["run"]:
            pass

        while i < total:
            i += 1
            # Read and write app_dict and nested dictionaries
            with app_dict.lock:
                #print(f'BEFOR: {process.name} => {app_dict["nested_udict"].name} => {app_dict}')
                #app_dict.print_status()
                #app_dict['nested_udict'].print_status()
                app_dict["test_count"] += 1
                with app_dict["nested_udict"].lock:
                    app_dict["nested_udict"][app_dict["test_count"]] = process.name
                #print(f'AFTER: {process.name} => {app_dict["nested_udict"].name} => {app_dict}')
                #app_dict.print_status()
                #app_dict['nested_udict'].print_status()
            while tmp_val < total:
                tmp_val = (sqrt(sqrt(i)*i) / i) + tmp_val**2

    except Exception as e:
        print(f'Line: {sys.exc_info()[-1].tb_lineno}\n{e}\n')

def spawn_wrapper(task):
    """ Wrapper for the task tuple / wrapped by the spawned process"""
    app_dict = u_dict_build() # Rebuild UltraDict with create=False

    task[1] = app_dict # Reassign app_dict to the process
    run = partial(*task,) # Wrap the task
    run()

if __name__ == '__main__':
    # Build the app_dict
    app_dict = u_dict_build(create=True)

    # If True, workers will start executing
    app_dict["run"] = False

    # Some value in shared memory
    app_dict["test_count"] = 0

    task = [parallel_task, app_dict]

    i = 0
    j = 10 # Processes to be started
    processes = []
    while i < j:
        p = Process(target=partial(spawn_wrapper, task))
        p.start()
        processes.append(p)
        i += 1

    # Release workers for calculations
    app_dict["run"] = True

    print(f'\nJoining processes.\n')
    # Now join everyone
    for p in processes:
        p.join()

    # Print the calculations in relation to the executing process.
    for step, proc in app_dict["nested_udict"].items():
        print(f'{step} :: {proc}')

Output:

python ./supermorph.py

Joining processes.

1 :: Process-6
2 :: Process-6
3 :: Process-6
4 :: Process-6
5 :: Process-6
6 :: Process-4
7 :: Process-3
8 :: Process-3
9 :: Process-3
10 :: Process-3
11 :: Process-3
12 :: Process-3
13 :: Process-3
14 :: Process-3
15 :: Process-3
16 :: Process-3
17 :: Process-6
18 :: Process-6
19 :: Process-6
20 :: Process-6
21 :: Process-6
22 :: Process-5
23 :: Process-5
24 :: Process-9
25 :: Process-7
26 :: Process-7
27 :: Process-7
28 :: Process-7
29 :: Process-7
30 :: Process-7
31 :: Process-7
32 :: Process-7
33 :: Process-7
34 :: Process-7
35 :: Process-5
36 :: Process-5
37 :: Process-5
38 :: Process-5
39 :: Process-5
40 :: Process-5
41 :: Process-5
42 :: Process-5
43 :: Process-10
44 :: Process-10
45 :: Process-10
46 :: Process-10
47 :: Process-10
48 :: Process-10
49 :: Process-10
50 :: Process-10
51 :: Process-10
52 :: Process-10
53 :: Process-9
54 :: Process-4
55 :: Process-4
56 :: Process-4
57 :: Process-4
58 :: Process-4
59 :: Process-4
60 :: Process-4
61 :: Process-4
62 :: Process-4
63 :: Process-8
64 :: Process-8
65 :: Process-8
66 :: Process-8
67 :: Process-8
68 :: Process-8
69 :: Process-8
70 :: Process-8
71 :: Process-8
72 :: Process-8
73 :: Process-9
74 :: Process-9
75 :: Process-9
76 :: Process-9
77 :: Process-9
78 :: Process-9
79 :: Process-9
80 :: Process-9
81 :: Process-2
82 :: Process-2
83 :: Process-1
84 :: Process-1
85 :: Process-1
86 :: Process-1
87 :: Process-1
88 :: Process-1
89 :: Process-1
90 :: Process-1
91 :: Process-1
92 :: Process-1
93 :: Process-2
94 :: Process-2
95 :: Process-2
96 :: Process-2
97 :: Process-2
98 :: Process-2
99 :: Process-2
100 :: Process-2

@ronny-rentner
Owner

> > When you have multiple processes writing to the UltraDict in parallel, you must use a lock to ensure they're not doing it in parallel.
>
> Shouldn't UltraDict handle any locks? In the source code I find:
>
>     def __setitem__(self, key, item):
>         #log.debug("__setitem__ {}, {}", key, item)
>         with self.lock:
>             self.apply_update()

Yes, that lock is used internally to ensure the integrity of the stream, so one write cannot break another write, but it does not protect your processes from writing a wrong value.

                app_dict["test_count"] += 1

When you do this, it is doing multiple operations:

  1. Read the value from app_dict["test_count"]
  2. Add 1 to the value
  3. Write the value back to app_dict["test_count"]

Now imagine process A is between 2) and 3) when process B performs its read in 1). The update in 3) has not happened yet, so process B reads the old value, and one of the two increments is lost.

The internal lock in UltraDict only prevents two processes from interfering with each other during step 3).

If you want a process to read a value, modify it and write it back without another process getting in between, you need to hold a lock at the application level around all three steps.
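The three steps and the interleaving described above can be replayed deterministically. The sketch below is only an illustration: it uses a plain dict and `threading.Lock` from the standard library as stand-ins for UltraDict and its shared lock, and the function names `lost_update_demo` and `locked_increments` are hypothetical.

```python
import threading


def lost_update_demo():
    """Replay the interleaving by hand: B reads before A has written back."""
    shared = {"test_count": 0}

    a_value = shared["test_count"]   # A, step 1: read
    a_value += 1                     # A, step 2: add 1
    b_value = shared["test_count"]   # B, step 1: read, still sees the old value
    shared["test_count"] = a_value   # A, step 3: write back
    b_value += 1                     # B, step 2: add 1
    shared["test_count"] = b_value   # B, step 3: overwrites A's update

    return shared["test_count"]      # two increments happened, only one is visible


def locked_increments(workers=4, rounds=1000):
    """Hold one lock around the whole read-modify-write sequence."""
    shared = {"test_count": 0}
    lock = threading.Lock()

    def work():
        for _ in range(rounds):
            with lock:
                # Steps 1 to 3 run as one unit; no other worker can
                # read the counter between the read and the write.
                shared["test_count"] += 1

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return shared["test_count"]
```

In the thread's example, `with app_dict.lock:` around the increment plays the role that `with lock:` plays here.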

@supermorphDotTech
Author

supermorphDotTech commented Jan 3, 2025

> Yes, that lock is used internally to ensure the integrity of the stream, so one write cannot break another write, but it does not protect your processes from writing a wrong value.

Yeah, I can see the need for locking. Thanks.

Reviewing my subclass, I think the issue may be where I put my lock. My idea was to acquire the lock as late as possible to avoid wait times for other processes. Here is my __setitem__() method, with the lock just before super_class.__setitem__(key, value):

def __setitem__(self, key, value):
    lock, super_class = self._get_super()
    if not isinstance(key, str):
        # Keys must be strings.
        raise TypeError(f'{self.loc["cl_mpy_dict_key_str"]}:')
    # TODO make verbose warning for regular dict (can't be shared)
    if self._access == 'tightened':
        msg = f'{self.msg__setitem__} {key}'
        if key not in super_class.keys():
            raise KeyError(msg)
        else:
            with lock:
                super_class.__setitem__(key, value)

  1. Would you agree that the lock is better placed at the top of the method?
    def __setitem__(self, key, value):
        lock, super_class = self._get_super()
        with lock:
            if not isinstance(key, str):
                # Keys must be strings.
                raise TypeError(f'{self.loc["cl_mpy_dict_key_str"]}:')
            # TODO make verbose warning for regular dict (can't be shared)
            if self._access == 'tightened':
                msg = f'{self.msg__setitem__} {key}'
                if key not in super_class.keys():
                    raise KeyError(msg)
                else:
                    super_class.__setitem__(key, value)

  2. Or are you suggesting setting up a new lock, rather than getting the one of the UltraDict?

@ronny-rentner
Owner

Well, the locks have been carefully created and optimized, so I'm not sure you want to go down the path of creating your own. I'd recommend using the built-in locks, which are easily available, as you can see above in the adjusted example.

By the way, it might be useful to use the UltraDict dev branch from GitHub (which I am using). It contains a couple of bug fixes. I'll soon do another release.

When using locks, you always have to ask yourself whether something could change the values in the meantime and whether it is important to take the changed values into account. E.g. if you do if self._access == 'tightened': without a lock, this value could change before you call super_class.__setitem__(key, value). Only you can know whether this is a problem for your application/business logic.
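A minimal sketch of keeping the check and the write under one lock follows. It is not morPy's or UltraDict's code: `TightenedDict` is a hypothetical plain-dict subclass, and `threading.RLock` stands in for the UltraDict lock so the example runs with the standard library alone.

```python
import threading


class TightenedDict(dict):
    """Hypothetical dict that only updates existing keys when 'tightened'."""

    def __init__(self, *args, access="normal", **kwargs):
        super().__init__(*args, **kwargs)
        self._access = access
        # Re-entrant, so a caller that already holds the lock for a larger
        # read-modify-write sequence can still call __setitem__.
        self.lock = threading.RLock()

    def __setitem__(self, key, value):
        # The type check touches no shared state, so it needs no lock.
        if not isinstance(key, str):
            raise TypeError("keys must be strings")
        with self.lock:
            # Access check, key lookup and write happen under the same lock,
            # so neither _access nor the key set can change in between.
            if self._access == "tightened" and key not in self:
                raise KeyError(key)
            super().__setitem__(key, value)
```

Whether the wider critical section is worth the extra wait time depends on how often `_access` or the key set can change while other processes are writing.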

@supermorphDotTech
Author

> Well, the locks have been carefully created and optimized, so I'm not sure you want to go down the path of creating your own. I'd recommend using the built-in locks, which are easily available, as you can see above in the adjusted example.

Understood, that's also the way I implemented locking.

> By the way, it might be useful to use the UltraDict dev branch from GitHub (which I am using). It contains a couple of bug fixes. I'll soon do another release.

Using the dev version, my framework produces the same issue/errors. Also the example code blows up at some point when counting (snippet):

[...]
625 :: Process-6
626 :: Process-6
627 :: Process-6
628 :: Process-6
Process Process-3:
Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 744, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 658, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_042b5752'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 744, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 658, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_042b5752'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 744, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 658, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_042b5752'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 744, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 658, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_042b5752'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\smph-usr\AppData\Local\Programs\Python\Python310\lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "C:\Users\smph-usr\AppData\Local\Programs\Python\Python310\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "X:\Projekte\morPy\ultradict_test.py", line 74, in spawn_wrapper
    app_dict = u_dict_build() # Rebuild UltraDict with create=False
  File "X:\Projekte\morPy\ultradict_test.py", line 17, in u_dict_build
    app_dict["nested_udict"] = UltraDict(
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 920, in __setitem__
    self.apply_update()
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 846, in apply_update
    self.load(force=True)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 771, in load
    full_dump_memory = self.get_full_dump_memory()
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 747, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 747, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 747, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 751, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 753, in get_full_dump_memory
    raise e
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 744, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 658, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_042b5752'
629 :: Process-9
630 :: Process-9
631 :: Process-9
632 :: Process-9
Line: 46
Could not get memory 'wnsm_042b5752'

Line: 46
Could not get memory 'wnsm_1f0f34d0'
[...]

@ronny-rentner
Owner

Ok, I'll spin up a Windows VM later on to test there.

@supermorphDotTech
Author

supermorphDotTech commented Jan 3, 2025

> Ok, I'll spin up a Windows VM later on to test there.

Nice, thanks. It may very well be that the issue is related to spawning. In the meantime I will try to reproduce the errors the framework raises by extending the example.

@supermorphDotTech
Author

supermorphDotTech commented Jan 6, 2025

I have run ultradict_test.py on Ubuntu 24.04 and it seems there is an issue with create=True/False that I did not have on Windows.

Please also use the main branch of morPy from now on, as this will be the latest version.

/home/smph/Projects/morPy/.venv-linux/bin/python /home/smph/Projects/morPy/ultradict_test.py 
Traceback (most recent call last):
  File "/home/smph/Projects/morPy/ultradict_test.py", line 82, in <module>
    app_dict = u_dict_build(create=True)
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/smph/Projects/morPy/ultradict_test.py", line 11, in u_dict_build
    app_dict = UltraDict(
               ^^^^^^^^^^
  File "/home/smph/Projects/morPy/.venv-linux/lib/python3.12/site-packages/UltraDict/UltraDict.py", line 375, in __init__
    self.control = self.get_memory(create=create, name=name, size=1000)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/smph/Projects/morPy/.venv-linux/lib/python3.12/site-packages/UltraDict/UltraDict.py", line 555, in get_memory
    raise Exceptions.AlreadyExists(f"Cannot create memory '{name}' because it already exists")
Exceptions.AlreadyExists: Cannot create memory 'app_dict' because it already exists

Process finished with exit code 1
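
One possible explanation, not confirmed in this thread: on Linux a named POSIX shared memory segment outlives the process that created it until it is unlinked, whereas on Windows it is released once the last handle is closed. A segment left behind by an earlier run would therefore make create=True fail on Linux only. The sketch below shows the create-or-attach pattern with the standard library's `multiprocessing.shared_memory`; the helper name `create_or_attach` is hypothetical and this is not UltraDict's implementation.

```python
from multiprocessing import shared_memory


def create_or_attach(name, size):
    """Create the named segment, or attach to it if it already exists.

    Returns (segment, created) so the caller knows who has to unlink it.
    """
    try:
        return shared_memory.SharedMemory(name=name, create=True, size=size), True
    except FileExistsError:
        # The name is already taken, e.g. by a previous run that never
        # unlinked its segment; attach to the existing one instead.
        return shared_memory.SharedMemory(name=name, create=False), False
```

The process that created the segment should call `close()` and `unlink()` on it when done, so the next run starts from a clean state.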

@supermorphDotTech
Author

I want to note that when a new process spawns, UltraDict tries to attach to all of the shared-memory segments it needs, including the second one for the full dump. That’s where you may see:

CannotAttachSharedMemory: Could not get memory 'wnsm_11f71172'

This means the child tried to attach to that auto-generated full-dump segment (or a nested UltraDict’s internal memory), but it did not exist or was already destroyed. So maybe the UltraDicts in the backbone need to mitigate this kind of race condition.
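
As an illustration of one way to mitigate such a race, the sketch below retries the attach a bounded number of times in a loop instead of recursing. It uses only `multiprocessing.shared_memory` from the standard library; `attach_with_retry` and its parameters are hypothetical and do not reflect how UltraDict handles this internally.

```python
import time
from multiprocessing import shared_memory


def attach_with_retry(name, retries=5, delay=0.05):
    """Attach to an existing segment, retrying a bounded number of times."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error = None
    for attempt in range(retries):
        try:
            return shared_memory.SharedMemory(name=name, create=False)
        except FileNotFoundError as exc:
            # The segment does not exist (yet, or any more); wait a little
            # longer after each failed attempt before trying again.
            last_error = exc
            time.sleep(delay * (attempt + 1))
    # Give up with the original error once all attempts are used.
    raise last_error
```

A retry only helps if the segment is about to appear. If the segment was already destroyed and replaced under a new name, the caller has to look up the current name again before retrying.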

@supermorphDotTech
Author

I have written another "simpler" example. In this example UltraDict blows up when starting 53 processes. With 52 the code still works without errors. However, 53 seems to hit a limit of some kind.

from multiprocessing import Process
from UltraDict import UltraDict

def child_task():
    # Because `child_task` is now top-level, Python can pickle it
    d2 = UltraDict(name="test_dict", create=False, shared_lock=True, full_dump=False)
    d2["n_d2"] = UltraDict(name="nested_dict", create=False, shared_lock=True, full_dump=False)
    with d2.lock:
        d2["count"] += 1
        d2["n_d2"]["count"] += 13

def main():
    d = UltraDict(name="test_dict", create=True, shared_lock=True, full_dump=False)
    d["nested"] = UltraDict(name="nested_dict", create=True, shared_lock=True, full_dump=False)
    d["count"] = 0
    d["nested"]["count"] = 0

    processes = []
    for i in range(53):
        p = Process(target=child_task)
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(f'{d["count"]}')
    print(f'{d["nested"]["count"]}')

if __name__ == "__main__":
    main()

The errors raised for this code are familiar from earlier in this thread:

X:\Projekte\morPy\.venv-win\Scripts\python.exe X:\Projekte\morPy\ultradict_test_minimal.py 
Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 657, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 571, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_0096b8c2'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 657, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 571, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_0096b8c2'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 657, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 571, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_0096b8c2'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 657, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 571, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_0096b8c2'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "X:\Projekte\morPy\ultradict_test_minimal.py", line 31, in <module>
    main()
  File "X:\Projekte\morPy\ultradict_test_minimal.py", line 27, in main
    print(f'{d["count"]}')
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 862, in __getitem__
    self.apply_update()
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 759, in apply_update
    self.load(force=True)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 684, in load
    full_dump_memory = self.get_full_dump_memory()
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 660, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 660, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 660, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 664, in get_full_dump_memory
    return self.get_full_dump_memory(max_retry=max_retry, retry=retry+1)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 666, in get_full_dump_memory
    raise e
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 657, in get_full_dump_memory
    return self.get_memory(create=False, name=name)
  File "X:\Projekte\morPy\.venv-win\lib\site-packages\UltraDict\UltraDict.py", line 571, in get_memory
    raise Exceptions.CannotAttachSharedMemory(f"Could not get memory '{name}'")
Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_0096b8c2'

Process finished with exit code 1

@supermorphDotTech
Author

supermorphDotTech commented Jan 6, 2025

What I can say so far about the current UltraDict dev branch (MS Windows only):

  1. When constructing UltraDict with recurse=True, it always blows up on Windows (or, more generally, with spawned processes).
  2. Recursion does not work. Neither nested UltraDicts nor nested dicts are referenced correctly by spawned processes.
  3. Constructing with recurse=True and inserting a nested dict creates an UltraDict in the background, which produces many warnings:
WARNING:root:You are running on win32, potentially without locks. Consider setting shared_lock=True

...these warnings should either be suppressed, or shared_lock=True should be set automatically: the nested dict is constructed in the background rather than by the developer, so I cannot influence it.
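Until the library handles this itself, one possible stopgap is to filter the message on the caller's side. The sketch below uses only the standard `logging` module and assumes the warning is emitted directly on the root logger, as the `WARNING:root:` prefix suggests. The names `DropSharedLockWarning` and `install_filter` are hypothetical and not part of UltraDict.

```python
import logging


class DropSharedLockWarning(logging.Filter):
    """Drops the win32 lock warning and lets every other record through."""

    # Prefix copied from the warning text quoted above
    PREFIX = "You are running on win32, potentially without locks"

    def filter(self, record):
        # Returning False discards the record
        return not record.getMessage().startswith(self.PREFIX)


def install_filter(logger=None):
    """Attach the filter to the given logger (root logger by default)."""
    logger = logger or logging.getLogger()
    log_filter = DropSharedLockWarning()
    logger.addFilter(log_filter)
    return log_filter
```

With spawned processes, each child starts with a fresh logging configuration, so `install_filter()` would have to be called in every worker as well. If the message were emitted on a named logger instead, a filter on the root logger would not see it.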

The following are the dictionary structures I have tried to make work. Note that my test case, and probably my use case, performs a lot of reads and writes, so I am not sure whether it is within UltraDict's scope.

  1. root_dict: UltraDict
    sub_dict1: UltraDict
    sub_dict2: UltraDict

  2. root_dict: UltraDict
    sub_dict1: dict
    sub_dict2: dict

  3. root_dict: dict
    sub_dict1: UltraDict
    sub_dict2: dict

  4. root_dict: UltraDict
    sub_dict1: UltraDict
    sub_dict2: dict

The only thing that did work was a flat collection of UltraDicts:

root_dict: dict = {
sub_dict1: UltraDict
sub_dict2: UltraDict
[...]
}

@ronny-rentner
Owner

Hey, I recommend checking out the existing examples and unit tests, which should work. Let me know if they don't.

On Windows, you'll have a problem if your program spawns many short-lived processes, because Windows cleans up a process's shared memory when it ends. If one of those processes has created a full dump and then ends, none of the other processes can read the full dump anymore, as the OS has cleaned it up. This problem only exists on Windows. (The same is probably true for nested dicts.)

I once created a bug report here: https://bugs.python.org/issue46888
The last comment on that issue describes an approach to solving it, but I've never tested or implemented it. If you're up to it, I'd be happy to accept a pull request that solves this.

In the meantime, there's this parameter in UltraDict. Excerpt from the docs:
full_dump_size: If set, uses a static full dump memory instead of dynamically creating it. This might be necessary on Windows depending on your write behaviour. On Windows, the full dump memory goes away if the process goes away that had created the full dump. Thus you must plan ahead which processes might be writing to the dict and therefore creating full dumps.

So on Windows, you might need to set up your dict structure, including all child dicts, from one starting process that lives for the program's whole runtime (or implement the approach from the Python bug thread).
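The idea of one long-lived owner can be illustrated with the standard library alone. This is a minimal sketch using `multiprocessing.shared_memory` as a stand-in for UltraDict's buffers, not UltraDict's actual implementation. The helper `owned_segments` and the section names passed to it are hypothetical.

```python
from contextlib import contextmanager
from multiprocessing import shared_memory


@contextmanager
def owned_segments(sizes):
    """Create all segments up front and keep them alive until the block exits."""
    segments = {
        key: shared_memory.SharedMemory(create=True, size=size)
        for key, size in sizes.items()
    }
    try:
        # Workers only need the names to attach with create=False
        yield {key: seg.name for key, seg in segments.items()}
    finally:
        for seg in segments.values():
            seg.close()
            seg.unlink()  # removes the segment on POSIX; does nothing on Windows
```

Because the owner holds its handles for the whole `with` block, short-lived workers can attach, write, and exit without the segments disappearing underneath the remaining workers.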

It all depends a bit on your use case, your read/write patterns, and the type of data.

@supermorphDotTech
Author

Thanks for elaborating on that. I will continue testing then.

As a side note:
morPy uses an orchestrator process that initially sets up the app_dict (consisting of nested UltraDicts). This process only ends once all spawned processes have ended and the heap is empty. Therefore, it outlives all other spawned processes.

@supermorphDotTech
Author

I have one more thought on short-lived shared full dumps:
Wouldn't it be feasible for UltraDict to be more lenient on Windows systems? My idea is: if the platform is win32, allow dictionaries to be missing and assume the owning process has just ended.

Do you think this could work?
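As a rough illustration of that idea, the sketch below treats a missing segment as "the writer has exited" and falls back to the last known state instead of raising. It uses `multiprocessing.shared_memory` and `pickle` as stand-ins and does not reflect UltraDict's internals. `write_snapshot`, `read_snapshot`, and the length-prefix layout are assumptions made for this example.

```python
import pickle
import struct
from multiprocessing import shared_memory

HEADER = struct.Struct("<I")  # 4-byte little-endian payload length


def write_snapshot(obj):
    """Store a pickled object in a new segment and return the segment handle."""
    payload = pickle.dumps(obj)
    shm = shared_memory.SharedMemory(create=True, size=HEADER.size + len(payload))
    HEADER.pack_into(shm.buf, 0, len(payload))
    shm.buf[HEADER.size:HEADER.size + len(payload)] = payload
    return shm


def read_snapshot(name, fallback):
    """Return the object stored in segment `name`, or `fallback` if it is gone."""
    try:
        shm = shared_memory.SharedMemory(name=name, create=False)
    except FileNotFoundError:
        # Segment no longer exists: assume its creator has ended
        return fallback
    try:
        (length,) = HEADER.unpack_from(shm.buf, 0)
        return pickle.loads(bytes(shm.buf[HEADER.size:HEADER.size + length]))
    finally:
        shm.close()
```

The trade-off is that the fallback may be stale: any update that existed only in the vanished segment is silently lost, so this is only safe where that loss is acceptable.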

@supermorphDotTech
Author

Partial Solution

It seems that on MS Windows it is good practice to increase the buffer sizes. With that change, I could get my example, which represents how morPy works, to run.

Recommended arguments to UltraDict on MS Windows:

    app_dict = UltraDict(
        name="app_dict",
        create=create,
        shared_lock=True,
        buffer_size=1_000_000,
        full_dump_size=1_000_000,
        auto_unlink=False
    )
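To avoid repeating these arguments for every dictionary, they could be collected in a small helper. This is only a sketch: the helper name `ultradict_kwargs` is hypothetical, the values are the ones quoted above, and leaving other platforms on the library defaults is an assumption.

```python
import sys


def ultradict_kwargs(name, create, platform=sys.platform):
    """Build keyword arguments for UltraDict, with larger buffers on Windows."""
    kwargs = {"name": name, "create": create}
    if platform == "win32":
        # Values taken from the recommendation above
        kwargs.update(
            shared_lock=True,
            buffer_size=1_000_000,
            full_dump_size=1_000_000,
            auto_unlink=False,
        )
    return kwargs
```

A call would then look like `UltraDict(**ultradict_kwargs("app_dict", create=True))`.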

Complete example:

import sys
import time

from multiprocessing import Process, current_process, active_children
from functools import partial
from math import sqrt

def u_dict_build(create: bool=False):
    """ Create a nested UltraDict """
    from UltraDict import UltraDict

    app_dict = UltraDict(
        name="app_dict",
        create=create,
        shared_lock=True,
        buffer_size=1_000_000,
        full_dump_size=1_000_000,
        auto_unlink=False
    )
    # app_dict = {}

    app_dict["nested_udict"] = UltraDict(
        name="nested_udict",
        create=create,
        shared_lock=True,
        buffer_size=1_000_000,
        full_dump_size=1_000_000,
        auto_unlink=False
    )
    # app_dict["nested_udict"] = {}

    app_dict["nested_dict"] = {}

    return app_dict

def parallel_task(app_dict):
    """ Task to be run in parallel with writes to app_dict """
    try:
        if not app_dict:
            raise RuntimeError("No connection to app_dict.")

        # get the current process instance
        process = current_process()

        i = 0
        total = 10**3
        tmp_val = 0

        # Hold on until all processes are ready
        while not app_dict["run"]:
            pass

        while i < total:
            i += 1
            # Read and write app_dict and nested dictionaries
            with app_dict.lock:
                app_dict["test_count"] += 1

            with app_dict["nested_udict"].lock:
                app_dict["nested_udict"]["test_count"] += 1

            print(f'{app_dict["test_count"]} :: {app_dict["nested_udict"]["test_count"]} :: {process.name}')
            while tmp_val < total:
                tmp_val = (sqrt(sqrt(i)*i) / i) + tmp_val**2

    except Exception as e:
        print(f'Line: {sys.exc_info()[-1].tb_lineno}\n{e}\n')

def spawn_wrapper(task):
    """ Wrapper for the task tuple / wrapped by the spawned process"""

    app_dict = u_dict_build()

    task = [task[0], app_dict] # Reassign app_dict to the process
    func, *args = task
    result = func(*args)

if __name__ == '__main__':
    # Build the app_dict
    app_dict = u_dict_build(create=True)

    # If True, workers will start executing
    app_dict["run"] = False

    # Some value in shared memory
    app_dict["test_count"] = 0
    app_dict["nested_udict"]["test_count"] = 0

    task = [parallel_task,]

    i = 0
    j = 50 # Processes to be started
    processes = []
    while i < j:
        # TODO mitigate in main
        p = Process(target=partial(spawn_wrapper, task))
        p.start()
        processes.append(p)
        i += 1

    # Release workers for calculations
    app_dict["run"] = True

    print(f'\nJoining processes.\n')
    # Now join everyone
    for p in processes:
        p.join()

ToDo / Recursion issues:

I will change how morPy utilizes UltraDict. It seems that nesting UltraDicts may lead to complex structures that call UltraDict.__init__() quite a lot. Therefore, the idea is to keep the UltraDicts strictly separated and build a dict subclass that makes them appear nested.
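A minimal sketch of that idea could look like the following. It is not morPy's implementation. Plain dicts stand in for the separately named UltraDicts, the class `NestedView` is hypothetical, and it is a thin wrapper rather than a true dict subclass, which keeps the example short.

```python
class NestedView:
    """Makes a flat set of separately stored dicts look like one nested dict."""

    def __init__(self, stores, prefix=()):
        # stores maps a path tuple to its own dict-like store, e.g.
        # {(): root, ("conf",): conf, ("conf", "paths"): paths}
        self._stores = stores
        self._prefix = prefix

    def __getitem__(self, key):
        path = self._prefix + (key,)
        if path in self._stores:
            # The key names a sub-dictionary: return a view on it
            return NestedView(self._stores, path)
        return self._stores[self._prefix][key]

    def __setitem__(self, key, value):
        path = self._prefix + (key,)
        if path in self._stores:
            raise KeyError(f"{key!r} names a sub-dictionary and cannot be overwritten")
        self._stores[self._prefix][key] = value
```

No store is ever placed inside another store, so an access such as `app["conf"]["paths"]["home"]` only resolves a path and then touches exactly one flat dictionary.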
