DATA PROCESSING SYSTEM, COMPUTING NODE, AND DATA PROCESSING METHOD
20170331886 · 2017-11-16
CPC classifications: G06F16/1844 (Physics); H04L67/1012 (Electricity); G06F16/2465 (Physics); G06F17/16 (Physics)
Abstract
A data processing system, a computing node, and a data processing method are provided. The data processing system includes a management node and a first class of computing nodes. The management node is configured to allocate first processing tasks to the first class of computing nodes. At least two computing nodes in the first class of computing nodes concurrently perform the first processing tasks allocated by the management node. A computing node performs a combine2 operation and a reduce2 operation on a data block M.sub.x and a data block V.sub.1x, to obtain a first intermediate result. Then, the management node obtains a processing result for a to-be-processed dataset according to first intermediate results obtained by the first class of computing nodes. According to the data processing system, when a combine operation and a reduce operation are being performed on data blocks, memory space occupied by computation can be reduced.
Claims
1. A data processing system, comprising a management node and a plurality of computing nodes that form a first class of computing nodes, wherein the management node is configured to: allocate a first processing task to each of at least two computing nodes in the first class of computing nodes, wherein a computing node FC.sub.x is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer, and wherein the at least two computing nodes in the first class of computing nodes concurrently perform the first processing tasks allocated by the management node; the computing node FC.sub.x is configured to: obtain, according to the first processing task allocated by the management node, a data block M.sub.x and a data block V.sub.1x in a to-be-processed dataset, wherein the data block M.sub.x is a matrix comprising m rows and n columns of data, the data block V.sub.1x is a vector comprising n-dimensional data, m and n are positive integers, and n is not less than 2; and perform a combine2 operation and a reduce2 operation on the data block M.sub.x and the data block V.sub.1x, to obtain a first intermediate result V′.sub.x, wherein the first intermediate result V′.sub.x is a vector comprising m-dimensional data; and the first intermediate result V′.sub.x has an element v′.sub.i, wherein i is a variant, the value of i ranges from 1 to m, v′.sub.i=v′.sub.i,n, v′.sub.i,n is obtained according to v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)), m.sub.i,j is an element in the data block M.sub.x, v.sub.j is an element in the data block V.sub.1x, j is a variant, and the value of j ranges from 1 to n; and the management node is further configured to: obtain a first processing result for the to-be-processed dataset according to first intermediate results obtained by the at least two computing nodes in the first class of computing nodes.
2. The data processing system according to claim 1, wherein the data processing system further comprises at least one computing node that forms a second class of computing nodes, and the management node is further configured to: allocate, according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, a second processing task to each of the at least one computing node in the second class of computing nodes, wherein a computing node SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer; the computing node SC.sub.y is configured to: obtain, according to the second processing task allocated by the management node, the first intermediate results, wherein the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks in one row in the to-be-processed dataset; and perform a reduce2 operation on the first intermediate results, to obtain a second intermediate result V″.sub.y, wherein the second intermediate result V″.sub.y is a vector comprising m-dimensional data; and the management node is further configured to: obtain a second processing result for the to-be-processed dataset according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes.
3. The data processing system according to claim 2, wherein the to-be-processed dataset further comprises a data block V.sub.2x, and the data block V.sub.2x is a vector comprising m-dimensional data; and the management node is further configured to: allocate, according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes, a third processing task to the at least one computing node in the second class of computing nodes, wherein the at least one computing node in the second class of computing nodes comprises the computing node SC.sub.y; and the computing node SC.sub.y is further configured to: obtain the data block V.sub.2x in the to-be-processed dataset according to the third processing task; and perform an assign operation on the second intermediate result V″.sub.y and the data block V.sub.2x, to obtain a third processing result for the to-be-processed dataset.
4. The data processing system according to claim 3, wherein m=n, and the data block V.sub.1x and the data block V.sub.2x are a same data block.
5. The data processing system according to claim 2, wherein when the second class of computing nodes comprises at least two computing nodes, the at least two computing nodes concurrently perform the second processing tasks allocated by the management node.
6. The data processing system according to claim 2, wherein the management node is a physical computing device, a virtual machine or a central processing unit; wherein the first class of computing nodes comprises a plurality of physical computing devices or a plurality of virtual machines formed on a physical computing device; and wherein the second class of computing nodes comprises one or more physical computing devices or one or more virtual machines formed on a physical computing device.
7. A data processing system, comprising a management node and a plurality of computing nodes that form a first class of computing nodes, wherein the management node is configured to: allocate a first processing task to each of at least two computing nodes in the first class of computing nodes, wherein a computing node FC.sub.x is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer, and wherein the at least two computing nodes in the first class of computing nodes concurrently perform the first processing tasks allocated by the management node; the computing node FC.sub.x is configured to: obtain, according to the first processing task allocated by the management node, a data block M.sub.1x and a data block M.sub.2x that are in a to-be-processed dataset, wherein the data block M.sub.1x is a matrix comprising m rows and n columns of data, the data block M.sub.2x is a matrix comprising n rows and p columns of data, wherein m, n, and p are positive integers, and n is not less than 2; and perform a combine2 operation and a reduce2 operation on the data block M.sub.1x and the data block M.sub.2x, to obtain a first intermediate result M′.sub.x, wherein the first intermediate result M′.sub.x is a matrix comprising m rows and p columns of data; and the first intermediate result M′.sub.x has an element m′.sub.i,j, wherein i and j are variants, the value of i ranges from 1 to m, the value of j ranges from 1 to p, m′.sub.i,j=m′.sub.i,j,n, m′.sub.i,j,n is obtained according to m′.sub.i,j,k=reduce2(m′.sub.i,j,k−1, combine2(m.sub.1[i,k], m.sub.2[k,j])), m.sub.1[i,k] is an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x, m.sub.2[k,j] is an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, k is a variant, and the value of k ranges from 1 to n; and the management node is further configured to: obtain a first processing result for the to-be-processed dataset according to first 
intermediate results obtained by the at least two computing nodes in the first class of computing nodes.
8. The data processing system according to claim 7, wherein the data processing system further comprises at least one computing node that forms a second class of computing nodes, and the management node is further configured to: allocate, according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, a second processing task to each of the at least one computing node in the second class of computing nodes, wherein a computing node SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer; the computing node SC.sub.y is configured to: obtain, according to the second processing task allocated by the management node, the first intermediate results, wherein the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks M.sub.1x in one row and data blocks M.sub.2x in one column in the to-be-processed dataset; and perform a reduce2 operation on the first intermediate results, to obtain a second intermediate result M″.sub.y, wherein the second intermediate result M″.sub.y is a matrix comprising m rows and p columns of data; and the management node is further configured to: obtain a second processing result for the to-be-processed dataset according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes.
9. The data processing system according to claim 8, wherein the to-be-processed dataset further comprises a data block M.sub.3x, and the data block M.sub.3x is a matrix comprising m rows and p columns of data; and the management node is further configured to: allocate, according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes, a third processing task to the at least one computing node in the second class of computing nodes, wherein the at least one computing node in the second class of computing nodes comprises the computing node SC.sub.y; and the computing node SC.sub.y is further configured to: obtain the data block M.sub.3x in the to-be-processed dataset according to the third processing task; and perform an assign operation on the second intermediate result M″.sub.y and the data block M.sub.3x, to obtain a third processing result for the to-be-processed dataset.
10. The data processing system according to claim 9, wherein n=m, and the data block M.sub.2x and the data block M.sub.3x are a same data block.
11. The data processing system according to claim 9, wherein n=p, and the data block M.sub.1x and the data block M.sub.3x are a same data block.
12. The data processing system according to claim 8, wherein when the second class of computing nodes comprises at least two computing nodes, the at least two computing nodes concurrently perform the second processing tasks allocated by the management node.
13. The data processing system according to claim 7, wherein the management node is a physical computing device, a virtual machine or a central processing unit; wherein the first class of computing nodes is a plurality of physical computing devices or a plurality of virtual machines formed on a physical computing device; and wherein the second class of computing nodes is one or more physical computing devices or one or more virtual machines formed on a physical computing device.
14. A data processing method for use in a data processing system that comprises a management node and a plurality of computing nodes that form a first class of computing nodes, the method comprising: allocating, by the management node, a first processing task to each of at least two computing nodes in the first class of computing nodes, wherein a computing node FC.sub.x is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer, and wherein the at least two computing nodes in the first class of computing nodes concurrently perform the first processing tasks allocated by the management node; obtaining, by the computing node FC.sub.x according to the first processing task allocated by the management node, a data block M.sub.x and a data block V.sub.1x in a to-be-processed dataset, wherein the data block M.sub.x is a matrix comprising m rows and n columns of data, the data block V.sub.1x is a vector comprising n-dimensional data, m and n are positive integers, and n is not less than 2; performing, by the computing node FC.sub.x, a combine2 operation and a reduce2 operation on the data block M.sub.x and the data block V.sub.1x, to obtain a first intermediate result V′.sub.x, wherein the first intermediate result V′.sub.x is a vector comprising m-dimensional data; and the first intermediate result V′.sub.x has an element v′.sub.i, wherein i is a variant, the value of i ranges from 1 to m, v′.sub.i=v′.sub.i,n, v′.sub.i,n is obtained according to v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)), m.sub.i,j is an element in the data block M.sub.x, v.sub.j is an element in the data block V.sub.1x, j is a variant, and the value of j ranges from 1 to n; and obtaining, by the management node, a first processing result for the to-be-processed dataset according to first intermediate results obtained by the at least two computing nodes in the first class of computing nodes.
15. The method according to claim 14, wherein the data processing system further comprises at least one computing node that forms a second class of computing nodes, and the method further comprises: allocating, by the management node according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, a second processing task to each of the at least one computing node in the second class of computing nodes, wherein a computing node SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer; obtaining, by the computing node SC.sub.y according to the second processing task allocated by the management node, the first intermediate results, wherein the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks in one row in the to-be-processed dataset; performing, by the computing node SC.sub.y, a reduce2 operation on the first intermediate results, to obtain a second intermediate result V″.sub.y, wherein the second intermediate result V″.sub.y is a vector comprising m-dimensional data; and obtaining, by the management node, a second processing result for the to-be-processed dataset according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes.
16. The method according to claim 15, wherein the to-be-processed dataset further comprises a data block V.sub.2x, and the data block V.sub.2x is a vector comprising m-dimensional data; and the method further comprises: allocating, by the management node according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes, a third processing task to the at least one computing node in the second class of computing nodes, wherein the at least one computing node in the second class of computing nodes comprises the computing node SC.sub.y; obtaining, by the computing node SC.sub.y, the data block V.sub.2x in the to-be-processed dataset according to the third processing task; and performing, by the computing node SC.sub.y, an assign operation on the second intermediate result V″.sub.y and the data block V.sub.2x, to obtain a third processing result for the to-be-processed dataset.
17. The method according to claim 16, wherein m=n, and the data block V.sub.1x and the data block V.sub.2x are a same data block.
18. The method according to claim 15, wherein when the second class of computing nodes comprises at least two computing nodes, the at least two computing nodes concurrently perform the second processing tasks allocated by the management node.
19. A data processing method for use in a data processing system that comprises a management node and a plurality of computing nodes that form a first class of computing nodes, the method comprising: allocating, by the management node, a first processing task to each of at least two computing nodes in the first class of computing nodes, wherein a computing node FC.sub.x is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer, and wherein the at least two computing nodes in the first class of computing nodes concurrently perform the first processing tasks allocated by the management node; obtaining, by the computing node FC.sub.x according to the first processing task allocated by the management node, a data block M.sub.1x and a data block M.sub.2x that are in a to-be-processed dataset, wherein the data block M.sub.1x is a matrix comprising m rows and n columns of data, the data block M.sub.2x is a matrix comprising n rows and p columns of data, m, n, and p are positive integers, and n is not less than 2; performing, by the computing node FC.sub.x, a combine2 operation and a reduce2 operation on the data block M.sub.1x and the data block M.sub.2x, to obtain a first intermediate result M′.sub.x, wherein the first intermediate result M′.sub.x is a matrix comprising m rows and p columns of data; and the first intermediate result M′.sub.x has an element m′.sub.i,j, wherein i and j are variants, the value of i ranges from 1 to m, the value of j ranges from 1 to p, m′.sub.i,j=m′.sub.i,j,n, m′.sub.i,j,n is obtained according to m′.sub.i,j,k=reduce2(m′.sub.i,j,k−1, combine2(m.sub.1[i,k], m.sub.2[k,j])), m.sub.1[i,k] is an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x, m.sub.2[k,j] is an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, k is a variant, and the value of k ranges from 1 to n; and obtaining, by the management node, a first processing result for the
to-be-processed dataset according to first intermediate results obtained by the at least two computing nodes in the first class of computing nodes.
20. The method according to claim 19, wherein the data processing system further comprises at least one computing node that forms a second class of computing nodes, and the method further comprises: allocating, by the management node according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, a second processing task to each of the at least one computing node in the second class of computing nodes, wherein a computing node SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer; obtaining, by the computing node SC.sub.y according to the second processing task allocated by the management node, the first intermediate results, wherein the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks M.sub.1x in one row and data blocks M.sub.2x in one column in the to-be-processed dataset; performing, by the computing node SC.sub.y, a reduce2 operation on the first intermediate results obtained by the SC.sub.y, to obtain a second intermediate result M″.sub.y, wherein the second intermediate result M″.sub.y is a matrix comprising m rows and p columns of data; and obtaining, by the management node, a second processing result for the to-be-processed dataset according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes.
21. The method according to claim 20, wherein the to-be-processed dataset further comprises a data block M.sub.3x, and the data block M.sub.3x is a matrix comprising m rows and p columns of data; and the method further comprises: allocating, by the management node according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes, a third processing task to the at least one computing node in the second class of computing nodes, wherein the at least one computing node in the second class of computing nodes comprises the computing node SC.sub.y; obtaining, by the computing node SC.sub.y, the data block M.sub.3x in the to-be-processed dataset according to the third processing task; and performing, by the computing node SC.sub.y, an assign operation on the second intermediate result M″.sub.y and the data block M.sub.3x, to obtain a third processing result for the to-be-processed dataset.
22. The method according to claim 21, wherein n=m, and the data block M.sub.2x and the data block M.sub.3x are a same data block.
23. The method according to claim 21, wherein n=p, and the data block M.sub.1x and the data block M.sub.3x are a same data block.
24. The method according to claim 20, wherein when the second class of computing nodes comprises at least two computing nodes, the at least two computing nodes in the second class of computing nodes concurrently perform the second processing tasks allocated by the management node.
Description
BRIEF DESCRIPTION OF DRAWINGS
[0073] The following briefly introduces the accompanying drawings used in describing the embodiments.
DESCRIPTION OF EMBODIMENTS
[0093] The following describes the technical solutions in the embodiments of the present disclosure with reference to the accompanying drawings. Clearly, the described embodiments are a part rather than all of the embodiments of the present disclosure.
[0094] For ease of understanding, concepts and related technologies that are involved in this specification are described briefly first.
[0095] Graph:
[0096] A graph is used to describe relationships between objects. Intuitively, a graph includes some dots and lines connecting the dots. The dots are referred to as vertices of the graph. The lines connecting the dots are referred to as edges. An edge may be classified into an undirected edge shown in
[0097] Multiplication:
[0098] In this specification, multiplication may represent generalized multiplication, or may represent multiplication of two numbers. For example, the multiplication in matrix-vector multiplication is a generalized multiplication. That is, the multiplication of an element in a matrix and the corresponding element of a vector is not necessarily a traditional multiplication of numbers, but may be other processing such as addition, subtraction, multiplication, division, maximization, or minimization, which is not limited in the embodiments of the present disclosure.
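As an illustrative sketch (not part of the patent text; all names are hypothetical), the following shows how replacing the element-wise multiplication and the row-wise addition with other operations yields different computations, including a min-plus step of the kind used for shortest paths:

```python
# Illustrative sketch: a "generalized multiplication" replaces the usual
# multiply/add pair with other element-wise operations.

def generalized_matvec(M, v, combine, reduce_, identity):
    """Apply `combine` to each (matrix element, vector element) pair and
    fold the results of each row with `reduce_`, starting from `identity`."""
    result = []
    for row in M:
        acc = identity
        for m_ij, v_j in zip(row, v):
            acc = reduce_(acc, combine(m_ij, v_j))
        result.append(acc)
    return result

INF = float("inf")

# Ordinary multiplication/addition: the traditional matrix-vector product.
M = [[1, 2], [3, 4]]
v = [5, 6]
ordinary = generalized_matvec(M, v, lambda a, b: a * b, lambda a, b: a + b, 0)

# Min-plus algebra: "multiplication" is addition, "summation" is minimization;
# one call performs a shortest-path relaxation step.
W = [[0, 7], [2, 0]]   # edge weights
d = [0, INF]           # current distances from vertex 0
relaxed = generalized_matvec(W, d, lambda a, b: a + b, min, INF)
```

Here `ordinary` is the usual product [17, 39], while `relaxed` updates the distance vector to [0, 2].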
[0099] Adjacency Matrix:
[0100] The adjacency matrix, referred to as a matrix in this specification, is a two-dimensional matrix storing adjacency between vertices in a graph. As shown in
[0101] Matrix-Vector Multiplication Operation:
[0102] The matrix-vector multiplication operation refers to a series of operations between a matrix and a vector. In an example of representing graph computation using a matrix-vector multiplication operation, all friends of V.sub.2 in the directed graph in
[0103] Matrix-Matrix Multiplication Operation:
[0104] The matrix-matrix multiplication operation refers to a series of operations between matrices. In an example of representing graph computation using a matrix-matrix multiplication operation, a quantity of mutual friends (for example, out-neighbors) of every two vertices in
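For illustration only (a hypothetical sketch, not from the patent), counting the mutual out-neighbors of every two vertices can be expressed as a matrix-matrix product of a 0/1 adjacency matrix with its transpose:

```python
# Sketch: with a 0/1 adjacency matrix A, (A @ A_transpose)[i][j] counts the
# vertices reached by both i and j, i.e. their mutual out-neighbors.

def matmul(A, B):
    m, n, p = len(A), len(B), len(B[0])
    return [[sum(A[i][k] * B[k][j] for k in range(n)) for j in range(p)]
            for i in range(m)]

A = [[0, 1, 1],   # vertex 0 -> vertices 1 and 2
     [0, 0, 1],   # vertex 1 -> vertex 2
     [0, 0, 0]]
At = [list(col) for col in zip(*A)]   # transpose of A
common = matmul(A, At)
# common[0][1] == 1: vertices 0 and 1 share one out-neighbor (vertex 2)
```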
[0105] Distributed Matrix:
[0106] A cluster environment includes multiple computation units used for computation. Matrix-vector multiplication operations and matrix-matrix multiplication operations are both performed based on matrix partitioning. After being partitioned, a matrix may be referred to as a distributed matrix, and the multiple computation units may process the distributed matrix concurrently.
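The partitioning idea can be sketched as follows (illustrative code, not from the patent): a matrix is split into a grid of sub-matrices so that different computation units can each process one block.

```python
def partition(M, block_rows, block_cols):
    """Split matrix M (a list of lists) into a grid of sub-matrices."""
    m, n = len(M), len(M[0])
    return [[[row[c:c + block_cols] for row in M[r:r + block_rows]]
             for c in range(0, n, block_cols)]
            for r in range(0, m, block_rows)]

M = [[1, 2, 3, 4],
     [5, 6, 7, 8],
     [9, 10, 11, 12],
     [13, 14, 15, 16]]
blocks = partition(M, 2, 2)
# blocks[0][1] is the top-right 2x2 block: [[3, 4], [7, 8]]
```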
[0107] The following briefly describes a prior-art matrix-vector multiplication model.
[0108] A traditional matrix operation applies only to numerical values, and the specific operations are restricted. For example, in the traditional matrix-vector multiplication shown in formula 1 below, the operation between M.sub.ij and V.sub.j can only be a multiplication, and the operation combining the products for a row of the matrix (each product is recorded as x.sub.j=M.sub.ijV.sub.j) can only be an addition. As a result, the algorithms that can be represented by a matrix operation are greatly limited. For example, a single-source shortest path (SSSP) algorithm cannot be represented by the traditional matrix-vector multiplication.

V′.sub.i=Σ.sub.j=1.sup.n M.sub.ijV.sub.j (formula 1)

[0109] where M is a matrix with m rows and n columns, and M.sub.ij is a matrix block obtained after M is partitioned; V is an n-dimensional column vector, and V.sub.j is a vector block obtained after V is partitioned; and V′ is an m-dimensional column vector.
[0110] To address the limitations of the prior-art matrix-vector multiplication model, a GIMV (generalized iterative matrix-vector multiplication) model was proposed based on PEGASUS, a big data processing system, for matrix-vector multiplication operations. The GIMV model extends the traditional matrix-vector multiplication in the following ways:
[0111] 1. Processing on M.sub.ij and V.sub.j is extended to a combine2 operation, where combine2 is a combine operation performed on a matrix element and a vector element. Types of the matrix element and the vector element may be different, and the combine2 operation may be addition, subtraction, multiplication, division, maximization, minimization, or the like, which is not limited in the embodiments of the present disclosure. The combine2 operation returns an intermediate value x.sub.j.
[0112] 2. A combineAll operation is performed on the combine2 results x.sub.1, . . . , and x.sub.n for a row of the matrix, where combineAll is a combine operation performed on multiple numerical values or a record set, and is generally a function (which may be, for example, an accumulation function). The combineAll operation returns an intermediate value for the row.

[0113] 3. An assign operation is performed on the intermediate value returned by the combineAll operation and the corresponding element V.sub.i of the input vector, to obtain an element of the output vector V′.
[0114] According to the GIMV model, a matrix M and a vector V are input, and a vector V′ is output after the three operations. There are mainly three operators for the operations:
MV=V′ (formula 2)
[0115] where
[0116] 1. combine2(M.sub.ij, V.sub.j): a combine operation is performed on M.sub.ij and V.sub.j, to obtain an intermediate result x.sub.j;
[0117] 2. combineAll(x.sub.1, . . . , x.sub.n): a combine operation is performed on the intermediate results x.sub.1, . . . , and x.sub.n for a row of the matrix, to obtain an intermediate result for the row; and
[0118] 3. assign: an assign operation is performed on the element V.sub.i of the input vector and the intermediate result obtained by the combineAll operation for the i.sup.th row, to obtain the element V′.sub.i of the output vector V′.
[0119] The GIMV model can represent more algorithms. For example, an SSSP algorithm may correspond to the following: a combine2 operation is an “addition” operation, that is, combine2(M.sub.ij, V.sub.j)=M.sub.ij+V.sub.j; a combineAll operation is a “minimization” operation, that is, combineAll(x.sub.1, . . . , x.sub.n)=min(x.sub.1, . . . , x.sub.n); and an assign operation is also a “minimization” operation, that is, the assign operation takes the minimum of V.sub.i and the intermediate result obtained by the combineAll operation.
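The SSSP correspondence above can be sketched as follows (a simplified single-machine illustration with hypothetical names, not the distributed implementation): with combine2 as addition and combineAll and assign both as minimization, each GIMV step relaxes the distance vector.

```python
# Sketch of one GIMV matrix-vector step:
# v'_i = assign(v_i, combineAll(x_1, ..., x_n)), where x_j = combine2(m_ij, v_j).

INF = float("inf")

def gimv_step(M, v, combine2, combineAll, assign):
    v_out = []
    for i, row in enumerate(M):
        xs = [combine2(m_ij, v_j) for m_ij, v_j in zip(row, v)]
        v_out.append(assign(v[i], combineAll(xs)))
    return v_out

# Edge-weight matrix of a 3-vertex graph; M[i][j] is the weight of edge j -> i,
# and INF means "no edge".
M = [[INF, INF, INF],
     [1,   INF, INF],
     [4,   2,   INF]]
v = [0, INF, INF]   # distances from vertex 0

v = gimv_step(M, v, lambda m, x: m + x, min, min)   # -> [0, 1, 4]
v = gimv_step(M, v, lambda m, x: m + x, min, min)   # -> [0, 1, 3]
```

After two steps the shorter path 0 → 1 → 2 (cost 3) replaces the direct edge of cost 4.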
[0121] In the algorithm, an initial vector V is first constructed and starts from a vertex 0; therefore, a value at a location 0 (an element in the 1.sup.st row) of the initial vector V is 0, as shown by V in
[0122] The execution procedure of the GIMV model is as follows: the combineAll operation for a row of the matrix can be performed on the combine2 results only after all combine2 operations for that row are completed. Therefore, in the computation process, the combineAll operation needs to occupy intermediate memory space of the same size as the matrix. In addition, in a distributed environment, the system needs to transmit a large amount of data. The assign operation is an operation performed on the initial vector V and the intermediate vector obtained by the combineAll operations.
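The difference between the two evaluation orders can be sketched as follows (illustrative code with hypothetical names, not from the patent): the GIMV order buffers all combine2 results of a row before combineAll runs, while an alternating order folds each result into a single accumulator as soon as it is produced, so the row of intermediate values is never materialized.

```python
def row_gimv(row, v, combine2, combineAll):
    xs = [combine2(m, x) for m, x in zip(row, v)]   # n values live at once
    return combineAll(xs)

def row_interleaved(row, v, combine2, reduce2, init):
    acc = init                                      # one value lives at once
    for m, x in zip(row, v):
        acc = reduce2(acc, combine2(m, x))
    return acc

row, v = [1, 2, 3], [4, 5, 6]
a = row_gimv(row, v, lambda m, x: m * x, sum)
b = row_interleaved(row, v, lambda m, x: m * x, lambda p, q: p + q, 0)
# a == b == 32, but row_interleaved never builds the list of products
```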
[0123] In the present disclosure, a data processing method is proposed to overcome the shortcomings of the foregoing solutions, so as to reduce the intermediate memory occupancy and the volume of transmitted data in the matrix-vector multiplication operation process in data processing. In addition, based on a principle similar to that of the matrix-vector multiplication operation in the present disclosure, a matrix-matrix multiplication operation model is proposed, so that more algorithms can be expressed.
[0125] The management node 110 is configured to:
[0126] allocate first processing tasks to at least two computing nodes that include a computing node FC.sub.x 121 and that are in the first class of computing nodes 120, where the FC.sub.x 121 is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer.
[0127] The at least two computing nodes in the first class of computing nodes 120 concurrently process the first processing tasks allocated by the management node 110.
[0128] The computing node FC.sub.x 121 is configured to: obtain, according to a first processing task allocated by the management node 110, a data block M.sub.x and a data block V.sub.1x that are in a to-be-processed dataset, where the data block M.sub.x is a matrix including m rows and n columns of data, the data block V.sub.1x is a vector including n-dimensional data, m and n are positive integers, and a value of n is not less than 2; and
[0129] perform a combine (for example, combine2) operation and a reduce (for example, reduce2) operation on the data block M.sub.x and the data block V.sub.1x, to obtain a first intermediate result V′.sub.x, where the first intermediate result V′.sub.x is a vector including m-dimensional data; and the first intermediate result V′.sub.x has an element v′.sub.i, where i is a variant, a value of i ranges from 1 to m, v′.sub.i=v′.sub.i,n, v′.sub.i,n is obtained according to v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)), m.sub.i,j is an element in the data block M.sub.x, v.sub.j is an element in the data block V.sub.1x, j is a variant, and a value of j ranges from 1 to n.
[0130] The management node 110 is further configured to obtain a processing result for the to-be-processed dataset according to first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 120.
[0131] Therefore, according to the data processing system provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0132] Specifically, the data processing system 100 provided by this embodiment of the present disclosure may be applied to big data processing. Because a volume of data for big data processing is relatively large, the data is usually partitioned, and different data blocks are distributed to different computing nodes for concurrent computation, so as to improve computation efficiency. The data processing system 100 includes the management node 110 and the first class of computing nodes 120. The management node 110 is configured to: receive a data processing task, partition the data processing task into multiple processing tasks, and deliver the processing tasks to computing nodes. The management node 110 is further configured to receive, from the computing nodes, statuses of execution of the processing tasks, so as to manage a data processing process. The computing nodes are configured to: receive the processing tasks delivered by the management node 110, and obtain data blocks according to the processing tasks, so as to execute the corresponding processing tasks. A computing node may obtain a data block stored in the computing node itself, or a data block stored in another computing node, to execute a processing task. Because classes of processing tasks are different, the computing nodes may be classified according to the classes of the processing tasks that they process. For example, a first class of nodes process first processing tasks, and a second class of nodes process second processing tasks.
[0133] In this embodiment of the present disclosure, the management node 110 is configured to allocate the first processing tasks to the at least two computing nodes that include the FC.sub.x 121 and that are in the first class of computing nodes 120, where the FC.sub.x 121 is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer. The at least two computing nodes in the first class of computing nodes 120 concurrently process the first processing tasks allocated by the management node 110.
[0134] After receiving the first processing task allocated by the management node 110, the computing node FC.sub.x 121 obtains, according to the first processing task, the data block M.sub.x and the data block V.sub.1x that are in the to-be-processed dataset, where the data block M.sub.x is a matrix including m rows and n columns of data, the data block V.sub.1x is a vector including n-dimensional data, m and n are positive integers, and a value of n is not less than 2. Processing on the data block M.sub.x and the data block V.sub.1x may be regarded as a matrix-vector multiplication operation.
[0135] The computing node FC.sub.x 121 performs the combine (for example, combine2) operation and the reduce (for example, reduce2) operation on the data block M.sub.x and the data block V.sub.1x, to obtain the first intermediate result V′.sub.x, where the first intermediate result V′.sub.x is a vector including m-dimensional data. The first intermediate result V′.sub.x has an element v′.sub.i, where i is a variant, a value of i ranges from 1 to m, v′.sub.i=v′.sub.i,n, v′.sub.i,n is obtained according to v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)), m.sub.i,j is an element in the data block M.sub.x, v.sub.j is an element in the data block V.sub.1x, j is a variant, and a value of j ranges from 1 to n.
[0136] Specifically, the combine operation is performed on an element in the i.sup.th row and the j.sup.th column of the data block M.sub.x and an element in the j.sup.th row of the data block V.sub.1x, to obtain an intermediate result x.sub.j corresponding to the element in the i.sup.th row and the j.sup.th column of the data block M.sub.x and the element in the j.sup.th row of the data block V.sub.1x. The combine operation herein may be the combine2 processing described above, and may be used in a formula to express the intermediate result: x.sub.j=combine2(m.sub.i,j, v.sub.j).
[0137] Then, the reduce2 operation is performed on an intermediate result x.sub.j corresponding to the i.sup.th row of the data block M.sub.x, to obtain an element v′.sub.i corresponding to the i.sup.th row of the data block M.sub.x, where the value of i ranges from 1 to m; in this case, the first intermediate result V′.sub.x can be obtained. The combine2 operation and the reduce2 operation may be: first computing x.sub.1 and x.sub.2, and performing a reduce2 operation on x.sub.1 and x.sub.2; then computing x.sub.3, and performing a reduce2 operation on x.sub.3 and a result obtained after the reduce2 operation is performed on x.sub.1 and x.sub.2; . . . , until each element of the intermediate result x.sub.j corresponding to the i.sup.th row of the data block M.sub.x has undergone a reduce2 operation. The reduce2 operations are not performed after all combine2 operations are completed, but the combine2 operations and the reduce2 operations are performed alternately. In this way, the intermediate result x.sub.j that has undergone the reduce2 operation in the computation process can be deleted, and there is no need to store results of all the combine2 operations in a memory; therefore, memory space can be reduced.
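The interleaved combine2/reduce2 pass described in the two preceding paragraphs can be sketched in Python. The operator choices below (combine2 as multiplication, reduce2 as addition, which yields an ordinary matrix-vector product) and all function and variable names are illustrative assumptions, not taken from the source:

```python
def block_matvec(M_x, V_1x, combine2, reduce2):
    """Compute the first intermediate result V'_x for one data block.

    For each row i, each intermediate x_j = combine2(m_ij, v_j) is folded
    into the running value by reduce2 as soon as it is produced, so only
    one intermediate per row is held in memory at a time.
    """
    result = []
    for row in M_x:
        acc = None
        for m_ij, v_j in zip(row, V_1x):
            x_j = combine2(m_ij, v_j)                        # combine2 step
            acc = x_j if acc is None else reduce2(acc, x_j)  # immediate reduce2
        result.append(acc)
    return result

M = [[1, 2], [3, 4], [5, 6]]   # m=3 rows, n=2 columns
V1 = [10, 100]
Vp = block_matvec(M, V1, lambda m, v: m * v, lambda a, b: a + b)
```

With these operator choices, Vp is [210, 430, 650], the ordinary product M·V1; substituting other combine2/reduce2 pairs changes the semantics without changing the memory behavior.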
[0138] It should be understood that the foregoing process is in essence an update process. That is, a reduce2 operation is first performed to obtain an intermediate result obtained after the reduce2 operation is performed on two x.sub.j, and then a reduce2 operation is performed on the obtained intermediate result and another x.sub.j or another intermediate result to obtain an intermediate result, so as to continuously update an intermediate result.
[0139] The reduce2 operation herein may be addition, subtraction, multiplication, division, maximization, minimization, or the like, which is not limited in this embodiment of the present disclosure. Herein, for processing, using the reduce2 operation, on intermediate results (for example, x.sub.1, . . . , x.sub.n) corresponding to elements in a row of a matrix, reduce2 processing is performed step by step in a process of computing an intermediate result x.sub.j, instead of being performed after all of x.sub.1, . . . , x.sub.n are computed.
[0140] It should be understood that an advantage of the reduce2 operation is that the sequence of elements on which reduce2 operations are performed does not need to be considered in the computation process; that is, the same result is obtained regardless of the sequence of the elements on which the reduce2 operations are performed. For example, in the Scala language, for an array it=Array(0,1,2,3,4,5), summing the elements of the array may be expressed as it.reduce(_+_). During specific implementation of computation, the value obtained by adding the data from left to right is the same as the final value obtained by performing pairwise reduce2 operations on the data. The foregoing description of performing the reduce2 operation on x.sub.3 and the result obtained after the reduce2 operation is performed on x.sub.1 and x.sub.2 is only one implementation manner. An execution sequence of reduce2 operations is not limited to the sequence of x.sub.1, . . . , x.sub.n. A result the same as that obtained after sequential execution can be obtained by performing a reduce2 operation on any two of x.sub.1, . . . , x.sub.n and then performing a reduce2 operation on the obtained result and another x.sub.j. No limitation is imposed on the reduce2 operation sequence in this embodiment of the present disclosure.
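The order-insensitivity claim above can be checked directly. The Python snippet below mirrors the Scala it.reduce(_+_) example, with addition standing in for reduce2; the equality of the three groupings relies on the operator being associative and commutative:

```python
from functools import reduce

xs = [0, 1, 2, 3, 4, 5]

left_to_right = reduce(lambda a, b: a + b, xs)                    # sequential fold
pairwise = (xs[0] + xs[1]) + ((xs[2] + xs[3]) + (xs[4] + xs[5]))  # pairwise tree
reversed_order = reduce(lambda a, b: a + b, reversed(xs))         # another order
```

All three evaluations produce the same sum, 15, regardless of grouping or order.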
[0141] The management node 110 is further configured to obtain the processing result for the to-be-processed dataset according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 120. After completing the first processing tasks, the first class of computing nodes 120 notify the management node 110. The management node 110 obtains the processing result for the to-be-processed dataset according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 120, or delivers, to a corresponding computing node, another processing task that is to be computed using the first intermediate results as basic data.
[0142] Therefore, according to the data processing system provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0143] Optionally, in an embodiment, the data processing system 100 further includes a second class of computing nodes, and the management node 110 is further configured to:
[0144] allocate, according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 120, a second processing task to at least one computing node that includes an SC.sub.y and that is in the second class of computing nodes, where the SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer.
[0145] The computing node SC.sub.y is configured to:
[0146] obtain, according to a second processing task, the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 120, where the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks in one row in the to-be-processed dataset; and
[0147] perform a reduce2 operation on the first intermediate results obtained by the SC.sub.y, to obtain a second intermediate result V″.sub.y, where the second intermediate result V″.sub.y is a vector including m-dimensional data.
[0148] The management node 110 is further configured to:
[0149] obtain a processing result for the to-be-processed dataset according to a second intermediate result obtained by the at least one computing node in the second class of computing nodes.
[0150] Specifically, after the foregoing processing is completed, other processing may be further performed on the first intermediate results obtained by the first class of computing nodes 120. For example, when the data blocks M.sub.x processed by the at least two computing nodes in the first class of computing nodes 120 are data blocks in one row in the to-be-processed dataset, the management node 110 may allocate, according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 120, the second processing task to the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes, where the SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer.
[0151] The computing node SC.sub.y is configured to obtain, according to a second processing task, the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 120, where the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks in one row in the to-be-processed dataset. The second processing task is performing a reduce2 operation on the first intermediate results obtained by the SC.sub.y, to obtain the second intermediate result V″.sub.y, where the second intermediate result V″.sub.y is a vector including m-dimensional data. The reduce2 operation performed on the first intermediate results obtained by the SC.sub.y is similar to the reduce2 operation described above. That is, a reduce2 operation is first performed on two first intermediate results obtained using data blocks in one row in the to-be-processed dataset, and then a reduce2 operation is performed on a result of the foregoing reduce2 operation and another first intermediate result.
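The SC.sub.y-side reduce2 over first intermediate results can be sketched as an elementwise fold. Here reduce2 is assumed to be addition, and the partial vectors are made-up stand-ins for results produced from data blocks in one block-row; the names are illustrative:

```python
def merge_partials(partials, reduce2):
    """Fold m-dimensional partial vectors into one second intermediate result.

    As in the first stage, each new partial vector is folded into the
    running result immediately rather than after all partials are collected.
    """
    acc = list(partials[0])
    for vec in partials[1:]:
        acc = [reduce2(a, b) for a, b in zip(acc, vec)]
    return acc

# Partial results for one block-row, e.g. from splitting M = [[1, 2, 3, 4]]
# by column at k=2 with V1 = [1, 1, 1, 1]: partials [3] and [7].
partials = [[3], [7]]
Vpp = merge_partials(partials, lambda a, b: a + b)   # second intermediate result
```

Here Vpp is [10], the same value a single node would obtain from the unpartitioned row.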
[0152] The management node 110 is specifically configured to obtain the processing result for the to-be-processed dataset according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes.
[0153] Optionally, in an embodiment, the dataset further includes a data block V.sub.2x, and the data block V.sub.2x is a vector including m-dimensional data. The management node 110 is further specifically configured to:
[0154] allocate, according to the second intermediate result obtained by the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes, a third processing task to the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes.
[0155] The computing node SC.sub.y is further configured to:
[0156] obtain the data block V.sub.2x in the dataset according to the third processing task; and
[0157] perform an assign operation on the second intermediate result V″.sub.y and the data block V.sub.2x that are obtained by the SC.sub.y, to obtain a processing result for the to-be-processed dataset.
[0158] Specifically, after a reduce2 operation is performed on the first intermediate results obtained using data blocks in one row in the to-be-processed dataset, an assign operation may be further performed on the obtained second intermediate result, so as to obtain the processing result for the to-be-processed dataset. Both the second intermediate result V″.sub.y and the data block V.sub.2x are vectors including m-dimensional data. A resultant vector may be obtained by performing an assign operation on corresponding elements, where the resultant vector is an m-dimensional column vector. The assign processing herein may be the assign processing described above, and an element in the resultant vector may be expressed by the formula V.sub.3,i=assign(V″.sub.i, V.sub.2,i), where the m elements V.sub.3,i form the resultant vector V.sub.3x.
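The final assign step is an elementwise combination of the second intermediate result with V.sub.2. As a sketch, the assign below is chosen to match formula 4 (alpha*v″ + beta*v2); the integer weights and vectors are invented only to keep the arithmetic exact:

```python
def assign_result(V_pp, V_2, assign):
    """Apply assign elementwise to V'' and V_2 to form the resultant vector."""
    return [assign(a, b) for a, b in zip(V_pp, V_2)]

alpha, beta = 2, 3          # illustrative weights
V_pp = [2, 4]               # second intermediate result V''_y (made up)
V_2 = [1, 1]                # data block V_2x (made up)
V_3 = assign_result(V_pp, V_2, lambda v, w: alpha * v + beta * w)
```

With these values V_3 is [7, 11]; other assign choices (for example elementwise min, as in shortest-path workloads) plug into the same structure.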
[0159] The foregoing is a matrix-vector multiplication operation process. In conclusion, the matrix-vector multiplication operation in this embodiment of the present disclosure may be expressed as the following formula 3:
M⊗V.sub.1⊕V.sub.2=V.sub.3 (formula 3)
[0160] Relative to an existing GIMV model, the vector V.sub.2 is added in the assign processing in this embodiment of the present disclosure; therefore, the resultant vector V.sub.3 may be expressed as:
V.sub.3=α*M*V.sub.1+β*V.sub.2 (formula 4),
[0161] where α and β are numerical values. Formula 4 shows that, relative to the existing GIMV model, the weight V.sub.2 is introduced into the resultant vector V.sub.3 obtained by the data processing system 100 in this embodiment of the present disclosure. As a result, the assign processing is no longer limited to the vector used for multiplication, and a matrix-vector multiplication operation on a non-square matrix can be supported, which expands the expression range of the matrix-vector multiplication operation.
[0162] The following uses a specific example to describe a function of introducing the weight V.sub.2 to the matrix-vector multiplication operation. For example, in the PageRank algorithm used for a performance test, “a vector used for addition” (corresponding to the foregoing vector V.sub.2) is often set to (1−d)/N for an operation. The vector (1−d)/N may be used to adjust PageRank values of vertices in a corresponding graph, so that the PageRank values of the vertices better meet reality. The PageRank value may be expressed as R, and is specifically expressed as formula 5:
[0163] For example, for some official websites, values of vertices corresponding to the official websites may be set relatively large in a vector; in this case, obtained resultant PageRank values are generally relatively large.
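To make the PageRank mapping concrete, the sketch below expresses one PageRank iteration in the model's terms: combine2/reduce2 compute M*R, and assign folds in V.sub.2 = (1−d)/N per vertex. The 3-vertex column-stochastic link matrix and the damping factor d = 0.85 are invented for illustration:

```python
N, d = 3, 0.85
# M[i][j] = 1/outdeg(j) if vertex j links to vertex i (column-stochastic).
M = [[0.0, 0.5, 1.0],
     [0.5, 0.0, 0.0],
     [0.5, 0.5, 0.0]]
R = [1.0 / N] * N                        # initial PageRank vector

for _ in range(50):                      # iterate toward the fixpoint
    MR = [sum(M[i][j] * R[j] for j in range(N)) for i in range(N)]  # combine2 + reduce2
    R = [d * mr + (1.0 - d) / N for mr in MR]                       # assign with V_2
```

Each update preserves sum(R) = 1 (M is column-stochastic and d + (1−d) = 1), and raising a vertex's entry in V.sub.2, as described for official websites, raises that vertex's resulting PageRank value.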
[0164] Optionally, in an embodiment, m=n, and the data block V.sub.1x and the data block V.sub.2x are a same data block. That is, the data block V.sub.1x on which the operation with the data block M.sub.x is performed and the data block V.sub.2x used for performing the assign operation are a same data block.
[0165] Optionally, in an embodiment, when the second class of computing nodes include at least two computing nodes, the at least two computing nodes in the second class of computing nodes concurrently process second processing tasks allocated by the management node. When the to-be-processed dataset is partitioned both by row and by column, at least two of the second class of computing nodes are required to process the second processing task. When the to-be-processed dataset is partitioned only by column, one of the second class of computing nodes is required. When the to-be-processed dataset is partitioned only by row, no second class of computing nodes is required, that is, the second processing task does not need to be processed.
[0166] Optionally, in an embodiment, the management node, the first class of computing nodes, and the second class of computing nodes each may be a physical machine, a virtual machine, or a central processing unit (CPU), which is not limited in this embodiment of the present disclosure.
[0167] The following describes in detail this embodiment of the present disclosure with reference to a specific example.
[0168]
[0169] S201: Perform preprocessing, and obtain a data block M′, a data block V.sub.1′, and a data block V.sub.2′, where the data block M′ is a matrix, and the data block V.sub.1′ and the data block V.sub.2′ are vectors.
[0170] S202: Perform matrix distribution, partition the matrix M′, and distribute blocks of the matrix M′ to at least two computing nodes of a cluster, where a data block M.sub.x is distributed to a computing node FC.sub.x.
[0171] S203: Distribute the data block V.sub.2′, partition the data block V.sub.2′, and broadcast the partitioned data block V.sub.2′.
[0172] S204: Distribute the data block V.sub.1′, partition the data block V.sub.1′, and broadcast the partitioned data block V.sub.1′. The matrix and the vectors partitioned in S202 to S204 are correspondingly distributed to the at least two computing nodes, which are distributed across the cluster.
[0173] S205: Each computing node performs partial combine2 processing and partial reduce2 processing. Combine2 processing is performed on a data block M.sub.x and a data block V.sub.1x, and before all intermediate results corresponding to the data block M.sub.x and the data block V.sub.1x are obtained, reduce2 processing is performed on an intermediate result to obtain a first intermediate result. Then reduce2 processing is performed on the first intermediate result and a newly obtained intermediate result to obtain a new first intermediate result. A final first intermediate result is a result obtained after reduce2 processing is performed on all the intermediate results corresponding to the data block M.sub.x and the data block V.sub.1x.
[0174] S206: Each computing node performs global data transmission on the final first intermediate results obtained in S205, so that the final first intermediate results are concentrated in one computing node. According to the method in this embodiment of the present disclosure, global data transmission is performed on data obtained after reduce processing. Therefore, relative to an existing matrix-vector multiplication operation, a volume of transmitted data is greatly reduced.
[0175] S207: Perform reduce2 processing on elements of first intermediate results corresponding to at least two matrix blocks at one horizontal location in the matrix M′, to obtain an element of a second intermediate result. That is, reduce2 processing is performed on elements of first intermediate results in each row of multiple matrix blocks at one horizontal location, to obtain the element of the second intermediate result. Multiple elements of the second intermediate result form a second intermediate result V″.sub.y.
[0176] S208: Perform assign processing on the elements of the second intermediate result V″.sub.y and corresponding elements in a data block V.sub.2x, to obtain elements in a resultant vector and further obtain the resultant vector.
[0177] S209: Determine whether a termination condition is met; if the termination condition is met, the process ends; otherwise, perform S204 to S209 using the resultant vector as the data block V.sub.1′ for the next iteration.
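Steps S201 to S209 can be condensed into a single-machine sketch: column blocks stand in for distribution to the FC.sub.x nodes, an elementwise merge stands in for the global transmission and reduce2 of S206/S207, and the loop repeats until the resultant vector stops changing. The multiply/add operator choices and the exact-equality termination test are illustrative assumptions:

```python
def iterate(M, V1, V2, max_iters=100):
    n = len(M[0])
    col_blocks = [(0, n // 2), (n // 2, n)]     # S202: two column blocks
    for _ in range(max_iters):
        partials = []
        for lo, hi in col_blocks:               # S205: per-node pass
            part = []
            for row in M:
                acc = 0
                for j in range(lo, hi):
                    acc += row[j] * V1[j]       # combine2 folded by reduce2 immediately
                part.append(acc)
            partials.append(part)
        merged = [a + b for a, b in zip(*partials)]    # S206/S207: global reduce2
        result = [v + w for v, w in zip(merged, V2)]   # S208: assign
        if result == V1:                        # S209: termination condition
            return result
        V1 = result                             # next iteration's V1'
    return V1

fixpoint = iterate([[0, 0], [1, 0]], [0, 0], [5, 6])
```

For this small made-up input the iteration reaches the fixpoint [5, 11] after three rounds and stops.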
[0178] The following describes this embodiment of the present disclosure using an example of resolving, using the SSSP algorithm described above, a problem about determining a shortest distance from a vertex 0 to other vertices. The matrix in the SSSP algorithm is a matrix obtained after transposition is performed on the adjacency matrix M in
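Under the usual GIMV-style operator choices for SSSP — combine2(m, v) = m + v (relax an edge), reduce2 = min (keep the shortest candidate), and assign = min (keep the best distance found so far) — the iteration can be sketched as follows. The 4-vertex weight matrix is invented for illustration and is already transposed, i.e. M[i][j] is the weight of the edge from vertex j to vertex i, with INF for missing edges:

```python
INF = float("inf")
M = [[0,   INF, INF, INF],
     [2,   0,   INF, INF],
     [5,   1,   0,   INF],
     [INF, INF, 3,   0]]
dist = [0, INF, INF, INF]          # distances from vertex 0

n = len(M)
for _ in range(n - 1):             # n-1 relaxation rounds suffice
    relaxed = [min(M[i][j] + dist[j] for j in range(n))    # combine2 + reduce2
               for i in range(n)]
    dist = [min(a, b) for a, b in zip(relaxed, dist)]      # assign = min
```

For this graph dist converges to [0, 2, 3, 6], the shortest distances from vertex 0; the same interleaving of combine2 and reduce2 keeps only one candidate distance per vertex in memory.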
[0179] Therefore, according to the data processing method provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0180]
[0181] The management node 310 is configured to:
[0182] allocate first processing tasks to at least two computing nodes that include an FC.sub.x and that are in the first class of computing nodes 320, where the FC.sub.x is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer.
[0183] The at least two computing nodes in the first class of computing nodes 320 concurrently process the first processing tasks allocated by the management node 310.
[0184] The computing node FC.sub.x is configured to: obtain, according to a first processing task allocated by the management node 310, a data block M.sub.1x and a data block M.sub.2x that are in a to-be-processed dataset, where the data block M.sub.1x is a matrix including m rows and n columns of data, the data block M.sub.2x is a matrix including n rows and p columns of data, m, n, and p are positive integers, and a value of n is not less than 2; and
[0185] perform a combine2 operation and a reduce2 operation on the data block M.sub.1x and the data block M.sub.2x, to obtain a first intermediate result M′.sub.x, where the first intermediate result M′.sub.x is a matrix including m rows and p columns of data; and the first intermediate result M′.sub.x has an element m′.sub.i,j, where i and j are variants, a value of i ranges from 1 to m, a value of j ranges from 1 to p, m′.sub.i,j=m′.sub.i,j,n, m′.sub.i,j,n is obtained according to m′.sub.i,j,k=reduce2(m′.sub.i,j,k−1, combine2(m.sub.1[i,k], m.sub.2[k,j])), m.sub.1[i,k] is an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x, m.sub.2[k,j] is an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, k is a variant, and a value of k ranges from 1 to n.
[0186] The management node 310 is further configured to obtain a processing result for the to-be-processed dataset according to first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 320.
[0187] Therefore, according to the data processing system provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0188] Specifically, the data processing system 300 provided by this embodiment of the present disclosure may be applied to big data processing. Because the volume of data in big data processing is relatively large, the data is usually partitioned, and different data blocks are distributed to different computing nodes for concurrent computation, so as to improve computation efficiency. The data processing system 300 includes the management node 310 and the first class of computing nodes 320. The management node 310 is configured to: receive a data processing task, partition the data processing task into multiple processing tasks, and deliver the processing tasks to computing nodes. The management node 310 is further configured to receive, from the computing nodes, statuses of executing the processing tasks, so as to manage the data processing process. The computing nodes are configured to: receive the processing tasks delivered by the management node 310, and obtain data blocks according to the processing tasks, so as to execute the corresponding processing tasks. To execute a processing task, a computing node may obtain a data block stored in the computing node itself, or a data block stored in another computing node. Because the classes of processing tasks differ, the computing nodes may be classified according to the classes of the processing tasks they process. For example, a first class of nodes process first processing tasks, and a second class of nodes process second processing tasks.
[0189] In this embodiment of the present disclosure, the management node 310 is configured to allocate the first processing tasks to the at least two computing nodes that include the FC.sub.x 321 and that are in the first class of computing nodes 320, where the FC.sub.x 321 is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer. The at least two computing nodes in the first class of computing nodes 320 concurrently process the first processing tasks allocated by the management node 310.
[0190] After receiving the first processing task allocated by the management node 310, the computing node FC.sub.x 321 obtains, according to the first processing task, the data block M.sub.1x and the data block M.sub.2x that are in the to-be-processed dataset, where the data block M.sub.1x is a matrix including m rows and n columns of data, the data block M.sub.2x is a matrix including n rows and p columns of data, m, n, and p are positive integers, and a value of n is not less than 2. Processing on the data block M.sub.1x and the data block M.sub.2x may be regarded as a matrix-matrix multiplication operation.
[0191] The computing node FC.sub.x 321 performs the combine2 operation and the reduce2 operation on the data block M.sub.1x and the data block M.sub.2x, to obtain the first intermediate result M′.sub.x, where the first intermediate result M′.sub.x is a matrix including m rows and p columns of data. The first intermediate result M′.sub.x has an element m′.sub.i,j, where i and j are variants, a value of i ranges from 1 to m, a value of j ranges from 1 to p, m′.sub.i,j=m′.sub.i,j,n, m′.sub.i,j,n is obtained according to m′.sub.i,j,k=reduce2(m′.sub.i,j,k−1, combine2(m.sub.1[i,k], m.sub.2[k,j])), m.sub.1[i,k] is an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x, m.sub.2[k,j] is an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, k is a variant, and a value of k ranges from 1 to n.
[0192] Specifically, the combine operation is performed on an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x and an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, to obtain an intermediate result x.sub.ikj corresponding to the element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x and the element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x. The combine operation herein may be the combine2 operation described above, and may be used in a formula to express the intermediate result: x.sub.ikj=combine2(m.sub.1[i,k], m.sub.2[k,j]).
[0193] Then, the reduce2 operation is performed on an intermediate result x.sub.ikj corresponding to the i.sup.th row of the data block M.sub.1x and the j.sup.th column of the data block M.sub.2x, to obtain an element m′.sub.i,j corresponding to the i.sup.th row of the data block M.sub.1x and the j.sup.th column of the data block M.sub.2x, where the value of i ranges from 1 to m, and the value of j ranges from 1 to p; in this case, the first intermediate result M′.sub.x can be obtained. The combine2 operation and the reduce2 operation may be: first computing x.sub.i1j and x.sub.i2j, and performing a reduce2 operation on x.sub.i1j and x.sub.i2j; then computing x.sub.i3j, and performing a reduce2 operation on x.sub.i3j and the result obtained after the reduce2 operation is performed on x.sub.i1j and x.sub.i2j; . . . , until each intermediate result x.sub.ikj corresponding to the i.sup.th row of the data block M.sub.1x and the j.sup.th column of the data block M.sub.2x has undergone a reduce2 operation. The reduce2 operations are not performed after all combine2 operations are completed; instead, the combine2 operations and the reduce2 operations are performed alternately. In this way, an intermediate result x.sub.ikj that has undergone the reduce2 operation can be deleted during computation, and there is no need to store the results of all the combine2 operations in memory; therefore, memory space can be reduced.
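The matrix-matrix analogue of the interleaved pass can be sketched the same way: for each (i, j), every intermediate x.sub.ikj is folded by reduce2 as soon as combine2 produces it, so only the running value is stored. Multiplication and addition are assumed, yielding an ordinary matrix product; the names are illustrative:

```python
def block_matmul(M1, M2, combine2, reduce2):
    """Compute the first intermediate result M'_x for one pair of data blocks."""
    m, n, p = len(M1), len(M2), len(M2[0])
    out = []
    for i in range(m):
        row = []
        for j in range(p):
            acc = None
            for k in range(n):
                x_ikj = combine2(M1[i][k], M2[k][j])                 # combine2 step
                acc = x_ikj if acc is None else reduce2(acc, x_ikj)  # immediate reduce2
            row.append(acc)
        out.append(row)
    return out

M1 = [[1, 2], [3, 4]]   # m=2 rows, n=2 columns
M2 = [[5, 6], [7, 8]]   # n=2 rows, p=2 columns
Mp = block_matmul(M1, M2, lambda a, b: a * b, lambda a, b: a + b)
```

With these operator choices, Mp is [[19, 22], [43, 50]], the ordinary product M1·M2.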
[0194] It should be understood that the foregoing process is in essence an update process. That is, a reduce2 operation is first performed to obtain an intermediate result obtained after the reduce2 operation is performed on two x.sub.ikj, and then a reduce2 operation is performed on the obtained intermediate result and another x.sub.ikj or another intermediate result to obtain an intermediate result, so as to continuously update an intermediate result.
[0195] The reduce2 operation herein may be addition, subtraction, multiplication, division, maximization, minimization, or the like, which is not limited in this embodiment of the present disclosure. Herein, for processing, using the reduce2 operation, on the intermediate results (for example, x.sub.i1j, . . . , x.sub.inj) corresponding to the i.sup.th row of the data block M.sub.1x and the j.sup.th column of the data block M.sub.2x, reduce2 processing is performed step by step in the process of computing an intermediate result x.sub.ikj, instead of being performed after all of x.sub.i1j, . . . , x.sub.inj are computed.
[0196] It should be understood that an advantage of the reduce2 operation is that the sequence of elements on which reduce2 operations are performed does not need to be considered in the computation process; that is, the same result is obtained regardless of the sequence of the elements on which the reduce2 operations are performed. For example, in the Scala language, for an array it=Array(0,1,2,3,4,5), summing the elements of the array may be expressed as it.reduce(_+_). During specific implementation of computation, the value obtained by adding the data from left to right is the same as the final value obtained by performing pairwise reduce2 operations on the data. The foregoing description of performing the reduce2 operation on x.sub.i3j and the result obtained after the reduce2 operation is performed on x.sub.i1j and x.sub.i2j is only one implementation manner. An execution sequence of reduce2 operations is not limited to the sequence of x.sub.i1j, . . . , x.sub.inj. A result the same as that obtained after sequential execution can be obtained by performing a reduce2 operation on any two of x.sub.i1j, . . . , x.sub.inj and then performing a reduce2 operation on the obtained result and another x.sub.ikj. No limitation is imposed on the reduce2 operation sequence in this embodiment of the present disclosure.
[0197] The management node 310 is further configured to obtain the processing result for the to-be-processed dataset according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 320. After completing the first processing tasks, the first class of computing nodes 320 notify the management node 310. The management node 310 obtains the processing result for the to-be-processed dataset according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 320, or delivers, to a corresponding computing node, another processing task that is to be computed using the first intermediate results as basic data.
[0198] Therefore, according to the data processing system provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0199] Optionally, in an embodiment, the data processing system 300 further includes a second class of computing nodes, and the management node 310 is further configured to:
[0200] allocate, according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 320, a second processing task to at least one computing node that includes an SC.sub.y and that is in the second class of computing nodes, where the SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer.
[0201] The computing node SC.sub.y is configured to:
[0202] obtain, according to a second processing task, the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 320, where the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks M.sub.1x in one row and data blocks M.sub.2x in one column that are in the to-be-processed dataset; and
[0203] perform a reduce2 operation on the first intermediate results obtained by the SC.sub.y, to obtain a second intermediate result M″.sub.y, where the second intermediate result M″.sub.y is a matrix including m rows and p columns of data.
[0204] The management node 310 is configured to:
[0205] obtain a processing result for the to-be-processed dataset according to a second intermediate result obtained by the at least one computing node in the second class of computing nodes.
[0206] Specifically, after the foregoing processing is completed, other processing may be further performed on the first intermediate results obtained by the first class of computing nodes 320. For example, when the data blocks M.sub.1x processed by the at least two computing nodes in the first class of computing nodes 320 are data blocks in one row in the to-be-processed dataset and the data blocks M.sub.2x are data blocks in one column in the to-be-processed dataset, the management node 310 may allocate, according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 320, the second processing task to the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes, where the SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer.
[0207] The computing node SC.sub.y is configured to obtain, according to a second processing task, the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes 320, where the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks M.sub.1x and data blocks M.sub.2x, where the data blocks M.sub.1x are data blocks in one row in the to-be-processed dataset, and the data blocks M.sub.2x are data blocks in one column in the to-be-processed dataset. The second processing task is performing a reduce2 operation on the first intermediate results obtained by the SC.sub.y, to obtain the second intermediate result M″.sub.y, where the second intermediate result M″.sub.y is a matrix including m rows and p columns of data. The reduce2 operation performed on the first intermediate results obtained by the SC.sub.y is similar to the reduce2 operation described above. That is, a reduce2 operation is first performed on two first intermediate results obtained using data blocks M.sub.1x and data blocks M.sub.2x, and then a reduce2 operation is performed on a result of the foregoing reduce2 operation and another first intermediate result, where the data blocks M.sub.1x are data blocks in one row in the to-be-processed dataset, and the data blocks M.sub.2x are data blocks in one column in the to-be-processed dataset.
[0208] The management node 310 is specifically configured to obtain the processing result for the to-be-processed dataset according to the second intermediate result obtained by the at least one computing node in the second class of computing nodes.
[0209] Optionally, in an embodiment, the dataset further includes a data block M.sub.3x, and the data block M.sub.3x is a matrix including m rows and p columns of data. The management node 310 is further configured to:
[0210] allocate, according to the second intermediate result obtained by the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes, a third processing task to the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes.
[0211] The computing node SC.sub.y is further configured to:
[0212] obtain the data block M.sub.3x in the dataset according to the third processing task; and
[0213] perform an assign operation on the second intermediate result M″.sub.y and the data block M.sub.3x that are obtained by the SC.sub.y, to obtain a processing result for the to-be-processed dataset.
[0214] Specifically, after a reduce2 operation is performed on first intermediate results obtained using data blocks in one row of the M.sub.1x and data blocks in one column of the M.sub.2x that are in the to-be-processed dataset, an assign operation may be further performed on an obtained second intermediate result, so as to obtain the processing result for the to-be-processed dataset. Both the second intermediate result M″.sub.y and the data block M.sub.3x are matrices including m rows and p columns of data. A resultant matrix may be obtained by performing an assign operation on corresponding elements, where the resultant matrix is a matrix including m rows and p columns of data. The assign processing herein may be the assign processing described above.
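As an illustrative sketch only: the interleaved combine2/reduce2 matrix product followed by an elementwise assign operation, assuming combine2 is multiplication, reduce2 is addition, and assign is elementwise addition (all three are placeholders for the task-specific operators; the function names are not from the embodiments):

```python
def matmul_combine_reduce(A, B):
    """combine2/reduce2 product of an m-by-n block A and an n-by-p block B."""
    m, n, p = len(A), len(B), len(B[0])
    M2 = [[0] * p for _ in range(m)]
    for i in range(m):
        for j in range(p):
            # Interleave combine2 and reduce2 over k: one accumulator.
            acc = A[i][0] * B[0][j]            # combine2
            for k in range(1, n):
                acc = acc + A[i][k] * B[k][j]  # reduce2(acc, combine2(...))
            M2[i][j] = acc
    return M2

def assign(M2, M3):
    """Elementwise assign over two m-by-p matrices (here: addition)."""
    return [[x + y for x, y in zip(r2, r3)] for r2, r3 in zip(M2, M3)]
```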
[0215] The foregoing is a matrix-matrix multiplication operation process. In conclusion, the matrix-matrix multiplication operation in this embodiment of the present disclosure may be expressed as the following formula 6:
AB⊕C=D (formula 6)
[0216] Optionally, in an embodiment, the second class of computing nodes in the data processing system 300 are further configured to perform row processing on the r.sup.th row of the processing result for the to-be-processed dataset, where the row processing is processing on elements in the r.sup.th row.
[0217] Specifically, the resultant matrix D may be further processed. For example, reduce processing may be performed on the elements in the r.sup.th row of the resultant matrix D, which may be expressed using a formula reduceRow(D.sub.i1, . . . , D.sub.in), where the reduceRow processing may be maximization, minimization, selection of the Q greatest values, selection of the Q smallest values, summation of the data in the row, or the like, which is not limited in this embodiment of the present disclosure. A result obtained after the reduceRow processing may still be stored in a corresponding matrix form. For example, if the i.sup.th row of the resultant matrix D is maximized and the maximum value is D.sub.i1, the numerical value D.sub.i1 is stored in the i.sup.th row and the 1.sup.st column of a stored matrix, and the numerical value 0 is stored in the other columns (or 0 is not stored). Alternatively, when the result obtained after the reduceRow processing is stored, only the numerical value obtained after the processing may be stored. For example, if summation processing is performed on the i.sup.th row of the resultant matrix D and the result of the summation is Y, the numerical value Y is stored. No limitation is imposed on the storage manner in this embodiment of the present disclosure.
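As one illustrative instance of reduceRow (row-wise maximization, with the matrix-form storage scheme described above; the function name is not from the embodiments):

```python
def reduce_row_max(D):
    """reduceRow as maximization: keep each row's maximum in place and
    store 0 in every other column of that row."""
    out = []
    for row in D:
        m = max(row)
        k = row.index(m)  # column of the (first) maximum
        out.append([m if c == k else 0 for c in range(len(row))])
    return out
```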
[0218] Similarly, in an embodiment, the second class of computing nodes in the data processing system 300 are further configured to perform column processing on the c.sup.th column of the processing result for the to-be-processed dataset, where the column processing is processing on elements in the c.sup.th column. To avoid repetition, no details are further repeated in this specification.
[0219] Optionally, in an embodiment, n=p, and the data block M.sub.1x and the data block M.sub.3x are a same data block. It should be understood that if, for example, the data block M.sub.1x is a matrix with 3 rows and 4 columns and the data block M.sub.2x is a matrix with 4 rows and 4 columns, combine processing performed on the data block M.sub.1x and the data block M.sub.2x yields a matrix with 3 rows and 4 columns, and assign processing is performed on that matrix with 3 rows and 4 columns and the data block M.sub.3x to obtain a resultant matrix; therefore, the data block M.sub.1x and the data block M.sub.3x may be a same data block.
[0220] Optionally, in an embodiment, n=m, and the data block M.sub.2x and the data block M.sub.3x are a same data block. It should be understood that in this embodiment of the present disclosure, an operation such as transposition may be performed on at least one of the data block M.sub.1x, the data block M.sub.2x, or the data block M.sub.3x used for computation, so as to meet a computation requirement. Therefore, the data block M.sub.2x and the data block M.sub.3x are a same data block.
[0221] Optionally, in an embodiment, when the second class of computing nodes include at least two computing nodes, the at least two computing nodes in the second class of computing nodes concurrently process second processing tasks allocated by the management node.
[0222] Optionally, in an embodiment, the management node, the first class of computing nodes, and the second class of computing nodes each may be a physical machine, a virtual machine, or a central processing unit (CPU).
[0223] The following describes in detail this embodiment of the present disclosure with reference to a specific example.
[0224]
[0225] S401: Perform preprocessing, and obtain a data block M.sub.1, a data block M.sub.2, and a data block M.sub.3, where all of the data block M.sub.1, the data block M.sub.2, and the data block M.sub.3 are matrices.
[0226] S402: Perform matrix partitioning, and partition the data block M.sub.1 and the data block M.sub.2, where the data block M.sub.1 is partitioned into multiple data blocks M.sub.1x, and the data blocks M.sub.1x are matrices including m rows and n columns of data; the data block M.sub.2 is partitioned into multiple data blocks M.sub.2x, and the data blocks M.sub.2x are matrices including n rows and p columns of data.
[0227] S403: Perform matrix distribution, distribute the data blocks M.sub.1x to at least one computing node by row, and correspondingly, distribute the data blocks M.sub.2x to the at least one computing node by column.
[0228] S404: Each computing node in the first class of computing nodes performs partial combine2 processing and partial reduce2 processing. Combine2 processing is performed on a data block M.sub.1x and a data block M.sub.2x; before all intermediate results corresponding to a row of the data block M.sub.1x and a corresponding column of the data block M.sub.2x are obtained, reduce2 processing is performed on the intermediate results obtained so far to yield a first intermediate result, and reduce2 processing is then performed on that first intermediate result and each newly obtained intermediate result to yield a new first intermediate result. The final first intermediate result is the result of performing reduce2 processing on all the intermediate results corresponding to the row of the data block M.sub.1x and the corresponding column of the data block M.sub.2x. By analogy, final first intermediate results corresponding to all combinations of a row of the data block M.sub.1x and a column of the data block M.sub.2x are obtained, and these final first intermediate results can form a matrix.
[0229] S405: Each of the first class of computing nodes performs global data transmission on the final first intermediate results obtained in S404, so that the final first intermediate results are concentrated in one of a second class of computing nodes. According to the method in this embodiment of the present disclosure, global data transmission is performed on data obtained after reduce processing; therefore, a volume of transmitted data is relatively small.
[0230] S406: Perform reduce2 processing on first intermediate results corresponding to at least two data blocks that are corresponding to a row of the data block M.sub.1 and a column of the data block M.sub.2, to obtain a second intermediate result. Multiple second intermediate results form an intermediate matrix X; the intermediate matrix X is partitioned and is distributed to at least one computing node.
[0231] S407: Partition and distribute the data block M.sub.3, distribute data blocks M.sub.3x of the data block M.sub.3 to computing nodes on which matrix blocks of the intermediate matrix X are located, where the data blocks M.sub.3x are matrices including m rows and p columns of data.
[0232] S408: Perform assign processing on elements in the matrix blocks of the intermediate matrix X and corresponding elements in the data blocks M.sub.3x, to obtain elements in matrix blocks of a resultant matrix D and further obtain the matrix blocks of the resultant matrix D.
[0233] S409: Perform reduceRow processing on each matrix block of the resultant matrix D by row.
[0234] S410: Perform data transmission, to transmit each result obtained in S409; and then perform reduceRow processing on results corresponding to matrix blocks in one row, to obtain a matrix Y.
[0235] S411: Perform reduceCol processing on each matrix block of the matrix Y by column.
[0236] S412: Perform data transmission, to transmit each result obtained in S411; and then perform reduceCol processing on results corresponding to matrix blocks in one column, to obtain a matrix Z.
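The partition/compute/merge portion of the steps above (S402 through S408) can be condensed into a single-process Python sketch. It is illustrative only: combine2 is taken as multiplication, reduce2 as addition, assign as addition, the shared dimension is partitioned into chunks, and all helper names are assumptions rather than the claimed implementation:

```python
def pipeline(A, B, C, chunk=2):
    m, n, p = len(A), len(B), len(B[0])
    # S402/S403: partition the shared dimension n into blocks; each block
    # pair (a column-slice of A, a row-slice of B) goes to one
    # first-class computing node.
    partials = []
    for k0 in range(0, n, chunk):
        ks = range(k0, min(k0 + chunk, n))
        # S404: partial combine2/reduce2 on one block pair yields a
        # first intermediate result.
        partials.append([[sum(A[i][k] * B[k][j] for k in ks)
                          for j in range(p)] for i in range(m)])
    # S405/S406: transmit and reduce2 across the first intermediate
    # results to obtain the second intermediate result.
    X = partials[0]
    for P in partials[1:]:
        X = [[x + y for x, y in zip(xr, pr)] for xr, pr in zip(X, P)]
    # S407/S408: assign with the data block M3 (here: elementwise addition).
    return [[x + c for x, c in zip(xr, cr)] for xr, cr in zip(X, C)]
```

Only the already-reduced partials cross node boundaries in S405, which is why the transmitted data volume stays small.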
[0237] Therefore, according to the data processing method provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0238] Probability propagation is a recommendation algorithm. Given a "user-project" interaction record database, several projects in which each user may be interested need to be recommended to the user. Probability propagation is based on global data and can compute, in one pass, all projects in which users may potentially be interested. The algorithm has a solid theoretical foundation: probability propagation evolved from the "law of conservation of energy" in physics. The matrix operation is similar to energy propagation between different substances, and for the finally obtained interest degree matrix and the original matrix, the sums of elements in corresponding rows are exactly equal, and the sums of elements in corresponding columns are also exactly equal, which manifests conservation of energy.
[0239] A probability propagation algorithm may be implemented using matrix operations. In an implementation of an existing probability propagation algorithm using matrices, a matrix of "degrees of attraction to users by films" and a matrix of "interest similarities between users" are obtained; then a matrix-matrix multiplication operation is performed to obtain a matrix of "new degrees of attraction to users by films"; next, films that have been watched by the users are screened out; and finally, only the top k films that have not been watched are recommended to each user. Because the probability propagation algorithm provides only several films for each user, a relatively sparse resultant matrix (with relatively many zero elements) is obtained, and generally the data volume is relatively small (in such a scenario, the zero elements do not need to be stored). However, in an existing solution, a very dense matrix of "new degrees of attraction to users by films" is usually obtained in the computation process, and its data volume is very large. This results in a large amount of occupied intermediate memory and a large amount of data transmitted by the system.
[0240] In the probability propagation algorithm implemented using the matrix-matrix multiplication operation in the embodiments of the present disclosure, there are m films and n users in an original dataset, and the top k films are recommended to each user. A first matrix A is a matrix of "degrees of attraction to users by films" with m rows and n columns, a second matrix B is a matrix of "interest similarities between users" with n rows and n columns, and a third matrix C used in the assign processing and the first matrix A are a same matrix. In this case, the formula for recommending, to each user, the top k films that have not been watched is:
AB⊕C=D (formula 7)
[0241] A specific computation process is as follows: first performing combine2 processing and reduce2 processing on elements in the first matrix A and elements in the second matrix B. Herein, the reduce2 processing is performed before all intermediate results are obtained through the combine2 processing on the elements in the first matrix A and the elements in the second matrix B, that is, the combine2 processing and the reduce2 processing are performed alternately. After the combine2 processing and the reduce2 processing are performed on the elements in the first matrix A and the elements in the second matrix B, a first intermediate result corresponding to the first matrix A and the second matrix B can be obtained. Assign processing is performed on the first intermediate result and the third matrix C that is used in the assign processing, to obtain a resultant matrix D. Finally, reduceCol processing is performed on columns of the resultant matrix D.
[0242] The combine2 processing is "multiplication" processing: combine2(A.sub.ij, B.sub.jk)=A.sub.ij*B.sub.jk. The reduce2 processing is "addition" processing: x.sub.ik=reduce2(x.sub.i1k, x.sub.i2k)=x.sub.i1k+x.sub.i2k. The assign processing is "screening" processing: assign(x.sub.ij, C.sub.ij)=0 if C.sub.ij is nonzero, or x.sub.ij if C.sub.ij is zero. That is, if the element at the corresponding location in the third matrix C on which the assign processing is based is nonzero, the element is screened out (the element is set to 0). In other words, if a user has not watched a film, the data is reserved; if the user has watched the film, the data is screened out (the element is set to 0), so that reduceCol processing can be executed. The reduceCol processing is "top k evaluation" processing: reduceCol(D.sub.1j, . . . , D.sub.mj)=(D.sub.1j, . . . , D.sub.mj).topk, that is, the k greatest values in the j.sup.th column are evaluated, where k in this example is 1.
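A toy end-to-end run of this recommendation pipeline, for illustration only: combine2 is multiplication, reduce2 is addition, assign screens out films a user has already watched, and reduceCol keeps the top-k (k=1) of each column. The matrices are made-up sample data, not from the embodiments:

```python
A = [[1.0, 0.0],     # "attraction" matrix: 3 films x 2 users
     [0.0, 1.0],
     [0.5, 0.5]]
B = [[0.8, 0.2],     # "interest similarity" matrix: 2 users x 2 users
     [0.2, 0.8]]
C = A                # the assign matrix C is the same matrix as A

m, n = len(A), len(B[0])
# combine2/reduce2 interleaved: the "new attraction" matrix.
D = [[sum(A[i][k] * B[k][j] for k in range(len(B))) for j in range(n)]
     for i in range(m)]
# assign: zero out films the user has already watched (C nonzero).
D = [[0.0 if C[i][j] != 0 else D[i][j] for j in range(n)]
     for i in range(m)]
# reduceCol with k = 1: the single best unwatched film per user.
recommended = [max(range(m), key=lambda i: D[i][j]) for j in range(n)]
```

With this data, each user is recommended the film the other user watched, and every already-watched entry of D is exactly zero, matching the screening behavior described above.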
[0243]
[0244] According to the matrix-matrix multiplication operation in the embodiments of the present disclosure, in the process of computing the matrix of "new degrees of attraction to users by films" using the probability propagation algorithm, films that have been watched can be screened out (computation on records that need to be screened out can even be omitted directly), and the films ranked in the top k according to users' scores are recommended, so that intermediate memory occupancy and the volume of data transmitted by the system are reduced.
[0245] It should be understood that in the matrix-vector multiplication operation and the matrix-matrix multiplication operation in the embodiments of the present disclosure, an isCompute operator can be introduced to determine whether computation needs to be performed on a row. If the computation is not required, the row is skipped, and the next row continues to be computed. If the computation is required, a combine2 operation and a reduce2 operation are performed according to the algorithm. Usually, the isCompute operator in the matrix-vector multiplication operation may be a column vector whose quantity of elements is equal to the quantity of rows of a data block M.sub.x, and the isCompute operator in the matrix-matrix multiplication operation may be a matrix, which is not limited in the embodiments of the present disclosure.
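A minimal sketch of an isCompute operator for the matrix-vector case, assuming multiplication and addition as the combine2/reduce2 operators (the function name and flag encoding are illustrative assumptions):

```python
def matvec_with_iscompute(M, v, is_compute):
    """is_compute is a column vector with one flag per row of M; rows
    flagged 0 are skipped entirely, so no combine2 or reduce2 work is
    spent on them."""
    out = [0] * len(M)
    for i, row in enumerate(M):
        if not is_compute[i]:
            continue                      # skip this row
        acc = row[0] * v[0]               # combine2
        for m_ij, v_j in zip(row[1:], v[1:]):
            acc = acc + m_ij * v_j        # reduce2(acc, combine2(...))
        out[i] = acc
    return out
```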
[0246] The following describes performance of the matrix-vector multiplication operation and performance of the matrix-matrix multiplication operation in the embodiments of the present disclosure.
[0247] In Spark, a general-purpose concurrent computation framework, the performance of the extended GIMV model in the prior art is compared with that of the matrix-vector multiplication operation in the embodiments of the present disclosure. The test environment is a cluster including three machines (three RH2285, 12 cores, 24 threads, 192 GB memory, 100 GB configuration). Data in the wiki_talk dataset is tested, and the test result indicates that the computation time for the extended GIMV model in the prior art exceeds 3600 s, while the matrix-vector multiplication operation in the embodiments of the present disclosure requires 340 s.
[0248] Similarly, in Spark, the performance of the matrix-matrix multiplication operation in the embodiments of the present disclosure is compared with the performance of an operation implementing "seeking a recommended film" in the prior art. Table 1 shows the sizes of the tested datasets and the test results. Data in the interactive Internet Protocol television (IPTV) dataset and the Nasdaq NFLX (NETFLIX™) dataset is tested. It can be seen from the test results that, according to the embodiments of the present disclosure, intermediate memory occupancy can be effectively reduced and the computation time can be shortened; therefore, a larger dataset can be processed.
TABLE 1

            Quantity    Quantity    Quantity of     Computation time (s)    Computation time (s) in the embodiments
Dataset     of users    of films    records         in the prior art        of the present disclosure
IPTV        92,995      8,593       3,383,327       364.421                 153.917
NETFLIX     480,189     17,770      100,480,507     Unobtainable            897.577
[0249] With reference to
[0250]
[0251] a receiving module 501, configured to receive a first processing task allocated by the management node;
[0252] an obtaining module 502, configured to obtain, according to the first processing task allocated by the management node and received by the receiving module 501, a data block M.sub.x and a data block V.sub.1x that are in a to-be-processed dataset, where the data block M.sub.x is a matrix including m rows and n columns of data, the data block V.sub.1x is a vector including n-dimensional data, m and n are positive integers, and a value of n is not less than 2; and
[0253] a processing module 503, configured to perform a combine2 operation and a reduce2 operation on the data block M.sub.x and the data block V.sub.1x, to obtain a first intermediate result V′.sub.x, where the first intermediate result V′.sub.x is a vector including m-dimensional data; and the first intermediate result V′.sub.x has an element v′.sub.i, where i is a variant, a value of i ranges from 1 to m, v′.sub.i=v′.sub.i,n, v′.sub.i,n is obtained according to v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)), m.sub.i,j is an element in the data block M.sub.x, v.sub.j is an element in the data block V.sub.1x, j is a variant, and a value of j ranges from 1 to n.
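The recurrence v′.sub.i=v′.sub.i,n with v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)) can be written out directly; in this illustrative sketch, multiplication and addition stand in for the task-specific combine2 and reduce2, and the function name is an assumption:

```python
def first_intermediate(M_x, V_1x):
    """Compute V'_x from an m-by-n block M_x and an n-dimensional
    vector V_1x using the interleaved combine2/reduce2 recurrence."""
    result = []
    for row in M_x:
        v_prime = row[0] * V_1x[0]                # v'_{i,1} = combine2(m_{i,1}, v_1)
        for j in range(1, len(V_1x)):
            # v'_{i,j} = reduce2(v'_{i,j-1}, combine2(m_{i,j}, v_j))
            v_prime = v_prime + row[j] * V_1x[j]
        result.append(v_prime)                    # v'_i = v'_{i,n}
    return result                                 # m-dimensional vector V'_x
```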
[0254] Optionally, in an embodiment, the computing node is a physical machine, a virtual machine, or a central processing unit (CPU), which is not limited in this embodiment of the present disclosure.
[0255] Therefore, according to the computing node provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0256]
[0257] a receiving module 601, configured to receive a first processing task allocated by the management node;
[0258] an obtaining module 602, configured to obtain, according to the first processing task allocated by the management node and received by the receiving module 601, a data block M.sub.1x and a data block M.sub.2x that are in a to-be-processed dataset, where the data block M.sub.1x is a matrix including m rows and n columns of data, the data block M.sub.2x is a matrix including n rows and p columns of data, m, n, and p are positive integers, and a value of n is not less than 2; and
[0259] a processing module 603, configured to perform a combine2 operation and a reduce2 operation on the data block M.sub.1x and the data block M.sub.2x, to obtain a first intermediate result M′.sub.x, where the first intermediate result M′.sub.x is a matrix including m rows and p columns of data; and the first intermediate result M′.sub.x has an element m′.sub.i,j, where i and j are variants, a value of i ranges from 1 to m, a value of j ranges from 1 to p, m′.sub.i,j=m′.sub.i,j,n, m′.sub.i,j,n is obtained according to m′.sub.i,j,k=reduce2(m′.sub.i,j,k−1, combine2(m.sub.1[i,k], m.sub.2[k,j])), m.sub.1[i,k] is an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x, m.sub.2[k,j] is an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, k is a variant, and a value of k ranges from 1 to n.
[0260] Optionally, in an embodiment, the computing node is a physical machine, a virtual machine, or a central processing unit (CPU), which is not limited in this embodiment of the present disclosure.
[0261] Therefore, according to the computing node provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0262] As shown in
[0263] receive a first processing task allocated by a management node; and
[0264] obtain, according to the first processing task allocated by the management node, a data block M.sub.x and a data block V.sub.1x that are in a to-be-processed dataset, where the data block M.sub.x is a matrix including m rows and n columns of data, the data block V.sub.1x is a vector including n-dimensional data, m and n are positive integers, and a value of n is not less than 2.
[0265] The processor 701 is configured to:
[0266] perform a combine (for example, combine2) operation and a reduce (for example, reduce2) operation on the data block M.sub.x and the data block V.sub.1x, to obtain a first intermediate result V′.sub.x, where the first intermediate result V′.sub.x is a vector including m-dimensional data; and the first intermediate result V′.sub.x has an element v′.sub.i, where i is a variant, a value of i ranges from 1 to m, v′.sub.i=v′.sub.i,n, v′.sub.i,n is obtained according to v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)), m.sub.i,j is an element in the data block M.sub.x, v.sub.j is an element in the data block V.sub.1x, j is a variant, and a value of j ranges from 1 to n.
[0267] Therefore, according to the computing node provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0268] It should be understood that in this embodiment of the present disclosure, the processor 701 may be a central processing unit (CPU). The processor 701 may alternatively be another general-purpose processor, a digital signal processor (DSP), an application-specific integrated circuit (ASIC), a field programmable gate array (FPGA) or another programmable logic device, a discrete gate or transistor logic device, a discrete hardware component, or the like. The general-purpose processor may be a microprocessor, or the processor may be any conventional processor or the like.
[0269] The memory 702 may include a read-only memory and a random access memory and provide an instruction and data for the processor 701. A part of the memory 702 may further include a non-volatile random access memory. For example, the memory 702 may further store information about a device type.
[0270] In addition to including a data bus, the bus system 703 may further include a power bus, a control bus, a status signal bus, or the like. However, for clear description, all kinds of buses are marked as the bus system 703 in the figure.
[0271] In an implementation process, all steps of the foregoing method may be completed using an integrated logic circuit of hardware in the processor 701 or using an instruction in a software form. The steps of the method that is disclosed with reference to this embodiment of the present disclosure may be executed and completed by a hardware processor, or may be executed and completed using a combination of hardware and software modules in the processor. The software module may be located in a storage medium mature in the art such as a random access memory, a flash memory, a read-only memory, a programmable read-only memory, an electrically erasable programmable memory, or a register. The storage medium is located in the memory 702. The processor 701 reads the information in the memory 702, and completes the steps of the foregoing method with reference to hardware of the processor 701. To avoid repetition, no details are further repeated herein.
[0272] Optionally, in an embodiment, the computing node 700 is a physical machine, a virtual machine, or a central processing unit (CPU).
[0273] It should be understood that the computing node 700 in this embodiment of the present disclosure may correspond to an entity for executing the method in the foregoing embodiment of the present disclosure, or may correspond to the computing node 500 in the foregoing embodiment of the present disclosure. In addition, the foregoing or other operations and/or functions of the modules of the computing node 700 are to implement corresponding procedures of the data processing method. For conciseness, no details are further repeated herein.
[0274] Therefore, according to the computing node provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0275] As shown in
[0276] receive a first processing task allocated by a management node; and
[0277] obtain, according to the first processing task allocated by the management node, a data block M.sub.1x and a data block M.sub.2x that are in a to-be-processed dataset, where the data block M.sub.1x is a matrix including m rows and n columns of data, the data block M.sub.2x is a matrix including n rows and p columns of data, m, n, and p are positive integers, and a value of n is not less than 2.
[0278] The processor 801 is configured to:
[0279] perform a combine2 operation and a reduce2 operation on the data block M.sub.1x and the data block M.sub.2x, to obtain a first intermediate result M′.sub.x, where the first intermediate result M′.sub.x is a matrix including m rows and p columns of data; and the first intermediate result M′.sub.x has an element m′.sub.ij, where i and j are variants, a value of i ranges from 1 to m, a value of j ranges from 1 to p, m′.sub.i,j=m′.sub.i,j,n, m′.sub.i,j,n is obtained according to m′.sub.i,j,k=reduce2(m′.sub.i,j,k−1, combine2(m.sub.1[i,k], m.sub.2[k,j])), m.sub.1[i,k] is an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x, m.sub.2[k,j] is an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, k is a variant, and a value of k ranges from 1 to n.
[0280] Therefore, according to the computing node provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0281] It should be understood that in this embodiment of the present disclosure, the processor 801 may be a central processing unit (CPU). The processor 801 may alternatively be another general-purpose processor, a digital signal processor (DSP), an application-specific integrated circuit (ASIC), a field programmable gate array (FPGA) or another programmable logic device, a discrete gate or transistor logic device, a discrete hardware component, or the like. The general-purpose processor may be a microprocessor, or the processor may be any conventional processor or the like.
[0282] The memory 802 may include a read-only memory and a random access memory, and provides instructions and data to the processor 801. A part of the memory 802 may further include a non-volatile random access memory. For example, the memory 802 may further store information about a device type.
[0283] In addition to including a data bus, the bus system 803 may further include a power bus, a control bus, a status signal bus, or the like. However, for clear description, all kinds of buses are marked as the bus system 803 in the figure.
[0284] In an implementation process, all steps of the foregoing method may be completed using an integrated logic circuit of hardware in the processor 801 or using an instruction in a software form. The steps of the method that is disclosed with reference to this embodiment of the present disclosure may be executed and completed by a hardware processor, or may be executed and completed using a combination of hardware and software modules in the processor. The software module may be located in a storage medium mature in the art such as a random access memory, a flash memory, a read-only memory, a programmable read-only memory, an electrically erasable programmable memory, or a register. The storage medium is located in the memory 802. The processor 801 reads the information in the memory 802, and completes the steps of the foregoing method with reference to hardware of the processor 801. To avoid repetition, no details are further repeated herein.
[0285] Optionally, in an embodiment, the computing node 800 is a physical machine, a virtual machine, or a central processing unit (CPU).
[0286] It should be understood that the computing node 800 in this embodiment of the present disclosure may correspond to an entity for executing the method in the foregoing embodiment of the present disclosure, or may correspond to the computing node 600 in the foregoing embodiment of the present disclosure. In addition, the foregoing or other operations and/or functions of the modules of the computing node 800 are to implement corresponding procedures of the data processing method. For conciseness, no details are further repeated herein.
[0287] Therefore, according to the computing node provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0288] With reference to
[0289]
[0290] S901: The management node allocates first processing tasks to at least two computing nodes that include an FC.sub.x and that are in the first class of computing nodes, where the FC.sub.x is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer; and the at least two computing nodes in the first class of computing nodes concurrently process the first processing tasks allocated by the management node.
[0291] S902: The computing node FC.sub.x obtains, according to a first processing task allocated by the management node, a data block M.sub.x and a data block V.sub.1x that are in a to-be-processed dataset, where the data block M.sub.x is a matrix including m rows and n columns of data, the data block V.sub.1x is a vector including n-dimensional data, m and n are positive integers, and a value of n is not less than 2.
[0292] S903: The computing node FC.sub.x performs a combine2 operation and a reduce2 operation on the data block M.sub.x and the data block V.sub.1x, to obtain a first intermediate result V′.sub.x, where the first intermediate result V′.sub.x is a vector including m-dimensional data; and the first intermediate result V′.sub.x has an element v′.sub.i, where i is a variant, a value of i ranges from 1 to m, v′.sub.i=v′.sub.i,n, v′.sub.i,n is obtained according to v′.sub.i,j=reduce2(v′.sub.i,j−1, combine2(m.sub.i,j, v.sub.j)), m.sub.i,j is an element in the data block M.sub.x, v.sub.j is an element in the data block V.sub.1x, j is a variant, and a value of j ranges from 1 to n.
[0293] S904: The management node obtains a processing result for the to-be-processed dataset according to first intermediate results obtained by the at least two computing nodes in the first class of computing nodes.
[0294] Therefore, according to the data processing method provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
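Steps S902 and S903 can be sketched for a single computing node FC.sub.x as follows. The concrete operators (multiplication for combine2, addition for reduce2, identity 0) are illustrative assumptions; with them the loop computes the ordinary matrix-vector product of M.sub.x and V.sub.1x:

```python
def matvec(M, V, combine2, reduce2, identity):
    """v'[i] = reduce2 over j of combine2(M[i][j], V[j]).

    Each combine2 result is folded into the accumulator at once, so no
    n-element list of intermediate combine2 values is ever stored.
    """
    out = []
    for row in M:                      # i = 1..m
        acc = identity
        for m_ij, v_j in zip(row, V):  # j = 1..n
            acc = reduce2(acc, combine2(m_ij, v_j))
        out.append(acc)
    return out

print(matvec([[1, 2, 3], [4, 5, 6]], [1, 0, 2],
             lambda a, b: a * b, lambda a, b: a + b, 0))  # -> [7, 16]
```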
[0295] Optionally, in an embodiment, the data processing system further includes a second class of computing nodes. The method 900 further includes:
[0296] allocating, by the management node according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, a second processing task to at least one computing node that includes an SC.sub.y and that is in the second class of computing nodes, where the SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer;
[0297] obtaining, by the computing node SC.sub.y according to the second processing task, the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, where the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks in one row in the to-be-processed dataset;
[0298] performing, by the computing node SC.sub.y, a reduce2 operation on the first intermediate results obtained by the SC.sub.y, to obtain a second intermediate result V″.sub.y, where the second intermediate result V″.sub.y is a vector including m-dimensional data; and
[0299] obtaining, by the management node, a processing result for the to-be-processed dataset according to a second intermediate result obtained by the at least one computing node in the second class of computing nodes.
[0300] Optionally, in an embodiment, the dataset further includes a data block V.sub.2x, and the data block V.sub.2x is a vector including m-dimensional data. The method 900 further includes:
[0301] allocating, by the management node according to the second intermediate result obtained by the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes, a third processing task to the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes;
[0302] obtaining, by the computing node SC.sub.y, the data block V.sub.2x in the dataset according to the third processing task; and
[0303] performing, by the computing node SC.sub.y, an assign operation on the second intermediate result V″.sub.y and the data block V.sub.2x that are obtained by the SC.sub.y, to obtain a processing result for the to-be-processed dataset.
[0304] Optionally, in an embodiment, m=n, and the data block V.sub.1x and the data block V.sub.2x are a same data block.
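The optional second stage above can be sketched as follows. The reduce2 fold over per-block partial vectors follows the text directly; the concrete assign operation shown here is a hypothetical example (a damping-style update of the kind used in PageRank-like iterations), since the text leaves its semantics to the application:

```python
from functools import reduce

def merge_partials(partials, reduce2):
    """SC_y: fold the first intermediate results V'_x (one per data block
    in the same block row) element-wise into the second intermediate
    result V''_y."""
    return [reduce(reduce2, column) for column in zip(*partials)]

def assign(v_second_elem, v2_elem):
    # Hypothetical assign operation: damping-style combination of the
    # merged value with the corresponding element of V_2x.
    return 0.85 * v_second_elem + 0.15 * v2_elem

partials = [[1, 2, 3], [4, 5, 6]]   # V'_x from two first-class nodes
v_second = merge_partials(partials, lambda a, b: a + b)  # -> [5, 7, 9]
v2 = [10.0, 20.0, 30.0]             # data block V_2x
result = [assign(a, b) for a, b in zip(v_second, v2)]
print(result)
```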
[0305] Optionally, in an embodiment, when the second class of computing nodes includes at least two computing nodes, the at least two computing nodes in the second class of computing nodes concurrently process second processing tasks allocated by the management node.
[0306] Optionally, in an embodiment, the management node, the first class of computing nodes, and the second class of computing nodes each may be a physical machine, a virtual machine, or a central processing unit (CPU).
[0307] Therefore, according to the data processing method provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0308]
[0309] S1001: The management node allocates first processing tasks to at least two computing nodes that include an FC.sub.x and that are in the first class of computing nodes, where the FC.sub.x is the x.sup.th computing node in the at least two computing nodes, and x is a positive integer; and the at least two computing nodes in the first class of computing nodes concurrently process the first processing tasks allocated by the management node.
[0310] S1002: The computing node FC.sub.x obtains, according to a first processing task allocated by the management node, a data block M.sub.1x and a data block M.sub.2x that are in a to-be-processed dataset, where the data block M.sub.1x is a matrix including m rows and n columns of data, the data block M.sub.2x is a matrix including n rows and p columns of data, m, n, and p are positive integers, and a value of n is not less than 2.
[0311] S1003: The computing node FC.sub.x performs a combine (for example, combine2) operation and a reduce (for example, reduce2) operation on the data block M.sub.1x and the data block M.sub.2x, to obtain a first intermediate result M′.sub.x, where the first intermediate result M′.sub.x is a matrix including m rows and p columns of data; and the first intermediate result M′.sub.x has an element m′.sub.i,j, where i and j are variants, a value of i ranges from 1 to m, a value of j ranges from 1 to p, m′.sub.i,j=m′.sub.i,j,n, m′.sub.i,j,n is obtained according to m′.sub.i,j,k=reduce2(m′.sub.i,j,k−1, combine2(m.sub.1[i,k], m.sub.2[k,j])), m.sub.1[i,k] is an element in the i.sup.th row and the k.sup.th column of the data block M.sub.1x, m.sub.2[k,j] is an element in the k.sup.th row and the j.sup.th column of the data block M.sub.2x, k is a variant, and a value of k ranges from 1 to n.
[0312] S1004: The management node obtains a processing result for the to-be-processed dataset according to first intermediate results obtained by the at least two computing nodes in the first class of computing nodes.
[0313] Therefore, according to the data processing method provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0314] Optionally, in an embodiment, the data processing system further includes a second class of computing nodes. The method 1000 further includes:
[0315] allocating, by the management node according to the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, a second processing task to at least one computing node that includes an SC.sub.y and that is in the second class of computing nodes, where the SC.sub.y is the y.sup.th computing node in the at least one computing node, and y is a positive integer;
[0316] obtaining, by the computing node SC.sub.y according to the second processing task, the first intermediate results obtained by the at least two computing nodes in the first class of computing nodes, where the first intermediate results obtained by the SC.sub.y are first intermediate results obtained according to data blocks M.sub.1x in one row and data blocks M.sub.2x in one column that are in the to-be-processed dataset;
[0317] performing, by the computing node SC.sub.y, a reduce2 operation on the first intermediate results obtained by the SC.sub.y, to obtain a second intermediate result M″.sub.y, where the second intermediate result M″.sub.y is a matrix including m rows and p columns of data; and
[0318] obtaining, by the management node, a processing result for the to-be-processed dataset according to a second intermediate result obtained by the at least one computing node in the second class of computing nodes.
[0319] Optionally, in an embodiment, the dataset further includes a data block M.sub.3x, and the data block M.sub.3x is a matrix including m rows and p columns of data. The method 1000 further includes:
[0320] allocating, by the management node according to the second intermediate result obtained by the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes, a third processing task to the at least one computing node that includes the SC.sub.y and that is in the second class of computing nodes;
[0321] obtaining, by the computing node SC.sub.y, the data block M.sub.3x in the dataset according to the third processing task; and
[0322] performing, by the computing node SC.sub.y, an assign operation on the second intermediate result M″.sub.y and the data block M.sub.3x that are obtained by the SC.sub.y, to obtain a processing result for the to-be-processed dataset.
[0323] Optionally, in an embodiment, n=m, and the data block M.sub.2x and the data block M.sub.3x are a same data block.
[0324] Optionally, in an embodiment, n=p, and the data block M.sub.1x and the data block M.sub.3x are a same data block.
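The second-stage merge for the matrix-matrix case can be sketched analogously. With reduce2 taken as addition (an illustrative assumption), folding the partial blocks M′.sub.x element-wise mirrors how blocked matrix multiplication accumulates partial products across the inner block dimension; the subsequent assign step with M.sub.3x would follow the same element-wise pattern as in the vector case:

```python
from functools import reduce

def merge_matrix_partials(partials, reduce2):
    """SC_y: fold the first intermediate results M'_x (all m x p blocks
    produced for the same block row of M_1 and block column of M_2)
    element-wise into the second intermediate result M''_y."""
    m, p = len(partials[0]), len(partials[0][0])
    return [[reduce(reduce2, (P[i][j] for P in partials)) for j in range(p)]
            for i in range(m)]

A = [[1, 2], [3, 4]]          # partial block from one first-class node
B = [[10, 20], [30, 40]]      # partial block from another
print(merge_matrix_partials([A, B], lambda a, b: a + b))  # -> [[11, 22], [33, 44]]
```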
[0325] Optionally, in an embodiment, when the second class of computing nodes includes at least two computing nodes, the at least two computing nodes in the second class of computing nodes concurrently process second processing tasks allocated by the management node.
[0326] Optionally, in an embodiment, the management node, the first class of computing nodes, and the second class of computing nodes each may be a physical machine, a virtual machine, or a central processing unit (CPU).
[0327] Therefore, according to the data processing method provided by this embodiment of the present disclosure, when a combine operation and a reduce operation are being performed on data blocks, there is no need to perform all combine operations before the reduce operation is performed; instead, the combine operation and the reduce operation are performed alternately, so that memory space occupied by computation can be reduced, and a computation time can be reduced.
[0328] A person of ordinary skill in the art may be aware that, in combination with the examples described in the embodiments disclosed in this specification, units and algorithm steps may be implemented by electronic hardware, computer software, or a combination thereof. To clearly describe the interchangeability between the hardware and the software, the foregoing has generally described compositions and steps of each example according to functions. Whether the functions are performed by hardware or software depends on particular applications and design constraint conditions of the technical solutions. A person skilled in the art may use different methods to implement the described functions for each particular application, but it should not be considered that the implementation goes beyond the scope of the present disclosure.
[0329] It may be clearly understood by a person skilled in the art that, for the purpose of convenient and brief description, for a detailed working process of the foregoing system, apparatus, and unit, reference may be made to a corresponding process in the foregoing method embodiments, and details are not described herein again.
[0330] A person skilled in the art may clearly understand that, the technologies in the embodiments of the present disclosure may be implemented by software in addition to a necessary general hardware platform. Based on such an understanding, the technical solutions of the present disclosure essentially, or the part contributing to the prior art, or all or a part of the technical solutions may be implemented in the form of a software product. The software product is stored in a storage medium and includes several instructions for instructing a computer device (which may be a personal computer, a server, or a network device) to perform all or a part of the steps of the methods described in the embodiments of the present disclosure. The foregoing storage medium may include any non-transitory machine-readable medium capable of storing program code, such as a USB flash drive, a removable hard disk, a magnetic disk, an optical disc, a random-access memory (RAM), a solid state disk (SSD), or a non-volatile memory. The foregoing descriptions are merely specific embodiments of the present disclosure, but are not intended to limit the protection scope of the present disclosure.