Kafka

Usage example

apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  authSecretRef:
    name: aiven-token
    key: token

  connInfoSecretTarget:
    name: kafka-secret
    prefix: MY_SECRET_PREFIX_
    annotations:
      foo: bar
    labels:
      baz: egg

  project: my-aiven-project
  cloudName: google-europe-west1
  plan: startup-2

  maintenanceWindowDow: friday
  maintenanceWindowTime: 23:00:00

Kafka

Kafka is the Schema for the kafkas API.

Required

  • apiVersion (string). Value aiven.io/v1alpha1.
  • kind (string). Value Kafka.
  • metadata (object). Data that identifies the object, including a name string and optional namespace.
  • spec (object). KafkaSpec defines the desired state of Kafka. See below for nested schema.

spec

Appears on Kafka.

KafkaSpec defines the desired state of Kafka.

Required

  • plan (string, MaxLength: 128). Subscription plan.
  • project (string, Immutable, MaxLength: 63, Format: ^[a-zA-Z0-9_-]+$). Identifies the project this resource belongs to.

Optional

  • authSecretRef (object). Authentication reference to Aiven token in a secret. See below for nested schema.
  • cloudName (string, MaxLength: 256). Cloud the service runs in.
  • connInfoSecretTarget (object). Information regarding secret creation. Exposed keys: KAFKA_HOST, KAFKA_PORT, KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_ACCESS_CERT, KAFKA_ACCESS_KEY, KAFKA_SASL_HOST, KAFKA_SASL_PORT, KAFKA_SCHEMA_REGISTRY_HOST, KAFKA_SCHEMA_REGISTRY_PORT, KAFKA_CONNECT_HOST, KAFKA_CONNECT_PORT, KAFKA_REST_HOST, KAFKA_REST_PORT, KAFKA_CA_CERT. See below for nested schema.
  • connInfoSecretTargetDisabled (boolean, Immutable). When true, the secret containing connection information is not created. Defaults to false. This field cannot be changed after resource creation.
  • disk_space (string, Format: ^[1-9][0-9]*(GiB|G)*). The disk space of the service, possible values depend on the service type, the cloud provider and the project. Reducing will result in the service re-balancing.
  • karapace (boolean). Switch the service to use Karapace for schema registry and REST proxy.
  • maintenanceWindowDow (string, Enum: monday, tuesday, wednesday, thursday, friday, saturday, sunday). Day of week when maintenance operations should be performed. One of monday, tuesday, wednesday, etc.
  • maintenanceWindowTime (string, MaxLength: 8). Time of day when maintenance operations should be performed. UTC time in HH:mm:ss format.
  • projectVPCRef (object). ProjectVPCRef reference to ProjectVPC resource to use its ID as ProjectVPCID automatically. See below for nested schema.
  • projectVpcId (string, MaxLength: 36). Identifier of the VPC the service should be in, if any.
  • serviceIntegrations (array of objects, Immutable, MaxItems: 1). Service integrations to specify when creating a service. Not applied after initial service creation. See below for nested schema.
  • tags (object, AdditionalProperties: string). Tags are key-value pairs that allow you to categorize services.
  • technicalEmails (array of objects, MaxItems: 10). Defines the email addresses that will receive alerts about upcoming maintenance updates or warnings about service instability. See below for nested schema.
  • terminationProtection (boolean). Prevent service from being deleted. It is recommended to have this enabled for all services.
  • userConfig (object). Kafka specific user configuration options. See below for nested schema.
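As a rough illustration of how some of the optional spec fields above fit together, the following sketch extends the usage example with tags and termination protection. The tag keys and values are hypothetical; the project, plan, and cloud names are the placeholders from the usage example.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project        # placeholder from the usage example
  plan: startup-2                  # placeholder from the usage example
  cloudName: google-europe-west1

  # Guard against accidental deletion of the service.
  terminationProtection: true

  # Free-form key-value pairs (hypothetical values).
  tags:
    env: staging
    team: data-platform

  # Maintenance window, UTC, HH:mm:ss.
  maintenanceWindowDow: sunday
  maintenanceWindowTime: 02:00:00
```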

authSecretRef

Appears on spec.

Authentication reference to Aiven token in a secret.

Required

  • key (string, MinLength: 1).
  • name (string, MinLength: 1).
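For context, authSecretRef points at an ordinary Kubernetes Secret. A minimal sketch of such a Secret, matching the name and key used in the usage example, might look like this. The token value is a placeholder, and the Secret is assumed to live in the same namespace as the Kafka resource.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: aiven-token        # referenced by spec.authSecretRef.name
type: Opaque
stringData:
  # referenced by spec.authSecretRef.key; replace with a real token
  token: "REPLACE_WITH_AIVEN_TOKEN"
```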

connInfoSecretTarget

Appears on spec.

Information regarding secret creation. Exposed keys: KAFKA_HOST, KAFKA_PORT, KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_ACCESS_CERT, KAFKA_ACCESS_KEY, KAFKA_SASL_HOST, KAFKA_SASL_PORT, KAFKA_SCHEMA_REGISTRY_HOST, KAFKA_SCHEMA_REGISTRY_PORT, KAFKA_CONNECT_HOST, KAFKA_CONNECT_PORT, KAFKA_REST_HOST, KAFKA_REST_PORT, KAFKA_CA_CERT.

Required

  • name (string). Name of the secret resource to be created. By default, it is equal to the resource name.

Optional

  • annotations (object, AdditionalProperties: string). Annotations added to the secret.
  • labels (object, AdditionalProperties: string). Labels added to the secret.
  • prefix (string). Prefix for the secret's keys. Added "as is" without any transformations. By default, it is equal to the kind name in uppercase plus an underscore, e.g. KAFKA_, REDIS_, etc.
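One way a workload might consume the generated connection secret is through envFrom. The sketch below assumes the secret from the usage example (kafka-secret) exists in the Pod's namespace. The Pod name and image are hypothetical. With the custom prefix MY_SECRET_PREFIX_, the keys would presumably be exposed as MY_SECRET_PREFIX_HOST, MY_SECRET_PREFIX_PORT, and so on; verify the actual key names in the created secret.

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client                         # hypothetical
spec:
  containers:
    - name: app
      image: example.com/kafka-client:latest # hypothetical image
      envFrom:
        # Every key in the secret becomes an environment variable.
        - secretRef:
            name: kafka-secret
```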

projectVPCRef

Appears on spec.

ProjectVPCRef reference to ProjectVPC resource to use its ID as ProjectVPCID automatically.

Required

  • name (string, MinLength: 1).

Optional

serviceIntegrations

Appears on spec.

Service integrations to specify when creating a service. Not applied after initial service creation.

Required

technicalEmails

Appears on spec.

Defines the email addresses that will receive alerts about upcoming maintenance updates or warnings about service instability.

Required

  • email (string, Format: ^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$). Email address.
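A short sketch of the technicalEmails list, using hypothetical addresses. Each entry is an object with a single email field, and up to 10 entries are allowed.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  technicalEmails:
    - email: ops@example.com        # hypothetical address
    - email: oncall@example.com     # hypothetical address
```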

userConfig

Appears on spec.

Kafka specific user configuration options.

Optional

ip_filter

Appears on spec.userConfig.

Allow incoming connections from CIDR address block, e.g. 10.20.0.0/16.

Required

  • network (string, MaxLength: 43). CIDR address block.

Optional

  • description (string, MaxLength: 1024). Description for IP filter list entry.
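The following sketch shows what an ip_filter configuration could look like, assuming ip_filter is a list of entries under spec.userConfig. The CIDR blocks and descriptions are illustrative only.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  userConfig:
    ip_filter:
      - network: 10.20.0.0/16
        description: internal services    # optional
      - network: 192.168.1.0/24           # description omitted
```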

kafka

Appears on spec.userConfig.

Kafka broker configuration values.

Optional

  • auto_create_topics_enable (boolean). Enable auto creation of topics.
  • compression_type (string, Enum: gzip, snappy, lz4, zstd, uncompressed, producer). Specify the final compression type for a given topic. This configuration accepts the standard compression codecs (gzip, snappy, lz4, zstd). It additionally accepts uncompressed which is equivalent to no compression; and producer which means retain the original compression codec set by the producer.
  • connections_max_idle_ms (integer, Minimum: 1000, Maximum: 3600000). Idle connections timeout: the server socket processor threads close the connections that idle for longer than this.
  • default_replication_factor (integer, Minimum: 1, Maximum: 10). Replication factor for autocreated topics.
  • group_initial_rebalance_delay_ms (integer, Minimum: 0, Maximum: 300000). The amount of time, in milliseconds, the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins. The default value for this is 3 seconds. During development and testing it might be desirable to set this to 0 in order to not delay test execution time.
  • group_max_session_timeout_ms (integer, Minimum: 0, Maximum: 1800000). The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.
  • group_min_session_timeout_ms (integer, Minimum: 0, Maximum: 60000). The minimum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.
  • log_cleaner_delete_retention_ms (integer, Minimum: 0, Maximum: 315569260000). How long delete records are retained.
  • log_cleaner_max_compaction_lag_ms (integer, Minimum: 30000). The maximum amount of time a message will remain uncompacted. Only applicable for logs that are being compacted.
  • log_cleaner_min_cleanable_ratio (number, Minimum: 0.2, Maximum: 0.9). Controls log compactor frequency. Larger value means more frequent compactions but also more space wasted for logs. Consider setting log.cleaner.max.compaction.lag.ms to enforce compactions sooner, instead of setting a very high value for this option.
  • log_cleaner_min_compaction_lag_ms (integer, Minimum: 0). The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.
  • log_cleanup_policy (string, Enum: delete, compact, compact,delete). The default cleanup policy for segments beyond the retention window.
  • log_flush_interval_messages (integer, Minimum: 1). The number of messages accumulated on a log partition before messages are flushed to disk.
  • log_flush_interval_ms (integer, Minimum: 0). The maximum time in ms that a message in any topic is kept in memory before being flushed to disk. If not set, the value in log.flush.scheduler.interval.ms is used.
  • log_index_interval_bytes (integer, Minimum: 0, Maximum: 104857600). The interval with which Kafka adds an entry to the offset index.
  • log_index_size_max_bytes (integer, Minimum: 1048576, Maximum: 104857600). The maximum size in bytes of the offset index.
  • log_local_retention_bytes (integer, Minimum: -2). The maximum size that local log segments can grow to for a partition before they become eligible for deletion. If set to -2, the value of log.retention.bytes is used. The effective value should always be less than or equal to the log.retention.bytes value.
  • log_local_retention_ms (integer, Minimum: -2). The number of milliseconds to keep the local log segments before they become eligible for deletion. If set to -2, the value of log.retention.ms is used. The effective value should always be less than or equal to the log.retention.ms value.
  • log_message_downconversion_enable (boolean). This configuration controls whether down-conversion of message formats is enabled to satisfy consume requests.
  • log_message_timestamp_difference_max_ms (integer, Minimum: 0). The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message.
  • log_message_timestamp_type (string, Enum: CreateTime, LogAppendTime). Define whether the timestamp in the message is message create time or log append time.
  • log_preallocate (boolean). Whether to preallocate the file when creating a new segment.
  • log_retention_bytes (integer, Minimum: -1). The maximum size of the log before deleting messages.
  • log_retention_hours (integer, Minimum: -1, Maximum: 2147483647). The number of hours to keep a log file before deleting it.
  • log_retention_ms (integer, Minimum: -1). The number of milliseconds to keep a log file before deleting it. If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.
  • log_roll_jitter_ms (integer, Minimum: 0). The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in log.roll.jitter.hours is used.
  • log_roll_ms (integer, Minimum: 1). The maximum time before a new log segment is rolled out (in milliseconds).
  • log_segment_bytes (integer, Minimum: 10485760, Maximum: 1073741824). The maximum size of a single log file.
  • log_segment_delete_delay_ms (integer, Minimum: 0, Maximum: 3600000). The amount of time to wait before deleting a file from the filesystem.
  • max_connections_per_ip (integer, Minimum: 256, Maximum: 2147483647). The maximum number of connections allowed from each IP address (defaults to 2147483647).
  • max_incremental_fetch_session_cache_slots (integer, Minimum: 1000, Maximum: 10000). The maximum number of incremental fetch sessions that the broker will maintain.
  • message_max_bytes (integer, Minimum: 0, Maximum: 100001200). The maximum size of message that the server can receive.
  • min_insync_replicas (integer, Minimum: 1, Maximum: 7). When a producer sets acks to all (or -1), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.
  • num_partitions (integer, Minimum: 1, Maximum: 1000). Number of partitions for autocreated topics.
  • offsets_retention_minutes (integer, Minimum: 1, Maximum: 2147483647). Log retention window in minutes for offsets topic.
  • producer_purgatory_purge_interval_requests (integer, Minimum: 10, Maximum: 10000). The purge interval (in number of requests) of the producer request purgatory (defaults to 1000).
  • replica_fetch_max_bytes (integer, Minimum: 1048576, Maximum: 104857600). The number of bytes of messages to attempt to fetch for each partition (defaults to 1048576). This is not an absolute maximum: if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made.
  • replica_fetch_response_max_bytes (integer, Minimum: 10485760, Maximum: 1048576000). Maximum bytes expected for the entire fetch response (defaults to 10485760). Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. As such, this is not an absolute maximum.
  • sasl_oauthbearer_expected_audience (string, Pattern: ^[^\r\n]*$, MaxLength: 128). The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences.
  • sasl_oauthbearer_expected_issuer (string, Pattern: ^[^\r\n]*$, MaxLength: 128). Optional setting for the broker to use to verify that the JWT was created by the expected issuer.
  • sasl_oauthbearer_jwks_endpoint_url (string, MaxLength: 2048). OIDC JWKS endpoint URL. By setting this the SASL SSL OAuth2/OIDC authentication is enabled. See also other options for SASL OAuth2/OIDC.
  • sasl_oauthbearer_sub_claim_name (string, Pattern: ^[^\r\n]*$, MaxLength: 128). Name of the scope from which to extract the subject claim from the JWT. Defaults to sub.
  • socket_request_max_bytes (integer, Minimum: 10485760, Maximum: 209715200). The maximum number of bytes in a socket request (defaults to 104857600).
  • transaction_partition_verification_enable (boolean). Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition.
  • transaction_remove_expired_transaction_cleanup_interval_ms (integer, Minimum: 600000, Maximum: 3600000). The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing (defaults to 3600000 (1 hour)).
  • transaction_state_log_segment_bytes (integer, Minimum: 1048576, Maximum: 2147483647). The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads (defaults to 104857600 (100 mebibytes)).
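To make the broker options above more concrete, here is a minimal sketch of a userConfig.kafka block. The chosen values are illustrative, not recommendations; they only need to fall within the ranges listed above. Replication settings assume a plan with at least three brokers.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  userConfig:
    kafka:
      auto_create_topics_enable: false
      # Defaults applied to auto-created topics.
      default_replication_factor: 3
      num_partitions: 6
      # With acks=all, a write needs at least 2 in-sync replicas.
      min_insync_replicas: 2
      # Keep log segments for 7 days, then delete them.
      log_retention_hours: 168
      log_cleanup_policy: delete
      # Retain whatever codec the producer used.
      compression_type: producer
      message_max_bytes: 1048576
```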

kafka_authentication_methods

Appears on spec.userConfig.

Kafka authentication methods.

Optional

  • certificate (boolean). Enable certificate/SSL authentication.
  • sasl (boolean). Enable SASL authentication.
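A brief sketch that enables both authentication methods side by side. Whether both should be enabled at once depends on your clients; this is only an example of the field layout.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  userConfig:
    kafka_authentication_methods:
      certificate: true   # client certificate (SSL) authentication
      sasl: true          # SASL authentication
```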

kafka_connect_config

Appears on spec.userConfig.

Kafka Connect configuration values.

Optional

  • connector_client_config_override_policy (string, Enum: None, All). Defines what client configurations can be overridden by the connector. Default is None.
  • consumer_auto_offset_reset (string, Enum: earliest, latest). What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Default is earliest.
  • consumer_fetch_max_bytes (integer, Minimum: 1048576, Maximum: 104857600). Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum.
  • consumer_isolation_level (string, Enum: read_uncommitted, read_committed). Transaction read isolation level. read_uncommitted is the default, but read_committed can be used if consume-exactly-once behavior is desired.
  • consumer_max_partition_fetch_bytes (integer, Minimum: 1048576, Maximum: 104857600). Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
  • consumer_max_poll_interval_ms (integer, Minimum: 1, Maximum: 2147483647). The maximum delay in milliseconds between invocations of poll() when using consumer group management (defaults to 300000).
  • consumer_max_poll_records (integer, Minimum: 1, Maximum: 10000). The maximum number of records returned in a single call to poll() (defaults to 500).
  • offset_flush_interval_ms (integer, Minimum: 1, Maximum: 100000000). The interval at which to try committing offsets for tasks (defaults to 60000).
  • offset_flush_timeout_ms (integer, Minimum: 1, Maximum: 2147483647). Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt (defaults to 5000).
  • producer_batch_size (integer, Minimum: 0, Maximum: 5242880). This setting gives the upper bound of the batch size to be sent. If there are fewer than this many bytes accumulated for this partition, the producer will linger for the linger.ms time waiting for more records to show up. A batch size of zero will disable batching entirely (defaults to 16384).
  • producer_buffer_memory (integer, Minimum: 5242880, Maximum: 134217728). The total bytes of memory the producer can use to buffer records waiting to be sent to the broker (defaults to 33554432).
  • producer_compression_type (string, Enum: gzip, snappy, lz4, zstd, none). Specify the default compression type for producers. This configuration accepts the standard compression codecs (gzip, snappy, lz4, zstd). It additionally accepts none which is the default and equivalent to no compression.
  • producer_linger_ms (integer, Minimum: 0, Maximum: 5000). This setting gives the upper bound on the delay for batching. Once there is batch.size worth of records for a partition, it is sent immediately regardless of this setting; however, if fewer than this many bytes have accumulated for the partition, the producer lingers for the specified time waiting for more records to show up. Defaults to 0.
  • producer_max_request_size (integer, Minimum: 131072, Maximum: 67108864). This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
  • scheduled_rebalance_max_delay_ms (integer, Minimum: 0, Maximum: 600000). The maximum delay that is scheduled in order to wait for the return of one or more departed workers before rebalancing and reassigning their connectors and tasks to the group. During this period the connectors and tasks of the departed workers remain unassigned. Defaults to 5 minutes.
  • session_timeout_ms (integer, Minimum: 1, Maximum: 2147483647). The timeout in milliseconds used to detect failures when using Kafka’s group management facilities (defaults to 10000).
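The sketch below shows how a few of the Kafka Connect options above might be set. The values are illustrative and only chosen to sit inside the documented ranges. Enabling Kafka Connect itself is not covered in this section, so it is not shown here.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  userConfig:
    kafka_connect_config:
      # Let connectors override client settings.
      connector_client_config_override_policy: All
      # Only read committed transactional records.
      consumer_isolation_level: read_committed
      consumer_max_poll_records: 1000
      # Trade a little latency for larger, compressed batches.
      producer_compression_type: lz4
      producer_linger_ms: 50
      offset_flush_interval_ms: 30000
```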

kafka_rest_config

Appears on spec.userConfig.

Kafka REST configuration.

Optional

  • consumer_enable_auto_commit (boolean). If true the consumer's offset will be periodically committed to Kafka in the background.
  • consumer_request_max_bytes (integer, Minimum: 0, Maximum: 671088640). Maximum number of bytes in unencoded message keys and values by a single request.
  • consumer_request_timeout_ms (integer, Enum: 1000, 15000, 30000, Minimum: 1000, Maximum: 30000). The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached.
  • name_strategy (string, Enum: topic_name, record_name, topic_record_name). Name strategy to use when selecting subject for storing schemas.
  • name_strategy_validation (boolean). If true, validate that given schema is registered under expected subject name by the used name strategy when producing messages.
  • producer_acks (string, Enum: all, -1, 0, 1). The number of acknowledgments the producer requires the leader to have received before considering a request complete. If set to all or -1, the leader will wait for the full set of in-sync replicas to acknowledge the record.
  • producer_compression_type (string, Enum: gzip, snappy, lz4, zstd, none). Specify the default compression type for producers. This configuration accepts the standard compression codecs (gzip, snappy, lz4, zstd). It additionally accepts none which is the default and equivalent to no compression.
  • producer_linger_ms (integer, Minimum: 0, Maximum: 5000). Wait for up to the given delay to allow batching records together.
  • producer_max_request_size (integer, Minimum: 0, Maximum: 2147483647). The maximum size of a request in bytes. Note that Kafka broker can also cap the record batch size.
  • simpleconsumer_pool_size_max (integer, Minimum: 10, Maximum: 250). Maximum number of SimpleConsumers that can be instantiated per broker.
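A small example of the Kafka REST options, with illustrative values. Note that producer_acks is a string, so it is quoted, and that consumer_request_timeout_ms accepts only the listed enum values. Enabling the REST proxy itself is outside the scope of this section.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  userConfig:
    kafka_rest_config:
      consumer_enable_auto_commit: false
      consumer_request_timeout_ms: 15000   # one of 1000, 15000, 30000
      name_strategy: topic_name
      producer_acks: "all"                 # string enum: all, -1, 0, 1
      producer_linger_ms: 10
```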

private_access

Appears on spec.userConfig.

Allow access to selected service ports from private networks.

Optional

  • kafka (boolean). Allow clients to connect to kafka with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.
  • kafka_connect (boolean). Allow clients to connect to kafka_connect with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.
  • kafka_rest (boolean). Allow clients to connect to kafka_rest with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.
  • prometheus (boolean). Allow clients to connect to prometheus with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.
  • schema_registry (boolean). Allow clients to connect to schema_registry with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.

Appears on spec.userConfig.

Allow access to selected service components through Privatelink.

Optional

public_access

Appears on spec.userConfig.

Allow access to selected service ports from the public Internet.

Optional

  • kafka (boolean). Allow clients to connect to kafka from the public internet for service nodes that are in a project VPC or another type of private network.
  • kafka_connect (boolean). Allow clients to connect to kafka_connect from the public internet for service nodes that are in a project VPC or another type of private network.
  • kafka_rest (boolean). Allow clients to connect to kafka_rest from the public internet for service nodes that are in a project VPC or another type of private network.
  • prometheus (boolean). Allow clients to connect to prometheus from the public internet for service nodes that are in a project VPC or another type of private network.
  • schema_registry (boolean). Allow clients to connect to schema_registry from the public internet for service nodes that are in a project VPC or another type of private network.
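As a sketch of how private_access and public_access could be combined, the manifest below keeps Kafka and Prometheus reachable over private addresses and exposes only the REST proxy publicly. It assumes the service runs in a project VPC; the ProjectVPC resource name my-vpc is hypothetical.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  projectVPCRef:
    name: my-vpc            # hypothetical ProjectVPC resource
  userConfig:
    private_access:
      kafka: true
      prometheus: true
    public_access:
      kafka_rest: true
```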

schema_registry_config

Appears on spec.userConfig.

Schema Registry configuration.

Optional

  • leader_eligibility (boolean). If true, Karapace / Schema Registry on the service nodes can participate in leader election. It might be needed to disable this when the schemas topic is replicated to a secondary cluster and Karapace / Schema Registry there must not participate in leader election. Defaults to true.
  • topic_name (string, MinLength: 1, MaxLength: 249). The durable single partition topic that acts as the durable log for the data. This topic must be compacted to avoid losing data due to retention policy. Please note that changing this configuration in an existing Schema Registry / Karapace setup leads to previous schemas being inaccessible, data encoded with them potentially unreadable and schema ID sequence put out of order. It's only possible to do the switch while Schema Registry / Karapace is disabled. Defaults to _schemas.
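For illustration, a secondary cluster that replicates the schemas topic might opt out of leader election as described above. This sketch keeps the default topic name explicit; the scenario is an assumption made for the example.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2
  userConfig:
    schema_registry_config:
      # This cluster must not take part in leader election.
      leader_eligibility: false
      # Default value, stated explicitly; avoid changing it on an
      # existing setup.
      topic_name: _schemas
```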

tiered_storage

Appears on spec.userConfig.

Tiered storage configuration.

Optional

  • enabled (boolean). Whether to enable the tiered storage functionality.
  • local_cache (object). Deprecated. Local cache configuration. See below for nested schema.

local_cache

Appears on spec.userConfig.tiered_storage.

Deprecated. Local cache configuration.

Required

  • size (integer, Minimum: 1, Maximum: 107374182400). Deprecated. Local cache size in bytes.
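A minimal sketch that turns on tiered storage. The deprecated local_cache block is left out on purpose. Whether tiered storage is available for a given plan is not stated in this reference, so the plan below is only the placeholder from the usage example.

```yaml
apiVersion: aiven.io/v1alpha1
kind: Kafka
metadata:
  name: my-kafka
spec:
  project: my-aiven-project
  plan: startup-2           # placeholder; availability may vary by plan
  userConfig:
    tiered_storage:
      enabled: true
```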