From 26f027efba57d606c9f257504bdd500015220045 Mon Sep 17 00:00:00 2001 From: "alex-berger@gmx.ch" Date: Tue, 20 Oct 2020 21:18:41 +0200 Subject: [PATCH] ccl: add support for sasl mechanism Add support to define the SASL mechanism for Kafka Changefeed DSNs as well as disabling TLS server certificate verification, which is important for debugging connectivity issue to Kafka. This commit is a result of support desk request 6569. Release note (ccl change): Add parameter to Kafka Changefeed DSNs to specify SASL mechanism. Release note (ccl change): Add parameter to Kafka Changefeed DSNs to disable TLS server certificate and hostname verification. --- go.mod | 5 ++- go.sum | 11 +++-- pkg/ccl/changefeedccl/changefeed_test.go | 12 +++++ .../changefeedccl/changefeedbase/options.go | 2 + pkg/ccl/changefeedccl/scram_client.go | 44 +++++++++++++++++++ pkg/ccl/changefeedccl/sink.go | 35 ++++++++++++++- vendor | 2 +- 7 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 pkg/ccl/changefeedccl/scram_client.go diff --git a/go.mod b/go.mod index 7b4a7dc46a1c..949c1811f991 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/BurntSushi/toml v0.3.1 github.com/DataDog/zstd v1.4.4 // indirect github.com/MichaelTJones/walk v0.0.0-20161122175330-4748e29d5718 - github.com/PuerkitoBio/goquery v1.5.0 + github.com/PuerkitoBio/goquery v1.5.1 github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect @@ -144,9 +144,10 @@ require ( github.com/stretchr/testify v1.6.1 github.com/twpayne/go-geom v1.3.6 github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 go.etcd.io/etcd v0.0.0-00010101000000-000000000000 - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 + golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925 golang.org/x/lint v0.0.0-20200130185559-910be7a94367 golang.org/x/net v0.0.0-20200602114024-627f9648deb9 diff --git a/go.sum b/go.sum index 8e5e068e0fd0..872868a24502 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,8 @@ github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+q github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/PuerkitoBio/goquery v1.5.0 h1:uGvmFXOA73IKluu/F84Xd1tt/z07GYm8X49XKHP7EJk= -github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg= +github.com/PuerkitoBio/goquery v1.5.1 h1:PSPBGne8NIUWw+/7vFBV+kG2J/5MOjbzc7154OaKCSE= +github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f h1:SgZvxOvp9NLnAjkIiby0LQgXH0yQNTk2eDzbYPVoTA4= github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= @@ -90,8 +90,7 @@ github.com/alexbrainman/sspi v0.0.0-20180613141037-e580b900e9f5 h1:P5U+E4x5OkVEK github.com/alexbrainman/sspi v0.0.0-20180613141037-e580b900e9f5/go.mod h1:976q2ETgjT2snVCf2ZaBnyBbVoPERGjUz+0sofzEfro= github.com/andy-kimball/arenaskl v0.0.0-20200617143215-f701008588b9 h1:vCvyXiLsgAs7qgclk56iBTJQ+gdfiVuzfe5T6sVBL+w= github.com/andy-kimball/arenaskl v0.0.0-20200617143215-f701008588b9/go.mod h1:V2fyPx0Gm2VBNpGPq4z0bjNRaBPR+kC3aSqIuiWCdg4= -github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o= -github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= +github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/andybalholm/cascadia v1.2.0 h1:vuRCkM5Ozh/BfmsaTm26kbjm0mIOM3yS5Ek/F5h18aE= github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= @@ -736,7 +735,9 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= @@ -775,6 +776,8 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+v golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc= diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 078f506dc1d5..8b19f24ba1a9 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1989,6 +1989,14 @@ func TestChangefeedErrors(t *testing.T) { t, `param tls_enabled must be a bool`, `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_enabled=foo`, ) + sqlDB.ExpectErr( + t, `param tls_skip_verify must be a bool`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_skip_verify=foo`, + ) + sqlDB.ExpectErr( + t, `tls_skip_verify requires tls_enabled=true`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_skip_verify=true`, + ) sqlDB.ExpectErr( t, `param ca_cert must be base 64 encoded`, `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?ca_cert=!`, @@ -2051,6 +2059,10 @@ func TestChangefeedErrors(t *testing.T) { t, `sasl_enabled must be enabled if a SASL password is provided`, `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_password=a`, ) + sqlDB.ExpectErr( + t, `sasl_enabled must be enabled to configure SASL mechanism`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_mechanism=false`, + ) // The avro format doesn't support key_in_value yet. sqlDB.ExpectErr( diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index bcf5082cc9ee..dd17f9d6cc04 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -82,6 +82,7 @@ const ( SinkParamFileSize = `file_size` SinkParamSchemaTopic = `schema_topic` SinkParamTLSEnabled = `tls_enabled` + SinkParamTLSSkipVerify = `tls_skip_verify` SinkParamTopicPrefix = `topic_prefix` SinkSchemeBuffer = `` SinkSchemeExperimentalSQL = `experimental-sql` @@ -90,6 +91,7 @@ const ( SinkParamSASLHandshake = `sasl_handshake` SinkParamSASLUser = `sasl_user` SinkParamSASLPassword = `sasl_password` + SinkParamSASLMechanism = `sasl_mechanism` ) // ChangefeedOptionExpectValues is used to parse changefeed options using diff --git a/pkg/ccl/changefeedccl/scram_client.go b/pkg/ccl/changefeedccl/scram_client.go new file mode 100644 index 000000000000..601e37ca64a2 --- /dev/null +++ b/pkg/ccl/changefeedccl/scram_client.go @@ -0,0 +1,44 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "crypto/sha256" + "crypto/sha512" + "hash" + + "github.com/xdg/scram" +) + +var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +type ScramClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (c *ScramClient) Begin(userName, password, authzID string) (err error) { + c.Client, err = c.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + c.ClientConversation = c.Client.NewConversation() + return nil +} + +func (c *ScramClient) Step(challenge string) (response string, err error) { + response, err = c.ClientConversation.Step(challenge) + return +} + +func (c *ScramClient) Done() bool { + return c.ClientConversation.Done() +} diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index c36fbbce9312..f00e0bb39c57 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -102,6 +102,13 @@ func getSink( } } q.Del(changefeedbase.SinkParamTLSEnabled) + if tlsBool := q.Get(changefeedbase.SinkParamTLSSkipVerify); tlsBool != `` { + var err error + if cfg.tlsSkipVerify, err = strconv.ParseBool(tlsBool); err != nil { + return nil, errors.Errorf(`param %s must be a bool: %s`, changefeedbase.SinkParamTLSSkipVerify, err) + } + } + q.Del(changefeedbase.SinkParamTLSSkipVerify) if caCertHex := q.Get(changefeedbase.SinkParamCACert); caCertHex != `` { // TODO(dan): There's a straightforward and unambiguous transformation // between the base 64 encoding defined in RFC 4648 and the URL variant @@ -149,6 +156,11 @@ func getSink( } cfg.saslHandshake = b } + cfg.saslMechanism = q.Get(changefeedbase.SinkParamSASLMechanism) + q.Del(changefeedbase.SinkParamSASLMechanism) + if cfg.saslMechanism != "" && !cfg.saslEnabled { + return nil, errors.Errorf(`%s must be enabled to configure SASL mechanism`, changefeedbase.SinkParamSASLEnabled) + } cfg.saslUser = q.Get(changefeedbase.SinkParamSASLUser) q.Del(changefeedbase.SinkParamSASLUser) cfg.saslPassword = q.Get(changefeedbase.SinkParamSASLPassword) @@ -289,6 +301,7 @@ func init() { type kafkaSinkConfig struct { kafkaTopicPrefix string tlsEnabled bool + tlsSkipVerify bool caCert []byte clientCert []byte clientKey []byte @@ -296,6 +309,7 @@ type kafkaSinkConfig struct { saslHandshake bool saslUser string saslPassword string + saslMechanism string } // kafkaSink emits to Kafka asynchronously. It is not concurrency-safe; all @@ -335,15 +349,26 @@ func makeKafkaSink( config.Producer.Return.Successes = true config.Producer.Partitioner = newChangefeedPartitioner + if cfg.tlsSkipVerify { + if !cfg.tlsEnabled { + return nil, errors.Errorf(`%s requires %s=true`, changefeedbase.SinkParamTLSSkipVerify, changefeedbase.SinkParamTLSEnabled) + } + if config.Net.TLS.Config == nil { + config.Net.TLS.Config = &tls.Config{} + } + config.Net.TLS.Config.InsecureSkipVerify = cfg.tlsSkipVerify + } + if cfg.caCert != nil { if !cfg.tlsEnabled { return nil, errors.Errorf(`%s requires %s=true`, changefeedbase.SinkParamCACert, changefeedbase.SinkParamTLSEnabled) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(cfg.caCert) - config.Net.TLS.Config = &tls.Config{ - RootCAs: caCertPool, + if config.Net.TLS.Config == nil { + config.Net.TLS.Config = &tls.Config{} } + config.Net.TLS.Config.RootCAs = caCertPool config.Net.TLS.Enable = true } else if cfg.tlsEnabled { config.Net.TLS.Enable = true @@ -373,6 +398,12 @@ func makeKafkaSink( config.Net.SASL.Handshake = cfg.saslHandshake config.Net.SASL.User = cfg.saslUser config.Net.SASL.Password = cfg.saslPassword + config.Net.SASL.Mechanism = sarama.SASLMechanism(cfg.saslMechanism) + if config.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &ScramClient{HashGeneratorFcn: SHA512} } + } else if config.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &ScramClient{HashGeneratorFcn: SHA256} } + } } // When we emit messages to sarama, they're placed in a queue (as does any diff --git a/vendor b/vendor index 5d84f81dcedd..b68509606557 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 5d84f81dcedd801eeb8251bb0fea7562a27c8424 +Subproject commit b685096065576ed2ed3be0b9cd57ae3f12fe5217