From be919395f0514c6e18fdc0bc61827aee25a59dd2 Mon Sep 17 00:00:00 2001 From: Jesper Eskilson Date: Wed, 31 Jan 2024 21:03:36 +0100 Subject: [PATCH] Rip out optimizations which did not conform to the rules --- rebar.config | 12 +-- src/aggregate.erl | 199 ----------------------------------------- src/erlang_1brc.erl | 212 ++++++++++---------------------------------- 3 files changed, 48 insertions(+), 375 deletions(-) delete mode 100644 src/aggregate.erl diff --git a/rebar.config b/rebar.config index 15ed921..73e9f6f 100644 --- a/rebar.config +++ b/rebar.config @@ -1,11 +1,5 @@ -{erl_opts, [no_debug_info]}. -{deps, [getopt]}. - -{escript_incl_apps, [erlang_1brc, getopt]}. +{erl_opts, []}. +{deps, []}. +{escript_incl_apps, [erlang_1brc]}. {escript_main_app, erlang_1brc}. {escript_name, erlang_1brc}. - -%% Profiles -{profiles, [{test, - [{erl_opts, [debug_info]} - ]}]}. diff --git a/src/aggregate.erl b/src/aggregate.erl deleted file mode 100644 index 0760fb2..0000000 --- a/src/aggregate.erl +++ /dev/null @@ -1,199 +0,0 @@ --module(aggregate). - --export([ run/1 - , chunk_processor/0 - ]). - --compile({inline,[{process_temp,2},{process_line,3}]}). - --define(BUFSIZE, 2 * 1024 * 1024). - -run([Filename]) -> - process_flag(trap_exit, true), - {ok, FD} = file:open(Filename, [raw, read, binary]), - NumProcessors = erlang:system_info(logical_processors), - {ProcessorPids, AllPids} = start_processors(NumProcessors), - {ok, Bin} = file:pread(FD, 0, ?BUFSIZE), - read_chunks(FD, 0, byte_size(Bin), Bin, ?BUFSIZE, ProcessorPids, NumProcessors * 3), - Map = wait_for_completion(AllPids, #{}), - Fmt = format_final_map(Map), - io:format("~ts~n", [Fmt]). - -start_processors(NumProcs) -> - lists:foldl( - fun(_, {ProcessorPids, AllPids}) -> - ProcessorPid = proc_lib:start_link(?MODULE, chunk_processor, []), - ProcessorPid ! {result_pid, self()}, - {[ProcessorPid|ProcessorPids], - [ProcessorPid|AllPids]} - end, {[], []}, lists:seq(1, NumProcs)). - -read_chunks(FD, N, Offset, PrevChunk, BufSize, TargetPids, 0) -> - receive - give_me_more -> - read_chunks(FD, N, Offset, PrevChunk, BufSize, TargetPids, 1) - end; -read_chunks(FD, N, Offset, PrevChunk, BufSize, TargetPids, Outstanding) -> - TargetPid = lists:nth((N rem length(TargetPids)) + 1, TargetPids), - case file:pread(FD, Offset, BufSize) of - {ok, Bin} -> - Size = byte_size(Bin), - %% Read chunks pair-wise and split them so that each processed - %% chunk is on an even newline boundary - case binary:split(Bin, <<"\n">>) of - [First, NextChunk] -> - send_chunk([PrevChunk, First], TargetPid), - read_chunks(FD, N + 1, Offset + Size, NextChunk, BufSize, TargetPids, Outstanding - 1); - [Chunk] -> - send_chunk([Chunk], TargetPid), - read_chunks(FD, N + 1, Offset + Size, <<>>, BufSize, TargetPids, Outstanding - 1) - end; - eof -> - %% Reached end of file, process the last chunk - send_chunk([PrevChunk], TargetPid), - lists:foreach(fun(Pid) -> Pid ! eof end, TargetPids), - ok - end. - -send_chunk(Chunk, TargetPid) -> - TargetPid ! {chunk, Chunk}. - -wait_for_completion([], Map) -> - Map; -wait_for_completion(Pids, Map) -> - receive - {'EXIT', Pid, normal} -> - wait_for_completion(Pids -- [Pid], Map); - {result, _SenderPid, NewMap} -> - wait_for_completion(Pids, merge_location_data(Map, NewMap)); - give_me_more -> - wait_for_completion(Pids, Map); - M -> - logger:error(#{label => "Unexpected message", msg => M}) - end. 
- -merge_location_data(Map1, Map2) -> - Stations = lists:usort(maps:keys(Map1) ++ maps:keys(Map2)), - lists:foldl( - fun(Station, Map) when Station =:= '$ancestors' orelse - Station =:= '$initial_call' -> - Map; - (Station, Map) -> - case {maps:get(Station, Map1, undefined), - maps:get(Station, Map2, undefined)} of - {Data1, undefined} -> maps:put(Station, Data1, Map); - {undefined, Data2} -> maps:put(Station, Data2, Map); - {Data1, Data2} -> - {Min1, Max1, Count1, Sum1} = Data1, - {Min2, Max2, Count2, Sum2} = Data2, - maps:put( - Station, - {min(Min1, Min2), - max(Max1, Max2), - Count1 + Count2, - Sum1 + Sum2}, - Map) - end - end, #{}, Stations). - -format_final_map(Map) -> - "{" ++ - lists:join( - ", ", - lists:map( - fun({Station, {Min, Max, Count, Sum}}) -> - Mean = Sum / Count, - io_lib:format("~ts=~.1f/~.1f/~.1f", - [Station, Min/10, Mean/10, Max/10]) - end, lists:sort(maps:to_list(Map)))) - ++ "}". - -%% -%% Chunk processor: this step in the pipeline takes a binary -%% consisting of an even number of line, splits it at "\n" and ";" and -%% passes it on to the line processor. -%% -chunk_processor() -> - proc_lib:init_ack(self()), - try - chunk_processor_loop(undefined) - catch E:R:ST -> - logger:error(#{ crashed => {E,R,ST} }) - end. - -chunk_processor_loop(Pid) -> - receive - {result_pid, NewPid} -> - chunk_processor_loop(NewPid); - {chunk, [Chunk]} -> - process_station(Chunk), - Pid ! give_me_more, - chunk_processor_loop(Pid); - {chunk, [First, Second]} -> - case process_station(First) of - <<>> -> - ok; - {Rest, Station} -> - process_temp(<>, Station); - Rest -> - process_station(<>) - end, - Pid ! give_me_more, - chunk_processor_loop(Pid); - eof -> - Map = maps:from_list(get()), - Pid ! {result, self(), Map}, - ok; - M -> - io:format("Unexpected message: ~w~n", [M]) - end. - -%% -%% The line processor -%% - -process_station(Station) -> - process_station(Station, Station, 0). -process_station(Bin, <<";", Rest/bitstring>>, Cnt) -> - <> = Bin, - process_temp(Rest, Station); -process_station(Bin, <<_:8, Rest/bitstring>>, Cnt) -> - process_station(Bin, Rest, Cnt + 1); -process_station(Bin, _, _Cnt) -> - Bin. - --define(TO_NUM(C), (C - $0)). - -process_temp(<<$-, A, B, $., C, Rest/binary>>, Station) -> - process_line(Rest, Station, -1 * (?TO_NUM(A) * 100 + ?TO_NUM(B) * 10 + ?TO_NUM(C))); -process_temp(<<$-, B, $., C, Rest/binary>>, Station) -> - process_line(Rest, Station, -1 * (?TO_NUM(B) * 10 + ?TO_NUM(C))); -process_temp(<>, Station) -> - process_line(Rest, Station, ?TO_NUM(A) * 100 + ?TO_NUM(B) * 10 + ?TO_NUM(C)); -process_temp(<>, Station) -> - process_line(Rest, Station, ?TO_NUM(B) * 10 + ?TO_NUM(C)); -process_temp(Rest, Station) -> - {Rest, Station}. - -process_line(Rest, Station, Temp) -> - case get(Station) of - undefined -> - put(Station, { Temp % min - , Temp % max - , 1 % count - , Temp % sum - }); - - {OldMin, OldMax, OldCount, OldSum} -> - put(Station, { min(OldMin, Temp) - , max(OldMax, Temp) - , OldCount + 1 - , OldSum + Temp - }) - - end, - case Rest of - <<>> -> <<>>; - <<"\n",NextStation/binary>> -> - process_station(NextStation) - end. diff --git a/src/erlang_1brc.erl b/src/erlang_1brc.erl index 7dfe42f..d362d8f 100644 --- a/src/erlang_1brc.erl +++ b/src/erlang_1brc.erl @@ -1,12 +1,8 @@ -module(erlang_1brc). --include_lib("stdlib/include/assert.hrl"). - -feature(maybe_expr, enable). --export([ main/1 %% Entrypoint for escript - , run/1 %% Entrypoint for run.sh - ]). +-export([main/1]). %% These inlinings are actually necessary. 
On my machine, it yields a %% 15-20% performance improvement. @@ -21,142 +17,39 @@ %% process_* functions. -define(BUFSIZE, 2 * 1024 * 1024). -%% We pre-compute a mapping using the `KEY' macro from cities to -%% (smallish) integers and store in the process dictionary. When we -%% iterate over the chunks, matching binaries as we go, this mapping -%% is such that it can be computed byte-by-byte, so we do not need to -%% keep the entire city name around. This makes it easier to leverage -%% the "match context reuse" optimization, as we do not need any -%% "look-back" to extract the station name once we reach the ";". -%% -%% The formula is totally non-scientific, but it seems to work well in -%% practise. -%% -%% For the large inputs, the loop in `process_station/2' turns out to -%% be the really hot part, and the match-context reuse optimization -%% almost halves the total runtime. --define(KEY(C, Acc), ((C * 17) bxor Acc) bsl 1). - -%% The worker threads will produce a map of #{Key => TempData} where -%% the key is an integer (see the `KEY' macro). To be able to convert -%% the key back into a station name, we compute the mapping between -%% keys and their station name upfront by reading a single chunk of -%% size `MAP_CITIES_BUFSIZE'. This should be large enough to include -%% all citites, but small enough such the scanning of it is several -%% magnitudes faster than the the total runtime. -%% -%% For my 1B-file, 64k seems to be the smallest buffer we can read and -%% still get all the city names. -%% -%% TODO Compute this dynamically instead. This can be done once we -%% have collected all the temperature data; we can then scan the file -%% from the beginning until we have found stations for all the keys -%% used in the temperature data map. --define(MAP_CITIES_BUFSIZE, 64 * 1024). - -%% Just as a precaution, check that we have actually found all the -%% cities. --define(EXPECTED_NUM_CITIES, 413). - -options() -> - [ {file, $f, "file", {string, "measurements.txt"}, "The input file."} - , {eprof, $e, "eprof", undefined, "Run code under eprof."} - ]. +input_filename([Filename]) -> + Filename; +input_filename([]) -> + "measurements.txt". main(Args) -> - {ok, {Opts, []}} = getopt:parse(options(), Args), - - Time = - case proplists:get_value(eprof, Opts) of - true -> - logger:info(#{label => "Enabling eprof"}), - eprof:start(), - eprof:start_profiling(erlang:processes()), - T = do_main(Opts), - eprof:stop_profiling(), - eprof:analyze(), - eprof:stop(), - T; - _ -> - do_main(Opts) - end, - - io:format("Finished, time = ~w seconds~n", - [erlang:convert_time_unit(Time, microsecond, second)]). - -%% Allow any logger events to be printed to console before exiting. -flush() -> - logger_std_h:filesync(default). - -do_main(Opts) -> - {Time, _} = timer:tc(fun() -> run(proplists:get_value(file, Opts)) end), + {Time, _} = timer:tc(fun() -> run(input_filename(Args)) end), + io:format("Elapsed: ~f seconds~n", [Time / 1000000.0]), Time. 
-run([Filename]) -> - run(Filename); -run(Filename) when is_atom(Filename) -> - run(atom_to_list(Filename)); run(Filename) -> - {Time, _} = timer:tc(fun() -> map_cities(Filename) end), - NumCities = length(get()), - ?assertEqual(?EXPECTED_NUM_CITIES, NumCities), - io:format("Mapped ~p citites in ~w ms~n", - [NumCities, erlang:convert_time_unit(Time, microsecond, millisecond)]), - try - process_flag(trap_exit, true), - case file:open(Filename, [raw, read, binary]) of - {ok, FD} -> - ProcessorPids = start_processors(), - {ok, Bin} = file:pread(FD, 0, ?BUFSIZE), - read_chunks(FD, 0, byte_size(Bin), Bin, ?BUFSIZE, ProcessorPids, length(ProcessorPids) * 3), - Map = wait_for_completion(ProcessorPids, #{}), - Fmt = format_final_map(Map), - io:format("~ts~n", [Fmt]); - {error, Reason} -> - io:format("*** Failed to open ~ts: ~p~n", [Filename, Reason]), - flush(), - erlang:halt(1) - end - catch Class:Error:Stacktrace -> - io:format("*** Caught exception: ~p~n", [{Class, Error, Stacktrace}]), - flush(), - erlang:halt(1) - end. - -map_cities(Filename) -> - case file:open(Filename, [raw, read, binary]) of - {ok, FD} -> - {ok, Bin} = file:pread(FD, 0, ?MAP_CITIES_BUFSIZE), - map_cities0(Bin, 1); - {error, Reason} -> - io:format("*** Failed to open ~ts: ~p~n", [Filename, Reason]), - flush(), - erlang:halt(1) - end. - -station_key(Station) -> - lists:foldl(fun(C, Acc) -> ?KEY(C, Acc) end, - 0, binary_to_list(Station)). - -map_cities0(<<>>, _) -> - ok; -map_cities0(Bin, N) -> - maybe - [First, Rest] ?= binary:split(Bin, <<"\n">>), - [Station, _] ?= binary:split(First, <<";">>), - Key = station_key(Station), - case get({key, Key}) of - undefined -> - put({key, Key}, {station, Station}), - map_cities0(Rest, N + 1); - {station, Clash} when Clash =/= Station -> - throw({name_clash, Key, Station, Clash}); - _ -> - map_cities0(Rest, N + 1) - end - end. + process_flag(trap_exit, true), + Workers = start_workers(), + read_chunks(Filename, Workers), + io:format("Finished reading input file, waiting for workers to finish.~n", []), + Map = wait_for_completion(Workers, #{}), + print_results(Map). + +print_results(Map) -> + Str = "{" ++ + lists:join( + ", ", + lists:sort(lists:map( + fun({Station, {Min, Max, Count, Sum}}) -> + Mean = Sum / Count, + Station0 = list_to_binary(lists:reverse(binary_to_list(Station))), + io_lib:format("~ts=~.1f/~.1f/~.1f", + [Station0, Min/10, Mean/10, Max/10]) + end, maps:to_list(Map)))) + ++ "}", + io:format("~ts~n", [Str]). -%% Wait for processors to finish +%% Wait for workers to finish wait_for_completion([], Map) -> Map; wait_for_completion(Pids, Map) -> @@ -169,11 +62,16 @@ wait_for_completion(Pids, Map) -> {result, Data} -> wait_for_completion(Pids, merge_location_data(Map, Data)); give_me_more -> - %% These are received when the chunk processors wants more data, - %% but we have already consumed the entire file. + %% These are received when the workers wants more data, but we + %% have already consumed the entire file. wait_for_completion(Pids, Map) end. +read_chunks(Filename, Workers) -> + {ok, FD} = file:open(Filename, [raw, read, binary]), + {ok, Bin} = file:pread(FD, 0, ?BUFSIZE), + read_chunks(FD, 0, byte_size(Bin), Bin, ?BUFSIZE, Workers, length(Workers) * 3). + read_chunks(FD, N, Offset, PrevChunk, BufSize, TargetPids, 0) -> receive give_me_more -> @@ -228,45 +126,25 @@ merge_location_data(Map1, Map2) -> end end, #{}, Stations). 
-format_final_map(Map) -> - "{" ++ - lists:join( - ", ", - lists:sort(lists:map( - fun({Station, {Min, Max, Count, Sum}}) -> - Mean = Sum / Count, - case get({key, Station}) of - {station, StationBin} -> - io_lib:format("~ts=~.1f/~.1f/~.1f", - [StationBin, Min/10, Mean/10, Max/10]); - Other -> - io:format("~p~n", [get()]), - throw({failed_to_lookup_station, Other, Station}) - end - end, maps:to_list(Map)))) - ++ "}". - -%% Chunk processor. Receives chunks from the main process, and parses -%% them into temperatures. -start_processors() -> - start_processors(erlang:system_info(logical_processors)). +start_workers() -> + start_workers(erlang:system_info(logical_processors)). -start_processors(NumProcs) -> +start_workers(NumProcs) -> Self = self(), - io:format("Starting ~p parallell chunk processors~n", [NumProcs]), + io:format("Starting ~p parallel workers~n", [NumProcs]), lists:foldl( fun(_, Pids) -> - [spawn_link(fun() -> chunk_processor(Self) end)|Pids] + [spawn_link(fun() -> worker_loop(Self) end)|Pids] end, [], lists:seq(1, NumProcs)). -chunk_processor(Pid) -> +worker_loop(Pid) -> receive %% This only happens on the very last line. {chunk, [Chunk]} -> process_station(Chunk), Pid ! give_me_more, - chunk_processor(Pid); + worker_loop(Pid); {chunk, [First, Second]} -> case process_station(First) of <<>> -> @@ -277,7 +155,7 @@ chunk_processor(Pid) -> process_station(<>) end, Pid ! give_me_more, - chunk_processor(Pid); + worker_loop(Pid); eof -> Map = maps:from_list(get()), Pid ! {result, Map}, @@ -287,12 +165,12 @@ chunk_processor(Pid) -> end. process_station(Station) -> - process_station(Station, 0). + process_station(Station, <<>>). process_station(<<";", Rest/bitstring>>, Station) -> process_temp(Rest, Station); process_station(<>, Station) -> - process_station(Rest, ?KEY(C, Station)); %% magic happens here + process_station(Rest, <>); process_station(Bin, _) -> Bin.
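
The final hunk above replaces the integer KEY hash with the raw station bytes as the process-dictionary key, accumulated in reverse: process_station/2 prepends each byte with <<C, Station/binary>>, and print_results/1 undoes this with lists:reverse/1 before formatting. The following is a minimal standalone sketch of that round trip, not part of the patch; the module name and sample line are made up for illustration, and it only covers the happy path of a single complete "Name;Temp" line.

-module(station_key_sketch).
-export([demo/0]).

%% Walk the line byte by byte, prepending each byte to the accumulator until
%% the ";" separator is reached, mirroring the shape of the new
%% process_station/2 in the patch (simplified: no chunk-boundary handling).
station_key(<<";", _Rest/binary>>, Acc) ->
  Acc;
station_key(<<C, Rest/binary>>, Acc) ->
  station_key(Rest, <<C, Acc/binary>>).

demo() ->
  Line = <<"Stockholm;12.3">>,              %% hypothetical measurement line
  Reversed = station_key(Line, <<>>),       %% => <<"mlohkcotS">>
  Restored = list_to_binary(lists:reverse(binary_to_list(Reversed))),
  {Reversed, Restored}.                     %% => {<<"mlohkcotS">>, <<"Stockholm">>}

Because the full (reversed) name is now the key, the ripped-out machinery for mapping hash keys back to names (the KEY macro, map_cities/1 and MAP_CITIES_BUFSIZE) is no longer needed; the only remaining cost is the reversal in print_results/1, done once per station when printing.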
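
Both the deleted aggregate:read_chunks/7 and the retained erlang_1brc:read_chunks/7 rely on the boundary trick described in the comment "Read chunks pair-wise and split them so that each processed chunk is on an even newline boundary": each fixed-size read is split at its first newline, the head completes the leftover from the previous read, and the tail is carried into the next one. Below is a simplified standalone sketch of that idea, not part of the patch; the module name and data are hypothetical, it collects results in a list instead of streaming them to workers, and it assumes every block contains at least one newline.

-module(chunk_split_sketch).
-export([demo/0]).

%% Split fixed-size reads so that every emitted piece ends on a newline
%% boundary: the part before the first newline completes the previous
%% leftover, the part after it is carried into the next read.
split_blocks([], Carry, Acc) ->
  lists:reverse([Carry | Acc]);
split_blocks([Block | Rest], Carry, Acc) ->
  %% Assumes each block contains at least one newline, which holds for the
  %% 2 MB reads in the patch since measurement lines are short.
  [Head, Tail] = binary:split(Block, <<"\n">>),
  split_blocks(Rest, Tail, [[Carry, Head] | Acc]).

demo() ->
  %% Two hypothetical reads that cut the "Paris" line in the middle.
  Blocks = [<<"Oslo;3.2\nParis;1">>, <<"2.9\nRome;20.1">>],
  split_blocks(Blocks, <<>>, []).
  %% => [[<<>>,<<"Oslo;3.2">>], [<<"Paris;1">>,<<"2.9">>], <<"Rome;20.1">>]

Each element of the result is iodata covering only whole lines, which is why a worker in the patch never sees a measurement line split across two messages.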