Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions butterfree/configs/db/cassandra_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
stream_checkpoint_path: str = None,
read_consistency_level: str = None,
write_consistency_level: str = None,
local_dc: str = None,
):
self.username = username
self.password = password
Expand All @@ -55,6 +56,7 @@ def __init__(
self.stream_checkpoint_path = stream_checkpoint_path
self.read_consistency_level = read_consistency_level
self.write_consistency_level = write_consistency_level
self.local_dc = local_dc

@property
def database(self) -> str:
Expand Down Expand Up @@ -178,6 +180,15 @@ def write_consistency_level(self, value: str) -> None:
"CASSANDRA_WRITE_CONSISTENCY_LEVEL", "LOCAL_QUORUM"
)

@property
def local_dc(self) -> Optional[str]:
"""Local DC for Cassandra connection."""
return self.__local_dc

@local_dc.setter
def local_dc(self, value: str) -> None:
self.__local_dc = value or environment.get_variable("CASSANDRA_LOCAL_DC")

def get_options(self, table: str) -> Dict[Optional[str], Optional[str]]:
"""Get options for connect to Cassandra DB.

Expand All @@ -197,6 +208,7 @@ def get_options(self, table: str) -> Dict[Optional[str], Optional[str]]:
"spark.cassandra.auth.username": self.username,
"spark.cassandra.auth.password": self.password,
"spark.cassandra.connection.host": self.host,
"spark.cassandra.connection.localDC": self.local_dc,
"spark.cassandra.input.consistency.level": self.read_consistency_level,
"spark.cassandra.output.consistency.level": self.write_consistency_level,
}
Expand Down
1 change: 1 addition & 0 deletions butterfree/configs/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"STREAM_CHECKPOINT_PATH": None,
"CASSANDRA_READ_CONSISTENCY_LEVEL": None,
"CASSANDRA_WRITE_CONSISTENCY_LEVEL": None,
"CASSANDRA_LOCAL_DC": None,
}


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

__package_name__ = "butterfree"
__version__ = "1.2.0.dev10"
__version__ = "1.2.0.dev11"
__repository_url__ = "https://github.com/quintoandar/butterfree"

with open("requirements.txt") as f:
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/butterfree/configs/db/test_cassandra_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,28 @@ def test_write_consistency_level_custom_env_var(self, mocker, cassandra_config):
# then
assert cassandra_config.write_consistency_level == value

def test_local_dc(self, cassandra_config):
# expecting
default = None
assert cassandra_config.local_dc == default

def test_local_dc_custom(self, cassandra_config):
# given
value = "VPC_1"
cassandra_config.local_dc = value

# then
assert cassandra_config.local_dc == value

def test_local_dc_custom_env_var(self, mocker, cassandra_config):
# given
value = "VPC_1"
mocker.patch("butterfree.configs.environment.get_variable", return_value=value)
cassandra_config.local_dc = value

# then
assert cassandra_config.local_dc == value

def test_set_credentials_on_instantiation(self):
cassandra_config = CassandraConfig( # noqa: S106
username="username", password="password", host="host", keyspace="keyspace"
Expand Down