DISTRIBUTED PROCESSING SYSTEM AND DISTRIBUTED PROCESSING METHOD
20210117783 · 2021-04-22
Inventors
- Kenji Kawai (Tokyo, JP)
- Junichi Kato (Tokyo, JP)
- Huycu Ngo (Tokyo, JP)
- Yuki Arikawa (Tokyo, JP)
- Tsuyoshi Ito (Tokyo, JP)
- Takeshi Sakamoto (Tokyo, JP)
Abstract
Each of distributed processing nodes [n] (n=1, . . . , and N) packetizes pieces of distributed data D [m, n] as packets for every M weights w [m] (m=1, . . . , and M) of a neural network to be learned in the order of the numbers m, transmits the packets to a consolidation processing node, receives packets transmitted from the consolidation processing node to acquire consolidated data R [m] in the order of the numbers m, and updates the weights w [m] of the neural network on the basis of the consolidated data R [m].
Claims
1.-4. (canceled)
5. A distributed processing system comprising: a consolidation processor; and N distributed processors, wherein N is an integer equal to or greater than 2, and wherein each of the N distributed processors is configured to: packetize distributed data D [m, n] as first packets for each of M weights w [m] of a neural network to transmit the first packets to the consolidation processor, the distributed data D[m, n] is packetized in an order of numbers m, n=1, . . . , and N, m=1, . . . , and M, and M is an integer equal to or greater than 2; and receive second packets, from the consolidation processor, to acquire consolidated data R [m] to update the M weights w [m] of the neural network according to the consolidated data R [m], the consolidated data R[m] is received in the order of the numbers m; and wherein the consolidation processor is configured to: receive the first packets transmitted from each of the distributed processors to acquire the distributed data D [m, n]; generate the consolidated data R [m] by consolidating the distributed data D [m, n] of the distributed processors for each of the M weights w [m]; and packetize the consolidated data R [m] as the second packets to transmit the second packets to each of the N distributed processors, wherein the consolidated data R [m] is packetized in the order of the numbers m.
6. The distributed processing system according to claim 5, wherein each of the N distributed processors includes: a transmitter configured to packetize the distributed data D [m, n] as the first packets in the order of the numbers m to transmit the first packets to the consolidation processor; a receiver configured to receive the second packets from the consolidation processor to acquire the consolidated data R [m] in the order of the numbers m; and a weight updating processor configured to update the M weights w [m] of the neural network according to the consolidated data R [m].
7. The distributed processing system according to claim 5, wherein the consolidation processor includes: a receiver configured to receive the first packets transmitted from each of the N distributed processors to acquire the distributed data D [m, n] in the order of the numbers m; a consolidation processor configured to generate the consolidated data R [m] by consolidating the distributed data D [m, n] of the N distributed processors for each of the M weights w [m]; and a transmitter configured to packetize the consolidated data R [m] as the second packets in the order of the numbers m to transmit the second packets to each of the N distributed processors.
8. The distributed processing system according to claim 5, wherein each of the N distributed processors further includes: a gradient calculation processor configured to calculate a respective gradient of a loss function of the neural network for each piece of sample data with respect to each of the M weights w [m] when sample data for learning the neural network is input; and an in-node consolidation processor configured to generate and store the distributed data D [m, n], wherein the distributed data D[m,n] is numerical values obtained by consolidating the respective gradient for each piece of the sample data with respect to each of the M weights w [m].
9. A method comprising: packetizing, by each of N distributed processors, distributed data D [m, n] as first packets for each of M weights w [m] of a neural network to transmit the first packets to a consolidation processor, the distributed data D[m, n] is packetized in an order of numbers m, N is an integer equal to or greater than 2, n=1, . . . , and N, m=1, . . . , and M, and M is an integer equal to or greater than 2; and receiving, by each of the N distributed processors from the consolidation processor, second packets, to acquire consolidated data R [m] to update the M weights w [m] of the neural network according to the consolidated data R [m], the consolidated data R[m] is received in the order of the numbers m.
10. The method according to claim 9 further comprising: receiving, by the consolidation processor, the first packets transmitted from each of the N distributed processors to acquire the distributed data D [m, n]; generating, by the consolidation processor, the consolidated data R [m] by consolidating the distributed data D [m, n] of the distributed processors for each of the M weights w [m]; and packetizing, by the consolidation processor, the consolidated data R [m] as the second packets to transmit the second packets to each of the N distributed processors, wherein the consolidated data R [m] is packetized in the order of the numbers m.
11. The method of claim 9, further comprising: calculating a respective gradient of a loss function of the neural network for each piece of sample data with respect to each of the M weights w [m] when sample data for learning the neural network is input; and generating and storing the distributed data D [m, n], wherein the distributed data D[m,n] is numerical values obtained by consolidating the respective gradient for each piece of the sample data with respect to each of the M weights w [m].
Description
BRIEF DESCRIPTION OF THE DRAWINGS
DETAILED DESCRIPTION OF ILLUSTRATIVE EMBODIMENTS
First Example
[0038] Examples of the present invention will be described below with reference to the drawings.
[0039] Note that, in embodiments of the present invention, “nodes” refers to devices, such as servers, distributed over a network.
[0042] Note that the present invention is not limited to any particular method by which a data collecting node collects sample data, divides the collected sample data into N sets, and dispatches each of the sets to a corresponding one of the distributed processing nodes 2[n]; any method can be applied.
[0043] When sample data x[n, s] is input, the gradient calculation processing unit 21 of each of the distributed processing nodes 2[n] (n=1, . . . , and N) calculates a gradient G[m, n, s] of a loss function of the neural network 26 for each piece of sample data x[n, s] with respect to each of M weights w [m] (m=1, . . . , and M) of the neural network 26 to be learned, where M is an integer equal to or greater than 2 (step S101).
[0044] A method of constructing the neural network 26 in each of the distributed processing nodes 2[n] as software, the weights w [m] of the neural network 26, the loss function, which is an indicator of how poorly the neural network 26 performs, and the gradient G[m, n, s] of the loss function are all well known, and thus detailed description thereof will be omitted.
[0045] Next, the in-node consolidation processing unit 22 of each of the distributed processing nodes 2[n] (n=1, . . . , and N) generates and stores, for each weight w [m], distributed data D [m, n], which is a numerical value obtained by consolidating the gradients G[m, n, s] over the pieces of sample data (step S102):
[Equation 1]
D[m, n]=Σ_{s=1, . . . , S} G[m, n, s]   (1)
[0046] Note that the gradient calculation process performed by the gradient calculation processing unit 21 and the in-node consolidation process performed by the in-node consolidation processing unit 22 can be performed in a pipelined manner in units of sample data (the gradient calculation process for given sample data and the in-node consolidation process for the gradient obtained from the immediately preceding sample data can be performed at the same time).
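As a concrete illustration of steps S101 and S102, the following minimal Python/NumPy sketch accumulates per-sample gradients into the distributed data D [m, n] of Equation (1). The names compute_gradient, samples, and M are hypothetical placeholders introduced only for this sketch and are not part of the disclosed system.

import numpy as np

def in_node_consolidation(samples, compute_gradient, M):
    # Equation (1): D[m, n] = sum over s = 1, ..., S of G[m, n, s].
    # 'samples' holds the S pieces of sample data x[n, s] assigned to node n.
    # 'compute_gradient' is a hypothetical stand-in for the gradient
    # calculation processing unit 21; for one piece of sample data it returns
    # the M gradients G[m, n, s] (m = 1, ..., M) as a NumPy array.
    D = np.zeros(M)
    for x in samples:            # gradients are consolidated sample by sample,
        G = compute_gradient(x)  # so this loop can be pipelined with step S101
        D += G
    return D

Because D is updated one piece of sample data at a time, the gradient calculation for the next piece of sample data can proceed while the previous gradient is being added, which is the pipelining noted in paragraph [0046].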
[0048] In the aggregation communication process, the transmission unit 23 of each of the distributed processing nodes 2[n] (n=1, . . . , and N) divides the M stored pieces of distributed data D [m, n] (m=1, . . . , and M) into Pg aggregation communication packets (Pg is an integer equal to or greater than 2) by Lg pieces of the distributed data (Lg is an integer equal to or greater than 1 and less than M) and transmits the aggregation communication packets to the consolidation processing node 1 in the order of the numbers m (step S103). Lg pieces of distributed data D[i, n] (i=Lg×(p−1)+l, l=1, . . . , and Lg) are stored in the p-th aggregation communication packet SP[p, n] (p=1, . . . , and Pg).
[0049] Note that, in a case where M is not divisible by Lg, (M−Lg×(Pg−1)) pieces of distributed data D[i, n] (i=Lg×(Pg−1)+q, q=1, . . . , and M−Lg×(Pg−1)) are stored in the Pg-th aggregation communication packet SP[Pg, n].
[0050] For the Pg-th aggregation communication packet SP[Pg, n], {Lg−(M−Lg×(Pg−1))} dummy numerical values may be added after the (M−Lg×(Pg−1)) pieces of distributed data D[i, n] so that all of the aggregation communication packets equally store Lg pieces of data.
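The following Python sketch illustrates this packetization, including the dummy padding of the last packet; representing a packet as a plain list of values is an assumption made only for illustration and is not the packet format of the disclosure.

def packetize(values, L):
    # Split M values (for example, the distributed data D[1, n], ..., D[M, n])
    # into packets of L values each, in the order of the numbers m.  When M is
    # not divisible by L, the last packet is padded with dummy values so that
    # every packet carries exactly L entries.
    P = -(-len(values) // L)                       # number of packets (ceiling division)
    packets = []
    for p in range(P):
        chunk = values[p * L:(p + 1) * L]
        chunk = chunk + [0.0] * (L - len(chunk))   # dummy padding for the last packet
        packets.append(chunk)
    return packets

# Example: M = 7 values, L = 3 values per packet -> 3 packets, the last one padded
packets = packetize([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7], 3)

The same scheme applies in the opposite direction when the consolidation processing node 1 divides the consolidated data R [m] into Ps dispatch communication packets of Ls pieces each (paragraphs [0057] to [0059]).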
[0052] The consolidation processing node 1 acquires the Lg pieces of distributed data D[i, n] (i=Lg×(p−1)+l, l=1, . . . , and Lg) stored by the distributed processing node 2[n] from the received aggregation communication packet SP[p, n] (step S201).
[0053] In this manner, the consolidation processing node 1 can acquire the distributed data D [m, n] (m=1, . . . , and M) stored by each of the distributed processing nodes 2[n] (n=1, . . . , and N) in the order of the numbers m of the weights w [m].
[0054] Next, the consolidation processing node 1 generates, for each weight w [m], consolidated data R [m] by consolidating the acquired distributed data D [m, n] over all of the distributed processing nodes 2[n] (n=1, . . . , and N), as shown in the following equation:
[Equation 2]
R[m]=Σ_{n=1, . . . , N} D[m, n]   (2)
[0055] In this manner, a consolidation process is a process of calculating consolidated data R [m] on the basis of distributed data D [m, n] acquired in an order of numbers m. Thus, the consolidation processing node 1 can generate the consolidated data R [m] in the order of the numbers m.
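As an illustration of Equation (2), and of the fact that R [m] can be produced in the order of the numbers m as soon as D [m, n] for that m has arrived from every node, the following Python sketch computes the consolidation in a simplified, non-pipelined form; the data layout (a list of per-node value lists) is an assumption made for the sketch.

def all_nodes_consolidation(distributed_data):
    # distributed_data[n - 1][m - 1] holds D[m, n] for node n (0-based lists).
    # Equation (2): R[m] = sum over n = 1, ..., N of D[m, n].  The results are
    # generated in the order of the numbers m, so each R[m] can be dispatched
    # as soon as the m-th piece has been read from every node's data.
    M = len(distributed_data[0])
    consolidated = []
    for m in range(M):
        R_m = sum(node_data[m] for node_data in distributed_data)
        consolidated.append(R_m)
    return consolidated

# Example with N = 3 distributed processing nodes and M = 4 weights
R = all_nodes_consolidation([[1.0, 2.0, 3.0, 4.0],
                             [0.5, 0.5, 0.5, 0.5],
                             [1.0, 1.0, 1.0, 1.0]])
# R == [2.5, 3.5, 4.5, 5.5]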
[0057] In the dispatch communication process, the consolidation processing node 1 divides the M pieces of consolidated data R [m] (m=1, . . . , and M) into Ps dispatch communication packets (Ps is an integer equal to or greater than 2) by Ls pieces of the consolidated data (Ls is an integer equal to or greater than 1 and less than M) and transmits the dispatch communication packets to each of the distributed processing nodes 2[n] (n=1, . . . , and N) in the order of the numbers m (step S204). Ls pieces of consolidated data R[j] (j=Ls×(p−1)+k, k=1, . . . , and Ls) are stored in the p-th dispatch communication packet DP[p, n] (p=1, . . . , and Ps).
[0058] Note that, in a case where M is not divisible by Ls, (M−Ls×(Ps−1)) pieces of consolidated data R[j] (j=Ls×(Ps−1)+q, q=1, . . . , and M−Ls×(Ps−1)) are stored in the Ps-th dispatch communication packet DP[Ps, n].
[0059] For the Ps-th dispatch communication packet DP[Ps, n], {Ls−(M−Ls×(Ps−1))} dummy numerical values may be added after the (M−Ls×(Ps−1)) pieces of consolidated data R[j] so that all of the dispatch communication packets equally store Ls pieces of data.
[0061] Then, the reception unit 24 of each of the distributed processing nodes 2[n] (n=1, . . . , and N) acquires the Ls pieces of consolidated data R[j] (j=Ls×(p−1)+k, k=1, . . . , and Ls) generated by the consolidation processing node 1 from the received dispatch communication packet DP[p, n] (step S107).
[0062] In this manner, each of the distributed processing nodes 2[n] (n=1, . . . , and N) can acquire consolidated data R [m] (m=1, . . . , and M) generated by the consolidation processing node 1 in the order of numbers m of weights w [m].
[0063] Note that the same consolidated data R[j] (j=Ls×(p−1)+k, k=1, . . . , and Ls) is stored, for all of the distributed processing nodes 2[n], in the p-th dispatch communication packet DP[p, n] transmitted by the consolidation processing node 1. Thus, in a case where it is not necessary to designate a destination for the dispatch communication packet DP[p, n] (for example, in a case where a separate path is provided for each distributed processing node), the same dispatch communication packet may be transmitted to all of the distributed processing nodes 2[n].
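The reassembly on the reception side can be pictured with the following Python sketch, the inverse of the packetization shown earlier; discarding the dummy values that pad the last packet when M is not divisible by Ls is the only extra step (the names are again illustrative only).

def unpacketize(packets, M):
    # Concatenate the payloads of the dispatch communication packets
    # DP[1, n], ..., DP[Ps, n] in order and discard the dummy values padding
    # the last packet, recovering R[1], ..., R[M] in the order of the numbers m.
    values = []
    for packet in packets:
        values.extend(packet)
    return values[:M]

# Continuing the earlier example: 3 packets of 3 values each, M = 7 real values
R = unpacketize([[2.5, 3.5, 4.5], [5.5, 6.5, 7.5], [8.5, 0.0, 0.0]], 7)
# R == [2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5]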
[0065] In the weight updating process, a weight w [m] may be updated for each number m so that a loss function is minimized on the basis of a gradient of the loss function which is indicated by consolidated data R [m]. The updating of a weight w [m] is a well-known technique, and thus detailed description thereof will be omitted.
[0066] In this manner, the weight updating process is a process of updating a weight w [m] on the basis of the pieces of consolidated data R [m] acquired in the order of numbers m of weights w [m]. For this reason, each of the distributed processing nodes 2 [n] (n=1, . . . , and N) can perform a weight updating process for a weight w [m] in the order of numbers m.
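The disclosure leaves the concrete update rule open (any well-known method may be used), so the following sketch assumes plain gradient descent with a hypothetical learning rate eta purely for illustration; here R[m] plays the role of the consolidated gradient for the weight w [m].

def update_weights(w, R, eta=0.01):
    # One weight updating process: each weight w[m] is moved against the
    # consolidated gradient R[m] so as to reduce the loss function.  Because
    # the update for a given m depends only on R[m], it can be applied in the
    # order of the numbers m, as soon as each R[m] arrives, without waiting
    # for the remaining consolidated data.
    for m in range(len(w)):
        w[m] -= eta * R[m]
    return w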
[0067] One iteration of mini batch learning ends with the termination of the weight updating process, and each of the distributed processing nodes 2 [n] (n=1, . . . , and N) and the consolidation processing node 1 continuously perform the next mini batch learning process on the basis of the updated weights. That is, each of the distributed processing nodes 2 [n] receives sample data for the next mini batch learning from a data collecting node which is not shown in the drawing and repeats the above-described mini batch learning process, thereby improving the accuracy of inference of the neural network 26.
[0068] Note that the repetition of the mini batch learning is terminated in (A) a case where the number of times of mini batch learning reaches a value designated in advance, (B) a case where the accuracy of inference of the neural network 26 (for example, a percentage of correct answers when the neural network 26 infers a known problem) exceeds a threshold value designated in advance, (C) a case where the improvement in the accuracy of inference of the neural network 26 stops (that is, a case where the increase in the accuracy of inference falls below a threshold value designated in advance even after a number of times of mini batch learning designated in advance has been repeated), or (D) a case where a combination of at least two of (A) to (C) occurs. Whether to terminate the repetition of mini batch learning may be determined individually by each of the distributed processing nodes 2 [n] (n=1, . . . , and N) or may be determined comprehensively by the consolidation processing node 1.
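A simple way to picture conditions (A) to (C), and a combination as in (D), is the following hypothetical check; the threshold names and values (max_iterations, target_accuracy, min_improvement, patience) stand in for the values designated in advance and are not taken from the disclosure.

def should_stop(iteration, accuracy_history,
                max_iterations=1000,    # condition (A): designated iteration count
                target_accuracy=0.95,   # condition (B): designated accuracy threshold
                min_improvement=1e-4,   # condition (C): designated improvement threshold
                patience=10):           # condition (C): designated number of iterations
    # (A) the designated number of mini batch learning iterations has been reached
    if iteration >= max_iterations:
        return True
    # (B) the inference accuracy exceeds the designated threshold
    if accuracy_history and accuracy_history[-1] > target_accuracy:
        return True
    # (C) the accuracy improvement over the last 'patience' iterations has
    #     fallen below the designated threshold
    if len(accuracy_history) > patience:
        if accuracy_history[-1] - accuracy_history[-1 - patience] < min_improvement:
            return True
    # (D) would combine two or more of the checks above instead of testing each alone
    return False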
[0069] As described above, each of the distributed processing nodes 2 [n] (n=1, . . . , and N) packetizes the M pieces of distributed data D [m, n] (m=1, . . . , and M) in the order of the numbers m of the weights w [m] and transmits the packets to the consolidation processing node 1, and the consolidation processing node 1 acquires the M pieces of distributed data D [m, n] (m=1, . . . , and M) in the order of the numbers m (aggregation communication process).
[0070] Further, the consolidation processing node 1 performs an all-nodes consolidation process of generating consolidated data R [m] (m=1, . . . , and M) in the order of numbers m on the basis of the M pieces of distributed data D [m, n] (m=1, . . . , and M) acquired in the order of numbers m of weights w [m].
[0071] Further, the consolidation processing node 1 packetizes the M pieces of consolidated data R [m] (m=1, . . . , and M) as packets generated in the order of numbers m of weights w [m] and transmits the packets to each of the distributed processing nodes 2 [n] (n=1, . . . , and N), and each of the distributed processing nodes 2 [n] (n=1, . . . , and N) performs a dispatch communication process of acquiring M pieces of consolidated data R [m] (m=1, . . . , and M) in the order of numbers m.
[0072] Further, each of the distributed processing nodes 2 [n] (n=1, . . . , and N) performs a weight updating process of updating M weights w [m] in the order of numbers m on the basis of M pieces of consolidated data R [m] (m=1, . . . , and M) acquired in the order of numbers m.
[0073] In the present example, an aggregation communication process, an all-nodes consolidation process, a dispatch communication process, and a weight updating process can be performed in parallel at substantially the same time (in a pipelined manner), and a processing time can be drastically reduced as compared to a sequence in which, as in the related art, each process is started only after the preceding process has been completed for all of the M weights.
[0074] In other words, the transmission unit 23 of each of the distributed processing nodes 2 [n] (n=1, . . . , and N) and the consolidation processing node 1 perform the aggregation communication process described above in units of the numbers m of the weights w [m].
[0075] The consolidation processing node 1 performs the all-nodes consolidation process described above in units of the numbers m.
[0076] The consolidation processing node 1 and the reception unit 24 of each of the distributed processing nodes 2 [n] (n=1, . . . , and N) perform the dispatch communication process described above in units of the numbers m.
[0077] The weight updating processing unit 25 of each of the distributed processing nodes 2 [n] (n=1, . . . , and N) performs the weight updating process described above in units of the numbers m. Consequently, these four processes can proceed in parallel for different numbers m.
[0078] Thus, for example, in a case where a time T is required for each of an aggregation communication process, an all-nodes consolidation process, a dispatch communication process, and a weight updating process, a time of 4T is required for the termination of all of these processes in the related art, whereas a time of T+α is sufficient in the present example. Here, α is a delay time from a point in time when any distributed processing node 2 [n] transmits any distributed data D [m, n] to the consolidation processing node 1 to a point in time when the updating of the corresponding weight w [m] is completed. In the present example, the processes are performed in a pipelined manner in units of the numbers m of the weights w [m], and thus the time α is sufficiently short as compared to T. Thus, in the present example, the time required for an aggregation communication process, an all-nodes consolidation process, a dispatch communication process, and a weight updating process can be shortened to approximately ¼ of that of the related art.
Second Example
[0079] Next, a second example of the present invention will be described. In the present example, a configuration example of the consolidation processing node 1, which is a component of the distributed processing system for deep learning in the first example, is described.
[0080] The consolidation processing node 1 includes reception units 10 [n] (n=1, . . . , and N), reception First In, First Out (FIFO) buffers 11 [n], a consolidation processing unit 12, and transmission units 13 [n].
[0081] As described in the first example, in an aggregation communication process, the consolidation processing node 1 receives the M pieces of distributed data D [m, n] (m=1, . . . , and M) from each of the distributed processing nodes 2 [n] (n=1, . . . , and N) as Pg aggregation communication packets SP [p, n] (p=1, . . . , and Pg), each carrying Lg pieces of the distributed data. That is, Lg pieces of distributed data D [i, n] (i=Lg×(p−1)+l, l=1, . . . , and Lg) are stored in the aggregation communication packet SP [p, n] (p=1, . . . , and Pg).
[0082] In addition, in a dispatch communication process, the consolidation processing node 1 divides the M pieces of consolidated data R [m] (m=1, . . . , and M) into Ps dispatch communication packets DP [p, n] (p=1, . . . , and Ps), each carrying Ls pieces of the consolidated data, and transmits the Ps dispatch communication packets DP [p, n] (p=1, . . . , and Ps) to each of the distributed processing nodes 2 [n] (n=1, . . . , and N).
[0083] A reception unit 10 [n] for receiving the aggregation communication packets from the corresponding distributed processing node 2 [n] (n=1, . . . , and N) is provided for each distributed processing node 2 [n].
[0084] Each of the reception units 10 [n] performs the aggregation communication process described in the first example; that is, it receives the aggregation communication packets SP [p, n] (p=1, . . . , and Pg) from the corresponding distributed processing node 2 [n] and extracts the distributed data D [i, n] stored therein.
[0085] A reception FIFO buffer 11 [n] is provided for each of the reception units 10 [n] and accumulates the distributed data extracted by the corresponding reception unit 10 [n].
[0086] Specifically, the reception FIFO buffer 11 [n] accumulates Lg pieces of distributed data D [i, n] (i=Lg×(p−1)+l, l=1, . . . , and Lg) transmitted from the corresponding reception unit 10 [n] in the order of numbers i (i is a portion of a number m). The accumulation is started from a state where each of the reception FIFO buffers 11 [n] is empty. The reception of the aggregation communication packet SP [p, n] and the accumulation of the distributed data D [i, n] are performed Pg times, so that M pieces of distributed data D [m, n] are accumulated in each of the reception FIFO buffers 11 [n].
[0087] Thus, when the same number of pieces of distributed data is read from each of the reception FIFO buffers 11 [n], the pieces of distributed data D [m, n] read from each of the reception FIFO buffers 11 [n] are arranged in the order of m=1, . . . , and M.
[0088] Each of the reception FIFO buffers 11 [n] (n=1, . . . , and N) outputs an accumulation presence/absence signal U [n] indicating whether or not distributed data has been accumulated to the consolidation processing unit 12.
[0089] In a case where all of the accumulation presence/absence signals U [n] (n=1, . . . , and N) indicate that distributed data has been accumulated, the consolidation processing unit 12 reads the distributed data one piece at a time from each of the reception FIFO buffers 11 [n]. Note that each of the reception FIFO buffers 11 [n] accumulates distributed data in the order of the numbers m, and the consolidation processing unit 12 reads the same number of pieces of distributed data from each of the reception FIFO buffers 11 [n]. For this reason, the numbers m of the pieces of distributed data read from the respective reception FIFO buffers 11 [n] have the same value across the reception FIFO buffers 11 [n]. Thus, the accumulation presence/absence signal U [n] does not need to specify the number m of the distributed data and only needs to indicate whether or not the distributed data to be read next has been accumulated in the reception FIFO buffer 11 [n].
[0090] As will be described later, the consolidation processing unit 12 stores, in a dispatch communication packet, consolidated data R [m] generated on the basis of the distributed data D [m, n] that has been read, and transmits the packet from each of the transmission units 13 [n] (n=1, . . . , and N). However, in a state where a dispatch communication packet cannot be transmitted (for example, while another dispatch communication packet is being transmitted), the consolidation processing unit 12 holds off reading the next distributed data D [m, n] until a dispatch communication packet can be transmitted.
[0091] For this reason, when a dispatch communication packet can be transmitted, each of the transmission units 13 [n] (n=1, . . . , and N) outputs, to the consolidation processing unit 12, a transmission permission signal V [n] indicating that a dispatch communication packet can be transmitted.
[0092] The consolidation processing unit 12 receives accumulation presence/absence signals U [n] from each of the reception FIFO buffers 11 [n] (n=1, . . . , and N) and transmission permission signals V [n] (n=1, . . . , and N) from each of the transmission units 13 [n] (n=1, . . . , and N) and determines whether or not to read distributed data from each of the reception FIFO buffers 11 [n].
[0093] Specifically, the consolidation processing unit 12 reads distributed data D [m, n] from each of the reception FIFO buffers 11 [n] when the accumulation presence/absence signal U [n] indicates that distributed data D [m, n] to be read next has been accumulated and the transmission permission signal V [n] indicates that a dispatch communication packet including consolidated data R [m] generated from the read distributed data D [m, n] can be transmitted.
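The gating of the FIFO reads by the signals U [n] and V [n] can be sketched as follows in Python, with collections.deque standing in for the reception FIFO buffers 11 [n]; the class name and method names are illustrative only and do not appear in the disclosure.

from collections import deque

class ConsolidationUnit:
    # Simplified model of the consolidation processing unit 12: one piece of
    # distributed data is read per reception FIFO buffer 11[n] only when every
    # buffer reports accumulation (U[n]) and every transmission unit 13[n]
    # reports that a dispatch communication packet can be transmitted (V[n]).
    def __init__(self, n_nodes):
        self.fifos = [deque() for _ in range(n_nodes)]    # reception FIFO buffers 11[n]

    def accumulate(self, n, value):
        # Reception unit 10[n] stores D[m, n] in the order of the numbers m.
        self.fifos[n].append(value)

    def step(self, tx_ready):
        # U[n]: the distributed data to be read next has been accumulated.
        U = [len(fifo) > 0 for fifo in self.fifos]
        # tx_ready[n] corresponds to the transmission permission signal V[n].
        if all(U) and all(tx_ready):
            D_m = [fifo.popleft() for fifo in self.fifos]  # same number m for every node
            return sum(D_m)                                # consolidated data R[m], Equation (2)
        return None                                        # hold reading until both conditions hold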
[0094] Further, the consolidation processing unit 12 generates pieces of consolidated data R [m] in the order of numbers m on the basis of pieces of distributed data D [m, n] (n=1, . . . , and N) read in the order of numbers m from each of the respective reception FIFO buffers 11 [n] and transmits the generated consolidated data R [m] to the transmission unit 13 [n] at the subsequent stage in the order of numbers m. Here, the same consolidated data is transmitted to each of the transmission units 13 [n]. A calculation equation for the consolidated data R [m] is as shown in Equation (2).
[0095] The transmission unit 13 [n] for transmitting a dispatch communication packet to each of the distributed processing nodes 2 [n] (n=1, . . . , and N) is provided for each distributed processing node 2 [n]. The transmission unit 13 [n] performs the dispatch communication process described in the first example.
[0096] Each of the transmission units 13 [n] divides the pieces of consolidated data R [m] (m=1, . . . , and M) transmitted in the order of the numbers m from the consolidation processing unit 12 into Ps dispatch communication packets by Ls pieces of the consolidated data and transmits the dispatch communication packets. That is, Ls pieces of consolidated data R [j] (j=Ls×(p−1)+k, k=1, . . . , and Ls) are stored in the p-th dispatch communication packet DP [p, n] (p=1, . . . , and Ps) to be transmitted toward the distributed processing node 2 [n]. As described above, each of the transmission units 13 [n] outputs a transmission permission signal V [n] to the consolidation processing unit 12 when the dispatch communication packet DP [p, n] can be transmitted.
[0097] As described in the first example, in a case where M is not divisible by Ls, each of the transmission units 13 [n] stores (M−Ls×(Ps−1)) pieces of consolidated data R [j] (j=Ls×(Ps−1)+q, q=1, . . . , and M−Ls×(Ps−1)) in the Ps-th dispatch communication packet DP [Ps, n]. In addition, each of the transmission units 13 [n] may add {Ls−(M−Ls×(Ps−1))} dummy numerical values after the (M−Ls×(Ps−1)) pieces of consolidated data R [j] in the Ps-th dispatch communication packet DP [Ps, n] so that all of the dispatch communication packets equally store Ls pieces of data.
[0098] As described above, the reception units 10 [n] (n=1, . . . , and N) extract pieces of distributed data D [m, n] in the order of the numbers m (m=1, . . . , and M) of the weights w [m] from the aggregation communication packets received from the distributed processing nodes 2 [n] and store the extracted data, in the order of the numbers m, in the reception FIFO buffer 11 [n] provided for each distributed processing node.
[0099] The consolidation processing unit 12 reads the distributed data D [m, n] in the order of numbers m from each of the reception FIFO buffers 11[n] to generate consolidated data R [m] on the basis of the read distributed data D [m, n]. Further, each of the transmission units 13 [n] stores the generated consolidated data R [m] in the dispatch communication packet in the order of numbers m and transmits the dispatch communication packet to each of the distributed processing nodes 2 [n].
[0100] In the related art, the all-nodes consolidation process is started only after the aggregation communication process has been completed for all of the M weights w [m], and the dispatch communication process is started only after the all-nodes consolidation process has been completed, so that each process in the consolidation processing node must wait for the preceding process to finish for every number m.
[0101] On the other hand, in the present example, an aggregation communication process, an all-nodes consolidation process, and a dispatch communication process in the consolidation processing node 1 can be performed in a pipelined manner for different numbers m. For this reason, a time from when pieces of distributed data D [m, n] are received from each of the distributed processing nodes 2 [n] to when pieces of consolidated data R [m] obtained by consolidating the distributed data D [m, n] for all nodes are returned to each of the distributed processing nodes 2 [n] can be drastically reduced as compared to the related art.
[0102] For example, assuming that a time required for processes related to a number m is t, a time from when pieces of distributed data D [m, n] are received from each of the distributed processing nodes 2 [n] to when pieces of consolidated data R [m] obtained by consolidating the distributed data D [m, n] for all of the distributed processing nodes 2 [n] are returned to each of the distributed processing nodes 2 [n] is 4t (the number of pipeline stages=4) in embodiments of the present invention.
[0103] On the other hand, in the related art, a time M times as long is required because each process must be completed for all of the numbers m before the next process starts, and thus a time from when pieces of distributed data D [m, n] are received from each of the distributed processing nodes 100 [n] to when pieces of consolidated data R [m] are returned to each of the distributed processing nodes 100 [n] is 4t×M. Thus, in the present example, this time can be shortened to 1/M (M is the number of weights w [m], which may be a value of approximately 100,000,000).
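As a purely illustrative calculation under assumed figures (neither t nor the concrete value of M below is taken from the disclosure), the latency difference can be made concrete:

# Illustrative only: assumed per-number processing time and weight count
t = 1e-9               # seconds per pipeline stage for one number m (assumption)
M = 100_000_000        # number of weights w[m] (order of magnitude from [0103])
latency_pipelined   = 4 * t        # present example: 4 pipeline stages -> 4 ns
latency_related_art = 4 * t * M    # related art waits for all M numbers -> 0.4 s

The ratio latency_pipelined / latency_related_art is exactly 1/M, matching the reduction stated above.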
[0104] The other components of the distributed processing system are the same as those described in the first example, and thus description thereof will be omitted in the present example.
[0105] Each of the consolidation processing node 1 and the distributed processing node 2 [n] described in the first and second examples can be realized by a computer including a central processing unit (CPU), a storage device, and an interface, and programs for controlling these hardware resources. The CPU of each of the consolidation processing node 1 and the distributed processing node 2 [n] executes the processes described in the first and second examples in accordance with programs stored in each of the storage devices.
INDUSTRIAL APPLICABILITY
[0106] The present invention can be applied to techniques for performing machine learning of a neural network.
REFERENCE SIGNS LIST
[0107] 1 Consolidation processing node [0108] 2 Distributed processing node [0109] 10 Reception unit [0110] 11 Reception FIFO buffer [0111] 12 Consolidation processing unit [0112] 13 Transmission unit [0113] 20 Sample input unit [0114] 21 Gradient calculation processing unit [0115] 22 In-node consolidation processing unit [0116] 23 Transmission unit [0117] 24 Reception unit [0118] 25 Weight updating processing unit [0119] 26 Neural network