-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathconfig.go
899 lines (819 loc) · 38.8 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
package sarama
import (
"crypto/tls"
"fmt"
"io"
"net"
"regexp"
"time"
"github.com/klauspost/compress/gzip"
"github.com/rcrowley/go-metrics"
"golang.org/x/net/proxy"
)
const defaultClientID = "sarama"
// validClientID specifies the permitted characters for a client.id when
// connecting to Kafka versions before 1.0.0 (KIP-190)
var validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
Admin struct {
Retry struct {
// The total number of times to retry sending (retriable) admin requests (default 5).
// Similar to the `retries` setting of the JVM AdminClientConfig.
Max int
// Backoff time between retries of a failed request (default 100ms)
Backoff time.Duration
}
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
Timeout time.Duration
}
// Net is the namespace for network-level properties used by the Broker, and
// shared by the Client/Producer/Consumer.
Net struct {
// How many outstanding requests a connection is allowed to have before
// sending on it blocks (default 5).
// Throughput can improve but message ordering is not guaranteed if Producer.Idempotent is disabled, see:
// https://kafka.apache.org/protocol#protocol_network
// https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection
MaxOpenRequests int
// All three of the below configurations are similar to the
// `socket.timeout.ms` setting in JVM kafka. All of them default
// to 30 seconds.
DialTimeout time.Duration // How long to wait for the initial connection.
ReadTimeout time.Duration // How long to wait for a response.
WriteTimeout time.Duration // How long to wait for a transmit.
// ResolveCanonicalBootstrapServers turns each bootstrap broker address
// into a set of IPs, then does a reverse lookup on each one to get its
// canonical hostname. This list of hostnames then replaces the
// original address list. Similar to the `client.dns.lookup` option in
// the JVM client, this is especially useful with GSSAPI, where it
// allows providing an alias record instead of individual broker
// hostnames. Defaults to false.
ResolveCanonicalBootstrapServers bool
TLS struct {
// Whether or not to use TLS when connecting to the broker
// (defaults to false).
Enable bool
// The TLS configuration to use for secure connections if
// enabled (defaults to nil).
Config *tls.Config
}
// SASL based authentication with broker. While there are multiple SASL authentication methods
// the current implementation is limited to plaintext (SASL/PLAIN) authentication
SASL struct {
// Whether or not to use SASL authentication when connecting to the broker
// (defaults to false).
Enable bool
// SASLMechanism is the name of the enabled SASL mechanism.
// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
Mechanism SASLMechanism
// Version is the SASL Protocol Version to use
// Kafka > 1.x should use V1, except on Azure EventHub which use V0
Version int16
// Whether or not to send the Kafka SASL handshake first if enabled
// (defaults to true). You should only set this to false if you're using
// a non-Kafka SASL proxy.
Handshake bool
// AuthIdentity is an (optional) authorization identity (authzid) to
// use for SASL/PLAIN authentication (if different from User) when
// an authenticated user is permitted to act as the presented
// alternative user. See RFC4616 for details.
AuthIdentity string
// User is the authentication identity (authcid) to present for
// SASL/PLAIN or SASL/SCRAM authentication
User string
// Password for SASL/PLAIN authentication
Password string
// authz id used for SASL/SCRAM authentication
SCRAMAuthzID string
// SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
// client used to perform the SCRAM exchange with the server.
SCRAMClientGeneratorFunc func() SCRAMClient
// TokenProvider is a user-defined callback for generating
// access tokens for SASL/OAUTHBEARER auth. See the
// AccessTokenProvider interface docs for proper implementation
// guidelines.
TokenProvider AccessTokenProvider
GSSAPI GSSAPIConfig
}
// KeepAlive specifies the keep-alive period for an active network connection (defaults to 0).
// If zero or positive, keep-alives are enabled.
// If negative, keep-alives are disabled.
KeepAlive time.Duration
// LocalAddr is the local address to use when dialing an
// address. The address must be of a compatible type for the
// network being dialed.
// If nil, a local address is automatically chosen.
LocalAddr net.Addr
Proxy struct {
// Whether or not to use proxy when connecting to the broker
// (defaults to false).
Enable bool
// The proxy dialer to use enabled (defaults to nil).
Dialer proxy.Dialer
}
}
// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata struct {
Retry struct {
// The total number of times to retry a metadata request when the
// cluster is in the middle of a leader election (default 3).
Max int
// How long to wait for leader election to occur before retrying
// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
Backoff time.Duration
// Called to compute backoff time dynamically. Useful for implementing
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}
// How frequently to refresh the cluster metadata in the background.
// Defaults to 10 minutes. Set to 0 to disable. Similar to
// `topic.metadata.refresh.interval.ms` in the JVM version.
RefreshFrequency time.Duration
// Whether to maintain a full set of metadata for all topics, or just
// the minimal set that has been necessary so far. The full set is simpler
// and usually more convenient, but can take up a substantial amount of
// memory if you have many topics and partitions. Defaults to true.
Full bool
// How long to wait for a successful metadata response.
// Disabled by default which means a metadata request against an unreachable
// cluster (all brokers are unreachable or unresponsive) can take up to
// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
// to fail.
Timeout time.Duration
// Whether to allow auto-create topics in metadata refresh. If set to true,
// the broker may auto-create topics that we requested which do not already exist,
// if it is configured to do so (`auto.create.topics.enable` is true). Defaults to true.
AllowAutoTopicCreation bool
}
// Producer is the namespace for configuration related to producing messages,
// used by the Producer.
Producer struct {
// The maximum permitted size of a message (defaults to 1000000). Should be
// set equal to or smaller than the broker's `message.max.bytes`.
MaxMessageBytes int
// The level of acknowledgement reliability needed from the broker (defaults
// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
// JVM producer.
RequiredAcks RequiredAcks
// The maximum duration the broker will wait the receipt of the number of
// RequiredAcks (defaults to 10 seconds). This is only relevant when
// RequiredAcks is set to WaitForAll or a number > 1. Only supports
// millisecond resolution, nanoseconds will be truncated. Equivalent to
// the JVM producer's `request.timeout.ms` setting.
Timeout time.Duration
// The type of compression to use on messages (defaults to no compression).
// Similar to `compression.codec` setting of the JVM producer.
Compression CompressionCodec
// The level of compression to use on messages. The meaning depends
// on the actual compression type used and defaults to default compression
// level for the codec.
CompressionLevel int
// Generates partitioners for choosing the partition to send messages to
// (defaults to hashing the message key). Similar to the `partitioner.class`
// setting for the JVM producer.
Partitioner PartitionerConstructor
// If enabled, the producer will ensure that exactly one copy of each message is
// written.
Idempotent bool
// Transaction specify
Transaction struct {
// Used in transactions to identify an instance of a producer through restarts
ID string
// Amount of time a transaction can remain unresolved (neither committed nor aborted)
// default is 1 min
Timeout time.Duration
Retry struct {
// The total number of times to retry sending a message (default 50).
// Similar to the `message.send.max.retries` setting of the JVM producer.
Max int
// How long to wait for the cluster to settle between retries
// (default 10ms). Similar to the `retry.backoff.ms` setting of the
// JVM producer.
Backoff time.Duration
// Called to compute backoff time dynamically. Useful for implementing
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}
}
// Return specifies what channels will be populated. If they are set to true,
// you must read from the respective channels to prevent deadlock. If,
// however, this config is used to create a `SyncProducer`, both must be set
// to true and you shall not read from the channels since the producer does
// this internally.
Return struct {
// If enabled, successfully delivered messages will be returned on the
// Successes channel (default disabled).
Successes bool
// If enabled, messages that failed to deliver will be returned on the
// Errors channel, including error (default enabled).
Errors bool
}
// The following config options control how often messages are batched up and
// sent to the broker. By default, messages are sent as fast as possible, and
// all messages received while the current batch is in-flight are placed
// into the subsequent batch.
Flush struct {
// The best-effort number of bytes needed to trigger a flush. Use the
// global sarama.MaxRequestSize to set a hard upper limit.
Bytes int
// The best-effort number of messages needed to trigger a flush. Use
// `MaxMessages` to set a hard upper limit.
Messages int
// The best-effort frequency of flushes. Equivalent to
// `queue.buffering.max.ms` setting of JVM producer.
Frequency time.Duration
// The maximum number of messages the producer will send in a single
// broker request. Defaults to 0 for unlimited. Similar to
// `queue.buffering.max.messages` in the JVM producer.
MaxMessages int
}
Retry struct {
// The total number of times to retry sending a message (default 3).
// Similar to the `message.send.max.retries` setting of the JVM producer.
Max int
// How long to wait for the cluster to settle between retries
// (default 100ms). Similar to the `retry.backoff.ms` setting of the
// JVM producer.
Backoff time.Duration
// Called to compute backoff time dynamically. Useful for implementing
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
// The maximum length of the bridging buffer between `input` and `retries` channels
// in AsyncProducer#retryHandler.
// The limit is to prevent this buffer from overflowing or causing OOM.
// Defaults to 0 for unlimited.
// Any value between 0 and 4096 is pushed to 4096.
// A zero or negative value indicates unlimited.
MaxBufferLength int
}
// Interceptors to be called when the producer dispatcher reads the
// message for the first time. Interceptors allows to intercept and
// possible mutate the message before they are published to Kafka
// cluster. *ProducerMessage modified by the first interceptor's
// OnSend() is passed to the second interceptor OnSend(), and so on in
// the interceptor chain.
Interceptors []ProducerInterceptor
}
// Consumer is the namespace for configuration related to consuming messages,
// used by the Consumer.
Consumer struct {
// Group is the namespace for configuring consumer group.
Group struct {
Session struct {
// The timeout used to detect consumer failures when using Kafka's group management facility.
// The consumer sends periodic heartbeats to indicate its liveness to the broker.
// If no heartbeats are received by the broker before the expiration of this session timeout,
// then the broker will remove this consumer from the group and initiate a rebalance.
// Note that the value must be in the allowable range as configured in the broker configuration
// by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s)
Timeout time.Duration
}
Heartbeat struct {
// The expected time between heartbeats to the consumer coordinator when using Kafka's group
// management facilities. Heartbeats are used to ensure that the consumer's session stays active and
// to facilitate rebalancing when new consumers join or leave the group.
// The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no
// higher than 1/3 of that value.
// It can be adjusted even lower to control the expected time for normal rebalances (default 3s)
Interval time.Duration
}
Rebalance struct {
// Strategy for allocating topic partitions to members.
// Deprecated: Strategy exists for historical compatibility
// and should not be used. Please use GroupStrategies.
Strategy BalanceStrategy
// GroupStrategies is the priority-ordered list of client-side consumer group
// balancing strategies that will be offered to the coordinator. The first
// strategy that all group members support will be chosen by the leader.
// default: [ NewBalanceStrategyRange() ]
GroupStrategies []BalanceStrategy
// The maximum allowed time for each worker to join the group once a rebalance has begun.
// This is basically a limit on the amount of time needed for all tasks to flush any pending
// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
// the group, which will cause offset commit failures (default 60s).
Timeout time.Duration
Retry struct {
// When a new consumer joins a consumer group the set of consumers attempt to "rebalance"
// the load to assign partitions to each consumer. If the set of consumers changes while
// this assignment is taking place the rebalance will fail and retry. This setting controls
// the maximum number of attempts before giving up (default 4).
Max int
// Backoff time between retries during rebalance (default 2s)
Backoff time.Duration
}
}
Member struct {
// Custom metadata to include when joining the group. The user data for all joined members
// can be retrieved by sending a DescribeGroupRequest to the broker that is the
// coordinator for the group.
UserData []byte
}
// support KIP-345
InstanceId string
// If true, consumer offsets will be automatically reset to configured Initial value
// if the fetched consumer offset is out of range of available offsets. Out of range
// can happen if the data has been deleted from the server, or during situations of
// under-replication where a replica does not have all the data yet. It can be
// dangerous to reset the offset automatically, particularly in the latter case. Defaults
// to true to maintain existing behavior.
ResetInvalidOffsets bool
}
Retry struct {
// How long to wait after a failing to read from a partition before
// trying again (default 2s).
Backoff time.Duration
// Called to compute backoff time dynamically. Useful for implementing
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries int) time.Duration
}
// Fetch is the namespace for controlling how many bytes are retrieved by any
// given request.
Fetch struct {
// The minimum number of message bytes to fetch in a request - the broker
// will wait until at least this many are available. The default is 1,
// as 0 causes the consumer to spin when no messages are available.
// Equivalent to the JVM's `fetch.min.bytes`.
Min int32
// The default number of message bytes to fetch from the broker in each
// request (default 1MB). This should be larger than the majority of
// your messages, or else the consumer will spend a lot of time
// negotiating sizes and not actually consuming. Similar to the JVM's
// `fetch.message.max.bytes`.
Default int32
// The maximum number of message bytes to fetch from the broker in a
// single request. Messages larger than this will return
// ErrMessageTooLarge and will not be consumable, so you must be sure
// this is at least as large as your largest message. Defaults to 0
// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
// global `sarama.MaxResponseSize` still applies.
Max int32
}
// The maximum amount of time the broker will wait for Consumer.Fetch.Min
// bytes to become available before it returns fewer than that anyways. The
// default is 250ms, since 0 causes the consumer to spin when no events are
// available. 100-500ms is a reasonable range for most cases. Kafka only
// supports precision up to milliseconds; nanoseconds will be truncated.
// Equivalent to the JVM's `fetch.max.wait.ms`.
MaxWaitTime time.Duration
// The maximum amount of time the consumer expects a message takes to
// process for the user. If writing to the Messages channel takes longer
// than this, that partition will stop fetching more messages until it
// can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is
// (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
// If a message is not written to the Messages channel between two ticks
// of the expiryTicker then a timeout is detected.
// Using a ticker instead of a timer to detect timeouts should typically
// result in many fewer calls to Timer functions which may result in a
// significant performance improvement if many messages are being sent
// and timeouts are infrequent.
// The disadvantage of using a ticker instead of a timer is that
// timeouts will be less accurate. That is, the effective timeout could
// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
// between two messages being sent may not be recognized as a timeout.
MaxProcessingTime time.Duration
// Return specifies what channels will be populated. If they are set to true,
// you must read from them to prevent deadlock.
Return struct {
// If enabled, any errors that occurred while consuming are returned on
// the Errors channel (default disabled).
Errors bool
}
// Offsets specifies configuration for how and when to commit consumed
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// Deprecated: CommitInterval exists for historical compatibility
// and should not be used. Please use Consumer.Offsets.AutoCommit
CommitInterval time.Duration
// AutoCommit specifies configuration for commit messages automatically.
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool
// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}
// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Initial int64
// The retention duration for committed offsets. If zero, disabled
// (in which case the `offsets.retention.minutes` option on the
// broker will be used). Kafka only supports precision up to
// milliseconds; nanoseconds will be truncated. Requires Kafka
// broker version 0.9.0 or later.
// (default is 0: disabled).
Retention time.Duration
Retry struct {
// The total number of times to retry failing commit
// requests during OffsetManager shutdown (default 3).
Max int
}
}
// IsolationLevel support 2 mode:
// - use `ReadUncommitted` (default) to consume and return all messages in message channel
// - use `ReadCommitted` to hide messages that are part of an aborted transaction
IsolationLevel IsolationLevel
// Interceptors to be called just before the record is sent to the
// messages channel. Interceptors allows to intercept and possible
// mutate the message before they are returned to the client.
// *ConsumerMessage modified by the first interceptor's OnConsume() is
// passed to the second interceptor OnConsume(), and so on in the
// interceptor chain.
Interceptors []ConsumerInterceptor
}
// A user-provided string sent with every request to the brokers for logging,
// debugging, and auditing purposes. Defaults to "sarama", but you should
// probably set it to something specific to your application.
ClientID string
// A rack identifier for this client. This can be any string value which
// indicates where this client is physically located.
// It corresponds with the broker config 'broker.rack'
RackID string
// The number of events to buffer in internal and external channels. This
// permits the producer and consumer to continue processing some messages
// in the background while user code is working, greatly improving throughput.
// Defaults to 256.
ChannelBufferSize int
// ApiVersionsRequest determines whether Sarama should send an
// ApiVersionsRequest message to each broker as part of its initial
// connection. This defaults to `true` to match the official Java client
// and most 3rdparty ones.
ApiVersionsRequest bool
// The version of Kafka that Sarama will assume it is running against.
// Defaults to the oldest supported stable version. Since Kafka provides
// backwards-compatibility, setting it to a version older than you have
// will not break anything, although it may prevent you from using the
// latest features. Setting it to a version greater than you are actually
// running may lead to random breakage.
Version KafkaVersion
// The registry to define metrics into.
// Defaults to a local registry.
// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
// prior to starting Sarama.
// See Examples on how to use the metrics registry
MetricRegistry metrics.Registry
}
// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
c := &Config{}
c.Admin.Retry.Max = 5
c.Admin.Retry.Backoff = 100 * time.Millisecond
c.Admin.Timeout = 3 * time.Second
c.Net.MaxOpenRequests = 5
c.Net.DialTimeout = 30 * time.Second
c.Net.ReadTimeout = 30 * time.Second
c.Net.WriteTimeout = 30 * time.Second
c.Net.SASL.Handshake = true
c.Net.SASL.Version = SASLHandshakeV1
c.Metadata.Retry.Max = 3
c.Metadata.Retry.Backoff = 250 * time.Millisecond
c.Metadata.RefreshFrequency = 10 * time.Minute
c.Metadata.Full = true
c.Metadata.AllowAutoTopicCreation = true
c.Producer.MaxMessageBytes = 1024 * 1024
c.Producer.RequiredAcks = WaitForLocal
c.Producer.Timeout = 10 * time.Second
c.Producer.Partitioner = NewHashPartitioner
c.Producer.Retry.Max = 3
c.Producer.Retry.Backoff = 100 * time.Millisecond
c.Producer.Return.Errors = true
c.Producer.CompressionLevel = CompressionLevelDefault
c.Producer.Transaction.Timeout = 1 * time.Minute
c.Producer.Transaction.Retry.Max = 50
c.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
c.Consumer.Fetch.Min = 1
c.Consumer.Fetch.Default = 1024 * 1024
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 500 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.AutoCommit.Enable = true
c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3
c.Consumer.Group.Session.Timeout = 10 * time.Second
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{NewBalanceStrategyRange()}
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
c.Consumer.Group.ResetInvalidOffsets = true
c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.ApiVersionsRequest = true
c.Version = DefaultVersion
c.MetricRegistry = metrics.NewRegistry()
return c
}
// Validate checks a Config instance. It will return a
// ConfigurationError if the specified values don't make sense.
//
//nolint:gocyclo // This function's cyclomatic complexity has go beyond 100
func (c *Config) Validate() error {
// some configuration values should be warned on but not fail completely, do those first
if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {
Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
}
if !c.Net.SASL.Enable {
if c.Net.SASL.User != "" {
Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
}
if c.Net.SASL.Password != "" {
Logger.Println("Net.SASL is disabled but a non-empty password was provided.")
}
}
if c.Producer.RequiredAcks > 1 {
Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
}
if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
Logger.Println("Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.")
}
if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
Logger.Println("Producer.Flush.Bytes must be smaller than MaxRequestSize; it will be ignored.")
}
if (c.Producer.Flush.Bytes > 0 || c.Producer.Flush.Messages > 0) && c.Producer.Flush.Frequency == 0 {
Logger.Println("Producer.Flush: Bytes or Messages are set, but Frequency is not; messages may not get flushed.")
}
if c.Producer.Timeout%time.Millisecond != 0 {
Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
}
if c.Consumer.MaxWaitTime < 100*time.Millisecond {
Logger.Println("Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
}
if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Group.Session.Timeout%time.Millisecond != 0 {
Logger.Println("Consumer.Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Group.Heartbeat.Interval%time.Millisecond != 0 {
Logger.Println("Consumer.Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Group.Rebalance.Timeout%time.Millisecond != 0 {
Logger.Println("Consumer.Group.Rebalance.Timeout only supports millisecond precision; nanoseconds will be truncated.")
}
if c.ClientID == defaultClientID {
Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
}
// validate Net values
switch {
case c.Net.MaxOpenRequests <= 0:
return ConfigurationError("Net.MaxOpenRequests must be > 0")
case c.Net.DialTimeout <= 0:
return ConfigurationError("Net.DialTimeout must be > 0")
case c.Net.ReadTimeout <= 0:
return ConfigurationError("Net.ReadTimeout must be > 0")
case c.Net.WriteTimeout <= 0:
return ConfigurationError("Net.WriteTimeout must be > 0")
case c.Net.SASL.Enable:
if c.Net.SASL.Mechanism == "" {
c.Net.SASL.Mechanism = SASLTypePlaintext
}
switch c.Net.SASL.Mechanism {
case SASLTypePlaintext:
if c.Net.SASL.User == "" {
return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
}
if c.Net.SASL.Password == "" {
return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
}
case SASLTypeOAuth:
if c.Net.SASL.TokenProvider == nil {
return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider")
}
case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
if c.Net.SASL.User == "" {
return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
}
if c.Net.SASL.Password == "" {
return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
}
if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
}
case SASLTypeGSSAPI:
if c.Net.SASL.GSSAPI.ServiceName == "" {
return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
}
switch c.Net.SASL.GSSAPI.AuthType {
case KRB5_USER_AUTH:
if c.Net.SASL.GSSAPI.Password == "" {
return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
}
case KRB5_KEYTAB_AUTH:
if c.Net.SASL.GSSAPI.KeyTabPath == "" {
return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
" and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
}
case KRB5_CCACHE_AUTH:
if c.Net.SASL.GSSAPI.CCachePath == "" {
return ConfigurationError("Net.SASL.GSSAPI.CCachePath must not be empty when GSS-API mechanism is used" +
" and Net.SASL.GSSAPI.AuthType = KRB5_CCACHE_AUTH")
}
default:
return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH, KRB5_KEYTAB_AUTH, and KRB5_CCACHE_AUTH")
}
if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
}
if c.Net.SASL.GSSAPI.Username == "" {
return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
}
if c.Net.SASL.GSSAPI.Realm == "" {
return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
}
default:
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
return ConfigurationError(msg)
}
}
// validate the Admin values
switch {
case c.Admin.Timeout <= 0:
return ConfigurationError("Admin.Timeout must be > 0")
}
// validate the Metadata values
switch {
case c.Metadata.Retry.Max < 0:
return ConfigurationError("Metadata.Retry.Max must be >= 0")
case c.Metadata.Retry.Backoff < 0:
return ConfigurationError("Metadata.Retry.Backoff must be >= 0")
case c.Metadata.RefreshFrequency < 0:
return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
}
// validate the Producer values
switch {
case c.Producer.MaxMessageBytes <= 0:
return ConfigurationError("Producer.MaxMessageBytes must be > 0")
case c.Producer.RequiredAcks < -1:
return ConfigurationError("Producer.RequiredAcks must be >= -1")
case c.Producer.Timeout <= 0:
return ConfigurationError("Producer.Timeout must be > 0")
case c.Producer.Partitioner == nil:
return ConfigurationError("Producer.Partitioner must not be nil")
case c.Producer.Flush.Bytes < 0:
return ConfigurationError("Producer.Flush.Bytes must be >= 0")
case c.Producer.Flush.Messages < 0:
return ConfigurationError("Producer.Flush.Messages must be >= 0")
case c.Producer.Flush.Frequency < 0:
return ConfigurationError("Producer.Flush.Frequency must be >= 0")
case c.Producer.Flush.MaxMessages < 0:
return ConfigurationError("Producer.Flush.MaxMessages must be >= 0")
case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
return ConfigurationError("Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set")
case c.Producer.Retry.Max < 0:
return ConfigurationError("Producer.Retry.Max must be >= 0")
case c.Producer.Retry.Backoff < 0:
return ConfigurationError("Producer.Retry.Backoff must be >= 0")
}
if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) {
return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
}
if c.Producer.Compression == CompressionGZIP {
if c.Producer.CompressionLevel != CompressionLevelDefault {
if _, err := gzip.NewWriterLevel(io.Discard, c.Producer.CompressionLevel); err != nil {
return ConfigurationError(fmt.Sprintf("gzip compression does not work with level %d: %v", c.Producer.CompressionLevel, err))
}
}
}
if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
}
if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
}
if c.Producer.Retry.Max == 0 {
return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
}
if c.Producer.RequiredAcks != WaitForAll {
return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
}
if c.Net.MaxOpenRequests > 1 {
return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
}
}
if c.Producer.Transaction.ID != "" && !c.Producer.Idempotent {
return ConfigurationError("Transactional producer requires Idempotent to be true")
}
// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
return ConfigurationError("Consumer.Fetch.Min must be > 0")
case c.Consumer.Fetch.Default <= 0:
return ConfigurationError("Consumer.Fetch.Default must be > 0")
case c.Consumer.Fetch.Max < 0:
return ConfigurationError("Consumer.Fetch.Max must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
case c.Consumer.MaxProcessingTime <= 0:
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.AutoCommit.Interval <= 0:
return ConfigurationError("Consumer.Offsets.AutoCommit.Interval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
case c.Consumer.Offsets.Retry.Max < 0:
return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
case c.Consumer.IsolationLevel != ReadUncommitted && c.Consumer.IsolationLevel != ReadCommitted:
return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
}
if c.Consumer.Offsets.CommitInterval != 0 {
Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
" and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
}
if c.Consumer.Group.Rebalance.Strategy != nil {
Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" +
" and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies")
}
// validate IsolationLevel
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
}
// validate the Consumer Group values
switch {
case c.Consumer.Group.Session.Timeout <= 2*time.Millisecond:
return ConfigurationError("Consumer.Group.Session.Timeout must be >= 2ms")
case c.Consumer.Group.Heartbeat.Interval < 1*time.Millisecond:
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0:
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
case c.Consumer.Group.Rebalance.Retry.Max < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
}
for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies {
if strategy == nil {
return ConfigurationError("elements in Consumer.Group.Rebalance.Strategies must not be empty")
}
}
if c.Consumer.Group.InstanceId != "" {
if !c.Version.IsAtLeast(V2_3_0_0) {
return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3")
}
if err := validateGroupInstanceId(c.Consumer.Group.InstanceId); err != nil {
return err
}
}
// validate misc shared values
switch {
case c.ChannelBufferSize < 0:
return ConfigurationError("ChannelBufferSize must be >= 0")
}
// only validate clientID locally for Kafka versions before KIP-190 was implemented
if !c.Version.IsAtLeast(V1_0_0_0) && !validClientID.MatchString(c.ClientID) {
return ConfigurationError(fmt.Sprintf("ClientID value %q is not valid for Kafka versions before 1.0.0", c.ClientID))
}
return nil
}
func (c *Config) getDialer() proxy.Dialer {
if c.Net.Proxy.Enable {
Logger.Println("using proxy")
return c.Net.Proxy.Dialer
} else {
return &net.Dialer{
Timeout: c.Net.DialTimeout,
KeepAlive: c.Net.KeepAlive,
LocalAddr: c.Net.LocalAddr,
}
}
}
const MAX_GROUP_INSTANCE_ID_LENGTH = 249
var GROUP_INSTANCE_ID_REGEXP = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`)
func validateGroupInstanceId(id string) error {
if id == "" {
return ConfigurationError("Group instance id must be non-empty string")
}
if id == "." || id == ".." {
return ConfigurationError(`Group instance id cannot be "." or ".."`)
}
if len(id) > MAX_GROUP_INSTANCE_ID_LENGTH {
return ConfigurationError(fmt.Sprintf(`Group instance id cannot be longer than %v, characters: %s`, MAX_GROUP_INSTANCE_ID_LENGTH, id))
}
if !GROUP_INSTANCE_ID_REGEXP.MatchString(id) {
return ConfigurationError(fmt.Sprintf(`Group instance id %s is illegal, it contains a character other than, '.', '_' and '-'`, id))
}
return nil
}