Kafka Topic
This page outlines the parameters that should be included in your listener configuration when setting up a real-time sync with a Kafka stream source. This process currently supports messages in JSON string or AVRO serialized format.
To listen in on multiple topics, you will need to configure multiple listener configs.
To set up a Stream Source, navigate to the Listener Config table and insert a new row for your data sync (Image 1). Most of the columns within the Listener Config table persist across all Stream Sources; exceptions will be noted. You can find all of these parameters and their relevant descriptions in the tables below.

Image 1: The Listener Config table
General Parameters
The following column parameters can be found in the Listener Config table:
Parameter | Description | Example |
---|---|---|
Name | Mandatory. Provide a name for your Listener Config. | Kafka Real-Time Sync |
Event Connector Type | Mandatory. Select your Connector type from the drop down menu. | Kafka Topic |
Topic | Mandatory. This field is expecting a JSON formatted value specific to the connector type you are configuring. | See the Topic tab. |
Connection Attributes | Mandatory. This field is expecting a JSON formatted value specific to the connector type you are configuring. | See the Connection Attributes tab. |
Status | Mandatory. This value refers to whether your listener config/real-time sync is turned on or off. Keep this set to Disabled until you are confident the rest of your data sync is properly configured. | Disabled |
Data Sync Config | Mandatory. This drop down will list all of the data syncs on your platform. Select the one that you want to use for your real-time sync. | Kafka Data Sync |
Subscription Expires On | This value is only relevant for Salesforce Stream Sources. This field is a timestamp that is auto-populated when it has successfully subscribed to a topic. | |
Message | Leave this value blank when setting up your configuration. This field will auto-populate during the running of your sync with any relevant messages. For instance "Cinchy listener is running", or "Listener is disabled". | |
Auto Offset Reset | Earliest, Latest, or None. When the listener starts and there is either no last message ID or an invalid last message ID (because it was deleted or the listener is new), this value is used as a fallback to determine where to start reading events from. See the details below this table. | Latest |

Earliest will start reading from the beginning of the queue (from when CDC was enabled on the table). This is a suggested configuration if your use case is recoverable or re-runnable and you need to reprocess all events to ensure accuracy.
Latest will start reading from after whatever was last processed. This is the typical configuration.
None will not start reading any events.
You can switch between Auto Offset Reset types after your initial configuration through the following steps:
1. Navigate to the Listener Config table.
2. Re-configure the Auto Offset Reset value.
3. Set the "Status" column of the Listener Config to "Disabled".
4. Navigate to the Event Listener State table.
5. Find the row that pertains to your data sync's Listener Config and delete it.
6. Navigate back to the Listener Config table.
7. Set the "Status" column of the Listener Config to "Enabled" for your new Auto Offset Reset configuration to take effect.
Topic
The table below can be used to help create the Topic JSON needed to set up a real-time sync.
Parameter | Description | Example |
---|---|---|
topicName | Mandatory. The name of the Kafka topic to listen for messages on. | |
messageFormat | Optional. Put AVRO if the messages are serialized in AVRO; leave this out for JSON string messages. | AVRO |
Example Topic JSON
```json
{
  "topicName": "<(mandatory) Kafka topic name to listen for messages on>",
  "messageFormat": "<(optional) put AVRO if the messages are serialized in AVRO>"
}
```
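For reference, a filled-in Topic value might look like the following sketch. The topic name orders.events is a placeholder; substitute the topic your producers publish to:

```json
{
  "topicName": "orders.events",
  "messageFormat": "AVRO"
}
```

If your messages are plain JSON strings rather than AVRO, omit messageFormat entirely.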
Connection Attributes
The table below can be used to help create the Connection Attributes JSON needed to set up a real-time sync.
Parameter | Description |
---|---|
bootstrapServers | List the Kafka bootstrap servers in a comma-separated list. This should be in the form of host:port |
saslMechanism | This will be either PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. Note that in the JSON, SCRAM-SHA-256 must be formatted as SCRAMSHA256 and SCRAM-SHA-512 as SCRAMSHA512. |
saslPassword | The password for your chosen SASL mechanism. |
saslUsername | The username for your chosen SASL mechanism. |
url | This is required if your data follows a schema when serialized in AVRO. It is a comma-separated list of URLs for schema registry instances used to register or look up schemas. This parameter and the three that follow are nested under schemaRegistrySettings (see the example JSON below). |
basicAuthCredentialsSource | Specifies the Kafka configuration property schema.registry.basic.auth.credentials.source, which provides the basic authentication credentials. This can be UserInfo or SaslInherit. |
basicAuthUserInfo | Basic Auth credentials specified in the form of username:password |
sslKeystorePassword | This is the client keystore (PKCS#12) password. |
securityProtocol | Kafka supports cluster encryption and authentication, which can encrypt data in transit between your applications and Kafka. Use this field to specify which protocol will be used for communication between client and server. Cinchy currently supports the following options: Plaintext (unauthenticated, non-encrypted), SaslPlaintext (SASL-based authentication, non-encrypted), or SaslSsl (SASL-based authentication, TLS-based encryption). If no value is specified, this will default to Plaintext. |
Example Connection Attributes JSON
```json
{
  "bootstrapServers": "<(mandatory) Kafka bootstrap servers in a comma-separated list, in the form of host:port>",
  "saslMechanism": "<PLAIN|SCRAMSHA256|SCRAMSHA512>",
  "saslPassword": "",
  "saslUsername": "",
  "schemaRegistrySettings": {
    "url": "<(optional) required if your data follows a schema when serialized in AVRO; a comma-separated list of URLs for schema registry instances used to register or look up schemas>",
    "basicAuthCredentialsSource": "<(optional) specifies the Kafka configuration property schema.registry.basic.auth.credentials.source that provides the basic authentication credentials; this can be UserInfo or SaslInherit>",
    "basicAuthUserInfo": "<(optional) Basic Auth credentials specified in the form of username:password>",
    "sslKeystorePassword": "<(optional) the client keystore (PKCS#12) password>"
  },
  "securityProtocol": "<Plaintext|SaslPlaintext|SaslSsl>"
}
```
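For reference, a filled-in Connection Attributes value might look like the following sketch. All hostnames, usernames, and credentials here are placeholders; substitute the values for your own cluster and schema registry:

```json
{
  "bootstrapServers": "kafka-broker-1.example.com:9092,kafka-broker-2.example.com:9092",
  "saslMechanism": "SCRAMSHA512",
  "saslPassword": "<your-password>",
  "saslUsername": "cinchy-listener",
  "schemaRegistrySettings": {
    "url": "https://schema-registry.example.com:8081",
    "basicAuthCredentialsSource": "UserInfo",
    "basicAuthUserInfo": "registry-user:registry-password",
    "sslKeystorePassword": "<your-keystore-password>"
  },
  "securityProtocol": "SaslSsl"
}
```

Since all of its parameters are optional, the schemaRegistrySettings object is only needed when your messages are serialized in AVRO against a schema registry.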