Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: jupyter/jupyter_client
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 6.1.13
Choose a base ref
...
head repository: jupyter/jupyter_client
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 6.2.0
Choose a head ref
  • 6 commits
  • 11 files changed
  • 1 contributor

Commits on Apr 7, 2021

  1. Copy the full SHA
    e779382 View commit details
  2. Pin python>=3.6.1

    davidbrochart committed Apr 7, 2021
    Copy the full SHA
    018d8da View commit details
  3. Merge pull request #636 from davidbrochart/drop_py35

    Pin python>=3.6.1
    davidbrochart authored Apr 7, 2021
    Copy the full SHA
    aef30fe View commit details

Commits on Apr 9, 2021

  1. Fix qtconsole issues

    davidbrochart committed Apr 9, 2021
    Copy the full SHA
    9c0b4c0 View commit details

Commits on Apr 12, 2021

  1. Merge pull request #638 from davidbrochart/fix_threaded_client

    Fix qtconsole issues
    davidbrochart authored Apr 12, 2021
    Copy the full SHA
    c5aceba View commit details
  2. Bumped version to 6.2.0 for release

    davidbrochart committed Apr 12, 2021
    Copy the full SHA
    c16c0c9 View commit details
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ jobs:
pip install --upgrade --upgrade-strategy eager --pre -e .[test] pytest-cov codecov 'coverage<5'
pip freeze
- name: Check types
run: mypy jupyter_client/manager.py jupyter_client/multikernelmanager.py jupyter_client/client.py jupyter_client/blocking/client.py jupyter_client/asynchronous/client.py jupyter_client/channels.py jupyter_client/session.py jupyter_client/adapter.py jupyter_client/connect.py jupyter_client/consoleapp.py jupyter_client/jsonutil.py jupyter_client/kernelapp.py jupyter_client/launcher.py
run: mypy jupyter_client/manager.py jupyter_client/multikernelmanager.py jupyter_client/client.py jupyter_client/blocking/client.py jupyter_client/asynchronous/client.py jupyter_client/channels.py jupyter_client/session.py jupyter_client/adapter.py jupyter_client/connect.py jupyter_client/consoleapp.py jupyter_client/jsonutil.py jupyter_client/kernelapp.py jupyter_client/launcher.py jupyter_client/threaded.py
- name: Run the tests
run: py.test --cov jupyter_client -v jupyter_client
- name: Code coverage
10 changes: 8 additions & 2 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -4,14 +4,20 @@
Changes in Jupyter Client
=========================

6.1.13
======
6.2.0
=====
- Fix threaded client and fix qtconsole issues (:ghpull:`638`)
- Drop Python 3.5 and pin Python >= 3.6.1 (:ghpull:`636`)
- Use pre-commit for code style (:ghpull:`631`)
- Fix kernel client shutdown test (:ghpull:`629`)
- Add MultiKernelManager subclass tests (:ghpull:`627`)
- Add KernelManager subclass tests (:ghpull:`626`)
- Add type annotations, refactor sync/async (:ghpull:`623`)

6.1.13
======
- Yanked (PyPI) and marked as broken (conda)

6.1.12
======
- Shutdown request sequence has been modified to be more graceful, it now is
2 changes: 1 addition & 1 deletion jupyter_client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version_info = (6, 1, 13)
version_info = (6, 2, 0)
__version__ = ".".join(map(str, version_info))

protocol_version_info = (5, 3)
14 changes: 7 additions & 7 deletions jupyter_client/asynchronous/client.py
Original file line number Diff line number Diff line change
@@ -49,15 +49,15 @@ class AsyncKernelClient(KernelClient):
_recv_reply = KernelClient._async_recv_reply

# replies come on the shell channel
execute = reqrep(wrapped, KernelClient._execute)
history = reqrep(wrapped, KernelClient._history)
complete = reqrep(wrapped, KernelClient._complete)
inspect = reqrep(wrapped, KernelClient._inspect)
kernel_info = reqrep(wrapped, KernelClient._kernel_info)
comm_info = reqrep(wrapped, KernelClient._comm_info)
execute = reqrep(wrapped, KernelClient.execute)
history = reqrep(wrapped, KernelClient.history)
complete = reqrep(wrapped, KernelClient.complete)
inspect = reqrep(wrapped, KernelClient.inspect)
kernel_info = reqrep(wrapped, KernelClient.kernel_info)
comm_info = reqrep(wrapped, KernelClient.comm_info)

is_alive = KernelClient._async_is_alive
execute_interactive = KernelClient._async_execute_interactive

# replies come on the control channel
shutdown = reqrep(wrapped, KernelClient._shutdown, channel="control")
shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")
14 changes: 7 additions & 7 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
@@ -53,15 +53,15 @@ class BlockingKernelClient(KernelClient):
_recv_reply = run_sync(KernelClient._async_recv_reply)

# replies come on the shell channel
execute = reqrep(wrapped, KernelClient._execute)
history = reqrep(wrapped, KernelClient._history)
complete = reqrep(wrapped, KernelClient._complete)
inspect = reqrep(wrapped, KernelClient._inspect)
kernel_info = reqrep(wrapped, KernelClient._kernel_info)
comm_info = reqrep(wrapped, KernelClient._comm_info)
execute = reqrep(wrapped, KernelClient.execute)
history = reqrep(wrapped, KernelClient.history)
complete = reqrep(wrapped, KernelClient.complete)
inspect = reqrep(wrapped, KernelClient.inspect)
kernel_info = reqrep(wrapped, KernelClient.kernel_info)
comm_info = reqrep(wrapped, KernelClient.comm_info)

is_alive = run_sync(KernelClient._async_is_alive)
execute_interactive = run_sync(KernelClient._async_execute_interactive)

# replies come on the control channel
shutdown = reqrep(wrapped, KernelClient._shutdown, channel="control")
shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control")
3 changes: 2 additions & 1 deletion jupyter_client/channels.py
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ class HBChannel(Thread):

def __init__(
self,
context: zmq.asyncio.Context,
context: zmq.asyncio.Context = None,
session: t.Optional[Session] = None,
address: t.Union[t.Tuple[str, int], str] = "",
):
@@ -100,6 +100,7 @@ def _create_socket(self) -> None:
# close previous socket, before opening a new one
self.poller.unregister(self.socket)
self.socket.close()
assert self.context is not None
self.socket = self.context.socket(zmq.REQ)
self.socket.linger = 1000
self.socket.connect(self.address)
25 changes: 12 additions & 13 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
@@ -160,7 +160,7 @@ async def _async_wait_for_ready(self, timeout: t.Optional[float] = None) -> None

# Wait for kernel info reply on shell channel
while True:
self._kernel_info()
self.kernel_info()
try:
msg = await self.shell_channel.get_msg(timeout=1)
except Empty:
@@ -386,10 +386,9 @@ async def _async_is_alive(self) -> bool:
# We don't have access to the KernelManager,
# so we use the heartbeat.
return self._hb_channel.is_beating()
else:
# no heartbeat and not local, we can't tell if it's running,
# so naively return True
return True
# no heartbeat and not local, we can't tell if it's running,
# so naively return True
return True

async def _async_execute_interactive(
self,
@@ -463,7 +462,7 @@ async def _async_execute_interactive(
allow_stdin = self.allow_stdin
if allow_stdin and not self.stdin_channel.is_alive():
raise RuntimeError("stdin channel must be running to allow input")
msg_id = self._execute(
msg_id = self.execute(
code,
silent=silent,
store_history=store_history,
@@ -541,7 +540,7 @@ async def _async_execute_interactive(
return await self._async_recv_reply(msg_id, timeout=timeout)

# Methods to send specific messages on channels
def _execute(
def execute(
self,
code: str,
silent: bool = False,
@@ -608,7 +607,7 @@ def _execute(
self.shell_channel.send(msg)
return msg["header"]["msg_id"]

def _complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str:
def complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str:
"""Tab complete text in the kernel's namespace.
Parameters
@@ -631,7 +630,7 @@ def _complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str:
self.shell_channel.send(msg)
return msg["header"]["msg_id"]

def _inspect(self, code: str, cursor_pos: t.Optional[int] = None, detail_level: int = 0) -> str:
def inspect(self, code: str, cursor_pos: t.Optional[int] = None, detail_level: int = 0) -> str:
"""Get metadata information about an object in the kernel's namespace.
It is up to the kernel to determine the appropriate object to inspect.
@@ -662,7 +661,7 @@ def _inspect(self, code: str, cursor_pos: t.Optional[int] = None, detail_level:
self.shell_channel.send(msg)
return msg["header"]["msg_id"]

def _history(
def history(
self,
raw: bool = True,
output: bool = False,
@@ -708,7 +707,7 @@ def _history(
self.shell_channel.send(msg)
return msg["header"]["msg_id"]

def _kernel_info(self) -> str:
def kernel_info(self) -> str:
"""Request kernel info
Returns
@@ -719,7 +718,7 @@ def _kernel_info(self) -> str:
self.shell_channel.send(msg)
return msg["header"]["msg_id"]

def _comm_info(self, target_name: t.Optional[str] = None) -> str:
def comm_info(self, target_name: t.Optional[str] = None) -> str:
"""Request comm info
Returns
@@ -760,7 +759,7 @@ def input(self, string: str) -> None:
msg = self.session.msg("input_reply", content)
self.stdin_channel.send(msg)

def _shutdown(self, restart: bool = False) -> str:
def shutdown(self, restart: bool = False) -> str:
"""Request an immediate kernel shutdown on the control channel.
Upon receipt of the (empty) reply, client code can safely assume that
6 changes: 3 additions & 3 deletions jupyter_client/ssh/tunnel.py
Original file line number Diff line number Diff line change
@@ -23,17 +23,17 @@

SSHException = paramiko.ssh_exception.SSHException
except ImportError:
paramiko = None
paramiko = None # type: ignore

class SSHException(Exception):
class SSHException(Exception): # type: ignore
pass


else:
from .forward import forward_tunnel

try:
import pexpect
import pexpect # type: ignore
except ImportError:
pexpect = None

96 changes: 67 additions & 29 deletions jupyter_client/threaded.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
""" Defines a KernelClient that provides thread-safe sockets with async callbacks on message
replies.
"""
import asyncio
import atexit
import errno
import sys
import time
from threading import Event
from threading import Thread

from traitlets import Instance
from typing import Any
from typing import Awaitable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import zmq
from traitlets import Instance # type: ignore
from traitlets import Type
from zmq import ZMQError
from zmq.eventloop import ioloop
from zmq.eventloop import zmqstream

from .session import Session
from jupyter_client import KernelClient
from jupyter_client.channels import HBChannel

@@ -22,6 +30,10 @@
# during garbage collection of threads at exit


async def get_msg(msg: Awaitable) -> Union[List[bytes], List[zmq.Message]]:
return await msg


class ThreadedZMQSocketChannel(object):
"""A ZMQ socket invoking a callback in the ioloop"""

@@ -31,7 +43,12 @@ class ThreadedZMQSocketChannel(object):
stream = None
_inspect = None

def __init__(self, socket, session, loop):
def __init__(
self,
socket: Optional[zmq.Socket],
session: Optional[Session],
loop: Optional[zmq.eventloop.ioloop.ZMQIOLoop],
) -> None:
"""Create a channel.
Parameters
@@ -55,29 +72,30 @@ def setup_stream():
self.stream.on_recv(self._handle_recv)
evt.set()

assert self.ioloop is not None
self.ioloop.add_callback(setup_stream)
evt.wait()

_is_alive = False

def is_alive(self):
def is_alive(self) -> bool:
return self._is_alive

def start(self):
def start(self) -> None:
self._is_alive = True

def stop(self):
def stop(self) -> None:
self._is_alive = False

def close(self):
def close(self) -> None:
if self.socket is not None:
try:
self.socket.close(linger=0)
except Exception:
pass
self.socket = None

def send(self, msg):
def send(self, msg: Dict[str, Any]) -> None:
"""Queue a message to be sent from the IOLoop's thread.
Parameters
@@ -91,21 +109,25 @@ def send(self, msg):
def thread_send():
self.session.send(self.stream, msg)

assert self.ioloop is not None
self.ioloop.add_callback(thread_send)

def _handle_recv(self, msg):
def _handle_recv(self, future_msg: Awaitable) -> None:
"""Callback for stream.on_recv.
Unpacks message, and calls handlers with it.
"""
ident, smsg = self.session.feed_identities(msg)
assert self.ioloop is not None
msg_list = self.ioloop._asyncio_event_loop.run_until_complete(get_msg(future_msg))
assert self.session is not None
ident, smsg = self.session.feed_identities(msg_list)
msg = self.session.deserialize(smsg)
# let client inspect messages
if self._inspect:
self._inspect(msg)
self.call_handlers(msg)

def call_handlers(self, msg):
def call_handlers(self, msg: Dict[str, Any]) -> None:
"""This method is called in the ioloop thread when a message arrives.
Subclasses should override this method to handle incoming messages.
@@ -115,13 +137,13 @@ def call_handlers(self, msg):
"""
pass

def process_events(self):
def process_events(self) -> None:
"""Subclasses should override this with a method
processing any pending GUI events.
"""
pass

def flush(self, timeout=1.0):
def flush(self, timeout: float = 1.0) -> None:
"""Immediately processes all pending messages on this channel.
This is only used for the IOPub channel.
@@ -141,14 +163,16 @@ def flush(self, timeout=1.0):
# We do the IOLoop callback process twice to ensure that the IOLoop
# gets to perform at least one full poll.
stop_time = time.time() + timeout
assert self.ioloop is not None
for i in range(2):
self._flushed = False
self.ioloop.add_callback(self._flush)
while not self._flushed and time.time() < stop_time:
time.sleep(0.01)

def _flush(self):
def _flush(self) -> None:
"""Callback for :method:`self.flush`."""
assert self.stream is not None
self.stream.flush()
self._flushed = True

@@ -165,13 +189,13 @@ def __init__(self):

@staticmethod
@atexit.register
def _notice_exit():
def _notice_exit() -> None:
# Class definitions can be torn down during interpreter shutdown.
# We only need to set _exiting flag if this hasn't happened.
if IOLoopThread is not None:
IOLoopThread._exiting = True

def start(self):
def start(self) -> None:
"""Start the IOLoop thread
Don't return until self.ioloop is defined,
@@ -181,15 +205,12 @@ def start(self):
Thread.start(self)
self._start_event.wait()

def run(self):
def run(self) -> None:
"""Run my loop, ignoring EINTR events in the poller"""
if "asyncio" in sys.modules:
# tornado may be using asyncio,
# ensure an eventloop exists for this thread
import asyncio

asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.ioloop = ioloop.IOLoop()
self.ioloop._asyncio_event_loop = loop
# signal that self.ioloop is defined
self._start_event.set()
while True:
@@ -208,7 +229,7 @@ def run(self):
else:
break

def stop(self):
def stop(self) -> None:
"""Stop the channel's event loop and join its thread.
This calls :meth:`~threading.Thread.join` and returns when the thread
@@ -221,7 +242,7 @@ def stop(self):
self.close()
self.ioloop = None

def close(self):
def close(self) -> None:
if self.ioloop is not None:
try:
self.ioloop.close(all_fds=True)
@@ -238,7 +259,14 @@ def ioloop(self):

ioloop_thread = Instance(IOLoopThread, allow_none=True)

def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True):
def start_channels(
self,
shell: bool = True,
iopub: bool = True,
stdin: bool = True,
hb: bool = True,
control: bool = True,
) -> None:
self.ioloop_thread = IOLoopThread()
self.ioloop_thread.start()

@@ -247,13 +275,13 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=Tr

super().start_channels(shell, iopub, stdin, hb, control)

def _check_kernel_info_reply(self, msg):
def _check_kernel_info_reply(self, msg: Dict[str, Any]) -> None:
"""This is run in the ioloop thread when the kernel info reply is received"""
if msg["msg_type"] == "kernel_info_reply":
self._handle_kernel_info_reply(msg)
self.shell_channel._inspect = None

def stop_channels(self):
def stop_channels(self) -> None:
super().stop_channels()
if self.ioloop_thread.is_alive():
self.ioloop_thread.stop()
@@ -263,3 +291,13 @@ def stop_channels(self):
stdin_channel_class = Type(ThreadedZMQSocketChannel)
hb_channel_class = Type(HBChannel)
control_channel_class = Type(ThreadedZMQSocketChannel)

def is_alive(self) -> bool:
"""Is the kernel process still running?"""
if self._hb_channel is not None:
# We don't have access to the KernelManager,
# so we use the heartbeat.
return self._hb_channel.is_beating()
# no heartbeat and not local, we can't tell if it's running,
# so naively return True
return True
4 changes: 2 additions & 2 deletions jupyter_client/utils.py
Original file line number Diff line number Diff line change
@@ -8,10 +8,10 @@
import os
import sys

import nest_asyncio
import nest_asyncio # type: ignore

if os.name == "nt" and sys.version_info >= (3, 7):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore


def run_sync(coro):
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -63,10 +63,10 @@ def run(self):
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
],
install_requires=[
'traitlets',
@@ -76,7 +76,7 @@ def run(self):
'tornado>=4.1',
'nest-asyncio>=1.5',
],
python_requires='>=3.5',
python_requires='>=3.6.1',
extras_require={
'test': [
'async_generator',