When Kafka Connect ingests data from a source system into Kafka it writes it to a topic. If you have set auto.create.topics.enable = true on your broker then the topic will be created when written to. When topics are created this way they use the defaults configured on the broker for topic creation, including num.partitions and default.replication.factor.

If auto.create.topics.enable = false (as it is on Confluent Cloud and many self-managed environments, for good reasons) then you can tell Kafka Connect to create those topics first. This was added in Apache Kafka 2.6 (Confluent Platform 6.0); prior to that you had to create the topics yourself manually, otherwise the connector would fail. This is particularly useful for connectors which create a large number of topics, or where the topic name is not known in advance (e.g. when using a regex to select objects from the source system) and thus cannot be pre-created with the desired settings.

KIP-158, implemented in Apache Kafka 2.6 (available with Confluent Platform 6.0), adds the ability to customise topic-level configurations for topics created by Kafka Connect source connectors. Kafka Connect now ships with a worker configuration, topic.creation.enable, which is set to true by default. So long as this is set, you can then specify the defaults for new topics to be created by a connector in the connector configuration:

    topic.creation.default.replication.factor
    topic.creation.default.partitions

Without these two settings present in the connector configuration, Kafka Connect will not create missing topics. If you are setting topic creation overrides you must always include these two settings.
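As a sketch of how these settings slot into a connector configuration, here is a small Python helper. The helper itself (name and structure) is hypothetical and not part of Kafka Connect; only the topic.creation.default.* property names come from KIP-158.

```python
# Hypothetical helper: return a copy of a connector config with the two
# required topic.creation.default.* settings (plus optional extra topic-level
# overrides) added. The property names follow KIP-158; the helper is a sketch.

def with_topic_creation(config, partitions, replication_factor, **overrides):
    """Extra overrides are given pythonically, e.g. cleanup_policy="compact"
    becomes topic.creation.default.cleanup.policy."""
    out = dict(config)
    out["topic.creation.default.partitions"] = partitions
    out["topic.creation.default.replication.factor"] = replication_factor
    for key, value in overrides.items():
        out["topic.creation.default." + key.replace("_", ".")] = value
    return out

base = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/data/test.txt",
    "topic": "testdata-00",
}
cfg = with_topic_creation(base, partitions=4, replication_factor=1,
                          cleanup_policy="compact")
print(cfg["topic.creation.default.cleanup.policy"])
```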
Here's a very simple Kafka Connect source connector, reading data in from a file:

    curl -X PUT http://localhost:8083/connectors/source-txt-file-00/config \
         -H "Content-Type: application/json" \
         -d '{
              "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
              "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
              "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
              "file"            : "/data/test.txt",
              "topic"           : "testdata-00"
             }'

Since the broker is configured to automagically create new topics (auto.create.topics.enable = true), it does so using the defaults - one partition, replication factor of 1, etc. We can examine this using various tools - for example, listing the cluster metadata as JSON and filtering it with jq '.topics[] | select(.topic =="testdata")'.
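A jq filter like '.topics[] | select(.topic =="testdata-00")' picks one topic's entry out of a JSON metadata document; the same filtering can be sketched in Python with just the standard library. The sample metadata below is illustrative (the real metadata output from a tool such as kafkacat -L -J has many more fields):

```python
import json

# Minimal stand-in for JSON cluster metadata; just enough fields to filter on.
metadata = json.loads("""
{
  "topics": [
    {"topic": "testdata-00", "partitions": [{"partition": 0, "replicas": [{"id": 1}]}]},
    {"topic": "_internal",   "partitions": [{"partition": 0, "replicas": [{"id": 1}]}]}
  ]
}
""")

# Equivalent of: jq '.topics[] | select(.topic =="testdata-00")'
matches = [t for t in metadata["topics"] if t["topic"] == "testdata-00"]
for t in matches:
    print(t["topic"], len(t["partitions"]), "partition(s)")
```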
Let's see how we can use the new options in Apache Kafka 2.6 (Confluent Platform 6.0) to change some of the topic configurations that are set when the topic is created. In my sandbox I just have a single broker, so I'm going to leave the number of replicas at a sensible setting of 1, but I'm going to change the number of partitions to four, as well as the cleanup policy from its default of delete to compact:

    curl -X PUT http://localhost:8083/connectors/source-txt-file-01/config \
         -H "Content-Type: application/json" \
         -d '{
              "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
              "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
              "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
              "file"            : "/data/test.txt",
              "topic"           : "testdata-01",
              "topic.creation.default.replication.factor" : 1,
              "topic.creation.default.partitions"         : 4,
              "topic.creation.default.cleanup.policy"     : "compact"
             }'

In the broker log you can see that the cleanup.policy configuration has been honoured ({cleanup.policy=compact}). Note, however, that the FileStreamSourceConnector sends records with no key set, which for a compacted topic makes no sense, and hence we get org.apache.kafka.common.InvalidRecordException.
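To see why a compacted topic needs keyed records, here is a toy model of log compaction in Python. This is purely an illustration of the idea, not Kafka's actual compaction implementation: compaction keeps the latest value per key, so a record with no key has nothing to be compacted against, and Kafka rejects it with InvalidRecordException (mirrored here as a ValueError).

```python
# Toy model of log compaction: keep only the most recent value per key.
# Illustrative only - not Kafka's actual algorithm.

def compact(records):
    latest = {}
    for key, value in records:
        if key is None:
            # Kafka rejects unkeyed records on compacted topics; mirror that.
            raise ValueError("compacted topics require a record key")
        latest[key] = value  # later records supersede earlier ones
    return latest

log = [("user-1", "v1"), ("user-2", "v1"), ("user-1", "v2")]
print(compact(log))  # user-1 retains only its latest value
```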
Let's try a different variation just to prove out the topic configuration:

    curl -X PUT http://localhost:8083/connectors/source-txt-file-02/config \
         -H "Content-Type: application/json" \
         -d '{
              "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
              "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
              "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
              "file"            : "/data/test.txt",
              "topic"           : "testdata-02",
              "topic.creation.default.replication.factor" : 1,
              "topic.creation.default.partitions"         : 4,
              "topic.creation.default.compression.type"   : "snappy"
             }'

In the Kafka Connect worker log you can see the settings used (under the covers it's done through TopicCreationGroup):

    '(name=testdata-02, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={compression.type=snappy})'

Checking out the topic details, we can see it's as we wanted it - four partitions, and using snappy compression.
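If you want to pull the interesting fields out of a TopicCreationGroup log line like the one above programmatically, a small standard-library parser does the job. This sketch assumes the '(name=..., numPartitions=..., configs={...})' shape seen in the log excerpt; it is my own helper, not part of any Kafka tooling.

```python
import re

# The TopicCreationGroup line as it appears in the Connect worker log.
line = ("(name=testdata-02, numPartitions=4, replicationFactor=1, "
        "replicasAssignments=null, configs={compression.type=snappy})")

def parse_topic_creation(s):
    """Return (topic name, partition count, topic configs) from a
    TopicCreationGroup log line of the shape shown above."""
    fields = dict(re.findall(r"(\w+)=(\{[^}]*\}|[^,()]+)", s))
    configs = dict(
        pair.split("=", 1)
        for pair in fields.get("configs", "{}").strip("{}").split(",")
        if "=" in pair
    )
    return fields["name"], int(fields["numPartitions"]), configs

print(parse_topic_creation(line))
```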
There are many other topic-level configurations which you may want to set for topics that are automatically created by Kafka Connect. Common settings for a topic that you may want to customise include cleanup.policy, min.insync.replicas, and compression.type.

ksqlDB can be used to create Kafka Connect connectors too, either against an existing Kafka Connect cluster or using ksqlDB's embedded Connect worker:

    ===========================================
    =  Event Streaming Database purpose-built =
    =        for stream processing apps       =
    ===========================================

    Copyright 2017-2020 Confluent Inc.

    CLI v0.14.0-rc732, Server v0.14.0-rc732 located at http://ksqldb:8088
    Server Status: RUNNING

    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

Here's an example of creating a connector that overrides the min.insync.replicas, partition count, and replication factor for a created topic:

    ksql> CREATE SOURCE CONNECTOR SOURCE_TXT_FILE_03 WITH (
            'connector.class' = 'org.apache.kafka.connect.file.FileStreamSourceConnector',
            'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
            'value.converter' = 'org.apache.kafka.connect.storage.StringConverter',
            'file'            = '/data/test.txt',
            'topic'           = 'testdata-03',
            'topic.creation.default.replication.factor'  = 1,
            'topic.creation.default.partitions'          = 4,
            'topic.creation.default.min.insync.replicas' = 1
          );

     Message
    --------------------------------------
     Created connector SOURCE_TXT_FILE_03
    --------------------------------------

    ksql> SHOW TOPICS;

     Kafka Topic | Partitions | Partition Replicas
    ------------------------------------------------
     testdata-03 | 4          | 1
    ------------------------------------------------

    ksql> PRINT 'testdata-03' FROM BEGINNING;
    Key format: ¯\_(ツ)_/¯ - no data processed
    Value format: KAFKA_STRING
    rowtime: 2021/01/06 14:09:27.522 Z, key: <null>, value: Hello world!
In the example above I used just the default topic creation group, but you can create multiple groups of configuration based on the topic name. I can see this being really useful if you want to override topic configuration for just some of the topics that a connector creates but not all of them, or if you want to override configuration for all topics but vary it by topic name. Check out the docs page for some nicely documented examples of using this feature further.

Robin Moffatt is a Principal Developer Advocate at Confluent, and an Oracle ACE Director (Alumnus). He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

https://rmoff.net/2021/01/06/creating-topics-with-kafka-connect/
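The multiple-groups idea looks roughly like this. The topic.creation.groups / topic.creation.&lt;group&gt;.include property pattern comes from KIP-158 and the Kafka Connect docs; the group name ("compacted") and the include regex below are my own illustrative choices:

```python
import re

# Sketch of a connector config (as a Python dict) using a named
# topic-creation group alongside the default group. The group name and
# regex are illustrative; the property pattern follows KIP-158.
config = {
    "topic.creation.groups": "compacted",
    # The default group's two required settings:
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 4,
    # Applied only to topics whose name matches the include regex:
    "topic.creation.compacted.include": r"compact\..*",
    "topic.creation.compacted.cleanup.policy": "compact",
}

def creation_group(topic, cfg):
    """Return the name of the topic-creation group whose include regex
    matches `topic`, falling back to the default group."""
    for group in cfg["topic.creation.groups"].split(","):
        pattern = cfg.get(f"topic.creation.{group.strip()}.include", "")
        if pattern and re.fullmatch(pattern, topic):
            return group.strip()
    return "default"

print(creation_group("compact.users", config))  # matched by the group regex
print(creation_group("testdata-00", config))    # falls back to the default group
```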
