Cinchy Platform Documentation
Cinchy v5.6


1. Overview

The MongoDB stream source works similarly to Cinchy's Change Data Capture functionality. The listener subscribes to the change stream of a specific collection in a database on the MongoDB server. Any action performed on documents in that collection is picked up by the listener and sent to the queue.
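As a rough illustration of the subscribe-and-forward behaviour described above, the sketch below copies change events from a collection onto a queue. It is not Cinchy's implementation: `forward_changes` and the `deque` queue are hypothetical names, and the `collection` argument is only assumed to behave like a PyMongo collection whose `watch()` method returns an iterable change stream.

```python
from collections import deque


def forward_changes(collection, queue, pipeline=None):
    """Push every change event from `collection` onto `queue`.

    `collection` is assumed to behave like a pymongo Collection: its
    `watch()` method returns a change stream that is both a context
    manager and an iterator of change-event documents.
    """
    with collection.watch(pipeline or []) as stream:
        for change in stream:  # on a real server this blocks until the next event
            queue.append(change)


# Usage against a real server (assumes pymongo is installed and the
# server supports change streams):
#
#   from pymongo import MongoClient
#   client = MongoClient("mongodb://localhost:9877")
#   forward_changes(client["cinchy"]["employee"], deque())
```

Against a live server the loop runs until the stream is closed, so a real listener would run it in its own worker.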

1.1 Limitations

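To use change streams in MongoDB, your environment must meet the following requirements:
  • The database must be part of a replica set or sharded cluster.
  • The database must use the WiredTiger storage engine.
  • The replica set or sharded cluster must use replica set protocol version 1.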

2. The Listener Config Table

To set up a Stream Source, navigate to the Listener Config table and insert a new row for your data sync (Image 1). Most of the columns in the Listener Config table are the same across all Stream Sources; exceptions are noted where they apply. All of these parameters and their descriptions are listed below.
Image 1: The Listener Config table
General Parameters
The following column parameters can be found in the Listener Config table:
Name
Mandatory. Provide a name for your Listener Config.
Example: MongoDB Real-Time Sync
Event Connector Type
Mandatory. Select your connector type from the drop-down menu.
Topic
Mandatory. This field expects a JSON-formatted value specific to the connector type you are configuring.
See the Topic parameters below.
Connection Attributes
Mandatory. This field expects a JSON-formatted value specific to the connector type you are configuring.
See the Connection Attributes parameters below.
Status
Mandatory. This value determines whether your listener config/real-time sync is turned on or off. Keep it set to Disabled until you are confident that the rest of your data sync is properly configured.
Data Sync Config
Mandatory. This drop-down lists all of the data syncs on your platform. Select the one that you want to use for your real-time sync.
Example: MongoDB Data Sync
Subscription Expires On
This value is only relevant for Salesforce Stream Sources. This field is a timestamp that is auto-populated when the listener has successfully subscribed to a topic.
Message
Leave this value blank when setting up your configuration. This field auto-populates with any relevant messages while your sync is running, for instance "Cinchy listener is running" or "Listener is disabled".
Auto Offset Reset
Earliest, Latest, or None. When the listener starts and there is either no last message ID or the last message ID is invalid (because it was deleted, or because the listener is new), this column is used as a fallback to determine where to start reading events from.
  • Earliest starts reading from the beginning of the queue (when CDC was enabled on the table). This may be a suitable configuration if your use case is recoverable or re-runnable and you need to reprocess all events to ensure accuracy.
  • Latest fetches the last value after whatever was last processed. This is the typical configuration.
  • None does not start reading any events.
You can switch between Auto Offset Reset types after your initial configuration with the following steps:
  1. Navigate to the Listener Config table.
  2. Re-configure the Auto Offset Reset value.
  3. Set the "Status" column of the Listener Config to "Disabled".
  4. Navigate to the Event Listener State table.
  5. Find the row that pertains to your data sync's Listener Config and delete it.
  6. Navigate back to the Listener Config table.
  7. Set the "Status" column of the Listener Config to "Enabled" so that your new Auto Offset Reset configuration takes effect.
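The fallback behaviour of Auto Offset Reset can be summarised as a small decision function. This is only a sketch of the rules stated above, not Cinchy's code; the function name, its arguments, and the returned labels are hypothetical.

```python
def resolve_start_position(last_message_id, is_valid, auto_offset_reset):
    """Decide where a listener would start reading events.

    A valid last message ID always wins. Auto Offset Reset is only a
    fallback for a missing or invalid ID. The returned labels are
    illustrative names, not Cinchy values.
    """
    if last_message_id is not None and is_valid:
        return ("resume_after", last_message_id)

    mode = auto_offset_reset.strip().lower()
    if mode == "earliest":
        return ("beginning_of_queue", None)  # reprocess every event
    if mode == "latest":
        return ("end_of_queue", None)        # only events after the last one processed
    if mode == "none":
        return ("do_not_read", None)         # listener reads nothing
    raise ValueError(f"Unknown Auto Offset Reset value: {auto_offset_reset!r}")
```

For example, a new listener with no stored message ID and Auto Offset Reset set to Latest resolves to the end of the queue, while a listener with a valid stored ID ignores the column entirely.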
Topic
The parameters below can be used to help create the Topic JSON needed to set up a real-time sync.
Database
Mandatory. The name of your MongoDB database.
Collection
Mandatory. The name of your MongoDB collection.
Pipeline Stages
Optional. This parameter allows you to specify pipeline stages with filters.
In MongoDB, an aggregation pipeline consists of one or more stages that process documents:
  • Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values.
  • The documents that are output from a stage are passed to the next stage.
  • An aggregation pipeline can return results for groups of documents. For example, return the total, average, maximum, and minimum values.
See the Example Topic JSON below. The example config uses a filter that returns documents with an ID between 0 and 10,000 (exclusive) and the location set to Montreal, or any event where the operation type is 'delete'.
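To make the AND/OR structure of that filter concrete, it can be read as a plain predicate over a change event. The sketch below is an approximation in Python, not how MongoDB evaluates `$match`. The function name is hypothetical, and the ID field name (`id` inside `fullDocument`) is an assumption made purely for illustration.

```python
def matches_example_filter(event, id_field="id"):
    """Approximate the example filter for a single change event.

    `id_field` is a hypothetical field name inside `fullDocument`;
    substitute whichever ID field your documents actually use.
    """
    # OR branch: every delete event passes.
    if event.get("operationType") == "delete":
        return True

    # AND branch: ID strictly between 0 and 10,000, and location is Montreal.
    doc = event.get("fullDocument") or {}
    doc_id = doc.get(id_field)
    in_range = isinstance(doc_id, (int, float)) and 0 < doc_id < 10000
    return in_range and doc.get("location") == "Montreal"
```

Both bounds are exclusive because the filter uses `$gt` and `$lt`, so an ID of exactly 10,000 does not pass.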
Example Topic JSON
{
  "database": "cinchy",
  "collection": "employee",
  "changeStreamSettings": {
    "pipelineStages": [
      "{ $match: { $or: [ { $and: [ { '': { $gt: 0, $lt: 10000 } }, { 'fullDocument.location': 'Montreal' } ] }, { 'operationType': 'delete' } ] } }"
    ]
  }
}
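Because each pipeline stage is stored as a string inside the Topic JSON, it can be easier to generate the value than to hand-edit it. The sketch below assumes only the key names shown in the example above. `build_topic` is a hypothetical helper, and the single `$match` stage is a simplified stand-in for a real filter.

```python
import json


def build_topic(database, collection, pipeline_stages=()):
    """Return a Topic JSON string using the keys from the example above."""
    topic = {"database": database, "collection": collection}
    if pipeline_stages:
        # Each stage stays a plain string; json.dumps handles the quoting.
        topic["changeStreamSettings"] = {"pipelineStages": list(pipeline_stages)}
    return json.dumps(topic, indent=2)


# A simplified stage that keeps only delete events.
stage = "{ $match: { 'operationType': 'delete' } }"
topic_json = build_topic("cinchy", "employee", [stage])
print(topic_json)
```

Parsing the result back with `json.loads` is a quick way to confirm the value is well-formed before pasting it into the Topic column.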
Connection Attributes
The parameters below can be used to help create the Connection Attributes JSON needed to set up a real-time sync.
Connection String
Mandatory. Your MongoDB connection string.
Example Connection Attributes JSON
{
  "connectionString": "mongodb://localhost:9877"
}