Skip to content

Commit

Permalink
type annotations: use string literals (PEP 0484 forward references)
Browse files Browse the repository at this point in the history
Workaround for:
pylint-dev/pylint#3285
  • Loading branch information
redshiftzero committed Jun 8, 2020
1 parent a5218db commit d3707e6
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 33 deletions.
2 changes: 1 addition & 1 deletion securedrop/crypto_util.py
Expand Up @@ -135,7 +135,7 @@ def do_runtime_tests(self):
if not rm.check_secure_delete_capability():
raise AssertionError("Secure file deletion is not possible.")

def get_wordlist(self, locale: Text) -> List[str]:
def get_wordlist(self, locale: 'Text') -> 'List[str]':
"""" Ensure the wordlist for the desired locale is read and available
in the words global variable. If there is no wordlist for the
desired local, fallback to the default english wordlist.
Expand Down
10 changes: 6 additions & 4 deletions securedrop/journalist_app/__init__.py
Expand Up @@ -38,7 +38,7 @@
_insecure_views = ['main.login', 'main.select_logo', 'static']


def create_app(config: SDConfig) -> Flask:
def create_app(config: 'SDConfig') -> Flask:
app = Flask(__name__,
template_folder=config.JOURNALIST_TEMPLATES_DIR,
static_folder=path.join(config.SECUREDROP_ROOT, 'static'))
Expand Down Expand Up @@ -81,14 +81,16 @@ def create_app(config: SDConfig) -> Flask:
)

@app.errorhandler(CSRFError)
def handle_csrf_error(e: CSRFError) -> Response:
def handle_csrf_error(e: CSRFError) -> 'Response':
# render the message first to ensure it's localized.
msg = gettext('You have been logged out due to inactivity')
session.clear()
flash(msg, 'error')
return redirect(url_for('main.login'))

def _handle_http_exception(error: HTTPException) -> Tuple[Union[Response, str], Optional[int]]:
def _handle_http_exception(
error: 'HTTPException'
) -> 'Tuple[Union[Response, str], Optional[int]]':
# Workaround for no blueprint-level 404/5 error handlers, see:
# https://github.com/pallets/flask/issues/503#issuecomment-71383286
handler = list(app.error_handler_spec['api'][error.code].values())[0]
Expand Down Expand Up @@ -126,7 +128,7 @@ def load_instance_config():
app.instance_config = InstanceConfig.get_current()

@app.before_request
def setup_g() -> Optional[Response]:
def setup_g() -> 'Optional[Response]':
"""Store commonly used values in Flask's special g object"""
if 'expires' in session and datetime.utcnow() >= session['expires']:
session.clear()
Expand Down
2 changes: 1 addition & 1 deletion securedrop/journalist_app/utils.py
Expand Up @@ -254,7 +254,7 @@ def col_delete(cols_selected):
return redirect(url_for('main.index'))


def make_password(config: SDConfig) -> str:
def make_password(config: 'SDConfig') -> str:
while True:
password = current_app.crypto_util.genrandomid(
7,
Expand Down
38 changes: 19 additions & 19 deletions securedrop/models.py
Expand Up @@ -42,9 +42,9 @@
ARGON2_PARAMS = dict(memory_cost=2**16, rounds=4, parallelism=2)


def get_one_or_else(query: Query,
logger: Logger,
failure_method: Callable[[int], None]) -> None:
def get_one_or_else(query: 'Query',
logger: 'Logger',
failure_method: 'Callable[[int], None]') -> None:
try:
return query.one()
except MultipleResultsFound as e:
Expand Down Expand Up @@ -97,7 +97,7 @@ def journalist_filename(self) -> str:
return ''.join([c for c in self.journalist_designation.lower().replace(
' ', '_') if c in valid_chars])

def documents_messages_count(self) -> Dict[str, int]:
def documents_messages_count(self) -> 'Dict[str, int]':
self.docs_msgs_count = {'messages': 0, 'documents': 0}
for submission in self.submissions:
if submission.filename.endswith('msg.gpg'):
Expand All @@ -108,7 +108,7 @@ def documents_messages_count(self) -> Dict[str, int]:
return self.docs_msgs_count

@property
def collection(self) -> List[Union[Submission, Reply]]:
def collection(self) -> 'List[Union[Submission, Reply]]':
"""Return the list of submissions and replies for this source, sorted
in ascending order by the filename/interaction count."""
collection = [] # type: List[Union[Submission, Reply]]
Expand Down Expand Up @@ -141,7 +141,7 @@ def public_key(self, value: str) -> None:
def public_key(self) -> None:
raise NotImplementedError

def to_json(self) -> Dict[str, Union[str, bool, int, str]]:
def to_json(self) -> 'Dict[str, Union[str, bool, int, str]]':
docs_msg_count = self.documents_messages_count()

if self.last_updated:
Expand Down Expand Up @@ -212,7 +212,7 @@ def __init__(self, source: Source, filename: str) -> None:
def __repr__(self) -> str:
return '<Submission %r>' % (self.filename)

def to_json(self) -> Dict[str, Union[str, int, bool]]:
def to_json(self) -> 'Dict[str, Union[str, int, bool]]':
json_submission = {
'source_url': url_for('api.single_source',
source_uuid=self.source.uuid),
Expand Down Expand Up @@ -260,7 +260,7 @@ class Reply(db.Model):
deleted_by_source = Column(Boolean, default=False, nullable=False)

def __init__(self,
journalist: Journalist,
journalist: 'Journalist',
source: Source,
filename: str) -> None:
self.journalist_id = journalist.id
Expand All @@ -273,7 +273,7 @@ def __init__(self,
def __repr__(self) -> str:
return '<Reply %r>' % (self.filename)

def to_json(self) -> Dict[str, Union[str, int, bool]]:
def to_json(self) -> 'Dict[str, Union[str, int, bool]]':
username = "deleted"
first_name = ""
last_name = ""
Expand Down Expand Up @@ -307,7 +307,7 @@ class SourceStar(db.Model):
source_id = Column("source_id", Integer, ForeignKey('sources.id'))
starred = Column("starred", Boolean, default=True)

def __eq__(self, other: Any) -> bool:
def __eq__(self, other: 'Any') -> bool:
if isinstance(other, SourceStar):
return (self.source_id == other.source_id and
self.id == other.id and self.starred == other.starred)
Expand Down Expand Up @@ -418,10 +418,10 @@ class Journalist(db.Model):
def __init__(self,
username: str,
password: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
first_name: 'Optional[str]' = None,
last_name: 'Optional[str]' = None,
is_admin: bool = False,
otp_secret: Optional[str] = None) -> None:
otp_secret: 'Optional[str]' = None) -> None:

self.check_username_acceptable(username)
self.username = username
Expand Down Expand Up @@ -547,14 +547,14 @@ def set_hotp_secret(self, otp_secret: str) -> None:
self.hotp_counter = 0

@property
def totp(self) -> OTP:
def totp(self) -> 'OTP':
if self.is_totp:
return pyotp.TOTP(self.otp_secret)
else:
raise ValueError('{} is not using TOTP'.format(self))

@property
def hotp(self) -> OTP:
def hotp(self) -> 'OTP':
if not self.is_totp:
return pyotp.HOTP(self.otp_secret)
else:
Expand Down Expand Up @@ -618,7 +618,7 @@ def verify_token(self, token: str) -> bool:
_MAX_LOGIN_ATTEMPTS_PER_PERIOD = 5

@classmethod
def throttle_login(cls, user: Journalist) -> None:
def throttle_login(cls, user: 'Journalist') -> None:
# Record the login attempt...
login_attempt = JournalistLoginAttempt(user)
db.session.add(login_attempt)
Expand All @@ -640,7 +640,7 @@ def throttle_login(cls, user: Journalist) -> None:
def login(cls,
username: str,
password: str,
token: str) -> Journalist:
token: str) -> 'Journalist':
try:
user = Journalist.query.filter_by(username=username).one()
except NoResultFound:
Expand Down Expand Up @@ -677,7 +677,7 @@ def validate_token_is_not_expired_or_invalid(token):
return True

@staticmethod
def validate_api_token_and_get_user(token: str) -> Union[Journalist, None]:
def validate_api_token_and_get_user(token: str) -> 'Union[Journalist, None]':
s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
Expand All @@ -690,7 +690,7 @@ def validate_api_token_and_get_user(token: str) -> Union[Journalist, None]:

return Journalist.query.get(data['id'])

def to_json(self) -> Dict[str, Union[str, bool, str]]:
def to_json(self) -> 'Dict[str, Union[str, bool, str]]':
json_user = {
'username': self.username,
'last_login': self.last_access.isoformat() + 'Z',
Expand Down
2 changes: 1 addition & 1 deletion securedrop/source_app/__init__.py
Expand Up @@ -31,7 +31,7 @@
from sdconfig import SDConfig # noqa: F401


def create_app(config: SDConfig) -> Flask:
def create_app(config: 'SDConfig') -> Flask:
app = Flask(__name__,
template_folder=config.SOURCE_TEMPLATES_DIR,
static_folder=path.join(config.SECUREDROP_ROOT, 'static'))
Expand Down
14 changes: 7 additions & 7 deletions securedrop/store.py
Expand Up @@ -190,8 +190,8 @@ def path_without_filesystem_id(self, filename: str) -> str:
return absolute

def get_bulk_archive(self,
selected_submissions: List,
zip_directory: str = '') -> _TemporaryFileWrapper:
selected_submissions: 'List',
zip_directory: str = '') -> '_TemporaryFileWrapper':
"""Generate a zip file from the selected submissions"""
zip_file = tempfile.NamedTemporaryFile(
prefix='tmp_securedrop_bulk_dl_',
Expand Down Expand Up @@ -299,7 +299,7 @@ def save_file_submission(self,
count: int,
journalist_filename: str,
filename: str,
stream: BufferedIOBase) -> str:
stream: 'BufferedIOBase') -> str:
sanitized_filename = secure_filename(filename)

# We store file submissions in a .gz file for two reasons:
Expand Down Expand Up @@ -363,7 +363,7 @@ def save_message_submission(self,
return filename


def async_add_checksum_for_file(db_obj: Union[Submission, Reply]) -> str:
def async_add_checksum_for_file(db_obj: 'Union[Submission, Reply]') -> str:
return create_queue().enqueue(
queued_add_checksum_for_file,
type(db_obj),
Expand All @@ -373,7 +373,7 @@ def async_add_checksum_for_file(db_obj: Union[Submission, Reply]) -> str:
)


def queued_add_checksum_for_file(db_model: Union[Type[Submission], Type[Reply]],
def queued_add_checksum_for_file(db_model: 'Union[Type[Submission], Type[Reply]]',
model_id: int,
file_path: str,
db_uri: str) -> str:
Expand All @@ -385,8 +385,8 @@ def queued_add_checksum_for_file(db_model: Union[Type[Submission], Type[Reply]],
return "success"


def add_checksum_for_file(session: Session,
db_obj: Union[Submission, Reply],
def add_checksum_for_file(session: 'Session',
db_obj: 'Union[Submission, Reply]',
file_path: str) -> None:
hasher = sha256()
with open(file_path, 'rb') as f:
Expand Down

0 comments on commit d3707e6

Please sign in to comment.