diff --git a/README.md b/README.md index d1909a6..e39b758 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ pipeline_config = { "http://my.kafka.broker:9093" ], "protocol": "PLAINTEXT", - "skip_auth": True + "mechanism": "NO_AUTH" }, "topics": [ { diff --git a/VERSION b/VERSION index d5c0c99..87ce492 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.5.1 +3.5.2 diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index d7d32a4..d991d83 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -1,3 +1,4 @@ +import warnings from typing import Any, List, Optional from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator @@ -18,6 +19,7 @@ class KafkaMechanism(CaseInsensitiveStrEnum): SCRAM_SHA_512 = "SCRAM-SHA-512" PLAIN = "PLAIN" GSSAPI = "GSSAPI" + NO_AUTH = "NO_AUTH" class SchemaField(BaseModel): @@ -163,7 +165,13 @@ class KafkaConnectionParams(BaseModel): kerberos_keytab: Optional[str] = Field(default=None) kerberos_realm: Optional[str] = Field(default=None) kerberos_config: Optional[str] = Field(default=None) - skip_auth: bool = Field(default=False) + skip_auth: bool = Field( + default=False, + exclude=True, + description='skip_auth is deprecated, use mechanism "NO_AUTH" instead or \ + set skip_tls_verification to True if you want to skip TLS verification', + ) + skip_tls_verification: bool = Field(default=False) @model_validator(mode="before") def empty_str_to_none(values): @@ -178,6 +186,23 @@ def update(self, patch: "KafkaConnectionParamsPatch") -> "KafkaConnectionParams" merged_dict = {**current_dict, **patch_dict} return KafkaConnectionParams.model_validate(merged_dict) + @model_validator(mode="after") + def correct_skip_auth(self) -> "KafkaConnectionParams": + """ + While the skip_auth field is deprecated, + we need to make sure the mechanism is NO_AUTH if skip_auth is True + """ + if self.skip_auth: + if self.mechanism is not None and self.mechanism != KafkaMechanism.NO_AUTH: + warnings.warn( + f"Mechanism is set to {self.mechanism}, but skip_auth is True. \ + Setting mechanism to NO_AUTH.", + category=RuntimeWarning, + stacklevel=1, + ) + self.mechanism = KafkaMechanism.NO_AUTH + return self + class SourceType(CaseInsensitiveStrEnum): KAFKA = "kafka"