[docs]classLockCollection(Collection):"""Collection holding the locking information. This collection is necessary when multiple users are concurrently interacting with a `VersionedCollection`. The documents in this collection have the following format:: { _id: ObjectId(...) collection_name: 'name of the tracked collection' locked: True/ False } This collection is shared per database, holding information about each tracked collection in the database. By using atomic updates, each `VersionedCollection` can implement a simple locking mechanism. """# The current locking mechanism is not safe against adversarial usage.# Since the lock is represented by a shared collection, any other user can# call :meth:`unlock` before calling :meth:`lock` to be able to proceed in# executing a function. Since this is only an internal mechanism, and it is# not exposed it should be fine for now.
[docs]definit_lock(self,collection:str):"""Initialise the lock."""doc={'collection_name':collection,'locked':False}self.find_one_and_replace(filter=doc,replacement=doc,upsert=True)
[docs]deftry_lock_acquire(self,collection:str)->bool:"""Try to acquire the lock for the given collection. :return: ``True`` if the lock is successfully acquired, ``False`` if the lock is held by another process. """ret=self.find_one_and_update(filter={'collection_name':collection,'locked':False},update={"$set":{'locked':True}},)returnretisnotNone
[docs]deflock_acquire(self,collection:str)->bool:"""Acquire the lock for the given collection. :param collection: The name of the collection to lock. :return: Whether the process waited for the lock. """has_waited_for_lock=Falsewhilenotself.try_lock_acquire(collection):has_waited_for_lock=Truesleep(0.1)returnhas_waited_for_lock
[docs]deflock_release(self,collection:str)->bool:"""Release the lock for the given collection. :param collection: The name of the collection to unlock. :return: Whether the collection was locked. """ret=self.find_one_and_update(filter={'collection_name':collection,'locked':True},update={"$set":{'locked':False}},)returnretisnotNone
[docs]defremove_collection(self,collection:str)->None:"""Remove the locking information for the given collection. :param collection: The name of the collection for which to remove the lock. """self.delete_one({'collection_name':collection})ifself.count_documents({})==0:self.drop()