The MongoDB Event stream source works similarly to Cinchy's Change Data Capture functionality. The listener subscribes to the change stream of a specific collection in a database on the MongoDB server. Any actions performed on documents in that collection are picked up by the listener and sent to the queue.
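For reference, each event the listener picks up is a standard MongoDB change stream document. A trimmed sketch of what an insert event on an Employee collection might look like (field values are illustrative):

```json
{
  "_id": { "_data": "8263..." },
  "operationType": "insert",
  "ns": { "db": "Cinchy", "coll": "Employee" },
  "documentKey": { "_id": 101 },
  "fullDocument": { "_id": 101, "name": "Jane Doe", "location": "Montreal" }
}
```

The operationType and fullDocument fields are the ones most often referenced when filtering events with pipeline stages (see Pipeline Stages below).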
To use change streams in MongoDB, there are a few requirements your environment must meet:
- The database must be in a replica set or sharded cluster.
- The database must use the WiredTiger storage engine.
- The replica set or sharded cluster must use replica set protocol version 1.
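If you want to confirm these requirements before configuring the listener, one way is to run a few standard commands in mongosh (a sketch; output shapes can vary by MongoDB version):

```javascript
// Run in mongosh against your MongoDB deployment
db.serverStatus().storageEngine.name  // expect "wiredTiger"
rs.status().ok                        // expect 1 on a healthy replica set member
rs.conf().protocolVersion             // expect 1
```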
The MongoDB Event source supports real-time syncs.
You can find the parameters in the Info tab below (Image 1).
Title: Mandatory. Input a name for your data sync. Example: Mongo Event to Cinchy.
Variables: Optional. Review our documentation on Variables for more information about this field.
Permissions: Data syncs are role-based access systems where you can give specific groups read, write, execute, and/or all of the above with admin access. Inputting at least an Admin Group is mandatory.
The following table outlines the mandatory and optional parameters you will find on the Source tab (Image 2). These parameters help define your data sync source and how it functions.
Source: Mandatory. Select your source from the drop-down menu. Example: MongoDB Event.
To set up a real-time sync, you must configure your Listener values. You can do so through the Connections UI.
Note that if there is more than one listener associated with your data sync, you will need to configure the additional listeners via the Listener Configuration table.
Reset Behaviour
Auto Offset Reset: Earliest, Latest, or None. When the listener starts and there is either no last message ID or the last message ID is invalid (because it was deleted or the listener is new), this value is used as a fallback to determine where to start reading events from. Earliest starts reading from the beginning of the queue (from when CDC was enabled on the table); this may suit use cases that are recoverable or re-runnable and need to reprocess all events for accuracy. Latest fetches events starting after whatever was last processed; this is the typical configuration. None won't read or start reading any events. You can switch between Auto Offset Reset types after your initial configuration. Example: None.
Topic JSON
The table below can be used to help create the Topic JSON needed to set up a real-time sync.
Database: Mandatory. The name of your MongoDB database. Example: Cinchy.
Collection: Mandatory. The name of your MongoDB collection. Example: Employee.
Pipeline Stages: Optional. This parameter allows you to specify pipeline stages with filters.
In MongoDB, an aggregation pipeline consists of one or more stages that process documents:
- Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values.
- The documents that are output from a stage are passed to the next stage.
- An aggregation pipeline can return results for groups of documents. For example, it can return the total, average, maximum, and minimum values.
See the Example Topic JSON below. Our example config uses a filter to return documents with an ID between 0 and 10,000 and the location set to Montreal, or documents where the operation type is 'delete'.
Example Topic JSON
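Below is a minimal sketch of a Topic JSON implementing the filter described above. It assumes the Database, Collection, and Pipeline Stages parameters map to the database, collection, and pipelineStages properties, and uses a standard MongoDB $match stage; confirm the exact property names against your version of Connections.

```json
{
  "database": "Cinchy",
  "collection": "Employee",
  "pipelineStages": [
    {
      "$match": {
        "$or": [
          {
            "$and": [
              { "fullDocument._id": { "$gte": 0, "$lte": 10000 } },
              { "fullDocument.location": "Montreal" }
            ]
          },
          { "operationType": "delete" }
        ]
      }
    }
  ]
}
```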
Connection Attributes
The table below can be used to help create the Connection Attributes JSON needed to set up a real-time sync.
connectionString: Mandatory. Your MongoDB connection string. Example: mongodb://localhost:9877.
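A minimal Connection Attributes JSON, assuming connectionString is the only attribute needed for a basic setup, would look like this:

```json
{
  "connectionString": "mongodb://localhost:9877"
}
```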
The Schema section is where you define which source columns you want to sync in your connection. You can repeat the values for multiple columns.
Name: Mandatory. The name of your column as it appears in the source. This field is case sensitive and preserves spaces. Example: Name.
Alias: Optional. You may choose to use an alias on your column so that it has a different name in the data sync.
Data Type: Mandatory. The data type of the column values. Example: Text.
Description: Optional. You may choose to add a description to your column.
Select Show Advanced for more options for the Schema section.
Mandatory:
- If both Mandatory and Validated are checked on a column, rows where the column is empty are rejected.
- If just Mandatory is checked on a column, all rows are synced with the execution log status of failed, and the source error of "Mandatory Rule Violation".
- If just Validated is checked on a column, all rows are synced.
Validate Data:
- If both Mandatory and Validated are checked on a column, rows where the column is empty are rejected.
- If just Validated is checked on a column, all rows are synced.
Trim Whitespace: Optional if the data type is Text. For Text data types, you can choose whether to trim the whitespace.
Max Length: Optional if the data type is Text. You can input a numerical value in this field that represents the maximum length of the data that can be synced in your column. If the value is exceeded, the row will be rejected (you can find this error in the Execution Log).
You can choose to add in a Transformation > String Replacement by inputting the following:
- Pattern: Mandatory if using a Transformation. The pattern for your string replacement.
- Replacement: What you want to replace your pattern with.
For example, assuming patterns are interpreted as regular expressions, a Pattern of \s+ with a Replacement of a single space would collapse runs of whitespace.
Note that you can have more than one String Replacement.
1. Configure your Destination.
2. Define your Sync Actions.
3. Add in your Post Sync Scripts, if required.
4. If more than one listener is needed for a real-time sync, configure the additional listeners via the Listener Config table.
5. To run a real-time sync, enable your Listener from the Execution tab.