Optimizing Communication in Distributed Computing Network
20210367861 · 2021-11-25
Inventors
Cpc classification
H04L41/5022
ELECTRICITY
H04L47/263
ELECTRICITY
International classification
Abstract
A computing system includes a distributed computing cluster including a plurality of computing nodes interconnected by an interconnect network over which the computing nodes of the plurality of computing nodes communicate with each other by passing messages. The computing nodes are configured with a first parameter governing transmissions of messages by the computing nodes over the interconnect network. The computing nodes are configured to accumulate messages for transmission as a group of messages according to the first parameter, and the computing system is configured to limit injections of computing requests into the distributed computing cluster according to a second parameter. A controller is configured to receive at least one predetermined service level requirement and to control a value of the second parameter and a value of the first parameter to control a computational throughput of the distributed computing cluster while complying with the at least one service level requirement.
Claims
1. A computing system (104) including: a distributed computing cluster (112) including: a plurality of computing nodes (105) interconnected by an interconnect network (107) over which the computing nodes of the plurality of computing nodes communicate with each other by passing messages, each of at least some of the computing nodes being configured with a first parameter governing transmissions of messages by the computing node over the interconnect network, wherein the at least some computing nodes are configured to accumulate messages for transmission as a group of messages according to the first parameter, and wherein the computing system is configured to limit injections of computing requests into the distributed computing cluster according to a second parameter; and the computing system further includes a controller (110) configured to receive at least one predetermined service level requirement and to control a value of the second parameter and a value of the first parameter to control a computational throughput of the distributed computing cluster while complying with the at least one service level requirement.
2. The computing system of claim 1 wherein the distributed computing cluster includes a rate limiter (117) configured to limit a rate of injection of the computing requests into the distributed computing system according to the second parameter.
3. The computing system of claim 1 wherein each service level requirement of the one or more service level requirements specifies a maximum allowable time difference between injecting a computing request into the distributed computing cluster and receiving a response to the injected computing request from the distributed computing cluster.
4. The computing system of claim 1 further including a monitor (119) configured to observe a rate of output of the distributed computing cluster and a processing latency of the distributed computing system.
5. The computing system of claim 4 wherein the observed processing latency is measured by determining a time difference between injecting a computing request into the distributed computing cluster and receiving a response to the injected computing request from the distributed computing cluster.
6. The computing system of claim 4 wherein requests injected into the distributed computing cluster and results output by the distributed computing cluster pass through the monitor.
7. The computing system of claim 4 wherein the computational throughput is observed by measuring, by the monitor, a rate of data elements outputted by the cluster.
8. The computing system of claim 4 wherein the controller is configured to process the observed computational throughput of the distributed computing cluster and the observed processing latency to adjust the value of the second parameter and the value of the first parameter.
9. The computing system of claim 8 wherein the controller repeatedly processes the observed computational throughput of the distributed computing cluster and the observed processing latency to adjust the value of the second parameter and the value of the first parameter while the distributed computing cluster is processing injected computing requests.
10. The computing system of claim 8 wherein the controller implements a feedback loop with the distributed computing cluster to process the observed computational throughput of the distributed computing cluster and the observed processing latency to adjust the value of the second parameter and the value of the first parameter.
11. The computing system of claim 1 wherein the controller uses an optimization algorithm to control the value of the first and/or second parameter to maximize the computational throughput of the distributed computing cluster while complying with the at least one service level requirement.
12. The system of claim 1 wherein the first parameter includes a delay parameter governing a minimum time interval between transmission of messages by the computing node over the interconnect network.
13. The system of claim 12 wherein the at least some computing nodes are configured to accumulate messages for transmission as a group of messages during the minimum time interval.
14. The system of claim 1 wherein the second parameter includes a rate of input parameter configured to limit a rate of the injection of computing requests into the distributed computing cluster.
15. The system of claim 1 wherein controlling the computational throughput of the distributed computing cluster while complying with the at least one service level requirement includes maximizing a computational throughput of the distributed computing cluster without violating any of the at least one service level requirements.
16. The system of claim 1 wherein the first parameter governs a number of messages accumulated between transmissions of messages by the computing node over the interconnect network.
17. The computing system of claim 1 further including one or more interface components, each configured to receive input data, inject computing requests into the distributed computing cluster for processing the input data, receive processing results from the distributed computing cluster, and generate output data from the processing results, the interface component being configured to limit a rate of injection of computing requests into the distributed computing system according to the second parameter.
18. The computing system of claim 17 wherein at least some of the one or more interface components execute on a client system (106) separate from systems on which the distributed computing cluster executes.
19. The computing system of claim 17 wherein at least some of the received input data is associated with priority levels, and the one or more interface components are configured to manage injection of the input data into the distributed data cluster according to the priority levels.
20. A method including: limiting injections of computing requests into a distributed computing cluster according to a second parameter, wherein the distributed computing system includes: a plurality of computing nodes (105) interconnected by an interconnect network (107) over which the computing nodes of the plurality of computing nodes communicate with each other by passing messages, each of at least some of the computing nodes being configured with a first parameter governing transmissions of messages by the computing node over the interconnect network, accumulating, at the at least some computing nodes, messages for transmission as a group of messages according to the first parameter, receiving, at a controller (115), at least one predetermined service level requirement, and controlling, using the controller, a value of the second parameter and a value of the first parameter to control a computational throughput of the distributed computing cluster while complying with the at least one service level requirement.
21. A computer-readable medium storing software in a non-transitory form, the software including instructions for causing a computing system to: limit a rate of injection of computing requests into a distributed computing cluster according to a second parameter, wherein the distributed computing system includes: a plurality of computing nodes (105) interconnected by an interconnect network (107) over which the computing nodes of the plurality of computing nodes communicate with each other by passing messages, each of at least some of the computing nodes being configured with a first parameter governing transmissions of messages by the computing node over the interconnect network, accumulate, at the at least some computing nodes, messages for transmission as a group of messages according to the first parameter, receive, at a controller (115), at least one predetermined service level requirement, and control, using the controller, a value of the second parameter and a value of the first parameter to control a computational throughput of the distributed computing cluster while complying with the at least one service level requirement.
22. A computing system including: means for limiting a rate of injection of computing requests into a distributed computing cluster according to a second parameter, wherein the distributed computing system includes: a plurality of computing nodes (105) interconnected by an interconnect network (107) over which the computing nodes of the plurality of computing nodes communicate with each other by passing messages, each of at least some of the computing nodes being configured with a first parameter governing transmissions of messages by the computing node over the interconnect network, means for accumulating, at the at least some computing nodes, messages for transmission as a group of messages according to the first parameter, means for receiving, at a controller (115), at least one predetermined service level agreement, and means for controlling, using the controller, a value of the second parameter and a value of the first parameter to control a computational throughput of the distributed computing cluster while complying with the at least one service level requirement.
Description
DESCRIPTION OF DRAWINGS
[0023]
[0024]
[0025]
[0026]
[0027]
[0028]
[0029]
[0030]
[0031]
[0032]
[0033]
[0034]
[0035]
DESCRIPTION
[0036]
[0037] An execution environment 104 includes a distributed computing cluster 112 (sometimes referred to as an “active data cluster”) that is usable by one or more client systems 106 to perform computation tasks as is described in greater detail below. One example of a distributed computing cluster is the distributed computing cluster described in U.S. patent application Ser. No. 16/175,133, which is incorporated herein by reference. The execution environment 104 may be hosted, for example, on one or more general-purpose computers under the control of a suitable operating system, such as a version of the UNIX operating system. For example, the execution environment 104 can include a multiple-node parallel computing environment including a configuration of computer systems using multiple central processing units (CPUs) or processor cores, either local (e.g., multiprocessor systems such as symmetric multi-processing (SMP) computers), or locally distributed (e.g., multiple processors coupled as clusters or massively parallel processing (MPP) systems), or remote, or remotely distributed (e.g., multiple processors coupled via a local area network (LAN) and/or wide-area network (WAN)), or any combination thereof.
[0038] In some examples, the execution environment 104 reads data from the data source 102 and stores results back into the data source 102 or provides results to other, downstream systems for further processing. Storage devices providing the data source 102 may be local to the execution environment 104, for example, being stored on a storage medium connected to a computer hosting the execution environment 104 (e.g., hard drive 108), or may be remote to the execution environment 104, for example, being hosted on a remote system (e.g., mainframe 110) in communication with a computer hosting the execution environment 104, over a remote connection (e.g., provided by a cloud computing infrastructure).
[0039] The data storage system 116 is also accessible to a development environment 118 in which a developer 120 is able to specify computer programs for execution in the execution environment (e.g., by the distributed computing cluster 112 via the one or more client systems 106). The development environment 118 is, in some implementations, a system for developing applications as dataflow graphs that include vertices (representing data processing components or datasets) connected by directed links (representing flows of work elements, i.e., data) between the vertices. For example, such an environment is described in more detail in U.S. Publication No. 2007/0011668, titled “Managing Parameters for Graph-Based Applications,” incorporated herein by reference. A system for executing such graph-based computations is described in U.S. Pat. No. 5,966,072, titled “EXECUTING COMPUTATIONS EXPRESSED AS GRAPHS,” incorporated herein by reference. Dataflow graphs made in accordance with this system provide methods for getting information into and out of individual processes represented by graph components, for moving information between the processes, and for defining a running order for the processes. This system includes algorithms that choose interprocess communication methods from any available methods (for example, communication paths according to the links of the graph can use TCP/IP or UNIX domain sockets, or use shared memory to pass data between the processes).
[0040] In some examples, the data source 102 maintains data in any number of different forms of database systems. The data may be organized as records having values for respective fields (also called “attributes” or “columns”), including possibly null values. When first reading data from a data source, the execution environment 104 typically starts with some initial format information about records in that data source. In some circumstances, the record structure of the data source may not be known initially and may instead be determined after analysis of the data source or the data. The initial information about records can include, for example, the number of bits that represent a distinct value, the order of fields within a record, and the type of value (e.g., string, signed/unsigned integer) represented by the bits.
1 Execution Environment Overview
[0041] Referring to
[0042] In some examples, the controller 110 receives information characterizing a processing Latency and a rate of output, R.sub.out associated with the distributed computing cluster 112 from the monitor 119. The controller 110 uses the processing Latency and the rate of output, R.sub.out to determine a communication delay, d (sometimes referred to as a “first parameter”) of messages transmitted within the distributed computing cluster 112 and a rate of injection of requests, R.sub.in (sometimes referred to as a “second parameter”) into the distributed computing cluster 112. The rate of output may be a rate at which the cluster 112 outputs output data elements 113, which may be responses by the nodes 105 to the injected requests 111. The computational throughput of the cluster 112 may be observed by measuring, by the monitor 119, the rate of output R.sub.out. The communication delay, d is provided to the distributed computing cluster 112 and is used to configure computing nodes 105 in the distributed computing cluster, as is described in greater detail below. The rate of injection of requests, R.sub.in is provided to the throttle 117 and is used to limit the rate of injection of new requests into the distributed computing cluster 112.
[0043] In some examples, the input requests 111 include requests for processing data according to a procedure defined in a component external to the distributed computing cluster 112 (e.g., the call cluster component). The distributed computing cluster 112 processes the input requests 111 and provides results 113 (responses by nodes 105 to the injected computing requests 111) of the processing to the component external to the distributed computing cluster 112 (e.g., back to the call cluster component).
[0044] The throttle 117 is configured to limit a rate of injection of requests, R.sub.in to the distributed computing cluster 112. Very generally, the rate of injection of requests parameter is adjustable to ensure that requests are not injected into the distributed computing cluster 112 if those requests are unlikely to be completed within an amount of time specified by an SLA associated with the requests.
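The behavior of the throttle 117 can be illustrated with a minimal sketch. The sketch below uses a token-bucket limiter, which is a technique chosen here for illustration and is not stated in this description; the class name `Throttle`, the `burst` parameter, and the explicit `now` timestamp argument are assumptions. The only quantity taken from the description is the rate of injection of requests, R.sub.in, which the controller may update at any time.

```python
class Throttle:
    """Hypothetical token-bucket limiter admitting about `rate_in` requests/second."""

    def __init__(self, rate_in: float, burst: float = 1.0) -> None:
        self.rate_in = rate_in   # R_in, in requests per second (set by the controller)
        self.burst = burst       # assumed cap on accumulated tokens
        self._tokens = burst     # start with a full bucket
        self._last = 0.0         # timestamp of the previous call, in seconds

    def set_rate(self, rate_in: float) -> None:
        """Apply a new R_in value supplied by the controller."""
        self.rate_in = rate_in

    def try_inject(self, now: float) -> bool:
        """Return True if a request may be injected at time `now`."""
        elapsed = now - self._last
        self._last = now
        # Refill tokens in proportion to elapsed time, up to the burst cap.
        self._tokens = min(self.burst, self._tokens + elapsed * self.rate_in)
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False
```

In this sketch a request that is refused is simply not injected; whether refused requests are queued, retried, or rejected is left open, as it is in the description above.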
2 Distributed Computing Cluster
[0045] The distributed computing cluster 112 includes a number of computing nodes 105 connected to one another by an interconnection network 107 (or simply an “interconnect”). The interconnect 107 facilitates the communication of data between computing nodes 105 for processing data in a distributed manner. For example, the interconnect 107 includes an Internet protocol (IP)-based and/or Ethernet based network. Very generally, processing requests are injected into the distributed computing cluster 112 by one or more external components (not shown). The distributed computing cluster 112 processes the requests using the computing nodes 105 and the interconnection network 107 to generate the result data (e.g., responses by nodes 105 to the injected computing request) that is provided back to the external component.
[0046] In the course of processing requests, messages are frequently passed between the computing nodes 105 via the interconnect 107. The communication delay, d indicates how long the computing nodes 105 in the distributed computing cluster 112 wait between transmissions of messages to one another over the interconnect 107. Larger values of the communication delay cause the computing nodes 105 to wait a longer duration of time between transmissions. While the computing nodes 105 are waiting, they accumulate a group of messages for later transmission (e.g., in a single network packet). So, larger values of the communication delay are associated with more messages being sent per transmission, resulting in greater throughput for the distributed computing cluster 112 (i.e., due to a reduction in network overhead associated with packet formation). But waiting for a long time duration also causes greater computing latency. Similarly, smaller values of the communication delay, d are associated with lesser throughput for the distributed computing cluster (i.e., due to increased network overhead associated with an increased number of packets being formed) but also with lesser computing latency.
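The accumulation of messages during the communication delay, d can be sketched as follows. This is an illustrative sketch only: the class name `DelayBatcher`, its methods, and the use of caller-supplied timestamps are assumptions, and a real computing node would hand the returned group to its network layer rather than return it to the caller.

```python
from typing import List, Optional


class DelayBatcher:
    """Hypothetical per-node buffer that sends at most one group every `delay` seconds."""

    def __init__(self, delay: float) -> None:
        self.delay = delay               # communication delay d, in seconds
        self._pending: List[str] = []    # messages accumulated since the last send
        self._last_send = 0.0            # time of the last transmission

    def submit(self, message: str, now: float) -> Optional[List[str]]:
        """Queue a message, then transmit the group if the delay has elapsed."""
        self._pending.append(message)
        return self.poll(now)

    def poll(self, now: float) -> Optional[List[str]]:
        """Return the accumulated group if it is time to transmit, else None."""
        if self._pending and now - self._last_send >= self.delay:
            group, self._pending = self._pending, []
            self._last_send = now
            return group
        return None
```

With a larger `delay`, each returned group tends to contain more messages, so fewer transmissions are needed, but the first message in each group waits longer before it is sent. This mirrors the tradeoff between throughput and latency described above.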
3 Controller
[0047] As is mentioned above, the controller 110 implements a feedback control mechanism to maximize throughput in the distributed computing cluster 112 while ensuring compliance with service level agreements (SLAs) or other promises associated with the requests. The controller 110 uses a monitor 119 to monitor a processing Latency and a rate of output, R.sub.out associated with the distributed computing cluster 112. The controller 110 receives the Latency, the rate of output, R.sub.out, and a service level agreement, SLA as input and processes those inputs to determine the input rate, R.sub.in and the communication delay, d for the distributed computing cluster 112.
[0048] The input rate, R.sub.in determined by the controller 110 is provided to the throttle 117 which maintains the rate of injection of requests 111 into the distributed computing cluster 112 at R.sub.in. The communication delay, d determined by the controller 110 is provided to the distributed computing cluster 112 which maintains the time interval between transmissions of messages over the interconnect 107 of the distributed computing cluster 112 according to the communication delay, d.
3.1 Control Algorithm
[0049] Referring to
[0050] After the adjust(d) function returns, the algorithm implemented by the controller 110 proceeds to Line 4 where a test is performed to determine whether the rate of output, R.sub.out from the distributed computing cluster is equal to the rate of injection of requests, R.sub.in into the distributed computing cluster (i.e., does the distributed computing cluster currently have sufficient throughput to keep up?) AND whether the difference between the SLA and the Latency is greater than zero (i.e., is the distributed computing cluster completing requests faster than the SLA requires?).
[0051] At Line 5 of the algorithm, if the conditions set forth in Line 4 are met, the controller 110 increases the rate of injection of requests, R.sub.in by an amount, δ. At Line 6 of the algorithm, if one or more of the conditions set forth in Line 4 of the algorithm are not satisfied, the controller 110 decreases the rate of injection of requests, R.sub.in by an amount, δ. That is, if the distributed computing cluster 112 is currently keeping up and completing requests faster than the SLA requires, then the algorithm tries to increase the input rate, R.sub.in. Otherwise, if the distributed computing cluster 112 is either failing to keep up with its processing load or is failing to meet the SLA, the controller 110 decreases the input rate, R.sub.in.
[0052] The loop then repeats with the version of R.sub.in that was updated in the previous iteration of the loop. The adjust(d) function call in the current iteration of the loop modifies d to reach the lowest value of d where further increasing d does not increase R.sub.out for the updated version of R.sub.in. If the adjust(d) function results in a value of d that causes R.sub.out to be equal to the updated value of R.sub.in AND causes the difference between the SLA and the Latency to be greater than zero, then the algorithm again increases R.sub.in. Otherwise, the algorithm reduces R.sub.in.
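One iteration of the loop described in the preceding paragraphs can be sketched as follows. This is a hedged reconstruction from the prose only. The names `ControlState`, `control_step`, `adjust_delay`, and `observe` are hypothetical, the `tolerance` used when comparing R.sub.out with R.sub.in is an assumption (the description states the test as equality), and clamping R.sub.in at zero is also an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass
class ControlState:
    rate_in: float  # R_in, rate of injection of requests
    delay: float    # d, communication delay


def control_step(
    state: ControlState,
    adjust_delay: Callable[[float, float], float],            # stands in for adjust(d)
    observe: Callable[[float, float], Tuple[float, float]],   # returns (R_out, Latency)
    sla: float,
    delta: float,
    tolerance: float = 1e-9,
) -> ControlState:
    """Run one loop iteration: adjust d, test the two conditions, update R_in."""
    delay = adjust_delay(state.rate_in, state.delay)
    rate_out, latency = observe(state.rate_in, delay)

    keeping_up = abs(rate_out - state.rate_in) <= tolerance   # R_out equals R_in
    within_sla = (sla - latency) > 0                          # SLA - Latency > 0

    if keeping_up and within_sla:
        rate_in = state.rate_in + delta            # both conditions met: increase R_in
    else:
        rate_in = max(0.0, state.rate_in - delta)  # otherwise: decrease R_in
    return ControlState(rate_in=rate_in, delay=delay)
```

Calling `control_step` repeatedly with the state returned by the previous call reproduces the behavior described above: R.sub.in rises by δ while the cluster keeps up and the Latency stays under the SLA, and falls by δ otherwise.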
[0053] By monitoring the Latency and throughput of the distributed computing cluster 112, the algorithm of
4 Example 1
[0054] Referring to
[0055] Referring to
[0056] Referring to
[0057] Referring to
[0058] Referring to
[0059] Referring to
[0060] Referring to
[0061] The controller 110 continues to execute the algorithm of
5 Example 2
[0062] Referring to
[0063] With the number of computing nodes 105 in the distributed computing cluster 112 reduced from five to three, the output rate R.sub.out is reduced from 3000 to 2000, while the input rate, R.sub.in remains at 3000. The Latency increases to 85 ms.
[0064] Referring to
[0065] Referring to
[0066] Referring to
[0067] In the configuration of
6 Alternatives
[0068] In the description of
[0069] Also, the control algorithm in
[0070] The adjust(d) function described above can be implemented using any one of a number of known optimization techniques (e.g., gradient descent or stochastic optimization techniques).
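As one illustration, the sketch below implements an adjust(d)-style search with a simple fixed-step hill climb rather than gradient descent or a stochastic method. It assumes that R.sub.out is non-decreasing in d and eventually saturates, and that a hypothetical callable `measure_rate_out(d)` returns the observed R.sub.out for a given delay. The parameters `step`, `max_delay`, and `min_gain` are assumptions introduced for the sketch.

```python
from typing import Callable


def adjust_delay(
    measure_rate_out: Callable[[float], float],  # hypothetical probe: d -> observed R_out
    delay: float,
    step: float,
    max_delay: float,
    min_gain: float = 0.0,
) -> float:
    """Find the lowest d (on a grid of `step`) beyond which R_out stops increasing."""
    best = measure_rate_out(delay)

    # Climb: keep increasing d while doing so still raises R_out.
    while delay + step <= max_delay:
        candidate = measure_rate_out(delay + step)
        if candidate - best <= min_gain:
            break
        delay, best = delay + step, candidate

    # Back off: lower d as long as R_out does not drop.
    while delay - step >= 0.0:
        candidate = measure_rate_out(delay - step)
        if best - candidate > min_gain:
            break
        delay, best = delay - step, candidate

    return delay
```

Because measurements from a live cluster are noisy, a practical version would likely average several observations per probe and use a positive `min_gain`; those choices are outside what the description specifies.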
[0071] While the examples described above illustrate injection of requests from only a single external component (e.g., a call cluster component) interacting with the distributed data cluster, it is likely that several external components will be interacting with the distributed data cluster at the same time. In such cases, the controller monitors the latencies and throughputs of many or all of the external components and balances their access to the resources of the distributed data cluster to ensure that throughput is maximized, and the SLAs of the requests from the external components are satisfied.
[0072] In some examples, input data that arrives at the external component (e.g., at a call cluster component) is prioritized and the requests are injected according to the priority level.
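Priority-ordered injection can be sketched with a heap-based queue. The class name `PriorityInjector`, the convention that a lower number means a higher priority, and first-in-first-out ordering among equal priorities are all assumptions; the description does not specify how priority levels are encoded.

```python
import heapq
import itertools
from typing import Any, List, Optional, Tuple


class PriorityInjector:
    """Hypothetical queue that releases input data in priority order."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._counter = itertools.count()  # tie-breaker that preserves arrival order

    def enqueue(self, item: Any, priority: int) -> None:
        """Queue an item; lower `priority` values are released first (assumption)."""
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def next_request(self) -> Optional[Any]:
        """Return the next item to inject, or None if nothing is waiting."""
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]
```

An interface component could call `next_request` each time the throttle permits another injection, so that the rate limit and the priority ordering are applied together.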
[0073] In some examples, latency is measured at the service request level (e.g., remote procedure call latency) or simply by the time it takes between injecting a request into the distributed data cluster and receiving a response.
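The second measurement approach, timing the interval between injection and response, can be sketched as follows. The class name `Monitor`, the use of request identifiers, and the windowed rate calculation are assumptions made for illustration; timestamps are passed in explicitly so the sketch stays deterministic.

```python
from typing import Dict, List


class Monitor:
    """Hypothetical monitor recording per-request latency and output rate."""

    def __init__(self) -> None:
        self._injected_at: Dict[str, float] = {}  # request id -> injection time
        self._response_times: List[float] = []    # times at which responses arrived
        self.latencies: List[float] = []          # observed latency per request

    def on_inject(self, request_id: str, now: float) -> None:
        """Record the time at which a request enters the cluster."""
        self._injected_at[request_id] = now

    def on_response(self, request_id: str, now: float) -> float:
        """Record a response and return the latency of that request."""
        latency = now - self._injected_at.pop(request_id)
        self.latencies.append(latency)
        self._response_times.append(now)
        return latency

    def rate_out(self, start: float, end: float) -> float:
        """Responses per second observed in the window [start, end)."""
        count = sum(1 for t in self._response_times if start <= t < end)
        return count / (end - start)
```

A controller could compare the largest recent value in `latencies` against the SLA and use `rate_out` as its estimate of R.sub.out.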
[0074] In some examples, throughput is measured by monitoring CPU load or a number of messages “in flight.”
[0075] In some examples, each computing node maintains a timer and decides when to transmit messages over the interconnect based on the timer and the communication delay, d. In other examples, a centralized clock periodically distributes a trigger signal, where the period is defined as the communication delay, d. The trigger signal, when received at the computing nodes, causes the computing nodes to transmit messages.
[0076] While not shown in the figures, in some examples, the output of the throttle 117 is fed through the monitor 119 to assist in determining the processing latency of the distributed computing cluster 112.
[0077] As is mentioned above, the distributed computing cluster interacts with one or more external components (sometimes referred to as “call cluster components” or “interface components”). The external components may be associated with corresponding data processing procedures. In some examples, an external component receives input data for processing according to its corresponding data processing procedure. The external component forms a processing request using the input data and provides the processing request into the distributed computing cluster. The distributed computing cluster processes the request using the control scheme described above and generates an output. The output is provided back to the external component. The external component processes the output to generate output data, which it transmits to one or more downstream components.
7 Implementations
[0078] The approaches described above can be implemented, for example, using a programmable computing system executing suitable software instructions or it can be implemented in suitable hardware such as a field-programmable gate array (FPGA) or in some hybrid form. For example, in a programmed approach the software may include procedures in one or more computer programs that execute on one or more programmed or programmable computing systems (which may be of various architectures such as distributed, client/server, or grid) each including at least one processor, at least one data storage system (including volatile and/or non-volatile memory and/or storage elements), at least one user interface (for receiving input using at least one input device or port, and for providing output using at least one output device or port). The software may include one or more modules of a larger program, for example, that provides services related to the design, configuration, and execution of dataflow graphs. The modules of the program (e.g., elements of a dataflow graph) can be implemented as data structures or other organized data conforming to a data model stored in a data repository.
[0079] The software may be stored in non-transitory form, such as being embodied in a volatile or non-volatile storage medium, or any other non-transitory medium, using a physical property of the medium (e.g., surface pits and lands, magnetic domains, or electrical charge) for a period of time (e.g., the time between refresh periods of a dynamic memory device such as a dynamic RAM). In preparation for loading the instructions, the software may be provided on a tangible, non-transitory medium, such as a CD-ROM or other computer-readable medium (e.g., readable by a general or special purpose computing system or device), or may be delivered (e.g., encoded in a propagated signal) over a communication medium of a network to a tangible, non-transitory medium of a computing system where it is executed. Some or all of the processing may be performed on a special purpose computer, or using special-purpose hardware, such as coprocessors or field-programmable gate arrays (FPGAs) or dedicated, application-specific integrated circuits (ASICs). The processing may be implemented in a distributed manner in which different parts of the computation specified by the software are performed by different computing elements. Each such computer program is preferably stored on or downloaded to a computer-readable storage medium (e.g., solid state memory or media, or magnetic or optical media) of a storage device accessible by a general or special purpose programmable computer, for configuring and operating the computer when the storage device medium is read by the computer to perform the processing described herein. The inventive system may also be considered to be implemented as a tangible, non-transitory medium, configured with a computer program, where the medium so configured causes a computer to operate in a specific and predefined manner to perform one or more of the processing steps described herein.
[0080] A number of embodiments of the invention have been described. Nevertheless, it is to be understood that the foregoing description is intended to illustrate and not to limit the scope of the invention, which is defined by the scope of the following claims. Accordingly, other embodiments are also within the scope of the following claims. For example, various modifications may be made without departing from the scope of the invention. Additionally, some of the steps described above may be order independent, and thus can be performed in an order different from that described.