-
Notifications
You must be signed in to change notification settings - Fork 0
/
iohelper.py
64 lines (49 loc) · 1.45 KB
/
iohelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from StringIO import StringIO
import gzip
import zlib
linebr = '\r\n'
# To read all line until hitting an empty one
def readheader(reader):
global linebr
header = ''
line = reader.readline()
while line != '':
header += (line + linebr)
line = reader.readline()
return header
# Read chunked payload (Note all ICAP payload use chunked encoding)
def readpayload(reader):
payload = ''
numofbytes = int(reader.readline().split()[0], 16)
while numofbytes > 0:
payload += reader.readbytes(numofbytes)
reader.readline() # Empty line expected after each chunk
numofbytes = int(reader.readline(), 16)
return payload
# Wrap raw payload into a chunk followed by an 0-length chunk indicating the end of payload
def wrappayload(payload):
global linebr
response = (hex(len(payload))[2:] + linebr)
response += (payload + linebr)
if len(payload) > 0:
response += ('0' + linebr + linebr)
return response
# HTTP GZip compress
def GzipCompress(string):
strio = StringIO()
compressor = gzip.GzipFile(fileobj=strio, mode='wb', compresslevel=9)
compressor.write(string)
compressor.close()
return strio.getvalue()
# HTTP GZip Decompress
def GzipDecompress(string):
return zlib.decompress(string, 16 + zlib.MAX_WBITS)
# HTTP inflation of Deflate
def inflate(string):
try:
return zlib.decompress(string, -zlib.MAX_WBITS)
except zlib.error:
return zlib.decompress(string)
# HTTP deflation of Deflate
def deflate(string):
return zlib.compress(string)[2:-4]