-
Notifications
You must be signed in to change notification settings - Fork 161
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #502 from python-rope/lieryan-autoimport-database-…
…wrapper Add autoimport database wrapper
- Loading branch information
Showing
4 changed files
with
198 additions
and
37 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
from typing import List | ||
|
||
|
||
class FinalQuery: | ||
def __init__(self, query): | ||
self._query = query | ||
|
||
|
||
class Query: | ||
def __init__(self, query: str, columns: List[str]): | ||
self.query = query | ||
self.columns = columns | ||
|
||
def select(self, *columns: str): | ||
if not (set(columns) <= set(self.columns)): | ||
raise ValueError( | ||
f"Unknown column names passed: {set(columns) - set(self.columns)}" | ||
) | ||
|
||
selected_columns = ", ".join(columns) | ||
return FinalQuery(f"SELECT {selected_columns} FROM {self.query}") | ||
|
||
def select_star(self): | ||
return FinalQuery(f"SELECT * FROM {self.query}") | ||
|
||
def where(self, where_clause: str): | ||
return Query( | ||
f"{self.query} WHERE {where_clause}", | ||
columns=self.columns, | ||
) | ||
|
||
def insert_into(self) -> FinalQuery: | ||
columns = ", ".join(self.columns) | ||
placeholders = ", ".join(["?"] * len(self.columns)) | ||
return FinalQuery( | ||
f"INSERT INTO {self.query}({columns}) VALUES ({placeholders})" | ||
) | ||
|
||
def drop_table(self) -> FinalQuery: | ||
return FinalQuery(f"DROP TABLE {self.query}") | ||
|
||
def delete_from(self) -> FinalQuery: | ||
return FinalQuery(f"DELETE FROM {self.query}") | ||
|
||
|
||
class Name: | ||
table_name = "names" | ||
columns = [ | ||
"name", | ||
"module", | ||
"package", | ||
"source", | ||
"type", | ||
] | ||
|
||
@classmethod | ||
def create_table(self, connection): | ||
names_table = ( | ||
"(name TEXT, module TEXT, package TEXT, source INTEGER, type INTEGER)" | ||
) | ||
connection.execute(f"CREATE TABLE IF NOT EXISTS names{names_table}") | ||
connection.execute("CREATE INDEX IF NOT EXISTS name ON names(name)") | ||
connection.execute("CREATE INDEX IF NOT EXISTS module ON names(module)") | ||
connection.execute("CREATE INDEX IF NOT EXISTS package ON names(package)") | ||
|
||
objects = Query(table_name, columns) | ||
|
||
search_submodule_like = objects.where('module LIKE ("%." || ?)') | ||
search_module_like = objects.where("module LIKE (?)") | ||
|
||
import_assist = objects.where("name LIKE (? || '%')") | ||
|
||
search_by_name_like = objects.where("name LIKE (?)") | ||
|
||
delete_by_module_name = objects.where("module = ?").delete_from() | ||
|
||
|
||
class Package: | ||
table_name = "packages" | ||
columns = [ | ||
"package", | ||
"path", | ||
] | ||
|
||
@classmethod | ||
def create_table(self, connection): | ||
packages_table = "(package TEXT, path TEXT)" | ||
connection.execute(f"CREATE TABLE IF NOT EXISTS packages{packages_table}") | ||
|
||
objects = Query(table_name, columns) | ||
|
||
delete_by_package_name = objects.where("package = ?").delete_from() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from unittest import TestCase | ||
|
||
from rope.contrib.autoimport import models | ||
|
||
|
||
class QueryTest(TestCase): | ||
def test_select_non_existent_column(self): | ||
with self.assertRaisesRegex(ValueError, """Unknown column names passed: {['"]doesnotexist['"]}"""): | ||
models.Name.objects.select('doesnotexist')._query | ||
|
||
|
||
class NameModelTest(TestCase): | ||
def test_name_objects(self): | ||
self.assertEqual( | ||
models.Name.objects.select_star()._query, | ||
"SELECT * FROM names", | ||
) | ||
|
||
def test_query_strings(self): | ||
with self.subTest("objects"): | ||
self.assertEqual( | ||
models.Name.objects.select_star()._query, | ||
'SELECT * FROM names', | ||
) | ||
|
||
with self.subTest("search_submodule_like"): | ||
self.assertEqual( | ||
models.Name.search_submodule_like.select_star()._query, | ||
'SELECT * FROM names WHERE module LIKE ("%." || ?)', | ||
) | ||
|
||
with self.subTest("search_module_like"): | ||
self.assertEqual( | ||
models.Name.search_module_like.select_star()._query, | ||
'SELECT * FROM names WHERE module LIKE (?)', | ||
) | ||
|
||
with self.subTest("import_assist"): | ||
self.assertEqual( | ||
models.Name.import_assist.select_star()._query, | ||
"SELECT * FROM names WHERE name LIKE (? || '%')", | ||
) | ||
|
||
with self.subTest("search_by_name_like"): | ||
self.assertEqual( | ||
models.Name.search_by_name_like.select_star()._query, | ||
'SELECT * FROM names WHERE name LIKE (?)', | ||
) | ||
|
||
with self.subTest("delete_by_module_name"): | ||
self.assertEqual( | ||
models.Name.delete_by_module_name._query, | ||
'DELETE FROM names WHERE module = ?', | ||
) | ||
|
||
|
||
class PackageModelTest(TestCase): | ||
def test_query_strings(self): | ||
with self.subTest("objects"): | ||
self.assertEqual( | ||
models.Package.objects.select_star()._query, | ||
'SELECT * FROM packages', | ||
) | ||
|
||
with self.subTest("delete_by_package_name"): | ||
self.assertEqual( | ||
models.Package.delete_by_package_name._query, | ||
'DELETE FROM packages WHERE package = ?', | ||
) |