SYSTEMS AND METHODS FOR FAST AND SCALABLE DATA-INTENSIVE COMPUTATION
20230251918 · 2023-08-10
Inventors
CPC classification
G06F2009/4557
PHYSICS
G06F9/4881
PHYSICS
G06F9/4862
PHYSICS
International classification
G06F9/455
PHYSICS
Abstract
The present invention relates to a system (100) and method with a fast distributed data processing engine (105) for fast and scalable data-intensive computation, comprising a data model (110) defining an addressable collection of message spaces, addressing a message space constellation (115) and the message spaces with one or more coordinates, updating the message space and the program state with messages following a consistency model, defining attributes for areas within the message space, and running tasks in the message space constellation (115). In particular, each task accesses a part of the message space in a subset of the message space constellation (115).
Claims
1. A system with a fast distributed data processing engine for fast and scalable data-intensive computation comprising: a data model to define an addressable collection, or a constellation, of addressable “message spaces”, combine the state of the message spaces to form a global program state, and regulate mutations of message-based operations in a message space and a program state; attributes to be defined within areas or position ranges in a space, wherein some attributes make spaces intersect to share data and exchange information; the program state or a data space materialized as messages in the message spaces, with messages and data dynamically migrating from one node to another at run time; and a plurality of tasks running in the message space constellation, with each task accessing a subset of the message space constellation and all tasks combined maintaining and mutating the program state of the computation.
2. The system as claimed in claim 1, wherein the message spaces contain areas with different attributes defined by a designer as applicable, and typical attributes include a persistent (P) attribute, a universal (U) attribute and a replicated (R) attribute.
3. The system according to claim 1, wherein the message spaces are addressable in the message space constellation, and the messages are addressable within the message spaces.
4. The system as claimed in claim 2, wherein an area in the message spaces may have the universal (U) attribute configured to facilitate communication among spaces by crosscutting the area into a plurality of spaces of the message space constellation, or intersecting the message spaces in the area.
5. The system as claimed in claim 1, wherein the data model is configured to abstract a program state or data residing in the message space constellation and materializes the data as the message in the message space.
6. The system as claimed in claim 1, wherein the message space is addressable in the message space constellation and the message is addressable within the message space.
7. The system as claimed in claim 1, wherein the message is a sequence of bytes of a bounded size.
8. The system as claimed in claim 1, wherein the data spaces are logical constructs, and messages or parts of a message (message fragments) are potentially distributed among multiple nodes.
9. The system as claimed in claim 1, wherein the program state or data are able to dynamically migrate from one node to another at a run time as the messages and message fragments distribute and re-distribute among nodes.
10. The system as claimed in claim 1, wherein when a computing task is created, the task materializes accessible data as visible parts of messages in the message spaces and reads and/or accesses and/or posts and/or processes and/or mutates the message in the message space.
11. The system as claimed in claim 1, wherein the system further includes a globalizer configured to schedule the task to run on a compute node, which can be a container, a VM, a server, or any other type of compute node.
12. A method for fast and scalable data-intensive computation representing and mutating a program state of a distributed computation, comprising steps of: defining an addressable collection of message spaces in a data model to combine the state of the message spaces to form a global program state and regulate mutations of the message space and the program state in a message; defining an area in the message spaces to have certain attributes including a persistent (P) attribute, a universal (U) attribute and a replicated (R) attribute; addressing a message space constellation and the message space with one or more coordinates; running a task in the message space constellation, wherein each task accesses a subset of the message space constellation; and updating the message space and the program state with the message following a consistency model; wherein the task materializes accessible data as visible parts of messages in the message space and reads and/or accesses and/or posts and/or processes and/or mutates the message in the message space.
13. The method according to claim 12, wherein the method performed by the task comprises steps of: abstracting data to be messages residing in the message space constellation; materializing the data as messages in message spaces, wherein the messages are read, processed and mutated by the task; processing a set of messages from an accessible space, generating a new set of messages after processing the set of messages from the accessible space and posting the new set of messages in the message space; and sending the new set of messages to the message space following a certain consistency model; wherein the consistency model enforces a level of predictable behavior for concurrent reads and writes of messages.
14. The method according to claim 12, wherein the method for posting the message in the message space includes steps of: examining the message posted in a related message set; determining whether the message posted in the related message set is observed; preparing a new horizon; determining whether the new horizon is consistent; and applying changes in the message and instantiating the horizon.
15. The method according to claim 12, wherein the message is a sequence of bytes of a bounded size.
16. The method according to claim 12, wherein the method further includes a step of scheduling the task by a globalizer.
17. The method as claimed in claim 12, wherein an area in the message space contains a universal (U) attribute for facilitating communication among spaces by crosscutting the area into a plurality of spaces of the message space constellation, or intersecting the message spaces in the area.
Description
BRIEF DESCRIPTION OF THE DRAWINGS
[0026] So that the manner in which the above recited features of the present invention can be understood in detail, a more particular description of the invention, briefly summarized above, may be had by reference to embodiments, some of which are illustrated in the appended drawings. It is to be noted, however, that the appended drawings illustrate only typical embodiments of this invention and are therefore not to be considered limiting of its scope, for the invention may admit to other equally effective embodiments.
DETAILED DESCRIPTION
[0032] The present invention relates to a system (100) with a fast distributed data processing engine (105) for fast and scalable data-intensive computation by implementing a data model (110).
[0033] The principles of the present invention and their advantages are best understood by referring to the appended drawings and the following detailed description.
[0034] The following detailed description is, therefore, not to be taken in a limiting sense, and the scope of the present disclosure is defined by the appended claims and equivalents thereof. References within the specification to “one embodiment,” “an embodiment,” “embodiments,” or “one or more embodiments” are intended to indicate that a particular feature, structure, or characteristic described in connection with the embodiment is included in at least one embodiment of the present disclosure.
[0035] The terms “addressable collection” and “constellation” are used interchangeably for convenience.
[0036] The terms “data”, “data state” and “program state” are used interchangeably for convenience.
[0038] In accordance with an embodiment of the present invention, the data model (110) defines the semantics of messages in spaces with horizons to efficiently instantiate change and move large quantities of data in the system (100). In particular, the data model (110) is a unified global data space constellation composed of a set of data spaces with well-defined message-oriented operational semantics. Moreover, the data model (110) defines an addressable collection, or constellation, of addressable message spaces, combines the state of the message spaces to form the global program state, and mutates the message space and program state in message-based operations. Further, the data model (110) abstracts data to be messages residing in a message space constellation (115), and the data is materialized as messages in message spaces. The messages may be read, processed and mutated by tasks in the message spaces. After processing a set of messages, the task may generate a new set of messages and post them in the message space. The processing logic materializes in the system (100) as the task runs in the message space.
[0039] A software construct called a container is provided in the system (100) to facilitate the execution of a compute task. Each container provides system services to the task. In particular, the container packages a certain amount of computing resources, such as CPU cores and RAM capacity, and provides system services related to the data. Moreover, the container facilitates access to the message spaces. In an implementation, it is often convenient to let the container also monitor fault events, control heartbeats, and handle communication details.
[0040] In accordance with an embodiment of the present invention, the task accesses messages in its space and in Space U (the space that intersects other spaces). As long as the resources on all nodes can materialize the message space constellation, the total quantity of data processed by one task may exceed the memory and secondary storage capacity of the container and the node on which this task runs. In particular, the container facilitates such accesses through inter-container communication. A container runs on a compute node.
[0041] In accordance with an embodiment of the present invention, a message space may instantiate on zero, one or more containers or nodes. If a space is not instantiated on any container, no compute task can access the messages in that space. Otherwise, the task may access messages partially materialized in local or remote containers or nodes, and perform transfers of messages or message fragments. In particular, the globalizer determines or speculates which messages a task accesses, and allocates the task to execute in a container that instantiates the message spaces. Ideally, the tasks are scheduled to containers that have most of the messages (data) needed by the tasks. If all containers with good locality for the task are busy and the task has to be scheduled to a container that does not have the messages (data), the messages (data) will be dynamically migrated to the container, and the data in messages newly posted by the task will likely be stored in or near that container. Thus, the data processing engine changes the data distribution in the system (100) dynamically according to the task and container distribution. Further, the scheduler communicates and coordinates with the message registry on the active locations of messages.
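The locality-driven scheduling described above can be sketched in a few lines. This is a minimal, hypothetical illustration, not the actual globalizer: the container records, the message identifiers, and the `schedule` function are all assumptions introduced for this example.

```python
# Hypothetical sketch of locality-aware task scheduling.
# Container records and message ids are illustrative, not from the system itself.

def schedule(task_messages, containers):
    """Pick the idle container holding the most of the task's messages.

    task_messages: set of message ids the task is expected to access
    containers: list of dicts with 'name', 'busy' flag and 'messages' set
    """
    idle = [c for c in containers if not c["busy"]]
    if not idle:
        raise RuntimeError("no idle container available")
    # Best locality: maximize overlap between the task's messages and
    # the messages already materialized on the container.
    best = max(idle, key=lambda c: len(task_messages & c["messages"]))
    missing = task_messages - best["messages"]
    # Messages not present locally are migrated to the chosen container,
    # dynamically changing the data distribution.
    best["messages"] |= missing
    return best["name"], missing

containers = [
    {"name": "c0", "busy": False, "messages": {"m1", "m2"}},
    {"name": "c1", "busy": False, "messages": {"m3"}},
]
name, migrated = schedule({"m1", "m2", "m4"}, containers)
print(name, sorted(migrated))  # c0 ['m4']
```

The sketch also shows the side effect described in the text: after scheduling, the chosen container holds the migrated messages, so subsequent tasks with similar access patterns find them locally.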
[0042] To quickly locate messages, there may be message registries that manage messages or message blocks in the message space constellation (115). Particularly, the message registry is maintained on one or more designated compute nodes, and changes to the registry are recorded and coordinated by the registry. The registry conducts coordination for the state change, i.e., message posting of the system (100). For example, the registry keeps track of the state of message posting. Subsequently, the changes of the messages on appropriate horizons are handled by containers and inter-container communication.
[0043] In accordance with an embodiment of the present invention, containers help to transfer accessed message data from remote nodes to the local message space if the data are not present locally. Particularly, for posting messages the container coordinates with the message registry to update the message space state and conducts data transfer from local nodes to other nodes for overflow handling, locality control or replication. Although pre-fetching is performed, the behavior is still dominated by on-demand transfer. The transfer method is effective for random accesses in computation as the system (100) avoids a large amount of communication when there is no real access to certain parts of a message.
[0044] In accordance with an embodiment of the present invention, the data model (110) enforces data consistency by imposing constraints on the task. Under constraint 1, the task resides in one message space besides Space U, limiting the data the task may access to the capacity of a space (16 TB by default). Under constraint 2, the task observes one horizon. Under constraint 3, the task's lifespan terminates after it sends all messages. Constraints 2 and 3 combine to limit the task to conducting processing on one horizon and creating at most one new horizon, in an event-driven programming style. In particular, the task is not supposed to conduct multiple send-receive round-trip communication cycles to interact with other tasks in its lifespan.
[0045] In accordance with another embodiment of the present invention, the system (100) includes I/O channels through which the task can interact with another task or external programs in multiple send-receive cycles. But messages in such communication are considered “out-of-horizon”—they do not belong to any horizons.
[0046] In accordance with an embodiment of the present invention, the globalizer (125) is configured to schedule the task to run in a container on a compute node. Multiple programming language interfaces connect the system (100) service to the applications. The data model (110) is not specific to any programming language and may support multiple programming languages. To support a new language, library functions for the new language are written to access the message spaces. The data model (110) requires that each task reside in one space; the library implements the task functions by creating a stub task in the same space as the user task and maps messages to a chunk of memory maintained by the library.
[0047] In accordance with an embodiment of the present invention, the message space constellation (115) is configured to formalize data of different properties. In particular, the message space constellation (115) includes one or more message spaces with one or more areas, wherein each area is associated with a set of attributes defined within a position range. In accordance with an embodiment, the message spaces in the data model (110) are numbered from 0 to U−1. The value of U is configurable with a default value of 256. The special space, Space U, intersects all other spaces. Hence, the message space constellation (115) contains U+1 spaces in total.
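The constellation layout described above, U ordinary spaces numbered 0 to U−1 plus the special Space U, can be sketched as follows. The `Constellation` class and its internal dictionary representation are assumptions made for illustration; the patent does not prescribe a concrete data structure.

```python
DEFAULT_U = 256  # configurable number of ordinary message spaces

class Constellation:
    """U ordinary spaces numbered 0..U-1, plus the special Space U."""

    def __init__(self, u=DEFAULT_U):
        self.u = u
        # space number -> {byte offset: message bytes}; index u is Space U
        self.spaces = {n: {} for n in range(u + 1)}

    def total_spaces(self):
        # U ordinary spaces plus Space U
        return self.u + 1

c = Constellation()
print(c.total_spaces())  # 257
```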
[0048] In accordance with an embodiment of the present invention, the program state or data (120) are in message spaces, and a message space may be instantiated on multiple compute nodes. Therefore, the program state or data (120) may be served by a plurality of nodes, and can migrate from one node to another at runtime.
[0050] Although it is not required, it is convenient for a message space to have a fixed capacity, S, in bytes.
[0051] In accordance with an embodiment of the present invention, without loss of generality, a message is a sequence of bytes of a bounded size, although other forms of message organizations and definitions of coordinates are possible. Particularly, the message is posted in a space at a byte position given that the space is able to contain the entire byte sequence of the message.
[0052] In an exemplary case, a message of size M is posted at position P in Space X. The message then resides in positions P to P+M−1 in the space, with the first byte of the message residing at position P and all remaining bytes following in sequence order.
[0053] In accordance with an embodiment of the present invention, all spaces in the message space constellation (115) may have the same capacity, and a space may address positions with byte offsets from 0 to 2^44−1. Thus, a compute task may address a message with an identifying vector, such as a number pair <message space number, byte offset>. If necessary, an element for the terminating position can be added to the vector. We call such an identifying vector a coordinate.
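A minimal sketch of posting a message at a byte position and deriving its occupied coordinate range follows. The `post` function, the toy 64-byte space, and the return convention are illustrative assumptions; an actual space would address byte offsets up to 2^44−1 (16 TB).

```python
SPACE_CAPACITY = 2 ** 44  # positions 0 .. 2^44 - 1 in a full-size space

def post(space, payload, position):
    """Write payload bytes at the given byte position in a space (a bytearray).

    Returns the (first, last) positions occupied, so the caller can form a
    coordinate such as <space number, byte offset>.
    """
    if position + len(payload) > len(space):
        raise ValueError("message does not fit in the space")
    space[position:position + len(payload)] = payload
    return position, position + len(payload) - 1

# a tiny demonstration space (a real one would have SPACE_CAPACITY bytes)
space = bytearray(64)
first, last = post(space, b"hello", 10)
print(first, last)          # 10 14  (size M=5 at P=10 occupies P..P+M-1)
print(bytes(space[10:15]))  # b'hello'
```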
[0054] When multiple spaces intersect, a communication pattern with delays among tasks is made possible. Space U in the message space constellation (115) facilitates such inter-task communication. Space U starts and ends at configurable positions and intersects other spaces in that position range. Because of the consistency enforcement, a full cycle of such communication does not complete until the lifespan of one task is terminated. This creates a delay in the communication, but ensures consistency of the message operation semantics among a plurality of compute tasks.
[0055] In accordance with an embodiment, for a message-based abstraction, the behavior of data reads and writes among multiple tasks is defined with natural message posting and reception semantics in the message space. In particular, the data model (110) defines data in terms of messages. A task can post a message. If the posting operation is successful, a new horizon is created and the content of the message on the new horizon is what the task has just posted. A message can be received and read after it is posted. The content of the message is the content instantiated on the current horizon when the message is received. Note that the “current horizon” for the receiving task may be different from the “new horizon” for the posting task, because there can be intermediate horizons in between, and the message content may have changed several times between the posting and receiving operations. Each task reads the content of the message as received from the message space, and may mutate its content. Moreover, multiple tasks can read and mutate the content of a message simultaneously. The mutations in that message do not affect the observations of other tasks that have received the same message in the same space.
[0056] In accordance with another embodiment, different tasks may read different messages at the same position of a space. Particularly, the horizons materialize the presence of messages in the spaces. And, every space materializes on a number of horizons, and a horizon may be built on part of a space, one space or multiple spaces, depending on implementation. The same message in the space on different horizons may be different.
[0057] The currently visible parts of all horizons are called the Horizon Of Present Existence (HOPE). Roughly speaking, HOPE is the current “presentation” of all the message spaces. The contents of messages are observed on HOPE, and HOPE may combine contents from multiple horizons. The “visible” part of a horizon is the set of coordinates on that horizon for which no newer horizon defines a new instantiation. In the simplified case that the system always instantiates a horizon on an entire space, HOPE is the combination of the most recent horizons for all message spaces.
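In the simplified whole-space case above, the relationship between horizons and HOPE can be modeled as newer horizons overriding older ones coordinate by coordinate. The `hope` function and the dict-per-horizon representation are assumptions made for illustration only.

```python
def hope(horizons):
    """Combine horizons (ordered oldest first) into the currently visible view.

    Each horizon maps coordinate -> message content; a newer horizon's
    instantiation at a coordinate hides the older one, so the visible part
    of an old horizon is exactly the coordinates no newer horizon redefines.
    """
    view = {}
    for h in horizons:   # oldest first
        view.update(h)   # newer horizons override older instantiations
    return view

h1 = {(0, 10): "v1", (0, 20): "a"}  # coordinate = (space, byte offset)
h2 = {(0, 10): "v2"}                # re-posts coordinate (0, 10)
print(hope([h1, h2]))  # {(0, 10): 'v2', (0, 20): 'a'}
```

Coordinate (0, 20) remains visible from the older horizon because no newer horizon instantiates it, matching the definition of the “visible” part above.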
[0058] In accordance with an embodiment of the present invention, the area in a message space is associated with certain attributes. In particular, possible attributes include, but are not limited to, persistent (P), universal (U) and replicated (R), as depicted in the appended drawings.
[0059] The attribute P indicates that the region is persistent. And, the messages posted in this P region are automatically written to hard disks.
[0060] The attribute R indicates that the region is replicated, and the system (100) makes replicated copies of the message contents in that area, often on different nodes, to enhance reliability.
[0061] An area with both P and R attributes naturally provides a storage function equivalent to the 3-way replicated underlying storage of GFS and HDFS. In particular, in the data model (110), the system state changes when a task's lifespan terminates, the messages are posted and, potentially, the horizon is moved to the next level. The system (100) conducts replication as an innate step of the task termination process. If the replication fails, then message posting fails and the messages are dropped.
[0062] The attribute U indicates that the area is in Space U (the crosscutting space that intersects other spaces). A message posted in Space U is mirrored to all sibling areas in Space 0, Space 1, . . . Space U−1. The operations are coordinated in multiple spaces through the pivotal area of Space U and the messages posted in Space U are universally atomic. For example, if a set of messages are posted in Space U successfully, all of them are delivered to all spaces.
[0063] The areas with an attribute are defined with position ranges in a space, such as [10, 19], [2^41, 2^42], [4 TB, 6 TB]. The boundaries of the ranges have effect on all applicable message spaces. For example, when a range [10, 19] has an attribute, then the area in the range [10, 19] in Space U has that attribute, too, if the range [10, 19] is in Space U.
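A sketch of resolving which attributes apply at a given position, assuming attribute ranges are simple inclusive intervals as in the examples above. The `RANGES` table and the `attributes_at` helper are hypothetical names introduced for this example.

```python
# Illustrative attribute table: inclusive position ranges mapped to attributes.
RANGES = [
    ((10, 19), {"P"}),               # persistent area
    ((2 ** 41, 2 ** 42), {"R"}),     # replicated area
]

def attributes_at(position):
    """Return the union of attributes whose range covers the position."""
    attrs = set()
    for (lo, hi), a in RANGES:
        if lo <= position <= hi:
            attrs |= a
    return attrs

print(attributes_at(15))       # {'P'}
print(attributes_at(2 ** 41))  # {'R'}
print(attributes_at(5))        # set()
```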
[0064] An exemplary setting of the ranges with P, U and R attributes is illustrated in the appended drawings.
[0065] In accordance with an embodiment of the present invention, when posting a message, the message is either delivered or lost. When a message is delivered, the data model (110) requires the message to satisfy certain criteria defined by the implementation. For example, the criterion can be the message being delivered in its entirety or the message passing a checksum verification. Moreover, if the task posts multiple messages, the system may require the messages in one space are either all delivered, or all dropped.
[0066] In accordance with an embodiment, the task exists on HOPE and observes the content of messages in a space on one or more horizons constituent to HOPE. Thus, when a task posts a message, the message is visible to other tasks only when no other message changes HOPE in a way that blocks the view of the message. If the message posting is successful, the message materializes on a new HOPE. Other tasks in the previous HOPE can still post messages, and those messages materialize on the new HOPE as long as the messages do not block each other. An implementation may use reasonable rules to decide whether two messages block each other. For example, two messages that do not share coordinates may be considered non-blocking, or two messages far enough from each other may be considered non-blocking.
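One of the “reasonable rules” suggested above, treating messages as non-blocking when their coordinate ranges do not overlap, might be sketched as follows. The tuple encoding (space, first position, last position) is an assumption of this example, not a prescribed format.

```python
def blocks(msg_a, msg_b):
    """One plausible blocking rule: two messages block each other if their
    occupied position ranges overlap within the same space."""
    (sa, fa, la), (sb, fb, lb) = msg_a, msg_b
    return sa == sb and fa <= lb and fb <= la

print(blocks((0, 10, 14), (0, 12, 20)))  # True  (overlapping positions)
print(blocks((0, 10, 14), (0, 15, 20)))  # False (disjoint coordinates)
print(blocks((0, 10, 14), (1, 10, 14)))  # False (different spaces)
```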
[0068] At step 305, an addressable collection of a message space is defined in a data model (110) to combine the state of the message space to form a global program state. In particular, the addressable collection of the message space also regulates mutations of the message space and the program state in a message operation.
[0069] Step 305 proceeds to step 310. At step 310, an attribute is defined for the area within the message space. An individual attribute is defined for an area, while the area and intersecting spaces have a combined group of compatible attributes. Step 310 proceeds to step 315. At step 315, the message space constellation (115) and the messages are addressed with one or more coordinates.
[0070] Step 315 proceeds to step 320. At step 320, a plurality of computing tasks run in the message space constellation (115). In particular, each task accesses a part of the message space in a subset of the message space constellation (115).
[0071] Step 320 proceeds to step 325. At step 325, the message space and the program state are updated with the message operations following a consistency model.
[0073] At step 405 the method starts. At step 410, the messages posted in related message blocks are examined.
[0074] At step 415, a determination is made whether the message in the message set can be observed, i.e., not blocked.
[0075] In one embodiment, when the determination is “YES” and the message in the message set is observed, then the method proceeds to step 420.
[0076] In another embodiment, when the determination is “NO” and the message in the message set is not observed, then the method proceeds to step 425.
[0077] At step 420, a new horizon is prepared. Step 420 proceeds to step 430.
[0078] At step 425, if the message in a message set is not observed due to, for example, a blocked view of HOPE, multiple retries are made for the observation. If the message is still not observed after N retries, the posting of the entire message set is nullified. Step 425 proceeds to step 440.
[0079] At step 430, another determination is made whether the new horizon is consistent.
[0080] In one embodiment, when the determination is “YES” and the horizon is consistent, then the method proceeds to step 435.
[0081] At step 435, changes are applied to the messages posted in the related message blocks and the new horizon is instantiated.
[0082] In another embodiment, when the determination is “NO” and the horizon is not consistent, then the method proceeds to step 425.
[0083] Steps 425 and 435 proceed to step 440. At step 440, the process is terminated.
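The posting flow of steps 405 through 440 can be sketched as a retry loop. The callback-based decomposition (`observe`, `prepare_horizon`, `consistent`, `apply_changes`) and the retry bound are assumptions introduced for this sketch, not the system's actual interfaces.

```python
def post_message_set(messages, observe, prepare_horizon, consistent,
                     apply_changes, max_retries=3):
    """Sketch of the posting flow: observe the related message set, prepare a
    new horizon, check its consistency, then apply changes; retry on failure
    and nullify the entire message set after max_retries attempts."""
    for _ in range(max_retries):
        if not observe(messages):             # step 415: view of HOPE blocked
            continue                          # step 425: retry the observation
        horizon = prepare_horizon(messages)   # step 420: prepare a new horizon
        if consistent(horizon):               # step 430: consistency check
            apply_changes(messages, horizon)  # step 435: apply and instantiate
            return True                       # step 440: done, set delivered
    return False                              # step 440: entire set nullified

# trivial stand-ins: everything is observable and consistent
ok = post_message_set(["m"], lambda m: True, lambda m: {"h": 1},
                      lambda h: True, lambda m, h: None)
print(ok)  # True
```

Note that an inconsistent horizon loops back to the observation step, mirroring the “NO” branch from step 430 to step 425 in the described flow.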
[0085] At step 505, data is abstracted as messages residing in the message space constellation (115).
[0086] At step 510, the data is materialized as messages in message spaces. In particular, the messages are read, processed and mutated by the task.
[0087] At step 515, a set of messages from an accessible space is processed.
[0088] At step 520, a new set of messages is generated and posted in the message space.
[0089] At step 525, the new set of messages is sent to the message space and posted following a certain consistency model.
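The task lifecycle of steps 505 through 525, materialize, process, generate, and post, might look like the following sketch. The dictionary-backed `space` and the `run_task` helper are illustrative assumptions rather than the system's actual API.

```python
def run_task(space, process):
    """Sketch of steps 505-525: materialize messages from an accessible
    space, process them, and post the resulting new messages back."""
    inbox = list(space["messages"])       # steps 505-515: read accessible messages
    outbox = [process(m) for m in inbox]  # step 520: generate a new message set
    space["messages"].extend(outbox)      # step 525: post per the consistency model
    return outbox

space = {"messages": ["a", "b"]}
print(run_task(space, str.upper))  # ['A', 'B']
print(space["messages"])           # ['a', 'b', 'A', 'B']
```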
[0090] The present invention provides advantages including higher scalability, enhanced efficiency, efficient scheduling, efficient memory usage, data and task scheduling, persistency, high performance, support for a wide spectrum of data-intensive workloads, and low-latency operations. The system (100) scales almost linearly to a very large installation size, avoiding the complication of synchronization control among a large number of loosely-coupled compute nodes. Moreover, dynamic data migration improves locality and enables the data model (110) to optimize the internal data and work flows to enhance the performance of the computation.
[0091] The invention permits efficient implementation on commodity computing hardware in various platforms such as but not limited to Intel 64 platforms and in various languages and systems such as but not limited to a local Linux system to manage local resources on a compute node.
[0092] In view of the foregoing, it will now be appreciated that the elements of the block diagram and flowcharts support combinations of means for carrying out the specified functions and processes, combinations of steps for performing the specified functions and processes, program instruction means for performing the specified functions and processes, and so on.
[0093] It will be appreciated that computer program instructions may include computer executable code. A variety of languages for expressing program instructions are possible, including without limitation C, C++, Java, JavaScript, assembly language, Perl, and so on. Such languages may include assembly languages, hardware description languages, database programming languages, functional programming languages, imperative programming languages, and so on.
[0094] The functions, systems and method herein described could be utilized and presented in a multitude of languages. Individual systems may be presented in one or more languages and the language may be changed with ease at any point in the process or method described above. One of ordinary skill in the art would appreciate that there are numerous languages the system could be provided in, and embodiments of the present disclosure are contemplated for use with any language.
[0095] The invention is capable of myriad modifications in various obvious aspects, all without departing from the spirit and scope of the present disclosure. Accordingly, the drawings and descriptions are to be regarded as illustrative in nature and not restrictive.
[0096] The features described herein may be combined to form additional embodiments and sub-elements of certain embodiments may form yet further embodiments. The foregoing summary of the present disclosure with the preferred embodiment should not be construed to limit the scope of the invention. It should be understood and obvious to one skilled in the art that the embodiments of the invention thus described may be further modified without departing from the spirit and scope of the invention.