# -*- coding: utf-8 -*- # Copyright 2013 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Integration tests for the defacl command.""" from __future__ import absolute_import import re from gslib.cs_api_map import ApiSelector import gslib.tests.testcase as case from gslib.tests.testcase.integration_testcase import SkipForS3 from gslib.tests.util import ObjectToURI as suri PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"' @SkipForS3('S3 does not support default object ACLs.') class TestDefacl(case.GsUtilIntegrationTestCase): """Integration tests for the defacl command.""" _defacl_ch_prefix = ['defacl', 'ch'] _defacl_get_prefix = ['defacl', 'get'] _defacl_set_prefix = ['defacl', 'set'] def _MakeScopeRegex(self, role, entity_type, email_address): template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' % (entity_type, email_address, role)) return re.compile(template_regex, flags=re.DOTALL) def testChangeDefaultAcl(self): """Tests defacl ch.""" bucket = self.CreateBucket() test_regex = self._MakeScopeRegex( 'OWNER', 'group', self.GROUP_TEST_ADDRESS) test_regex2 = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) self.RunGsUtil(self._defacl_ch_prefix + ['-g', self.GROUP_TEST_ADDRESS+':FC', suri(bucket)]) json_text2 = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertRegexpMatches(json_text2, test_regex) self.RunGsUtil(self._defacl_ch_prefix + ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)]) json_text3 = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertRegexpMatches(json_text3, test_regex2) stderr = self.RunGsUtil(self._defacl_ch_prefix + ['-g', self.GROUP_TEST_ADDRESS+':WRITE', suri(bucket)], return_stderr=True, expected_status=1) self.assertIn('WRITER cannot be set as a default object ACL', stderr) def testChangeDefaultAclEmpty(self): """Tests adding and removing an entry from an empty default object ACL.""" bucket = self.CreateBucket() # First, clear out the default object ACL on the bucket. self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)]) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) empty_regex = r'\[\]\s*' self.assertRegexpMatches(json_text, empty_regex) group_regex = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) self.RunGsUtil(self._defacl_ch_prefix + ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)]) json_text2 = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertRegexpMatches(json_text2, group_regex) if self.test_api == ApiSelector.JSON: # TODO: Enable when JSON service respects creating a private (no entries) # default object ACL via PATCH. For now, only supported in XML. return # After adding and removing a group, the default object ACL should be empty. self.RunGsUtil(self._defacl_ch_prefix + ['-d', self.GROUP_TEST_ADDRESS, suri(bucket)]) json_text3 = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertRegexpMatches(json_text3, empty_regex) def testChangeMultipleBuckets(self): """Tests defacl ch on multiple buckets.""" bucket1 = self.CreateBucket() bucket2 = self.CreateBucket() test_regex = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) self.RunGsUtil(self._defacl_ch_prefix + ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket1), suri(bucket2)]) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) def testChangeMultipleAcls(self): """Tests defacl ch with multiple ACL entries.""" bucket = self.CreateBucket() test_regex_group = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) test_regex_user = self._MakeScopeRegex( 'OWNER', 'user', self.USER_TEST_ADDRESS) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex_group) self.assertNotRegexpMatches(json_text, test_regex_user) self.RunGsUtil(self._defacl_ch_prefix + ['-g', self.GROUP_TEST_ADDRESS+':READ', '-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex_group) self.assertRegexpMatches(json_text, test_regex_user) def testEmptyDefAcl(self): bucket = self.CreateBucket() self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)]) stdout = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertEquals(stdout.rstrip(), '[]') self.RunGsUtil(self._defacl_ch_prefix + ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) def testDeletePermissionsWithCh(self): """Tests removing permissions with defacl ch.""" bucket = self.CreateBucket() test_regex = self._MakeScopeRegex( 'OWNER', 'user', self.USER_TEST_ADDRESS) json_text = self.RunGsUtil( self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) self.RunGsUtil(self._defacl_ch_prefix + ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) json_text = self.RunGsUtil( self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) self.RunGsUtil(self._defacl_ch_prefix + ['-d', self.USER_TEST_ADDRESS, suri(bucket)]) json_text = self.RunGsUtil( self._defacl_get_prefix + [suri(bucket)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) def testTooFewArgumentsFails(self): """Tests calling defacl with insufficient number of arguments.""" # No arguments for get, but valid subcommand. stderr = self.RunGsUtil(self._defacl_get_prefix, return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) # No arguments for set, but valid subcommand. stderr = self.RunGsUtil(self._defacl_set_prefix, return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) # No arguments for ch, but valid subcommand. stderr = self.RunGsUtil(self._defacl_ch_prefix, return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) # Neither arguments nor subcommand. stderr = self.RunGsUtil(['defacl'], return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) class TestDefaclOldAlias(TestDefacl): _defacl_ch_prefix = ['chdefacl'] _defacl_get_prefix = ['getdefacl'] _defacl_set_prefix = ['setdefacl']