import queue
import datetime
from multiprocessing import Process, Value, Queue, Lock
from time import sleep
from typing import Optional, Tuple
from bson import ObjectId
from pymongo import MongoClient
from pymongo.change_stream import CollectionChangeStream
from versioned_collection.collection.tracking_collections import (
ModifiedCollection,
)
[docs]
class CollectionListener:
    """Listens to changes to a specific collection.

    Starts a background process that monitors the target collection for any
    changes and stores the ids of the updated documents in the
    `ModifiedCollection` linked to the target collection.
    """

    def __init__(
        self,
        database_name: str,
        collection_name: str,
        host: str = 'localhost',
        port: int = 27017,
        credentials: Optional[Tuple[Optional[str], Optional[str]]] = None,
    ) -> None:
        """Initialise a :class:`CollectionListener`.

        The listener is started as part of the initialisation, see
        :meth:`start`.

        :param database_name: The name of the database on which the target
            collection is located.
        :param collection_name: The name of the collection to listen to.
        :param host: The host of the MongoDB server.
        :param port: The port on which to connect to the MongoDB server.
        :param credentials: The username and password of a valid user with
            access to the database. ``None`` is treated as ``(None, None)``.
        :raises RuntimeError: If the listener process exits before it starts
            listening.
        """
        self._database_name = database_name
        self._collection_name = collection_name
        self.__credentials = (
            (None, None) if credentials is None else credentials
        )
        self._address = host, port
        self._p: Optional[Process] = None
        # How long (in seconds) :meth:`stop` waits for a heartbeat before
        # it considers the listener idle.
        self._HEARTBEAT_TIMEOUT = 0.05

        # Listener synchronisation helpers. They are (re)created on every
        # call to :meth:`start`.
        self._listening: Value = None
        self._lock: Lock = None
        self._timestamp_q: Optional[Queue] = None
        self._heartbeat_q: Optional[Queue] = None

        # Start the listener
        self.start()

    def __del__(self):
        self.stop()

    def is_listening(self) -> bool:
        """Check if this listener has started listening to changes.

        :return: Whether the listener is listening or not.
        """
        if self._listening is None:
            return False
        return bool(self._listening.value)

    def stop(self) -> None:
        """Stop this listener from monitoring the target collection.

        The listener is safely stopped to allow the changes (that were produced
        before signaling the listener to stop) to be processed. This is
        relevant when a large number of entries are modified in any way from
        the tracked collection in a transaction, or when unacknowledged
        operations are performed and the database queues the operations to be
        performed.

        Does nothing if the listener is not listening.
        """
        # TODO: Stop the listener without killing the process.
        #  Here we have only to stop the change stream and ignore any
        #  upcoming events for the duration the listener was paused.
        #  This should basically record the cluster time when stop was called,
        #  and ignore all the events that happen after that time and before
        #  the time the listener is resumed.
        if not self.is_listening():
            return

        with self._lock:
            # Timezone-aware UTC; the listener process compares it against
            # the cluster time of each change.
            timestamp = datetime.datetime.now(datetime.timezone.utc)
            self._listening.value = False
            self._timestamp_q.put(timestamp)

        # Wait for the listener to finish consuming the valid changes from
        # the change stream.
        while True:
            try:
                self._heartbeat_q.get(
                    block=True,
                    timeout=self._HEARTBEAT_TIMEOUT,
                )
            except queue.Empty:
                # The heartbeat timeout expired, so the listener is
                # probably idle, therefore kill it.
                break

        self._p.terminate()
        self._p.join()

    def start(self) -> None:
        """Start the listener to monitor the target collection.

        Launches a monitoring daemon that uses `changeStreams` to watch the
        target collection for all types of updates.

        Blocks until the listener daemon has successfully started to prevent
        the client from modifying the target collection.

        Does nothing if the listener is already listening.

        :raises RuntimeError: If the daemon exits before it starts listening
            (e.g. the change stream could not be opened).
        """
        if self.is_listening():
            return

        self._listening = Value('b', False)
        self._timestamp_q = Queue()
        self._lock = Lock()
        self._heartbeat_q = Queue()

        self._p = Process(
            target=self._listen,
            args=(
                self._database_name,
                self._collection_name,
                *self._address,
                *self.__credentials,
                self._listening,
                self._timestamp_q,
                self._heartbeat_q,
                self._lock,
            ),
        )
        self._p.daemon = True
        self._p.start()

        # Block until the listener started. If the daemon dies first, the
        # flag is never set, so fail loudly instead of spinning forever.
        while True:
            sleep(0.001)
            if self.is_listening():
                break
            if not self._p.is_alive():
                self._p.join()
                raise RuntimeError(
                    "The listener process for collection "
                    f"'{self._database_name}.{self._collection_name}' "
                    "exited before it started listening "
                    f"(exit code: {self._p.exitcode})."
                )

    @staticmethod
    def _as_utc(moment: datetime.datetime) -> datetime.datetime:
        """Return ``moment`` as a timezone-aware UTC datetime.

        Naive datetimes are interpreted as already being expressed in UTC.
        """
        if moment.tzinfo is None:
            return moment.replace(tzinfo=datetime.timezone.utc)
        return moment.astimezone(datetime.timezone.utc)

    @staticmethod
    def _listen(
        database_name: str,
        collection_name: str,
        host: str,
        port: int,
        username: Optional[str],
        password: Optional[str],
        listening: Value,
        last_timestamp: Queue,
        heartbeat_q: Queue,
        lock: Lock,
    ) -> None:
        """Listen in a background task.

        Opens a client connection to the given database and starts watching
        the collection identified by `collection_name`. The ids of the
        modified documents are inserted as standalone documents into the
        `__modified_<collection_name>` collection.

        For a description of the synchronisation mechanisms see the
        documentation of :meth:`stop` method.

        :param database_name: The name of the database to connect to.
        :param collection_name: The name of the collection to watch for changes.
        :param host: The host where the database is located.
        :param port: The port at which the database can be accessed.
        :param username: The user to authenticate as, or ``None``.
        :param password: The password of the user, or ``None``.
        :param listening: Whether the listener listens (or should listen) to
            the collection or not.
        :param last_timestamp: The time after which changes should be ignored
        :param heartbeat_q: A channel for sending heartbeats to the parent
            process.
        :param lock: A lock used to synchronise the shared variables.
        """
        client = MongoClient(
            host=host,
            port=port,
            username=username,
            password=password,
            directConnection=True,
        )
        with client:
            target_collection = client[database_name][collection_name]
            _output_collection = ModifiedCollection(
                database=client[database_name],
                parent_collection_name=collection_name,
            )

            with target_collection.watch() as change_stream:
                # Only signal the parent once the change stream is open;
                # :meth:`start` blocks on this flag.
                listening.value = True
                try:
                    CollectionListener.__listen(
                        change_stream=change_stream,
                        output_collection=_output_collection,
                        listening=listening,
                        heartbeat_q=heartbeat_q,
                        last_timestamp=last_timestamp,
                        lock=lock,
                    )
                except KeyboardInterrupt:
                    # Gracefully exit. The synchronisation with the main process
                    # is done in :meth:`stop`.
                    pass

    @staticmethod
    def __listen(
        change_stream: "CollectionChangeStream",
        output_collection: "ModifiedCollection",
        listening: Value,
        heartbeat_q: Queue,
        last_timestamp: Queue,
        lock: Lock,
    ) -> None:
        """Consume the change stream and record the modified document ids.

        Changes are buffered and written to `output_collection` in batches.
        Once a stop timestamp is received through `last_timestamp`, changes
        with a cluster time later than it end the loop; whatever is still
        buffered is flushed before returning.

        :param change_stream: The open change stream to iterate over.
        :param output_collection: Where the recorded changes are inserted.
        :param listening: Shared flag cleared by the parent when it requests
            the listener to stop.
        :param heartbeat_q: A channel for sending heartbeats to the parent
            process while pending changes are processed after a stop request.
        :param last_timestamp: A channel on which the parent sends the time
            after which changes should be ignored.
        :param lock: A lock used to synchronise the shared variables.
        """
        docs = []
        timestamp = None
        batch_size = 100

        for change in change_stream:
            with lock:
                # The timestamp is sent only once by the parent process
                # before terminating the process
                try:
                    timestamp = CollectionListener._as_utc(
                        last_timestamp.get(block=False)
                    )
                except queue.Empty:
                    pass
                stop_requested = not listening.value

            # Process all changes that happened before the time the
            # stop listening 'signal' was sent. This allows properly
            # processing the pending changes that queued before being
            # streamed through the change stream by mongo.
            if timestamp is not None:
                # Compare aware UTC datetimes on both sides: mixing a naive
                # and an aware datetime raises `TypeError`.
                change_time = CollectionListener._as_utc(
                    change['clusterTime'].as_datetime()
                )
                if change_time > timestamp:
                    # Stop listening; the buffered changes are flushed after
                    # the loop.
                    break

            # Send heartbeats to the parent process to signal that
            # this process is still processing the pending changes.
            if stop_requested:
                heartbeat_q.put(0)

            try:
                document_id = change["documentKey"]['_id']
                # Only the first letter of the operation type is stored;
                # replacements are recorded as updates.
                op_type = change["operationType"][0]
                if op_type == 'r':
                    op_type = 'u'

                # Manually generate ids to keep the order of the events
                # and allow parallel insertion in database
                docs.append({
                    '_id': ObjectId(),
                    'id': document_id,
                    'op': op_type,
                })

                # Flush when the cursor has nothing else buffered or the
                # batch grew past `batch_size`.
                if (
                    not change_stream._cursor._has_next()  # noqa
                    or len(docs) > batch_size
                ):
                    output_collection.insert_many(docs)
                    docs = []
            except KeyError:
                # not really needed, but just in case the change stream
                # hangs
                # NOTE(review): events without a `documentKey` (presumably
                # collection-level events) also end up here - confirm.
                break

        if docs:
            output_collection.insert_many(docs)