KafkaConnect

Usage example

Example
apiVersion: aiven.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-kafka-connect
spec:
  authSecretRef:
    name: aiven-token
    key: token

  project: my-aiven-project
  cloudName: google-europe-west1
  plan: business-4

  userConfig:
    kafka_connect:
      consumer_isolation_level: read_committed
    public_access:
      kafka_connect: true

Info

To create this resource, a Secret containing the Aiven token must be created first.
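
As a minimal sketch of that prerequisite, the Secret below matches the authSecretRef in the example above (name aiven-token, key token). The token value is a placeholder and must be replaced with a real Aiven token. The Secret is assumed to live in the same namespace as the KafkaConnect resource.

```yaml
apiVersion: v1
kind: Secret
metadata:
  # Must match spec.authSecretRef.name in the KafkaConnect resource.
  name: aiven-token
type: Opaque
stringData:
  # Must match spec.authSecretRef.key. Placeholder value, not a real token.
  token: "REPLACE_WITH_AIVEN_TOKEN"
```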

Apply the resource with:

kubectl apply -f example.yaml

Verify the newly created KafkaConnect:

kubectl get kafkaconnects my-kafka-connect

The output is similar to the following:

Name                Project             Region                 Plan          State      
my-kafka-connect    my-aiven-project    google-europe-west1    business-4    RUNNING    

KafkaConnect

KafkaConnect is the Schema for the kafkaconnects API.

Required

  • apiVersion (string). Value aiven.io/v1alpha1.
  • kind (string). Value KafkaConnect.
  • metadata (object). Data that identifies the object, including a name string and optional namespace.
  • spec (object). KafkaConnectSpec defines the desired state of KafkaConnect. See below for nested schema.

spec

Appears on KafkaConnect.

KafkaConnectSpec defines the desired state of KafkaConnect.

Required

  • plan (string, MaxLength: 128). Subscription plan.
  • project (string, Immutable, Pattern: ^[a-zA-Z0-9_-]+$, MaxLength: 63). Identifies the project this resource belongs to.

Optional

  • authSecretRef (object). Authentication reference to Aiven token in a secret. See below for nested schema.
  • cloudName (string, MaxLength: 256). Cloud the service runs in.
  • maintenanceWindowDow (string, Enum: monday, tuesday, wednesday, thursday, friday, saturday, sunday). Day of week when maintenance operations should be performed. One of monday, tuesday, wednesday, etc.
  • maintenanceWindowTime (string, MaxLength: 8). Time of day when maintenance operations should be performed. UTC time in HH:mm:ss format.
  • projectVPCRef (object). ProjectVPCRef reference to ProjectVPC resource to use its ID as ProjectVPCID automatically. See below for nested schema.
  • projectVpcId (string, MaxLength: 36). Identifier of the VPC the service should be in, if any.
  • serviceIntegrations (array of objects, Immutable, MaxItems: 1). Service integrations to specify when creating a service. Not applied after initial service creation. See below for nested schema.
  • tags (object, AdditionalProperties: string). Tags are key-value pairs that allow you to categorize services.
  • technicalEmails (array of objects, MaxItems: 10). Defines the email addresses that will receive alerts about upcoming maintenance updates or warnings about service instability. See below for nested schema.
  • terminationProtection (boolean). Prevent service from being deleted. It is recommended to have this enabled for all services.
  • userConfig (object). KafkaConnect specific user configuration options. See below for nested schema.
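
The optional fields above can be combined as in the following sketch. It reuses the names from the usage example. The tag keys, tag values, and email address are hypothetical and only illustrate the expected shapes.

```yaml
apiVersion: aiven.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-kafka-connect
spec:
  authSecretRef:
    name: aiven-token
    key: token

  project: my-aiven-project
  cloudName: google-europe-west1
  plan: business-4

  # Maintenance window: day from the enum, UTC time in HH:mm:ss format.
  # The time is quoted so that YAML parsers keep it as a string.
  maintenanceWindowDow: sunday
  maintenanceWindowTime: "23:00:00"

  # Prevents the service from being deleted.
  terminationProtection: true

  # Hypothetical tags: string keys mapped to string values.
  tags:
    env: staging
    team: data-platform

  # Hypothetical recipient; up to 10 entries are allowed.
  technicalEmails:
    - email: oncall@example.com
```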

authSecretRef

Appears on spec.

Authentication reference to Aiven token in a secret.

Required

  • key (string, MinLength: 1).
  • name (string, MinLength: 1).

projectVPCRef

Appears on spec.

ProjectVPCRef reference to ProjectVPC resource to use its ID as ProjectVPCID automatically.

Required

  • name (string, MinLength: 1).

Optional

serviceIntegrations

Appears on spec.

Service integrations to specify when creating a service. Not applied after initial service creation.

Required

technicalEmails

Appears on spec.

Defines the email addresses that will receive alerts about upcoming maintenance updates or warnings about service instability.

Required

  • email (string). Email address.

userConfig

Appears on spec.

KafkaConnect specific user configuration options.

Optional

  • additional_backup_regions (array of strings, MaxItems: 1). Deprecated. Additional Cloud Regions for Backup Replication.
  • ip_filter (array of objects, MaxItems: 1024). Allow incoming connections from CIDR address block, e.g. 10.20.0.0/16. See below for nested schema.
  • kafka_connect (object). Kafka Connect configuration values. See below for nested schema.
  • private_access (object). Allow access to selected service ports from private networks. See below for nested schema.
  • privatelink_access (object). Allow access to selected service components through Privatelink. See below for nested schema.
  • public_access (object). Allow access to selected service ports from the public Internet. See below for nested schema.
  • secret_providers (array of objects). Configure external secret providers in order to reference external secrets in connector configuration. Currently Hashicorp Vault (provider: vault, auth_method: token) and AWS Secrets Manager (provider: aws, auth_method: credentials) are supported. Secrets can be referenced in connector config with ${::}. See below for nested schema.
  • service_log (boolean). Store logs for the service so that they are available in the HTTP API and console.
  • static_ips (boolean). Use static public IP addresses.

ip_filter

Appears on spec.userConfig.

CIDR address block, either as a string, or in a dict with an optional description field.

Required

  • network (string, MaxLength: 43). CIDR address block.

Optional

  • description (string, MaxLength: 1024). Description for IP filter list entry.
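
A sketch of ip_filter in the object form described above, with one entry per CIDR block. The networks and descriptions are illustrative assumptions, not recommended values.

```yaml
apiVersion: aiven.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-kafka-connect
spec:
  authSecretRef:
    name: aiven-token
    key: token

  project: my-aiven-project
  cloudName: google-europe-west1
  plan: business-4

  userConfig:
    ip_filter:
      # network is required; description is optional.
      - network: 10.20.0.0/16
        description: Internal network (hypothetical)
      - network: 192.0.2.0/24
```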

kafka_connect

Appears on spec.userConfig.

Kafka Connect configuration values.

Optional

  • connector_client_config_override_policy (string, Enum: None, All). Defines what client configurations can be overridden by the connector. Default is None.
  • consumer_auto_offset_reset (string, Enum: earliest, latest). What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Default is earliest.
  • consumer_fetch_max_bytes (integer, Minimum: 1048576, Maximum: 104857600). Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum.
  • consumer_isolation_level (string, Enum: read_uncommitted, read_committed). Transaction read isolation level. read_uncommitted is the default, but read_committed can be used if consume-exactly-once behavior is desired.
  • consumer_max_partition_fetch_bytes (integer, Minimum: 1048576, Maximum: 104857600). Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
  • consumer_max_poll_interval_ms (integer, Minimum: 1, Maximum: 2147483647). The maximum delay in milliseconds between invocations of poll() when using consumer group management (defaults to 300000).
  • consumer_max_poll_records (integer, Minimum: 1, Maximum: 10000). The maximum number of records returned in a single call to poll() (defaults to 500).
  • offset_flush_interval_ms (integer, Minimum: 1, Maximum: 100000000). The interval at which to try committing offsets for tasks (defaults to 60000).
  • offset_flush_timeout_ms (integer, Minimum: 1, Maximum: 2147483647). Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt (defaults to 5000).
  • producer_batch_size (integer, Minimum: 0, Maximum: 5242880). This setting gives the upper bound of the batch size to be sent. If there are fewer than this many bytes accumulated for this partition, the producer will linger for the linger.ms time waiting for more records to show up. A batch size of zero will disable batching entirely (defaults to 16384).
  • producer_buffer_memory (integer, Minimum: 5242880, Maximum: 134217728). The total bytes of memory the producer can use to buffer records waiting to be sent to the broker (defaults to 33554432).
  • producer_compression_type (string, Enum: gzip, snappy, lz4, zstd, none). Specify the default compression type for producers. This configuration accepts the standard compression codecs (gzip, snappy, lz4, zstd). It additionally accepts none which is the default and equivalent to no compression.
  • producer_linger_ms (integer, Minimum: 0, Maximum: 5000). This setting gives the upper bound on the delay for batching. Once there is batch.size worth of records for a partition, it will be sent immediately regardless of this setting; however, if there are fewer than this many bytes accumulated for this partition, the producer will linger for the specified time waiting for more records to show up. Defaults to 0.
  • producer_max_request_size (integer, Minimum: 131072, Maximum: 67108864). This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
  • scheduled_rebalance_max_delay_ms (integer, Minimum: 0, Maximum: 600000). The maximum delay that is scheduled in order to wait for the return of one or more departed workers before rebalancing and reassigning their connectors and tasks to the group. During this period the connectors and tasks of the departed workers remain unassigned. Defaults to 5 minutes.
  • session_timeout_ms (integer, Minimum: 1, Maximum: 2147483647). The timeout in milliseconds used to detect failures when using Kafka’s group management facilities (defaults to 10000).
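
The following sketch sets a handful of the kafka_connect options listed above. The values are arbitrary illustrations chosen to fall within the documented ranges and enums; they are not tuning recommendations.

```yaml
apiVersion: aiven.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-kafka-connect
spec:
  authSecretRef:
    name: aiven-token
    key: token

  project: my-aiven-project
  cloudName: google-europe-west1
  plan: business-4

  userConfig:
    kafka_connect:
      # Consumer side: read only committed transactional records.
      consumer_isolation_level: read_committed
      consumer_auto_offset_reset: earliest
      consumer_max_poll_records: 1000      # allowed: 1 to 10000

      # Producer side: wait briefly to fill batches, then compress them.
      producer_compression_type: lz4
      producer_linger_ms: 100              # allowed: 0 to 5000
      producer_batch_size: 65536           # allowed: 0 to 5242880

      # Worker behaviour.
      offset_flush_interval_ms: 30000      # allowed: 1 to 100000000
      session_timeout_ms: 30000            # allowed: 1 to 2147483647
```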

private_access

Appears on spec.userConfig.

Allow access to selected service ports from private networks.

Optional

  • kafka_connect (boolean). Allow clients to connect to kafka_connect with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.
  • prometheus (boolean). Allow clients to connect to prometheus with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.

privatelink_access

Appears on spec.userConfig.

Allow access to selected service components through Privatelink.

Optional

public_access

Appears on spec.userConfig.

Allow access to selected service ports from the public Internet.

Optional

  • kafka_connect (boolean). Allow clients to connect to kafka_connect from the public internet for service nodes that are in a project VPC or another type of private network.
  • prometheus (boolean). Allow clients to connect to prometheus from the public internet for service nodes that are in a project VPC or another type of private network.
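
As a sketch of how private_access and public_access might be combined, the manifest below places the service in a project VPC and enables selected ports. The ProjectVPC name my-project-vpc is hypothetical and assumes such a resource already exists.

```yaml
apiVersion: aiven.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-kafka-connect
spec:
  authSecretRef:
    name: aiven-token
    key: token

  project: my-aiven-project
  cloudName: google-europe-west1
  plan: business-4

  # Hypothetical ProjectVPC resource; its ID is used as the project VPC ID.
  projectVPCRef:
    name: my-project-vpc

  userConfig:
    # DNS names that resolve to the service's private IP addresses.
    private_access:
      kafka_connect: true
      prometheus: true
    # Public exposure, here limited to Kafka Connect itself.
    public_access:
      kafka_connect: true
      prometheus: false
```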

secret_providers

Appears on spec.userConfig.

SecretProvider.

Required

  • name (string). Name of the secret provider. Used to reference secrets in connector config.

Optional

aws

Appears on spec.userConfig.secret_providers.

AWS config for Secret Provider.

Required

  • auth_method (string, Enum: credentials). Auth method of the AWS secret provider.
  • region (string, MaxLength: 64). Region used to look up secrets with AWS Secrets Manager.

Optional

  • access_key (string, MaxLength: 128). Access key used to authenticate with AWS.
  • secret_key (string, MaxLength: 128). Secret key used to authenticate with AWS.

vault

Appears on spec.userConfig.secret_providers.

Vault Config for Secret Provider.

Required

  • address (string, MinLength: 1, MaxLength: 65536). Address of the Vault server.
  • auth_method (string, Enum: token). Auth method of the vault secret provider.

Optional

  • engine_version (integer, Enum: 1, 2). KV Secrets Engine version of the Vault server instance.
  • prefix_path_depth (integer). Prefix path depth of the secrets engine. Default is 1. If the secrets engine path has more than one segment, it has to be increased to the number of segments.
  • token (string, MaxLength: 256). Token used to authenticate with Vault when the auth method is token.
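
A sketch of secret_providers with one Vault entry and one AWS entry, based on the aws and vault schemas above. The provider names, Vault address, region, and all credential values are hypothetical placeholders.

```yaml
apiVersion: aiven.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-kafka-connect
spec:
  authSecretRef:
    name: aiven-token
    key: token

  project: my-aiven-project
  cloudName: google-europe-west1
  plan: business-4

  userConfig:
    secret_providers:
      # Hypothetical Vault provider using token authentication.
      - name: my-vault
        vault:
          address: https://vault.example.com:8200
          auth_method: token
          engine_version: 2
          token: "REPLACE_WITH_VAULT_TOKEN"

      # Hypothetical AWS Secrets Manager provider using static credentials.
      - name: my-aws
        aws:
          auth_method: credentials
          region: eu-west-1
          access_key: "REPLACE_WITH_ACCESS_KEY"
          secret_key: "REPLACE_WITH_SECRET_KEY"
```

The name of each entry is what connector configuration uses to refer to that provider.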