Skip to content

Commit

Permalink
Merge pull request #630 from python-rope/lieryan-sqlite-models
Browse files Browse the repository at this point in the history
SQLite models improvements
  • Loading branch information
lieryan committed Dec 28, 2022
2 parents 8101fa8 + e603e72 commit 7f0e7bb
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 86 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- #548 Implement MoveGlobal using string as destination module names (@lieryan)
- #627 Fix parsing of octal literal (@lieryan)
- #611 Implement JSON DataFile serialization (@lieryan)
- #630 SQLite models improvements (@lieryan)


# Release 1.6.0

Expand Down
70 changes: 43 additions & 27 deletions rope/contrib/autoimport/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List
from abc import ABC, abstractmethod
from typing import Dict, List


class FinalQuery:
Expand Down Expand Up @@ -49,28 +50,48 @@ def delete_from(self) -> FinalQuery:
return FinalQuery(f"DELETE FROM {self.query}")


class Name:
table_name = "names"
columns = [
"name",
"module",
"package",
"source",
"type",
]
class Model(ABC):
@property
@abstractmethod
def table_name(self) -> str:
...

@property
@abstractmethod
def schema(self) -> Dict[str, str]:
...

@classmethod
def create_table(self, connection):
names_table = (
"(name TEXT, module TEXT, package TEXT, source INTEGER, type INTEGER)"
def create_table(cls, connection):
metadata_table = [
f"{column_name} {column_type}"
for column_name, column_type in cls.schema.items()
]
metadata_table_definition = ", ".join(metadata_table)
connection.execute(
f"CREATE TABLE IF NOT EXISTS {cls.table_name}({metadata_table_definition})"
)
connection.execute(f"CREATE TABLE IF NOT EXISTS names{names_table}")


class Name(Model):
table_name = "names"
schema = {
"name": "TEXT",
"module": "TEXT",
"package": "TEXT",
"source": "INTEGER",
"type": "INTEGER",
}
columns = list(schema.keys())
objects = Query(table_name, columns)

@classmethod
def create_table(cls, connection):
super().create_table(connection)
connection.execute("CREATE INDEX IF NOT EXISTS name ON names(name)")
connection.execute("CREATE INDEX IF NOT EXISTS module ON names(module)")
connection.execute("CREATE INDEX IF NOT EXISTS package ON names(package)")

objects = Query(table_name, columns)

search_submodule_like = objects.where('module LIKE ("%." || ?)')
search_module_like = objects.where("module LIKE (?)")

Expand All @@ -81,18 +102,13 @@ def create_table(self, connection):
delete_by_module_name = objects.where("module = ?").delete_from()


class Package:
class Package(Model):
table_name = "packages"
columns = [
"package",
"path",
]

@classmethod
def create_table(self, connection):
packages_table = "(package TEXT, path TEXT)"
connection.execute(f"CREATE TABLE IF NOT EXISTS packages{packages_table}")

schema = {
"package": "TEXT",
"path": "TEXT",
}
columns = list(schema.keys())
objects = Query(table_name, columns)

delete_by_package_name = objects.where("package = ?").delete_from()
126 changes: 67 additions & 59 deletions ropetest/contrib/autoimport/modeltest.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,77 @@
from unittest import TestCase
import sqlite3
from typing import Any

import pytest

from rope.contrib.autoimport import models


class QueryTest(TestCase):
@pytest.fixture
def empty_db():
return sqlite3.connect(":memory:")


class TestQuery:
def test_select_non_existent_column(self):
with self.assertRaisesRegex(ValueError, """Unknown column names passed: {['"]doesnotexist['"]}"""):
models.Name.objects.select('doesnotexist')._query
expected_msg = """Unknown column names passed: {['"]doesnotexist['"]}"""
with pytest.raises(ValueError, match=expected_msg):
models.Name.objects.select("doesnotexist")._query


class CreateTableTestMixin:
model_class: Any = None

def test_create_table(self, empty_db):
self.model_class.create_table(empty_db)


class NameModelTest(TestCase):
class TestNameModel(CreateTableTestMixin):
model_class = models.Name

def test_name_objects(self):
self.assertEqual(
models.Name.objects.select_star()._query,
"SELECT * FROM names",
assert models.Name.objects.select_star()._query == "SELECT * FROM names"

def test_search_submodule_like(self):
assert (
models.Name.search_submodule_like.select_star()._query
== 'SELECT * FROM names WHERE module LIKE ("%." || ?)'
)

def test_query_strings(self):
with self.subTest("objects"):
self.assertEqual(
models.Name.objects.select_star()._query,
'SELECT * FROM names',
)

with self.subTest("search_submodule_like"):
self.assertEqual(
models.Name.search_submodule_like.select_star()._query,
'SELECT * FROM names WHERE module LIKE ("%." || ?)',
)

with self.subTest("search_module_like"):
self.assertEqual(
models.Name.search_module_like.select_star()._query,
'SELECT * FROM names WHERE module LIKE (?)',
)

with self.subTest("import_assist"):
self.assertEqual(
models.Name.import_assist.select_star()._query,
"SELECT * FROM names WHERE name LIKE (? || '%')",
)

with self.subTest("search_by_name_like"):
self.assertEqual(
models.Name.search_by_name_like.select_star()._query,
'SELECT * FROM names WHERE name LIKE (?)',
)

with self.subTest("delete_by_module_name"):
self.assertEqual(
models.Name.delete_by_module_name._query,
'DELETE FROM names WHERE module = ?',
)


class PackageModelTest(TestCase):
def test_query_strings(self):
with self.subTest("objects"):
self.assertEqual(
models.Package.objects.select_star()._query,
'SELECT * FROM packages',
)

with self.subTest("delete_by_package_name"):
self.assertEqual(
models.Package.delete_by_package_name._query,
'DELETE FROM packages WHERE package = ?',
)
def test_search_module_like(self):
assert (
models.Name.search_module_like.select_star()._query
== "SELECT * FROM names WHERE module LIKE (?)"
)

def test_import_assist(self):
assert (
models.Name.import_assist.select_star()._query
== "SELECT * FROM names WHERE name LIKE (? || '%')"
)

def test_search_by_name_like(self):
assert (
models.Name.search_by_name_like.select_star()._query
== "SELECT * FROM names WHERE name LIKE (?)"
)

def test_delete_by_module_name(self):
assert (
models.Name.delete_by_module_name._query
== "DELETE FROM names WHERE module = ?"
)


class TestPackageModel(CreateTableTestMixin):
model_class = models.Package

def test_objects(self):
assert (
models.Package.objects.select_star()._query == "SELECT * FROM packages"
)

def test_delete_by_package_name(self):
assert (
models.Package.delete_by_package_name._query
== "DELETE FROM packages WHERE package = ?"
)

0 comments on commit 7f0e7bb

Please sign in to comment.