import dataclasses
import datetime
from typing import Dict, Optional, List, Tuple, TypedDict, Any
import pymongo
from bson import ObjectId
from pymongo.database import Database
from treelib import Node
from treelib.exceptions import NodeIDAbsentError
from versioned_collection.collection.tracking_collections import (
_BaseTrackerCollection,
)
from versioned_collection.errors import (
BranchNotFound,
InvalidCollectionState,
InvalidCollectionVersion,
)
from versioned_collection.tree import Tree
from versioned_collection.utils.data_structures import hashabledict
[docs]
# Key identifying a node of the log tree: the version number together with
# the name of the branch on which that version is registered.
LogTreeIdentifier = TypedDict(
    'LogTreeIdentifier',
    {
        'version': int,
        'branch': str,
    },
)
[docs]
class LogsCollection(_BaseTrackerCollection):
"""Stores the logs for the tracked versions of the target collection.
This collection represents a log book for storing information about the
registered versions of the target collection. A version has a number,
a branch, a short description message and the timestamp at which it was
registered.
The log is structured as a tree, each branch of the tree representing a
versioning branch, therefore the log tree can be seen as a tree
describing how the versions of the target collection have evolved in
time, i.e., a tree that has as nodes the nodes of the version tree.
Since this tree is a small data structure, it is cached in memory and
automatically updated when new versions are registered, together with its
persistent in-database representation.
"""
_NAME_TEMPLATE = '__log_{}'
[docs]
@dataclasses.dataclass
class SCHEMA:
    """In-memory representation of one document of the log collection.

    Equality, hashing and ``repr`` are defined explicitly below instead of
    relying on the dataclass-generated versions.
    """

    # Version number of the entry on its branch.
    version: int
    # Name of the branch the version is registered on.
    branch: str
    # Time at which the version was registered.
    timestamp: datetime.datetime
    # Short description attached to the version.
    message: str
    # Id of the parent log document; ``None`` for the root entry.
    prev: Optional[ObjectId]
    # Ids of the child log documents.
    next: List[ObjectId]

    @property
    def id(self) -> Optional[ObjectId]:
        """Return the ``_id`` attribute if one was attached, else ``None``.

        ``_id`` is not a declared field; it only exists on instances on
        which it has been set explicitly.
        """
        return getattr(self, '_id', None)

    def __str__(self) -> str:
        return f"""\
* version: {self.version}
branch: {self.branch}
message: {self.message}
timestamp: {self.timestamp}
"""

    def __repr__(self) -> str:
        return f"<version: {self.version}, branch: {self.branch}>"

    def __hash__(self) -> int:
        # Only fields that also take part in ``__eq__`` are hashed, so
        # equal entries always share the same hash.
        return hash((
            self.id,
            self.version,
            self.branch,
            self.timestamp,
            self.message,
        ))

    def __eq__(self, other: 'LogsCollection.SCHEMA') -> bool:
        """Check strict equality, including message and tree linkage.

        The order of the ids in ``next`` is not significant.
        """
        if not isinstance(other, LogsCollection.SCHEMA):
            return False
        return (
            self.weakly_equals(other)
            and self.message == other.message
            and self.prev == other.prev
            and set(self.next) == set(other.next)
        )

    def weakly_equals(self, other: 'LogsCollection.SCHEMA') -> bool:
        """Check if two log objects are weakly equal.

        The weak equality does not take into account the linkage relation
        between log entries, i.e., it ignores the ``next`` and ``prev``
        fields. Only ``version``, ``branch``, ``timestamp`` and ``id`` are
        compared.

        :param other: The object to check for weak equality.
        :return: Whether the log entries are weakly equal.
        """
        if not isinstance(other, LogsCollection.SCHEMA):
            return False
        return (
            self.version == other.version
            and self.branch == other.branch
            # Compare against the *other* entry's timestamp; comparing the
            # timestamp with itself would always succeed.
            and self.timestamp == other.timestamp
            and self.id == other.id
        )
def __init__(
    self,
    database: Database,
    parent_collection_name: str,
    **kwargs,
) -> None:
    """Create the log collection wrapper and load its cached tree.

    :param database: The database forwarded to the base tracker
        collection.
    :param parent_collection_name: The name of the target collection,
        forwarded to the base tracker collection.
    :param kwargs: Extra keyword arguments forwarded to the base class.
    """
    super().__init__(database, parent_collection_name, **kwargs)

    # Both caches stay ``None`` until a tree is loaded or built.
    self._log_tree: Optional[Tree] = None
    self._levels: Optional[Dict[LogTreeIdentifier, int]] = None

    if not self.exists():
        return
    # The collection is already present, so rebuild the in-memory tree
    # from the stored documents.
    self._load_log_tree()
@property
def log_tree(self) -> Optional[Tree]:
    """Return the in-memory log tree.

    :return: The cached tree, or ``None`` if no tree has been loaded or
        built yet.
    """
    return self._log_tree
def _load_log_tree(self) -> None:
    """Build the log tree from the stored documents and cache it.

    The tree is built by a traversal that starts at the root entry and
    follows the ``next`` links. The depth of every node is recorded in
    ``self._levels`` along the way.

    :raises InvalidCollectionState: When:
        - No root version is found;
        - The log contains cycles;
        - Missing parent version even if recorded in child;
        - Missing child version even if recorded in parent;
        - Some log entries are not reachable from the root.
    """
    log_entries = {e['_id']: e for e in self.find({})}

    # The root is the first entry that has no predecessor.
    root = next(
        (entry for entry in log_entries.values() if entry['prev'] is None),
        None,
    )
    if root is None:
        raise InvalidCollectionState("No root entry in the log tree!")

    # Build the tree and cache the level of each node.
    self._log_tree = Tree()
    self._levels = dict()
    to_visit: List[Dict[str, Any]] = [root]
    root_identifier = self._get_log_tree_identifier(
        version=root['version'], branch=root['branch']
    )
    self._levels[root_identifier] = 0

    while len(to_visit) > 0:
        node = to_visit.pop()
        # Every visited document loses its '_id' key, so reaching the
        # same document a second time fails here and reveals a cycle.
        try:
            _id = node.pop('_id')
        except KeyError as e:
            raise InvalidCollectionState(
                "The log tree has cycles. Found log entry for "
                f"version '{node['version']}', branch '{node['branch']}' "
                "referenced from a subsequent node."
            ) from e

        node_identifier = self._get_log_tree_identifier(
            version=node['version'], branch=node['branch']
        )

        if node['prev'] is None:
            parent = None
        else:
            try:
                parent = log_entries[node['prev']]
            except KeyError as e:
                raise InvalidCollectionState(
                    f"Found log entry with id {_id} whose parent "
                    "does not exist."
                ) from e
            parent = self._get_log_tree_identifier(
                version=parent['version'], branch=parent['branch']
            )

        self._log_tree.create_node(
            identifier=node_identifier,
            tag=_id,
            data=self.SCHEMA(**node),
            parent=parent,
        )

        for child_id in node['next']:
            # A dangling reference in `next` is a corrupted log, so it is
            # reported like a dangling `prev` instead of leaking a bare
            # KeyError.
            try:
                child_doc = log_entries[child_id]
            except KeyError as e:
                raise InvalidCollectionState(
                    f"Found log entry with id {_id} whose child "
                    f"{child_id} does not exist."
                ) from e
            to_visit.append(child_doc)
            child_node_id = self._get_log_tree_identifier(
                child_doc['version'], child_doc['branch']
            )
            self._levels[child_node_id] = self._levels[node_identifier] + 1

    # Entries never reached from the root belong to another component.
    if len(self._log_tree) != len(log_entries):
        num_unconnected = len(log_entries) - len(self._log_tree)
        raise InvalidCollectionState(
            "The log tree has unconnected components. "
            f"Found {num_unconnected} log entries that are not connected "
            "to the main tree that is being built."
        )
[docs]
def build(
    self,
    message: Optional[str] = None,
    timestamp: Optional[datetime.datetime] = None,
    with_id: Optional[ObjectId] = None,
) -> bool:
    """Build this collection on the database.

    Creates empty caches, registers the root log entry on branch
    ``'main'`` and creates the ``(version, branch)`` index.

    :param message: The message associated with the initial version of
        this collection. Defaults to ``"Initial collection."``.
    :param timestamp: The timestamp when this collection was created.
        Defaults to the current UTC time.
    :param with_id: The id of the root of the log tree.
    :return: ``True`` if the collection was successfully built,
        ``False`` if it already exists.
    """
    if self.exists():
        return False

    self._log_tree = Tree()
    self._levels = dict()

    if message is None:
        message = "Initial collection."
    if timestamp is None:
        timestamp = datetime.datetime.now(datetime.timezone.utc)

    # Version -1 with no branch marks the entry as the root of the tree.
    self.add_log_entry(
        previous_version=-1,
        previous_branch=None,
        current_branch='main',
        message=message,
        timestamp=timestamp,
        with_id=with_id,
    )

    index_keys = [
        ('version', pymongo.DESCENDING),
        ('branch', pymongo.ASCENDING),
    ]
    self.create_index(index_keys)
    return True
[docs]
def reset(self) -> bool:
    """Drop this collection and replace the caches with empty ones.

    :return: ``True`` if the collection existed and was dropped,
        ``False`` if there was nothing to reset.
    """
    if self.exists():
        self._log_tree = Tree()
        self._levels = dict()
        self.drop()
        return True
    return False
@staticmethod
def _get_log_tree_identifier(
    version: int,
    branch: str,
) -> LogTreeIdentifier:
    """Build the hashable ``{'version', 'branch'}`` key of a tree node.

    :param version: The version number.
    :param branch: The branch name.
    :return: A ``hashabledict`` holding the two fields.
    """
    fields = {'version': version, 'branch': branch}
    return hashabledict(fields)
[docs]
def contains_version(self, version: int, branch: str) -> bool:
    """Tell whether the given version is registered in the log tree.

    :param version: The version id of the version.
    :param branch: The branch on which the version is registered.
    :return: ``True`` if a node for ``(version, branch)`` exists in the
        cached tree, ``False`` if it does not or if there is no tree.
    """
    tree = self._log_tree
    if tree is None:
        return False
    identifier = self._get_log_tree_identifier(version, branch)
    found = tree.get_node(identifier)
    return found is not None
[docs]
def get_previous_version_and_branch(
    self,
    current_version: int,
    current_branch: str,
) -> Optional[Tuple[int, str]]:
    """Return the version number and branch of the parent version.

    :raises InvalidCollectionVersion: If the given version is not present
        in the log tree.

    :param current_version: The version number to look up.
    :param current_branch: The branch of the version to look up.
    :return: ``None`` if there is no log tree. Otherwise the
        ``(version, branch)`` of the parent node; for the root version
        this is ``(-1, current_branch)``.
    """
    tree = self._log_tree
    if tree is None:
        return None

    identifier = self._get_log_tree_identifier(
        current_version, current_branch
    )
    current = tree.get_node(identifier)
    if current is None:
        raise InvalidCollectionVersion(current_version, current_branch)

    previous = tree.parent(current.identifier)
    if previous is not None:
        return previous.data.version, previous.data.branch
    # No parent: the current version is the root of the tree.
    return -1, current_branch
[docs]
def get_path_between_versions(
    self,
    current: Tuple[int, str],
    target: Tuple[int, str],
) -> Dict[Tuple[int, str], int]:
    """Find the path between the given points in the log tree.

    .. note::
        The returned path includes both ends.

    :raises InvalidCollectionVersion: If the current or target versions do
        not exist in the log tree. Note that no lookup is performed when
        ``current == target``.

    :param current: The start point version.
    :param target: The end point version.
    :return: An ordered dictionary representing the path that has to be
        followed in the log tree to get from the current version to the
        target version. The keys of the returned dictionary are tuples
        containing the version represented as ``(version, branch)`` and the
        values are the directions in time to be taken to get to the next
        version. The forward direction is represented as ``1``, and
        the backward direction as ``-1``. The last entry, representing the
        target version, has the direction of the previous step as direction.
        If the two versions are on different branches with a common
        ancestor, then the direction at that node is ``0``. An empty
        dictionary is returned when ``current == target``.
    """
    if current == target:
        # The versions are the same, so there is no path.
        return dict()
    current_node: Node = self._log_tree.get_node(
        self._get_log_tree_identifier(*current)
    )
    if current_node is None:
        raise InvalidCollectionVersion(*current)
    target_node: Node = self._log_tree.get_node(
        self._get_log_tree_identifier(*target)
    )
    if target_node is None:
        raise InvalidCollectionVersion(*target)
    # Find the path between the nodes by computing their lowest common
    # ancestor.
    path: List[Tuple[Tuple[int, str], int]] = []
    src = current_node
    dst = target_node
    # Always climb from the deeper node; remember whether the roles were
    # exchanged so the path can be reversed afterwards.
    _swapped = False
    if self._levels[src.identifier] < self._levels[dst.identifier]:
        _swapped = True
        src, dst = dst, src
    # Lift `src` until both nodes are at the same depth.
    while self._levels[src.identifier] != self._levels[dst.identifier]:
        path.append((src.identifier, -1))
        src = self._log_tree.parent(src.identifier)
    # Portion of the path that descends to `dst`; it is built backwards by
    # prepending the ancestors of `dst`.
    _path_dst = [(dst.identifier, 0)]
    on_different_branches = False
    # Climb both sides in lockstep until they meet at the common ancestor.
    while src != dst:
        on_different_branches = True
        path.append((src.identifier, -1))
        src = self._log_tree.parent(src.identifier)
        # The order here also takes care to add the root of the subtree
        # rooted at the LCA.
        dst = self._log_tree.parent(dst.identifier)
        _path_dst.insert(0, (dst.identifier, 1))
    path = path + _path_dst
    # Process the path
    # A bit ugly because tuples are immutable
    if _swapped:
        # Reverse the path and the direction in time.
        path = path[::-1]
        path = [((i['version'], i['branch']), -d) for (i, d) in path]
        sgn = 1
        if on_different_branches:
            # In this case, `src` and `dst` are on different branches,
            # joined by a branching node. For any possible path between
            # `src` and `dst` in this case, the direction at the
            # branching node should be 1, so fix it.
            # This is just temporary, to ensure the correct direction, and
            # it needs to be set to 0 later on.
            node_id, _ = path[len(_path_dst) - 1]
            path[len(_path_dst) - 1] = node_id, 1
            # After reversing the path in this case the direction of the
            # first step should be the opposite of the last step
            sgn = -1
        # Fix the direction at the beginning of the path after reversing
        path[0] = ((path[0][0][0], path[0][0][1]), sgn * path[-1][1])
        if on_different_branches:
            # Restore the `0` marker at the branching node.
            node_id, _ = path[len(_path_dst) - 1]
            path[len(_path_dst) - 1] = node_id, 0
    else:
        path = [((i['version'], i['branch']), d) for (i, d) in path]
        # This is not really needed for this direction, but change it for
        # consistency
        path[-1] = ((path[-1][0][0], path[-1][0][1]), path[-2][1])
        if on_different_branches:
            # The branching node is the first element of `_path_dst`.
            node_id, _ = path[len(path) - len(_path_dst)]
            path[len(path) - len(_path_dst)] = node_id, 0
    return dict(path)
[docs]
def add_log_entry(
    self,
    previous_version: int,
    previous_branch: Optional[str],
    current_branch: str,
    message: str,
    timestamp: datetime.datetime,
    with_id: Optional[ObjectId] = None,
) -> Tuple[int, str]:
    """Register a new version in the log.

    The entry is inserted into the database first, then linked into the
    ``next`` list of its parent document, and finally added to the
    in-memory tree and level cache.

    :raises InvalidCollectionVersion: If the previous version and branch
        do not exist.

    :param previous_version: The version id of the version the new one is
        derived from. ``-1`` together with ``previous_branch=None`` adds
        the root of the tree.
    :param previous_branch: The branch of the previous version. A falsy
        value (for a non-root entry) means ``current_branch``.
    :param current_branch: The branch on which the new version should be
        registered.
    :param message: The message describing changes made to this version.
    :param timestamp: The time the version was registered.
    :param with_id: An optional :class:`ObjectId` used for inserting the
        new entry into this collection. This is used when adding entries
        to a remote collection.
    :return: The version id and the branch name of the new entry.
    """
    adding_root = previous_version == -1 and previous_branch is None
    if adding_root:
        version = 0
        previous_id = None
        prev_tree_node = None
    else:
        previous_branch = previous_branch or current_branch
        identifier = self._get_log_tree_identifier(
            previous_version, previous_branch
        )
        prev_tree_node = self._log_tree.get_node(identifier)
        if prev_tree_node is None:
            raise InvalidCollectionVersion(
                previous_version,
                previous_branch,
            )
        previous_id = prev_tree_node.tag
        if previous_branch != current_branch:
            # First node on a new branch: numbering restarts at 0.
            version = 0
        else:
            version = prev_tree_node.data.version + 1

    log_data = self.SCHEMA(
        version=version,
        branch=current_branch,
        message=message,
        timestamp=timestamp,
        prev=previous_id,
        next=[],
    )

    # Persist the entry. `vars()` returns the instance's own attribute
    # dictionary, so any key added to it (such as '_id') also becomes an
    # attribute of `log_data`.
    document = vars(log_data)
    if with_id is not None:
        document['_id'] = with_id
    log_entry_id = self.insert_one(document).inserted_id

    # Link the new document from its parent, in memory and in the database.
    if prev_tree_node is not None:
        parent_next = prev_tree_node.data.next
        parent_next.append(log_entry_id)
        self.find_one_and_update(
            filter={'_id': prev_tree_node.tag},
            update={"$set": {"next": parent_next}},
        )

    # Mirror the change in the cached tree.
    identifier = self._get_log_tree_identifier(
        log_data.version, log_data.branch
    )
    self._log_tree.create_node(
        identifier=identifier,
        tag=log_entry_id,
        data=log_data,
        parent=prev_tree_node,
    )

    # The root sits at level 0; every other node is one below its parent.
    if prev_tree_node is None:
        level = 0
    else:
        level = self._levels[prev_tree_node.identifier] + 1
    self._levels[identifier] = level

    return version, current_branch
[docs]
def get_log(
    self,
    branch: str,
    version: Optional[int] = None,
    return_ids: bool = False,
) -> List[SCHEMA]:
    """Return the log sorted in descending order for the given branch.

    If a version number is provided, the log will start at the collection
    version identified by `version` and `branch`, otherwise, the
    returned log will contain the entire history from the top of the
    branch to the root of the log tree.

    .. note::
        The returned entries are the objects cached in the log tree, not
        copies. With ``return_ids=True`` the ``_id`` attribute is set on
        those cached objects.

    :raises ValueError: If there is no node on the given branch,
        or if there is no node identified by `version` and `branch`.

    :param branch: The branch for which to retrieve the log.
    :param version: The version where to start the log from.
    :param return_ids: Whether to include the ids of the log documents as
        well.
    :return: A list of log entries in descending order, i.e., the latest
        version at top. An empty list if there is no log tree.
    """
    if self._log_tree is None:
        return []

    if version is None:
        # The top of a branch is its highest version. It is looked up
        # among all nodes rather than only among the leaves, because the
        # last version of a branch stops being a leaf as soon as another
        # branch is started from it.
        on_branch = [
            node
            for node in self._log_tree.all_nodes()
            if node.data.branch == branch
        ]
        if not on_branch:
            raise ValueError(
                f"Invalid branch name {branch}! No branches named {branch} "
                f"were found in the log tree."
            )
        start = max(on_branch, key=lambda node: node.data.version)
    else:
        n_id = self._get_log_tree_identifier(version=version, branch=branch)
        start = self._log_tree.get_node(n_id)
        if start is None:
            raise ValueError(
                f"Invalid version (version: {version}, branch: {branch})! "
                f"No such version exists in the log tree."
            )

    # Walk from the starting node up to the root.
    entries: List[LogsCollection.SCHEMA] = []
    node: Optional[Node] = start
    while node is not None:
        if return_ids:
            node.data._id = node.tag
        entries.append(node.data)
        node = self._log_tree.parent(node.identifier)
    return entries
[docs]
def get_log_entry(self, version: int, branch: str) -> Optional[SCHEMA]:
    """Look up the cached log entry of a version.

    :param version: The version for which the entry will be retrieved.
    :param branch: The branch for which the entry will be retrieved.
    :return: The data stored on the tree node of ``(version, branch)``,
        or ``None`` if the node or the log tree does not exist.
    """
    tree = self._log_tree
    if tree is None:
        return None
    node = tree.get_node(self._get_log_tree_identifier(version, branch))
    if node is None:
        return None
    return node.data
[docs]
def get_log_doc_id(self, version: int, branch: str) -> Optional[ObjectId]:
    """Look up the id of the log document of a version.

    :param version: The version of the log entry.
    :param branch: The branch of the log entry.
    :return: The tag of the tree node of ``(version, branch)``, i.e. the
        id of its log document, or ``None`` if the node or the log tree
        does not exist.
    """
    tree = self._log_tree
    if tree is None:
        return None
    node = tree.get_node(self._get_log_tree_identifier(version, branch))
    if node is None:
        return None
    return node.tag
[docs]
def get_parent_branch(self, branch: str) -> Optional[str]:
    """Retrieve the parent branch of the branch identified by `branch`.

    The parent branch is the branch of the parent node of version ``0``
    of `branch` in the log tree.

    .. note::
        Branches are just unlinked pointer documents. The branching
        structure of the collection is reflected in the log tree.

    :raises BranchNotFound: If no branch with the given name exists.
    :raises InvalidCollectionState: If the given branch does not have a
        parent.

    :param branch: The name of the branch whose parent should be returned.
    :return: The parent branch, or ``None`` if ``branch='main'``.
    """
    if branch == 'main':
        return None

    try:
        node = self._log_tree.parent(
            self._get_log_tree_identifier(0, branch)
        )
    except NodeIDAbsentError as e:
        raise BranchNotFound(branch) from e

    # The only valid way for this to happen is if there are multiple roots,
    # but this is checked during loading the log tree. An explicit raise is
    # used because `assert` statements are removed when running with `-O`.
    if node is None:
        raise InvalidCollectionState(
            f"Branch {branch} does not have a parent"
        )
    return node.data.branch
[docs]
def get_parent_version(
    self, version: Tuple[int, str]
) -> Optional[Tuple[int, str]]:
    """Return the version and branch of the previous version.

    :raises InvalidCollectionVersion: If the version identified by `version`
        does not exist.
    :raises InvalidCollectionState: If the version exists, is not
        ``(0, 'main')`` and still has no parent.

    :param version: The version whose parent should be retrieved.
    :return: The parent of the given version, or ``None`` for
        ``(0, 'main')``.
    """
    if version == (0, 'main'):
        return None

    identifier = self._get_log_tree_identifier(*version)
    try:
        node = self._log_tree.parent(identifier)
    except NodeIDAbsentError as e:
        raise InvalidCollectionVersion(
            version=identifier['version'],
            branch=identifier['branch'],
        ) from e

    # An explicit raise is used because `assert` statements are removed
    # when running with `-O`.
    if node is None:
        raise InvalidCollectionState(
            f"Version '{identifier}' does not have a parent."
        )
    return node.data.version, node.data.branch
[docs]
def rebranch(self, version: Tuple[int, str], new_branch: str) -> None:
    """Move the versions starting at `version` to a new branch.

    Starting at `version`, the chain of nodes that are on the branch of
    `version` is followed downwards. Each of them is moved to
    `new_branch` and renumbered from ``0``, both in the cached tree and
    in the database. Nodes on other branches are left untouched.

    :raises ValueError: If `version` is the root version.

    :param version: The version at which the new branch should start.
    :param new_branch: The name of the new branch.
    """
    if version == (0, "main"):
        raise ValueError("Cannot rebranch the root of the version tree.")

    old_branch = version[1]
    node: Node = self._log_tree.parent(
        self._get_log_tree_identifier(*version)
    )

    # Update the cached log tree
    doc_ids = []
    versions = 0
    while True:
        on_branch = [
            child
            for child in self._log_tree.children(node.identifier)
            if child.data.branch == old_branch
        ]
        # Stop once the end of the branch is reached. This also covers a
        # last node whose only children are on other branches, which would
        # otherwise be examined over and over again.
        if not on_branch:
            break
        for child in on_branch:
            child.data.branch = new_branch
            child.data.version = versions
            new_id = self._get_log_tree_identifier(versions, new_branch)
            self._levels[new_id] = self._levels.pop(child.identifier)
            # Update the node identifier in cache
            self._log_tree.update_node(child.identifier, identifier=new_id)
            versions += 1
            doc_ids.append(child.tag)
            node = child

    # Update the database. Shifting every version down by the number of
    # the first moved version makes the numbering start at 0 again.
    self.update_many(
        {'_id': {"$in": doc_ids}},
        {"$set": {'branch': new_branch}, "$inc": {'version': -version[0]}},
    )
[docs]
def delete_subtree(self, version: Tuple[int, str]) -> None:
    """Remove `version` and everything below it from the log.

    Deleting ``(0, 'main')`` resets the whole collection. Otherwise the
    subtree is removed from the cached tree and from the database, the
    parent's ``next`` list is updated and the cached levels of the
    removed nodes are dropped.

    :raises InvalidCollectionVersion: If `version` does not exist.

    :param version: The root version of the subtree to delete.
    """
    if version == (0, 'main'):
        self.reset()
        return

    root_identifier = self._get_log_tree_identifier(*version)
    try:
        parent_node = self._log_tree.parent(root_identifier)
    except NodeIDAbsentError as e:
        raise InvalidCollectionVersion(
            version=version[0],
            branch=version[1],
        ) from e

    # Collect the identifiers of every node in the subtree. Each one is a
    # {'version', 'branch'} mapping and is used as a filter clause below.
    subtree = self._log_tree.subtree(root_identifier)
    removed = set()
    for path in subtree.paths_to_leaves():
        removed.update(path)
    query = {"$or": list(removed)}
    root_doc_id = subtree.get_node(subtree.root).tag

    # Delete the entries from the cache and from the database.
    self._log_tree.remove_subtree(root_identifier)
    self.delete_many(query)

    # Unlink the deleted root from its parent.
    parent_node.data.next.remove(root_doc_id)
    self.find_one_and_update(
        filter={'_id': parent_node.tag},
        update={"$set": {"next": parent_node.data.next}},
    )

    for identifier in removed:
        self._levels.pop(identifier, None)
[docs]
def get_versions_of_branch_tips(
    self, version: Tuple[int, str]
) -> List[Tuple[int, str]]:
    """Return the leaf versions of the subtree rooted in `version`.

    :param version: A version in the log tree.
    :return: One ``(version, branch)`` tuple for the last node of every
        root-to-leaf path of the subtree rooted in `version`.
    """
    root_identifier = self._get_log_tree_identifier(*version)
    subtree = self._log_tree.subtree(root_identifier)

    tips = []
    for path in subtree.paths_to_leaves():
        tip = path[-1]
        tips.append((tip['version'], tip['branch']))
    return tips