versioned_collection package

class versioned_collection.VersionedCollection(*args: Any, **kwargs: Any)[source]

Bases: Collection

A tracked and versioned MongoDB collection.

Warning

All the interactions with the collection should be done through this class, and not by directly accessing the collection using the pymongo driver. An exception to this is when the listener is started using the CLI via vc listen.

Warning

Note that outputting the result of an aggregation pipeline directly into the versioned collection using the $out or $merge stages will not track the changes. This is caused by how those commands are processed by MongoDB itself. For instance, $out creates a temporary collection, drops the original collection and then renames the temporary collection.
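
One way to keep such results tracked is to run the pipeline without an $out or $merge stage and write the resulting documents back through the versioned collection. The sketch below assumes that writes issued through the VersionedCollection's own insert_many() are recorded like any other write. The helper name materialise_pipeline is hypothetical and not part of the package.

```python
from typing import Any, Dict, List


def materialise_pipeline(source: Any, target: Any,
                         pipeline: List[Dict[str, Any]]) -> int:
    """Run `pipeline` on `source` and insert the results into `target`.

    `target` is expected to be a VersionedCollection (assumption: its
    insert_many() is tracked). Returns the number of inserted documents.
    """
    # Reject stages that would write server-side and bypass the tracking.
    for stage in pipeline:
        if "$out" in stage or "$merge" in stage:
            raise ValueError("remove the $out/$merge stage from the pipeline")

    docs = list(source.aggregate(pipeline))
    if docs:  # pymongo's insert_many() rejects an empty list
        target.insert_many(docs)
    return len(docs)
```

Because the documents travel through the client, this is slower than $out for large result sets; it trades speed for tracked changes.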

To enable versioning on a collection, create a class that inherits from VersionedCollection, or create a VersionedCollection object and pass the name of the collection as well.

Usage example:

import pymongo
from versioned_collection import VersionedCollection

class Users(VersionedCollection):
    pass

client = pymongo.MongoClient("mongodb://localhost:27017")
db = client['database_name']

user_collection = Users(db)
# OR
same_db_collection = VersionedCollection(db, name='users')

Under the hood, VersionedCollection uses pymongo to manage the interactions with the database, so every feature and command offered by pymongo is also available on a VersionedCollection at no extra cost.

aggregate(pipeline, *args, **kwargs)[source]
aggregate_raw_batches(pipeline, *args, **kwargs)[source]
property branch: str | None

Get the current branch name of this collection.

If the collection is not tracked, it returns None.

branches() Set[str][source]

Return the names of the existing branches.

Returns:

A set containing the name of all branches registered on the collection. If the collection is not initialised for tracking, an empty set is returned.

bulk_write(*args, **kwargs)[source]
checkout(version: int | None = None, branch: str | None = None) bool[source]

Update the collection to match the state of the given version.

Collection versions have to exist (be registered) before checking them out.

If changes were made since the latest registered version, they have to be discarded, registered or stashed, before checking out to another version.

For checking out versions of the collection on the same branch as the working branch, the branch parameter can be skipped. For checking out the version the head of the branch is pointing to, the version parameter can be omitted.

col = VersionedCollection(db, name='col')
col.init('v0')  # created version 0 on 'main'
col.insert_one({'doc': 'example'})
col.register('v1')  # created version 1 on 'main'

# checkout to v0
col.checkout(0)  # now at version 0 on 'main'

col.create_branch('branch1')  # on 'branch1', no versions registered
col.create_branch('branch2')  # on 'branch2'
col.checkout(0, 'main')  # on branch 'main' at version 0
col.checkout(branch='branch1')  # on branch 'branch1'
col.checkout(branch='branch2')  # on branch 'branch2'

col.checkout(branch='main')  # now at version 1 on 'main'
Raises:
  • InvalidCollectionVersion – If given version does not match any recorded versions.

  • InvalidOperation – If called when the collection has unregistered changes.

  • ValueError – If called without providing at least one argument.

Parameters:
  • version – The version of the collection to be checked out.

  • branch – The branch of the collection to check out to.

Returns:

True if the operation succeeds, False if the checkout is not performed, but no errors were raised.
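
Since checkout() refuses to run while unregistered changes exist, a caller may want to stash them first. The following is a minimal sketch built only on the documented has_changes(), stash() and checkout() methods. The helper name stash_and_checkout is hypothetical.

```python
from typing import Any, Optional


def stash_and_checkout(col: Any, version: Optional[int] = None,
                       branch: Optional[str] = None) -> bool:
    """Stash pending changes, if any, then check out the requested version."""
    if version is None and branch is None:
        # Mirrors the documented ValueError of checkout().
        raise ValueError("provide at least one of version or branch")

    if col.has_changes():
        # stash() is documented to raise InvalidOperation when a stash
        # already exists and overwrite is left at its default of False.
        col.stash()
    return col.checkout(version=version, branch=branch)
```

The stashed changes stay in the stash area until stash_apply() or stash_discard() is called.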

create_branch(branch_name: str) Tuple[int, str] | None[source]

Create a branch with the given name and check out to it.

When creating a new branch, changes made since the last registered version are allowed to exist. This allows checking out a previous version of the collection on any branch (a version other than the one the branch’s head points to), modifying the collection, and then registering the changes as a new version on a new branch.

After creating a new branch, the version of the collection is set to -1, indicating that there are no versions registered on the newly created branch.

>>> collection: VersionedCollection # assume it exists in scope
>>> collection.version, collection.branch
(10, 'main')
>>> collection.checkout(5)
True
>>> collection.create_branch('branch')
(5, 'main')
>>> collection.version, collection.branch
(-1, 'branch')
>>> collection.is_detached()
False
Raises:

ValueError

  • If branch_name starts with __;

  • If a branch with name branch_name already exists.

Parameters:

branch_name – The name of the new branch. Can be any string, but it cannot start with double underscore (__).

Returns:

The version id and branch name of the version the new branch points to, i.e., the previous position of the head, if the collection is initialised for versioning, None otherwise.
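
The two documented ValueError conditions can be checked up front. This is a small sketch that relies only on branches(). The helper name can_create_branch is hypothetical.

```python
from typing import Any


def can_create_branch(col: Any, branch_name: str) -> bool:
    """Return True if `branch_name` passes the documented naming rules."""
    if branch_name.startswith("__"):
        # Names starting with a double underscore are rejected.
        return False
    # branches() returns an empty set for an untracked collection.
    return branch_name not in col.branches()
```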

delete_many(*args, **kwargs)[source]
delete_one(*args, **kwargs)[source]
delete_version_subtree(version: int, branch: str | None = None) bool[source]

Delete a version and all versions registered after it.

Warning

This deletes the subtree of the version tree rooted in version (version, branch), and does not just remove a version in the middle of a branch.

Warning

Deleting the root of the version tree is equivalent to dropping the collection. After this step, the collection is uninitialised for tracking, so init() has to be called again on it.

If the collection is currently checked out at one of the versions to be deleted, all the unregistered changes made to the collection are discarded as well, and the state of the collection is rolled back to that of the parent of the given version.

Raises:

InvalidCollectionVersion – If the given version does not exist.

Parameters:
  • version – The version id of the version that will be removed.

  • branch – The branch on which the versions to be deleted are located. If no branch name is given, branch is assumed to be the current branch.

Returns:

True if the versions were successfully removed, False otherwise.
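
To illustrate what "subtree" means here, the toy model below collects every node reachable from a given version. It is a plain-Python sketch, not the package's implementation. The (version, branch) node encoding and the children mapping are assumptions made for the example.

```python
from typing import Dict, List, Set, Tuple

Node = Tuple[int, str]  # (version id, branch name); hypothetical encoding


def subtree(children: Dict[Node, List[Node]], root: Node) -> Set[Node]:
    """Return `root` together with every version registered after it."""
    seen: Set[Node] = set()
    stack = [root]
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        stack.extend(children.get(node, []))
    return seen


# 'dev' branches off version 1 of 'main'.
tree = {
    (0, "main"): [(1, "main")],
    (1, "main"): [(2, "main"), (0, "dev")],
    (0, "dev"): [(1, "dev")],
}
doomed = subtree(tree, (1, "main"))
```

In this model, deleting (1, 'main') would also remove (2, 'main') and the whole 'dev' branch, leaving only (0, 'main').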

diff(version: int | None = None, branch: str | None = None, deep: Literal[False] = False, direction: Literal['to', 'from', 'bidirectional'] = 'from') Dict[Any, str] | None[source]
diff(version: int | None = None, branch: str | None = None, deep: Literal[True] = True, direction: Literal['to', 'from', 'bidirectional'] = 'from') Dict[Any, deepdiff.DeepDiff] | None
diff(version: int | None = None, branch: str | None = None, deep: Literal[False] = False, direction: Literal['to', 'from', 'bidirectional'] = 'bidirectional') Dict[Literal['to', 'from'], Dict[Any, str]] | None
diff(version: int | None = None, branch: str | None = None, deep: Literal[True] = True, direction: Literal['to', 'from', 'bidirectional'] = 'bidirectional') Dict[Literal['to', 'from'], Dict[Any, deepdiff.DeepDiff]] | None

Return the diffs between the current and the given version.

If no version id or branch are given, this method computes the diffs between the current working version and the last version registered.

If the version parameter is omitted and the branch parameter is given, then the target version is the version the branch’s head points to.

If the branch parameter is omitted and the version parameter is given, then the target version is the version with id version on the current branch.

Note

Passing deep=True can consume a large volume of memory for large collection diffs since each diff stores both versions of a document.

Examples:

>>> collection: VersionedCollection  # assume this exists in scope
>>> collection.diff()
<diffs between the current state and latest version registered>
>>> collection.diff(0, 'main')
<diffs between current state and version 0 on branch 'main'>
>>> collection.diff(2)
<diffs between current state and version 2 on the current branch>
>>> collection.diff(branch='branch')
<diffs between current state and the latest version from 'branch'>
>>> print(collection.diff(deep=False))
<pretty structural diff>
>>> collection.diff(0, 'main', direction='to')
<diff from the current version to version 0 on branch main>
Raises:

InvalidCollectionVersion – If the given version does not exist.

Parameters:
  • version – The version to compare the current version with.

  • branch – The branch on which the version to compare the current version with is registered on.

  • deep – Whether to compute the DeepDiff objects containing the deep differences between the documents (True) or a structural diff that is printable and similar to git diffs (False). Defaults to False.

  • direction – The direction in which to compute the diff. When equal to 'to', the current version is considered the reference version and the diffs represent the changes made to the current collection state to reach the target collection state. When equal to 'from', the given version is considered the reference version. When equal to 'bidirectional', both forward and backward diffs are computed and returned. Defaults to 'from'.

Returns:

The structural or deep diffs of the modified documents, grouped by their ids, in case of unidirectional diffs. In the case of bidirectional diffs, it returns the diffs grouped by the modified document id and grouped by the direction. If the collection is not tracked, returns None.
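
The two return shapes can be handled uniformly by a small helper. The sketch below assumes the shapes given in the signatures above: a mapping from document id to diff for unidirectional calls, and a mapping from 'to' / 'from' to such mappings for bidirectional calls. The helper name modified_ids is hypothetical.

```python
from typing import Any, Dict, Optional, Set


def modified_ids(result: Optional[Dict[Any, Any]],
                 bidirectional: bool = False) -> Set[Any]:
    """Collect the ids of the modified documents from a diff() result."""
    if result is None:  # the collection is not tracked
        return set()
    if bidirectional:
        ids: Set[Any] = set()
        # Outer keys are the directions; inner keys are the document ids.
        for per_direction in result.values():
            ids.update(per_direction)
        return ids
    return set(result)
```

The caller passes bidirectional explicitly because a document id could itself be the string 'to' or 'from', so the shape cannot be inferred reliably from the keys.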

discard_changes() bool[source]

Discard the changes made to the collection.

After discarding the changes, the collection will return to the state of the previous registered version.

If the changes made to the collection should be temporarily and safely stored, consider calling stash().

Raises:

InvalidCollectionState – If there exist documents modified via invalid operations.

Returns:

Whether the operation was successfully executed or not.

drop(*args, **kwargs) None[source]

Drop this versioned collection.

In case this collection is being tracked, it also removes all the tracking information.

Warning

Calling this method is the only valid way of properly dropping a tracked collection. Calling db.drop_collection(name) will result in the removal of this collection only.

find_one_and_delete(*args, **kwargs)[source]
find_one_and_replace(*args, **kwargs)[source]
find_one_and_update(*args, **kwargs)[source]
get_log(branch: str | None = None) List[SCHEMA][source]

Return the log of this collection for the given branch.

The returned history is in descending order (the latest entry first). The first entry will correspond to the previous registered version on the given branch, with respect to the current version.

Raises:

BranchNotFound – If no branch with the given name exists.

Parameters:

branch – The name of the branch for which to get the history. If it is not provided, this defaults to the current branch.

Returns:

The history for the specified branch.

has_changes() bool[source]

Return whether this collection has unregistered changes.

has_conflicts() bool[source]

Return whether this collection has unresolved conflicts.

has_stash() bool[source]

Return whether this collection has stashed changes.

init(message: str | None = None) None[source]

Initialise this collection for tracking.

Creates a snapshot of the current state of the collection and initialises the collection used for tracking this collection. The current version of this collection is recorded as version 0.

A versioned collection can be initialised only once. For registering another version of the collection, call register().

Usage example:

collection = VersionedCollection(db, 'my_collection')
collection.init('Initial version.')
Raises:

CollectionAlreadyInitialised – If this collection has already been initialised.

Parameters:

message – A short description of the initial state of the collection.

insert_many(*args, **kwargs)[source]
insert_one(*args, **kwargs)[source]
is_detached() bool[source]

Return whether this collection is in the detached head mode.

is_tracked() bool[source]

Return whether this collection is initialised for versioning.

pull(remote_collection: VersionedCollection, branch: str | None = None) bool[source]

Pull a branch from a remote collection to this collection.

Pulling downloads a single branch at a time and does not pull the entire version tree of the remote collection. If the entire tree is desired, it can be obtained by iteratively pulling all the branches of the remote collection.

Warning

If the local and remote versions of branch have diverged and more branches were created locally on branch branch after the divergence point, all that data will be lost after branch is pulled.

Note

If the local and remote versions of branch have diverged and a document containing the same data has been added to both collections, the two documents will have different ids. Pulling the remote branch will then result in duplicated documents: the auto-merge succeeds precisely because the ids differ.

If branch is the current branch and the head is attached, i.e., the collection is checked out at the last version on branch, then after the branch is pulled the collection will be checked out at the last version of the newly pulled branch. If a branch other than the current branch is pulled, or the collection is in detached mode, then after the branch is pulled the collection will be at the same version as it was before calling this method.

Note

If the auto-merging of the local and remote versions of branch has failed due to merge conflicts and the local collection had unregistered changes, a warning will be displayed, reporting the version at which the collection was checked out when pull() was called and stating that the modified data from that version is saved in the stash area. After solving the conflicts, the user should manually check out that version and apply the stash, or discard it.

Note

This method locks both the remote and the local collections, so none of the collections can perform other versioning operations until the synchronisation is finished.

Raises:
  • InvalidOperation

    • If trying to pull from the same collection into itself;

    • If the collection is in detached mode and branch is not given.

    • If the collection is checked out to the head of local branch, but the collection has changes;

    • If branch is not a branch of the remote_collection;

    • If this collection and the remote_collection have diverging initial versions, i.e., they were initialised independently and not properly synchronised using pull() or push();

    • If the local and remote versions of branch have diverged, the local collection has data in the stash area and the local collection has unregistered changes. Automatic stashing is possible, but since there is already data in the stashing area, that data will be lost, so an error is raised to manually correct it.

  • ValueError – If the name of the remote_collection is different from the name of this collection.

  • AutoMergeFailedError – If auto-merging the local and remote versions of branch resulted in merge conflicts.

  • Exception – _ignored

Parameters:
  • remote_collection – The remote VersionedCollection from which to download a branch.

  • branch – The name of the branch of the remote collection to pull. If omitted, it defaults to the current branch of this collection.

Returns:

False if the remote collection is not initialised, True if everything is up-to-date or the pull operation has finished successfully.
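
The iterative approach mentioned above, pulling every branch of the remote in turn, can be sketched as follows. It uses only the documented branches() and pull() methods. The helper name pull_all_branches is hypothetical, and any exception raised by pull() (for example on merge conflicts) is deliberately left to propagate.

```python
from typing import Any, Dict


def pull_all_branches(local: Any, remote: Any) -> Dict[str, bool]:
    """Pull each branch of `remote` into `local`, one branch at a time."""
    results: Dict[str, bool] = {}
    # Sorting only makes the order deterministic; it carries no meaning.
    for branch in sorted(remote.branches()):
        results[branch] = local.pull(remote, branch)
    return results
```

Keep the divergence warning above in mind: pulling a branch whose local and remote versions have diverged can discard local branches created after the divergence point.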

push(remote_collection: VersionedCollection, branch: str | None = None, do_checkout: bool = True) bool[source]

Push a branch of this collection to a remote collection.

If the remote collection is checked out on branch branch, by default, upon pushing, the remote collection’s state is updated, and it is checked out to the latest version pushed. To change this behaviour, set do_checkout=False.

Warning

This does not perform a remote collection validation to check if the local and the remote collections are of the same ‘type’. If the local and remote collections have the same name, a branch that is not present in the remote collection can be pushed.

This method initialises the remote collection in case it is not, e.g., when the collection is pushed for the first time. This is the preferred method of doing it, since manually initialising the remote by calling init() will cause discrepancies between the two collections and pushes will be denied.

Note

This method locks both the remote and the local collections, so none of the collections can perform other versioning operations until the synchronisation is finished.

Raises:
  • InvalidOperation

    • If trying to push from a collection to itself;

    • If trying to push when the collection’s head is detached and no branch parameter is provided;

    • If the remote branch has changes that are not present on the local branch;

    • If the remote collection is not initialised;

    • If the local collection has more than one version registered, the local collection has data in the stash area and the local collection has unregistered changes.

  • ValueError – When the remote collection has a different name and type compared to this collection.

  • Exception – _ignore.

Parameters:
  • remote_collection – The versioned collection to which the branch will be pushed. This can be a collection from the same database as the current collection, on another database, or even on a different host server.

  • branch – The branch to push to the remote collection. If it is omitted, the currently checked out branch is considered for being pushed. If the collection’s head is detached this must be provided.

  • do_checkout – Whether to update the remote collection to the latest pushed version if remote_collection is checked out at the tip of branch branch.

Returns:

False if this collection is not tracked, otherwise True if the operation completed successfully, or the remote branch is up-to-date.

register(message: str, branch_name: str | None = None) bool[source]

Register a new version of this collection.

When the head is detached, a new branch with name branch_name is created pointing to the currently checked out version. This is equivalent to calling create_branch() with branch_name as parameter and then registering the new version of the collection. If the head is attached, i.e., it points to the latest version on the current branch, then the branch_name parameter is ignored and this will not register the version on a new branch.

>>> collection: VersionedCollection # assume it exists in scope
>>> collection.branch, collection.has_changes()
('main', True)
>>> collection.register('New version')
True
>>> collection.version
2
>>> collection.checkout(1)
True
>>> collection.insert_one({'example': 'doc'})
>>> collection.register('Another version', branch_name='new')
True
>>> collection.version, collection.branch
(0, 'new')
Raises:

ValueError – If no branch name parameter is provided when the head is detached or if a branch with name branch_name already exists.

Parameters:
  • message – The message associated with the new version of the collection.

  • branch_name – The name of the branch on which to register the new version. This is ignored if the head is not detached.

Returns:

Whether the collection was successfully registered.
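
A caller that wants to register only when there is something to register, and to fail early on a detached head without a branch name, might wrap the call as below. This is a sketch using the documented has_changes(), is_detached() and register() methods. The helper name register_if_changed is hypothetical.

```python
from typing import Any, Optional


def register_if_changed(col: Any, message: str,
                        branch_name: Optional[str] = None) -> bool:
    """Register a new version only if the collection has pending changes."""
    if not col.has_changes():
        return False
    if col.is_detached() and branch_name is None:
        # register() documents a ValueError in this situation.
        raise ValueError("a branch name is required when the head is detached")
    return col.register(message, branch_name)
```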

rename(new_name: str, *args, **kwargs) VersionedCollection[source]

Rename this collection and the tracking collections.

The rename operation returns a new collection.

Usage example:

collection = VersionedCollection(db, 'usrs')
collection = collection.rename(new_name='users')

See the rename() method of the superclass for more information.

Parameters:
  • new_name – The new name of the collection.

  • args – The rest of the args.

  • kwargs – The rest of the kwargs.

Returns:

A new instance of VersionedCollection.

replace_one(*args, **kwargs)[source]
resolve_conflicts(discard_local_changes: bool = False) bool[source]

Interactively resolve the merge conflicts.

A GUI conflict resolver will pop up for each conflicting document. It shows three columns. The column on the left represents the destination, or remote, version of the document. The column in the middle represents the auto-merged document with conflicts and reflects the current state of the document in this collection. The column on the right shows the source, or local, version of the document.

The GUI of the merge tool can be used to automatically edit and integrate the changes, but it also serves as a full text editor, so if the suggested conflict resolution does not satisfy the requirements, the document can be edited manually. Note that the remote and local files should not be edited, because changes made to them are ignored.

To move to the next conflict in another document make sure you save the document (by pressing on one of the save icons or pressing Ctrl+s) and then close the merge tool.

See also

Meld merge tool.

Parameters:

discard_local_changes – Whether to ignore the local changes of the conflicting documents.

Returns:

True if the operation ended successfully, False otherwise, or if there were no conflicts to resolve.

stash(overwrite: bool = False) bool[source]

Stash the changes made to this collection.

Only a single set of changes can be stashed at any time. Calling this method multiple times without restoring the stashed data first will either raise an exception or will overwrite the previously stored stash data if overwrite=True.

>>> collection: VersionedCollection # assume it exists in scope
>>> collection.status()
{'tracked': False}
>>> collection.init()
>>> collection.stash(), collection.has_changes()
(False, False)
>>> collection.insert_one({'field': 'value'})
ObjectId('54f112defba522406c9cc207')
>>> collection.has_changes()
True
>>> collection.stash()
True
>>> collection.has_changes()
False
>>> collection.count_documents({})
0
>>> collection.insert_one({'field': 'new value'})
ObjectId('54f112defba522406c9cc208')
>>> try:
...     collection.stash()
... except InvalidOperation:
...     print("Stash blocked")
...
Stash blocked
>>> collection.stash(overwrite=True)
True
>>> collection.count_documents({})
0
>>> collection.stash_apply()
True
>>> collection.find_one({})
{'_id': ObjectId('54f112defba522406c9cc208'), 'field': 'new value'}
Raises:

InvalidOperation – When overwrite=False and there exists another stash.

Parameters:

overwrite – Whether to overwrite the existing stash space.

Returns:

True if the changes were stashed, False if the collection is not tracked or there is nothing to stash.

stash_apply() bool[source]

Apply the stashed changes over the currently checked out version.

Warning

Applying the stashed changes will overwrite the existing documents.

>>> collection: VersionedCollection # assume it exists in scope
>>> collection.has_changes()
True
>>> collection.stash()
True
>>> collection.checkout(0, 'main')
True
>>> collection.has_changes()
False
>>> collection.stash_apply()
True
>>> collection.has_changes()
True
Raises:

InvalidOperation – If the collection has changes.

Returns:

True if the stash is successfully applied, False if the collection is not tracked or there is no stash to apply.
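
Stashing and re-applying around a block of work can be wrapped in a context manager. The sketch below uses only the documented stash() and stash_apply() methods. The name stashed is hypothetical.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def stashed(col: Any) -> Iterator[bool]:
    """Stash pending changes for the duration of the block, then re-apply."""
    # stash() returns False when there is nothing to stash.
    did_stash = col.stash()
    try:
        yield did_stash
    finally:
        if did_stash:
            # Applied over whatever version is checked out at this point.
            col.stash_apply()
```

The block must leave the collection without unregistered changes, since stash_apply() is documented to raise InvalidOperation otherwise.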

stash_discard() bool[source]

Remove the stashed data from the stash area.

After this method is called the stash will be empty and cannot be recovered.

Note

Stashing and discarding the stashed data is equivalent to calling discard_changes().

>>> collection: VersionedCollection # assume it exists in scope
>>> collection.has_changes()
True
>>> collection.stash()
True
>>> collection.stash_discard()
True
>>> collection.has_changes()
False
Returns:

True if the stash is successfully discarded, False if it does not exist or the collection is not tracked.

status() Dict[str, str | bool | int][source]

Return the status of this collection.

update_many(*args, **kwargs)[source]
update_one(*args, **kwargs)[source]
property version: int | None

Get the current version id of this collection.

If the collection is not tracked, it returns None.

property version_tree: Tree | None

Get the tree of registered versions of this collection.

Subpackages

Submodules