Skip to content

Commit baa594b

Browse files
Add local dc property (#312)
* add local dc property * update version
1 parent e6f67e9 commit baa594b

File tree

4 files changed

+36
-1
lines changed

4 files changed

+36
-1
lines changed

butterfree/configs/db/cassandra_config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def __init__(
4343
stream_checkpoint_path: str = None,
4444
read_consistency_level: str = None,
4545
write_consistency_level: str = None,
46+
local_dc: str = None,
4647
):
4748
self.username = username
4849
self.password = password
@@ -55,6 +56,7 @@ def __init__(
5556
self.stream_checkpoint_path = stream_checkpoint_path
5657
self.read_consistency_level = read_consistency_level
5758
self.write_consistency_level = write_consistency_level
59+
self.local_dc = local_dc
5860

5961
@property
6062
def database(self) -> str:
@@ -178,6 +180,15 @@ def write_consistency_level(self, value: str) -> None:
178180
"CASSANDRA_WRITE_CONSISTENCY_LEVEL", "LOCAL_QUORUM"
179181
)
180182

183+
@property
184+
def local_dc(self) -> Optional[str]:
185+
"""Local DC for Cassandra connection."""
186+
return self.__local_dc
187+
188+
@local_dc.setter
189+
def local_dc(self, value: str) -> None:
190+
self.__local_dc = value or environment.get_variable("CASSANDRA_LOCAL_DC")
191+
181192
def get_options(self, table: str) -> Dict[Optional[str], Optional[str]]:
182193
"""Get options for connect to Cassandra DB.
183194
@@ -197,6 +208,7 @@ def get_options(self, table: str) -> Dict[Optional[str], Optional[str]]:
197208
"spark.cassandra.auth.username": self.username,
198209
"spark.cassandra.auth.password": self.password,
199210
"spark.cassandra.connection.host": self.host,
211+
"spark.cassandra.connection.localDC": self.local_dc,
200212
"spark.cassandra.input.consistency.level": self.read_consistency_level,
201213
"spark.cassandra.output.consistency.level": self.write_consistency_level,
202214
}

butterfree/configs/environment.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"STREAM_CHECKPOINT_PATH": None,
1515
"CASSANDRA_READ_CONSISTENCY_LEVEL": None,
1616
"CASSANDRA_WRITE_CONSISTENCY_LEVEL": None,
17+
"CASSANDRA_LOCAL_DC": None,
1718
}
1819

1920

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from setuptools import find_packages, setup
22

33
__package_name__ = "butterfree"
4-
__version__ = "1.2.0.dev10"
4+
__version__ = "1.2.0.dev11"
55
__repository_url__ = "https://github.com/quintoandar/butterfree"
66

77
with open("requirements.txt") as f:

tests/unit/butterfree/configs/db/test_cassandra_config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,28 @@ def test_write_consistency_level_custom_env_var(self, mocker, cassandra_config):
203203
# then
204204
assert cassandra_config.write_consistency_level == value
205205

206+
def test_local_dc(self, cassandra_config):
207+
# expecting
208+
default = None
209+
assert cassandra_config.local_dc == default
210+
211+
def test_local_dc_custom(self, cassandra_config):
212+
# given
213+
value = "VPC_1"
214+
cassandra_config.local_dc = value
215+
216+
# then
217+
assert cassandra_config.local_dc == value
218+
219+
def test_local_dc_custom_env_var(self, mocker, cassandra_config):
220+
# given
221+
value = "VPC_1"
222+
mocker.patch("butterfree.configs.environment.get_variable", return_value=value)
223+
cassandra_config.local_dc = value
224+
225+
# then
226+
assert cassandra_config.local_dc == value
227+
206228
def test_set_credentials_on_instantiation(self):
207229
cassandra_config = CassandraConfig( # noqa: S106
208230
username="username", password="password", host="host", keyspace="keyspace"

0 commit comments

Comments
 (0)