Skip to content

Commit

Permalink
Merge pull request #949 from NewRedo/reuse-python-handler-processes
Browse files Browse the repository at this point in the history
Reuse Python handler processes for performance.
  • Loading branch information
dherault committed May 5, 2020
2 parents b4b1418 + 2551156 commit 79c05b5
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 77 deletions.
5 changes: 2 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -152,6 +152,7 @@
"chalk": "^3.0.0",
"cuid": "^2.1.8",
"execa": "^4.0.0",
"extend": "^3.0.2",
"fs-extra": "^8.1.0",
"js-string-escape": "^1.0.1",
"jsonpath-plus": "^3.0.0",
Expand Down
3 changes: 3 additions & 0 deletions src/config/commandOptions.js
Expand Up @@ -72,4 +72,7 @@ export default {
useDocker: {
usage: 'Uses docker for node/python/ruby',
},
functionCleanupIdleTimeSeconds: {
usage: 'Number of seconds until an idle function is eligible for cleanup',
},
}
1 change: 1 addition & 0 deletions src/config/defaultOptions.js
Expand Up @@ -22,4 +22,5 @@ export default {
useWorkerThreads: false,
websocketPort: 3001,
useDocker: false,
functionCleanupIdleTimeSeconds: 60,
}
9 changes: 6 additions & 3 deletions src/lambda/LambdaFunctionPool.js
Expand Up @@ -24,8 +24,11 @@ export default class LambdaFunctionPool {
const { idleTimeInMinutes, status } = lambdaFunction
// console.log(idleTimeInMinutes, status)

// 45 // TODO config, or maybe option?
if (status === 'IDLE' && idleTimeInMinutes >= 1) {
if (
status === 'IDLE' &&
idleTimeInMinutes >=
this.#options.functionCleanupIdleTimeSeconds / 60
) {
// console.log(`removed Lambda Function ${lambdaFunction.functionName}`)
lambdaFunction.cleanup()
lambdaFunctions.delete(lambdaFunction)
Expand All @@ -35,7 +38,7 @@ export default class LambdaFunctionPool {

// schedule new timer
this._startCleanTimer()
}, 10000) // TODO: config, or maybe option?
}, (this.#options.functionCleanupIdleTimeSeconds * 1000) / 2)
}

_cleanupPool() {
Expand Down
129 changes: 67 additions & 62 deletions src/lambda/handler-runner/python-runner/PythonRunner.js
@@ -1,6 +1,8 @@
import { EOL, platform } from 'os'
import { delimiter, join, relative, resolve } from 'path'
import execa from 'execa'
import { spawn } from 'child_process'
import extend from 'extend'
import readline from 'readline'

const { parse, stringify } = JSON
const { cwd } = process
Expand All @@ -18,12 +20,42 @@ export default class PythonRunner {
this.#env = env
this.#handlerName = handlerName
this.#handlerPath = handlerPath
this.#runtime = runtime
this.#runtime = platform() === 'win32' ? 'python.exe' : runtime

if (process.env.VIRTUAL_ENV) {
const runtimeDir = platform() === 'win32' ? 'Scripts' : 'bin'
process.env.PATH = [
join(process.env.VIRTUAL_ENV, runtimeDir),
delimiter,
process.env.PATH,
].join('')
}

const [pythonExecutable] = this.#runtime.split('.')

this.handlerProcess = spawn(
pythonExecutable,
[
'-u',
resolve(__dirname, 'invoke.py'),
relative(cwd(), this.#handlerPath),
this.#handlerName,
],
{
env: extend(process.env, this.#env),
shell: true,
},
)

this.handlerProcess.stdout.readline = readline.createInterface({
input: this.handlerProcess.stdout,
})
}

// no-op
// () => void
cleanup() {}
cleanup() {
this.handlerProcess.kill()
}

_parsePayload(value) {
let payload
Expand Down Expand Up @@ -57,68 +89,41 @@ export default class PythonRunner {

// invokeLocalPython, loosely based on:
// https://github.com/serverless/serverless/blob/v1.50.0/lib/plugins/aws/invokeLocal/index.js#L410
// invoke.py, copy/pasted entirely as is:
// invoke.py, based on:
// https://github.com/serverless/serverless/blob/v1.50.0/lib/plugins/aws/invokeLocal/invoke.py
async run(event, context) {
const runtime = platform() === 'win32' ? 'python.exe' : this.#runtime

const input = stringify({
context,
event,
})

if (process.env.VIRTUAL_ENV) {
const runtimeDir = platform() === 'win32' ? 'Scripts' : 'bin'
process.env.PATH = [
join(process.env.VIRTUAL_ENV, runtimeDir),
delimiter,
process.env.PATH,
].join('')
}

const [pythonExecutable] = runtime.split('.')

const python = execa(
pythonExecutable,
[
'-u',
resolve(__dirname, 'invoke.py'),
relative(cwd(), this.#handlerPath),
this.#handlerName,
],
{
env: this.#env,
input,
// shell: true,
},
)

let result

try {
result = await python
} catch (err) {
// TODO
console.log(err)

throw err
}

const { stderr, stdout } = result
return new Promise((accept, reject) => {
const input = stringify({
context,
event,
})

const onErr = (data) => {
// TODO
console.log(data.toString())
}

if (stderr) {
// TODO
console.log(stderr)
}
const onLine = (line) => {
try {
const parsed = this._parsePayload(line.toString())
if (parsed) {
this.handlerProcess.stdout.readline.removeListener('line', onLine)
this.handlerProcess.stderr.removeListener('data', onErr)
return accept(parsed)
}
return null
} catch (err) {
return reject(err)
}
}

try {
return this._parsePayload(stdout)
} catch (err) {
// TODO
console.log('No JSON')
this.handlerProcess.stdout.readline.on('line', onLine)
this.handlerProcess.stderr.on('data', onErr)

// TODO return or re-throw?
return err
}
process.nextTick(() => {
this.handlerProcess.stdin.write(input)
this.handlerProcess.stdin.write('\n')
})
})
}
}
26 changes: 17 additions & 9 deletions src/lambda/handler-runner/python-runner/invoke.py
Expand Up @@ -75,23 +75,31 @@ def log(self):
module = import_module(args.handler_path.replace('/', '.'))
handler = getattr(module, args.handler_name)

input = json.load(sys.stdin)
# Keep a reference to the original stdin so that we can continue to receive
# input from the parent process.
stdin = sys.stdin

if sys.platform != 'win32':
try:
if sys.platform != 'darwin':
subprocess.check_call('tty', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except (OSError, subprocess.CalledProcessError):
pass
else:
# Replace stdin with a TTY to enable pdb usage.
sys.stdin = open('/dev/tty')

context = FakeLambdaContext(**input.get('context', {}))
result = handler(input['event'], context)
while True:
input = json.loads(stdin.readline())

context = FakeLambdaContext(**input.get('context', {}))
result = handler(input['event'], context)

data = {
# just an identifier to distinguish between
# interesting data (result) and stdout/print
'__offline_payload__': result
}
data = {
# just an identifier to distinguish between
# interesting data (result) and stdout/print
'__offline_payload__': result
}

sys.stdout.write(json.dumps(data))
sys.stdout.write(json.dumps(data))
sys.stdout.write('\n')

0 comments on commit 79c05b5

Please sign in to comment.