Skip to content

Commit e539479

Browse files
derrickstoleegitster
authored andcommitted
pack-objects: thread the path-based compression
Adapting the implementation of ll_find_deltas(), create a threaded version of the --path-walk compression step in 'git pack-objects'. This involves adding a 'regions' member to the thread_params struct, allowing each thread to own a section of paths. We can simplify the way jobs are split because there is no value in extending the batch based on name-hash the way sections of the object entry array are attempted to be grouped. We re-use the 'list_size' and 'remaining' items for the purpose of borrowing work in progress from other "victim" threads when a thread has finished its batch of work more quickly. Using the Git repository as a test repo, the p5313 performance test shows that the resulting size of the repo is the same, but the threaded implementation gives gains of varying degrees depending on the number of objects being packed. (This was tested on a 16-core machine.) Test HEAD~1 HEAD --------------------------------------------------- 5313.20: big pack 2.38 1.99 -16.4% 5313.21: big pack size 16.1M 16.0M -0.2% 5313.24: repack 107.32 45.41 -57.7% 5313.25: repack size 213.3M 213.2M -0.0% (Test output is formatted to better fit in message.) This ~60% reduction in 'git repack --path-walk' time is typical across all repos I used for testing. What is interesting is to compare when the overall time improves enough to outperform the --name-hash-version=1 case. These time improvements correlate with repositories with data shapes that significantly improve their data size as well. The --path-walk feature frequently takes longer than --name-hash-version=2, trading some extra computation for some additional compression. The natural place where this additional computation comes from is the two compression passes that --path-walk takes, though the first pass is naturally faster due to the path boundaries avoiding a number of delta compression attempts. For example, the microsoft/fluentui repo has significant size reduction from --name-hash-version=1 to --name-hash-version=2 followed by further improvements with --path-walk. The threaded computation makes --path-walk more competitive in time compared to --name-hash-version=2, though still ~31% more expensive in that metric. Repack Method Pack Size Time ------------------------------------------ Hash v1 439.4M 87.24s Hash v2 161.7M 21.51s Path Walk (Before) 142.5M 81.29s Path Walk (After) 142.5M 28.16s Similar results hold for the Git repository: Repack Method Pack Size Time ------------------------------------------ Hash v1 248.8M 30.44s Hash v2 249.0M 30.15s Path Walk (Before) 213.2M 142.50s Path Walk (After) 213.3M 45.41s ...as well as the nodejs/node repository: Repack Method Pack Size Time ------------------------------------------ Hash v1 739.9M 71.18s Hash v2 764.6M 67.82s Path Walk (Before) 698.1M 208.10s Path Walk (After) 698.0M 75.10s Finally, the Linux kernel repository is a good test for this repacking time change, even though the space savings is more subtle: Repack Method Pack Size Time ------------------------------------------ Hash v1 2.5G 554.41s Hash v2 2.5G 549.62s Path Walk (before) 2.2G 1562.36s Path Walk (before) 2.2G 559.00s Signed-off-by: Derrick Stolee <[email protected]> Signed-off-by: Junio C Hamano <[email protected]>
1 parent 206a1bb commit e539479

File tree

1 file changed

+164
-2
lines changed

1 file changed

+164
-2
lines changed

builtin/pack-objects.c

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2965,6 +2965,7 @@ static void find_deltas(struct object_entry **list, unsigned *list_size,
29652965
struct thread_params {
29662966
pthread_t thread;
29672967
struct object_entry **list;
2968+
struct packing_region *regions;
29682969
unsigned list_size;
29692970
unsigned remaining;
29702971
int window;
@@ -3282,6 +3283,167 @@ static void find_deltas_by_region(struct object_entry *list,
32823283
stop_progress(&progress_state);
32833284
}
32843285

3286+
static void *threaded_find_deltas_by_path(void *arg)
3287+
{
3288+
struct thread_params *me = arg;
3289+
3290+
progress_lock();
3291+
while (me->remaining) {
3292+
while (me->remaining) {
3293+
progress_unlock();
3294+
find_deltas_for_region(to_pack.objects,
3295+
me->regions,
3296+
me->processed);
3297+
progress_lock();
3298+
me->remaining--;
3299+
me->regions++;
3300+
}
3301+
3302+
me->working = 0;
3303+
pthread_cond_signal(&progress_cond);
3304+
progress_unlock();
3305+
3306+
/*
3307+
* We must not set ->data_ready before we wait on the
3308+
* condition because the main thread may have set it to 1
3309+
* before we get here. In order to be sure that new
3310+
* work is available if we see 1 in ->data_ready, it
3311+
* was initialized to 0 before this thread was spawned
3312+
* and we reset it to 0 right away.
3313+
*/
3314+
pthread_mutex_lock(&me->mutex);
3315+
while (!me->data_ready)
3316+
pthread_cond_wait(&me->cond, &me->mutex);
3317+
me->data_ready = 0;
3318+
pthread_mutex_unlock(&me->mutex);
3319+
3320+
progress_lock();
3321+
}
3322+
progress_unlock();
3323+
/* leave ->working 1 so that this doesn't get more work assigned */
3324+
return NULL;
3325+
}
3326+
3327+
static void ll_find_deltas_by_region(struct object_entry *list,
3328+
struct packing_region *regions,
3329+
uint32_t start, uint32_t nr)
3330+
{
3331+
struct thread_params *p;
3332+
int i, ret, active_threads = 0;
3333+
unsigned int processed = 0;
3334+
uint32_t progress_nr;
3335+
init_threaded_search();
3336+
3337+
if (!nr)
3338+
return;
3339+
3340+
progress_nr = regions[nr - 1].start + regions[nr - 1].nr;
3341+
if (delta_search_threads <= 1) {
3342+
find_deltas_by_region(list, regions, start, nr);
3343+
cleanup_threaded_search();
3344+
return;
3345+
}
3346+
3347+
if (progress > pack_to_stdout)
3348+
fprintf_ln(stderr,
3349+
Q_("Path-based delta compression using up to %d thread",
3350+
"Path-based delta compression using up to %d threads",
3351+
delta_search_threads),
3352+
delta_search_threads);
3353+
CALLOC_ARRAY(p, delta_search_threads);
3354+
3355+
if (progress)
3356+
progress_state = start_progress(the_repository,
3357+
_("Compressing objects by path"),
3358+
progress_nr);
3359+
/* Partition the work amongst work threads. */
3360+
for (i = 0; i < delta_search_threads; i++) {
3361+
unsigned sub_size = nr / (delta_search_threads - i);
3362+
3363+
p[i].window = window;
3364+
p[i].depth = depth;
3365+
p[i].processed = &processed;
3366+
p[i].working = 1;
3367+
p[i].data_ready = 0;
3368+
3369+
p[i].regions = regions;
3370+
p[i].list_size = sub_size;
3371+
p[i].remaining = sub_size;
3372+
3373+
regions += sub_size;
3374+
nr -= sub_size;
3375+
}
3376+
3377+
/* Start work threads. */
3378+
for (i = 0; i < delta_search_threads; i++) {
3379+
if (!p[i].list_size)
3380+
continue;
3381+
pthread_mutex_init(&p[i].mutex, NULL);
3382+
pthread_cond_init(&p[i].cond, NULL);
3383+
ret = pthread_create(&p[i].thread, NULL,
3384+
threaded_find_deltas_by_path, &p[i]);
3385+
if (ret)
3386+
die(_("unable to create thread: %s"), strerror(ret));
3387+
active_threads++;
3388+
}
3389+
3390+
/*
3391+
* Now let's wait for work completion. Each time a thread is done
3392+
* with its work, we steal half of the remaining work from the
3393+
* thread with the largest number of unprocessed objects and give
3394+
* it to that newly idle thread. This ensure good load balancing
3395+
* until the remaining object list segments are simply too short
3396+
* to be worth splitting anymore.
3397+
*/
3398+
while (active_threads) {
3399+
struct thread_params *target = NULL;
3400+
struct thread_params *victim = NULL;
3401+
unsigned sub_size = 0;
3402+
3403+
progress_lock();
3404+
for (;;) {
3405+
for (i = 0; !target && i < delta_search_threads; i++)
3406+
if (!p[i].working)
3407+
target = &p[i];
3408+
if (target)
3409+
break;
3410+
pthread_cond_wait(&progress_cond, &progress_mutex);
3411+
}
3412+
3413+
for (i = 0; i < delta_search_threads; i++)
3414+
if (p[i].remaining > 2*window &&
3415+
(!victim || victim->remaining < p[i].remaining))
3416+
victim = &p[i];
3417+
if (victim) {
3418+
sub_size = victim->remaining / 2;
3419+
target->regions = victim->regions + victim->remaining - sub_size;
3420+
victim->list_size -= sub_size;
3421+
victim->remaining -= sub_size;
3422+
}
3423+
target->list_size = sub_size;
3424+
target->remaining = sub_size;
3425+
target->working = 1;
3426+
progress_unlock();
3427+
3428+
pthread_mutex_lock(&target->mutex);
3429+
target->data_ready = 1;
3430+
pthread_cond_signal(&target->cond);
3431+
pthread_mutex_unlock(&target->mutex);
3432+
3433+
if (!sub_size) {
3434+
pthread_join(target->thread, NULL);
3435+
pthread_cond_destroy(&target->cond);
3436+
pthread_mutex_destroy(&target->mutex);
3437+
active_threads--;
3438+
}
3439+
}
3440+
cleanup_threaded_search();
3441+
free(p);
3442+
3443+
display_progress(progress_state, progress_nr);
3444+
stop_progress(&progress_state);
3445+
}
3446+
32853447
static void prepare_pack(int window, int depth)
32863448
{
32873449
struct object_entry **delta_list;
@@ -3307,8 +3469,8 @@ static void prepare_pack(int window, int depth)
33073469
return;
33083470

33093471
if (path_walk)
3310-
find_deltas_by_region(to_pack.objects, to_pack.regions,
3311-
0, to_pack.nr_regions);
3472+
ll_find_deltas_by_region(to_pack.objects, to_pack.regions,
3473+
0, to_pack.nr_regions);
33123474

33133475
ALLOC_ARRAY(delta_list, to_pack.nr_objects);
33143476
nr_deltas = n = 0;

0 commit comments

Comments
 (0)