Browse Source

Add parallelism to 'branches' command

Spread the operation of querying which local branches exist across a
pool of processes and build the name map of projects -> branches as
these tasks finish rather than blocking on the entire query. The search
operations are submitted in batches to reduce the overhead of interprocess
communication. The value of the `chunksize` argument, which controls this
batch size, was selected by stepping through powers of two until larger
values stopped being faster.

Change-Id: Ie3d7f799ee8e83e5058536caf53e2979175408b7
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/291342
Tested-by: Chris Mcdonald <cjmcdonald@google.com>
Reviewed-by: Mike Frysinger <vapier@google.com>
Chris McDonald 5 years ago
parent
commit
8add62325d
2 changed files with 43 additions and 4 deletions
  1. 1 0
      .gitignore
  2. 42 4
      subcmds/branches.py

+ 1 - 0
.gitignore

@@ -7,6 +7,7 @@ __pycache__
 .repopickle_*
 /repoc
 /.tox
+/.venv
 
 # PyCharm related
 /.idea/

+ 42 - 4
subcmds/branches.py

@@ -15,10 +15,20 @@
 # limitations under the License.
 
 from __future__ import print_function
+import itertools
+import multiprocessing
 import sys
 from color import Coloring
 from command import Command
 
+# Number of projects to submit to a single worker process at a time.
+# This number represents a tradeoff between the overhead of IPC and finer
+# grained opportunity for parallelism. This particular value was chosen by
+# iterating through powers of two until the overall performance no longer
+# improved. The performance of this batch size is not a function of the
+# number of cores on the system.
+WORKER_BATCH_SIZE = 32
+
 
 class BranchColoring(Coloring):
   def __init__(self, config):
@@ -97,20 +107,32 @@ is shown, then the branch appears in all projects.
 
 """
 
+  def _Options(self, p):
+    """Add flags to CLI parser for this subcommand.
+
+    Registers the -j/--jobs option; Execute passes its value to
+    multiprocessing.Pool as the number of worker processes.
+
+    Args:
+      p: The option parser to extend. Only its add_option() method is used
+          here (presumably an optparse.OptionParser -- confirm against the
+          Command base class).
+    """
+    # Default to one worker per CPU, but never more than 8.
+    default_jobs = min(multiprocessing.cpu_count(), 8)
+    p.add_option(
+        '-j',
+        '--jobs',
+        type=int,
+        default=default_jobs,
+        help='Number of worker processes to spawn '
+        '(default: %s)' % default_jobs)
+
   def Execute(self, opt, args):
     projects = self.GetProjects(args)
     out = BranchColoring(self.manifest.manifestProject.config)
     all_branches = {}
     project_cnt = len(projects)
+    with multiprocessing.Pool(processes=opt.jobs) as pool:
+      project_branches = pool.imap_unordered(
+          expand_project_to_branches, projects, chunksize=WORKER_BATCH_SIZE)
 
-    for project in projects:
-      for name, b in project.GetBranches().items():
-        b.project = project
+      for name, b in itertools.chain.from_iterable(project_branches):
         if name not in all_branches:
           all_branches[name] = BranchInfo(name)
         all_branches[name].add(b)
 
-    names = list(sorted(all_branches))
+    names = sorted(all_branches)
 
     if not names:
       print('   (no branches)', file=sys.stderr)
@@ -180,3 +202,19 @@ is shown, then the branch appears in all projects.
       else:
         out.write(' in all projects')
       out.nl()
+
+
+def expand_project_to_branches(project):
+  """Expands a project into a list of branch names & associated information.
+
+  This is the per-project task that Execute submits through
+  multiprocessing.Pool.imap_unordered.
+
+  Args:
+    project: project.Project
+
+  Returns:
+    List[Tuple[str, git_config.Branch]], one entry per item of
+    project.GetBranches(), with each branch's `project` attribute set to
+    the given project.
+  """
+  branches = []
+  for name, b in project.GetBranches().items():
+    # Tag the branch with the project it came from: the caller flattens the
+    # results of all projects into one stream of (name, branch) pairs.
+    b.project = project
+    branches.append((name, b))
+  return branches