#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Integration tests for uploading and downloading to GCS.

These tests exercise most of the corner cases for upload/download of
files in apitools, via GCS. There are no performance tests here yet.
"""

import json
import os
import unittest

import six

import apitools.base.py as apitools_base
import storage

_CLIENT = None


def _GetClient():
    """Return the module-wide storage client, creating it on first use."""
    global _CLIENT  # pylint: disable=global-statement
    if _CLIENT is None:
        _CLIENT = storage.StorageV1()
    return _CLIENT


class DownloadsTest(unittest.TestCase):
    """Download tests run against objects stored in a GCS bucket.

    Each test writes into a fresh in-memory buffer (see setUp) and then
    checks the buffer position and/or contents after the transfer.
    """

    # Bucket and object-name prefix used by most tests. The same prefix is
    # also the name of the local directory (next to this file) holding the
    # reference copies of the test data.
    _DEFAULT_BUCKET = 'apitools'
    _TESTDATA_PREFIX = 'testdata'

    def setUp(self):
        """Grab the shared client and start from an empty download."""
        self.__client = _GetClient()
        self.__ResetDownload()

    def __ResetDownload(self, auto_transfer=False):
        """Replace the buffer and download object with fresh ones.

        Args:
          auto_transfer: Passed through to storage.Download.FromStream.
        """
        self.__buffer = six.StringIO()
        self.__download = storage.Download.FromStream(
            self.__buffer, auto_transfer=auto_transfer)

    def __GetTestdataFileContents(self, filename):
        """Read a local reference file from the testdata directory.

        Args:
          filename: Name of a file inside the testdata directory that sits
              next to this module.

        Returns:
          The full contents of the file.
        """
        file_path = os.path.join(
            os.path.dirname(__file__), self._TESTDATA_PREFIX, filename)
        # Use a context manager so the handle is closed deterministically
        # instead of being left for the garbage collector.
        with open(file_path) as testdata_file:
            file_contents = testdata_file.read()
        self.assertIsNotNone(
            file_contents, msg=('Could not read file %s' % filename))
        return file_contents

    @classmethod
    def __GetRequest(cls, filename):
        """Build a get-request for a test object in the default bucket.

        Args:
          filename: Object name relative to the testdata prefix.

        Returns:
          A storage.StorageObjectsGetRequest for the prefixed object.
        """
        # Object names are not filesystem paths: always join with '/' so
        # the name does not depend on the local OS path separator.
        object_name = '/'.join((cls._TESTDATA_PREFIX, filename))
        return storage.StorageObjectsGetRequest(
            bucket=cls._DEFAULT_BUCKET, object=object_name)

    def __GetFile(self, request):
        """Issue the get call with the current download attached.

        Asserts that the call itself returns None.
        """
        response = self.__client.objects.Get(request, download=self.__download)
        self.assertIsNone(response, msg=(
            'Unexpected nonempty response for file download: %s' % response))

    def __GetAndStream(self, request):
        """Issue the get call, then stream the payload in chunks."""
        self.__GetFile(request)
        self.__download.StreamInChunks()

    def testZeroBytes(self):
        """Streaming the zero-byte object leaves the buffer empty."""
        request = self.__GetRequest('zero_byte_file')
        self.__GetAndStream(request)
        self.assertEqual(0, self.__buffer.tell())

    def testObjectDoesNotExist(self):
        """Fetching a missing object with auto-transfer raises HttpError."""
        self.__ResetDownload(auto_transfer=True)
        with self.assertRaises(apitools_base.HttpError):
            self.__GetFile(self.__GetRequest('nonexistent_file'))

    def testAutoTransfer(self):
        """With auto-transfer, the get call alone fills the buffer."""
        self.__ResetDownload(auto_transfer=True)
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.assertEqual(15, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents, self.__buffer.read())

    def testFilenameWithSpaces(self):
        """An object whose name contains spaces downloads correctly."""
        self.__ResetDownload(auto_transfer=True)
        self.__GetFile(self.__GetRequest('filename with spaces'))
        # NOTE(craigcitro): We add _ here to make this play nice with blaze.
        file_contents = self.__GetTestdataFileContents('filename_with_spaces')
        self.assertEqual(15, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents, self.__buffer.read())

    def testGetRange(self):
        """GetRange(5, 10) yields the six bytes at offsets 5 through 10."""
        # TODO(craigcitro): Test about a thousand more corner cases.
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        self.__download.GetRange(5, 10)
        self.assertEqual(6, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents[5:11], self.__buffer.read())

    def testGetRangeWithNegativeStart(self):
        """GetRange(-3) yields the last three bytes of the object."""
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        self.__download.GetRange(-3)
        self.assertEqual(3, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents[-3:], self.__buffer.read())

    def testGetRangeWithPositiveStart(self):
        """GetRange(2) yields everything from offset 2 to the end."""
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        self.__download.GetRange(2)
        self.assertEqual(13, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents[2:15], self.__buffer.read())

    def testSmallChunksizes(self):
        """The 15-byte object arrives intact for several chunk sizes."""
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        request = self.__GetRequest('fifteen_byte_file')
        for chunksize in (2, 3, 15, 100):
            self.__ResetDownload()
            self.__download.chunksize = chunksize
            self.__GetAndStream(request)
            self.assertEqual(15, self.__buffer.tell())
            self.__buffer.seek(0)
            self.assertEqual(file_contents, self.__buffer.read(15))

    def testLargeFileChunksizes(self):
        """Streaming the large object completes for large chunk sizes.

        There are no content assertions here; the test only checks that
        the download runs to completion without raising.
        """
        request = self.__GetRequest('thirty_meg_file')
        for chunksize in (1048576, 40 * 1048576):
            self.__ResetDownload()
            self.__download.chunksize = chunksize
            self.__GetAndStream(request)
            self.__buffer.seek(0)

    def testAutoGzipObject(self):
        """The 50K.txt object yields 50000 bytes with either transfer mode."""
        # TODO(craigcitro): Move this to a new object once we have a more
        # permanent one, see: http://b/12250275
        request = storage.StorageObjectsGetRequest(
            bucket='ottenl-gzip', object='50K.txt')
        # First, try without auto-transfer.
        self.__GetFile(request)
        self.assertEqual(0, self.__buffer.tell())
        self.__download.StreamInChunks()
        self.assertEqual(50000, self.__buffer.tell())
        # Next, try with auto-transfer.
        self.__ResetDownload(auto_transfer=True)
        self.__GetFile(request)
        self.assertEqual(50000, self.__buffer.tell())

    def testSmallGzipObject(self):
        """Streaming zero-gzipd.html with gzip accepted writes zero bytes."""
        request = self.__GetRequest('zero-gzipd.html')
        self.__GetFile(request)
        self.assertEqual(0, self.__buffer.tell())
        additional_headers = {'accept-encoding': 'gzip, deflate'}
        self.__download.StreamInChunks(additional_headers=additional_headers)
        self.assertEqual(0, self.__buffer.tell())

    def testSerializedDownload(self):
        """A download rebuilt from serialized JSON data streams correctly."""

        def _ProgressCallback(unused_response, download_object):
            # Parenthesised single-argument form prints the same text on
            # Python 2 and Python 3; the bare print statement is a syntax
            # error on Python 3.
            print('Progress %s' % download_object.progress)

        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        request = self.__GetRequest('fifteen_byte_file')
        # No download is attached here, so the call returns the object's
        # metadata, which supplies the size and media link used below.
        response = self.__client.objects.Get(request)
        # pylint: disable=attribute-defined-outside-init
        self.__buffer = six.StringIO()
        download_data = json.dumps({
            'auto_transfer': False,
            'progress': 0,
            'total_size': response.size,
            'url': response.mediaLink,
        })
        self.__download = storage.Download.FromData(
            self.__buffer, download_data, http=self.__client.http)
        self.__download.StreamInChunks(callback=_ProgressCallback)
        self.assertEqual(15, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents, self.__buffer.read(15))


if __name__ == '__main__':
    unittest.main()