Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/DataDog/zstd v1.4.4 // indirect
github.com/MichaelTJones/walk v0.0.0-20161122175330-4748e29d5718
github.com/PuerkitoBio/goquery v1.5.0
github.com/PuerkitoBio/goquery v1.5.1
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
Expand Down Expand Up @@ -144,9 +144,10 @@ require (
github.com/stretchr/testify v1.6.1
github.com/twpayne/go-geom v1.3.6
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292
go.etcd.io/etcd v0.0.0-00010101000000-000000000000
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897
golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925
golang.org/x/lint v0.0.0-20200130185559-910be7a94367
golang.org/x/net v0.0.0-20200602114024-627f9648deb9
Expand Down
11 changes: 7 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+q
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/PuerkitoBio/goquery v1.5.0 h1:uGvmFXOA73IKluu/F84Xd1tt/z07GYm8X49XKHP7EJk=
github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg=
github.com/PuerkitoBio/goquery v1.5.1 h1:PSPBGne8NIUWw+/7vFBV+kG2J/5MOjbzc7154OaKCSE=
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f h1:SgZvxOvp9NLnAjkIiby0LQgXH0yQNTk2eDzbYPVoTA4=
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
Expand All @@ -90,8 +90,7 @@ github.com/alexbrainman/sspi v0.0.0-20180613141037-e580b900e9f5 h1:P5U+E4x5OkVEK
github.com/alexbrainman/sspi v0.0.0-20180613141037-e580b900e9f5/go.mod h1:976q2ETgjT2snVCf2ZaBnyBbVoPERGjUz+0sofzEfro=
github.com/andy-kimball/arenaskl v0.0.0-20200617143215-f701008588b9 h1:vCvyXiLsgAs7qgclk56iBTJQ+gdfiVuzfe5T6sVBL+w=
github.com/andy-kimball/arenaskl v0.0.0-20200617143215-f701008588b9/go.mod h1:V2fyPx0Gm2VBNpGPq4z0bjNRaBPR+kC3aSqIuiWCdg4=
github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o=
github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/andybalholm/cascadia v1.2.0 h1:vuRCkM5Ozh/BfmsaTm26kbjm0mIOM3yS5Ek/F5h18aE=
github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxBp0T0eFw1RUQY=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
Expand Down Expand Up @@ -736,7 +735,9 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
Expand Down Expand Up @@ -775,6 +776,8 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+v
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc=
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,14 @@ func TestChangefeedErrors(t *testing.T) {
t, `param tls_enabled must be a bool`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_enabled=foo`,
)
sqlDB.ExpectErr(
t, `param tls_skip_verify must be a bool`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_skip_verify=foo`,
)
sqlDB.ExpectErr(
t, `tls_skip_verify requires tls_enabled=true`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?tls_skip_verify=true`,
)
sqlDB.ExpectErr(
t, `param ca_cert must be base 64 encoded`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?ca_cert=!`,
Expand Down Expand Up @@ -2051,6 +2059,10 @@ func TestChangefeedErrors(t *testing.T) {
t, `sasl_enabled must be enabled if a SASL password is provided`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_password=a`,
)
sqlDB.ExpectErr(
t, `sasl_enabled must be enabled to configure SASL mechanism`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_mechanism=false`,
)

// The avro format doesn't support key_in_value yet.
sqlDB.ExpectErr(
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const (
SinkParamFileSize = `file_size`
SinkParamSchemaTopic = `schema_topic`
SinkParamTLSEnabled = `tls_enabled`
SinkParamTLSSkipVerify = `tls_skip_verify`
SinkParamTopicPrefix = `topic_prefix`
SinkSchemeBuffer = ``
SinkSchemeExperimentalSQL = `experimental-sql`
Expand All @@ -90,6 +91,7 @@ const (
SinkParamSASLHandshake = `sasl_handshake`
SinkParamSASLUser = `sasl_user`
SinkParamSASLPassword = `sasl_password`
SinkParamSASLMechanism = `sasl_mechanism`
)

// ChangefeedOptionExpectValues is used to parse changefeed options using
Expand Down
44 changes: 44 additions & 0 deletions pkg/ccl/changefeedccl/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type ScramClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (c *ScramClient) Begin(userName, password, authzID string) (err error) {
c.Client, err = c.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
c.ClientConversation = c.Client.NewConversation()
return nil
}

func (c *ScramClient) Step(challenge string) (response string, err error) {
response, err = c.ClientConversation.Step(challenge)
return
}

func (c *ScramClient) Done() bool {
return c.ClientConversation.Done()
}
35 changes: 33 additions & 2 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ func getSink(
}
}
q.Del(changefeedbase.SinkParamTLSEnabled)
if tlsBool := q.Get(changefeedbase.SinkParamTLSSkipVerify); tlsBool != `` {
var err error
if cfg.tlsSkipVerify, err = strconv.ParseBool(tlsBool); err != nil {
return nil, errors.Errorf(`param %s must be a bool: %s`, changefeedbase.SinkParamTLSSkipVerify, err)
}
}
q.Del(changefeedbase.SinkParamTLSSkipVerify)
if caCertHex := q.Get(changefeedbase.SinkParamCACert); caCertHex != `` {
// TODO(dan): There's a straightforward and unambiguous transformation
// between the base 64 encoding defined in RFC 4648 and the URL variant
Expand Down Expand Up @@ -149,6 +156,11 @@ func getSink(
}
cfg.saslHandshake = b
}
cfg.saslMechanism = q.Get(changefeedbase.SinkParamSASLMechanism)
q.Del(changefeedbase.SinkParamSASLMechanism)
if cfg.saslMechanism != "" && !cfg.saslEnabled {
return nil, errors.Errorf(`%s must be enabled to configure SASL mechanism`, changefeedbase.SinkParamSASLEnabled)
}
cfg.saslUser = q.Get(changefeedbase.SinkParamSASLUser)
q.Del(changefeedbase.SinkParamSASLUser)
cfg.saslPassword = q.Get(changefeedbase.SinkParamSASLPassword)
Expand Down Expand Up @@ -289,13 +301,15 @@ func init() {
type kafkaSinkConfig struct {
kafkaTopicPrefix string
tlsEnabled bool
tlsSkipVerify bool
caCert []byte
clientCert []byte
clientKey []byte
saslEnabled bool
saslHandshake bool
saslUser string
saslPassword string
saslMechanism string
}

// kafkaSink emits to Kafka asynchronously. It is not concurrency-safe; all
Expand Down Expand Up @@ -335,15 +349,26 @@ func makeKafkaSink(
config.Producer.Return.Successes = true
config.Producer.Partitioner = newChangefeedPartitioner

if cfg.tlsSkipVerify {
if !cfg.tlsEnabled {
return nil, errors.Errorf(`%s requires %s=true`, changefeedbase.SinkParamTLSSkipVerify, changefeedbase.SinkParamTLSEnabled)
}
if config.Net.TLS.Config == nil {
config.Net.TLS.Config = &tls.Config{}
}
config.Net.TLS.Config.InsecureSkipVerify = cfg.tlsSkipVerify
}

if cfg.caCert != nil {
if !cfg.tlsEnabled {
return nil, errors.Errorf(`%s requires %s=true`, changefeedbase.SinkParamCACert, changefeedbase.SinkParamTLSEnabled)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(cfg.caCert)
config.Net.TLS.Config = &tls.Config{
RootCAs: caCertPool,
if config.Net.TLS.Config == nil {
config.Net.TLS.Config = &tls.Config{}
}
config.Net.TLS.Config.RootCAs = caCertPool
config.Net.TLS.Enable = true
} else if cfg.tlsEnabled {
config.Net.TLS.Enable = true
Expand Down Expand Up @@ -373,6 +398,12 @@ func makeKafkaSink(
config.Net.SASL.Handshake = cfg.saslHandshake
config.Net.SASL.User = cfg.saslUser
config.Net.SASL.Password = cfg.saslPassword
config.Net.SASL.Mechanism = sarama.SASLMechanism(cfg.saslMechanism)
if config.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &ScramClient{HashGeneratorFcn: SHA512} }
} else if config.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &ScramClient{HashGeneratorFcn: SHA256} }
}
}

// When we emit messages to sarama, they're placed in a queue (as does any
Expand Down
2 changes: 1 addition & 1 deletion vendor