The MongoDB Event stream source works similarly to Cinchy's Change Data Capture functionality. The listener subscribes to the change stream of a specific collection in a database on the MongoDB server. Any action performed on a document in that collection is picked up by the listener and sent to the queue.
Limitations
To use change streams in MongoDB, there are a few requirements your environment must meet.
The database must use the WiredTiger storage engine.
The replica set or sharded cluster must use replica set protocol version 1.
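As a minimal sketch, the two requirements above could be checked from the output of MongoDB's `serverStatus` and `replSetGetConfig` commands. The function name and the way the results are reported are assumptions made for illustration; they are not part of Cinchy or MongoDB.

```python
def unmet_change_stream_requirements(server_status: dict, repl_config: dict) -> list:
    """Return a list of unmet requirements (empty when both are satisfied).

    server_status: result of the MongoDB `serverStatus` command.
    repl_config:   result of the MongoDB `replSetGetConfig` command.
    """
    problems = []

    # serverStatus reports the active storage engine under storageEngine.name.
    engine = server_status.get("storageEngine", {}).get("name")
    if engine != "wiredTiger":
        problems.append(f"storage engine is {engine!r}, expected 'wiredTiger'")

    # replSetGetConfig reports the protocol version under config.protocolVersion.
    protocol = repl_config.get("config", {}).get("protocolVersion")
    if protocol != 1:
        problems.append(f"replica set protocol version is {protocol!r}, expected 1")

    return problems


# Obtaining the inputs with pymongo (hypothetical address, run against a
# replica set member):
#   from pymongo import MongoClient
#   client = MongoClient("mongodb://localhost:27017")
#   server_status = client.admin.command("serverStatus")
#   repl_config = client.admin.command("replSetGetConfig")
```

An empty list means both requirements are met; otherwise each entry names the setting that needs attention.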
The MongoDB Event source supports real-time syncs.
Info tab
You can find the parameters in the Info tab below (Image 1).
Values
Parameter
Description
Example
Title
Mandatory. Input a name for your data sync.
Mongo Event to Cinchy
Variables
Optional. Review our documentation on Variables here for more information about this field.
Permissions
Data syncs are role-based access systems where you can give specific groups read, write, execute, and/or all of the above with admin access. Inputting at least an Admin Group is mandatory.
Source tab
The following table outlines the mandatory and optional parameters you will find on the Source tab (Image 2).
The following parameters will help to define your data sync source and how it functions.
Parameter
Description
Example
Source
Mandatory. Select your source from the drop down menu.
MongoDB Event
To set up a real-time sync, you must configure your Listener values. You can do so through the Connections UI.
Note that if there is more than one listener associated with your data sync, you will need to configure the additional listeners via the Listener Configuration table.
Reset Behaviour
Parameter
Description
Example
Auto Offset Reset
Earliest, Latest or None.
When the listener starts and there is no last message ID, or the last message ID is invalid (because it was deleted or because the listener is new), the listener uses this column as a fallback to determine where to start reading events from.
Earliest will start reading from the beginning of the queue (when the CDC was enabled on the table). This may be a suitable configuration if your use case is recoverable or re-runnable and you need to reprocess all events to ensure accuracy.
Latest will fetch the last value after whatever was last processed. This is the typical configuration.
None won't read or start reading any events.
You are able to switch between Auto Offset Reset types after your initial configuration through the process outlined here.
None
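The fallback described for Auto Offset Reset can be sketched as a small decision function. This is an illustration of the documented behaviour only, not the listener's actual implementation; the function name, arguments, and returned labels are all hypothetical.

```python
from typing import Optional


def resolve_start_position(
    last_message_id: Optional[str],
    id_is_valid: bool,
    auto_offset_reset: str,
) -> Optional[str]:
    """Decide where a listener starts reading (hypothetical helper).

    Returns a label describing the start position, or None when the
    listener should not read any events.
    """
    # A valid last message ID always wins; the fallback is not consulted.
    if last_message_id is not None and id_is_valid:
        return f"resume-after:{last_message_id}"

    # No usable last message ID: fall back to the Auto Offset Reset value.
    if auto_offset_reset == "Earliest":
        return "earliest"  # start from the beginning of the queue
    if auto_offset_reset == "Latest":
        return "latest"    # the typical configuration
    if auto_offset_reset == "None":
        return None        # do not read any events
    raise ValueError(f"Unknown Auto Offset Reset value: {auto_offset_reset!r}")


# A new listener (no last message ID) configured with Earliest:
print(resolve_start_position(None, False, "Earliest"))
```

Note that the Auto Offset Reset value only matters when the last message ID is missing or invalid.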
Topic JSON
Use the table below to help create the Topic JSON needed to set up a real-time sync.
Optional. This parameter allows you to specify pipeline stages with filters.
In MongoDB, an aggregation pipeline consists of one or more stages that process documents:
Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values.
The documents that are output from a stage are passed to the next stage.
An aggregation pipeline can return results for groups of documents. For example, return the total, average, maximum, and minimum values.
See the Example Topic JSON below.
Our example config uses a filter to return documents that have an ID between 0 and 10,000 and a location set to Montreal, or where the operation type is 'delete'.
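A filter like the one described could be expressed as a single MongoDB `$match` stage, sketched here as a Python structure. The field names `EmployeeId` and `Location` and the inclusive bounds are assumptions made for illustration, and the sketch shows only the pipeline stage, not the full Topic JSON. In change stream events, the changed document appears under `fullDocument` and the action under `operationType`.

```python
import json

# Hypothetical field names: "EmployeeId" and "Location" are assumptions.
pipeline_stages = [
    {
        "$match": {
            "$or": [
                {
                    "$and": [
                        # ID between 0 and 10,000 (inclusive bounds assumed)
                        {"fullDocument.EmployeeId": {"$gte": 0, "$lte": 10000}},
                        {"fullDocument.Location": "Montreal"},
                    ]
                },
                # Delete events carry no fullDocument, so they are matched
                # on the operation type instead.
                {"operationType": "delete"},
            ]
        }
    }
]


def event_passes(event: dict) -> bool:
    """Plain-Python mirror of the $match stage above, for illustration only."""
    if event.get("operationType") == "delete":
        return True
    doc = event.get("fullDocument") or {}
    emp_id = doc.get("EmployeeId")
    return (
        isinstance(emp_id, (int, float))
        and 0 <= emp_id <= 10000
        and doc.get("Location") == "Montreal"
    )


# Print the stage as JSON, the form a pipeline takes inside a JSON config.
print(json.dumps(pipeline_stages, indent=2))
```

The `event_passes` helper exists only to make the filter's logic easy to read and test locally; in a real sync, MongoDB evaluates the `$match` stage itself.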
Optional. You may choose to add a description to your column.
Select Show Advanced for more options for the Schema section.
Parameter
Description
Example
Mandatory
If both Mandatory and Validated are checked on a column, then rows where the column is empty are rejected.
If just Mandatory is checked on a column, then all rows are synced with the execution log status of failed, and the source error of "Mandatory Rule Violation"
If just Validated is checked on a column, then all rows are synced.
Validate Data
If both Mandatory and Validated are checked on a column, then rows where the column is empty are rejected.
If just Validated is checked on a column, then all rows are synced.
Trim Whitespace
Optional if data type = text. For Text data types, you can choose whether to trim the whitespace.
Max Length
Optional if data type = text. You can input a numerical value in this field that represents the maximum length of the data that can be synced in your column. If the value is exceeded, the row will be rejected (you can find this error in the Execution Log).
You can choose to add in a Transformation > String Replacement by inputting the following:
Parameter
Description
Example
Pattern
Mandatory if using a Transformation. The pattern for your string replacement.
Replacement
What you want to replace your pattern with.
Note that you can have more than one String Replacement.
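To illustrate how several String Replacements might combine, here is a minimal sketch. It assumes that each Pattern is a regular expression and that replacements are applied one after another in the order listed; neither assumption is confirmed by this page, and the helper name is hypothetical.

```python
import re


def apply_replacements(value: str, replacements: list) -> str:
    """Apply (pattern, replacement) pairs to a value, in order.

    Assumes patterns are regular expressions (an assumption, see above).
    """
    for pattern, replacement in replacements:
        value = re.sub(pattern, replacement, value)
    return value


# Two replacements: strip brackets and whitespace, then strip hyphens.
cleaned = apply_replacements(
    "(416) 555-0100",
    [(r"[()\s]", ""), (r"-", "")],
)
print(cleaned)  # -> 4165550100
```

Because each replacement sees the output of the previous one, the order of the pairs can change the result.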