import dataclasses
import datetime
from copy import deepcopy
from functools import partial
from multiprocessing import cpu_count, Pool
from typing import Any, Dict, Optional, List, Tuple, Union
import pymongo
from bson import ObjectId
from deepdiff import DeepDiff, Delta
from pymongo.command_cursor import CommandCursor
from pymongo.database import Database
from treelib import Node
from versioned_collection.collection.tracking_collections import (
_BaseTrackerCollection,
)
from versioned_collection.tree import Tree
from versioned_collection.utils.data_structures import hashabledict
from versioned_collection.utils.mongo_query import group_documents_by_id
[docs]
class DeltasCollection(_BaseTrackerCollection):
    """Stores the deltas between different versions of the target collection.

    Each `delta` document in this collection reflects the changes and
    therefore, the actions needed to be performed to move between different
    versions of the documents in the target collection. If no deltas for a
    particular document are present, then that document has not been modified
    since the target collection was initialised for versioning.

    The `deltas` documents store information about the changes of particular
    documents in the target collection for a specific version and a specific
    branch of the versioning system. Even though the deltas for all documents
    are stored together, they can be grouped by the document in the target
    collection that they modify. The per-document deltas are tree
    structures, where the branches of the tree are a subset of the branches of
    the version tree (because some documents may not be modified on some
    branches). The per-document delta tree allows applying changes in both
    directions of time for a specific document. Since the target collection
    may contain multiple documents, this collection represents a set of
    per-document delta trees.

    To get to a specific version of the target collection we need the path in
    the version tree or log tree between the two versions. All the deltas
    between the start and the end points of the path will be retrieved
    according to the direction in which we should navigate the delta trees.
    The delta trees for each modified document between the two versions are
    built and intersected with the given path, to get the sequence of deltas
    that have to be applied to a document to move across versions.
    """

    # Format template for the name of this collection.
    # NOTE(review): presumably filled in with the parent collection's name by
    # the base tracker class -- confirm there.
    _NAME_TEMPLATE = '__deltas_{}'
    # Alias for the documents handled by this class: string-keyed dicts.
    _DOCUMENT_TYPE = Dict[str, Any]
    # Used by deepdiff.Delta: passed as ``safe_to_import`` every time a stored
    # delta is decoded in this class.
    _SAFE_TO_IMPORT = {'bson.objectid.ObjectId'}
[docs]
@dataclasses.dataclass
class SCHEMA:
    """In-memory representation of one delta document.

    Instances can be read both through attributes and through ``obj[name]``.
    """

    # Id of the tracked document that this delta modifies.
    document_id: Any  # ObjectId | Dict[str, Any] | float | str | int
    # Version and branch of the tracked collection the delta belongs to.
    collection_version_id: int
    branch: str
    # Date and time at which the delta was registered.
    timestamp: datetime.datetime
    # Serialised deltas, one per direction in time.
    forward: bytes
    backward: bytes
    # Links to the parent delta and to the child deltas of the same document.
    prev: Optional[ObjectId]
    next: List[ObjectId]

    def __getitem__(self, attr: str):
        """Return the value stored under `attr`, or ``None`` if there is none.

        Only values stored on the instance are returned. Names that exist
        solely on the class (methods, dunder attributes) yield ``None`` as
        well, instead of failing with a ``KeyError``.

        :param attr: The name of the field to read.
        :return: The value of the field, or ``None`` if the instance has no
            such field.
        """
        return self.__dict__.get(attr)
def __init__(
    self,
    database: Database,
    parent_collection_name: str,
    **kwargs,
) -> None:
    """Initialise the deltas tracker of a parent collection.

    Every argument is forwarded unchanged to the base tracker collection.

    :param database: The database handed to the base class.
    :param parent_collection_name: The name of the tracked collection.
    :param kwargs: Extra keyword arguments handed to the base class.
    """
    super().__init__(database, parent_collection_name, **kwargs)
[docs]
def build(self) -> bool:
    """Create the collection on the database, together with its indexes.

    Two indexes are created: one on ``document_id`` and a compound one on
    ``collection_version_id`` (descending) and ``branch`` (ascending).

    :return: ``False`` if the collection already exists, ``True`` otherwise.
    """
    if not self.exists():
        self.create_index('document_id')
        version_index = [
            ('collection_version_id', pymongo.DESCENDING),
            ('branch', pymongo.ASCENDING),
        ]
        self.create_index(version_index)
        return True
    # Nothing to do: the collection is already there.
    return False
def _build_delta_tree_set(
self, deltas: List[_DOCUMENT_TYPE]
) -> Optional[List[Tree]]:
"""Build the per-document delta tree set.
.. note::
The given list of documents should be deltas related to a single
document.
The delta tree is just an in-memory representation of the delta tree
stored in this collection for each document. Since deltas do not
necessarily need to be all connected, the delta tree may be split
into unconnected components, which for the delta tree set.
:param deltas: A list of document deltas related to a document.
:return: The delta tree set.
"""
if len(deltas) == 0:
return None
# Find the roots of each unconnected delta tree or the root of the
# document delta tree. Note that it is impossible to have both a
# global root and unconnected trees, because if there is a global
# root, then all subtrees will be connected.
deltas = {d['_id']: self._delta_doc_to_schema(d) for d in deltas}
deltas: Dict[ObjectId, DeltasCollection.SCHEMA]
roots: List[ObjectId] = []
root = None
for d_id, delta in deltas.items():
if root is None:
root = d_id, delta
if delta.timestamp < root[1].timestamp:
root = d_id, delta
if delta.prev is None:
roots.append(d_id)
if len(roots) == 0:
roots.append(root[0])
return [self._build_delta_tree(root, deltas) for root in roots]
@staticmethod
def _build_delta_tree(
    root: ObjectId, deltas: Dict[ObjectId, SCHEMA]
) -> Tree:
    """Build the per-document partial delta tree rooted at `root`.

    The tree is grown by following the ``next`` references, starting from
    `root`. References to deltas that are not in `deltas` are skipped.

    :param root: The id of the delta used as the root of the tree.
    :param deltas: The available deltas, indexed by their id.
    :return: The tree whose nodes carry the schema objects as data.
    """
    delta_tree = Tree()
    pending = [root]
    while pending:
        current_id = pending.pop()
        if current_id in deltas:
            schema = deltas[current_id]
            # A parent outside of `deltas` is treated as missing; this
            # allows building partial trees.
            parent_id = None if schema.prev not in deltas else schema.prev
            delta_tree.create_node(
                identifier=current_id, parent=parent_id, data=schema
            )
            pending.extend(schema.next)
    return delta_tree
[docs]
def add_delta(
    self,
    document_old: _DOCUMENT_TYPE,
    document_new: _DOCUMENT_TYPE,
    document_id: ObjectId,
    collection_version: int,
    branch: str,
    timestamp: datetime.datetime,
    branch_history: List[Tuple[int, str]],
    with_id: Optional[ObjectId] = None,
) -> Optional[ObjectId]:
    """Compute and record the deltas between the given document versions.

    The forward and backward deltas are computed with ``DeepDiff`` and
    stored serialised. The new delta is linked to the latest delta of the
    same document found on `branch_history`, if there is one. If a delta is
    already registered for ``(collection_version, branch)``, it is updated
    in place instead of inserting a new one.

    :raises InvalidCollectionState: If some deltas for the current
        collection version cannot be identified.
    :param document_old: The old version of the document.
    :param document_new: The new version of the document, that contains
        changes.
    :param document_id: The id of the modified document.
    :param collection_version: The version of the tracked collection to
        which the changes to the given document should be registered.
    :param branch: The branch that the modified target document belongs to.
    :param timestamp: The date and time when the delta was registered.
    :param branch_history: The (version, branch) tuples from the previous
        version to the root of the version tree.
    :param with_id: An optional :class:`ObjectId` used for inserting the
        new entry into this collection. This is used when adding deltas
        from a local to a remote collection.
    :return: The id of the delta document, or ``None`` if the two versions
        of the document are unchanged.
    """
    forward = DeepDiff(
        document_old,
        document_new,
        ignore_order=False,
        report_repetition=False,
    )
    # No changes actually made since the previous registered version.
    # This can be caused by updating the document once, and then updating
    # it again to its previous version.
    if forward == {}:
        return None
    forward = Delta(forward)
    # Search the per-document delta tree for a previous delta
    # Get the set of deltas
    deltas = self.find({'document_id': document_id})
    # Keep only the deltas that are part of the branch history
    _hist_set = set(branch_history)
    # We have to add this for deltas that are already registered in case
    # of multiple document updates
    _hist_set.add((collection_version, branch))
    deltas = [
        d
        for d in deltas
        if (d['collection_version_id'], d['branch']) in _hist_set
    ]
    prev_delta_doc = None
    prev_delta_node = None
    # Force a delta recompute in the case a delta has already been
    # registered during this transaction
    # This should not happen because the tracking documents are grouped
    # by the document's ID. However, it is possible that the listener
    # hasn't finished marking all documents as modified by the time they are
    # extracted from 'modified documents' collection.
    update_forward_and_backward_deltas = False
    delta_doc_id = None
    # If the document has been modified before, find the previous delta
    # that modified the document on the current path from root to
    # the current node in the version tree.
    if len(deltas) > 0:
        # Build the delta tree for this document
        deltas = list(deltas)
        trees = self._build_delta_tree_set(deltas)
        # Pick the first tree whose root version lies on the history.
        tree = None
        for _tree in trees:
            root = _tree.get_node(_tree.root)
            root_version = root.data.collection_version_id, root.data.branch
            if root_version in _hist_set:
                tree = _tree
                break
        # NOTE(review): the docstring advertises InvalidCollectionState, but
        # this check raises AssertionError and disappears under ``-O`` --
        # confirm which behaviour is intended.
        assert tree is not None
        # Work on a copy, the history is consumed below.
        branch_history = deepcopy(branch_history)
        # Trim the path to match the root of the delta tree
        next_node: Node = tree.get_node(tree.root)
        while (
            len(branch_history) > 0
            and self._version_of(next_node) != branch_history[-1]
        ):
            branch_history.pop(-1)
        # Use the reversed history to descend from root to the latest
        # delta that modified the document
        branch_history.insert(0, (collection_version, branch))
        # NOTE(review): the loop ends as soon as no child matches the next
        # version of the history. Verify that this is the wanted behaviour
        # when the document was not modified in some intermediate version.
        while (
            len(branch_history) > 1
            and self._version_of(next_node) == branch_history[-1]
        ):
            branch_history.pop(-1)
            for child in tree.children(next_node.identifier):
                if self._version_of(child) == branch_history[-1]:
                    next_node = child
        if self._version_of(next_node) == (collection_version, branch):
            # A delta is already registered for the current version.
            old_forward_diff = Delta(
                next_node.data['forward'],
                safe_to_import=self._SAFE_TO_IMPORT,
            ).diff
            if forward.diff == old_forward_diff:
                # TODO: log here to make sure this doesn't happen
                #  under stress tests, then remove
                # The document has not changed, but it was simply
                # modified multiple times before registering the new
                # version.
                return next_node.identifier
            else:
                update_forward_and_backward_deltas = True
                delta_doc_id = next_node.identifier
        else:
            prev_delta_node = next_node
            # This is the attribute dict of the schema object itself, so
            # the '_id' set below is also stored on the node's data.
            prev_delta_doc = next_node.data.__dict__
            prev_delta_doc['_id'] = next_node.identifier
    backward = Delta(
        DeepDiff(
            document_new,
            document_old,
            ignore_order=False,
            report_repetition=False,
        )
    )
    delta_doc = self.SCHEMA(
        document_id=document_id,
        collection_version_id=collection_version,
        branch=branch,
        timestamp=timestamp,
        forward=forward.dumps(),
        backward=backward.dumps(),
        prev=None if prev_delta_doc is None else prev_delta_doc['_id'],
        next=[],
    ).__dict__
    if update_forward_and_backward_deltas:
        # Only the serialised deltas of the existing document are replaced.
        self.update_one(
            {'_id': delta_doc_id},
            update={
                "$set": {
                    'forward': delta_doc['forward'],
                    'backward': delta_doc['backward'],
                }
            },
        )
        delta_doc['_id'] = delta_doc_id
    else:
        if with_id:
            delta_doc['_id'] = with_id
        self.insert_one(delta_doc)
    # Link the delta with its parent
    if prev_delta_doc is not None:
        # NOTE(review): `next` is rewritten from the list read earlier
        # (read-modify-write), whereas `insert_delta_docs` links a parent
        # with "$push" -- confirm whether "$push" should be used here too.
        next_list = prev_delta_node.data.next
        next_list.append(delta_doc['_id'])
        self.find_one_and_update(
            filter={'_id': delta_doc['prev']},
            update={"$set": {"next": next_list}},
        )
    return delta_doc['_id']
[docs]
def insert_delta_docs(self, delta_docs: List[_DOCUMENT_TYPE]) -> None:
    """Insert a list of delta documents into this collection.

    .. warning::
        This method modifies the delta documents and removes the ids of
        the documents from the ``next`` field that are not part of the
        given `delta_docs` list.

    This is used during remote-local synchronisation of a branch,
    therefore the inserted deltas are slightly modified versions of the
    local deltas. Since a single branch can be pushed or pulled at a
    time, the forward references to other delta documents that are not in
    the given list are removed. Also, the parent delta is updated to
    include the first delta document in `delta_docs` in its forward
    references field, i.e., ``next``.

    :param delta_docs: The delta documents to be inserted. It is assumed
        that the deltas are sorted. An empty list is a no-op.
    """
    if not delta_docs:
        # Nothing to insert and no parent to link; this also avoids
        # handing an empty list to ``insert_many``.
        return

    inserted_ids = {d['_id'] for d in delta_docs}
    for delta_doc in delta_docs:
        # Make sure we clean up the delta documents to include
        # information only about the desired branch: keep the forward
        # references that point to documents inserted by this call.
        delta_doc['next'] = [
            ref for ref in delta_doc['next'] if ref in inserted_ids
        ]

    # Update the parent of the first delta doc. The following delta docs
    # are inserted directly.
    first = delta_docs[0]
    if first['prev'] is not None:
        self.find_one_and_update(
            filter={'_id': first['prev']},
            update={"$push": {"next": first['_id']}},
        )

    # Add the delta documents
    self.insert_many(delta_docs)
def _delta_doc_to_schema(self, delta: Dict[str, Any]) -> SCHEMA:
"""Convert a delta document to a schema object."""
delta.pop('_id')
return self.SCHEMA(**delta)
[docs]
def get_deltas(
    self, path: Dict[Tuple[int, str], int]
) -> Dict[Any, List[Delta]]:
    """Retrieve the deltas across the given path of versions.

    :param path: The path between two versions. The keys identify the
        version (i.e., (version, branch) tuples) and the values the
        direction in time to take to move between versions.
    :return: A dictionary mapping the id of each tracked document to the
        list of deltas to apply to it. Ids that are dictionaries are
        converted to hashable dictionaries.
    """
    deltas_by_document = dict()
    # The delta documents come grouped by the tracked document's id.
    for group in self.get_delta_documents_in_path(path):
        # Build the tree set, merge it into one partial tree and overlap
        # that tree with the path.
        tree_set = self._build_delta_tree_set(group['deltas'])
        partial_tree = self._build_partial_delta_tree(tree_set, path)
        document_deltas = self._get_deltas(partial_tree, path)

        raw_id = group['_id']
        key = hashabledict(raw_id) if isinstance(raw_id, dict) else raw_id
        deltas_by_document[key] = document_deltas
    return deltas_by_document
def _build_partial_delta_tree(
self,
trees: List[Tree],
path: Dict[Tuple[int, str], int],
) -> Tree:
"""Build a partial per-document delta tree out of unconnected trees.
:param trees: The trees to merge.
:param path: The path to follow across the merged tree.
:return: A partial document delta tree.
"""
# The delta tree is complete, nothing to do.
if len(trees) == 1:
return trees[0]
# Filter the trees whose roots are not in path
trees = [
t for t in trees if self._version_of(t.get_node(t.root)) in path
]
# If after filtering the out-of-path trees, we are left with a single
# tree, then this tree is sufficient to recover the correct deltas.
if len(trees) == 1:
return trees[0]
# At this point, we can only have 2 unconnected trees that are part of
# different branches in the version tree.
# Proof:
# Suppose the 2 trees are part of the same branch. Since none of
# the trees were previously filtered, the root of one tree has to
# be one end of the path, and the leaf of the other has to be the
# other end of the path. But since they are part of the same
# branch, there exists a set of deltas for that branch that
# transform the document between the end points of the path,
# therefore there exists a unique delta tree -> contradiction.
# If the path is a linear chain (has no change in
# direction), then there exists a unique tree since the path is
# contained within a single branch. Otherwise, the path is split
# between two branches and there exists a delta tree for each
# branch, so 2 trees.
assert len(trees) == 2
direction = None
version: Optional[int, str] = None
for i, (_version, _direction) in enumerate(path.items()):
_version: Tuple[int, str]
if i == 0:
direction = _direction
if direction != _direction:
version = _version
break
empty_delta_binary = Delta(
DeepDiff(
dict(),
dict(),
ignore_order=False,
report_repetition=False,
)
).dumps()
tree = Tree()
tree.create_node(
identifier=-1,
parent=None,
data=DeltasCollection.SCHEMA(
document_id=None,
collection_version_id=version[0],
branch=version[1],
timestamp=None, # noqa
forward=empty_delta_binary,
backward=empty_delta_binary,
prev=None, # noqa
next=[t.get_node(t.root).identifier for t in trees],
),
)
tree.paste(tree.root, trees[0])
tree.paste(tree.root, trees[1])
return tree
[docs]
def get_delta_documents_in_path(
    self,
    path: Dict[Tuple[int, str], int],
    sorting_order: Optional[int] = None,
) -> CommandCursor:
    """Get the delta documents grouped by tracked document's id in `path`.

    When the path does not contain both directions ``1`` and ``-1`` and has
    more than one version, one end of the path is left out of the query:
    the last version if the first direction is ``-1``, the first version if
    the first direction is ``0`` or ``1``.

    :param path: The path in the version tree from which to pull the
        delta documents.
    :param sorting_order: The order in which to sort the delta documents by
        timestamp. ``1`` means ascending and ``-1`` means descending.
        If omitted, the sorting step is skipped.
    :return: The delta documents, as groups with the tracked document's id
        under ``'_id'`` and its delta documents under ``'deltas'``.
    """
    versions = list(path)
    directions = set(path.values())
    changes_direction = 1 in directions and -1 in directions
    if not changes_direction and len(versions) > 1:
        first_direction = path[versions[0]]
        if first_direction == -1:
            del versions[-1]
        elif first_direction in (0, 1):
            del versions[0]

    match_stage = {
        "$match": {
            "$or": [
                {'collection_version_id': version, 'branch': branch}
                for version, branch in versions
            ]
        }
    }
    group_stage = {
        "$group": {'_id': "$document_id", 'deltas': {"$push": "$$ROOT"}}
    }

    pipeline = [match_stage]
    if sorting_order is not None:
        # Sort before grouping, so that the groups are filled in order.
        pipeline.append({"$sort": {'timestamp': sorting_order}})
    pipeline.append(group_stage)
    return self.aggregate(pipeline, allowDiskUse=True)
[docs]
def rebranch(
    self,
    start_version: Tuple[int, str],
    new_branch: str,
    num_versions: int,
) -> None:
    """Move the deltas after `start_version` to another branch.

    The moved deltas get `new_branch` as their branch and their version
    number is decreased by the version number of `start_version`, so the
    deltas of `start_version` end up at version ``0`` of the new branch.

    :param start_version: The version which should be moved to a new branch.
    :param new_branch: The name of the new branch.
    :param num_versions: The number of versions to move, i.e., the length
        of the branch starting at `start_version`.
    """
    first_version = start_version[0]
    moved_versions = range(first_version, first_version + num_versions)
    selector = {
        "$or": [
            {'collection_version_id': version, 'branch': start_version[1]}
            for version in moved_versions
        ]
    }
    changes = {
        "$set": {'branch': new_branch},
        "$inc": {'collection_version_id': -first_version},
    }
    self.update_many(selector, changes)
@staticmethod
def _version_of(n: Node) -> Tuple[int, str]:
"""Get the version identifier of a node in the delta tree."""
return n.data.collection_version_id, n.data.branch
def _get_deltas(
    self, tree: Tree, path: Dict[Tuple[int, str], int]
) -> List[Delta]:
    """Get the list of deltas for the given path.

    Given a delta tree and a sequence of versions and the direction to be
    taken into the version tree to navigate between them, it computes the
    actual deltas that need to be applied to a document to get from the
    first version in `path` to the last one.

    Visually, it computes the overlap, or the intersection between the
    delta tree and the path. The path could either contain a subtree of
    the delta tree, or be contained in the delta tree.

    :param tree: A per-document delta tree.
    :param path: A path in the version tree between the start and end
        versions.
    :return: A list of deltas that have to be applied to the document
        linked to the given `tree` that modify the document. The list is
        empty when the starting node has direction ``0`` and none of its
        children are in `path`.
    """

    def _extract_and_decode_deltas(
        nodes: List[Node], _direction: str
    ) -> List[Delta]:
        # Decode the stored 'forward' or 'backward' delta of every node,
        # keeping the order of `nodes`.
        assert _direction in [
            'forward',
            'backward',
        ], "Invalid delta direction!"
        return [
            Delta(
                n.data[_direction],
                safe_to_import=DeltasCollection._SAFE_TO_IMPORT,
            )
            for n in nodes
        ]

    # Find the first node in the delta tree that is in :param:`path`. The
    # subtree rooted in that node contains the required deltas.
    # Children are only queued when they are in `path` themselves.
    # NOTE(review): if no node is found, `node` stays on the last visited
    # node, which is not in `path` -- confirm that this cannot happen for
    # the trees handed to this method.
    to_visit = [tree.root]
    node = None
    while len(to_visit) > 0:
        node = tree.get_node(to_visit.pop(0))
        if self._version_of(node) in path:
            break
        for child in tree.children(node.identifier):
            if self._version_of(tree.get_node(child.identifier)) in path:
                to_visit.append(child.identifier)
    # Reconstruct the paths. There are two possible cases:
    #   1. `node` has only one child in `path`. There are further two
    #   sub-cases:
    #       a) `node` is part of a linear path, and it is either the end or
    #       the start of the path. This can be decided by inspecting the
    #       direction for `node`, i.e., it is the start if the direction
    #       is `1`, and it is the end if the direction is `-1`.
    #       b) `node` is the branching point of the path, but since it
    #       has only one child in `path`, its deltas must be excluded.
    #
    #   2. `node` has two children in path, so the path starts lower in
    #   the tree, goes up to `node` and then descends to another child node,
    #   so the direction in time should change.
    # Find the paths in the subtree that intersect the `path`.
    # One slot per child of `node` that is in `path`; there are only two.
    paths = [], []
    i = 0
    for child in tree.children(node.identifier):
        _is_in_path = False
        to_visit = [child]
        # Go forward only if this child in path
        while len(to_visit) > 0 and self._version_of(child) in path:
            _is_in_path = True
            _path = paths[i]
            _node = to_visit.pop(-1)
            # Order the inserted nodes by their traversal order
            if path[self._version_of(_node)] == -1:
                _path.insert(0, _node)
            else:
                _path.append(_node)
            for next_child in tree.children(_node.identifier):
                # At this point only one child can be in the path
                if self._version_of(next_child) in path:
                    to_visit.append(next_child)
        # Move on to the second slot only if the first one was used.
        if _is_in_path:
            i += 1
    if not len(paths[1]):
        # Case 1
        _deltas = paths[0]
        node_version = self._version_of(node)
        if path[node_version] == 1:
            # `node` is the start of the path, its delta is applied first.
            direction = 'forward'
            _deltas.insert(0, node)
        elif path[node_version] == -1:
            # `node` is the end of the path, its delta is applied last.
            direction = 'backward'
            _deltas.append(node)
        else:
            # Case 1 b)
            # The direction is taken from the first node of the single
            # branch; the delta of `node` is not included.
            if len(_deltas):
                d = path[self._version_of(_deltas[0])]
                assert d == 1 or d == -1
                direction = 'forward' if d == 1 else 'backward'
            else:
                return []
        _deltas = _extract_and_decode_deltas(_deltas, direction)
    else:
        # Case 2
        # Fix the order
        # The branch walked backwards has to come first.
        if path[self._version_of(paths[0][0])] == 1:
            paths = paths[1], paths[0]
        # Build the path
        left = _extract_and_decode_deltas(paths[0], 'backward')
        right = _extract_and_decode_deltas(paths[1], 'forward')
        # We don't need the root here.
        _deltas = left + right
    return _deltas
[docs]
def apply_deltas(
    self,
    per_document_deltas: Dict[Any, List[Delta]],
    documents: List[_DOCUMENT_TYPE],
    return_current_documents: bool = False,
) -> Union[
    Dict[Any, _DOCUMENT_TYPE],
    Tuple[Dict[Any, _DOCUMENT_TYPE], Dict[Any, _DOCUMENT_TYPE]],
]:
    """Update the given documents and return them.

    Applies the deltas between two versions of the target collection. It
    uses the given deltas grouped by document and sorted by the direction
    on which they have to be applied and sequentially updates each
    document to get to the version of the document for the target version
    of the tracked collection.

    When more than 1000 documents have deltas, the documents are processed
    by a pool of worker processes.

    :param per_document_deltas: The prefetched and sorted list of deltas
        that have to be applied, grouped by document id.
    :param documents: The list of documents that will be updated.
    :param return_current_documents: Whether to return the current
        documents grouped by id.
    :return: The updated documents grouped by their ``'_id'`` field. The
        documents that are empty should be removed from the target
        collection. If ``return_current_documents`` is set, a tuple is
        returned, whose second element holds the current documents grouped
        by id.
    """
    processing_limit = 1000
    # Allow complex ids
    documents = group_documents_by_id(documents)

    # If we have to update a lot of documents, do it in parallel.
    if len(per_document_deltas) > processing_limit:
        items = list(per_document_deltas.items())
        worker = partial(self._process_deltas, documents=documents)
        num_workers = cpu_count()
        # Aim for about 3 chunks per worker, so that every worker gets
        # work and the load evens out.
        chunk_size = max(1, len(items) // (num_workers * 3))
        with Pool(num_workers) as pool:
            updated_docs = dict(
                pool.map(worker, items, chunksize=chunk_size)
            )
    else:
        updated_docs = dict(
            self._process_deltas(item, documents)
            for item in per_document_deltas.items()
        )

    if return_current_documents:
        return updated_docs, documents
    return updated_docs
@staticmethod
def _process_deltas(
item: Tuple[Any, List[Delta]],
documents: Dict[Any, _DOCUMENT_TYPE] = None,
) -> Tuple[Any, _DOCUMENT_TYPE]:
doc_id, deltas = item
# Documents that have to be created do not exist at all in the
# target collection, so use an empty document and update it.
document = documents.get(doc_id, {})
# Sequentially apply the deltas
for delta in deltas:
document = document + delta
if isinstance(doc_id, dict):
doc_id = hashabledict(doc_id)
return doc_id, document
[docs]
def delete_subtrees(
    self,
    root: Tuple[int, str],
    leaves: List[Tuple[int, str]],
) -> None:
    """Delete the deltas registered after a specific version.

    The deltas of `root` and of every version between `root` and each leaf
    are deleted. Before that, for every tracked document, the oldest of its
    deleted deltas is removed from the ``next`` references of its parent.

    :param root: The root of the subtree of the version tree from which
        deltas will be removed.
    :param leaves: The versions of the leaves of the version subtree.
    """
    # Construct the paths on which to remove the deltas. On the branch of
    # the root the walk stops at the root, on other branches at version 0.
    paths: Dict[Tuple[int, str], Optional[int]] = {root: None}
    for leaf_version, leaf_branch in leaves:
        lowest_version = root[0] if leaf_branch == root[1] else 0
        for version in range(leaf_version, lowest_version - 1, -1):
            paths[(version, leaf_branch)] = None

    grouped_deltas = self.get_delta_documents_in_path(
        path=paths, sorting_order=pymongo.ASCENDING
    )

    delta_ids = []
    for group in grouped_deltas:
        document_deltas = group['deltas']
        if document_deltas:
            oldest = document_deltas[0]
            if oldest['prev'] is not None:
                # Remove the delta reference from the parent
                self.find_one_and_update(
                    filter={'_id': oldest['prev']},
                    update={"$pull": {"next": oldest['_id']}},
                )
        delta_ids.extend(delta['_id'] for delta in document_deltas)

    # Delete the deltas
    self.delete_many({'_id': {"$in": delta_ids}})