WIP: source: HDFS #145
Conversation
This pull request introduces 5 alerts when merging fc82d86 into 44744d8 - view on LGTM.com new alerts:
This pull request introduces 5 alerts when merging cd91197 into 44744d8 - view on LGTM.com new alerts:
tests/source/test_hdfs.py
Outdated
from dffml.source.csv import CSVSource, CSVSourceConfig


class TestHDFSSource(FileSourceTest, AsyncTestCase):
FileSourceTest should be SourceTest
dffml/source/hdfs_source.py
Outdated
    port: str
    user: str
    filepath: str
    source: BaseSource.load
source: FileSource
        self.client = InsecureClient(
            "http://" + self.config.host + ":" + self.config.port,
            user="hadoopuser",
        )
async def new_open(source):
    with self.client.read(self.config.filepath, encoding="utf-8") as fd:
        await source.load_fd(fd)

self.config.source._open = new_open

Also give it a new close method, which will be called when we exit the file source's context.
To enter the context, we usually use the async with statement:

async with self.config.source as source:  # this is where new_open gets called
    # At the end of this with block new_close would get called, but we don't want to call it until the __aexit__ method of HDFSSource

You'll need to do something like the following. Make sure to modify __aexit__ similarly.
Lines 780 to 792 in 927fb73
    ) -> "MemoryOperationImplementationNetworkContext":
        self.__stack = AsyncExitStack()
        await self.__stack.__aenter__()
        self.operations = {
            opimp.op.name: await self.__stack.enter_async_context(opimp)
            for opimp in self.opimps.values()
        }
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        if self.__stack is not None:
            await self.__stack.aclose()
            self.__stack = None
You'll use:
self.source = await self.__stack.enter_async_context(self.config.source)
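The approach described in this comment can be sketched end to end with stand-in classes. This is only an illustration under assumptions: `InnerSource` and `WrapperSource` are hypothetical and are not DFFML's `FileSource` or `HDFSSource`, and the replacement hooks just record events instead of talking to HDFS. The replacements are bound with `types.MethodType` so that the inner source is passed in as `source` when the hook is called with no arguments.

```python
import asyncio
import types
from contextlib import AsyncExitStack


class InnerSource:
    """Hypothetical stand-in for a file-backed source (not DFFML's FileSource)."""

    def __init__(self):
        self.events = []

    async def _open(self):
        self.events.append("default open")

    async def _close(self):
        self.events.append("default close")

    async def __aenter__(self):
        await self._open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self._close()


class WrapperSource:
    """Hypothetical stand-in for HDFSSource: swaps the hooks, then keeps the
    inner source's context open until its own __aexit__ runs."""

    def __init__(self, inner):
        self.inner = inner
        self._stack = None

    async def __aenter__(self):
        async def new_open(source):
            source.events.append("remote open")

        async def new_close(source):
            source.events.append("remote close")

        # Bind the replacements to the inner instance so `source` is supplied
        self.inner._open = types.MethodType(new_open, self.inner)
        self.inner._close = types.MethodType(new_close, self.inner)
        self._stack = AsyncExitStack()
        await self._stack.__aenter__()
        # Like `async with self.inner as self.source:`, but the exit is deferred
        self.source = await self._stack.enter_async_context(self.inner)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        if self._stack is not None:
            # Runs the inner source's __aexit__, which calls new_close
            await self._stack.aclose()
            self._stack = None


async def demo():
    inner = InnerSource()
    async with WrapperSource(inner):
        inner.events.append("work")
    return inner.events


events = asyncio.run(demo())
```

The recorded events show the replacement open hook running on entry and the replacement close hook running only when the wrapper itself exits.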
Implement a __call__ method for the HDFS source which does: return self.source()
Lines 60 to 61 in eb9b600
    def __call__(self) -> BaseSourceContext:
        return self.CONTEXT(self)
Done 👍
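The `__call__` delegation discussed in this thread can be illustrated with a small sketch. The class names here are hypothetical stand-ins, not DFFML classes; the point is only that the wrapper hands back whatever context the wrapped source creates, so the context's parent is the inner source rather than the wrapper.

```python
class InnerContext:
    """Hypothetical context object that remembers which source created it."""

    def __init__(self, parent):
        self.parent = parent


class Inner:
    """Hypothetical inner source whose __call__ mirrors `return self.CONTEXT(self)`."""

    CONTEXT = InnerContext

    def __call__(self):
        return self.CONTEXT(self)


class Wrapper:
    """Hypothetical wrapping source that delegates __call__ to the inner source."""

    def __init__(self, inner):
        self.source = inner

    def __call__(self):
        # Delegate: the returned context belongs to the inner source
        return self.source()


inner = Inner()
ctx = Wrapper(inner)()
```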
dffml/source/hdfs_source.py
Outdated
    @classmethod
    def args(cls, args, *above) -> Dict[str, Arg]:
        cls.config_set(args, above, "host", Arg(type=str))
        cls.config_set(args, above, "port", Arg(type=str))
type=int
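One consequence of switching the port to an int is worth sketching: building the URL with `+`, as in `"http://" + self.config.host + ":" + self.config.port`, raises a `TypeError` once the port is no longer a string. A minimal sketch, assuming a hypothetical helper named `webhdfs_url` and placeholder host and port values, formats the URL with an f-string instead:

```python
def webhdfs_url(host: str, port: int) -> str:
    """Hypothetical helper: format the client URL from a str host and int port."""
    return f"http://{host}:{port}"


# Placeholder values for illustration only
url = webhdfs_url("localhost", 50070)

# Plain string concatenation fails when the port is an int
try:
    "http://" + "localhost" + ":" + 50070
    concat_failed = False
except TypeError:
    concat_failed = True
```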
.ci/Dockerfile
Outdated
    mv hadoop-3.1.2 /usr/local && \
    ln -sf /usr/local/hadoop-3.1.2/ /usr/local/hadoop

CMD cat hadoop_config >> ~/.bashrc
You need to COPY hadoop_config into the container first.
I've included the following line:
ADD configs/hadoop_config /usr/local/hadoop
CMD cat /usr/local/hadoop/hadoop_config >> ~/.bashrc
Is this fine?
.ci/Dockerfile
Outdated
CMD bash cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys \
    ssh-copy-id -i ~/.ssh/id_rsa.pub localhost \
    ssh localhost
This won't work because the ssh server isn't running. You can safely delete these steps
Oh, so there's no need to fetch the authorized keys?
Or should I just remove ssh localhost?
Got it 👍
.ci/Dockerfile
Outdated
## ssh key gen
RUN \
    ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa
You don't need this, since you don't need to create an ssh key for the root account within the container
okay 👍
.ci/Dockerfile
Outdated
## mkdir ~/.ssh \
## chmod 700 ~/.ssh

CMD bash cat "" >> ~/.ssh/authorized_keys \
You can remove this
okay 👍
.ci/configs/ssh_config
Outdated
@@ -0,0 +1,3 @@
Host *
    UserKnownHostsFile /dev/null
    StrictHostKeyChecking no
You should remove this file
.ci/start-hadoop.sh
Outdated
@@ -0,0 +1,15 @@
#!/bin/bash
You probably want to source /usr/local/hadoop/hadoop_config
- Add another Dockerfile that sets up a mini CLI Hadoop cluster
- Modify new_open() for hdfs_source
Signed-off-by: John Andersen <[email protected]>
This pull request introduces 5 alerts when merging 56aab8f into eb9b600 - view on LGTM.com new alerts:
diff --git a/dffml/source/hdfs_source.py b/dffml/source/hdfs_source.py
index 9f90f62..cc2a919 100644
--- a/dffml/source/hdfs_source.py
+++ b/dffml/source/hdfs_source.py
@@ -24,12 +24,13 @@ class HDFSSourceConfig(BaseConfig, NamedTuple):
 class HDFSSource(BaseSource):
     CONTEXT = BaseSourceContext
+
     async def new_open(self):
         with self.client.read(self.config.filepath, encoding="utf-8") as fd:
             await self.config.source.load_fd(fd)
     async def new_close(self):
-        with self.client.write(self.config.filepath, encoding="utf-8") as fd:
+        with self.client.write(self.config.filepath, encoding="utf-8", overwrite=True) as fd:
             await self.config.source.dump_fd(fd)
     async def __aenter__(self) -> "BaseSource":
@@ -37,12 +38,29 @@ class HDFSSource(BaseSource):
             "http://" + self.config.host + ":" + self.config.port,
             user="hadoopuser",
         )
-        self.config.source._open = await self.new_open()
-        self.config.source._close = await self.new_close()
+        # This might be a problem
+        self.config.source._open = self.new_open.__get__(self.config.source, self.config.source.__class__)
+        self.config.source._close = self.new_close.__get__(self.config.source, self.config.source.__class__)
+        # Create AsyncExitStack and call its __aenter__ method
+        self.__stack = AsyncExitStack()
+        await self.__stack.__aenter__()
+        # Enter the context of the source
+        # Same as:
+        # async with self.config.source as self.source:
+        self.source = await self.__stack.enter_async_context(self.config.source)
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        if self.__stack is not None:
+            await self.__stack.__aexit__(exc_type, exc_value, traceback)
+            self.__stack = None
         return self
     def __call__(self) -> BaseSourceContext:
-        return self.config.source()
+        # Call the source
+        # Same as:
+        # async with self.source() as source_context:
+        return self.source()
     @classmethod
     def args(cls, args, *above) -> Dict[str, Arg]:
You may need to add the
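A detail worth checking in a patch shaped like this one: if the trailing `return self` context line ends up inside `__aexit__`, the method returns a truthy value, and Python treats a truthy return from `__aexit__` as a request to suppress the exception raised inside the `async with` block. The sketch below uses two hypothetical classes, unrelated to DFFML, to show the difference.

```python
import asyncio


class ReturnsSelf:
    """Hypothetical manager whose __aexit__ returns a truthy value."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        return self  # truthy, so the in-flight exception is swallowed


class ReturnsNone:
    """Hypothetical manager whose __aexit__ returns None."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        return None  # falsy, so the exception keeps propagating


async def raises_inside(manager):
    try:
        async with manager:
            raise ValueError("boom")
    except ValueError:
        return "propagated"
    return "suppressed"


results = (
    asyncio.run(raises_inside(ReturnsSelf())),
    asyncio.run(raises_inside(ReturnsNone())),
)
```

Returning nothing from `__aexit__` is usually the safer default for a wrapper that should not hide errors from the wrapped source.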
This pull request introduces 6 alerts when merging 979f83a into 2864a11 - view on LGTM.com new alerts:
This pull request introduces 6 alerts when merging 743de45 into f4a5dda - view on LGTM.com new alerts:
This pull request introduces 7 alerts when merging 3426b39 into 6d8f8e9 - view on LGTM.com new alerts:
This pull request introduces 6 alerts when merging 5210a3e into c8f9ba3 - view on LGTM.com new alerts:
This pull request introduces 9 alerts when merging 0b6dfcb into d3bf6c9 - view on LGTM.com new alerts:
This pull request introduces 10 alerts when merging 95f7b73 into 65e4ce4 - view on LGTM.com new alerts:
Signed-off-by: John Andersen <[email protected]>
This pull request introduces 13 alerts when merging 99764b0 into 65e4ce4 - view on LGTM.com new alerts:
Signed-off-by: John Andersen <[email protected]>
This pull request introduces 15 alerts when merging 82d6e16 into 65e4ce4 - view on LGTM.com new alerts:
This PR aims to add an HDFS source.