# Copyright (c) 2013 Amazon.com, Inc. or its affiliates.
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import boto.utils
from datetime import datetime
from time import time
from tests.unit import AWSMockServiceTestCase
from boto.emr.connection import EmrConnection
from boto.emr.emrobject import BootstrapAction, BootstrapActionList, \
ClusterStateChangeReason, ClusterStatus, ClusterSummaryList, \
ClusterSummary, ClusterTimeline, InstanceInfo, \
InstanceList, InstanceGroupInfo, \
InstanceGroup, InstanceGroupList, JobFlow, \
JobFlowStepList, Step, StepSummaryList, \
Cluster, RunJobFlowResponse
# These tests are just checking the basic structure of
# the Elastic MapReduce code, by picking a few calls
# and verifying we get the expected results with mocked
# responses. The integration tests actually verify the
# API calls interact with the service correctly.
class TestListClusters(AWSMockServiceTestCase):
    """Checks for ``EmrConnection.list_clusters`` request building and parsing."""

    connection_class = EmrConnection

    def default_body(self):
        # Body the parsing assertions in test_list_clusters are checked against.
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
j-aaaaaaaaaaaa
Terminated by user request
USER_REQUEST
TERMINATED
2014-01-24T01:21:21Z
2014-01-24T01:25:26Z
2014-01-24T02:19:46Z
analytics test
10
j-aaaaaaaaaaaab
Terminated by user request
USER_REQUEST
TERMINATED
2014-01-21T02:53:08Z
2014-01-21T02:56:40Z
2014-01-21T03:40:22Z
test job
20
aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
"""

    def test_list_clusters(self):
        # Unfiltered call: only Action and Version are expected on the wire.
        self.set_http_response(status_code=200)
        summaries = self.service_connection.list_clusters()
        self.assert_request_parameters(
            {'Action': 'ListClusters', 'Version': '2009-03-31'})

        self.assertIsInstance(summaries, ClusterSummaryList)
        self.assertEqual(len(summaries.clusters), 2)

        first = summaries.clusters[0]
        self.assertIsInstance(first, ClusterSummary)
        self.assertEqual(first.name, 'analytics test')
        self.assertEqual(first.normalizedinstancehours, '10')

        self.assertIsInstance(first.status, ClusterStatus)
        self.assertEqual(first.status.state, 'TERMINATED')

        self.assertIsInstance(first.status.timeline, ClusterTimeline)
        timeline = first.status.timeline
        self.assertEqual(timeline.creationdatetime, '2014-01-24T01:21:21Z')
        self.assertEqual(timeline.readydatetime, '2014-01-24T01:25:26Z')
        self.assertEqual(timeline.enddatetime, '2014-01-24T02:19:46Z')

        self.assertIsInstance(
            first.status.statechangereason, ClusterStateChangeReason)
        reason = first.status.statechangereason
        self.assertEqual(reason.code, 'USER_REQUEST')
        self.assertEqual(reason.message, 'Terminated by user request')

    def test_list_clusters_created_before(self):
        # created_before must be serialised with boto.utils.ISO8601.
        self.set_http_response(status_code=200)
        cutoff = datetime.now()
        self.service_connection.list_clusters(created_before=cutoff)
        self.assert_request_parameters({
            'Action': 'ListClusters',
            'CreatedBefore': cutoff.strftime(boto.utils.ISO8601),
            'Version': '2009-03-31',
        })

    def test_list_clusters_created_after(self):
        # created_after must be serialised with boto.utils.ISO8601.
        self.set_http_response(status_code=200)
        cutoff = datetime.now()
        self.service_connection.list_clusters(created_after=cutoff)
        self.assert_request_parameters({
            'Action': 'ListClusters',
            'CreatedAfter': cutoff.strftime(boto.utils.ISO8601),
            'Version': '2009-03-31',
        })

    def test_list_clusters_states(self):
        # Each requested state becomes a numbered ClusterStates.member.N entry.
        self.set_http_response(status_code=200)
        wanted_states = ['RUNNING', 'WAITING']
        self.service_connection.list_clusters(cluster_states=wanted_states)
        self.assert_request_parameters({
            'Action': 'ListClusters',
            'ClusterStates.member.1': 'RUNNING',
            'ClusterStates.member.2': 'WAITING',
            'Version': '2009-03-31',
        })
class TestListInstanceGroups(AWSMockServiceTestCase):
    """Checks for ``EmrConnection.list_instance_groups``."""

    connection_class = EmrConnection

    def default_body(self):
        # Body the parsing assertions below are checked against.
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
ig-aaaaaaaaaaaaa
m1.large
ON_DEMAND
Job flow terminated
CLUSTER_TERMINATED
TERMINATED
2014-01-24T01:21:21Z
2014-01-24T01:25:08Z
2014-01-24T02:19:46Z
Master instance group
1
0
MASTER
ig-aaaaaaaaaaab
m1.large
ON_DEMAND
Job flow terminated
CLUSTER_TERMINATED
TERMINATED
2014-01-24T01:21:21Z
2014-01-24T01:25:26Z
2014-01-24T02:19:46Z
Core instance group
2
0
CORE
aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
"""

    def test_list_instance_groups(self):
        self.set_http_response(200)

        # Calling without a cluster id is expected to fail with TypeError.
        with self.assertRaises(TypeError):
            self.service_connection.list_instance_groups()

        groups = self.service_connection.list_instance_groups(
            cluster_id='j-123')
        self.assert_request_parameters({
            'Action': 'ListInstanceGroups',
            'ClusterId': 'j-123',
            'Version': '2009-03-31',
        })

        self.assertIsInstance(groups, InstanceGroupList)
        self.assertEqual(len(groups.instancegroups), 2)

        master = groups.instancegroups[0]
        self.assertIsInstance(master, InstanceGroupInfo)
        self.assertEqual(master.id, 'ig-aaaaaaaaaaaaa')
        self.assertEqual(master.instancegrouptype, "MASTER")
        self.assertEqual(master.instancetype, "m1.large")
        self.assertEqual(master.market, "ON_DEMAND")
        self.assertEqual(master.name, "Master instance group")
        self.assertEqual(master.requestedinstancecount, '1')
        self.assertEqual(master.runninginstancecount, '0')
        self.assertIsInstance(master.status, ClusterStatus)
        self.assertEqual(master.status.state, 'TERMINATED')
        # status.statechangereason is not parsed into an object, so its
        # code ('CLUSTER_TERMINATED' in the payload) is not asserted here.
class TestListInstances(AWSMockServiceTestCase):
    """Checks for ``EmrConnection.list_instances`` and its optional filters."""

    connection_class = EmrConnection

    def default_body(self):
        # Body the parsing assertions in test_list_instances are checked against.
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
ci-123456789abc
Cluster was terminated.
CLUSTER_TERMINATED
TERMINATED
2014-01-24T01:21:26Z
2014-01-24T01:25:25Z
2014-01-24T02:19:46Z
ip-10-0-0-60.us-west-1.compute.internal
54.0.0.1
ec2-54-0-0-1.us-west-1.compute.amazonaws.com
i-aaaaaaaa
10.0.0.60
ci-123456789abd
Cluster was terminated.
CLUSTER_TERMINATED
TERMINATED
2014-01-24T01:21:26Z
2014-01-24T01:25:25Z
2014-01-24T02:19:46Z
ip-10-0-0-61.us-west-1.compute.internal
54.0.0.2
ec2-54-0-0-2.us-west-1.compute.amazonaws.com
i-aaaaaaab
10.0.0.61
ci-123456789abe3
Cluster was terminated.
CLUSTER_TERMINATED
TERMINATED
2014-01-24T01:21:33Z
2014-01-24T01:25:08Z
2014-01-24T02:19:46Z
ip-10-0-0-62.us-west-1.compute.internal
54.0.0.3
ec2-54-0-0-3.us-west-1.compute.amazonaws.com
i-aaaaaaac
10.0.0.62
aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
"""

    def test_list_instances(self):
        self.set_http_response(200)

        # Calling without a cluster id is expected to fail with TypeError.
        with self.assertRaises(TypeError):
            self.service_connection.list_instances()

        listing = self.service_connection.list_instances(cluster_id='j-123')
        self.assertIsInstance(listing, InstanceList)
        self.assertEqual(len(listing.instances), 3)

        head = listing.instances[0]
        self.assertIsInstance(head, InstanceInfo)
        # (attribute, expected value) pairs, checked in this order.
        expected_fields = (
            ('ec2instanceid', 'i-aaaaaaaa'),
            ('id', 'ci-123456789abc'),
            ('privatednsname', 'ip-10-0-0-60.us-west-1.compute.internal'),
            ('privateipaddress', '10.0.0.60'),
            ('publicdnsname', 'ec2-54-0-0-1.us-west-1.compute.amazonaws.com'),
            ('publicipaddress', '54.0.0.1'),
        )
        for attr, want in expected_fields:
            self.assertEqual(getattr(head, attr), want)

        self.assert_request_parameters({
            'Action': 'ListInstances',
            'ClusterId': 'j-123',
            'Version': '2009-03-31',
        })

    def test_list_instances_with_group_id(self):
        # instance_group_id is forwarded as the InstanceGroupId parameter.
        self.set_http_response(200)
        self.service_connection.list_instances(
            cluster_id='j-123', instance_group_id='abc')
        self.assert_request_parameters({
            'Action': 'ListInstances',
            'ClusterId': 'j-123',
            'InstanceGroupId': 'abc',
            'Version': '2009-03-31',
        })

    def test_list_instances_with_types(self):
        # Group types become numbered InstanceGroupTypes.member.N entries.
        self.set_http_response(200)
        group_types = ['MASTER', 'TASK']
        self.service_connection.list_instances(
            cluster_id='j-123', instance_group_types=group_types)
        self.assert_request_parameters({
            'Action': 'ListInstances',
            'ClusterId': 'j-123',
            'InstanceGroupTypes.member.1': 'MASTER',
            'InstanceGroupTypes.member.2': 'TASK',
            'Version': '2009-03-31',
        })
class TestListSteps(AWSMockServiceTestCase):
    """Checks for ``EmrConnection.list_steps`` with and without state filters."""

    connection_class = EmrConnection

    def default_body(self):
        # Body the parsing assertions below are checked against.
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
abc123
2014-07-01T00:00:00.000Z
PENDING
Step 1
/home/hadoop/lib/emr-s3distcp-1.0.jar
--src
hdfs:///data/test/
--dest
s3n://test/data
CONTINUE
def456
2014-07-01T00:00:00.000Z
COMPLETED
Step 2
my.main.SomeClass
s3n://test/jars/foo.jar
CONTINUE
ghi789
2014-07-01T00:00:00.000Z
FAILED
Step 3
s3n://test/jars/bar.jar
-arg
value
TERMINATE_CLUSTER
eff31ee5-0342-11e4-b3c7-9de5a93f6fcb
"""

    def test_list_steps(self):
        self.set_http_response(200)

        # Calling without a cluster id is expected to fail with TypeError.
        with self.assertRaises(TypeError):
            self.service_connection.list_steps()

        summaries = self.service_connection.list_steps(cluster_id='j-123')
        self.assert_request_parameters({
            'Action': 'ListSteps',
            'ClusterId': 'j-123',
            'Version': '2009-03-31',
        })
        self.assertIsInstance(summaries, StepSummaryList)
        self.assertEqual(summaries.steps[0].name, 'Step 1')

        # Every parsed step must report one of these states.
        valid_states = [
            'PENDING',
            'RUNNING',
            'COMPLETED',
            'CANCELLED',
            'FAILED',
            'INTERRUPTED',
        ]
        for parsed in summaries.steps:
            self.assertIn(parsed.status.state, valid_states)

        # Step config: jar and arguments of the first step.
        first_config = summaries.steps[0].config
        self.assertEqual(
            first_config.jar, '/home/hadoop/lib/emr-s3distcp-1.0.jar')
        self.assertEqual(len(first_config.args), 4)
        self.assertEqual(first_config.args[0].value, '--src')
        self.assertEqual(first_config.args[1].value, 'hdfs:///data/test/')

        # Step config: main class of the second step.
        second_config = summaries.steps[1].config
        self.assertEqual(second_config.mainclass, 'my.main.SomeClass')

    def test_list_steps_with_states(self):
        # Step states become numbered StepStates.member.N entries.
        self.set_http_response(200)
        wanted_states = ['COMPLETED', 'FAILED']
        summaries = self.service_connection.list_steps(
            cluster_id='j-123', step_states=wanted_states)
        self.assert_request_parameters({
            'Action': 'ListSteps',
            'ClusterId': 'j-123',
            'StepStates.member.1': 'COMPLETED',
            'StepStates.member.2': 'FAILED',
            'Version': '2009-03-31',
        })
        self.assertIsInstance(summaries, StepSummaryList)
        self.assertEqual(summaries.steps[0].name, 'Step 1')
class TestListBootstrapActions(AWSMockServiceTestCase):
    """Request-parameter check for ``EmrConnection.list_bootstrap_actions``."""

    connection_class = EmrConnection

    def default_body(self):
        # Empty body: this test only inspects the outgoing request.
        return b""""""

    def test_list_bootstrap_actions(self):
        self.set_http_response(200)

        # Calling without a cluster id is expected to fail with TypeError.
        with self.assertRaises(TypeError):
            self.service_connection.list_bootstrap_actions()

        # The return value is bound but not inspected.
        response = self.service_connection.list_bootstrap_actions(
            cluster_id='j-123')
        expected_params = {
            'Action': 'ListBootstrapActions',
            'ClusterId': 'j-123',
            'Version': '2009-03-31',
        }
        self.assert_request_parameters(expected_params)
class TestDescribeCluster(AWSMockServiceTestCase):
    """Checks for ``EmrConnection.describe_cluster``."""

    connection_class = EmrConnection

    def default_body(self):
        # Body the parsing assertions below are checked against.
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
j-aaaaaaaaa
us-west-1c
my_secret_key
2.4.2
true
Terminated by user request
USER_REQUEST
TERMINATED
2014-01-24T01:21:21Z
2014-01-24T01:25:26Z
2014-01-24T02:19:46Z
false
test analytics
2.4.2
hadoop
1.0.3
false
ec2-184-0-0-1.us-west-1.compute.amazonaws.com
10
my-service-role
aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
"""

    def test_describe_cluster(self):
        self.set_http_response(200)

        # Calling without a cluster id is expected to fail with TypeError.
        with self.assertRaises(TypeError):
            self.service_connection.describe_cluster()

        cluster = self.service_connection.describe_cluster(cluster_id='j-123')
        self.assertIsInstance(cluster, Cluster)

        # Top-level scalar attributes, checked in this order.
        leading_fields = (
            ('id', 'j-aaaaaaaaa'),
            ('runningamiversion', '2.4.2'),
            ('visibletoallusers', 'true'),
            ('autoterminate', 'false'),
            ('name', 'test analytics'),
            ('requestedamiversion', '2.4.2'),
            ('terminationprotected', 'false'),
        )
        for attr, want in leading_fields:
            self.assertEqual(getattr(cluster, attr), want)

        # Nested objects.
        self.assertEqual(
            cluster.ec2instanceattributes.ec2availabilityzone, "us-west-1c")
        self.assertEqual(
            cluster.ec2instanceattributes.ec2keyname, 'my_secret_key')
        self.assertEqual(cluster.status.state, 'TERMINATED')
        self.assertEqual(cluster.applications[0].name, 'hadoop')
        self.assertEqual(cluster.applications[0].version, '1.0.3')

        trailing_fields = (
            ('masterpublicdnsname',
             'ec2-184-0-0-1.us-west-1.compute.amazonaws.com'),
            ('normalizedinstancehours', '10'),
            ('servicerole', 'my-service-role'),
        )
        for attr, want in trailing_fields:
            self.assertEqual(getattr(cluster, attr), want)

        self.assert_request_parameters({
            'Action': 'DescribeCluster',
            'ClusterId': 'j-123',
            'Version': '2009-03-31',
        })
class TestDescribeStep(AWSMockServiceTestCase):
    """Argument and request-parameter checks for ``describe_step``."""

    connection_class = EmrConnection

    def default_body(self):
        # Empty body: this test only inspects the outgoing request.
        return b""""""

    def test_describe_step(self):
        self.set_http_response(200)

        # Omitting either identifier (or both) is expected to raise TypeError.
        incomplete_calls = (
            {},
            {'cluster_id': 'j-123'},
            {'step_id': 'abc'},
        )
        for kwargs in incomplete_calls:
            with self.assertRaises(TypeError):
                self.service_connection.describe_step(**kwargs)

        # The return value is bound but not inspected.
        response = self.service_connection.describe_step(
            cluster_id='j-123', step_id='abc')
        expected_params = {
            'Action': 'DescribeStep',
            'ClusterId': 'j-123',
            'StepId': 'abc',
            'Version': '2009-03-31',
        }
        self.assert_request_parameters(expected_params)
class TestAddJobFlowSteps(AWSMockServiceTestCase):
    """Checks the object type returned by ``add_jobflow_steps``."""

    connection_class = EmrConnection

    def default_body(self):
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
Foo
Bar
"""

    def test_add_jobflow_steps(self):
        self.set_http_response(200)
        step_list = self.service_connection.add_jobflow_steps(
            jobflow_id='j-123', steps=[])

        # Guard against a regression: this call previously returned an
        # empty RunJobFlowResponse instead of a JobFlowStepList.
        self.assertIsInstance(step_list, JobFlowStepList)
        first_id, second_id = step_list.stepids[0], step_list.stepids[1]
        self.assertEqual(first_id.value, 'Foo')
        self.assertEqual(second_id.value, 'Bar')
class TestBuildTagList(AWSMockServiceTestCase):
    """Checks the parameter encoding produced by ``_build_tag_list``."""

    connection_class = EmrConnection

    def test_key_without_value_encoding(self):
        # Tags whose value is '' or None are expected to yield only a Key
        # entry, numbered in ascending key order.
        tags = {'KeyWithNoValue': '', 'AnotherKeyWithNoValue': None}
        encoded = self.service_connection._build_tag_list(tags)
        wanted = {
            'Tags.member.1.Key': 'AnotherKeyWithNoValue',
            'Tags.member.2.Key': 'KeyWithNoValue',
        }
        self.assertEqual(wanted, encoded)

    def test_key_full_key_value_encoding(self):
        # Tags with values are expected to yield a Key and a Value entry,
        # numbered in ascending key order.
        tags = {'FirstKey': 'One', 'SecondKey': 'Two'}
        encoded = self.service_connection._build_tag_list(tags)
        wanted = {
            'Tags.member.1.Key': 'FirstKey',
            'Tags.member.1.Value': 'One',
            'Tags.member.2.Key': 'SecondKey',
            'Tags.member.2.Value': 'Two',
        }
        self.assertEqual(wanted, encoded)
class TestAddTag(AWSMockServiceTestCase):
    """Argument validation and request encoding for ``add_tags``."""

    connection_class = EmrConnection

    def default_body(self):
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
88888888-8888-8888-8888-888888888888
"""

    def test_add_mix_of_tags_with_without_values(self):
        tags = {
            'FirstKey': 'One',
            'SecondKey': 'Two',
            'ZzzNoValue': '',
        }
        self.set_http_response(200)

        # (expected exception, positional args) for each invalid call:
        # missing arguments raise TypeError, a non-dict tags value raises
        # AssertionError.
        invalid_calls = (
            (TypeError, ()),
            (TypeError, ('j-123',)),
            (AssertionError, ('j-123', [])),
        )
        for expected_error, args in invalid_calls:
            with self.assertRaises(expected_error):
                self.service_connection.add_tags(*args)

        outcome = self.service_connection.add_tags('j-123', tags)
        self.assertTrue(outcome)

        # The empty-valued tag contributes a Key entry but no Value entry.
        self.assert_request_parameters({
            'Action': 'AddTags',
            'ResourceId': 'j-123',
            'Tags.member.1.Key': 'FirstKey',
            'Tags.member.1.Value': 'One',
            'Tags.member.2.Key': 'SecondKey',
            'Tags.member.2.Value': 'Two',
            'Tags.member.3.Key': 'ZzzNoValue',
            'Version': '2009-03-31',
        })
class TestRemoveTag(AWSMockServiceTestCase):
    """Argument and request-encoding checks for ``remove_tags``."""

    connection_class = EmrConnection

    def default_body(self):
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
88888888-8888-8888-8888-888888888888
"""

    def test_remove_tags(self):
        self.set_http_response(200)

        # Exercise the method under test. These arity checks previously
        # called add_tags, so remove_tags itself was never checked.
        # NOTE(review): assumes remove_tags declares both arguments without
        # defaults -- confirm against boto.emr.connection.
        with self.assertRaises(TypeError):
            self.service_connection.remove_tags()
        with self.assertRaises(TypeError):
            self.service_connection.remove_tags('j-123')

        response = self.service_connection.remove_tags(
            'j-123', ['FirstKey', 'SecondKey'])
        self.assertTrue(response)

        # Each tag key becomes a numbered TagKeys.member.N entry.
        self.assert_request_parameters({
            'Action': 'RemoveTags',
            'ResourceId': 'j-123',
            'TagKeys.member.1': 'FirstKey',
            'TagKeys.member.2': 'SecondKey',
            'Version': '2009-03-31',
        })
class DescribeJobFlowsTestBase(AWSMockServiceTestCase):
    """Shared fixture: supplies the canned body for job-flow description tests."""

    connection_class = EmrConnection

    def default_body(self):
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        payload = b"""
2.4.2
2014-01-24T01:21:21Z
Terminated by user request
2014-01-24T01:25:26Z
2014-01-24T01:25:26Z
TERMINATED
2014-01-24T02:19:46Z
true
test analytics
j-aaaaaa
2014-01-24T01:21:21Z
2014-01-24T01:25:26Z
COMPLETED
2014-01-24T01:26:08Z
s3://us-west-1.elasticmapreduce/libs/hive/hive-script
--base-path
s3://us-west-1.elasticmapreduce/libs/hive/
--install-hive
--hive-versions
0.11.0.1
s3://us-west-1.elasticmapreduce/libs/script-runner/script-runner.jar
Setup hive
TERMINATE_JOB_FLOW
us-west-1c
m1.large
my_key
true
2014-01-24T01:21:21Z
0
2014-01-24T01:23:56Z
2014-01-24T01:25:08Z
ENDED
2014-01-24T02:19:46Z
1
m1.large
Job flow terminated
ON_DEMAND
ig-aaaaaa
MASTER
Master instance group
2014-01-24T01:21:21Z
0
2014-01-24T01:25:26Z
2014-01-24T01:25:26Z
ENDED
2014-01-24T02:19:46Z
2
m1.large
Job flow terminated
ON_DEMAND
ig-aaaaab
CORE
Core instance group
m1.large
i-aaaaaa
1.0.3
12
ec2-184-0-0-1.us-west-1.compute.amazonaws.com
3
false
aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
"""
        return payload
class TestDescribeJobFlows(DescribeJobFlowsTestBase):
    """Checks for ``EmrConnection.describe_jobflows`` parsing and parameters."""

    def test_describe_jobflows_response(self):
        self.set_http_response(200)
        flows = self.service_connection.describe_jobflows()
        self.assertIsInstance(flows, list)

        jf = flows[0]
        self.assertIsInstance(jf, JobFlow)
        # (attribute, expected value) pairs, checked in this order.
        jobflow_fields = (
            ('amiversion', '2.4.2'),
            ('visibletoallusers', 'true'),
            ('name', 'test analytics'),
            ('jobflowid', 'j-aaaaaa'),
            ('ec2keyname', 'my_key'),
            ('masterinstancetype', 'm1.large'),
            ('availabilityzone', 'us-west-1c'),
            ('keepjobflowalivewhennosteps', 'true'),
            ('slaveinstancetype', 'm1.large'),
            ('masterinstanceid', 'i-aaaaaa'),
            ('hadoopversion', '1.0.3'),
            ('normalizedinstancehours', '12'),
            ('masterpublicdnsname',
             'ec2-184-0-0-1.us-west-1.compute.amazonaws.com'),
            ('instancecount', '3'),
            ('terminationprotected', 'false'),
        )
        for attr, want in jobflow_fields:
            self.assertEqual(getattr(jf, attr), want)

        # First step of the job flow.
        self.assertIsInstance(jf.steps, list)
        first_step = jf.steps[0]
        self.assertIsInstance(first_step, Step)
        self.assertEqual(
            first_step.jar,
            's3://us-west-1.elasticmapreduce/libs/script-runner/script-runner.jar')
        self.assertEqual(first_step.name, 'Setup hive')
        self.assertEqual(first_step.actiononfailure, 'TERMINATE_JOB_FLOW')

        # First instance group of the job flow.
        self.assertIsInstance(jf.instancegroups, list)
        master_group = jf.instancegroups[0]
        self.assertIsInstance(master_group, InstanceGroup)
        group_fields = (
            ('creationdatetime', '2014-01-24T01:21:21Z'),
            ('state', 'ENDED'),
            ('instancerequestcount', '1'),
            ('instancetype', 'm1.large'),
            ('laststatechangereason', 'Job flow terminated'),
            ('market', 'ON_DEMAND'),
            ('instancegroupid', 'ig-aaaaaa'),
            ('instancerole', 'MASTER'),
            ('name', 'Master instance group'),
        )
        for attr, want in group_fields:
            self.assertEqual(getattr(master_group, attr), want)

    def test_describe_jobflows_no_args(self):
        # With no filters only the Action is checked; Version is ignored.
        self.set_http_response(200)
        self.service_connection.describe_jobflows()
        self.assert_request_parameters(
            {'Action': 'DescribeJobFlows'},
            ignore_params_values=['Version'])

    def test_describe_jobflows_filtered(self):
        # All four filters supplied at once; dates use boto.utils.ISO8601.
        self.set_http_response(200)
        now = datetime.now()
        a_bit_before = datetime.fromtimestamp(time() - 1000)
        self.service_connection.describe_jobflows(
            states=['WAITING', 'RUNNING'],
            jobflow_ids=['j-aaaaaa', 'j-aaaaab'],
            created_after=a_bit_before,
            created_before=now)
        expected_params = {
            'Action': 'DescribeJobFlows',
            'JobFlowIds.member.1': 'j-aaaaaa',
            'JobFlowIds.member.2': 'j-aaaaab',
            'JobFlowStates.member.1': 'WAITING',
            'JobFlowStates.member.2': 'RUNNING',
            'CreatedAfter': a_bit_before.strftime(boto.utils.ISO8601),
            'CreatedBefore': now.strftime(boto.utils.ISO8601),
        }
        self.assert_request_parameters(
            expected_params, ignore_params_values=['Version'])
class TestDescribeJobFlow(DescribeJobFlowsTestBase):
    """Checks the single-job-flow helper ``describe_jobflow``."""

    def test_describe_jobflow(self):
        self.set_http_response(200)
        flow = self.service_connection.describe_jobflow('j-aaaaaa')
        self.assertIsInstance(flow, JobFlow)

        # The request is a DescribeJobFlows action carrying the single id
        # as JobFlowIds.member.1.
        expected_params = {
            'Action': 'DescribeJobFlows',
            'JobFlowIds.member.1': 'j-aaaaaa',
        }
        self.assert_request_parameters(
            expected_params, ignore_params_values=['Version'])
class TestRunJobFlow(AWSMockServiceTestCase):
    """Checks that ``run_jobflow`` forwards the ``service_role`` argument."""

    connection_class = EmrConnection

    def default_body(self):
        # NOTE(review): the payload holds only text values, no markup --
        # confirm it matches what the response parser expects.
        return b"""
j-ZKIY4CKQRX72
aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
"""

    def test_run_jobflow_service_role(self):
        self.set_http_response(200)
        outcome = self.service_connection.run_jobflow(
            'EmrCluster', service_role='EMR_DefaultRole')
        self.assertTrue(outcome)

        # Parameters whose values are not compared by this test.
        ignored = [
            'ActionOnFailure',
            'Instances.InstanceCount',
            'Instances.KeepJobFlowAliveWhenNoSteps',
            'Instances.MasterInstanceType',
            'Instances.SlaveInstanceType',
        ]
        expected_params = {
            'Action': 'RunJobFlow',
            'Version': '2009-03-31',
            'ServiceRole': 'EMR_DefaultRole',
            'Name': 'EmrCluster',
        }
        self.assert_request_parameters(
            expected_params, ignore_params_values=ignored)