TASK ALLOCATION ACROSS PROCESSING UNITS OF A DISTRIBUTED SYSTEM

20230229519 · 2023-07-20

    Abstract

    An organization's distributed data storage and processing system produces an enormous volume of source data (such as log files or other statistics). The organization uses a data item processing system to process the source data in prioritized chunks, and to further assign the data items within the chunks to different processing units based on estimates of processing completion time. In this way, it becomes feasible to process the source data for analysis by consumer clients within a reasonable amount of time, and the aggregate use of the processing units is made more efficient.

    Claims

    1. A computer-implemented method performed by a processing system for prioritized processing of large data sets of semi-structured data, the computer-implemented method comprising: accessing a set of files in semi-structured or unstructured format across a plurality of distributed computer systems; determining dates corresponding to the files; sorting the files according to dates; grouping the files into chunks according to date and to a given date range size; computing expected amounts of time required to process the files using a machine learning model trained on features including observed amounts of processing time and at least one of file size, file name, file type, time of processing, or user ID; sorting the files within the chunks according to the expected amounts of time; allocating the files in the sorted order across processing units of a processing subsystem, the allocating comprising: iterating through the chunks according to date, and for each chunk: iterating through the files of the chunk, and for each data item: allocating the file to a processing unit having a lowest computed load, and updating the computed load of the processing unit having the lowest computed load according to the expected amount of time of the file and a processing ability of the processing unit; and the processing units processing the files assigned to the processing units and making results of the processing available to consumers within a structured database.

    2. A computer-implemented method for prioritizing processing of large data sets comprising a set of data items to process, the computer-implemented method comprising: sorting the data items according to a chunking attribute; chunking the data items into chunks according to the chunking attribute; computing weights for processing the data items; sorting the data items within the chunks according to the weights; and according to sorted order of the chunks based on the chunking attribute, allocating the data items in each chunk across processing units of a processing subsystem, the allocating comprising: iterating through the data items in the chunk in an order according to the sorting, allocating each data item to a least-loaded processing unit; and wherein the processing units process the data items and make results of the processing available to consumers within a structured database.

    3. The computer-implemented method of claim 2, wherein the chunking attribute is data item date, and wherein the grouping comprises forming groups of date ranges of fixed sizes.

    4. The computer-implemented method of claim 2, wherein computing the weights comprises computing an expected amount of time for processing a data item.

    5. The computer-implemented method of claim 4, wherein computing an expected amount of time for processing a data item comprises determining a size of the data item.

    6. The computer-implemented method of claim 2, wherein allocating each data item to a least-loaded processing unit comprises normalizing the expected amount of time using a scaling factor representing relative processing abilities of the processing units.

    7. A computer system comprising: a computer processor; and a non-transitory computer-readable storage medium storing instructions that when executed by the computer processor perform actions comprising: sorting a set of data items according to a chunking attribute; chunking the data items into chunks according to the chunking attribute; computing weights for processing the data items; sorting the data items within the chunks according to the weights; and according to sorted order of the chunks based on the chunking attribute, allocating the data items in each chunk across processing units of a processing subsystem, the allocating comprising: iterating through the data items in the chunk in an order according to the sorting, allocating each data item to a least-loaded processing unit; and wherein the processing units process the data items and make results of the processing available to consumers within a structured database.

    8. The computer system of claim 7, wherein the chunking attribute is data item date, and wherein the grouping comprises forming groups of date ranges of fixed sizes.

    9. The computer system of claim 7, wherein computing the weights comprises computing an expected amount of time for processing a data item.

    10. The computer system of claim 9, wherein computing an expected amount of time for processing a data item comprises determining a size of the data item.

    11. The computer system of claim 7, wherein allocating each data item to a least-loaded processing unit comprises normalizing the expected amount of time using a scaling factor representing relative processing abilities of the processing units.

    Description

    BRIEF DESCRIPTION OF DRAWINGS

    [0009] FIG. 1 illustrates a processing environment in which an organization promptly and efficiently processes data items of source data for access to consumer clients, according to one embodiment.

    [0010] FIG. 2 is a high-level block diagram illustrating physical components of a computer used as part of the organization system from FIG. 1, according to one embodiment.

    [0011] The figures depict embodiments of the present invention for purposes of illustration only. One skilled in the art will readily recognize from the following description that alternative embodiments of the structures and methods illustrated herein may be employed without departing from the principles of the invention described herein.

    DETAILED DESCRIPTION

    [0012] FIG. 1 illustrates a processing environment in which an organization promptly and efficiently processes data items of source data for access to consumer clients, according to one embodiment. A very large number of data items are generated within (or otherwise available to) the organization's system 101. The organization's system uses a queuing subsystem 110 to allocate processing of the data items in a sequence of prioritized chunks, allocated among the various processing units available to the organization system so as to efficiently distribute the processing load.

    [0013] The organization system 101 represents the components constituting, or used by, an organization. As illustrated in FIG. 1, the organization system 101 has (or has access to) a number of data sources 105 and their associated data items 107. The data items 107 are input to a queuing subsystem 110, which chunks (groups) the data items and allocates the chunked items to a processing subsystem 120. (Optionally, as illustrated in FIG. 1, before being processed by the processing subsystem 120 the data items may first be preprocessed by a preprocessing subsystem 115.) The output of the processing subsystem 120 is stored in a storage subsystem 125, where it is available to consumer clients 130. The clients may be part of the organization system 101, such as consumer client 130A (e.g., a computer program of a data analyst employed by the organization), or a third-party, external client, such as consumer client 130B (e.g., a third-party industry analyst or a program thereof). These various components are now described in additional detail.

    [0014] A number of data sources 105 collectively provide a set of data items 107, which together constitute the source data to be processed. The data sources 105 may be located at different locations over a computer network, such as on different physical machines within a data center of the organization system 101, or within a cloud storage provider used by the organization, for example. The data items 107 represent distinct units of information, such as log files. Each data item has one or more attributes, such as an associated date and/or time (e.g., a file creation date, for data items represented by digital files), a data size in bytes, or the like. The data items 107 are in a semi-structured (e.g., files with some organization, but lacking a precise schema) or unstructured (e.g., text files permitting arbitrary data layout) format not suitable for direct querying. This contrasts with structured data (e.g., data in a relational database management system), which has a schema that precisely defines the units of semantic information within the data items.

    [0015] The queuing subsystem 110 takes the data items 107 as input and determines an order in which to assign them to processing units, as well as which processing units to assign them to. In order to determine a coarse-grained order of assignment for the various data items 107, the queuing subsystem 110 has a chunking module 112 that determines a chunking attribute value for each data item 107 and groups the data items into groups using the chunking attribute value. In some embodiments the chunking module 112 uses a single attribute of the data items 107 as the chunking attribute; in other embodiments, the chunking module 112 instead determines a value for the chunking attribute as an aggregate of values of other attributes of the data item 107. The chunking attribute is of a type whose values can be ordered in a sequence, such as dates/times, numbers, letters, or the like. The chunking module 112 uses different techniques in different embodiments to perform the chunking, such as identifying a sequence of equally-sized ranges of the chunking attribute value (e.g., consecutive ranges of a size determined by a time granularity interval for a chunking attribute based on date). In such cases, the range size is referred to as the chunk size. In other embodiments, the chunking module 112 uses a clustering algorithm, such as a k-means clustering algorithm, rather than fixed chunk sizes.
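
    The fixed-range chunking described above can be sketched as follows. This is a minimal illustration only, assuming that each data item is a (name, date) pair and that the chunk size is a whole number of days; the function name chunk_by_date and the parameter chunk_days are hypothetical and do not appear in the embodiments.

```python
from collections import defaultdict
from datetime import date


def chunk_by_date(items, chunk_days=7):
    """Group (name, date) pairs into consecutive, equally sized date ranges.

    Returns a list of chunks ordered from the oldest range to the newest;
    a caller that wants the newest chunk first can reverse the list.
    """
    buckets = defaultdict(list)
    for name, day in items:
        # toordinal() maps a date to an integer day count, so integer
        # division gives every date in the same range the same bucket key.
        buckets[day.toordinal() // chunk_days].append((name, day))
    return [buckets[key] for key in sorted(buckets)]
```

    Items whose dates fall in the same range share a chunk, and ranges with no items produce no chunk. A clustering-based variant would replace the integer-division key with a cluster label.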

    [0016] The queuing subsystem 110 further includes a weighting module 113 that assigns, for each chunk, a weight to each data item within that chunk. The weight indicates an expected amount of time, or amount of processing operations, required to process the data item by the processing subsystem 120. In some embodiments, the expected amount of time is represented by the size in bytes of a data item, on the assumption that on average the time to process a data item varies linearly with its size in bytes. In other embodiments, the expected amount of time is computed with other types of techniques, such as by comparing to historical data about past processing times (e.g., by using a machine learning technique to train a model for estimating computation time based on attributes such as file size, file name, file type, user ID of user associated with the computation task, time/date of computation, or the like).
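
    The two weighting approaches above can be illustrated with a short sketch. The first function uses size in bytes directly as the weight. The second fits a one-feature least-squares line to historical (size, time) observations; it is a deliberately simple stand-in for the trained model mentioned above, not the model itself, and the names size_weight and fit_linear_time_model are hypothetical.

```python
def size_weight(size_bytes):
    """Use the size in bytes directly as the expected-time weight."""
    return float(size_bytes)


def fit_linear_time_model(sizes, times):
    """Fit time = intercept + slope * size by ordinary least squares.

    Assumes at least two observations with differing sizes; otherwise
    the slope is undefined and a ZeroDivisionError is raised.
    Returns a function that maps a size in bytes to an estimated time.
    """
    n = len(sizes)
    mean_x = sum(sizes) / n
    mean_y = sum(times) / n
    sxx = sum((x - mean_x) ** 2 for x in sizes)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(sizes, times))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return lambda size_bytes: intercept + slope * size_bytes
```

    A model with more features (file type, user ID, time of processing) would follow the same pattern: fit on observed processing times, then call the fitted estimator once per data item to obtain its weight.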

    [0017] The queuing subsystem 110 additionally includes an allocation module 114 that assigns the data items 107 of the chunks in order of their chunking attributes. For example, in some embodiments using date as the chunking attribute, the allocation module 114 first assigns the data items of chunks with more recent dates before those with less recent dates. Within a given chunk, the allocation module 114 assigns the data items of that chunk in order of their weights representing the respective expected amounts of processing time for the data items, such as with larger weights first.
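
    The two-level ordering above (chunks by chunking attribute, then items by weight within each chunk) can be sketched as follows. The sketch assumes the newest-first, largest-weight-first variant given as an example; the function name ordered_for_allocation, the weight_of callback, and the (chunk_key, items) layout are hypothetical.

```python
def ordered_for_allocation(chunks, weight_of):
    """Order chunks and their items for allocation.

    chunks: list of (chunk_key, items) pairs, where chunk_key is any
        orderable value (for example a date) and items is a list.
    weight_of: function returning the weight of one item.
    Returns a list of item lists: chunks with larger keys first, and
    within each chunk the items with larger weights first.
    """
    ordered = []
    for _key, items in sorted(chunks, key=lambda c: c[0], reverse=True):
        ordered.append(sorted(items, key=weight_of, reverse=True))
    return ordered
```

    Placing the heaviest items first within a chunk lets the smaller items that follow even out whatever imbalance the large ones create.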

    [0018] The assignment of a given data item to a processing unit of the processing subsystem 120 attempts to distribute processing load equally among all processing units. Specifically, the allocation module 114 initially assigns a load value of 0 to each processing unit, indicating that no work has yet been assigned to any of the processing units. When assigning processing of a data item to a processing unit, the allocation module 114 assigns the processing to the processing unit having the lowest current load value, and then adds the expected amount of processing time for the data item to the load value of that processing unit. In some embodiments, if the processing units differ in processing abilities, and the relative processing abilities are known, this may be taken into account by the allocation module 114 when adding the expected amount of processing time, as a form of normalization among the processing units. For example, if processing unit P1 has 50% greater processing ability than processing unit P2, and P2 has an average amount of processing ability among the processing units, then a task estimated to take 15 units of processing time on an average unit such as P2 could be scaled by a factor of (1/1.5) when adding it to the task queue of P1, so that only 10 units of processing time are allocated to P1. This takes relative processor abilities into consideration when equally spreading the processing load. The processing subsystem 120 may additionally and/or alternatively distribute the processing load to minimize monetary cost (rather than processing time). As another example, in the case of cloud-based computation, if the cloud computation service charges different amounts, or has different cost structures (e.g., on-demand vs. reserved), for different processing units, this can additionally and/or alternatively be taken into account when distributing the load, e.g., distributing less of the computation to processing units with higher costs.
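
    The least-loaded assignment with ability-based normalization can be sketched as follows. The sketch assumes that relative abilities are given as numbers with 1.0 meaning average, and it uses a binary heap to find the lowest load; the heap is an implementation choice of this illustration, and the function name allocate and the data layout are hypothetical.

```python
import heapq


def allocate(chunks, abilities):
    """Assign items to processing units, always picking the lowest load.

    chunks: list of lists of (item, expected_time), already in the order
        in which they should be assigned.
    abilities: dict mapping unit name -> relative processing ability
        (1.0 = average; 1.5 = 50% faster than average).
    Returns (assignment, loads): unit -> list of items, unit -> final load.
    """
    # Every unit starts with a load of 0. Ties are broken by unit name
    # because heap entries are compared as (load, name) tuples.
    heap = [(0.0, unit) for unit in sorted(abilities)]
    heapq.heapify(heap)
    assignment = {unit: [] for unit in abilities}
    for chunk in chunks:
        for item, expected_time in chunk:
            load, unit = heapq.heappop(heap)
            assignment[unit].append(item)
            # Normalize: a faster unit accrues less load for the same task.
            heapq.heappush(heap, (load + expected_time / abilities[unit], unit))
    loads = {unit: load for load, unit in heap}
    return assignment, loads
```

    With abilities of 1.5 and 1.0 for two units, a task estimated at 15 units adds only 10 to the faster unit's load, matching the scaling factor of (1/1.5) in the example above. A cost-aware variant could divide by a cost-adjusted factor instead of raw ability.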

    [0019] In some embodiments, a preprocessing subsystem 115 converts the data items 107 to a format expected by the processing code running on the processing units 121. This provides a greater degree of abstraction by allowing client code to provide data in different formats, relying on the preprocessing subsystem 115 (rather than the code used by the processing subsystem 120) to handle such different formats. Such preprocessing may take place before, after, or concurrently with the queuing performed by the queuing subsystem 110.
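
    As a rough illustration of such format conversion, the sketch below normalizes two input layouts into one dictionary form. Both layouts (a JSON object per line, and space-separated key=value pairs) are assumptions made for this example, as is the function name normalize_record; the embodiments do not specify particular formats.

```python
import json


def normalize_record(line):
    """Convert one log line into a dict of string keys and string values.

    Accepts either a JSON object or space-separated key=value pairs.
    A token without '=' in the second layout raises ValueError.
    """
    line = line.strip()
    if line.startswith("{"):
        # JSON layout: stringify values so both layouts yield the same type.
        return {str(k): str(v) for k, v in json.loads(line).items()}
    # key=value layout: split each token at its first '='.
    return dict(pair.split("=", 1) for pair in line.split())
```

    Processing code written against the dictionary form then needs no knowledge of which layout a given data source used.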

    [0020] The processing subsystem 120 performs the desired processing operations on the data items, and is composed of the various processing units 121 to which the processing of the various data items 107 is assigned. The processing units 121 need not be individual processors, or even individual systems with multiple processors, but may also be logical processing units within a data center or cloud computing infrastructure, such as a Hadoop cluster of networked nodes used to perform parallel computations on the large data set made up by the data items 107.

    [0021] The processing units 121 of the processing subsystem 120 run code (e.g., code written by employees of the organization) that processes the data items 107 in whatever manner is desired. As one simple example in which the data items 107 are log files of financial transactions, the processing subsystem 120 might include code that evaluates the set of data items and computes the total count of transactions over different periods of time. The processing subsystem 120 may run any number of different code programs to perform the different analyses desired by the organization.
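
    The transaction-counting example can be sketched as follows. The log layout is an assumption of this illustration: each non-empty line is taken to begin with an ISO 8601 timestamp such as 2023-01-05T10:00:00, and the period is taken to be a calendar month. The function name count_by_month is hypothetical.

```python
from collections import Counter


def count_by_month(lines):
    """Count log lines per calendar month.

    Assumes each non-empty line starts with an ISO 8601 timestamp, so the
    first seven characters (YYYY-MM) identify the month.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        timestamp = line.split()[0]
        counts[timestamp[:7]] += 1
    return counts
```

    Because each processing unit can run this over its own assigned data items, the per-unit counters can be summed afterwards to obtain totals for the whole data set.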

    [0022] The output of the processing subsystem 120 is stored within a storage subsystem 125. The form of the stored output has a greater degree of structure than the original data items 107. For example, where the data items 107 may have been unstructured or semi-structured (e.g., purely textual data such as log files), the output in the storage subsystem 125 may be highly structured (such as in a relational database management system (RDBMS) form), or at least in a format that is more suitable for executing queries. In some embodiments, for example, the storage subsystem 125 includes a Hadoop Distributed Filesystem (HDFS) that can directly support Structured Query Language (SQL) queries.
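
    To illustrate what queryable, structured output looks like, the sketch below writes per-period counts into a relational table and reads them back with SQL. It uses an in-memory SQLite database purely as a stand-in for the storage subsystem 125; the table name txn_counts, its columns, and the function name store_and_query are hypothetical.

```python
import sqlite3


def store_and_query(rows):
    """Store (period, count) rows in a table and return them sorted by period.

    Uses an in-memory SQLite database as a stand-in for a structured store.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE txn_counts (period TEXT PRIMARY KEY, n INTEGER)"
        )
        # Parameterized inserts keep the data separate from the SQL text.
        conn.executemany("INSERT INTO txn_counts VALUES (?, ?)", rows)
        return conn.execute(
            "SELECT period, n FROM txn_counts ORDER BY period"
        ).fetchall()
    finally:
        conn.close()
```

    Once results are in this form, a consumer client can filter or aggregate them with ordinary SQL rather than re-parsing the original log files.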

    [0023] The consumer clients 130 are software programs executed by those who are interested in the results of the processing subsystem 120 that are stored in the storage subsystem 125, and who have been granted access to that data by the organization. As noted above, the consumer clients 130 may be internal clients that are part of the organization itself, or they may be external entities that have been granted access. The consumer clients 130 may use the data in the storage subsystem 125 for numerous purposes. As one specific example, a data scientist of the organization may write a program for a consumer client 130A that analyzes all the data items 107 over a particular time interval (where, as in above examples, the data items represent financial transactions), identifying data for financial transactions that failed, deriving features for those transactions, and providing the features to a machine learning training algorithm that produces a model for identifying transactions that are likely to fail. The data scientist can then employ the trained model to flag in real time the transactions that will likely not be completed and take action accordingly. As another specific example, code for a photo-sharing system might accept the upload of a large batch of photos, uploading them over the network and storing them on disk in order of creation date (newest to oldest), with photo-sharing website code rendering them to the user's web page as soon as they are processed and uploaded.

    [0024] Note that components illustrated in FIG. 1 as being within the organization system 101 need not be located within the physical control of the organization, such as within its internal network, but may instead be externally-administered components of other organizations. Thus, some or all of the physical or virtual infrastructure of the organization system 101 could be provided by cloud providers, rather than being part of on-premises data centers. Additionally, different logical components could be combined in a particular embodiment. For example, the data items 107 and the output of the processing subsystem 120 could all be stored within the storage subsystem 125. Similarly, any or all of the processing components 110, 115, 120, and 130 could execute on the same system (e.g., within a virtual cloud system).

    [0025] Components of the organization system 101 may be connected by any communications network suitable for data transmission. For example, the network may use standard communications technologies and/or protocols and can include the Internet. In another embodiment, the entities use custom and/or dedicated data communications technologies.

    [0026] FIG. 2 is a high-level block diagram illustrating physical components of a computer 200 used as part of the organization system from FIG. 1, according to one embodiment. Illustrated are at least one processor 202 coupled to a chipset 204. Also coupled to the chipset 204 are a memory 206, a storage device 208, a graphics adapter 212, and a network adapter 216. A display 218 is coupled to the graphics adapter 212. In one embodiment, the functionality of the chipset 204 is provided by a memory controller hub 220 and an I/O controller hub 222. In another embodiment, the memory 206 is coupled directly to the processor 202 instead of the chipset 204.

    [0027] The storage device 208 is any non-transitory computer-readable storage medium, such as a hard drive, compact disk read-only memory (CD-ROM), DVD, or a solid-state memory device. The memory 206 holds instructions and data used by the processor 202. The graphics adapter 212 displays images and other information on the display 218. The network adapter 216 couples the computer 200 to a local or wide area network.

    [0028] As is known in the art, a computer 200 can have different and/or other components than those shown in FIG. 2. In addition, the computer 200 can lack certain illustrated components. In one embodiment, a computer 200 acting as a server may lack a graphics adapter 212, and/or display 218, as well as a keyboard or pointing device. Moreover, the storage device 208 can be local and/or remote from the computer 200 (such as embodied within a storage area network (SAN)).

    [0029] As is known in the art, the computer 200 is adapted to execute computer program modules for providing functionality described herein. As used herein, the term “module” refers to computer program logic utilized to provide the specified functionality. Thus, a module can be implemented in hardware, firmware, and/or software. In one embodiment, program modules are stored on the storage device 208, loaded into the memory 206, and executed by the processor 202.

    [0030] Embodiments of the entities described herein can include other and/or different modules than the ones described here. In addition, the functionality attributed to the modules can be performed by other or different modules in other embodiments. Moreover, this description occasionally omits the term “module” for purposes of clarity and convenience.

    Other Considerations

    [0031] One possible embodiment has been described herein. Those of skill in the art will appreciate that other embodiments may likewise be practiced. First, the particular naming of the components and variables, capitalization of terms, the attributes, data structures, or any other programming or structural aspect is not mandatory or significant, and the mechanisms described may have different names, formats, or protocols. Also, the particular division of functionality between the various system components described herein is merely for purposes of example, and is not mandatory; functions performed by a single system component may instead be performed by multiple components, and functions performed by multiple components may instead be performed by a single component.

    [0032] Some portions of the above description present the inventive features in terms of algorithms and symbolic representations of operations on information. These algorithmic descriptions and representations are the means used by those skilled in the data processing arts to most effectively convey the substance of their work to others skilled in the art. These operations, while described functionally or logically, are understood to be implemented by computer programs. Furthermore, it has also proven convenient at times to refer to these arrangements of operations as modules or by functional names, without loss of generality.

    [0033] Unless specifically stated otherwise as apparent from the above discussion, it is appreciated that throughout the description, discussions utilizing terms such as “determining” or “displaying” or the like, refer to the action and processes of a computer system, or similar electronic computing device, that manipulates and transforms data represented as physical (electronic) quantities within the computer system memories or registers or other such information storage, transmission or display devices.

    [0034] Certain aspects described herein include process steps and instructions in the form of an algorithm. It should be noted that the process steps and instructions could be embodied in software, firmware or hardware, and when embodied in software, could be downloaded to reside on and be operated from different platforms used by real time network operating systems.

    [0035] The concepts described herein also relate to an apparatus for performing the operations herein. This apparatus may be specially constructed for the required purposes, or it may comprise a general-purpose computer selectively activated or reconfigured by a computer program stored on a computer readable medium that can be accessed by the computer. Such a computer program may be stored in a non-transitory computer readable storage medium, such as, but not limited to, any type of disk including floppy disks, optical disks, CD-ROMs, magneto-optical disks, read-only memories (ROMs), random access memories (RAMs), EPROMs, EEPROMs, magnetic or optical cards, application specific integrated circuits (ASICs), or any type of computer-readable storage medium suitable for storing electronic instructions, and each coupled to a computer system bus. Furthermore, the computers referred to in the specification may include a single processor or may be architectures employing multiple processor designs for increased computing capability.

    [0036] The algorithms and operations presented herein are not inherently related to any particular computer or other apparatus. Various general-purpose systems may also be used with programs in accordance with the teachings herein, or it may prove convenient to construct more specialized apparatus to perform the required method steps. The required structure for a variety of these systems will be apparent to those of skill in the art, along with equivalent variations. In addition, the concepts described herein are not described with reference to any particular programming language. It is appreciated that a variety of programming languages may be used to implement the teachings as described herein, and any references to specific languages are provided for purposes of enablement and best mode.

    [0037] The concepts described herein are well suited to a wide variety of computer network systems over numerous topologies. Within this field, the configuration and management of large networks comprise storage devices and computers that are communicatively coupled to dissimilar computers and storage devices over a network, such as the Internet.

    [0038] Finally, it should be noted that the language used in the specification has been principally selected for readability and instructional purposes, and may not have been selected to delineate or circumscribe the inventive subject matter. Accordingly, the disclosure is intended to be illustrative, but not limiting, of the scope of the concepts described herein, which are set forth in the following claims.