# Copyright (c) 2010 Jeremy Thurgood # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, dis- # tribute, sublicense, and/or sell copies of the Software, and to permit # persons to whom the Software is furnished to do so, subject to the fol- # lowing conditions: # # The above copyright notice and this permission notice shall be included # in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. # NOTE: These tests only cover the very simple cases I needed to test # for the InstanceGroup fix. 
import xml.sax

from boto import handler
from boto.emr import emrobject
from boto.resultset import ResultSet
from tests.compat import unittest


# Response bodies fed to the SAX parser by the tests below.
#
# NOTE(review): as they appear here, both fixtures hold only
# whitespace-separated values and no XML element markup, while
# xml.sax.parseString() needs a well-formed document -- the tags look like
# they were lost somewhere; confirm these literals against the upstream file.

# Fixture parsed by test_JobFlows_example (a job flow in state STARTING).
JOB_FLOW_EXAMPLE = b""" 2009-01-28T21:49:16Z 2009-01-28T21:49:16Z STARTING s3://elasticmapreduce/libs/hue/install-hue Install Hue true Hue MyJobFlowName mybucket/subdir/ 2009-01-28T21:49:16Z PENDING MyJarFile MyMailClass arg1 arg2 MyStepName CONTINUE j-3UN6WX5RRO2AG us-east-1a m1.small m1.small myec2keyname 4 true 9cea3229-ed85-11dd-9877-6fad448a8419 """

# Fixture parsed by test_JobFlows_completed (a job flow in state COMPLETED;
# the test expects six steps and two instance groups from it).
JOB_FLOW_COMPLETED = b""" 2010-10-21T01:00:25Z Steps completed 2010-10-21T01:03:59Z 2010-10-21T01:03:59Z COMPLETED 2010-10-21T01:44:18Z RealJobFlowName s3n://example.emrtest.scripts/jobflow_logs/ s3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar s3n://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch Setup Hadoop Debugging TERMINATE_JOB_FLOW 2010-10-21T01:00:25Z 2010-10-21T01:03:59Z COMPLETED 2010-10-21T01:04:22Z /home/hadoop/contrib/streaming/hadoop-0.20-streaming.jar -mapper s3://example.emrtest.scripts/81d8-5a9d3df4a86c-InitialMapper.py -reducer s3://example.emrtest.scripts/81d8-5a9d3df4a86c-InitialReducer.py -input s3://example.emrtest.data/raw/2010/10/20/* -input s3://example.emrtest.data/raw/2010/10/19/* -input s3://example.emrtest.data/raw/2010/10/18/* -input s3://example.emrtest.data/raw/2010/10/17/* -input s3://example.emrtest.data/raw/2010/10/16/* -input s3://example.emrtest.data/raw/2010/10/15/* -input s3://example.emrtest.data/raw/2010/10/14/* -output s3://example.emrtest.crunched/ testjob_Initial TERMINATE_JOB_FLOW 2010-10-21T01:00:25Z 2010-10-21T01:04:22Z COMPLETED 2010-10-21T01:36:18Z /home/hadoop/contrib/streaming/hadoop-0.20-streaming.jar -mapper s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step1Mapper.py -reducer s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step1Reducer.py -input s3://example.emrtest.crunched/* -output s3://example.emrtest.step1/ testjob_step1 TERMINATE_JOB_FLOW 2010-10-21T01:00:25Z 2010-10-21T01:36:18Z COMPLETED 
2010-10-21T01:37:51Z /home/hadoop/contrib/streaming/hadoop-0.20-streaming.jar -mapper s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step2Mapper.py -reducer s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step2Reducer.py -input s3://example.emrtest.crunched/* -output s3://example.emrtest.step2/ testjob_step2 TERMINATE_JOB_FLOW 2010-10-21T01:00:25Z 2010-10-21T01:37:51Z COMPLETED 2010-10-21T01:39:32Z /home/hadoop/contrib/streaming/hadoop-0.20-streaming.jar -mapper s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step3Mapper.py -reducer s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step3Reducer.py -input s3://example.emrtest.step1/* -output s3://example.emrtest.step3/ testjob_step3 TERMINATE_JOB_FLOW 2010-10-21T01:00:25Z 2010-10-21T01:39:32Z COMPLETED 2010-10-21T01:41:22Z /home/hadoop/contrib/streaming/hadoop-0.20-streaming.jar -mapper s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step4Mapper.py -reducer s3://example.emrtest.scripts/81d8-5a9d3df4a86c-step4Reducer.py -input s3://example.emrtest.step1/* -output s3://example.emrtest.step4/ testjob_step4 TERMINATE_JOB_FLOW 2010-10-21T01:00:25Z 2010-10-21T01:41:22Z COMPLETED 2010-10-21T01:43:03Z j-3H3Q13JPFLU22 m1.large i-64c21609 us-east-1b 2010-10-21T01:00:25Z 0 2010-10-21T01:02:09Z 2010-10-21T01:03:03Z ENDED 2010-10-21T01:44:18Z 1 m1.large ON_DEMAND Job flow terminated MASTER ig-EVMHOZJ2SCO8 master 2010-10-21T01:00:25Z 0 2010-10-21T01:03:59Z 2010-10-21T01:03:59Z ENDED 2010-10-21T01:44:18Z 9 m1.large ON_DEMAND Job flow terminated CORE ig-YZHDYVITVHKB slave 40 0.20 m1.large ec2-184-72-153-139.compute-1.amazonaws.com myubersecurekey 10 false c31e701d-dcb4-11df-b5d9-337fc7fe4773 """


class TestEMRResponses(unittest.TestCase):
    """Parse the canned job-flow fixtures and check the resulting objects."""

    def _parse_xml(self, body, markers):
        """Run ``body`` through boto's SAX handler and return the ResultSet.

        ``markers`` is passed straight to ``ResultSet``; the tests below use
        ``[('member', emrobject.JobFlow)]``.  The handler is created with
        ``None`` as its second argument.
        """
        rs = ResultSet(markers)
        h = handler.XmlHandler(rs, None)
        xml.sax.parseString(body, h)
        return rs

    def _assert_fields(self, response, **fields):
        """Assert that each keyword names an attribute of ``response`` whose
        value equals the keyword's value.

        A missing attribute surfaces as the ``AttributeError`` raised by
        ``getattr``; a mismatch fails with a message naming the field.
        """
        for field, expected in fields.items():
            actual = getattr(response, field)
            # assertEqual rather than the assertEquals alias: the alias is
            # deprecated and no longer exists on TestCase in Python 3.12+.
            self.assertEqual(
                expected, actual,
                "Field %s: %r != %r" % (field, expected, actual))

    def test_JobFlows_example(self):
        # Unpacking into a one-element list pattern also checks that the
        # parse produced exactly one JobFlow.
        [jobflow] = self._parse_xml(JOB_FLOW_EXAMPLE,
                                    [('member', emrobject.JobFlow)])
        # Every expected value is a string, including counts and booleans.
        self._assert_fields(jobflow,
                            creationdatetime='2009-01-28T21:49:16Z',
                            startdatetime='2009-01-28T21:49:16Z',
                            state='STARTING',
                            instancecount='4',
                            jobflowid='j-3UN6WX5RRO2AG',
                            loguri='mybucket/subdir/',
                            name='MyJobFlowName',
                            availabilityzone='us-east-1a',
                            slaveinstancetype='m1.small',
                            masterinstancetype='m1.small',
                            ec2keyname='myec2keyname',
                            keepjobflowalivewhennosteps='true')

    def test_JobFlows_completed(self):
        # Unpacking into a one-element list pattern also checks that the
        # parse produced exactly one JobFlow.
        [jobflow] = self._parse_xml(JOB_FLOW_COMPLETED,
                                    [('member', emrobject.JobFlow)])
        self._assert_fields(jobflow,
                            creationdatetime='2010-10-21T01:00:25Z',
                            startdatetime='2010-10-21T01:03:59Z',
                            enddatetime='2010-10-21T01:44:18Z',
                            state='COMPLETED',
                            instancecount='10',
                            jobflowid='j-3H3Q13JPFLU22',
                            loguri='s3n://example.emrtest.scripts/jobflow_logs/',
                            name='RealJobFlowName',
                            availabilityzone='us-east-1b',
                            slaveinstancetype='m1.large',
                            masterinstancetype='m1.large',
                            ec2keyname='myubersecurekey',
                            keepjobflowalivewhennosteps='false')

        # The nested collections must have been populated as well.
        self.assertEqual(6, len(jobflow.steps))
        self.assertEqual(2, len(jobflow.instancegroups))