Load balancing for distributed processing of deterministically assigned data using statistical analysis of block data
11074516 · 2021-07-27
Inventors
- Randall Smith (Palo Alto, CA, US)
- Suratna Budalakoti (Foster City, CA, US)
- Alan Wood (San Jose, CA, US)
Cpc classification
G06N7/01
PHYSICS
G06F16/1724
PHYSICS
Abstract
Dynamic generation and implementation of assignment mappings of data items in large data files to distributed processors to achieve objectives such as reduced overall processing time and the like. Any appropriate key (e.g., character string) can be identified or obtained for each data item in a data file and the file can be segmented into sequential data blocks, where each data block includes a set of data items. The data items in each of a first plurality of the blocks (e.g., sampled block set) may be initially sorted into one of a plurality of key ranges of a search space (each corresponding to a different respective processor) and analyses conducted on the data item totals in each key range. The key range boundaries can be adjusted by accounting for uncertainty in the sample estimates to more evenly distribute data items from all blocks sent to each processor and thereby achieve the objective.
Claims
1. A method for use in managing loads among a plurality of parallel processors, including: receiving, at a processor executing code to provide a mapping engine, a data file that includes a plurality of data items, wherein the processor is communicatively linked via a communication network to the parallel processors; with the mapping engine, determining a unique key for each data item of the plurality of data items; with the mapping engine, sampling a plurality of first blocks of the data items, wherein each first block includes a different subset of the plurality of data items; with the mapping engine, sorting the unique keys in each first block into one of a plurality of key ranges in a sort space, wherein each key range includes first and second boundaries, and wherein each key range corresponds to a different respective one of the parallel processors; conducting, by the processor using the mapping engine, an analysis on the unique keys in each key range to determine, for each key range, a probability of its corresponding one of the parallel processors completing execution of all data items in the key range last among all of the parallel processors; with the mapping engine, adjusting one or more of the first and second boundaries of one or more of the key ranges such that the probabilities of the parallel processors approach equalization; and sending the data items in the data file associated with unique keys in each key range to the different respective ones of the parallel processors for execution after the adjusting based on a mapping generated by the mapping engine that indicates which different respective one of the parallel processors is to process each of the plurality of data items, wherein the parallel processors execute the respective data items in parallel.
2. The method of claim 1, wherein the conducting includes: summing, for each key range, the unique keys in the key range among all of the first blocks to obtain a sum of unique keys in each key range among all of the first blocks; and analyzing the sums.
3. The method of claim 2, further including: determining, for each key range, a mean number of unique keys per first block in the key range; determining, for each key range, a standard deviation of the unique keys per first block in the key range; and manipulating the mean and standard deviation for each key range, wherein the adjusting is based on the manipulating.
4. The method of claim 3, wherein the manipulating includes: obtaining, for each different respective parallel processor, a measure of uncertainty in the mean number of unique keys per first block (“standard error”) in its corresponding key range based on a) the standard deviation of the unique keys per first block in the corresponding key range and b) a total number of the plurality of first blocks, wherein the probability for each parallel processor is based on a) the mean number of unique keys per first block in its corresponding key range and b) the “standard error” in its corresponding key range.
5. The method of claim 4, wherein the adjusting includes adjusting the one or more of the first and second boundaries of one or more of the key ranges such that for any of the key ranges, a sum of: a) the mean number of unique keys per first block for the key range, and b) a product of i) the standard error for the key range and ii) a constant, is equal to the same value.
6. The method of claim 1, wherein the adjusting includes adjusting the one or more of the first and second boundaries of the one or more of the key ranges so as to encompass fewer data items in the data file when the probability of the one or more parallel processors corresponding to the one or more of the key ranges is greater than the inverse of a total number of the parallel processors, and wherein the adjusting includes adjusting the one or more of the first and second boundaries of the one or more of the key ranges so as to encompass more data items in the data file when the probability of the one or more parallel processors corresponding to the one or more of the key ranges is less than the inverse of a total number of the parallel processors.
7. The method of claim 1, wherein the conducting includes determining that each key range has a different total number of unique keys compared to the other key ranges and wherein the adjusting includes adjusting the one or more of the first and second boundaries of the one or more of the key ranges such that the total number of unique keys in each key range approaches equalization.
8. The method of claim 1, further including: receiving a request for the one of the plurality of parallel processors that processed a particular one of the plurality of data items identified by a particular unique key; and using the particular unique key as an index into the mapping to identify the particular parallel processor.
9. The method of claim 1, wherein each unique key is an alphanumeric string, and wherein each of the first and second boundaries of each of the key ranges is an alphanumeric string.
10. A method of implementing an assignment plan to map each of a plurality of data items in a data file to one of a plurality of parallel processors, comprising: with a mapping engine running on a computer system communicatively coupled to the parallel processors, sorting unique keys of data items in each of a plurality of first blocks of the data file into one of a plurality of key ranges in a sort space, wherein each key range corresponds to a different respective one of the parallel processors; with the mapping engine, determining, for each key range, a mean number of unique keys per first block in the key range; with the mapping engine, determining, for each key range, a standard deviation of the unique keys per first block in the key range; with the mapping engine, obtaining, for each different respective parallel processor, a measure of uncertainty in the mean number of unique keys per first block (“standard error”) based on a) the standard deviation of the unique keys per first block in the corresponding key range and b) a total number of the plurality of first blocks; generating, by the mapping engine based on the a) mean number of unique keys per first block and b) the standard error for each different respective parallel processor, a mapping that indicates which different respective one of the plurality of parallel processors is to process each of the plurality of data items; and sending the data items in the data file associated with unique keys in each key range to the different respective ones of the parallel processors for execution based on the generated mapping, wherein the parallel processors execute the respective data items in parallel.
11. The method of claim 10, further including: using, for each different respective parallel processor, a) the mean number of unique keys per first block and b) the standard error for each different respective parallel processor, to determine a probability of the parallel processor completing execution of the data items in its respective key range last among all of the parallel processors; adjusting one or more of the first and second boundaries of one or more of the key ranges such that the probabilities of the parallel processors approach equalization upon: re-sorting the unique keys of data items in each of the plurality of first blocks of the data file into one of a plurality of key ranges in the sort space after the adjusting; re-determining, for each key range, the mean number of unique keys per first block in the key range; and re-determining, for each key range, the standard deviation of the unique keys per first block in the key range.
12. The method of claim 11, wherein the adjusting includes adjusting the one or more of the first and second boundaries of one or more of the key ranges such that for any of the key ranges, a sum of: a) the mean number of unique keys per first block for the key range, and b) a product of i) the standard error for the key range and ii) a constant, is equal to the same value.
13. A system for parallel execution of data items, comprising: a plurality of interconnected processors performing parallel execution of data items; and a computing system interconnected to the plurality of interconnected processors via a communication network, the computing system including: a processor; and a memory interconnected to the processor and including a set of computer readable instructions that are executable by the processor to provide functions of a mapping engine including to: first sort unique keys of data items in each of a plurality of sampled blocks of a data file into one of a plurality of key ranges in a sort space, wherein each key range includes first and second boundaries, and wherein each key range corresponds to a different respective one of the plurality of interconnected processors; adjust, one or more times, one or more of the first and second boundaries of one or more of the key ranges; sort, after each adjustment, the unique keys of the data items in each of the plurality of sampled blocks of the data file into one of the plurality of key ranges in the sort space, wherein each adjustment is made such that a probability of each of the plurality of interconnected processors completing execution of data items in its corresponding key range last among all of the plurality of interconnected processors after the subsequent sort approaches equalization; and send the data items in the data file associated with unique keys in each key range to different respective ones of the plurality of interconnected processors for execution after the one or more adjustments and sorts, wherein the plurality of interconnected processors execute the respective data items in parallel.
14. The system of claim 13, wherein the set of computer readable instructions are executable by the processor to, after each adjustment and sort pair: determine, for each key range, a mean number of unique keys per first block in the key range; determine, for each key range, a standard deviation of the unique keys per first block in the key range; and manipulate the mean and standard deviation for each key range to determine the probability of the corresponding one of the plurality of interconnected processors.
15. The system of claim 14, wherein the set of computer readable instructions are executable by the processor to: obtain, for each different respective one of the plurality of interconnected processors, a measure of uncertainty in the mean number of unique keys per first block (“standard error”) based on a) the standard deviation of the unique keys per first block in the corresponding key range and b) a total number of the plurality of first blocks; and manipulate the standard error of the mean number of keys per first block in each key range to determine the probability of the corresponding one of the plurality of interconnected processors.
16. The system of claim 15, wherein the set of computer readable instructions are executable by the processor to: use, for each different respective one of the plurality of interconnected processors, a) the mean number of unique keys per first block and b) the standard error for each different respective one of the plurality of interconnected processors, to determine a probability of the one of the plurality of interconnected processors completing execution of the data items in its respective key range last among all of the plurality of interconnected processors.
17. The system of claim 13, wherein the one or more adjustments are made such that after the sort of a final one of the one or more adjustments, a sum of: a) the mean number of unique keys per first block for each key range, and b) a product of i) the standard error for the key range and ii) a constant, is equal to the same value for each of the key ranges.
18. The system of claim 13, wherein each unique key is a numeric or an alphanumeric string, and wherein each of the first and second boundaries of each of the key ranges is a numeric or an alphanumeric string.
19. The system of claim 13, wherein the first and second boundaries of adjacent ones of the key ranges are identical.
Description
DETAILED DESCRIPTION
(9) The present disclosure is generally directed to various utilities (e.g., systems, methods, etc.) for use in dynamically generating and implementing assignment mappings of data items in large data files to distributed processors to achieve objectives such as reduced overall processing time and the like. Each data item in a data file can be associated with any appropriate key (e.g., numeric, alphanumeric, etc.) and the file can be segmented into sequential data blocks, where each data block includes a set of data items. The data items in each of a first plurality of the blocks (e.g., sampled block set) may be initially sorted into one of a plurality of key ranges of a search space (each corresponding to a different respective processor) and analyses conducted on the data item totals in each key range. The key range boundaries can be adjusted by accounting for uncertainty in the sample estimates to more evenly distribute data items from the blocks sent to each processor and thereby achieve the objective(s).
(11) As shown, the computing system 104 may include storage 112 (e.g., flash, other non-volatile memory device, etc.) on which the data file 200 may be stored, a memory 116 (e.g., one or more RAM or other volatile memory modules) including a mapping engine 120 (e.g., one or more sets of computer-readable instructions) for generating and implementing an assignment plan 124 to be used in sending the data items 204 to the various processors 108, a processor 128 (e.g., one or more central processing units (CPUs), processor cores, etc.) for executing the computer-readable instructions from the memory 116, and various components 132 (e.g., peripheral components, interfaces, etc.) that are not illustrated in the interest of clarity, all of which may be electrically interconnected by one or more busses 136, networks, and/or the like.
(14) Reference is now made to
(15) At steps 304 and 308, the method 300 may include determining respective unique keys 216 for data items 204 in a data file 200 (e.g., according to a policy or the like) and sampling blocks of data items 204 from the data file 200. As discussed previously, for instance, an entirety or substantial entirety of the data items 204 in the data file 200 to be processed by the distributed processors 108 may be segmented into blocks 208 (a plurality of “second blocks”), where a subset of the total number of blocks 208 may be sampled to obtain a plurality of sampled blocks 212 (e.g., a plurality of “first blocks”), and where a total number of the first blocks is less than a total number of the second blocks. For instance, the second blocks may be at least partially non-overlapping with the first blocks. In one arrangement, the computing system 104 may maintain a separate representative file structure or database of the data items 204, unique keys 216, blocks 208, etc. (in the same position or sequence relative to each other as in the data file 200) such that the various utilities disclosed herein (e.g., in relation to sorting, adjusting, etc.) may operate primarily on such representative file structure or database rather than on the actual data file 200 itself.
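As an illustration of steps 304 and 308, the following is a minimal Python sketch of segmenting a file's data items into sequential blocks and sampling a subset of them. It assumes a fixed block size and uniform random sampling of blocks without replacement; the function names `segment_into_blocks` and `sample_blocks` and the parameters `block_size`, `num_sampled`, and `seed` are hypothetical, since the disclosure does not prescribe a particular sampling scheme.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def segment_into_blocks(items: Sequence[T], block_size: int) -> List[List[T]]:
    """Split items into sequential blocks (the "second blocks"); the last block may be short."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    return [list(items[i:i + block_size]) for i in range(0, len(items), block_size)]


def sample_blocks(blocks: List[List[T]], num_sampled: int, seed: int = 0) -> List[List[T]]:
    """Pick num_sampled distinct blocks uniformly at random (the "first blocks")."""
    if not 0 < num_sampled <= len(blocks):
        raise ValueError("num_sampled must be between 1 and the number of blocks")
    rng = random.Random(seed)
    # Sample block indices, then keep them in file order to preserve relative sequence.
    chosen = sorted(rng.sample(range(len(blocks)), num_sampled))
    return [blocks[i] for i in chosen]


items = [f"item-{k}" for k in range(1000)]
blocks = segment_into_blocks(items, block_size=50)   # B = 20 blocks
sampled = sample_blocks(blocks, num_sampled=5)        # b = 5 < B
```

Keeping the sampled blocks in file order is a convenience for later inspection; the statistics computed from them do not depend on that order.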
(16) Again with reference to
(17) In the example illustrated in
(18) Each of the first and second boundaries 258₁, 258₂ for each key range 254 may generally correspond in type to a portion of the type of keys 216 of the data items 204. As each key 216 is in the form of a two-character alphanumeric string in the simplistic example illustrated in
(19) In one arrangement, the first and second boundaries 258₁, 258₂ of the various key ranges 254 may be initially selected such that the key ranges 254 are of a substantially equal size. In other words, the sort space 250 may be essentially divided into M equal parts (key ranges), where M corresponds to the number of distributed processors 108. The computing system 104 (e.g., the mapping engine 120) may then sort the keys 216 of the sampled blocks 212 into one of the key ranges 254 as illustrated in
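The initial equal division of the sort space can be sketched in Python for two-character alphanumeric keys like those in the simplistic example. This is a minimal sketch under stated assumptions: the 36-symbol alphabet `ALPHABET` (digits, then uppercase letters, which matches ASCII ordering so Python's string comparison agrees with sort-space position) and the helper names are hypothetical, and each range is represented by its lower boundary, so a key belongs to range n when it is at least the n-th boundary and below the next one.

```python
from bisect import bisect_right
from typing import Dict, List

# Assumed alphabet for the two-character keys, listed in ASCII order.
ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"


def position_to_key(pos: int) -> str:
    """Map an integer position in the sort space to its two-character key."""
    base = len(ALPHABET)
    return ALPHABET[pos // base] + ALPHABET[pos % base]


def equal_boundaries(num_processors: int) -> List[str]:
    """Lower boundaries of M roughly equal key ranges covering the sort space."""
    size = len(ALPHABET) ** 2                      # 36 * 36 = 1296 possible keys
    return [position_to_key(j * size // num_processors) for j in range(num_processors)]


def sort_into_ranges(keys: List[str], lower_bounds: List[str]) -> Dict[int, List[str]]:
    """Assign each key to the range n with lower_bounds[n] <= key < lower_bounds[n + 1]."""
    ranges: Dict[int, List[str]] = {n: [] for n in range(len(lower_bounds))}
    for key in keys:
        n = max(bisect_right(lower_bounds, key) - 1, 0)   # keys below the first bound go to range 0
        ranges[n].append(key)
    return ranges


bounds = equal_boundaries(4)   # ['00', '90', 'I0', 'R0']
print(sort_into_ranges(["0A", "9Z", "B7", "QQ", "ZZ", "R0"], bounds))
# {0: ['0A'], 1: ['9Z', 'B7'], 2: ['QQ'], 3: ['ZZ', 'R0']}
```

Binary search over the sorted boundaries keeps each assignment at O(log M), which matters when every key in every sampled block must be re-sorted after each boundary adjustment.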
(20) Turning again to
(21) As mentioned, the first and second boundaries 258₁, 258₂ of the key ranges 254 in
(22) In one embodiment, the value R(i, n, r) may represent the number of data items from a particular sampled block i that map to processor n for particular dividing point quantities r (e.g., first and second boundaries 258₁, 258₂). With reference to the example in
(23) The associated standard deviation $s_n$ of the number of data items per sampled block 212 that map to processor n (i.e., of the distribution whose mean is $\mu_n$) may also be determined in any appropriate manner and used to calculate the standard error $\sigma_n$ of the mean number of data items $\mu_n$ per block 208 for each processor n, where $\sigma_n$ may be represented as follows:
(24)
where “b” is the number of sampled blocks 212.
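A minimal Python sketch of these per-range statistics follows. It assumes integer keys, key ranges described by a sorted list of lower boundaries, and the textbook standard error of a sample mean, $\sigma_n = s_n/\sqrt{b}$, as the form of the relationship above; the function names `counts_per_block` and `range_statistics` are hypothetical. The table `R[n][i]` plays the role of R(i, n, r) for the current boundaries r.

```python
import math
from bisect import bisect_right
from statistics import mean, stdev
from typing import List, Sequence, Tuple


def counts_per_block(blocks: Sequence[Sequence[int]], lower_bounds: Sequence[int]) -> List[List[int]]:
    """R[n][i]: number of data items in sampled block i whose key falls in range n."""
    R = [[0] * len(blocks) for _ in range(len(lower_bounds))]
    for i, block in enumerate(blocks):
        for key in block:
            n = max(bisect_right(lower_bounds, key) - 1, 0)  # keys below the first bound go to range 0
            R[n][i] += 1
    return R


def range_statistics(blocks, lower_bounds) -> List[Tuple[float, float, float]]:
    """(mu_n, s_n, sigma_n) per range, with sigma_n = s_n / sqrt(b) (assumed form)."""
    b = len(blocks)
    if b < 2:
        raise ValueError("need at least two sampled blocks to estimate a standard deviation")
    stats = []
    for row in counts_per_block(blocks, lower_bounds):
        mu, s = mean(row), stdev(row)        # sample mean and sample standard deviation
        stats.append((mu, s, s / math.sqrt(b)))
    return stats


sampled_blocks = [[1, 5, 12, 30], [2, 14, 15, 33], [7, 8, 21, 40], [3, 11, 25, 26]]
for n, (mu, s, se) in enumerate(range_statistics(sampled_blocks, lower_bounds=[0, 10, 20])):
    print(f"processor {n}: mu={mu:.2f} s={s:.2f} sigma={se:.2f}")
```

Here the three ranges receive per-block counts of [2, 1, 2, 1], [1, 2, 0, 1], and [1, 1, 2, 2], so the middle range has the lowest mean but the largest standard error.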
(25) In the case where the total number of blocks 208 to be processed (“B”) becomes increasingly small, the standard error σ.sub.n may be written as:
(26)
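One common way to express such a correction, offered here as an assumption rather than as the formula of this disclosure, is the textbook finite-population correction for sampling b of the B blocks without replacement:

$$\sigma_n = \frac{s_n}{\sqrt{b}}\sqrt{\frac{B - b}{B - 1}}$$

Under this form, $\sigma_n$ shrinks toward zero as b approaches B, reflecting that a sample covering every block carries no sampling uncertainty.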
(27) In one arrangement, it may be assumed that each data item is identical in processing demand, and requires one unit of time to process. Accordingly, a measure of data counts (e.g., μ) may be freely substituted for time.
(28) As discussed previously, one way to complete processing of all data items as quickly (or substantially as quickly) as possible is to identify a particular combination of first and second boundaries 258₁, 258₂ of the various key ranges 254 that equalizes the probability that each processor n will be the last among all of the processors to finish processing its data items. When all processors n are equally likely to finish last, each has a probability of finishing last equal to 1/N, where N is the number of processors.
(29) To calculate the probability of finishing last, the following Gaussian probability distribution functions may be utilized:
(30)
(31) The following numerical integration may be performed for each processor n to obtain its probability of finishing last, $L_n$:
(32)
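Assuming each processor's completion time is modeled as an independent Gaussian with mean $\mu_n$ and standard deviation $\sigma_n$ (data counts standing in for time), the standard forms of these quantities, given as a reconstruction rather than the exact notation of this disclosure, would be:

$$P_n(t) = \frac{1}{\sigma_n\sqrt{2\pi}}\exp\!\left(-\frac{(t-\mu_n)^2}{2\sigma_n^2}\right), \qquad F_n(t) = \int_{-\infty}^{t} P_n(t')\,dt'$$

$$L_n = \int_{-\infty}^{\infty} P_n(t)\prod_{m \ne n} F_m(t)\,dt$$

The integrand is the density of processor n finishing at time t multiplied by the probability that every other processor m has already finished by t. Under independence, the $L_n$ sum to 1 over all processors.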
(33) Once the above quantities are known, the ranges or dividing point quantities r (e.g., first and second boundaries 258₁, 258₂) can be adjusted 324, the keys re-sorted 312 and re-aggregated 316, and $L_n$ then re-determined for each processor n, with the eventual goal of equalizing or substantially equalizing $L_n$ across all of the processors n. In one variation, the product in the integrand may be replaced with a product that includes all processors, including the case m = n, such that the resulting product is independent of n.
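The probability of finishing last can be estimated numerically with a short Python sketch. It assumes independent Gaussian completion times and uses the trapezoidal rule on a fixed grid; the function names and grid settings (`half_width`, `steps`) are illustrative choices, not taken from the disclosure. Changing variables to $z = (t - \mu_n)/\sigma_n$ keeps the grid aligned with each processor's own distribution, and the `include_self` flag implements the variation in which the product also includes m = n.

```python
import math
from typing import List, Sequence


def std_normal_pdf(z: float) -> float:
    return math.exp(-0.5 * z * z) / math.sqrt(2.0 * math.pi)


def std_normal_cdf(z: float) -> float:
    return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))


def prob_finish_last(mus: Sequence[float], sigmas: Sequence[float],
                     include_self: bool = False,
                     half_width: float = 8.0, steps: int = 801) -> List[float]:
    """L_n = integral of P_n(t) * prod_{m != n} F_m(t) dt for independent Gaussians.

    Integrates in z = (t - mu_n) / sigma_n over [-half_width, half_width] with the
    trapezoidal rule. include_self=True also multiplies by F_n(t) (the variation in
    which the product runs over all processors).
    """
    sig = [max(s, 1e-12) for s in sigmas]   # guard against a zero standard error
    h = 2.0 * half_width / (steps - 1)
    probs = []
    for n in range(len(mus)):
        total = 0.0
        for k in range(steps):
            z = -half_width + k * h
            t = mus[n] + sig[n] * z
            value = std_normal_pdf(z)
            for m in range(len(mus)):
                if m != n or include_self:
                    value *= std_normal_cdf((t - mus[m]) / sig[m])
            total += value * (0.5 if k in (0, steps - 1) else 1.0)   # trapezoid weights
        probs.append(total * h)
    return probs


L = prob_finish_last([100.0, 100.0, 100.0, 100.0], [5.0, 5.0, 5.0, 5.0])
print([round(p, 4) for p in L])          # each close to 1/N = 0.25
L = prob_finish_last([110.0, 100.0, 100.0, 100.0], [5.0, 5.0, 5.0, 5.0])
print([round(p, 4) for p in L])          # processor 0 is the most likely to finish last
```

With all processors identical, the exact values are 1/N; with `include_self=True` they become 1/(N+1), so the variation changes the target value even though the ranking of processors is what drives the adjustment.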
(34) For instance,
(35) In the case where $L_n$ for a particular processor n is greater than a target value of 1/N (where N is the number of processors 108), the ranges or dividing point quantities r (e.g., first and second boundaries 258₁, 258₂) can be iteratively adjusted 324 such that the particular processor n receives fewer keys 216 (or fewer respective data items 204) upon a subsequent re-sorting 312. On the other hand, in the case where $L_n$ for a particular processor n is less than the target value of 1/N, the ranges or dividing point quantities r can be iteratively adjusted 324 such that the particular processor n receives more keys 216 (or more respective data items 204) upon a subsequent re-sorting 312. This approach to iteratively adjusting the vector of values $L_n$ may be similar to one or more “hill climbing” numerical analysis techniques.
(36) One specific manner of equalizing $L_n$ across the processors n will now be discussed. Starting with the first processor (e.g., processor 108₁), its new upper bound (e.g., second boundary 258₂) may be set according to the following:
(37)
for some arbitrarily chosen $0 < \alpha < 1$, such as $\alpha = 1/2$.
(38) Once $r_1'$ is determined, the computing system 104 may move on to
(39)
(40) The upper bound r value for processor N (the last key range 254) may be determined as follows:
(41)
(42) Once all N−1 of the new range boundaries have been set, a new set of the $L_n$ may be determined. The process of adjusting the ranges may then be repeated as above after reducing the value of $\alpha$ according to $\alpha \leftarrow \alpha/2$. The process may terminate when all the $L_n$ values are sufficiently close to the target value of 1/N or, in another embodiment, when a time threshold has been reached or exceeded.
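The adjust, re-sort, and re-evaluate loop can be sketched in Python. The sketch substitutes its own simple update rule for the boundary expressions above (an assumption, not the disclosed formula): each interior cut point moves by a fixed step toward the neighbor with the smaller $L_n$, its step is halved whenever its direction reverses (in the spirit of $\alpha \leftarrow \alpha/2$), and the loop stops when every $L_n$ is within a tolerance of 1/N or an iteration cap is reached. Keys are assumed to be integers in `[0, key_space)`, boundaries are integer cut points, and all names (`rebalance`, `range_stats`, `deviation`, `tol`, `max_iter`) are hypothetical.

```python
import math
import random
from bisect import bisect_right
from statistics import mean, stdev
from typing import List, Sequence, Tuple


def _cdf(z: float) -> float:
    """Standard normal cumulative distribution function."""
    return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))


def prob_last(mus: Sequence[float], sigmas: Sequence[float], steps: int = 401) -> List[float]:
    """Trapezoidal estimate of L_n for independent Gaussian completion times."""
    sig = [max(s, 1e-12) for s in sigmas]      # guard against a zero standard error
    h = 16.0 / (steps - 1)                     # integrate z = (t - mu_n) / sigma_n over [-8, 8]
    out = []
    for n in range(len(mus)):
        total = 0.0
        for k in range(steps):
            z = -8.0 + k * h
            t = mus[n] + sig[n] * z
            value = math.exp(-0.5 * z * z) / math.sqrt(2.0 * math.pi)
            for m in range(len(mus)):
                if m != n:
                    value *= _cdf((t - mus[m]) / sig[m])
            total += value * (0.5 if k in (0, steps - 1) else 1.0)
        out.append(total * h)
    return out


def range_stats(blocks: Sequence[Sequence[int]], cuts: Sequence[int]) -> Tuple[List[float], List[float]]:
    """Mean items per sampled block and standard error s / sqrt(b) for each range."""
    n_proc = len(cuts) - 1
    counts = [[0] * len(blocks) for _ in range(n_proc)]
    for i, block in enumerate(blocks):
        for key in block:
            n = min(max(bisect_right(cuts, key) - 1, 0), n_proc - 1)
            counts[n][i] += 1
    mus = [float(mean(c)) for c in counts]
    ses = [stdev(c) / math.sqrt(len(blocks)) for c in counts]
    return mus, ses


def deviation(blocks, cuts) -> Tuple[float, List[float]]:
    """Worst-case |L_n - 1/N| for the given cut points, plus the L_n values."""
    probs = prob_last(*range_stats(blocks, cuts))
    return max(abs(p - 1.0 / len(probs)) for p in probs), probs


def rebalance(blocks, key_space: int, n_proc: int, tol: float = 0.02, max_iter: int = 100):
    """Hill-climb the interior cut points so that every L_n approaches 1/N."""
    cuts = [j * key_space // n_proc for j in range(n_proc + 1)]   # equal-width start
    dev, probs = deviation(blocks, cuts)
    best, best_dev = list(cuts), dev
    step = [max(1, key_space // (4 * n_proc))] * n_proc
    last_dir = [0] * n_proc
    for _ in range(max_iter):
        if best_dev <= tol:
            break
        for j in range(1, n_proc):
            # Cut j separates range j-1 (below) from range j (above); -1 shrinks range j-1.
            diff = probs[j - 1] - probs[j]
            direction = -1 if diff > 0 else (1 if diff < 0 else 0)
            if direction and last_dir[j] and direction != last_dir[j]:
                step[j] = max(1, step[j] // 2)   # overshoot: halve, like alpha <- alpha / 2
            if direction:
                last_dir[j] = direction
            cuts[j] += direction * step[j]
            # Keep cut points strictly increasing inside [0, key_space].
            cuts[j] = min(max(cuts[j], cuts[j - 1] + 1), key_space - (n_proc - j))
        dev, probs = deviation(blocks, cuts)
        if dev < best_dev:
            best, best_dev = list(cuts), dev
    return best, best_dev


rng = random.Random(7)
# Skewed keys in [0, 1000): most data items fall near the low end of the sort space.
blocks = [[int(1000 * rng.random() ** 3) for _ in range(200)] for _ in range(8)]
start_dev, _ = deviation(blocks, [0, 250, 500, 750, 1000])
cuts, final_dev = rebalance(blocks, key_space=1000, n_proc=4)
print(cuts, round(start_dev, 3), round(final_dev, 3))
```

The sketch always moves the boundaries rather than only accepting improving moves, because with a badly skewed start the $L_n$ saturate at 0 or 1 and a strict-improvement rule would stall; it returns the best cut points seen.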
(43) In one embodiment, $L_n$ may be substantially equalized for all of the processors n when $\mu_n$ and $\sigma_n$ for any key range satisfy the following:
$$\mu_n + \lambda(N)\,\sigma_n = c$$
(44) Where
(45)
(46) with
(47)
$$F(z) = \int_{-\infty}^{z} P(z')\,dz'$$
(48) The above equations may hold assuming c is a constant value and assuming the various $\sigma_n$ are not too widely spread apart (e.g., when the ratio of the difference between any two standard deviations to the average of the standard deviations is much less than 1). Furthermore, $\lambda(N)$ is a constant that depends on the number of processors. Because evaluating the sum $\mu_n + \lambda(N)\,\sigma_n$ does not require performing a numerical integration (as the computation of the probability of finishing last does), balancing based only on the estimated total and standard error may be significantly faster. In one embodiment, c may be initially estimated with the average of the estimated totals (
c=
(49)
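The faster criterion can be sketched as a per-range target computation in Python. This is a minimal sketch under two stated assumptions: $\lambda(N)$ is supplied by the caller (the value used below is a placeholder, not a derived $\lambda(N)$), and the $\sigma_n$ are held fixed while targets are computed. If every range should satisfy $\mu_n + \lambda\sigma_n = c$ and the targets must still account for every data item, summing over the N ranges gives $c = (\sum_n \mu_n + \lambda\sum_n \sigma_n)/N$; this conservation step is an added assumption rather than the disclosed estimate of c. The function name `balance_targets` is hypothetical.

```python
from typing import List, Sequence, Tuple


def balance_targets(mus: Sequence[float], sigmas: Sequence[float],
                    lam: float) -> Tuple[float, List[float], List[float]]:
    """Return c, per-range target means c - lam * sigma_n, and mu_n - target_n.

    A positive residual means range n currently holds more items per block than the
    mu + lam * sigma = c condition allows, so its boundaries would be tightened.
    """
    n = len(mus)
    c = (sum(mus) + lam * sum(sigmas)) / n      # keeps the sum of targets equal to the sum of mus
    targets = [c - lam * s for s in sigmas]
    residuals = [m - t for m, t in zip(mus, targets)]
    return c, targets, residuals


# Hypothetical numbers; lam = 1.0 is a placeholder, not a derived lambda(N).
c, targets, residuals = balance_targets([120.0, 95.0, 100.0, 85.0], [4.0, 2.0, 3.0, 3.0], lam=1.0)
print(c, targets, residuals)
```

In this example c = 103.0, the targets are [99.0, 101.0, 100.0, 100.0], and the residuals [21.0, -6.0, 0.0, -15.0] indicate that the first range should shrink while the second and fourth should grow. Ranges with larger standard errors receive smaller targets, which is how the criterion hedges against sampling uncertainty.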
(50) In some situations, multiple data items 204 can end up with the same sort priority, which may be referred to as an occurrence of a “heavy hitter” (e.g., in the case where unique keys for data items 204 are not obtained). In
(51) In the event that a “heavy hitter” crosses over several key ranges 254, one solution is to ensure that all items have a unique sort position by properly constructing the keys 216. For instance, the file position (e.g., the line number) of the key 216 may be appended to the end of the key 216. In another arrangement, the data items 204 may be sent to the heavy hitter processors 108 in round-robin fashion to equalize the load among them. As the first and last processors 108 in the heavy hitter range may also be processing other data, their heavy hitter load can be diminished proportionately. To accomplish this, during the Assignment Plan phase, the sample data can be scanned for the processors in question to find the last sort value before the heavy hitter and the first sort value after the heavy hitter. For example, if the entire data set consists of B blocks, the sample consists of b blocks, and processor f is the first to handle a heavy hitter, the total amount of data it is expected to process is $(B/b)/(r_j - r_{j-1})$. If the first occurrence of the heavy hitter key is at sort position h, then the proportion of the time that processor f may be included in a round-robin pass may be $(r_f - h)/(r_f - r_{f-1})$. A similar formula could be applied to the last processor in the heavy hitter range.
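Both heavy-hitter remedies can be sketched in Python under stated assumptions. Duplicate keys are made distinct by pairing each key with its file position (a tuple that sorts the same way as appending a fixed-width line number to a fixed-length key). Heavy-hitter items are then spread over the affected processors with a smooth weighted round robin, a scheduling technique substituted here for illustration, in which the first processor participates in proportion $(r_f - h)/(r_f - r_{f-1})$, treating the r values and h as sort positions. The function names and example numbers are hypothetical.

```python
from typing import Dict, Sequence, Tuple


def unique_sort_key(key: str, line_no: int) -> Tuple[str, int]:
    """Tie-break equal keys by file position so each item has its own sort position."""
    return (key, line_no)


def edge_share(r_f: float, r_prev: float, h: float) -> float:
    """Fraction of the first processor's range lying at or after heavy hitter start h."""
    return (r_f - h) / (r_f - r_prev)


def weighted_round_robin(items: Sequence[str], processors: Sequence[int],
                         weights: Sequence[float]) -> Dict[str, int]:
    """Smooth weighted round robin: each pass credits every processor by its weight,
    picks the one with the most credit, and debits it by the total weight."""
    credit = [0.0] * len(processors)
    total = sum(weights)
    assignment: Dict[str, int] = {}
    for item in items:
        for i, w in enumerate(weights):
            credit[i] += w
        pick = max(range(len(processors)), key=lambda i: credit[i])
        credit[pick] -= total
        assignment[item] = processors[pick]
    return assignment


keys = [unique_sort_key("AA", ln) for ln in (7, 3, 9)]
print(sorted(keys))   # [('AA', 3), ('AA', 7), ('AA', 9)]

share = edge_share(r_f=10.0, r_prev=0.0, h=5.0)   # 0.5
plan = weighted_round_robin(["x1", "x2", "x3"], processors=[2, 3], weights=[share, 1.0])
print(plan)           # {'x1': 3, 'x2': 2, 'x3': 3}
```

With the edge processor weighted at 0.5, it receives one heavy-hitter item for every two sent to the interior processor, matching the proportional reduction described above.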
(52) It will be readily appreciated that many deviations may be made from the specific embodiments disclosed in the specification without departing from the spirit and scope of the invention. For instance, some arrangements envision that different data item types may require different amounts of processing time by the processors 108, and this may be taken into account in the above discussion in any appropriate manner.
(53) As mentioned, embodiments disclosed herein can be implemented as one or more computer program products, i.e., one or more modules of computer program instructions encoded on a computer-readable medium for execution by, or to control the operation of, data processing apparatus (processors, cores, etc.). The computer-readable medium can be a machine-readable storage device, a machine-readable storage substrate, a memory device, a composition of matter affecting a machine-readable propagated signal, or a combination of one or more of them. In addition to hardware, code that creates an execution environment for the computer program in question may be provided, e.g., code that constitutes processor firmware, a protocol stack, a database management system, an operating system, or a combination of one or more of them.
(54) Certain features that are described in this specification in the context of separate embodiments can also be implemented in combination in a single embodiment. Conversely, various features that are described in the context of a single embodiment can also be implemented in multiple embodiments separately or in any suitable subcombination. Moreover, although features may be described above as acting in certain combinations and even initially claimed as such, one or more features from a claimed combination can in some cases be excised from the combination, and the claimed combination may be directed to a subcombination or variation of a subcombination.