# Copyright (c) 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, dis- # tribute, sublicense, and/or sell copies of the Software, and to permit # persons to whom the Software is furnished to do so, subject to the fol- # lowing conditions: # # The above copyright notice and this permission notice shall be included # in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. # from tests.unit import unittest from boto.sqs.message import MHMessage from boto.sqs.message import RawMessage from boto.sqs.message import Message from boto.sqs.bigmessage import BigMessage from boto.exception import SQSDecodeError from nose.plugins.attrib import attr class TestMHMessage(unittest.TestCase): @attr(sqs=True) def test_contains(self): msg = MHMessage() msg.update({'hello': 'world'}) self.assertTrue('hello' in msg) class DecodeExceptionRaisingMessage(RawMessage): @attr(sqs=True) def decode(self, message): raise SQSDecodeError('Sample decode error', self) class TestEncodeMessage(unittest.TestCase): @attr(sqs=True) def test_message_id_available(self): import xml.sax from boto.resultset import ResultSet from boto.handler import XmlHandler sample_value = 'abcdef' body = """ %s %s %s """ % tuple([sample_value] * 3) rs = ResultSet([('Message', DecodeExceptionRaisingMessage)]) h = XmlHandler(rs, None) with self.assertRaises(SQSDecodeError) as context: xml.sax.parseString(body.encode('utf-8'), h) message = context.exception.message self.assertEquals(message.id, sample_value) self.assertEquals(message.receipt_handle, sample_value) @attr(sqs=True) def test_encode_bytes_message(self): message = Message() body = b'\x00\x01\x02\x03\x04\x05' message.set_body(body) self.assertEqual(message.get_body_encoded(), 'AAECAwQF') @attr(sqs=True) def test_encode_string_message(self): message = Message() body = 'hello world' message.set_body(body) self.assertEqual(message.get_body_encoded(), 'aGVsbG8gd29ybGQ=') class TestBigMessage(unittest.TestCase): @attr(sqs=True) def test_s3url_parsing(self): msg = BigMessage() # Try just a bucket name bucket, key = msg._get_bucket_key('s3://foo') self.assertEquals(bucket, 'foo') self.assertEquals(key, None) # Try just a bucket name with trailing "/" bucket, key = msg._get_bucket_key('s3://foo/') self.assertEquals(bucket, 'foo') self.assertEquals(key, None) # Try a bucket and a key bucket, key = msg._get_bucket_key('s3://foo/bar') self.assertEquals(bucket, 'foo') self.assertEquals(key, 'bar') # Try a bucket and a key with "/" bucket, key = msg._get_bucket_key('s3://foo/bar/fie/baz') self.assertEquals(bucket, 'foo') self.assertEquals(key, 'bar/fie/baz') # Try it with no s3:// prefix with self.assertRaises(SQSDecodeError) as context: bucket, key = msg._get_bucket_key('foo/bar') if __name__ == '__main__': unittest.main()