test_project.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. # -*- coding:utf-8 -*-
  2. #
  3. # Copyright (C) 2019 The Android Open Source Project
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. """Unittests for the project.py module."""
  17. from __future__ import print_function
  18. import contextlib
  19. import os
  20. import shutil
  21. import subprocess
  22. import tempfile
  23. import unittest
  24. import error
  25. import git_config
  26. import project
  27. @contextlib.contextmanager
  28. def TempGitTree():
  29. """Create a new empty git checkout for testing."""
  30. # TODO(vapier): Convert this to tempfile.TemporaryDirectory once we drop
  31. # Python 2 support entirely.
  32. try:
  33. tempdir = tempfile.mkdtemp(prefix='repo-tests')
  34. subprocess.check_call(['git', 'init'], cwd=tempdir)
  35. yield tempdir
  36. finally:
  37. shutil.rmtree(tempdir)
  38. class RepoHookShebang(unittest.TestCase):
  39. """Check shebang parsing in RepoHook."""
  40. def test_no_shebang(self):
  41. """Lines w/out shebangs should be rejected."""
  42. DATA = (
  43. '',
  44. '# -*- coding:utf-8 -*-\n',
  45. '#\n# foo\n',
  46. '# Bad shebang in script\n#!/foo\n'
  47. )
  48. for data in DATA:
  49. self.assertIsNone(project.RepoHook._ExtractInterpFromShebang(data))
  50. def test_direct_interp(self):
  51. """Lines whose shebang points directly to the interpreter."""
  52. DATA = (
  53. ('#!/foo', '/foo'),
  54. ('#! /foo', '/foo'),
  55. ('#!/bin/foo ', '/bin/foo'),
  56. ('#! /usr/foo ', '/usr/foo'),
  57. ('#! /usr/foo -args', '/usr/foo'),
  58. )
  59. for shebang, interp in DATA:
  60. self.assertEqual(project.RepoHook._ExtractInterpFromShebang(shebang),
  61. interp)
  62. def test_env_interp(self):
  63. """Lines whose shebang launches through `env`."""
  64. DATA = (
  65. ('#!/usr/bin/env foo', 'foo'),
  66. ('#!/bin/env foo', 'foo'),
  67. ('#! /bin/env /bin/foo ', '/bin/foo'),
  68. )
  69. for shebang, interp in DATA:
  70. self.assertEqual(project.RepoHook._ExtractInterpFromShebang(shebang),
  71. interp)
  72. class FakeProject(object):
  73. """A fake for Project for basic functionality."""
  74. def __init__(self, worktree):
  75. self.worktree = worktree
  76. self.gitdir = os.path.join(worktree, '.git')
  77. self.name = 'fakeproject'
  78. self.work_git = project.Project._GitGetByExec(
  79. self, bare=False, gitdir=self.gitdir)
  80. self.bare_git = project.Project._GitGetByExec(
  81. self, bare=True, gitdir=self.gitdir)
  82. self.config = git_config.GitConfig.ForRepository(gitdir=self.gitdir)
  83. class ReviewableBranchTests(unittest.TestCase):
  84. """Check ReviewableBranch behavior."""
  85. def test_smoke(self):
  86. """A quick run through everything."""
  87. with TempGitTree() as tempdir:
  88. fakeproj = FakeProject(tempdir)
  89. # Generate some commits.
  90. with open(os.path.join(tempdir, 'readme'), 'w') as fp:
  91. fp.write('txt')
  92. fakeproj.work_git.add('readme')
  93. fakeproj.work_git.commit('-mAdd file')
  94. fakeproj.work_git.checkout('-b', 'work')
  95. fakeproj.work_git.rm('-f', 'readme')
  96. fakeproj.work_git.commit('-mDel file')
  97. # Start off with the normal details.
  98. rb = project.ReviewableBranch(
  99. fakeproj, fakeproj.config.GetBranch('work'), 'master')
  100. self.assertEqual('work', rb.name)
  101. self.assertEqual(1, len(rb.commits))
  102. self.assertIn('Del file', rb.commits[0])
  103. d = rb.unabbrev_commits
  104. self.assertEqual(1, len(d))
  105. short, long = next(iter(d.items()))
  106. self.assertTrue(long.startswith(short))
  107. self.assertTrue(rb.base_exists)
  108. # Hard to assert anything useful about this.
  109. self.assertTrue(rb.date)
  110. # Now delete the tracking branch!
  111. fakeproj.work_git.branch('-D', 'master')
  112. rb = project.ReviewableBranch(
  113. fakeproj, fakeproj.config.GetBranch('work'), 'master')
  114. self.assertEqual(0, len(rb.commits))
  115. self.assertFalse(rb.base_exists)
  116. # Hard to assert anything useful about this.
  117. self.assertTrue(rb.date)
  118. class CopyLinkTestCase(unittest.TestCase):
  119. """TestCase for stub repo client checkouts.
  120. It'll have a layout like:
  121. tempdir/ # self.tempdir
  122. checkout/ # self.topdir
  123. git-project/ # self.worktree
  124. Attributes:
  125. tempdir: A dedicated temporary directory.
  126. worktree: The top of the repo client checkout.
  127. topdir: The top of a project checkout.
  128. """
  129. def setUp(self):
  130. self.tempdir = tempfile.mkdtemp(prefix='repo_tests')
  131. self.topdir = os.path.join(self.tempdir, 'checkout')
  132. self.worktree = os.path.join(self.topdir, 'git-project')
  133. os.makedirs(self.topdir)
  134. os.makedirs(self.worktree)
  135. def tearDown(self):
  136. shutil.rmtree(self.tempdir, ignore_errors=True)
  137. @staticmethod
  138. def touch(path):
  139. with open(path, 'w') as f:
  140. pass
  141. def assertExists(self, path, msg=None):
  142. """Make sure |path| exists."""
  143. if os.path.exists(path):
  144. return
  145. if msg is None:
  146. msg = ['path is missing: %s' % path]
  147. while path != '/':
  148. path = os.path.dirname(path)
  149. if not path:
  150. # If we're given something like "foo", abort once we get to "".
  151. break
  152. result = os.path.exists(path)
  153. msg.append('\tos.path.exists(%s): %s' % (path, result))
  154. if result:
  155. msg.append('\tcontents: %r' % os.listdir(path))
  156. break
  157. msg = '\n'.join(msg)
  158. raise self.failureException(msg)
  159. class CopyFile(CopyLinkTestCase):
  160. """Check _CopyFile handling."""
  161. def CopyFile(self, src, dest):
  162. return project._CopyFile(self.worktree, src, self.topdir, dest)
  163. def test_basic(self):
  164. """Basic test of copying a file from a project to the toplevel."""
  165. src = os.path.join(self.worktree, 'foo.txt')
  166. self.touch(src)
  167. cf = self.CopyFile('foo.txt', 'foo')
  168. cf._Copy()
  169. self.assertExists(os.path.join(self.topdir, 'foo'))
  170. def test_src_subdir(self):
  171. """Copy a file from a subdir of a project."""
  172. src = os.path.join(self.worktree, 'bar', 'foo.txt')
  173. os.makedirs(os.path.dirname(src))
  174. self.touch(src)
  175. cf = self.CopyFile('bar/foo.txt', 'new.txt')
  176. cf._Copy()
  177. self.assertExists(os.path.join(self.topdir, 'new.txt'))
  178. def test_dest_subdir(self):
  179. """Copy a file to a subdir of a checkout."""
  180. src = os.path.join(self.worktree, 'foo.txt')
  181. self.touch(src)
  182. cf = self.CopyFile('foo.txt', 'sub/dir/new.txt')
  183. self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
  184. cf._Copy()
  185. self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'new.txt'))
  186. def test_update(self):
  187. """Make sure changed files get copied again."""
  188. src = os.path.join(self.worktree, 'foo.txt')
  189. dest = os.path.join(self.topdir, 'bar')
  190. with open(src, 'w') as f:
  191. f.write('1st')
  192. cf = self.CopyFile('foo.txt', 'bar')
  193. cf._Copy()
  194. self.assertExists(dest)
  195. with open(dest) as f:
  196. self.assertEqual(f.read(), '1st')
  197. with open(src, 'w') as f:
  198. f.write('2nd!')
  199. cf._Copy()
  200. with open(dest) as f:
  201. self.assertEqual(f.read(), '2nd!')
  202. def test_src_block_symlink(self):
  203. """Do not allow reading from a symlinked path."""
  204. src = os.path.join(self.worktree, 'foo.txt')
  205. sym = os.path.join(self.worktree, 'sym')
  206. self.touch(src)
  207. os.symlink('foo.txt', sym)
  208. self.assertExists(sym)
  209. cf = self.CopyFile('sym', 'foo')
  210. self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
  211. def test_src_block_symlink_traversal(self):
  212. """Do not allow reading through a symlink dir."""
  213. src = os.path.join(self.worktree, 'bar', 'passwd')
  214. os.symlink('/etc', os.path.join(self.worktree, 'bar'))
  215. self.assertExists(src)
  216. cf = self.CopyFile('bar/foo.txt', 'foo')
  217. self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
  218. def test_src_block_copy_from_dir(self):
  219. """Do not allow copying from a directory."""
  220. src = os.path.join(self.worktree, 'dir')
  221. os.makedirs(src)
  222. cf = self.CopyFile('dir', 'foo')
  223. self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
  224. def test_dest_block_symlink(self):
  225. """Do not allow writing to a symlink."""
  226. src = os.path.join(self.worktree, 'foo.txt')
  227. self.touch(src)
  228. os.symlink('dest', os.path.join(self.topdir, 'sym'))
  229. cf = self.CopyFile('foo.txt', 'sym')
  230. self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
  231. def test_dest_block_symlink_traversal(self):
  232. """Do not allow writing through a symlink dir."""
  233. src = os.path.join(self.worktree, 'foo.txt')
  234. self.touch(src)
  235. os.symlink('/tmp', os.path.join(self.topdir, 'sym'))
  236. cf = self.CopyFile('foo.txt', 'sym/foo.txt')
  237. self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
  238. def test_src_block_copy_to_dir(self):
  239. """Do not allow copying to a directory."""
  240. src = os.path.join(self.worktree, 'foo.txt')
  241. self.touch(src)
  242. os.makedirs(os.path.join(self.topdir, 'dir'))
  243. cf = self.CopyFile('foo.txt', 'dir')
  244. self.assertRaises(error.ManifestInvalidPathError, cf._Copy)
  245. class LinkFile(CopyLinkTestCase):
  246. """Check _LinkFile handling."""
  247. def LinkFile(self, src, dest):
  248. return project._LinkFile(self.worktree, src, self.topdir, dest)
  249. def test_basic(self):
  250. """Basic test of linking a file from a project into the toplevel."""
  251. src = os.path.join(self.worktree, 'foo.txt')
  252. self.touch(src)
  253. lf = self.LinkFile('foo.txt', 'foo')
  254. lf._Link()
  255. dest = os.path.join(self.topdir, 'foo')
  256. self.assertExists(dest)
  257. self.assertTrue(os.path.islink(dest))
  258. self.assertEqual('git-project/foo.txt', os.readlink(dest))
  259. def test_src_subdir(self):
  260. """Link to a file in a subdir of a project."""
  261. src = os.path.join(self.worktree, 'bar', 'foo.txt')
  262. os.makedirs(os.path.dirname(src))
  263. self.touch(src)
  264. lf = self.LinkFile('bar/foo.txt', 'foo')
  265. lf._Link()
  266. self.assertExists(os.path.join(self.topdir, 'foo'))
  267. def test_src_self(self):
  268. """Link to the project itself."""
  269. dest = os.path.join(self.topdir, 'foo', 'bar')
  270. lf = self.LinkFile('.', 'foo/bar')
  271. lf._Link()
  272. self.assertExists(dest)
  273. self.assertEqual('../git-project', os.readlink(dest))
  274. def test_dest_subdir(self):
  275. """Link a file to a subdir of a checkout."""
  276. src = os.path.join(self.worktree, 'foo.txt')
  277. self.touch(src)
  278. lf = self.LinkFile('foo.txt', 'sub/dir/foo/bar')
  279. self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
  280. lf._Link()
  281. self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'foo', 'bar'))
  282. def test_src_block_relative(self):
  283. """Do not allow relative symlinks."""
  284. BAD_SOURCES = (
  285. './',
  286. '..',
  287. '../',
  288. 'foo/.',
  289. 'foo/./bar',
  290. 'foo/..',
  291. 'foo/../foo',
  292. )
  293. for src in BAD_SOURCES:
  294. lf = self.LinkFile(src, 'foo')
  295. self.assertRaises(error.ManifestInvalidPathError, lf._Link)
  296. def test_update(self):
  297. """Make sure changed targets get updated."""
  298. dest = os.path.join(self.topdir, 'sym')
  299. src = os.path.join(self.worktree, 'foo.txt')
  300. self.touch(src)
  301. lf = self.LinkFile('foo.txt', 'sym')
  302. lf._Link()
  303. self.assertEqual('git-project/foo.txt', os.readlink(dest))
  304. # Point the symlink somewhere else.
  305. os.unlink(dest)
  306. os.symlink('/', dest)
  307. lf._Link()
  308. self.assertEqual('git-project/foo.txt', os.readlink(dest))