Skip to content

Commit

Permalink
pylint: Fix unspecified-encoding
Browse files Browse the repository at this point in the history
Pylint 2.10 introduced new checker unspecified-encoding:
> Added unspecified-encoding: Emitted when open() is called without
  specifying an encoding

See pylint-dev/pylint#3826
See https://www.python.org/dev/peps/pep-0597/

Fixes: fleet-commander#279
Signed-off-by: Stanislav Levin <slev@altlinux.org>
  • Loading branch information
stanislavlevin committed Feb 17, 2022
1 parent 658f6af commit 0890be3
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 46 deletions.
6 changes: 4 additions & 2 deletions admin/fleetcommander/fcad.py
Expand Up @@ -187,10 +187,12 @@ def _prepare_gpo_data(self, profile):
os.mkdir(os.path.join(gpodir, "User"))
# GPT file
gpt_contents = "[General]\r\nVersion=0\r\n"
with open(os.path.join(gpodir, "GPT.INI"), "w") as fd:
with open(os.path.join(gpodir, "GPT.INI"), "w", encoding="utf-8") as fd:
fd.write(gpt_contents)

with open(os.path.join(gpodir, "fleet-commander.json"), "w") as fd:
with open(
os.path.join(gpodir, "fleet-commander.json"), "w", encoding="utf-8"
) as fd:
fd.write(
json.dumps(
{
Expand Down
2 changes: 1 addition & 1 deletion admin/fleetcommander/fcdbus.py
Expand Up @@ -242,7 +242,7 @@ def get_public_key(self):
hostname=None,
mode="system",
)
with open(ctrlr.public_key_file, "r") as fd:
with open(ctrlr.public_key_file, "r", encoding="utf-8") as fd:
public_key = fd.read().strip()

return public_key
Expand Down
6 changes: 3 additions & 3 deletions admin/fleetcommander/sshcontroller.py
Expand Up @@ -99,7 +99,7 @@ def add_keys_to_known_hosts(self, known_hosts_file, key_data):
directory = os.path.dirname(known_hosts_file)
if not os.path.exists(directory):
os.makedirs(directory)
with open(known_hosts_file, "a") as fd:
with open(known_hosts_file, "a", encoding="utf-8") as fd:
fd.write(key_data)

def add_to_known_hosts(self, known_hosts_file, hostname, port=DEFAULT_SSH_PORT):
Expand All @@ -112,7 +112,7 @@ def check_known_host(self, known_hosts_file, hostname):
"""
if os.path.exists(known_hosts_file):
# Check if host exists in file
with open(known_hosts_file) as fd:
with open(known_hosts_file, encoding="utf-8") as fd:
lines = fd.readlines()

for line in lines:
Expand All @@ -126,7 +126,7 @@ def get_fingerprint_from_key_data(self, key_data):
Get host SSH fingerprint
"""
tmpfile = tempfile.mktemp(prefix="fc-ssh-keydata")
with open(tmpfile, "w") as fd:
with open(tmpfile, "w", encoding="utf-8") as fd:
fd.write(key_data)

prog = subprocess.Popen(
Expand Down
4 changes: 2 additions & 2 deletions logger/fleet_commander_logger.py
Expand Up @@ -819,7 +819,7 @@ def load_policy_map(self):
dirpath, "fleet-commander-logger/fc-chromium-policies.json"
)
try:
with open(filepath) as fd:
with open(filepath, encoding="utf-8") as fd:
contents = fd.read()
policy_map = json.loads(contents)

Expand Down Expand Up @@ -1177,7 +1177,7 @@ def _preferences_file_updated(self, monitor, fileobj, otherfile, eventType):
if fileobj.query_exists(None):
logger.debug("Preference file %s exists. Loading it", path)
# data = fileobj.load_contents(None)[1]
with open(path) as fd:
with open(path, encoding="utf-8") as fd:
data = fd.read()

logger.debug("Preference file %s Loaded. Loading preferences.", path)
Expand Down
2 changes: 1 addition & 1 deletion tests/_fcdbus_tests.py
Expand Up @@ -149,7 +149,7 @@ def get_data_from_file(self, path):
"""
Reads JSON file contents
"""
with open(path) as fd:
with open(path, encoding="utf-8") as fd:
data = fd.read()

return json.loads(data)
Expand Down
2 changes: 1 addition & 1 deletion tests/directorymock.py
Expand Up @@ -47,7 +47,7 @@ def save_to_datadir(self, filename="directorymock-data.json"):
if self.datadir is not None:
path = os.path.join(self.datadir, filename)
logging.debug("Directory mock exporting data to %s", path)
with open(path, "w") as fd:
with open(path, "w", encoding="utf-8") as fd:
fd.write(self.get_json())

logging.debug("Directory mock data saved to %s", path)
Expand Down
2 changes: 1 addition & 1 deletion tests/freeipamock.py
Expand Up @@ -68,7 +68,7 @@ def save_to_datadir(self, filename="freeipamock-data.json"):
if self.datadir is not None:
path = os.path.join(self.datadir, filename)
logger.debug("IPAMock exporting data to %s", path)
with open(path, "w") as fd:
with open(path, "w", encoding="utf-8") as fd:
fd.write(self.get_json())

logger.debug("FreeIPA mock data saved to %s", path)
Expand Down
2 changes: 1 addition & 1 deletion tests/smbmock.py
Expand Up @@ -78,7 +78,7 @@ def set_acl(self, duri, fssd, sio):
"fssd": fssd.as_sddl(),
}
)
with open(aclpath, "w") as fd:
with open(aclpath, "w", encoding="utf-8") as fd:
fd.write(acldata)

def deltree(self, duri):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_fcdbus_service.py
Expand Up @@ -78,7 +78,7 @@ def __init__(self, data_path, username, hostname, mode):

self.public_key_file = os.path.join(self.data_dir, "id_rsa.pub")

with open(self.public_key_file, "w") as fd:
with open(self.public_key_file, "w", encoding="utf-8") as fd:
fd.write("PUBLIC_KEY")

self.session_params = namedtuple(
Expand Down
26 changes: 13 additions & 13 deletions tests/test_libvirt_controller.py
Expand Up @@ -178,7 +178,7 @@ def test_video_driver_virtio(self):

# Check SSH command
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -208,7 +208,7 @@ def test_video_driver_qxl(self):

# Check SSH command
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -243,7 +243,7 @@ def test_session_stop(self):

# Test SSH tunnel close
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand All @@ -270,7 +270,7 @@ def test_remote_user_runtimedir(self):
self.assertEqual(remote_runtimedir, "/run/user/1001")

self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -323,7 +323,7 @@ def test_libvirtd_socket(self):

# Check SSH command
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -400,7 +400,7 @@ def test_session_start(self):

# Test SSH tunnel opening
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -469,7 +469,7 @@ def test_session_start_debug(self):

# Test SSH tunnel opening
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

remote_socket_logger = os.path.join(
Expand Down Expand Up @@ -553,7 +553,7 @@ def test_session_start(self):

# Test SSH tunnel opening
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -630,7 +630,7 @@ def test_session_start_debug(self):

# Test SSH tunnel opening
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

remote_socket_logger = os.path.join(
Expand Down Expand Up @@ -668,7 +668,7 @@ def test_ca_cert(self):
self.assertEqual(ca_cert, "FAKE_CA_CERT")

self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -697,7 +697,7 @@ def test_spice_cert_subject(self):
self.assertEqual(spice_cert_subject, "CN=localhost")

self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -769,7 +769,7 @@ def test_session_start(self):

# Test SSH tunnel opening
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -844,7 +844,7 @@ def test_session_start_debug(self):

# Test SSH tunnel opening
self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
command = fd.read().strip()

remote_socket_logger = os.path.join(
Expand Down
26 changes: 17 additions & 9 deletions tests/test_logger_chromium.py
Expand Up @@ -367,16 +367,24 @@ def setup_test_directory(self, sessions=[], profinit=True, prefsinit=True):
# Create local state file
local_state_data = DEFAULT_LOCAL_STATE_DATA
local_state_data["profile"]["last_active_profiles"] = sessions
with open(os.path.join(TMPDIR, "Local State"), "w") as fd:
with open(os.path.join(TMPDIR, "Local State"), "w", encoding="utf-8") as fd:
fd.write(json.dumps(local_state_data, sort_keys=True))
with open(os.path.join(TMPDIR, "Profile 1/Preferences"), "w") as fd:
with open(
os.path.join(TMPDIR, "Profile 1/Preferences"), "w", encoding="utf-8"
) as fd:
fd.write(json.dumps(profile1_prefs, sort_keys=True))
with open(os.path.join(TMPDIR, "Profile 2/Preferences"), "w") as fd:
with open(
os.path.join(TMPDIR, "Profile 2/Preferences"), "w", encoding="utf-8"
) as fd:
fd.write(json.dumps(profile2_prefs, sort_keys=True))
# Bookmarks data
with open(os.path.join(TMPDIR, "Profile 1/Bookmarks"), "w") as fd:
with open(
os.path.join(TMPDIR, "Profile 1/Bookmarks"), "w", encoding="utf-8"
) as fd:
fd.write(json.dumps(DEFAULT_BOOKMARKS_DATA, sort_keys=True))
with open(os.path.join(TMPDIR, "Profile 2/Bookmarks"), "w") as fd:
with open(
os.path.join(TMPDIR, "Profile 2/Bookmarks"), "w", encoding="utf-8"
) as fd:
fd.write(json.dumps(DEFAULT_BOOKMARKS_DATA, sort_keys=True))

return TMPDIR
Expand Down Expand Up @@ -413,7 +421,7 @@ def simulate_filenotification(clogger):
# Add a new session to the Local State file
local_state_data = DEFAULT_LOCAL_STATE_DATA
local_state_data["profile"]["last_active_profiles"] = ["Profile 1"]
with open(os.path.join(TMPDIR, "Local State"), "w") as fd:
with open(os.path.join(TMPDIR, "Local State"), "w", encoding="utf-8") as fd:
fd.write(json.dumps(local_state_data, sort_keys=True))

# Simulate a local state file modification
Expand All @@ -426,7 +434,7 @@ def simulate_filenotification(clogger):

# Add a new session to the Local State file
local_state_data["profile"]["last_active_profiles"] = ["Profile 1", "Profile 2"]
with open(os.path.join(TMPDIR, "Local State"), "w") as fd:
with open(os.path.join(TMPDIR, "Local State"), "w", encoding="utf-8") as fd:
fd.write(json.dumps(local_state_data, sort_keys=True))

# Simulate a local state file modification
Expand Down Expand Up @@ -516,7 +524,7 @@ def test_07_preferences_monitoring(self):
# Helper method to write prefs and simulate file modified notification
def write_prefs(clogger, prefs, path):
# Write a new supported setting to the preferences file 1
with open(path, "w") as fd:
with open(path, "w", encoding="utf-8") as fd:
fd.write(json.dumps(prefs, sort_keys=True))

# Simulate a change in preferences file 1
Expand Down Expand Up @@ -581,7 +589,7 @@ def test_08_bookmarks_monitoring(self):
# Helper method to write bookmarks and simulate a file modified notification
def write_bmarks(clogger, bmarks, path):
# Write a new supported setting to the preferences file 1
with open(path, "w") as fd:
with open(path, "w", encoding="utf-8") as fd:
fd.write(json.dumps(bmarks, sort_keys=True))

# Simulate a change in preferences file 1
Expand Down
2 changes: 1 addition & 1 deletion tests/test_logger_connmgr.py
Expand Up @@ -48,7 +48,7 @@


def read_file(filename):
with open(filename) as fd:
with open(filename, encoding="utf-8") as fd:
return fd.read()


Expand Down
2 changes: 1 addition & 1 deletion tests/test_logger_firefox.py
Expand Up @@ -138,7 +138,7 @@ def setUp(self):
pass

def file_set_contents(self, filename, contents):
with open(filename, "w") as fd:
with open(filename, "w", encoding="utf-8") as fd:
fd.write(contents)

def setup_test_directory(self, profinit=True, prefsinit=True):
Expand Down
16 changes: 8 additions & 8 deletions tests/test_sshcontroller.py
Expand Up @@ -79,7 +79,7 @@ def test_00_ssh_keypair_generation(self):
ssh.generate_ssh_keypair(self.private_key_file)
# Check parameters
self.assertTrue(os.path.exists(self.ssh_keygen_parms_file))
with open(self.ssh_keygen_parms_file) as fd:
with open(self.ssh_keygen_parms_file, encoding="utf-8") as fd:
parms = fd.read()

self.assertEqual(parms, self.SSH_KEYGEN_PARMS % self.private_key_file)
Expand All @@ -94,7 +94,7 @@ def test_01_scan_host_keys(self):
self.assertEqual(keys, self.SSH_KEYSCAN_OUTPUT % hostname)
# Check parameters
self.assertTrue(os.path.exists(self.ssh_keyscan_parms_file))
with open(self.ssh_keyscan_parms_file) as fd:
with open(self.ssh_keyscan_parms_file, encoding="utf-8") as fd:
parms = fd.read()

self.assertEqual(parms, self.SSH_KEYSCAN_PARMS % (port, hostname))
Expand All @@ -108,14 +108,14 @@ def test_02_add_known_host(self):
ssh.add_to_known_hosts(self.known_hosts_file, hostname, port)
# Check known hosts file exists
self.assertTrue(os.path.exists(self.known_hosts_file))
with open(self.known_hosts_file) as fd:
with open(self.known_hosts_file, encoding="utf-8") as fd:
keys = fd.read()

# Check keys data
self.assertEqual(keys, self.SSH_KEYSCAN_OUTPUT % hostname)
# Add another host
ssh.add_to_known_hosts(self.known_hosts_file, hostname2, port2)
with open(self.known_hosts_file) as fd:
with open(self.known_hosts_file, encoding="utf-8") as fd:
keys = fd.read()

# Check keys data
Expand All @@ -134,7 +134,7 @@ def test_03_check_known_host(self):
result = ssh.check_known_host(self.known_hosts_file, hostname)
self.assertFalse(result)
# Check empty known hosts file
open(self.known_hosts_file, "w").close()
open(self.known_hosts_file, "wb").close()
self.assertTrue(os.path.exists(self.known_hosts_file))
result = ssh.check_known_host(self.known_hosts_file, hostname)
self.assertFalse(result)
Expand Down Expand Up @@ -178,7 +178,7 @@ def test_06_execute_remote_command(self):
)

self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
parms = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -225,7 +225,7 @@ def test_07_open_tunnel(self):
)

self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file, "r") as fd:
with open(self.ssh_parms_file, "r", encoding="utf-8") as fd:
parms = fd.read().strip()

self.assertEqual(
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_08_close_tunnel(self):
)

self.assertTrue(os.path.exists(self.ssh_parms_file))
with open(self.ssh_parms_file) as fd:
with open(self.ssh_parms_file, encoding="utf-8") as fd:
parms = fd.read().strip()

self.assertEqual(
Expand Down
2 changes: 1 addition & 1 deletion tools/chrome-prefs-extract.py
Expand Up @@ -27,7 +27,7 @@
import sys
import json

with open(sys.argv[1], "r") as f:
with open(sys.argv[1], "r", encoding="utf-8") as f:
data = json.loads(f.read())

outdata = {}
Expand Down

0 comments on commit 0890be3

Please sign in to comment.