# Licensed under a 3-clause BSD style license - see LICENSE.rst
# LOCAL
from astropy.utils.xml.iterparser import _fast_iterparse
# SYSTEM
import io
import zlib
# The C-based XML parser for VOTables previously used fixed-sized
# buffers (allocated at __init__() time). This test will
# only pass with the patch that allows a dynamic realloc() of
# the queue. This addresses the bugs:
#
# - "RuntimeError: XML queue overflow"
# https://github.com/astropy/astropy/issues/5824
# (Kudos to Stefan Becker---ARI/ZAH Heidelberg)
#
# - "iterparse.c: add queue_realloc() + move 'buffersize / 2' logic there"
# https://github.com/astropy/astropy/issues/5869
#
# This test code can emulate a combination of network buffering and
# gzip decompression---with different request sizes, it can be used to
# demonstrate both under-reading and over-reading.
#
# Using the 512-tag VOTABLE XML sample input, and various combinations
# of minimum/maximum fetch sizes, the following situations can be
# generated:
#
# maximum_fetch = 1 (ValueError, no element found) still within gzip headers
# maximum_fetch = 80 (ValueError, unclosed token) short read
# maximum_fetch =217 passes, because decompressed_length > requested
# && <512 tags in a single parse
# maximum_fetch =218 (RuntimeError, XML queue overflow)
#
# The test provided here covers the over-reading identified in #5824
# (equivalent to the 217).
# Firstly, assemble a minimal VOTABLE header, table contents and footer.
# This is done in textual form, as the aim is to only test the parser, not
# the outputter!
HEADER = """
"""
ROW = """0 |
"""
FOOTER = """
"""
# minimum passable buffer size => 1024
# 1024 / 2 => 512 tags for overflow
# 512 - 7 tags in header, - 5 tags in footer = 500 tags required for overflow
# 500 / 4 tags (
|
) per row == 125 rows required for overflow
VOTABLE_XML = HEADER + 125*ROW + FOOTER
# UngzipFileWrapper() wraps an existing file-like Object,
# decompressing the content and returning the plaintext.
# This therefore emulates the behavior of the Python 'requests'
# library when transparently decompressing Gzip HTTP responses.
#
# The critical behavior is that---because of the
# decompression---read() can return considerably more
# bytes than were requested! (But, read() can also return less).
#
# inspiration:
# http://stackoverflow.com/questions/4013843/how-to-wrap-file-object-read-and-write-operation-which-are-readonly
class UngzipFileWrapper:
def __init__(self, fd, **kwargs):
self._file = fd
self._z = zlib.decompressobj(16 + zlib.MAX_WBITS)
def read(self, requested_length):
# emulate network buffering dynamics by clamping the read size
clamped_length = max(1, min(1 << 24, requested_length))
compressed = self._file.read(clamped_length)
plaintext = self._z.decompress(compressed)
# Only for real local files---just for the testcase
if len(compressed) == 0:
self.close()
return plaintext
def __getattr__(self, attr):
return getattr(self._file, attr)
# test_iterparser_over_read_simple() is a very cut down test,
# of the original more flexible test-case, but without external
# dependencies. The plaintext is compressed and then decompressed
# to provide a better emulation of the original situation where
# the bug was observed.
#
# If a dependency upon 'zlib' is not desired, it would be possible to
# simplify this testcase by replacing the compress/decompress with a
# read() method emulation that always returned more from a buffer
# that was requested.
def test_iterparser_over_read_simple():
# Take the plaintext of 512 tags, and compression it with a
# Gzip-style header (+16), to most closely emulate the behavior
# of most HTTP servers.
zlib_GZIP_STYLE_HEADER = 16
compo = zlib.compressobj(zlib.Z_BEST_COMPRESSION,
zlib.DEFLATED,
zlib.MAX_WBITS + zlib_GZIP_STYLE_HEADER)
# Bytes vs. String .encode()/.decode() for compatibility with Python 3.5.
s = compo.compress(VOTABLE_XML.encode())
s = s + compo.flush()
fd = io.BytesIO(s)
fd.seek(0)
# Finally setup the test of the C-based '_fast_iterparse()' iterator
# and a situation in which it can be called a-la the VOTable Parser.
MINIMUM_REQUESTABLE_BUFFER_SIZE = 1024
uncompressed_fd = UngzipFileWrapper(fd)
iterable = _fast_iterparse(uncompressed_fd.read,
MINIMUM_REQUESTABLE_BUFFER_SIZE)
list(iterable)