From 8ab008b8e3588e1f26f3696f2d68b4f513f388c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 17 Jun 2022 20:54:35 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=20json=20+=20yaml=20=E6=A0=87=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 448 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 448 insertions(+) diff --git a/config.go b/config.go index 147149d..1b2de98 100644 --- a/config.go +++ b/config.go @@ -6,3 +6,451 @@ // // Date : 2021-09-21 3:48 下午 package kafka + +import ( + "crypto/tls" + "github.com/Shopify/sarama" + "github.com/rcrowley/go-metrics" + "golang.org/x/net/proxy" + "net" + "time" +) + +// Config kafka的配置, 和 sarama.Config 一致, 在此基础上增加了 json 与 yaml 标签 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 20:38 2022/6/17 +type Config struct { + // Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client. + Admin struct { + Retry struct { + // The total number of times to retry sending (retriable) admin requests (default 5). + // Similar to the `retries` setting of the JVM AdminClientConfig. + Max int `json:"max" yaml:"max"` + // Backoff time between retries of a failed request (default 100ms) + Backoff time.Duration `json:"backoff" yaml:"backoff"` + } + // The maximum duration the administrative Kafka client will wait for ClusterAdmin operations, + // including topics, brokers, configurations and ACLs (defaults to 3 seconds). + Timeout time.Duration `json:"timeout" yaml:"timeout"` + } `json:"admin" yaml:"admin"` + + // Net is the namespace for network-level properties used by the Broker, and + // shared by the Client/Producer/Consumer. + Net struct { + // How many outstanding requests a connection is allowed to have before + // sending on it blocks (default 5). + // Throughput can improve but message ordering is not guaranteed if Producer.Idempotent is disabled, see: + // https://kafka.apache.org/protocol#protocol_network + // https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection + MaxOpenRequests int `json:"max_open_requests" yaml:"max_open_requests"` + + // All three of the below configurations are similar to the + // `socket.timeout.ms` setting in JVM kafka. All of them default + // to 30 seconds. + DialTimeout time.Duration `json:"dial_timeout" yaml:"dial_timeout"` // How long to wait for the initial connection. + ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout"`// How long to wait for a response. + WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`// How long to wait for a transmit. + + TLS struct { + // Whether or not to use TLS when connecting to the broker + // (defaults to false). + Enable bool `json:"enable" yaml:"enable"` + // The TLS configuration to use for secure connections if + // enabled (defaults to nil). + Config *tls.Config `json:"config" yaml:"config"` + } `json:"tls" yaml:"tls"` + + // SASL based authentication with broker. While there are multiple SASL authentication methods + // the current implementation is limited to plaintext (SASL/PLAIN) authentication + SASL struct { + // Whether or not to use SASL authentication when connecting to the broker + // (defaults to false). + Enable bool `json:"enable" yaml:"enable"` + // SASLMechanism is the name of the enabled SASL mechanism. + // Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN). + Mechanism sarama.SASLMechanism `json:"mechanism" yaml:"mechanism"` + // Version is the SASL Protocol Version to use + // Kafka > 1.x should use V1, except on Azure EventHub which use V0 + Version int16 `json:"version" yaml:"version"` + // Whether or not to send the Kafka SASL handshake first if enabled + // (defaults to true). You should only set this to false if you're using + // a non-Kafka SASL proxy. + Handshake bool `json:"handshake" yaml:"handshake"` + // AuthIdentity is an (optional) authorization identity (authzid) to + // use for SASL/PLAIN authentication (if different from User) when + // an authenticated user is permitted to act as the presented + // alternative user. See RFC4616 for details. + AuthIdentity string `json:"auth_identity" yaml:"auth_identity"` + // User is the authentication identity (authcid) to present for + // SASL/PLAIN or SASL/SCRAM authentication + User string `json:"user" yaml:"user"` + // Password for SASL/PLAIN authentication + Password string `json:"password" yaml:"password"` + // authz id used for SASL/SCRAM authentication + SCRAMAuthzID string `json:"scram_authz_id" yaml:"scram_authz_id"` + // SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM + // client used to perform the SCRAM exchange with the server. + SCRAMClientGeneratorFunc func() sarama.SCRAMClient + // TokenProvider is a user-defined callback for generating + // access tokens for SASL/OAUTHBEARER auth. See the + // AccessTokenProvider interface docs for proper implementation + // guidelines. + TokenProvider sarama.AccessTokenProvider + + GSSAPI sarama.GSSAPIConfig `json:"gssapi" yaml:"gssapi"` + } + + // KeepAlive specifies the keep-alive period for an active network connection (defaults to 0). + // If zero or positive, keep-alives are enabled. + // If negative, keep-alives are disabled. + KeepAlive time.Duration `json:"keep_alive" yaml:"keep_alive"` + + // LocalAddr is the local address to use when dialing an + // address. The address must be of a compatible type for the + // network being dialed. + // If nil, a local address is automatically chosen. + LocalAddr net.Addr `json:"local_addr" yaml:"local_addr"` + + Proxy struct { + // Whether or not to use proxy when connecting to the broker + // (defaults to false). + Enable bool `json:"enable" yaml:"enable"` + // The proxy dialer to use enabled (defaults to nil). + Dialer proxy.Dialer + } `json:"proxy" yaml:"proxy"` + } + + // Metadata is the namespace for metadata management properties used by the + // Client, and shared by the Producer/Consumer. + Metadata struct { + Retry struct { + // The total number of times to retry a metadata request when the + // cluster is in the middle of a leader election (default 3). + Max int `json:"max" yaml:"max"` + // How long to wait for leader election to occur before retrying + // (default 250ms). Similar to the JVM's `retry.backoff.ms`. + Backoff time.Duration `json:"backoff" yaml:"backoff"` + // Called to compute backoff time dynamically. Useful for implementing + // more sophisticated backoff strategies. This takes precedence over + // `Backoff` if set. + BackoffFunc func(retries, maxRetries int) time.Duration + } `json:"retry" yaml:"retry"` + // How frequently to refresh the cluster metadata in the background. + // Defaults to 10 minutes. Set to 0 to disable. Similar to + // `topic.metadata.refresh.interval.ms` in the JVM version. + RefreshFrequency time.Duration `json:"refresh_frequency" yaml:"refresh_frequency"` + + // Whether to maintain a full set of metadata for all topics, or just + // the minimal set that has been necessary so far. The full set is simpler + // and usually more convenient, but can take up a substantial amount of + // memory if you have many topics and partitions. Defaults to true. + Full bool `json:"full" yaml:"full"` + + // How long to wait for a successful metadata response. + // Disabled by default which means a metadata request against an unreachable + // cluster (all brokers are unreachable or unresponsive) can take up to + // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max` + // to fail. + Timeout time.Duration `json:"timeout" yaml:"timeout"` + + // Whether to allow auto-create topics in metadata refresh. If set to true, + // the broker may auto-create topics that we requested which do not already exist, + // if it is configured to do so (`auto.create.topics.enable` is true). Defaults to true. + AllowAutoTopicCreation bool `json:"allow_auto_topic_creation" yaml:"allow_auto_topic_creation"` + } `json:"metadata" yaml:"metadata"` + + // Producer is the namespace for configuration related to producing messages, + // used by the Producer. + Producer struct { + // The maximum permitted size of a message (defaults to 1000000). Should be + // set equal to or smaller than the broker's `message.max.bytes`. + MaxMessageBytes int `json:"max_message_bytes" yaml:"max_message_bytes"` + // The level of acknowledgement reliability needed from the broker (defaults + // to WaitForLocal). Equivalent to the `request.required.acks` setting of the + // JVM producer. + RequiredAcks sarama.RequiredAcks `json:"required_acks" yaml:"required_acks"` + // The maximum duration the broker will wait the receipt of the number of + // RequiredAcks (defaults to 10 seconds). This is only relevant when + // RequiredAcks is set to WaitForAll or a number > 1. Only supports + // millisecond resolution, nanoseconds will be truncated. Equivalent to + // the JVM producer's `request.timeout.ms` setting. + Timeout time.Duration `json:"timeout" yaml:"timeout"` + // The type of compression to use on messages (defaults to no compression). + // Similar to `compression.codec` setting of the JVM producer. + Compression sarama.CompressionCodec `json:"compression" yaml:"compression"` + // The level of compression to use on messages. The meaning depends + // on the actual compression type used and defaults to default compression + // level for the codec. + CompressionLevel int `json:"compression_level" yaml:"compression_level"` + // Generates partitioners for choosing the partition to send messages to + // (defaults to hashing the message key). Similar to the `partitioner.class` + // setting for the JVM producer. + Partitioner sarama.PartitionerConstructor `json:"partitioner" yaml:"partitioner"` + // If enabled, the producer will ensure that exactly one copy of each message is + // written. + Idempotent bool `json:"idempotent" yaml:"idempotent"` + + // Return specifies what channels will be populated. If they are set to true, + // you must read from the respective channels to prevent deadlock. If, + // however, this config is used to create a `SyncProducer`, both must be set + // to true and you shall not read from the channels since the producer does + // this internally. + Return struct { + // If enabled, successfully delivered messages will be returned on the + // Successes channel (default disabled). + Successes bool `json:"successes" yaml:"successes"` + + // If enabled, messages that failed to deliver will be returned on the + // Errors channel, including error (default enabled). + Errors bool `json:"errors" yaml:"errors"` + } `json:"return" yaml:"return"` + + // The following config options control how often messages are batched up and + // sent to the broker. By default, messages are sent as fast as possible, and + // all messages received while the current batch is in-flight are placed + // into the subsequent batch. + Flush struct { + // The best-effort number of bytes needed to trigger a flush. Use the + // global sarama.MaxRequestSize to set a hard upper limit. + Bytes int `json:"bytes" yaml:"bytes"` + // The best-effort number of messages needed to trigger a flush. Use + // `MaxMessages` to set a hard upper limit. + Messages int `json:"messages" yaml:"messages"` + // The best-effort frequency of flushes. Equivalent to + // `queue.buffering.max.ms` setting of JVM producer. + Frequency time.Duration `json:"frequency" yaml:"frequency"` + // The maximum number of messages the producer will send in a single + // broker request. Defaults to 0 for unlimited. Similar to + // `queue.buffering.max.messages` in the JVM producer. + MaxMessages int `json:"max_messages" yaml:"max_messages"` + } `json:"flush" yaml:"flush"` + + Retry struct { + // The total number of times to retry sending a message (default 3). + // Similar to the `message.send.max.retries` setting of the JVM producer. + Max int `json:"max" yaml:"max"` + // How long to wait for the cluster to settle between retries + // (default 100ms). Similar to the `retry.backoff.ms` setting of the + // JVM producer. + Backoff time.Duration `json:"backoff" yaml:"backoff"` + // Called to compute backoff time dynamically. Useful for implementing + // more sophisticated backoff strategies. This takes precedence over + // `Backoff` if set. + BackoffFunc func(retries, maxRetries int) time.Duration + } `json:"retry" yaml:"retry"` + + // Interceptors to be called when the producer dispatcher reads the + // message for the first time. Interceptors allows to intercept and + // possible mutate the message before they are published to Kafka + // cluster. *ProducerMessage modified by the first interceptor's + // OnSend() is passed to the second interceptor OnSend(), and so on in + // the interceptor chain. + Interceptors []sarama.ProducerInterceptor + } `json:"producer" yaml:"producer"` + + // Consumer is the namespace for configuration related to consuming messages, + // used by the Consumer. + Consumer struct { + + // Group is the namespace for configuring consumer group. + Group struct { + Session struct { + // The timeout used to detect consumer failures when using Kafka's group management facility. + // The consumer sends periodic heartbeats to indicate its liveness to the broker. + // If no heartbeats are received by the broker before the expiration of this session timeout, + // then the broker will remove this consumer from the group and initiate a rebalance. + // Note that the value must be in the allowable range as configured in the broker configuration + // by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s) + Timeout time.Duration `json:"timeout" yaml:"timeout"` + } `json:"session" yaml:"session"` + Heartbeat struct { + // The expected time between heartbeats to the consumer coordinator when using Kafka's group + // management facilities. Heartbeats are used to ensure that the consumer's session stays active and + // to facilitate rebalancing when new consumers join or leave the group. + // The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no + // higher than 1/3 of that value. + // It can be adjusted even lower to control the expected time for normal rebalances (default 3s) + Interval time.Duration `json:"interval" yaml:"interval"` + } `json:"heartbeat" yaml:"heartbeat"` + Rebalance struct { + // Strategy for allocating topic partitions to members (default BalanceStrategyRange) + Strategy sarama.BalanceStrategy `json:"strategy" yaml:"strategy"` + // The maximum allowed time for each worker to join the group once a rebalance has begun. + // This is basically a limit on the amount of time needed for all tasks to flush any pending + // data and commit offsets. If the timeout is exceeded, then the worker will be removed from + // the group, which will cause offset commit failures (default 60s). + Timeout time.Duration `json:"timeout" yaml:"timeout"` + + Retry struct { + // When a new consumer joins a consumer group the set of consumers attempt to "rebalance" + // the load to assign partitions to each consumer. If the set of consumers changes while + // this assignment is taking place the rebalance will fail and retry. This setting controls + // the maximum number of attempts before giving up (default 4). + Max int `json:"max" yaml:"max"` + // Backoff time between retries during rebalance (default 2s) + Backoff time.Duration `json:"backoff" yaml:"backoff"` + } `json:"retry" yaml:"retry"` + } `json:"rebalance" yaml:"rebalance"` + Member struct { + // Custom metadata to include when joining the group. The user data for all joined members + // can be retrieved by sending a DescribeGroupRequest to the broker that is the + // coordinator for the group. + UserData []byte `json:"user_data" yaml:"user_data"` + } `json:"member" yaml:"member"` + // support KIP-345 + InstanceId string `json:"instance_id" yaml:"instance_id"` + } `json:"group" yaml:"group"` + + Retry struct { + // How long to wait after a failing to read from a partition before + // trying again (default 2s). + Backoff time.Duration `json:"backoff" yaml:"backoff"` + // Called to compute backoff time dynamically. Useful for implementing + // more sophisticated backoff strategies. This takes precedence over + // `Backoff` if set. + BackoffFunc func(retries int) time.Duration + } `json:"retry" yaml:"retry"` + + // Fetch is the namespace for controlling how many bytes are retrieved by any + // given request. + Fetch struct { + // The minimum number of message bytes to fetch in a request - the broker + // will wait until at least this many are available. The default is 1, + // as 0 causes the consumer to spin when no messages are available. + // Equivalent to the JVM's `fetch.min.bytes`. + Min int32 `json:"min" yaml:"min"` + // The default number of message bytes to fetch from the broker in each + // request (default 1MB). This should be larger than the majority of + // your messages, or else the consumer will spend a lot of time + // negotiating sizes and not actually consuming. Similar to the JVM's + // `fetch.message.max.bytes`. + Default int32 `json:"default" yaml:"default"` + // The maximum number of message bytes to fetch from the broker in a + // single request. Messages larger than this will return + // ErrMessageTooLarge and will not be consumable, so you must be sure + // this is at least as large as your largest message. Defaults to 0 + // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The + // global `sarama.MaxResponseSize` still applies. + Max int32 `json:"max" yaml:"max"` + } `json:"fetch" yaml:"fetch"` + // The maximum amount of time the broker will wait for Consumer.Fetch.Min + // bytes to become available before it returns fewer than that anyways. The + // default is 250ms, since 0 causes the consumer to spin when no events are + // available. 100-500ms is a reasonable range for most cases. Kafka only + // supports precision up to milliseconds; nanoseconds will be truncated. + // Equivalent to the JVM's `fetch.wait.max.ms`. + MaxWaitTime time.Duration `json:"max_wait_time" yaml:"max_wait_time"` + + // The maximum amount of time the consumer expects a message takes to + // process for the user. If writing to the Messages channel takes longer + // than this, that partition will stop fetching more messages until it + // can proceed again. + // Note that, since the Messages channel is buffered, the actual grace time is + // (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms. + // If a message is not written to the Messages channel between two ticks + // of the expiryTicker then a timeout is detected. + // Using a ticker instead of a timer to detect timeouts should typically + // result in many fewer calls to Timer functions which may result in a + // significant performance improvement if many messages are being sent + // and timeouts are infrequent. + // The disadvantage of using a ticker instead of a timer is that + // timeouts will be less accurate. That is, the effective timeout could + // be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For + // example, if `MaxProcessingTime` is 100ms then a delay of 180ms + // between two messages being sent may not be recognized as a timeout. + MaxProcessingTime time.Duration `json:"max_processing_time" yaml:"max_processing_time"` + + // Return specifies what channels will be populated. If they are set to true, + // you must read from them to prevent deadlock. + Return struct { + // If enabled, any errors that occurred while consuming are returned on + // the Errors channel (default disabled). + Errors bool `json:"errors" yaml:"errors"` + } `json:"return" yaml:"return"` + + // Offsets specifies configuration for how and when to commit consumed + // offsets. This currently requires the manual use of an OffsetManager + // but will eventually be automated. + Offsets struct { + // Deprecated: CommitInterval exists for historical compatibility + // and should not be used. Please use Consumer.Offsets.AutoCommit + CommitInterval time.Duration `json:"commit_interval" yaml:"commit_interval"` + + // AutoCommit specifies configuration for commit messages automatically. + AutoCommit struct { + // Whether or not to auto-commit updated offsets back to the broker. + // (default enabled). + Enable bool `json:"enable" yaml:"enable"` + + // How frequently to commit updated offsets. Ineffective unless + // auto-commit is enabled (default 1s) + Interval time.Duration `json:"interval" yaml:"interval"` + } + + // The initial offset to use if no offset was previously committed. + // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest. + Initial int64 `json:"initial" yaml:"initial"` + + // The retention duration for committed offsets. If zero, disabled + // (in which case the `offsets.retention.minutes` option on the + // broker will be used). Kafka only supports precision up to + // milliseconds; nanoseconds will be truncated. Requires Kafka + // broker version 0.9.0 or later. + // (default is 0: disabled). + Retention time.Duration `json:"retention" yaml:"retention"` + + Retry struct { + // The total number of times to retry failing commit + // requests during OffsetManager shutdown (default 3). + Max int `json:"max" yaml:"max"` + } `json:"retry" yaml:"retry"` + } `json:"offsets" yaml:"offsets"` + + // IsolationLevel support 2 mode: + // - use `ReadUncommitted` (default) to consume and return all messages in message channel + // - use `ReadCommitted` to hide messages that are part of an aborted transaction + IsolationLevel sarama.IsolationLevel `json:"isolation_level" yaml:"isolation_level"` + + // Interceptors to be called just before the record is sent to the + // messages channel. Interceptors allows to intercept and possible + // mutate the message before they are returned to the client. + // *ConsumerMessage modified by the first interceptor's OnConsume() is + // passed to the second interceptor OnConsume(), and so on in the + // interceptor chain. + Interceptors []sarama.ConsumerInterceptor + } + + // A user-provided string sent with every request to the brokers for logging, + // debugging, and auditing purposes. Defaults to "sarama", but you should + // probably set it to something specific to your application. + ClientID string `json:"client_id" yaml:"client_id"` + // A rack identifier for this client. This can be any string value which + // indicates where this client is physically located. + // It corresponds with the broker config 'broker.rack' + RackID string `json:"rack_id" yaml:"rack_id"` + // The number of events to buffer in internal and external channels. This + // permits the producer and consumer to continue processing some messages + // in the background while user code is working, greatly improving throughput. + // Defaults to 256. + ChannelBufferSize int `json:"channel_buffer_size" yaml:"channel_buffer_size"` + // ApiVersionsRequest determines whether Sarama should send an + // ApiVersionsRequest message to each broker as part of its initial + // connection. This defaults to `true` to match the official Java client + // and most 3rdparty ones. + ApiVersionsRequest bool `json:"api_versions_request" yaml:"api_versions_request"` + // The version of Kafka that Sarama will assume it is running against. + // Defaults to the oldest supported stable version. Since Kafka provides + // backwards-compatibility, setting it to a version older than you have + // will not break anything, although it may prevent you from using the + // latest features. Setting it to a version greater than you are actually + // running may lead to random breakage. + Version sarama.KafkaVersion `json:"version" yaml:"version"` + // The registry to define metrics into. + // Defaults to a local registry. + // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true" + // prior to starting Sarama. + // See Examples on how to use the metrics registry + MetricRegistry metrics.Registry +}