Creating Process-wise ThreadLocalODMSession and MappedClass #2

Open
KshitizGIT opened this issue Sep 15, 2018 · 6 comments


@KshitizGIT

class WikiPage(MappedClass):
    class __mongometa__:
        session = session
        name = 'wiki_page'

    _id = FieldProperty(schema.ObjectId)
    title = FieldProperty(schema.String(required=True))
    text = FieldProperty(schema.String(if_missing=''))

I am developing a multiprocess app. For each process, I would like to create a new ThreadLocalODMSession.

Given the above declarative definition, how would I bind the MappedClass to a different session?

I have already tried numerous approaches, but none seem to work.

@brondsem
Collaborator

I don't think I've tried this exactly, but a few ideas: when you declare the session variable, can you check there which process you are in and set it to a different connection? Otherwise, if you need to change it dynamically later, I believe you can set session.bind to a new datastore connection at any time and it will be used from then on. Hope that helps.

@KshitizGIT
Author

KshitizGIT commented Sep 29, 2018

from ming import create_datastore
from ming.odm import ThreadLocalODMSession
from ming import schema
from ming.odm import MappedClass
from ming.odm import FieldProperty, ForeignIdProperty


session = ThreadLocalODMSession(
        bind=create_datastore('mongodb://localhost:27017/test_1')
        )


class WikiPage(MappedClass):
    class __mongometa__:
        session = session
        name = 'wiki_page'

    _id = FieldProperty(schema.ObjectId)
    title = FieldProperty(schema.String(required=True))
    text = FieldProperty(schema.String(if_missing=''))

# Insert into db 'test_1' 
wp = WikiPage(title='This is first title', text='This is the first text')
session.flush()
# Insert into db 'test_2'
session.bind = create_datastore('mongodb://localhost:27017/test_2')
wp = WikiPage(title='This is second title', text='This is the second text')
session.flush()

I expected the above code snippet to insert into the test_2 db after I changed the bind to the new datastore connection, but this didn't happen. Am I missing anything here?

@amol-
Member

amol- commented Jan 9, 2019

@KshitizGIT session is just a thread-local wrapper around the real session. So I think what you are looking for is session._get().bind = ...
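A toy proxy illustrating why setting an attribute on the wrapper differs from setting it on the object `_get()` returns; `ThreadLocalProxy` and `RealSession` are invented names, not Ming's implementation:

```python
import threading

class RealSession:
    """Stand-in for the underlying session object."""
    def __init__(self):
        self.bind = 'original-datastore'

class ThreadLocalProxy:
    """Toy wrapper in the spirit of ThreadLocalODMSession (not Ming's code)."""
    def __init__(self):
        self._local = threading.local()

    def _get(self):
        # Lazily create one RealSession per thread.
        if not hasattr(self._local, 'session'):
            self._local.session = RealSession()
        return self._local.session

proxy = ThreadLocalProxy()
proxy.bind = 'new-datastore'          # lands on the wrapper, not the real session
print(proxy._get().bind)              # → original-datastore
proxy._get().bind = 'new-datastore'   # reaches the per-thread real session
print(proxy._get().bind)              # → new-datastore
```

This only illustrates the wrapper/real-object distinction; whether the real session's bind is actually assignable in Ming is a separate question.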

@amol-
Member

amol- commented Jan 9, 2019

Btw my suggestion would be to use ContextualODMSession instead of ThreadLocalODMSession and have each process use a different identifier.
The most obvious usage would probably be ContextualODMSession(os.getpid), but I'm not sure about the overhead of getpid, given that it's probably implemented over the getpid system call and thus involves a kernel-space call on every lookup.
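A sketch of the context-keyed idea behind that suggestion; `ContextualRegistry` here is an invented stand-in, not Ming's ContextualODMSession:

```python
import os

class ContextualRegistry:
    """Toy stand-in for a context-keyed session registry: one underlying
    object per value returned by the context callable."""
    def __init__(self, context, factory):
        self.context = context        # e.g. os.getpid
        self.factory = factory        # builds the real session on first use
        self._registry = {}

    def _get(self):
        key = self.context()
        if key not in self._registry:
            self._registry[key] = self.factory()
        return self._registry[key]

sessions = ContextualRegistry(os.getpid, lambda: {'bind': 'datastore'})
assert sessions._get() is sessions._get()   # same pid -> same session
```

With os.getpid as the context, each process resolves to its own session object without any explicit rebinding.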

@CastixGitHub
Contributor

@KshitizGIT I ran into this issue too.
Changing the session property on the __mongometa__ nested class didn't do anything.
Also, session._get().bind = ... raises AttributeError because bind is a read-only property, and even after making it writable externally through

def fset(self, v): self.impl.bind = v
def fget(self): return self.impl.bind
type(s._get()).bind = property(fget=fget, fset=fset)
s._get().bind = Session(create_datastore('mongodb://127.0.0.1:27017/s'))

I didn't see any difference (I tested only through query, though).

Then I figured out the solution:

from ming.odm import Mapper
session2 = ThreadLocalODMSession(bind=create_datastore('mongodb://127.0.0.1:27017/test'))
for _mapper in Mapper.all_mappers():
    _mapper.session = session2                      # point the mapper at the new session
    _mapper.mapped_class.query.session = session2   # and the class-level query manager
    _mapper._compiled = False                       # force a recompile against the new session
    _mapper.compile()
    _mapper.session.ensure_indexes(_mapper.collection)

I double-checked that this is working; the id() of the proxied session is different in every process.

I also had some trouble trying what's described here https://ming.readthedocs.io/en/latest/baselevel.html#other-sessions (another issue should be created for that)

Anyway there's something else weird:


Here's a minimal example that shows the issue

Python 3.10.2 (main, Jan 15 2022, 19:56:27) [GCC 11.1.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.1.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from ming import schema, create_datastore
   ...: from ming.odm import MappedClass, Mapper, ThreadLocalODMSession
   ...: from ming.odm import FieldProperty, ForeignIdProperty

In [2]: from os import getpid

In [3]: import multiprocessing

In [4]: from bson import ObjectId

In [5]: s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))

In [6]: def adapt_to_session(s):
   ...:     for _mapper in Mapper.all_mappers():
   ...:         _mapper.session = s
   ...:         _mapper.mapped_class.query.session = s
   ...:         _mapper._compiled = False
   ...:         _mapper.compile()
   ...:         _mapper.session.ensure_indexes(_mapper.collection)
   ...: 

In [7]: class TestCol(MappedClass):
   ...:     class __mongometa__:
   ...:         session = s
   ...:         name = 'testcol'
   ...: 
   ...:     _id = FieldProperty(schema.ObjectId)
   ...:     a = FieldProperty(schema.Int)
   ...:     b = FieldProperty(schema.Int)
   ...: 

In [8]: def target(_id, _type):
   ...:     s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))
   ...:     adapt_to_session(s)
   ...:     t = TestCol.query.get(_id)
   ...:     if _type == 'A':
   ...:         t.a = getpid()
   ...:     else:
   ...:         t.b = getpid()
   ...:     s.flush_all()
   ...:     s.close()
   ...: 

In [9]: for i in range(100):
   ...:     _id = TestCol(a=0, b=0)._id
   ...:     s.flush_all()
   ...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
   ...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
   ...:     proc1.start()
   ...:     proc2.start()
   ...: 

In [10]: for i in range(100):
    ...:     _id = TestCol(a=0, b=0)._id
    ...:     s.flush_all()
    ...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
    ...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
    ...:     proc1.start()
    ...:     proc2.start()
    ...: 

In [11]: 

And it seems to run fine, but if you inspect in a mongo shell, you can find that in some documents either a or b is still 0:

> db.testcol.find({$or: [{a: 0}, {b: 0}]})
{ "_id" : ObjectId("6232017bb7957cba03f5f295"), "a" : 0, "b" : 404322 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a2"), "a" : 0, "b" : 404484 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a5"), "a" : 0, "b" : 404544 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2b4"), "a" : 404745, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2bf"), "a" : 0, "b" : 404896 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d6"), "a" : 405192, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d8"), "a" : 405217, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2df"), "a" : 0, "b" : 405317 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e7"), "a" : 405414, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e8"), "a" : 405427, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2ec"), "a" : 405477, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2f5"), "a" : 0, "b" : 405619 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f8"), "a" : 405694, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f9"), "a" : 0, "b" : 405709 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fc"), "a" : 405736, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fe"), "a" : 405772, "b" : 0 }
{ "_id" : ObjectId("6232019ab7957cba03f5f307"), "a" : 0, "b" : 405894 }
{ "_id" : ObjectId("6232019ab7957cba03f5f309"), "a" : 0, "b" : 405908 }
{ "_id" : ObjectId("6232019ab7957cba03f5f30d"), "a" : 0, "b" : 405964 }
{ "_id" : ObjectId("6232019ab7957cba03f5f311"), "a" : 406006, "b" : 0 }
Type "it" for more

I can't figure out why this is happening, I feel stuck.

I am using versions:
pymongo 3.11.4
ming 0.11.2
mongo 5.0.2

@CastixGitHub
Contributor

CastixGitHub commented Mar 18, 2022

In [13]: def target(_id, _type):
    ...:     db = MongoClient()
    ...:     db.testdb.testcol.update_one({'_id': _id}, update={"$set": {_type: getpid()}})
    ...: 

In [14]: for i in range(100):
    ...:     _id = db.testdb.testcol.insert_one({'a': 0, 'b': 0}).inserted_id
    ...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'a'))
    ...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'b'))
    ...:     proc1.start()
    ...:     proc2.start()

With pymongo only, it works fine: every document ends up with both a and b non-zero.
In my use case I cannot drop the ODM.


Making assumptions based on the source code:

How can we overcome this issue then?

  • each object has state (new, clean, dirty, deleted) under the __ming__ property
  • the state has an original_document property (can validation make a difference? yes, e.g. datetime)
  • the tracker just keeps track of which objects become dirty (soil) through the instrumentation of the object; it doesn't care about which fields were changed
  • here's the object being flushed within the mapper with its associated state
  • we could update only the fields that actually changed; to detect what changed, I tried it this way:
            def doc_to_set(doc):
                def to_hashable(v):
                    if isinstance(v, list):
                        return tuple(to_hashable(sv) for sv in v)
                    elif isinstance(v, dict):
                        return tuple((to_hashable(k), to_hashable(sv))
                                     for k, sv in sorted(v.items()))
                    else:
                        return v
                return set((k, to_hashable(v)) for k, v in doc.items())

            fields = tuple(set(k for k, v in
                               doc_to_set(state.original_document)
                               ^ doc_to_set(state.document)))
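The detection step above can be sketched as a self-contained, runnable version (same symmetric-difference approach; the names here are mine, not Ming's):

```python
def to_hashable(v):
    # Recursively turn lists/dicts into tuples so (field, value) pairs are hashable.
    if isinstance(v, list):
        return tuple(to_hashable(sv) for sv in v)
    if isinstance(v, dict):
        return tuple((k, to_hashable(sv)) for k, sv in sorted(v.items()))
    return v

def doc_to_set(doc):
    return {(k, to_hashable(v)) for k, v in doc.items()}

def changed_fields(original, current):
    # The symmetric difference keeps pairs present on only one side,
    # i.e. fields that were added, removed, or modified.
    return {k for k, _ in doc_to_set(original) ^ doc_to_set(current)}

original = {'a': 0, 'b': 0, 'tags': ['x']}
current = {'a': 0, 'b': 42, 'tags': ['x', 'y']}
print(sorted(changed_fields(original, current)))   # → ['b', 'tags']
```

With the changed fields in hand, a flush could issue a $set touching only those keys instead of rewriting the whole document.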

The ming test suite now says OK, but it isn't actually OK...
It seems it doesn't detect changes around an instrumented list. Why? Because original_document is already mutated! Ouch, how did that happen?
This comment makes me feel bad.
I used remote_pdb to get the id() of that instrumented list (which under state.*document isn't instrumented) and they're the same object!
Trying deepcopy then... it works.
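A minimal demonstration of the shallow-vs-deep copy trap described above, with plain dicts and lists standing in for the instrumented document:

```python
import copy

doc = {'tags': ['x']}          # live document with a nested list
shallow = dict(doc)            # shallow copy: shares the inner list object
deep = copy.deepcopy(doc)      # deep copy: fully independent snapshot

doc['tags'].append('y')        # simulate an append on the instrumented list

print(shallow['tags'])         # → ['x', 'y']  (snapshot silently mutated)
print(deep['tags'])            # → ['x']       (snapshot preserved)
assert shallow['tags'] is doc['tags']
assert deep['tags'] is not doc['tags']
```

A shallow snapshot of original_document shares its nested containers with the live document, so any diff against it comes up empty.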


When two processes change the same field, the last one wins.
This can seem obvious, but, for example, calling append on an instrumented list doesn't $push into the array when the session is flushed; the whole list is replaced instead.

So, before this change, atomic operations and the unit of work were always in conflict.
Now there is a chance: an atomic update doesn't conflict with the unit of work, as long as they operate on different fields.
You must still remember to roll back your atomic operations manually if flushing the session failed, and vice versa.

I'm preparing a PR out of this.
