versioned_collection.collection.tracking_collections package

Helper tracking collections used in versioning.

This module defines the tracking collections used to version the target VersionedCollection.

This module should NOT be directly used by the user. Any interaction with a tracked collection should be done through the VersionedCollection object.

Note

This module refers to the Version Tree throughout. It is not an actual data structure, and it is not physically implemented by any class or collection. The Version Tree is the combination of the deltas and the Log Tree: it has the same set of nodes as the Log Tree, and its edges are the sets of per-document deltas that have to be applied to move between two consecutive versions. This abstract versioning structure can also be viewed as a directed acyclic graph that has the same nodes as the Log Tree, in which each edge linking two versions represents a delta document that updates one document from its state in the source version of the target collection to its state in the destination version.
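As a rough illustration of the note above, the Version Tree can be pictured as the Log Tree's parent/child links with a set of per-document deltas attached to each link. The sketch below is purely in-memory; the names `log_tree_parent`, `deltas_by_version` and `version_tree_edges`, the sample versions, and the string placeholders used as deltas are all hypothetical and are not part of the package.

```python
from typing import Any, Dict, Tuple

Version = Tuple[int, str]  # (version id, branch name)

# Hypothetical Log Tree, stored as child version -> parent version.
log_tree_parent: Dict[Version, Version] = {
    (1, "main"): (0, "main"),
    (2, "main"): (1, "main"),
    (0, "dev"): (1, "main"),
}

# Hypothetical per-document deltas, keyed by the version they lead to.
# Real deltas are binary diffs; strings are used here only as placeholders.
deltas_by_version: Dict[Version, Dict[Any, str]] = {
    (1, "main"): {"doc_a": "set x=1"},
    (2, "main"): {"doc_a": "set x=2", "doc_b": "insert"},
    (0, "dev"): {"doc_b": "insert"},
}


def version_tree_edges() -> Dict[Tuple[Version, Version], Dict[Any, str]]:
    """Label every (parent, child) link of the Log Tree with its deltas."""
    return {
        (parent, child): deltas_by_version.get(child, {})
        for child, parent in log_tree_parent.items()
    }


edges = version_tree_edges()
```

Nothing is stored for the combined structure itself: it is derived on demand from the two underlying views, which matches the statement that the Version Tree is not physically implemented.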

class versioned_collection.collection.tracking_collections.BranchesCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

Stores information about the branch pointers.

Warning

This class should not be used directly by the user.

Branches are pointers to specific version numbers and branch names on the version tree. The version tree is a tree that has as nodes the version identifiers, i.e., version id and branch name, of a version of the target collection, and as edges the set of per-document deltas that have to be applied to move between versions. Technically, the branches point to the latest set of per-document deltas that has to be applied to the previous version of the collection to get to the latest version, but that specific set of deltas is identified by the same tuple of version identifiers as the version node itself.

class SCHEMA(name: 'str', points_to_collection_version: 'int', points_to_branch: 'str')[source]

Bases: object

name: str
points_to_branch: str
points_to_collection_version: int
build() bool[source]

Create the collection on the database.

Returns:

False if the collection already exists, True otherwise.

create_branch(branch: str, pointing_to_collection_version: int, pointing_to_branch: str) None[source]

Create a new branch pointing to the specified location.

Raises:

ValueError – If a branch with name branch already exists.

Parameters:
  • branch – The name of the new branch.

  • pointing_to_collection_version – The collection version to which this branch should point.

  • pointing_to_branch – The branch on which the collection version that the new branch should point to is located.
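The contract of create_branch can be sketched with a small in-memory registry. This is only an illustration of the documented behaviour (a pointer record per branch, and a ValueError on duplicate names); `BranchPointer` and `InMemoryBranches` are hypothetical names, and the real class stores its data in a database collection.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class BranchPointer:
    """Mirrors the three fields listed in SCHEMA."""
    name: str
    points_to_collection_version: int
    points_to_branch: str


class InMemoryBranches:
    """Hypothetical stand-in that keeps branch pointers in a dict."""

    def __init__(self) -> None:
        self._branches: Dict[str, BranchPointer] = {}

    def has_branch(self, branch_name: str) -> bool:
        return branch_name in self._branches

    def create_branch(
        self,
        branch: str,
        pointing_to_collection_version: int,
        pointing_to_branch: str,
    ) -> None:
        # Duplicate names are rejected, as documented for create_branch.
        if branch in self._branches:
            raise ValueError(f"Branch {branch!r} already exists.")
        self._branches[branch] = BranchPointer(
            branch, pointing_to_collection_version, pointing_to_branch
        )


branches = InMemoryBranches()
branches.create_branch("dev", 1, "main")
```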

delete_branch(branch: str) None[source]

Delete the branch with the given name.

delete_branches(branches: List[str]) None[source]

Delete the branches with names in the given list.

get_branch(branch: str) SCHEMA[source]

Retrieve the branch information.

Raises:

BranchNotFound – If no branch with the given name exists.

Parameters:

branch – The branch for which the information should be retrieved.

Returns:

The branch document.

get_branch_names() Set[str][source]

Return the names of the existing branches.

get_empty_branches() Set[SCHEMA][source]

Return a set of empty branches data.

get_empty_child_branches(branch: str, after_version: int | None = None) List[SCHEMA][source]

Return the empty branches pointing at branch.

Parameters:
  • branch – The name of the parent branch.

  • after_version – The version after which to retrieve empty branches, including the version itself. If None, all empty branches for the given branch will be returned.

Returns:

A list of branch data.

has_branch(branch_name: str) bool[source]

Check whether a branch with the provided name exists.

update_branch(branch: str, pointing_to_collection_version: int, pointing_to_branch: str, new_name: str | None = None) None[source]

Update the information about a branch pointer.

Raises:

BranchNotFound – If no branch with name branch exists.

Parameters:
  • branch – The name of the branch to be updated.

  • pointing_to_collection_version – The new collection version to which the branch points.

  • pointing_to_branch – The branch on which the new version of the collection was registered.

  • new_name – The new name of the branch.

class versioned_collection.collection.tracking_collections.ConflictsCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

Stores the conflict information produced after merging a branch.

class SCHEMA(destination: Dict[str, Any], merged: Dict[str, Any], source: Dict[str, Any], destination_branch: str, source_branch: str)[source]

Bases: object

destination: Dict[str, Any]
destination_branch: str
merged: Dict[str, Any]
source: Dict[str, Any]
source_branch: str
class versioned_collection.collection.tracking_collections.DeltasCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

Stores the deltas between different versions of the target collection.

Each delta document in this collection records the changes, and therefore the actions that have to be performed, to move between different versions of a document in the target collection. If no deltas are present for a particular document, then that document has not been modified since the target collection was initialised for versioning.

The deltas documents store information about the changes of particular documents in the target collection for a specific version and a specific branch of the versioning system. Even though the deltas for all documents are stored together, they can be grouped by the document in the target collection that they modify. The per-document deltas are tree structures, where the branches of the tree are a subset of the branches of the version tree (because some documents may not be modified on some branches). The per-document delta tree allows applying changes in both directions of time for a specific document. Since the target collection may contain multiple documents, this collection represents a set of per-document delta trees.

To get to a specific version of the target collection, we need the path in the version tree (or log tree) between the two versions. All the deltas between the start and end points of the path are retrieved according to the direction in which the delta trees should be navigated. The delta trees for each document modified between the two versions are built and intersected with the given path to obtain the sequence of deltas that has to be applied to a document to move across versions.
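The idea of walking a path and applying each delta in the appropriate direction can be sketched for a single document as follows. This is a simplified model under stated assumptions: plain dict patches stand in for the serialized `forward` and `backward` fields, `steps` is a hypothetical list of (version, direction) pairs naming which delta to apply, and `REMOVED`, `apply_patch` and `apply_steps` are illustrative names that do not exist in the package.

```python
from typing import Any, Dict, List, Tuple

Version = Tuple[int, str]
REMOVED = object()  # marks a field that a patch deletes


def apply_patch(doc: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of doc with the patch applied."""
    result = dict(doc)
    for key, value in patch.items():
        if value is REMOVED:
            result.pop(key, None)
        else:
            result[key] = value
    return result


# Hypothetical delta tree of one document, keyed by the version each delta
# leads to. The document was not modified in version (0, "dev").
doc_deltas: Dict[Version, Dict[str, Dict[str, Any]]] = {
    (1, "main"): {"forward": {"x": 1}, "backward": {"x": REMOVED}},
    (2, "main"): {"forward": {"x": 2, "y": "a"},
                  "backward": {"x": 1, "y": REMOVED}},
}


def apply_steps(doc: Dict[str, Any],
                steps: List[Tuple[Version, int]]) -> Dict[str, Any]:
    """Apply the deltas met along the steps; 1 is forward, -1 is backward."""
    for version, direction in steps:
        delta = doc_deltas.get(version)
        if delta is None:  # the path crosses a version that left doc untouched
            continue
        doc = apply_patch(doc, delta["forward" if direction == 1 else "backward"])
    return doc


initial = {"_id": 7}
latest = apply_steps(initial, [((1, "main"), 1), ((2, "main"), 1)])
restored = apply_steps(latest, [((2, "main"), -1), ((1, "main"), -1)])
untouched = apply_steps(initial, [((0, "dev"), 1)])
```

Skipping versions that have no delta for the document plays the role of the intersection between the per-document delta tree and the path.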

class SCHEMA(document_id: Any, collection_version_id: int, branch: str, timestamp: datetime.datetime, forward: bytes, backward: bytes, prev: bson.ObjectId | None, next: List[bson.ObjectId])[source]

Bases: object

backward: bytes
branch: str
collection_version_id: int
document_id: Any
forward: bytes
next: List[bson.ObjectId]
prev: bson.ObjectId | None
timestamp: datetime
add_delta(document_old: Dict[str, Any], document_new: Dict[str, Any], document_id: bson.ObjectId, collection_version: int, branch: str, timestamp: datetime, branch_history: List[Tuple[int, str]], with_id: bson.ObjectId | None = None) bson.ObjectId | None[source]

Compute and record the deltas between the given document versions.

Raises:

InvalidCollectionState – If some deltas for the current collection version cannot be identified.

Parameters:
  • document_old – The old version of the document.

  • document_new – The new version of the document, that contains changes.

  • document_id – The id of the modified document.

  • collection_version – The version of the tracked collection to which the changes to the given document should be registered.

  • branch – The branch that the modified target document belongs to.

  • timestamp – The date and time when the delta was registered.

  • branch_history – A list of (version, branch) tuples from the previous version to the root of the version tree.

  • with_id – An optional ObjectId used for inserting the new entry into this collection. This is used when adding deltas from a local to a remote collection.

Returns:

The id of the delta document, or None if the two versions of the document are unchanged.
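The "return None when nothing changed" behaviour can be illustrated with a tiny field-level diff. This is a sketch only: the signature shown above suggests that the real method works with deepdiff deltas serialized to bytes, whereas the hypothetical `diff` and `compute_delta` helpers below handle only top-level fields and use a `MISSING` sentinel for removed keys.

```python
from typing import Any, Dict, Optional, Tuple

MISSING = object()  # marks a key that is absent on the other side

Patch = Dict[str, Any]


def diff(old: Dict[str, Any], new: Dict[str, Any]) -> Patch:
    """Collect the top-level fields whose value differs between old and new."""
    patch: Patch = {}
    for key in old.keys() | new.keys():
        if old.get(key, MISSING) != new.get(key, MISSING):
            patch[key] = new.get(key, MISSING)
    return patch


def compute_delta(old: Dict[str, Any],
                  new: Dict[str, Any]) -> Optional[Tuple[Patch, Patch]]:
    """Return (forward, backward) patches, or None if nothing changed."""
    forward = diff(old, new)
    if not forward:
        return None
    return forward, diff(new, old)


unchanged = compute_delta({"a": 1}, {"a": 1})
changed = compute_delta({"a": 1}, {"a": 2, "b": 3})
```

Storing both directions is what later allows a document to be moved forwards or backwards in time without recomputing any diff.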

apply_deltas(per_document_deltas: Dict[Any, List[deepdiff.Delta]], documents: List[Dict[str, Any]], return_current_documents: bool = False) Dict[Any, Dict[str, Any]] | Tuple[Dict[Any, Dict[str, Any]], Dict[Any, Dict[str, Any]]][source]

Update the given documents and return them.

Applies the deltas between two versions of the target collection. It uses the given deltas, grouped by document and sorted in the direction in which they have to be applied, and sequentially updates each document to reach its state in the target version of the tracked collection.

Parameters:
  • per_document_deltas – The prefetched and sorted list of deltas that have to be applied, grouped by document id.

  • documents – The list of documents that will be updated.

  • return_current_documents – Whether to return the current documents grouped by id.

Returns:

The updated documents grouped by their '_id' field. If return_current_documents is set, the current documents are returned as well. The documents that are empty should be removed from the target collection.

build() bool[source]

Create the collection on the database.

Returns:

False if the collection already exists, True otherwise.

delete_subtrees(root: Tuple[int, str], leaves: List[Tuple[int, str]]) None[source]

Delete the deltas registered after a specific version.

Parameters:
  • root – The root of the subtree of the version tree from which deltas will be removed.

  • leaves – The versions of the leaves of the version subtree.

get_delta_documents_in_path(path: Dict[Tuple[int, str], int], sorting_order: int | None = None) pymongo.command_cursor.CommandCursor[source]

Get the delta documents grouped by tracked document’s id in path.

Parameters:
  • path – The path in the version tree from which to pull the delta documents.

  • sorting_order – The order in which to sort the delta documents by timestamp. 1 means ascending and -1 means descending. If omitted, the sorting step is skipped.

Returns:

The delta documents.

get_deltas(path: Dict[Tuple[int, str], int]) Dict[Any, List[deepdiff.Delta]][source]

Retrieve the deltas across the given path of versions.

Parameters:

path – The path between two versions. The keys identify the version (i.e., (version, branch) tuples) and the values the direction in time to take to move between versions.

Returns:

A dictionary containing the requested deltas, grouped by the documents’ id.

insert_delta_docs(delta_docs: List[Dict[str, Any]]) None[source]

Insert a list of delta documents into this collection.

Warning

This method modifies the delta documents and removes the ids of the documents from the next field that are not part of the given delta_docs list.

This is used during remote-local synchronisation of a branch, therefore the inserted deltas are slightly modified versions of the local deltas. Since only a single branch can be pushed or pulled at a time, the forward references to other delta documents that are not in the given list are removed. Also, the parent deltas are updated to include the first delta document in delta_docs in their forward references field, i.e., next.

Parameters:

delta_docs – The delta documents to be inserted. It is assumed that the deltas are sorted.
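The pruning of forward references described in the warning can be sketched as a plain list operation. The helper name `prune_forward_refs` is hypothetical, string ids stand in for ObjectIds, and the sketch returns copies, whereas the documented method modifies the given delta documents.

```python
from typing import Any, Dict, List


def prune_forward_refs(delta_docs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop every id in 'next' that does not belong to a document in the batch."""
    kept_ids = {doc["_id"] for doc in delta_docs}
    return [
        {**doc, "next": [ref for ref in doc.get("next", []) if ref in kept_ids]}
        for doc in delta_docs
    ]


# Delta "d1" branches into "d2" (same branch) and "x9" (another branch that
# is not part of this push or pull).
batch = [
    {"_id": "d1", "next": ["d2", "x9"]},
    {"_id": "d2", "next": []},
]
pruned = prune_forward_refs(batch)
```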

rebranch(start_version: Tuple[int, str], new_branch: str, num_versions: int) None[source]

Move the deltas after start_version to another branch.

Parameters:
  • start_version – The version which should be moved to a new branch.

  • new_branch – The name of the new branch.

  • num_versions – The number of versions to move, i.e., the length of the branch starting at start_version.

class versioned_collection.collection.tracking_collections.LockCollection(*args: Any, **kwargs: Any)[source]

Bases: Collection

Collection holding the locking information.

This collection is necessary when multiple users are concurrently interacting with a VersionedCollection.

The documents in this collection have the following format:

{
    _id: ObjectId(...)
    collection_name: 'name of the tracked collection'
    locked: True/ False
}

This collection is shared per database, holding information about each tracked collection in the database. By using atomic updates, each VersionedCollection can implement a simple locking mechanism.
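A lock built on an atomic update can be sketched as a single conditional write: flip `locked` to True only if it is currently False, and treat "no document matched" as "someone else holds the lock". The example below is an assumption about how such a mechanism might look, not the package's implementation. `FakeLockCollection` is a hypothetical in-memory stand-in that mimics the default behaviour of pymongo's `find_one_and_update` (returning the matched document as it was before the update, or None when nothing matches) so that the sketch runs without a database.

```python
from typing import Any, Dict, List, Optional


class FakeLockCollection:
    """Hypothetical in-memory stand-in for a pymongo collection.

    It is not thread-safe; with a real server, the atomicity of the
    conditional update is provided by MongoDB itself.
    """

    def __init__(self) -> None:
        self.docs: List[Dict[str, Any]] = []

    def find_one_and_update(
        self, filter: Dict[str, Any], update: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        for doc in self.docs:
            if all(doc.get(key) == value for key, value in filter.items()):
                before = dict(doc)
                doc.update(update["$set"])
                return before
        return None


def try_lock_acquire(locks: FakeLockCollection, collection: str) -> bool:
    """Take the lock only if it is currently free."""
    previous = locks.find_one_and_update(
        {"collection_name": collection, "locked": False},
        {"$set": {"locked": True}},
    )
    return previous is not None


locks = FakeLockCollection()
locks.docs.append({"collection_name": "users", "locked": False})
first_attempt = try_lock_acquire(locks, "users")
second_attempt = try_lock_acquire(locks, "users")
```

Because the check and the write happen in one operation, two clients cannot both observe the lock as free and then both take it.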

class SCHEMA(collection_name: str, locked: bool)[source]

Bases: object

collection_name: str
locked: bool
init_lock(collection: str)[source]

Initialise the lock.

is_locked(collection: str) bool | None[source]
lock_acquire(collection: str) bool[source]

Acquire the lock for the given collection.

Parameters:

collection – The name of the collection to lock.

Returns:

Whether the process waited for the lock.

lock_release(collection: str) bool[source]

Release the lock for the given collection.

Parameters:

collection – The name of the collection to unlock.

Returns:

Whether the collection was locked.

remove_collection(collection: str) None[source]

Remove the locking information for the given collection.

Parameters:

collection – The name of the collection for which to remove the lock.

try_lock_acquire(collection: str) bool[source]

Try to acquire the lock for the given collection.

Returns:

True if the lock is successfully acquired, False if the lock is held by another process.

class versioned_collection.collection.tracking_collections.LogsCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

Stores the logs for the tracked versions of the target collection.

This collection represents a log book for storing information about the registered versions of the target collection. A version has a number, a branch, a short description message and the timestamp at which it was registered.

The log is structured as a tree, each branch of the tree representing a versioning branch, therefore the log tree can be seen as a tree describing how the versions of the target collection have evolved in time, i.e., a tree that has as nodes the nodes of the version tree. Since this tree is a small data structure, it is cached in memory and automatically updated when new versions are registered, together with its persistent in-database representation.

class SCHEMA(version: int, branch: str, timestamp: datetime.datetime, message: str, prev: bson.ObjectId | None, next: List[bson.ObjectId])[source]

Bases: object

branch: str
property id: bson.ObjectId | None
message: str
next: List[bson.ObjectId]
prev: bson.ObjectId | None
timestamp: datetime
version: int
weakly_equals(other: SCHEMA) bool[source]

Check if two log objects are weakly equal.

The weak equality does not take into account the linkage relation between log entries, i.e., it ignores the next and prev fields.

Parameters:

other – The object to check for weak equality.

Returns:

Whether the log entries are weakly equal.
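Weak equality can be illustrated with a small dataclass that compares only the descriptive fields. `LogEntry` is a hypothetical stand-in for the SCHEMA class, strings stand in for ObjectIds, and the choice to compare exactly version, branch, timestamp and message is an assumption derived from the statement that next and prev are ignored.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class LogEntry:
    version: int
    branch: str
    timestamp: datetime
    message: str
    prev: Optional[str] = None
    next: List[str] = field(default_factory=list)

    def weakly_equals(self, other: "LogEntry") -> bool:
        """Compare the entries while ignoring the prev/next links."""
        return (
            (self.version, self.branch, self.timestamp, self.message)
            == (other.version, other.branch, other.timestamp, other.message)
        )


when = datetime(2024, 1, 1, 12, 0, 0)
local = LogEntry(1, "main", when, "add users", prev="a0", next=["a2"])
remote = LogEntry(1, "main", when, "add users", prev="b0", next=[])
```

Here `local == remote` is False because the links differ, while `local.weakly_equals(remote)` is True.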

add_log_entry(previous_version: int, previous_branch: str | None, current_branch: str, message: str, timestamp: datetime, with_id: bson.ObjectId | None = None) Tuple[int, str][source]

Add a new entry to the log tree.

The log entries are created and added to the log tree when a new version of the target collection is registered. This method updates both the memory-cached tree and the persistent database tree.

Raises:

InvalidCollectionVersion – If the previous version and branch do not exist.

Parameters:
  • previous_version – The version id of the previous version, i.e., the version which was modified to generate the version to be registered. Must be -1 only for the first version (root).

  • previous_branch – The branch name of the previous version, i.e., the branch on which the previous version was registered.

  • current_branch – The branch on which the new version should be registered.

  • message – The message describing changes made to this version.

  • timestamp – The time the version was registered.

  • with_id – An optional ObjectId used for inserting the new entry into this collection. This is used when adding entries to a remote collection.

Returns:

The version id and the branch name of the new entry.

build(message: str | None = None, timestamp: datetime | None = None, with_id: bson.ObjectId | None = None) bool[source]

Build this collection on the database.

Parameters:
  • message – The message associated with the initial version of this collection.

  • timestamp – The timestamp when this collection was created.

  • with_id – The id of the root of the log tree.

Returns:

True if the collection was successfully built, False otherwise.

contains_version(version: int, branch: str) bool[source]

Return whether the given version is present in the log tree.

Parameters:
  • version – The version id of the version.

  • branch – The branch on which the version is registered.

Returns:

Whether the version exists in the log tree, i.e., if it is registered.

delete_subtree(version: Tuple[int, str]) None[source]

Delete the subtree of the version tree rooted in version.

Raises:

InvalidCollectionVersion – If version does not exist.

Parameters:

version – The root version of the subtree to delete.

get_log(branch: str, version: int | None = None, return_ids: bool = False) List[SCHEMA][source]

Return the log sorted in descending order for the given branch.

If a version number is provided, the log will start at the collection version identified by version and branch, otherwise, the returned log will contain the entire history from the top of the branch to the root of the log tree.

Raises:

ValueError – If there is no node on the given branch, or if there is no node identified by version and branch.

Parameters:
  • branch – The branch for which to retrieve the log.

  • version – The version where to start the log from.

  • return_ids – Whether to include the ids of the log documents as well.

Returns:

A list of log entries in descending order, i.e., the latest version at top.
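Retrieving a log in descending order amounts to starting at a node and following parent links down to the root. The sketch below models the log tree as a hypothetical `prev_of` mapping from each version to its parent (None for the root), and assumes that the top of a branch is the entry with the highest version number on that branch; neither detail is confirmed by the documentation.

```python
from typing import Dict, List, Optional, Tuple

Version = Tuple[int, str]

# Hypothetical log tree: version -> previous version (None for the root).
prev_of: Dict[Version, Optional[Version]] = {
    (0, "main"): None,
    (1, "main"): (0, "main"),
    (2, "main"): (1, "main"),
    (0, "dev"): (1, "main"),
    (1, "dev"): (0, "dev"),
}


def get_log(branch: str, version: Optional[int] = None) -> List[Version]:
    """Return the history from the starting node back to the root."""
    if version is None:
        on_branch = [v for (v, b) in prev_of if b == branch]
        if not on_branch:
            raise ValueError(f"No node on branch {branch!r}.")
        version = max(on_branch)  # assumed to be the top of the branch
    node: Optional[Version] = (version, branch)
    if node not in prev_of:
        raise ValueError(f"No node {node!r} in the log tree.")
    log: List[Version] = []
    while node is not None:
        log.append(node)
        node = prev_of[node]
    return log


dev_log = get_log("dev")
main_log = get_log("main", version=1)
```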

get_log_doc_id(version: int, branch: str) bson.ObjectId | None[source]

Return the id of the document with the given version.

Parameters:
  • version – The version of the log entry.

  • branch – The branch of the log entry.

Returns:

The id of the log document in this collection, None if the version or the log tree does not exist.

get_log_entry(version: int, branch: str) SCHEMA | None[source]

Return the entry for the given version in the log tree.

Parameters:
  • version – The version for which the entry will be retrieved.

  • branch – The branch for which the entry will be retrieved.

Returns:

The entry in the log tree if it exists, None if the version or the log tree does not exist.

get_parent_branch(branch: str) str | None[source]

Retrieve the parent branch of the branch identified by branch.

Note

Branches are just unlinked pointer documents. The branching structure of the collection is reflected in the log tree.

Parameters:

branch – The name of the branch whose parent should be returned.

Returns:

The parent branch, or None if branch='main'.

get_parent_version(version: Tuple[int, str]) Tuple[int, str] | None[source]

Return the version and branch of the previous version.

Raises:

InvalidCollectionVersion – If the version identified by version does not exist.

Parameters:

version – The version whose parent should be retrieved.

Returns:

The parent of the given version.

get_path_between_versions(current: Tuple[int, str], target: Tuple[int, str]) Dict[Tuple[int, str], int][source]

Find the path between the given points in the log tree.

Note

The returned path includes both ends.

Raises:

InvalidCollectionVersion – If the current or target versions do not exist in the log tree.

Parameters:
  • current – The start point version.

  • target – The end point version.

Returns:

An ordered dictionary representing the path that has to be followed in the log tree to get from the current version to the target version. The keys of the returned dictionary are tuples containing the version represented as (version, branch) and the values are the directions in time to be taken to get to the next version. The forward direction is represented as 1, and the backward direction as -1. The last entry, representing the target version, has the direction of the previous step as direction. If the two versions are on different branches with a common ancestor, then the direction at that node is 0.
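One way to build a path of this shape is to climb from both versions to their lowest common ancestor and then stitch the two halves together. The sketch below follows one reading of the return value described above: the direction 0 is used only when the common ancestor is neither endpoint, and a path from a version to itself is given the direction 0 (an assumption, since the documentation does not cover that case). The `parent` mapping and the function names are hypothetical.

```python
from typing import Dict, List, Tuple

Version = Tuple[int, str]

# Hypothetical log tree stored as child -> parent.
parent: Dict[Version, Version] = {
    (1, "main"): (0, "main"),
    (2, "main"): (1, "main"),
    (0, "dev"): (1, "main"),
    (1, "dev"): (0, "dev"),
}


def ancestors(node: Version) -> List[Version]:
    """Return the chain from node (first) up to the root (last)."""
    chain = [node]
    while chain[-1] in parent:
        chain.append(parent[chain[-1]])
    return chain


def path_between(current: Version, target: Version) -> Dict[Version, int]:
    up = ancestors(current)
    down = ancestors(target)
    down_set = set(down)
    lca = next(v for v in up if v in down_set)  # lowest common ancestor
    ascent = up[: up.index(lca)]                # current .. just below lca
    descent = down[: down.index(lca)][::-1]     # just below lca .. target
    path: Dict[Version, int] = {v: -1 for v in ascent}
    if ascent and descent:
        path[lca] = 0    # turning point between two branches
    elif descent:
        path[lca] = 1    # lca is the start: pure forward move
    elif ascent:
        path[lca] = -1   # lca is the end: repeats the previous direction
    else:
        path[lca] = 0    # current == target (assumed convention)
    path.update({v: 1 for v in descent})
    return path


across = path_between((2, "main"), (1, "dev"))
forward = path_between((0, "main"), (2, "main"))
backward = path_between((2, "main"), (0, "main"))
```

Regular dicts preserve insertion order, so the keys of the result can be read from the start version to the end version.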

get_previous_version_and_branch(current_version: int, current_branch: str) Tuple[int, str] | None[source]

Get the version and branch name of the previous version.

Looks up the current version in the log tree and returns the parent node’s version number and branch name.

Raises:

InvalidCollectionVersion – If the given version is invalid or not registered.

Returns:

None if the collection is not tracked, the version and branch name of the previous node otherwise. If the current version is the root version, -1 will be returned as the previous version number.

get_versions_of_branch_tips(version: Tuple[int, str]) List[Tuple[int, str]][source]

Get the versions of the leaves of the subtree rooted in version.

Parameters:

version – A version in the log tree.

Returns:

The versions of the tip of the branches of the log subtree rooted in version.
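Finding the branch tips below a version is a leaf search over the subtree rooted at that version. A minimal sketch, assuming the log tree is available as a hypothetical `children` mapping from each version to its direct successors:

```python
from typing import Dict, List, Tuple

Version = Tuple[int, str]

# Hypothetical log tree stored as version -> direct successors.
children: Dict[Version, List[Version]] = {
    (0, "main"): [(1, "main")],
    (1, "main"): [(2, "main"), (0, "dev")],
    (0, "dev"): [(1, "dev")],
}


def branch_tips(root: Version) -> List[Version]:
    """Collect the leaves of the subtree rooted at root (iterative DFS)."""
    tips: List[Version] = []
    stack = [root]
    while stack:
        node = stack.pop()
        successors = children.get(node, [])
        if successors:
            stack.extend(successors)
        else:
            tips.append(node)
    return tips


tips_from_main_1 = sorted(branch_tips((1, "main")))
tips_from_leaf = branch_tips((2, "main"))
```

A version without successors is its own tip, which is why a leaf returns a single-element list.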

property log_tree: Tree | None

Return the log tree.

rebranch(version: Tuple[int, str], new_branch: str) None[source]

Move the versions starting at version to a new branch.

This updates the branch field of the log entries to be new_branch and resets the version counter field for version.

Raises:

ValueError – If version is the root version.

Parameters:
  • version – The version at which the new branch should start.

  • new_branch – The name of the new branch.

reset() bool[source]

Reset this collection and the in-memory cache.

Returns:

True if the operation is successful, False otherwise.

class versioned_collection.collection.tracking_collections.MetadataCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

Stores metadata about the current state of the target collection.

This collection keeps track of the current version of the target versioned collection, i.e., its version and branch, and information about the ‘head’ branch pointer, which specifies whether unregistered changes were made since the last registered version of the collection and whether the head pointer is detached, i.e., we are not checked out to a branch (or more specifically, to the latest registered version on a branch).

class SCHEMA(current_version: int, current_branch: str, detached: bool, changed: bool, has_stash: bool, has_conflicts: bool)[source]

Bases: object

changed: bool
current_branch: str
current_version: int
detached: bool
has_conflicts: bool
has_stash: bool
build() bool[source]

Build this collection on the database.

Returns:

True if the collection was successfully built, False otherwise.

property metadata: SCHEMA
set_metadata(current_version: int | None = None, current_branch: str | None = None, detached: bool | None = None, changed: bool | None = None, has_stash: bool | None = None, has_conflicts: bool | None = None) None[source]

Set some or all of the metadata attributes.
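The signature suggests that an argument left as None means "keep the current value". Under that assumption, a partial update can be sketched with an immutable dataclass; `Metadata` and this free-standing `set_metadata` are hypothetical stand-ins that return a new object instead of writing to the database.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class Metadata:
    current_version: int
    current_branch: str
    detached: bool
    changed: bool
    has_stash: bool
    has_conflicts: bool


def set_metadata(current: Metadata, **changes: Any) -> Metadata:
    """Return a copy in which only the non-None arguments are replaced."""
    updates = {key: value for key, value in changes.items() if value is not None}
    return replace(current, **updates)


before = Metadata(3, "main", False, False, False, False)
after = set_metadata(before, changed=True, current_branch=None)
```

Filtering on `is not None` rather than on truthiness matters here, because False and 0 are legitimate new values.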

class versioned_collection.collection.tracking_collections.ModifiedCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

Stores references to the documents that were modified and untracked.

A temporary collection created upon the initialisation for tracking of a target collection. The documents in this collection store references to the documents that have been modified with respect to the latest registered version and the operation that modified the document. When a new version of the target collection is registered all the documents of this collection are removed.

A tracking document is added to this collection each time a document in the target collection is modified. This allows correctly registering versions of the target collection when large volumes of unacknowledged writes are performed.

The set of all tracking documents in this collection forms a total order, induced by the ordering property of ObjectIds. The documents that track the changes of a target document are ordered by the order of occurrence of the events that modified the target document, which can be restored by comparing the tracking document ids, i.e., the _id field.
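The ordering argument can be made concrete with a few tracker documents. In this sketch, increasing integers stand in for the ObjectIds of the trackers, and `ops_in_order` is a hypothetical helper: sorting by the tracker `_id` and then grouping by the modified document's `id` recovers, for each target document, the sequence of operations in the order in which they happened.

```python
from collections import defaultdict
from typing import Any, Dict, List

# Hypothetical tracker documents, deliberately listed out of order.
trackers: List[Dict[str, Any]] = [
    {"_id": 3, "id": "doc_a", "op": "u"},
    {"_id": 1, "id": "doc_a", "op": "i"},
    {"_id": 2, "id": "doc_b", "op": "u"},
    {"_id": 4, "id": "doc_b", "op": "d"},
]


def ops_in_order(docs: List[Dict[str, Any]]) -> Dict[Any, List[str]]:
    """Group the operations by target document, ordered by tracker id."""
    grouped: Dict[Any, List[str]] = defaultdict(list)
    for tracker in sorted(docs, key=lambda t: t["_id"]):
        grouped[tracker["id"]].append(tracker["op"])
    return dict(grouped)


history = ops_in_order(trackers)
```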

class SCHEMA(_id: 'ObjectId', id: 'ObjectId', op: 'str')[source]

Bases: object

id: bson.ObjectId
op: str
delete_modified(ids: List[bson.ObjectId]) None[source]

Delete the tracked documents from this collection.

Note

This deletes only the given tracker documents. A document from the tracked collection that has been modified multiple times has multiple trackers in this collection.

Parameters:

ids – The ids of the tracker documents.

get_modified_document_ids_by_operation() Dict[Literal['i', 'd', 'u'], List[bson.ObjectId]][source]

Return the document ids grouped by the operation type.

Returns:

The list of ids of the modified documents grouped by the type of the modifying operation. The valid types are 'i' for inserts, 'd' for deletes and 'u' for updates and replacements.

get_modified_trackers() List[ModifiedTracker][source]

Get the modified document ids and the ids of the trackers.

Returns:

A list of documents containing the ids of the modified documents in the tracked collection and the ids of the trackers in this collection, grouped by the modified document ids.

get_unique_modified_document_ids() List[bson.ObjectId][source]

Get the unique ids of the modified documents.

has_changes() bool[source]
class versioned_collection.collection.tracking_collections.ReplicaCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

A snapshot of the latest tracked version of the target collection.

When the target collection is initialised for versioning, or when an attempt is made to change a previous version of the target collection, i.e., when branching explicitly or implicitly from a previously checked-out version, this collection will replicate the state of the target collection.

The main purpose of a ReplicaCollection is to let the user query a tracked collection efficiently, without any performance overhead, while still allowing the deltas between the latest tracked version and the modified version of the target collection to be computed when a new version is registered.

build()[source]

Build this collection on the database.

Upon the creation of this collection a snapshot of the target collection is created as well, i.e., this collection will be a replica of the target collection at the moment of initialisation.

create_snapshot() None[source]
class versioned_collection.collection.tracking_collections.StashCollection(*args: Any, **kwargs: Any)[source]

Bases: _BaseTrackerCollection

Stores the stash data of a collection.

class versioned_collection.collection.tracking_collections.StashContainer(database: pymongo.database.Database, parent_collection_name: str, **kwargs)[source]

Bases: object

Container class managing the stash area.

When the state of the tracked collection is stashed, the modified documents of the tracked collection and the tracking documents from the corresponding ModifiedCollection are backed up in two StashCollection collections. When the stashed data is restored, it is transferred back to the two corresponding collections and the stash area is cleared.

drop()[source]
exists() bool[source]
rename(new_name: str, *args, **kwargs)[source]
stash(main_collection: pymongo.collection.Collection, modified_collection: ModifiedCollection) None[source]

Copy the modified documents and the trackers to the stashing space.

Note

This does not modify the original collections.

Warning

This overwrites the existing stashed collections.

Parameters:
  • main_collection – The tracked versioned collection.

  • modified_collection – The collection that tracks the ids of the modified documents, i.e., __modified_<tracked_collection_name>.

stash_apply(main_collection: pymongo.collection.Collection, modified_collection: ModifiedCollection) None[source]

Apply the stash to restore the main collection.

Parameters:
  • main_collection – The tracked versioned collection.

  • modified_collection – The collection that tracks the ids of the modified documents, i.e., __modified_<tracked_collection_name>.

Submodules