diff --git a/go.mod b/go.mod index 7b4a7dc46a1c..949c1811f991 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/BurntSushi/toml v0.3.1 github.com/DataDog/zstd v1.4.4 // indirect github.com/MichaelTJones/walk v0.0.0-20161122175330-4748e29d5718 - github.com/PuerkitoBio/goquery v1.5.0 + github.com/PuerkitoBio/goquery v1.5.1 github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect @@ -144,9 +144,10 @@ require ( github.com/stretchr/testify v1.6.1 github.com/twpayne/go-geom v1.3.6 github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 go.etcd.io/etcd v0.0.0-00010101000000-000000000000 - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 + golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925 golang.org/x/lint v0.0.0-20200130185559-910be7a94367 golang.org/x/net v0.0.0-20200602114024-627f9648deb9 diff --git a/go.sum b/go.sum index 8e5e068e0fd0..872868a24502 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,8 @@ github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+q github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/PuerkitoBio/goquery v1.5.0 h1:uGvmFXOA73IKluu/F84Xd1tt/z07GYm8X49XKHP7EJk= -github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg= +github.com/PuerkitoBio/goquery v1.5.1 h1:PSPBGne8NIUWw+/7vFBV+kG2J/5MOjbzc7154OaKCSE= +github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f h1:SgZvxOvp9NLnAjkIiby0LQgXH0yQNTk2eDzbYPVoTA4= github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= @@ -90,8 +90,7 @@ github.com/alexbrainman/sspi v0.0.0-20180613141037-e580b900e9f5 h1:P5U+E4x5OkVEK github.com/alexbrainman/sspi v0.0.0-20180613141037-e580b900e9f5/go.mod h1:976q2ETgjT2snVCf2ZaBnyBbVoPERGjUz+0sofzEfro= github.com/andy-kimball/arenaskl v0.0.0-20200617143215-f701008588b9 h1:vCvyXiLsgAs7qgclk56iBTJQ+gdfiVuzfe5T6sVBL+w= github.com/andy-kimball/arenaskl v0.0.0-20200617143215-f701008588b9/go.mod h1:V2fyPx0Gm2VBNpGPq4z0bjNRaBPR+kC3aSqIuiWCdg4= -github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o= -github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= +github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/andybalholm/cascadia v1.2.0 h1:vuRCkM5Ozh/BfmsaTm26kbjm0mIOM3yS5Ek/F5h18aE= github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= @@ -736,7 +735,9 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= @@ -775,6 +776,8 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+v golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc= diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 078f506dc1d5..8b19f24ba1a9 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1989,6 +1989,14 @@ func TestChangefeedErrors(t *testing.T) { t, `param tls_enabled must be a bool`, `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_enabled=foo`, ) + sqlDB.ExpectErr( + t, `param tls_skip_verify must be a bool`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_skip_verify=foo`, + ) + sqlDB.ExpectErr( + t, `tls_skip_verify requires tls_enabled=true`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_skip_verify=true`, + ) sqlDB.ExpectErr( t, `param ca_cert must be base 64 encoded`, `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?ca_cert=!`, @@ -2051,6 +2059,10 @@ func TestChangefeedErrors(t *testing.T) { t, `sasl_enabled must be enabled if a SASL password is provided`, `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_password=a`, ) + sqlDB.ExpectErr( + t, `sasl_enabled must be enabled to configure SASL mechanism`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_mechanism=false`, + ) // The avro format doesn't support key_in_value yet. sqlDB.ExpectErr( diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index bcf5082cc9ee..dd17f9d6cc04 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -82,6 +82,7 @@ const ( SinkParamFileSize = `file_size` SinkParamSchemaTopic = `schema_topic` SinkParamTLSEnabled = `tls_enabled` + SinkParamTLSSkipVerify = `tls_skip_verify` SinkParamTopicPrefix = `topic_prefix` SinkSchemeBuffer = `` SinkSchemeExperimentalSQL = `experimental-sql` @@ -90,6 +91,7 @@ const ( SinkParamSASLHandshake = `sasl_handshake` SinkParamSASLUser = `sasl_user` SinkParamSASLPassword = `sasl_password` + SinkParamSASLMechanism = `sasl_mechanism` ) // ChangefeedOptionExpectValues is used to parse changefeed options using diff --git a/pkg/ccl/changefeedccl/scram_client.go b/pkg/ccl/changefeedccl/scram_client.go new file mode 100644 index 000000000000..601e37ca64a2 --- /dev/null +++ b/pkg/ccl/changefeedccl/scram_client.go @@ -0,0 +1,44 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "crypto/sha256" + "crypto/sha512" + "hash" + + "github.com/xdg/scram" +) + +var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +type ScramClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (c *ScramClient) Begin(userName, password, authzID string) (err error) { + c.Client, err = c.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + c.ClientConversation = c.Client.NewConversation() + return nil +} + +func (c *ScramClient) Step(challenge string) (response string, err error) { + response, err = c.ClientConversation.Step(challenge) + return +} + +func (c *ScramClient) Done() bool { + return c.ClientConversation.Done() +} diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index c36fbbce9312..f00e0bb39c57 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -102,6 +102,13 @@ func getSink( } } q.Del(changefeedbase.SinkParamTLSEnabled) + if tlsBool := q.Get(changefeedbase.SinkParamTLSSkipVerify); tlsBool != `` { + var err error + if cfg.tlsSkipVerify, err = strconv.ParseBool(tlsBool); err != nil { + return nil, errors.Errorf(`param %s must be a bool: %s`, changefeedbase.SinkParamTLSSkipVerify, err) + } + } + q.Del(changefeedbase.SinkParamTLSSkipVerify) if caCertHex := q.Get(changefeedbase.SinkParamCACert); caCertHex != `` { // TODO(dan): There's a straightforward and unambiguous transformation // between the base 64 encoding defined in RFC 4648 and the URL variant @@ -149,6 +156,11 @@ func getSink( } cfg.saslHandshake = b } + cfg.saslMechanism = q.Get(changefeedbase.SinkParamSASLMechanism) + q.Del(changefeedbase.SinkParamSASLMechanism) + if cfg.saslMechanism != "" && !cfg.saslEnabled { + return nil, errors.Errorf(`%s must be enabled to configure SASL mechanism`, changefeedbase.SinkParamSASLEnabled) + } cfg.saslUser = q.Get(changefeedbase.SinkParamSASLUser) q.Del(changefeedbase.SinkParamSASLUser) cfg.saslPassword = q.Get(changefeedbase.SinkParamSASLPassword) @@ -289,6 +301,7 @@ func init() { type kafkaSinkConfig struct { kafkaTopicPrefix string tlsEnabled bool + tlsSkipVerify bool caCert []byte clientCert []byte clientKey []byte @@ -296,6 +309,7 @@ type kafkaSinkConfig struct { saslHandshake bool saslUser string saslPassword string + saslMechanism string } // kafkaSink emits to Kafka asynchronously. It is not concurrency-safe; all @@ -335,15 +349,26 @@ func makeKafkaSink( config.Producer.Return.Successes = true config.Producer.Partitioner = newChangefeedPartitioner + if cfg.tlsSkipVerify { + if !cfg.tlsEnabled { + return nil, errors.Errorf(`%s requires %s=true`, changefeedbase.SinkParamTLSSkipVerify, changefeedbase.SinkParamTLSEnabled) + } + if config.Net.TLS.Config == nil { + config.Net.TLS.Config = &tls.Config{} + } + config.Net.TLS.Config.InsecureSkipVerify = cfg.tlsSkipVerify + } + if cfg.caCert != nil { if !cfg.tlsEnabled { return nil, errors.Errorf(`%s requires %s=true`, changefeedbase.SinkParamCACert, changefeedbase.SinkParamTLSEnabled) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(cfg.caCert) - config.Net.TLS.Config = &tls.Config{ - RootCAs: caCertPool, + if config.Net.TLS.Config == nil { + config.Net.TLS.Config = &tls.Config{} } + config.Net.TLS.Config.RootCAs = caCertPool config.Net.TLS.Enable = true } else if cfg.tlsEnabled { config.Net.TLS.Enable = true @@ -373,6 +398,12 @@ func makeKafkaSink( config.Net.SASL.Handshake = cfg.saslHandshake config.Net.SASL.User = cfg.saslUser config.Net.SASL.Password = cfg.saslPassword + config.Net.SASL.Mechanism = sarama.SASLMechanism(cfg.saslMechanism) + if config.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &ScramClient{HashGeneratorFcn: SHA512} } + } else if config.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &ScramClient{HashGeneratorFcn: SHA256} } + } } // When we emit messages to sarama, they're placed in a queue (as does any diff --git a/vendor b/vendor index 5d84f81dcedd..b68509606557 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 5d84f81dcedd801eeb8251bb0fea7562a27c8424 +Subproject commit b685096065576ed2ed3be0b9cd57ae3f12fe5217