MongoDB
Last updated
Last updated
The MongoDB stream source works similar to Cinchy's Change Data Capture functionality. The listener subscribes to monitor the change stream of a specific collection in the database of the MongoDB server. Any actions performed on document(s) inside of that collection are picked up by the listener and sent to the queue.
In order to use change streams in MongoDB, there are a few requirements your environment must meet.
The database must be in a replica set or sharded cluster.
The database must use the WiredTiger storage engine.
The replica set or sharded cluster must use replica set protocol version 1.
To set up an Stream Source, you must navigate to the Listener Config table and insert a new row for your data sync (Image 1). Most of the columns within the Listener Config table persist across all Stream Sources, however exceptions will be noted. You can find all of these parameters and their relevant descriptions in the tables below.
The following column parameters can be found in the Listener Config table:
Parameter | Description | Example |
---|---|---|
Parameter | Description | Example |
---|---|---|
Parameter | Description | Example |
---|---|---|
Name
Mandatory. Provide a name for your Listener Config.
MongoDB Real-Time Sync
Event Connector Type
Mandatory. Select your Connector type from the drop down menu.
MongoDB
Topic
Mandatory. This field is expecting a JSON formatted value specific to the connector type you are configuring.
See the Topic tab.
Connection Attributes
Mandatory. This field is expecting a JSON formatted value specific to the connector type you are configuring.
See the Connection Attributes tab.
Status
Mandatory. This value refers to whether your listener config/real-time sync is turned on or off. Make sure you keep this set to Disabled until you are confident you have the rest of your data sync properly configured first.
Disabled
Data Sync Config
Mandatory. This drop down will list all of the data syncs on your platform. Select the one that you want to use for your real-time sync.
MongoDB Data Sync
Subscription Expires On
This value is only relevant for Salesforce Stream Sources. This field is a timestamp that's auto populated when it has successfully subscribed to a topic.
Message
Leave this value blank when setting up your configuration. This field will auto populate during the running of your sync with any relevant messages. For instance Cinchy listener is running, or Listener is disabled.
Auto Offset Reset
Earliest, Latest or None. In the case where the listener is started and either there is no last message ID, or when the last message ID is invalid (due to it being deleted or it's just a new listener), it will use this column as a fallback to determine where to start reading events from. Earliest will start reading from the beginning on the queue (when the CDC was enabled on the table). This might be a suggested configuration if your use case is recoverable or re-runnable and if you need to reprocess all events to ensure accuracy. Latest will fetch the last value after whatever was last processed. This is the typical configuration. None won't read start reading any events. You are able to switch between Auto Offset Reset types after your initial configuration through the below steps: 1. Navigate to the Listener Config table. 2. Re-configure the Auto Offset Reset value. 3. Set the Status column of the Listener Config to Disabled. 4. Navigate to the Event Listener State table. 5. Find the column that pertains to your data sync's Listener Config and delete it. 6. Navigate back to the Listener Config table. 7. Set the Status column of the Listener Config to Enabled in order for your new Auto Offset Reset configuration to take effect.
Latest
Database
Mandatory. The name of your MongoDB database.
Cinchy
Collection
Mandatory. The name of your MongoDB collection.
Employee
Pipeline Stages
Optional. This parameter allows you to specify pipeline stages with filters.
In MongoDB, an aggregation pipeline consists of one or more stages that process documents:
Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values.
The documents that are output from a stage are passed to the next stage.
An aggregation pipeline can return results for groups of documents. For example, return the total, average, maximum, and minimum values.
See the Example Topic JSON below. Our example config uses a filter to return documents with an ID between 0 and 10,000 AND documents with the location set to Montreal, OR where the operation type is 'delete'
connectionString
Mandatory. Your MongoDB connection string.
mongodb://localhost:9877