This repository was archived by the owner on Aug 25, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 138
WIP: source: Mysql Source feature #114
Merged
Merged
Changes from 6 commits
Commits
Show all changes
52 commits
Select commit
Hold shift + click to select a range
43f4897
add mysql_source.py
sudharsana-kjl bd0fb5f
add help for query arg in mysql_source
sudharsana-kjl fe34a34
fix typo
sudharsana-kjl a7e19fd
add query format in mysql_source
sudharsana-kjl 1a43843
modify CHANGELOG
sudharsana-kjl 2dbe495
add DictCursor in mysql source
sudharsana-kjl d36a2f3
modify changelog
sudharsana-kjl facb3da
modify setup.py to add aiomysql package requirement
sudharsana-kjl 563c04a
source: mysql_source: add different args for different queries
sudharsana-kjl f0423b5
source: mysql_source: remove defaults for db, user, pass
sudharsana-kjl fcb1ed4
initial test for mysql connection setup
sudharsana-kjl 31eeffd
source: mysql_source: connection verified and add test for fetch a re…
sudharsana-kjl 3f4a628
source: mysql: modify update()
sudharsana-kjl 3a06cee
source: mysql: modify update()
sudharsana-kjl 05abd5f
source: mysql: update() refined
sudharsana-kjl 9e39bce
add db.sql file
sudharsana-kjl 09b9bc5
source: mysql: modify convert_to_repos()
sudharsana-kjl 1dd30fe
source: mysql: unfold escaped string after fetching
sudharsana-kjl 59b3982
source: mysql: modify convert_to_repo to work for one repo
sudharsana-kjl 896dd91
Create table in mysql testcase setUP
867dcae
source: mysql: modify help queries in args()
sudharsana-kjl 58f4437
add ssl for mysql source
sudharsana-kjl f95b9e3
Merge branch 'master' into mysql_feature
sudharsana-kjl b9a6f4e
fix merge conflict
sudharsana-kjl 2f4cde4
Merge branch 'mysql_feature' of https://github.com/sudharsana-kjl/dff…
sudharsana-kjl f692839
set up mysql in travis
sudharsana-kjl 6554a2b
Merge branch 'master' into mysql_feature
sudharsana-kjl 57362f7
add mysql test-db for travis
sudharsana-kjl cfae9a8
modify test-db for travis
sudharsana-kjl efcd934
modify test-db
sudharsana-kjl e4d9d7f
modify mysql host in test_mysql
sudharsana-kjl 3897fc3
change port in test_mysql
sudharsana-kjl 6a13093
modify port in test_mysql
sudharsana-kjl a098103
remove ssl
sudharsana-kjl c791b2c
fix style
sudharsana-kjl 6c67cee
Merge branch 'master' into mysql_feature
sudharsana-kjl 181bf51
source: mysql: New module
pdxjohnny 67c139b
mysql is being a fucker: line 54: /docker-entrypoint-initdb.d/dump.sq…
b5f1acd
source: mysql: Test in docker
pdxjohnny 2fa312a
merge master
pdxjohnny 864d2ef
ci: source mysql travis
pdxjohnny 38b2417
source: mysql: certificate verify failed: self signed certificate in …
pdxjohnny 60f837f
signing with intermedate still results in self signed error
pdxjohnny cfebc4f
internmetiate cert
2844a02
mariadb and pcks8 to 1
f4a80c9
tests: TLS with mariadb
f7a0a58
merge
34e2e2f
util: Add timing
bd46437
Squashed commit of the following:
pdxjohnny d9b08a3
style: Format with black
pdxjohnny b57872b
Merge remote-tracking branch 'origin/master' into mysql_feature
pdxjohnny 39512c3
fix link to readme
pdxjohnny File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| import json | ||
| import aiomysql | ||
| from typing import AsyncIterator, NamedTuple, Dict | ||
|
|
||
| from dffml.base import BaseConfig | ||
| from dffml.repo import Repo | ||
| from dffml.source.source import BaseSourceContext, BaseSource | ||
| from dffml.util.cli.arg import Arg | ||
| from dffml.util.entrypoint import entry_point | ||
|
|
||
|
|
||
| class MysqlSourceConfig(BaseConfig, NamedTuple): | ||
| host: str | ||
| port: int | ||
| user: str | ||
| password: str | ||
| db: str | ||
| query: str | ||
|
|
||
|
|
||
| class MysqlSourceContext(BaseSourceContext): | ||
| async def update(self, repo: Repo): | ||
| db = self.conn | ||
| marshall = json.dumps(repo.dict()) | ||
johnandersen777 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| await db.execute(self.config.query, (repo.src_url, marshall, marshall)) | ||
| self.logger.debug("updated: %s", marshall) | ||
| self.logger.debug("update: %s", await self.repo(repo.src_url)) | ||
|
|
||
| async def repos(self) -> AsyncIterator[Repo]: | ||
| # Query Format: | ||
| # SELECT key as src_url, data_1 as feature_1, data_2 as feature_2 FROM list_of_all_repos | ||
| query = self.config.query | ||
| await self.conn.execute(query) | ||
| src_urls = set(map(lambda row: row[0], await self.conn.fetchall())) | ||
johnandersen777 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for src_url in src_urls: | ||
| yield await self.repo(src_url) | ||
|
|
||
| async def repo(self, src_url: str): | ||
| # Query Format: | ||
| # SELECT key as src_url, data_1 as feature_1, data_2 as feature_2 FROM list_of_all_repos WHERE key=%s; -> repo() %s == repo.src_url | ||
| query = self.config.query | ||
| repo = Repo(src_url) | ||
| db = self.conn | ||
| await db.execute(query, (src_url,)) | ||
| dump = await db.fetchone() | ||
| if dump is not None and dump[0] is not None: | ||
| repo.merge(Repo(src_url, data=json.loads(dump[0]))) | ||
johnandersen777 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return repo | ||
|
|
||
| async def __aenter__(self) -> "MysqlSourceContext": | ||
| self.__conn = self.parent.db.cursor(aiomysql.DictCursor) | ||
johnandersen777 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.conn = await self.__conn.__aenter__() | ||
| return self | ||
|
|
||
| async def __aexit__(self, exc_type, exc_value, traceback): | ||
| await self.__conn.__aexit__(exc_type, exc_value, traceback) | ||
| await self.parent.db.commit() | ||
|
|
||
|
|
||
| @entry_point("dffml.source") | ||
| class MysqlSource(BaseSource): | ||
|
|
||
| CONTEXT = MysqlSourceContext | ||
|
|
||
| async def __aenter__(self) -> "MysqlSource": | ||
| self.pool = await aiomysql.create_pool( | ||
| host=self.config.host, | ||
| port=self.config.port, | ||
| user=self.config.user, | ||
| password=self.config.password, | ||
| db=self.config.db, | ||
| ) | ||
| self.__db = self.pool.acquire() | ||
| self.db = await self.__db.__aenter__() | ||
johnandersen777 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return self | ||
|
|
||
| async def __aexit__(self, exc_type, exc_value, traceback): | ||
| await self.__db.__aexit__(exc_type, exc_value, traceback) | ||
| self.pool.close() | ||
| await self.pool.wait_closed() | ||
|
|
||
| @classmethod | ||
| def args(cls, args, *above) -> Dict[str, Arg]: | ||
| cls.config_set(args, above, "host", Arg(default="127.0.0.1")) | ||
| cls.config_set(args, above, "port", Arg(type=int, default=3306)) | ||
| cls.config_set(args, above, "user", Arg(default="user")) | ||
| cls.config_set(args, above, "password", Arg(default="pass")) | ||
johnandersen777 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| cls.config_set(args, above, "db", Arg(default="db")) | ||
| cls.config_set( | ||
| args, | ||
| above, | ||
| "query", | ||
| Arg( | ||
| type=str, | ||
| help="For repos:" | ||
| "SELECT key as src_url, data_1 as feature_1, data_2 as feature_2 FROM list_of_all_repos, " | ||
| "For repo:" | ||
| "SELECT key as src_url, data_1 as feature_1, data_2 as feature_2 FROM list_of_all_repos WHERE key=%s -> %s = repo.src_url" | ||
| "For update:" | ||
| "INSERT INTO source_data (src_url, json) VALUES(%s, %s) ON DUPLICATE KEY UPDATE json = %s", | ||
johnandersen777 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ), | ||
| ) | ||
| return args | ||
|
|
||
| @classmethod | ||
| def config(cls, config, *above): | ||
| return MysqlSourceConfig( | ||
| host=cls.config_get(config, above, "host"), | ||
| port=cls.config_get(config, above, "port"), | ||
| user=cls.config_get(config, above, "user"), | ||
| password=cls.config_get(config, above, "password"), | ||
| db=cls.config_get(config, above, "db"), | ||
| query=cls.config_get(config, above, "query"), | ||
| ) | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.