SYSTEM AND METHOD FOR DATA ANALYTICS
20230334037 · 2023-10-19
Abstract
Systems and methods for storing and querying voluminous “big data” are described, with application to append-only hyperscale databases. The methods dispense with blob abstractions such as objects and file systems, instead storing data as addressable binary sequences at all stages of storage and querying. In particular, a system arranged as a microservices architecture is combined with a dedicated data format to execute an external query as a distributed process.
Claims
1. (canceled)
2. (canceled)
3. (canceled)
4. (canceled)
5. (canceled)
6. (canceled)
7. A data record management method comprising the use of a key-value store, wherein the key-value store utilizes a batch identifier as a key for an associated value, and wherein the associated value indicates a sequential range of logical block addresses on a plurality of storage media and other metadata information needed to locate, access, or manage records stored within the indicated range of block addresses.
8. A cascade method for applying a filter to a data record comprising providing machine-readable instructions causing one or more system modules to perform the following upon execution: Receiving a first input of an isolation mask represented by a first plurality of bytes; Receiving a second input of a matching mask represented by a second plurality of bytes; Receiving a data record; Executing an operation of applying the isolation mask to the data record to produce a first output; Executing an operation of applying the matching mask to the first output to produce a second output; and Determining whether or not the second output indicates that the data record passes through the filter based on a Boolean operation or other predetermined criterion.
9. The method of claim 8, applied in parallel to a plurality of data records.
Description
BRIEF DESCRIPTION OF THE DRAWINGS
DETAILED DESCRIPTION AND BEST MODE OF IMPLEMENTATION
[0088] Embodiments of the present invention will now be described in detail with reference to the drawings, which are provided as illustrative examples so as to enable those skilled in the art to practice the invention. Notably, the figures and example(s) below are not intended to limit the scope of the present invention to a single embodiment; rather, other embodiments are possible by way of interchange of some or all of the described or illustrated elements. A modern network data collection and telemetry system is used as an illustrative example and does not limit the scope of the invention. The system and method are broadly applicable.
[0089] Wherever convenient, the same reference numbers will be used throughout the drawings to refer to the same or like parts or steps. Where certain elements of these embodiments can be partially or fully implemented using known components, only those portions of such known components that are necessary for an understanding of the present invention will be described, and detailed descriptions of other portions of such known components will be omitted so as not to obscure the invention.
[0090] In the present specification, an embodiment illustrating a singular component should not be considered limiting. Rather, the invention is intended to encompass other embodiments including a plurality of the same component, and vice-versa, unless explicitly stated otherwise herein. Moreover, applicants do not intend for any term in the specification or claims to be ascribed an uncommon or special meaning unless explicitly set forth as such. Further, the present invention encompasses present and future known equivalents to the components referred to herein by way of illustration.
[0091] For example, various disclosed functions may be assigned to a plurality of different modules. However, a skilled artisan will immediately recognize that the scope of the invention includes, for example, a system wherein the relevant functions are distributed differently among the modules, and that references to a singular module include one or more modules, even when such variations or embodiments are not explicitly described. Similarly, “records” may refer to individual records, blocks of records, or batches of records.
[0092] An improved system and method for data analytics is disclosed below. The features and advantages of the disclosed method and apparatus will become more apparent to those skilled in the art after considering the following detailed description in connection with the accompanying drawings.
[0093] By way of non-limiting examples, applications may include network telemetry, self-driving cars, astronomy, population census analysis, toll road system data, political election returns, various military applications, income tax calculation and processing, climate change analysis, facial recognition, pharmaceutical development, optimizing business performance, correlating worldwide scientific data on virtually any topic to eliminate statistical variations and discover new correlations, etc. Virtually any large-scale voluminous and at least partially structured data stream can be advantageously analyzed in the manner disclosed herein, particularly high velocity volumetric data.
[0094] This disclosure describes a novel and improved data analytics system and method applicable to large data systems. The system and method embodiments may comprise hardware, software, and methodology implementing various combinations of features such as:
[0095] 1) A Format enabling a method for directly storing data of a single dataset in a physical medium without virtualization or extrapolation, constituting an optimized system for write-once-read-many (WORM) data used for complex analytics. A Record Storage Allocation System (RSAS) is an example of such a format, in accordance with some embodiments described herein.
[0096] 2) A Data Storage Module for the management of pre-formatted physical storage media such as HDD, SSD, Flash, and/or RAM. The module includes the processes of writing in a low-trust reliability environment and of filtered reading and streaming of stored data. An RSAS Media Management (RMM) is an example of such a data storage module, in accordance with some embodiments described herein.
[0097] 3) A Distribution Module for clustering data storage modules and pre-formatted physical storage. The distribution module includes the processes of receiving the set of records of a hyperscale dataset and distributing those records among the storage modules to create a high-availability distributed system in which every node is an active system in a distributed process. The distribution module also distributes and tracks the placement of records of an append-only hyperscale dataset across multiple physical storage media or devices, for example in the form of sequentially stored non-serialized binary sequences such as records, blocks of records, or small batches of records. An RSAS Cluster Controller (RCC) is an example of such a distribution module, in accordance with some embodiments described herein.
[0098] 4) A Query Manager Module to receive an external query for select records or aggregates of records of a hyperscale dataset and, in response to receiving the external query, to generate or execute machine-readable instructions that break the external query down into machine-executable components. A Query Manager is an example of such a query manager module, in accordance with some embodiments described herein.
[0099] 5) An RSAS Database Repository and Deployment System (RD2S): An RD2S stores and shares complete systems, modules, and/or algorithms used in an RSAS Data System, including the deployment of processes to RMMs, RCCs, IQSs, and standard compute processes such as data ingest and visualization.
[0100] 6) An Aggregation Module to aggregate a set of data records into an aggregate record, and to send the aggregate record to at least one external target based at least in part on instructions received from a query manager module. An Interrupted Querying System (IQS) node is an example of an embodiment of at least some aspects of an aggregation module. An interrupted querying system breaks the querying process into a distributed system similar to a microservices architecture, wherein each function is a distributed single-task process. In that manner, a single storage system can process data in many different ways. The IQS may include the capability to morph, fork, and process data in the stream at various IQS nodes. The IQS may be applicable, for example, to super aggregation and analytics such as the 95th percentile (p95) by hour, or to distributed AI processing.
[0101] 7) A data record management method comprising the use of a key-value store, wherein the key-value store utilizes a batch identifier as a key for an associated value, and wherein the associated value indicates a sequential range of logical block addresses on a plurality of storage media and other metadata information needed to locate, access, or manage records stored within the indicated range of block addresses.
[0102] 8) A cascade method for applying a filter to a data record comprising providing machine-readable instructions causing, for example, an RMM or other system module(s) to perform the following operations upon execution:
[0103] a) Receiving a first input of an isolation mask represented by a first plurality of bytes;
[0104] b) Receiving a second input of a matching mask represented by a second plurality of bytes;
[0105] c) Receiving a data record;
[0106] d) Applying the isolation mask to the data record to produce a first output;
[0107] e) Applying the matching mask to the first output to produce a second output; and
[0108] f) Determining whether or not the second output indicates that the data record passes through the filter based on a Boolean operation or other predetermined criterion.
[0109] 9) The cascade method described in the preceding paragraph, applied in parallel to a plurality of data records.
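By way of non-limiting illustration, the cascade filter of items 8) and 9) can be sketched in Python as follows. The sketch assumes that applying the isolation mask means a bitwise AND, that applying the matching mask means a bitwise XOR, and that the predetermined Boolean criterion is "every bit of the second output is zero"; the function names and the two-field example record are hypothetical. A thread pool stands in for parallel application to many records; under CPython it illustrates the structure of item 9) rather than a real speedup.

```python
from concurrent.futures import ThreadPoolExecutor


def passes_filter(record: bytes, isolation_mask: bytes, matching_mask: bytes) -> bool:
    """Return True if the record passes the two-stage (cascade) bitmask filter."""
    if not (len(record) == len(isolation_mask) == len(matching_mask)):
        raise ValueError("record and masks must have the same length")
    # Stage 1: the isolation mask (bitwise AND) clears every bit outside the fields of interest.
    first_output = bytes(r & i for r, i in zip(record, isolation_mask))
    # Stage 2: the matching mask (bitwise XOR) leaves a 1 wherever an isolated bit
    # differs from the expected value.
    second_output = bytes(f ^ m for f, m in zip(first_output, matching_mask))
    # Boolean criterion: the record passes only if no differing bit remains.
    return not any(second_output)


def filter_records(records, isolation_mask, matching_mask, workers=4):
    """Apply the cascade filter to a list of records concurrently; keep those that pass."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        verdicts = list(pool.map(lambda r: passes_filter(r, isolation_mask, matching_mask), records))
    return [rec for rec, ok in zip(records, verdicts) if ok]
```

Because the isolation mask zeroes every unselected bit, the matching mask should also be zero outside the isolated bits; otherwise no record can pass.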
[0110] HIGH PERFORMANCE DATA ANALYTICS SYSTEM—A data analytics system 100 in accordance with a preferred embodiment is illustrated in
[0111] The ingest process 102 takes in records received from a data source, where those records are, in preferred embodiments of the present invention, processed at least to normalize them and render them in a format usable by the system; an example of such a format is discussed in detail below.
[0112]
[0113]
[0114]
[0115] The foregoing high-level illustrations provide a simple conceptual understanding as to how the system modules could be implemented. In some preferred embodiments, a single Network IC specific to each module type above performs the primary function of the module as shown. In other embodiments, the Network IC design shown in
[0116] Referring to
[0117] A distribution module 110 receiving data from the ingest process distributes the ingested data to one or more of the storage modules, each of which is in turn connected to at least one block storage device. In some preferred embodiments, the distribution module is designed to designate one or more storage modules to which to forward a received record for storage, and/or to create a metadata record of the storage locations of all of the records that the distribution module forwards for storage. In addition, the distribution module may create a hash chain of the forwarded records to use to verify and maintain the state of the one or more storage modules. In some embodiments, when the distribution module receives a machine-executable component of the external query, it is designed to designate at least one storage module to which to send the machine-executable component.
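A minimal sketch of the designation and placement bookkeeping described above follows. It assumes round-robin designation (the text does not fix a policy) and an in-memory dictionary as the metadata record of storage locations; `DistributionModule` and its methods are hypothetical names, and the hash chain is omitted here.

```python
from dataclasses import dataclass, field
from itertools import cycle


@dataclass
class DistributionModule:
    """Hypothetical distribution module: designates a storage module for each record
    and keeps a metadata record of where every forwarded record was sent."""

    storage_module_ids: list
    placements: dict = field(default_factory=dict)  # record_id -> storage module ID

    def __post_init__(self):
        if not self.storage_module_ids:
            raise ValueError("at least one storage module is required")
        self._rotation = cycle(self.storage_module_ids)

    def forward(self, record_id, record: bytes) -> str:
        """Designate a storage module for a record (round robin) and log the placement."""
        target = next(self._rotation)
        self.placements[record_id] = target
        # A real module would now transmit `record` to `target`; this sketch only logs it.
        return target

    def route_query_component(self, record_ids) -> list:
        """Designate the storage modules holding the records a query component needs."""
        return sorted({self.placements[r] for r in record_ids if r in self.placements})
```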
[0118] The function of a storage module 111 is to store the data it receives in block storage 103, such that the ingested records are not conformed to a file format, and to make a record of the location where the ingested records are stored. For example, the storage module may store a received record as a non-serialized binary sequence directly in a sequential memory location or block of a block storage device, without any serialization or file construct. In some embodiments, the storage module may create a metadata record identifying the storage device and the first sequential memory location or block of the storage device where the record was stored.
[0119] In some embodiments, the one or more storage modules are designed to respond to receiving a machine-executable component of an external query by retrieving records, facilitated by the metadata record. For example, a storage module may read the metadata record identifying the storage device and the first sequential memory location or block on the storage device where the received record was stored, read memory locations or blocks of the identified block storage device sequentially, and use a bitmask filter of the machine-executable component to select records to be sent to the at least one external target, based on the metadata record.
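The storage and retrieval path of paragraphs [0118] and [0119], combined with the batch-keyed key-value store of item 7), might look roughly like the following sketch. It assumes a 512-byte logical block, fixed-width records, one batch per contiguous LBA range, and a Python dict as the key-value store; `BlockDevice`, `StorageModule`, and the metadata field names are illustrative inventions, and the in-memory `bytearray` only stands in for a real block device.

```python
BLOCK_SIZE = 512  # assumed logical block size in bytes


class BlockDevice:
    """In-memory stand-in for a block storage device addressed by logical block address."""

    def __init__(self, device_id: str, num_blocks: int):
        self.device_id = device_id
        self.data = bytearray(num_blocks * BLOCK_SIZE)

    def write_blocks(self, lba: int, payload: bytes) -> None:
        start = lba * BLOCK_SIZE
        if start + len(payload) > len(self.data):
            raise ValueError("write past end of device")
        self.data[start:start + len(payload)] = payload

    def read_blocks(self, lba: int, count: int) -> bytes:
        start = lba * BLOCK_SIZE
        return bytes(self.data[start:start + count * BLOCK_SIZE])


class StorageModule:
    """Writes batches of fixed-width records as raw bytes and indexes them by batch ID."""

    def __init__(self, device: BlockDevice, record_size: int):
        self.device = device
        self.record_size = record_size
        self.next_lba = 0
        self.batches = {}  # key-value store: batch_id -> metadata describing an LBA range

    def store_batch(self, batch_id: str, records: list) -> dict:
        if any(len(r) != self.record_size for r in records):
            raise ValueError("all records must be exactly record_size bytes")
        payload = b"".join(records)  # back to back: no file construct, no separators
        block_count = -(-len(payload) // BLOCK_SIZE)  # ceiling division
        self.device.write_blocks(self.next_lba, payload)
        self.batches[batch_id] = {
            "device_id": self.device.device_id,
            "first_lba": self.next_lba,
            "block_count": block_count,
            "record_count": len(records),
            "record_size": self.record_size,
        }
        self.next_lba += block_count  # the next batch starts on a fresh block
        return self.batches[batch_id]

    def query_batch(self, batch_id: str, isolation_mask: bytes, matching_mask: bytes) -> list:
        meta = self.batches[batch_id]
        raw = self.device.read_blocks(meta["first_lba"], meta["block_count"])  # sequential read
        size = meta["record_size"]
        selected = []
        for i in range(meta["record_count"]):
            rec = raw[i * size:(i + 1) * size]
            # Bitmask filter: keep the record if its isolated bits equal the matching mask.
            if all((b & iso) == m for b, iso, m in zip(rec, isolation_mask, matching_mask)):
                selected.append(rec)
        return selected
```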
[0120] Records are retrieved from block storage by means of a query. In the process of executing a query to return a query result, the system employs Aggregation Modules 112 configured to aggregate a set of data records into an aggregate record to be sent to an external target such as a Data Processor 103. In some embodiments, an aggregation module is designed to respond to receiving a machine-executable component of a query and records from a hyperscale dataset by creating an aggregate record and selecting at least one aggregate record to forward to the external target. Because the records are never serialized to a file format, they can be stored and retrieved efficiently without the use of objects or file constructs. In some other embodiments, the aggregate record is forwarded to the external target using remote direct memory access (RDMA).
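As a non-limiting sketch of the aggregation step, the following assumes 4-byte records holding a 2-byte key and a 2-byte value (big-endian) and an aggregate record of (count, sum, max) per key; the layout, the statistics chosen, and the function names are assumptions for illustration. `merge_aggregates` shows how partial aggregates produced by several aggregation modules could be combined downstream into a single result set.

```python
import struct
from collections import defaultdict

RECORD = struct.Struct(">HH")  # hypothetical layout: 2-byte key, 2-byte value, big-endian


def aggregate(records):
    """Fold selected records into one aggregate record per key: (count, sum, max)."""
    acc = defaultdict(lambda: [0, 0, 0])
    for raw in records:
        key, value = RECORD.unpack(raw)
        entry = acc[key]
        entry[0] += 1
        entry[1] += value
        entry[2] = max(entry[2], value)  # values are unsigned, so 0 is a safe start
    return {key: tuple(entry) for key, entry in acc.items()}


def select_aggregates(aggregates, min_count):
    """Select the aggregate records worth forwarding to the external target."""
    return {key: agg for key, agg in aggregates.items() if agg[0] >= min_count}


def merge_aggregates(*parts):
    """Combine partial aggregates from several modules into a single result set."""
    merged = {}
    for part in parts:
        for key, (count, total, peak) in part.items():
            if key in merged:
                c, t, p = merged[key]
                merged[key] = (c + count, t + total, max(p, peak))
            else:
                merged[key] = (count, total, peak)
    return merged
```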
[0121] A query manager module is designed to receive an external query for select records, or for aggregates of records, of a hyperscale dataset. On receipt of the external query, the query manager module generates or executes machine-executable instructions to break the query down into machine-executable components to create a distributed process, as opposed to, for example, running a parallel shard query on a partitioned table. The query manager module creates the distributed process for running the external query by generating bitmask filters for other system modules to use to filter and select records or aggregates of records of a hyperscale dataset, manages the aggregation of records thereby selected, and designates an external target to which to send the aggregate record.
[0122] For example, in some preferred embodiments the query manager module generates machine-executable components including a filtering component such as a record filter in the form of a bitmask filter, to be used to select a record and to designate a target distribution or storage module to which the selected record is to be sent, and an aggregating component such as an aggregate record filter in the form of a bitmask filter, to be used to select an aggregate record, and to designate the at least one external target to which the selected aggregate record is to be sent. In another embodiment, the external target is a worker instance of a distributed process, such as artificial intelligence (AI) or machine learning (ML). In some other embodiments, the query manager module is connected to an external data processor designated as an external target, and configured to provide a final aggregation of records, or of aggregates of records, selected by a query, into a single result set.
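The decomposition of an external query into a record-filter component and an aggregating component could be represented roughly as below. This sketch assumes a simple equality predicate on one fixed-width field plus a group-by; `RecordFilter`, `AggregateFilter`, `build_masks`, and `plan_query` are hypothetical names, and a practical query manager module would support far richer predicates.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RecordFilter:
    isolation_mask: bytes
    matching_mask: bytes
    target: str  # distribution or storage module that applies the filter


@dataclass(frozen=True)
class AggregateFilter:
    group_by: str
    target: str  # external target that receives the selected aggregate records


def build_masks(layout, field_name, value):
    """Build (isolation, matching) masks for `field_name == value`.

    layout: ordered list of (field name, width in bytes) for a fixed-width record.
    """
    if field_name not in {name for name, _ in layout}:
        raise ValueError(f"unknown field: {field_name}")
    isolation, matching = bytearray(), bytearray()
    for name, width in layout:
        if name == field_name:
            isolation += b"\xff" * width
            matching += value.to_bytes(width, "big")
        else:
            isolation += bytes(width)  # don't-care bytes
            matching += bytes(width)
    return bytes(isolation), bytes(matching)


def plan_query(layout, where_field, where_value, group_by, storage_target, external_target):
    """Break a simple `WHERE field = value GROUP BY g` query into executable components."""
    isolation, matching = build_masks(layout, where_field, where_value)
    return [
        RecordFilter(isolation, matching, storage_target),
        AggregateFilter(group_by, external_target),
    ]
```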
[0123] The modules combine to create a scalable system easily expanded by merely installing additional system modules, such that an increasing volume of data may be accommodated, for example by using the following method.
[0124] First, a plurality of system modules, collectively configured in a microservices architecture similar to the one described above for storing and querying records or aggregates of records of a hyperscale dataset of aggregate size greater than one terabyte at rest, must be provided and communicably connected to a data source, for example a network telemetry data stream or hyperscale database. The system modules are configured to receive machine-executable instructions which, upon execution, result in the following method steps being performed. In some preferred embodiments, the hyperscale dataset is an append-only set of data records formatted in a standardized record schema.
[0125] A distribution module receives a set of data records of a hyperscale dataset from an ingest process and distributes the set of data records to at least one of the one or more storage modules. In some preferred embodiments, the distribution module designates one or more storage modules to which to forward a received record for storage, and/or creates a metadata record of the storage locations of all of the records that the distribution module forwards for storage. In addition, the distribution module may create a hash chain of the forwarded records to use to verify and maintain the state of the one or more storage modules. In some embodiments, when the distribution module receives a machine-executable component of the external query, it designates at least one storage module to which to send the machine-executable component.
[0126] A storage module manages the storage of the set of data records as non-serialized binary sequences to at least one of a plurality of communicably connected block storage devices, without serialization to a file construct. In some embodiments, the storage module may create a metadata record identifying the storage device and the first sequential memory location or block of the storage device where the record was stored.
[0127] In some embodiments, the one or more storage modules respond to receiving a machine-executable component of an external query by retrieving records, facilitated by the metadata record. For example, a storage module may read the metadata record identifying the storage device and the first sequential memory location or block on the storage device where the received record was stored, read memory locations or blocks of the identified block storage device sequentially, and use a bitmask filter of the machine-executable component to select records to be sent to the at least one external target, based on the metadata record.
[0128] An aggregation module aggregates the set of data records into an aggregate record and sends the aggregate record to at least one external target based at least in part on instructions received from at least one of the one or more query manager modules. In the process of executing a query to return a query result, the system employs Aggregation Modules 112 to aggregate a set of data records into an aggregate record to be sent to an external target such as a Data Processor 103. In some embodiments, an aggregation module responds to receiving a machine-executable component of a query and records from a hyperscale dataset by creating an aggregate record and selecting at least one aggregate record to forward to the external target. Because the records are never serialized to a file format, they can be stored and retrieved efficiently without the use of objects or file constructs. In some other embodiments, the aggregate record is forwarded to the external target using remote direct memory access (RDMA).
[0129] A query manager module receives an external query for select records or aggregates of records of a hyperscale dataset and, in response, breaks the external query down into machine-executable components, wherein breaking the external query down includes generating at least one bitmask filter and sending the bitmask filter to at least one of the system modules, wherein the at least one of the system modules uses the at least one bitmask filter to select records of a hyperscale dataset or aggregates thereof, and designates the at least one external target to which the aggregate records should be sent.
[0130] For example, in some preferred embodiments the query manager module generates machine-executable components including a filtering component such as a record filter in the form of a bitmask filter, to be used to select a record and to designate a target distribution or storage module to which the selected record is to be sent, and an aggregating component such as an aggregate record filter in the form of a bitmask filter, to be used to select an aggregate record, and to designate the at least one external target to which the selected aggregate record is to be sent. In another embodiment, the external target is a worker instance of a distributed process, such as artificial intelligence (AI) or machine learning (ML). In some other embodiments, the query manager module is connected to an external data processor designated as an external target, and configured to provide a final aggregation of records, or of aggregates of records, selected by a query, into a single result set.
[0133] RECORD STORAGE ALLOCATION SYSTEM—A Record Storage Allocation System (RSAS) comprising a format for storing data into either volatile or nonvolatile memory is disclosed. The RSAS is a novel and unique method which addresses a long-felt need presented by the real-world challenges of modern, large-scale, pervasive data collection, analysis, and analytics applications. The RSAS format has initially been optimized for use with scaled datasets over 1 TB in size at rest on storage media types with directly addressable memory locations, such as Solid-State Disk (SSD) or Random-Access Memory (RAM). However, the RSAS concept and format are generally applicable and are by no means limited to those media.
[0134] Recognizing that each hyperscale data-based application may fully consume a given storage resource, an RSAS addresses both the storage media and the data stored thereon as a non-shared resource which is only consumed by a single data manipulator. Further, an RSAS addresses the nature of both structured and semi-structured data. Rather than treating semi-structured data as a blob within a containing structured data element, an RSAS treats it as an allocation of storage space mapped from the structured portion of the data.
[0135] In an RSAS format, unstructured data is put into a referenceable framework and/or related to other metadata directly or indirectly. With that understanding, an RSAS system addresses unstructured data as a component of semi-structured data.
[0136] While an RSAS is designed with a dataset in mind in which records are a mix of structured and semi-structured data, it can be used to store records with a homogeneous data format. In such purely homogeneous cases, an RSAS may optionally forgo the RSAS elements designed for the data format that is not in use. Specifically, an RSAS may divide storage locations into one or more headers, structured data, and semi-structured data. An RSAS neither defines nor excludes encryption and/or compression. In some embodiments, encryption and/or compression may be a function of the application that is using the dataset.
[0137] For the purposes of this disclosure, a “header” is an array of fixed byte groupings which may be used to define the structure of the RSAS for a given storage unit. In a preferred embodiment, an RSAS-formatted physical or logical storage unit is assigned a unique header using the first bytes of the storage unit. The same header may be repeated, for example using the last bytes of the storage unit. Optionally, additional copies of the header may be stored at other locations within the storage unit.
[0139] The RSAS header of
[0140] In this example, the RSAS header comprises a Cluster ID 920 which defines the group of one or more RSAS storage units to which all format and data of a dataset is replicated and a Storage Unit ID 930 which uniquely identifies a single RSAS storage unit.
[0141] The RSAS header example illustrated in
[0142] In some embodiments, the semi-structured data portion of a single record may utilize some or all of the bytes of one or more clusters in such a manner that no cluster holds the semi-structured data portions of more than one record.
[0143] The bytes containing information concerning storage unit size and availability 940 further comprise 2 bytes associated with a Next Open SD Cluster, which points to the memory location where the next structured data portion of a record should be written, and 2 bytes associated with a Next Open SSD Cluster, which points to the memory location where the next semi-structured data portion of a record should be written.
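A sketch of packing and reading such a header with Python's `struct` module follows. Only the two 2-byte Next Open fields have sizes given above; the 16-byte Cluster ID and Storage Unit ID, the big-endian byte order, and the field order are assumptions, and the remaining header elements are omitted. The header is written to the first bytes of the storage unit and repeated in its last bytes, as paragraph [0137] permits.

```python
import struct

# Hypothetical header layout: only the two 2-byte "Next Open" fields have sizes stated in
# the text; the 16-byte IDs, the big-endian order, and the field order are assumptions.
HEADER = struct.Struct(">16s16sHH")


def pack_header(cluster_id: bytes, unit_id: bytes, next_sd: int, next_ssd: int) -> bytes:
    return HEADER.pack(cluster_id, unit_id, next_sd, next_ssd)  # short IDs are null-padded


def format_unit(size: int, header: bytes) -> bytearray:
    """Create a storage unit image with the header in its first and last bytes."""
    if size < 2 * len(header):
        raise ValueError("storage unit too small for two header copies")
    unit = bytearray(size)
    unit[:len(header)] = header
    unit[-len(header):] = header
    return unit


def read_header(unit, from_end: bool = False) -> dict:
    raw = bytes(unit[-HEADER.size:]) if from_end else bytes(unit[:HEADER.size])
    cluster_id, unit_id, next_sd, next_ssd = HEADER.unpack(raw)
    return {
        "cluster_id": cluster_id.rstrip(b"\0"),
        "storage_unit_id": unit_id.rstrip(b"\0"),
        "next_open_sd_cluster": next_sd,
        "next_open_ssd_cluster": next_ssd,
    }
```

If these fields hold cluster indices, a 2-byte field can express at most 65,536 distinct values, so the cluster size would bound how much storage the Next Open pointers can address.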
[0144] Optionally, referring to
[0145] Optionally, in some embodiments, the header associated with the RSAS may comprise a Structured Data Field Template 960 which may in turn comprise a Template Length which represents the number of bytes used by the Data Field Template including that element, or the count of the Data Fields defined in the Data Field Template.
[0146] A sample Structured Data Field Template 1060 for inclusion in RSAS header 900 is shown in
[0147] The RSAS header may further comprise bytes associated with a Field ID 300 as shown in
[0148] In some embodiments, an RSAS-formatted physical or logical storage unit may optionally comprise statically or dynamically defined memory locations for the structured data portions of a record in the dataset. Such data could be stored in byte sequence as defined in the header without separators or definition. An RSAS may define an optional hash verification for the record, and/or the structured data portion of the record and/or the semi-structured portion of the record. That check could be used as an integrity check for the record.
[0149] As shown in the preferred embodiment 1200 of
[0158] In some embodiments, an RSAS-formatted physical or logical storage unit may optionally comprise statically or dynamically defined memory locations for the semi-structured data portions of a record in the dataset 1280. The precise location of each semi-structured data portion of a record should be defined by the fields of the structured data portions of the same record.
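One way to realize semi-structured allocations that are located through the structured portion of the same record, with no cluster shared between records (paragraph [0142]), is sketched below. The 4096-byte cluster size and the (first cluster, length) pointer kept in the structured portion are assumptions; `SemiStructuredArea` is a hypothetical name.

```python
CLUSTER_SIZE = 4096  # assumed cluster size in bytes


class SemiStructuredArea:
    """Semi-structured storage area in which each record owns whole, unshared clusters."""

    def __init__(self, num_clusters: int):
        self.area = bytearray(num_clusters * CLUSTER_SIZE)
        self.num_clusters = num_clusters
        self.next_open_ssd_cluster = 0  # mirrors the header's Next Open SSD Cluster field

    def allocate(self, payload: bytes) -> tuple:
        """Store one record's semi-structured portion and return the pointer
        (first cluster, length) to be kept in that record's structured portion."""
        needed = -(-len(payload) // CLUSTER_SIZE)  # ceiling division
        first = self.next_open_ssd_cluster
        if first + needed > self.num_clusters:
            raise ValueError("semi-structured area is full")
        start = first * CLUSTER_SIZE
        self.area[start:start + len(payload)] = payload
        self.next_open_ssd_cluster += needed  # the tail of the last cluster stays unused
        return first, len(payload)

    def read(self, first: int, length: int) -> bytes:
        start = first * CLUSTER_SIZE
        return bytes(self.area[start:start + length])
```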
[0159] CLUSTERING OF RSAS-FORMATTED STORAGE FOR STORING AND QUERYING LARGE-SCALE DATASETS—Data collection and telemetry systems which produce trillions of records in mixed structural formats present a challenge in terms of storing and querying the data. Storage formats such as the RSAS described above increase the performance of a single storage unit with such large-scale datasets. However, they may not directly address the resiliency and availability of the stored data.
[0160] Using an RSAS Cluster Controller (RCC) to control a local or distributed cluster of RSAS-formatted storage media and their associated operating systems solves that problem. An RCC can replicate the data, maintain a synchronized state across all storage media and distribute queries among RSAS-formatted storage within the cluster. In that manner, an RCC can maintain a high level of system performance through resiliency and availability while shielding the core data analytics system from the pitfalls of other systems with rigid standardized fragmentable file formats and applications running on various platforms or operating systems.
[0161] An RSAS Cluster Controller (RCC) maintains the resilience and integrity of the stored records in a cluster of RSAS-formatted storage. In preferred embodiments, the RCC achieves that by replicating new data to all RSAS-formatted storage in the cluster, using verification feedback with each storage participant. The RCC maintains the integrity states among the storage units through the use of chained transaction signatures. The RCC can then bring any storage participant whose state is out of sync back to the correct state, either incrementally (repair) or through full replication (replacement).
[0162] In some embodiments, query performance may be further enhanced by utilizing an RCC to distribute queries among the RSAS-formatted storage media in the cluster. Such a distribution enables the parallel processing of queries whose execution overlaps in time. Using that method, the read-request blocking and/or queuing inherent to queries attempting to access the same physical storage medium simultaneously is avoided until the number of parallel queries exceeds the number of storage systems in the cluster. Performance may optionally be enhanced further by having the RCC process the data records that meet the queries' filters upon retrieval from storage. In some embodiments, that processing may take the form of producing data structures matching the requested aggregations and/or metrics of a query, which are returned to the requestor.
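The effect described above, that concurrent queries avoid contending for the same medium only while there are at least as many storage units as queries, can be checked with a small round-robin sketch. Round robin is one of the algorithms the text names for RCC query distribution; the function names are hypothetical.

```python
from collections import Counter
from itertools import cycle


def distribute_queries(query_ids, storage_units):
    """Assign concurrent queries to storage units in round-robin order."""
    if not storage_units:
        raise ValueError("at least one storage unit is required")
    rotation = cycle(storage_units)
    return {query: next(rotation) for query in query_ids}


def max_contention(assignment) -> int:
    """Largest number of concurrent queries sharing one storage unit (1 means no blocking)."""
    return max(Counter(assignment.values()).values(), default=0)
```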
[0163] The use of an RCC provides a method for both performant handling of multiple queries, and for ensuring the integrity and availability of the records in a large-scale dataset. As a method of clustering RSAS-formatted storage, an RCC is intentionally designed to bring durability in performance, accessibility, and integrity of hyperscale data-based applications which fully consume a given RSAS-formatted storage resource.
[0164] In the embodiment shown in
[0166] In some embodiments, the Pre-Ingest process may also manipulate or perform work on the data. An example of data manipulation might be the mutation of a data record to include additional data, the removal of data elements, or changing the values of data elements. An example of work performed on data could include additional stream processing through a query engine, AI, or ML application which produces additional metadata on some, part, all, or a segment of the data handled by a Pre-Ingest process. Additionally, the Pre-Ingest process may add a Namespace identification to the data record. If allowed by the data source, or through other means such as a load balancer, there may be multiple Pre-Ingest processes running at the same time to meet the needs of the overall application. Each Pre-Ingest process may or may not provide identical functionality. While a Pre-Ingest process may forward data records to multiple Ingest 1303 processes, including Ingest processes to which other Pre-Ingest processes are also forwarding data records, it should forward each individual data record to only one Ingest process.
[0167] The main function of an Ingest node is to determine to which RSAS Cluster Controller (RCC) 1304 a data record should be forwarded. That could be determined by a user-definable algorithm such as round robin, time based, or based on the value or range of values of one or more elements of the data record. The Ingest process may branch or loop the data records through additional processing such as AI or ML, which may produce data that is either independently routed to an external system, added to the data record(s), used to modify the data record(s), or used to replace the data record(s). The Ingest process may add an element to the data record as a super index identifying the group to which that data record was routed. The Ingest process also calculates a cryptographic hash of the data record, which it may add to the data record as an additional element or otherwise associate with the data record. The data record, including the hash, is then forwarded to the selected RCC by the Ingest process.
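The Ingest routing choices and per-record hash could be sketched as follows. SHA-256 is assumed as the cryptographic hash, the range router assumes a numeric element read from the record, and using the selected RCC ID as the super index is likewise an assumption; all function names are hypothetical.

```python
import hashlib
from itertools import count


def round_robin_router(rcc_ids):
    """Route successive records to RCCs in turn."""
    counter = count()
    return lambda record: rcc_ids[next(counter) % len(rcc_ids)]


def range_router(rcc_ids, element_of, upper_bounds):
    """Route on the value of one record element; upper_bounds[i] is the exclusive
    upper limit of the range handled by rcc_ids[i]."""
    def route(record):
        value = element_of(record)
        for rcc, upper in zip(rcc_ids, upper_bounds):
            if value < upper:
                return rcc
        return rcc_ids[-1]  # values beyond the last bound go to the last RCC
    return route


def ingest(record: bytes, route) -> tuple:
    """Pick an RCC, tag the record with a super index, and attach its hash."""
    rcc = route(record)
    envelope = {
        "record": record,
        "super_index": rcc,  # assumption: the group ID is simply the chosen RCC
        "hash": hashlib.sha256(record).digest(),
    }
    return rcc, envelope
```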
[0168] Referring to
[0169] The RCC next calculates a crypto hash called the chain hash 1525. It does that by calculating a crypto hash over the hash of the data record combined with the last calculated chain hash. The RCC may keep a copy of the hashes in persistent storage. The chain hash may be added by the RCC to the data record as an additional element or otherwise associated with the data record. The RCC also has the function of distributing query requests to an RMM for fulfillment. That may be done using an algorithm such as round robin. In some preferred embodiments, the RCC also keeps an index of namespaces and superindexes, even if they are added by another process.
[0170] The RMM 1505 embodies the function of storing data records to, and reading them from, persistent storage. When new data records arrive at an RMM, it first calculates a cryptographic hash 1530 of the data record without reference to the hashes calculated by the Ingest process or the RCC 1504 process. As with the RCC described above, the RMM also calculates a chain hash 1535 by hashing the hash of the data record together with the last calculated chain hash. The RMM compares the hashes it has calculated with those provided by the RCC 1540.
[0171] If the hashes do not match, the record is dropped and an error is generated, which should be logged and may result in a notification to the RCC 1555. If the hash for the data record does not match, the RMM may request, or be sent, a new copy of the data record from the RCC. If the hash for the data record matches but the chain hash does not, the RCC may audit the chain history of the RMM and require the RMM to overwrite all data records having a non-matching chain hash with copies from another RMM. If both hashes match, the RMM will write 1545 the data record 1550 to an RSAS-formatted persistent storage medium 1311 of
[0172] In some embodiments, the RMM 1505 may keep an index of namespaces and superindexes even if they are added by another process. After writing, the RMM notifies the RCC 1504 by sending it both calculated hashes. The RCC then compares the hashes to the ones it created 1555. If either hash does not match, the RCC may audit the chain history of the RMM and require the RMM to overwrite all data records having a non-matching chain hash with copies from another RMM.
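The RMM's verification step reduces to recomputing both hashes independently and classifying the result into the three cases described. A minimal sketch, assuming SHA-256 and the previous-chain-then-record concatenation order (both assumptions); the `Outcome` names are hypothetical labels for the actions in the text:

```python
import hashlib
from enum import Enum


class Outcome(Enum):
    WRITE = "write"                # both hashes match: persist, then report both hashes to the RCC
    REQUEST_COPY = "request_copy"  # record hash differs: drop, log, request a fresh copy from the RCC
    AUDIT_CHAIN = "audit_chain"    # only the chain hash differs: the RCC audits the RMM's chain history


def verify(record: bytes, rcc_record_hash: bytes, rcc_chain_hash: bytes,
           last_chain: bytes) -> tuple[Outcome, bytes, bytes]:
    """Recompute both hashes without using the RCC's values, then compare."""
    record_hash = hashlib.sha256(record).digest()
    chain_hash = hashlib.sha256(last_chain + record_hash).digest()
    if record_hash != rcc_record_hash:
        outcome = Outcome.REQUEST_COPY
    elif chain_hash != rcc_chain_hash:
        outcome = Outcome.AUDIT_CHAIN
    else:
        outcome = Outcome.WRITE
    return outcome, record_hash, chain_hash
```

The record hash is checked first because a corrupted record necessarily also produces a non-matching chain hash; only when the record itself is intact does a chain mismatch point to a divergent history on the RMM. In this sketch the caller would advance its stored `last_chain` only after a `WRITE`.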
[0173] By way of non-limiting example, instead of imposing a predetermined structure on the records so that the data within each record is formatted for use within system memory, such as the CSV or Parquet file formats, the data can be written as uniform sequential bytes to an RSAS-formatted persistent storage medium. For instance, a dataset comprising multiple fields may be written such that the fixed number of bytes of the first record's first field is stored, followed by the fixed number of bytes of the second field, and so on, for all fields of a record in the dataset. Without a preamble or separator, the bytes of the next record are stored immediately after the previous record. Using a form of binary reduction, a block of this uniform sequential data, sized to the addressable memory and transfer bandwidth, can be processed by matching it against a smaller set of byte arrays that act as a mask and filter for each record byte array.
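The fixed-width, separator-free layout can be illustrated with Python's `struct` module. The four-field schema below (timestamp, source IPv4 address, destination port, byte count) is a hypothetical example, not one taken from the disclosure. The `">"` prefix selects big-endian byte order with standard sizes and no alignment padding, so every record occupies exactly 18 bytes.

```python
import struct

# Hypothetical schema: timestamp (u64), source IPv4 (u32), destination port (u16), byte count (u32).
RECORD = struct.Struct(">QIHI")


def pack_records(rows: list[tuple[int, int, int, int]]) -> bytes:
    """Concatenate fixed-width records with no preamble or separator."""
    return b"".join(RECORD.pack(*row) for row in rows)


def unpack_records(blob: bytes) -> list[tuple[int, ...]]:
    """Walk the buffer in RECORD.size steps; no delimiter search or index lookup is needed."""
    if len(blob) % RECORD.size:
        raise ValueError("buffer is not a whole number of records")
    return list(RECORD.iter_unpack(blob))
```

Because record boundaries are implied by the schema rather than marked in the data, the position of any field of any record is a simple arithmetic function of the record index.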
[0174] Such a binary reduction may be accomplished by dividing the number of bits in each record into one or more uniform segments of SL length. The mask and filter byte arrays are divided into segments of SL length, and each segment is identified as MSn and FSn for the mask and filter. The block of bytes read from storage is then also divided into segments of SL length, with each segment labeled SSn. By way of example, the following compound bitwise operations may be performed on each SS segment to filter records with an equality query:
$$\left(SS_n \mathbin{\&} MS_{\,n-\lfloor n/k \rfloor k}\right) \oplus FS_{\,n-\lfloor n/k \rfloor k}, \qquad \text{where } k \text{ is the number of segments in a record, so that } n-\lfloor n/k \rfloor k = n \bmod k$$
[0175] If the results of the bitwise operations on all segments of a record equal 0, then the corresponding original segments read from storage are deemed matching and included in the result set. This reduction may occur in a loop, in parallel, or in looping parallel execution. By processing the data as it is presented in the transfer stream, a system may locate matching records more efficiently when reading smaller amounts of data, as opposed to targeting larger blocks of data such as those in persistent data storage.
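The compound bitwise test can be sketched for an equality query on one field. This is a minimal serial illustration, assuming SL is 8 bytes (64-bit segments), that the record length is a whole multiple of SL, and that the filter array is zero wherever the mask is zero; the helper names are hypothetical. The mask isolates the queried bytes, the XOR against the filter cancels them when they equal the wanted value, and a record matches only if every one of its segments reduces to zero.

```python
def split_segments(buf: bytes, sl: int) -> list[int]:
    """Cut a byte buffer into SL-byte segments, each read as an unsigned integer."""
    return [int.from_bytes(buf[i:i + sl], "big") for i in range(0, len(buf), sl)]


def build_mask_filter(record_size: int, field_offset: int, value: bytes) -> tuple[bytes, bytes]:
    """Mask isolates the queried field; filter holds the wanted value at the same position."""
    mask, filt = bytearray(record_size), bytearray(record_size)
    end = field_offset + len(value)
    mask[field_offset:end] = b"\xff" * len(value)
    filt[field_offset:end] = value
    return bytes(mask), bytes(filt)


def binary_reduction(block: bytes, mask: bytes, filt: bytes, sl: int = 8) -> list[int]:
    """Return indices of records where (SS_n & MS_{n mod k}) ^ FS_{n mod k} == 0 for all segments."""
    record_size = len(mask)
    if len(filt) != record_size or record_size % sl or len(block) % record_size:
        raise ValueError("record, mask, and filter sizes must be whole multiples of SL")
    k = record_size // sl  # segments per record
    ms, fs = split_segments(mask, sl), split_segments(filt, sl)
    ss = split_segments(block, sl)
    matches = []
    for r in range(len(block) // record_size):
        segs = range(r * k, (r + 1) * k)
        if all(((ss[n] & ms[n % k]) ^ fs[n % k]) == 0 for n in segs):
            matches.append(r)
    return matches
```

The inner `all(...)` is independent per record, which is what allows the loop, parallel, or looping-parallel execution mentioned in the text. A vectorized or hardware implementation would evaluate many segments at once rather than iterating in Python.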
[0176] One of the challenges of storing data records directly to block storage is creating an index that does not require the full storage unit to be accessed. The inventors claim a system that allows more direct location of, and access to, data records stored in block storage through the use of such an index. Records are received by the storage process in identified batches of one or more records, and each batch is stored at sequential logical block addresses (LBAs) in a block storage device such as an SSD. A Batch_ID may be an element common to all records in the batch. This may be a simple incrementing number, a starting timestamp, or a hash of some common value for the same field in a record. The index is a non-recursive, non-nested key-value store which uses the batch_id as the key, with its value being at minimum the starting LBA and either the number of sequential blocks or the last LBA used on the block storage device to which records of that batch_id are stored. The value can be extended with any additional amount of data, such as a timestamp, time to live, expiration timestamp, and/or storage device ID.
[0177] The index is used by extracting or otherwise computing one or more batch_ids from a query and then looking them up in the index key-value store. The LBA range and any additional metadata about the batch retrieved from the index key-value store are then used to read records from the referenced LBAs on a default or otherwise indicated block storage device. The key-value store may be ephemeral or persistent and may run fully or partly in volatile system memory.
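A flat batch index of this kind can be sketched with an in-memory dictionary standing in for the key-value store. The 4096-byte block size, the sequential LBA allocator, and the names `BatchExtent`, `BatchIndex`, and `append_batch` are assumptions for illustration only.

```python
from dataclasses import dataclass
from typing import Iterable, Optional

BLOCK_SIZE = 4096  # assumed device block size in bytes


@dataclass(frozen=True)
class BatchExtent:
    start_lba: int
    block_count: int
    device_id: str = "default"          # optional extension of the value
    expires_at: Optional[float] = None  # optional extension of the value

    @property
    def last_lba(self) -> int:
        return self.start_lba + self.block_count - 1


class BatchIndex:
    """Non-nested key-value index: Batch_ID -> contiguous LBA extent plus optional metadata."""

    def __init__(self, first_free_lba: int = 0) -> None:
        self._kv: dict[int, BatchExtent] = {}
        self._next_lba = first_free_lba

    def append_batch(self, batch_id: int, payload_len: int, device_id: str = "default") -> BatchExtent:
        """Allocate the next run of sequential blocks for a batch and record it."""
        blocks = -(-payload_len // BLOCK_SIZE)  # ceiling division
        extent = BatchExtent(self._next_lba, blocks, device_id)
        self._kv[batch_id] = extent
        self._next_lba += blocks
        return extent

    def lookup(self, batch_ids: Iterable[int]) -> list[tuple[int, BatchExtent]]:
        """Resolve batch ids derived from a query to LBA extents; unknown ids are skipped."""
        return [(b, self._kv[b]) for b in batch_ids if b in self._kv]
```

A query that resolves to batch ids then needs one key lookup per batch and a single sequential read of each returned extent, rather than a scan of the whole storage unit.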
[0178] In some preferred embodiments, data is retrieved from persistent storage by way of query. Referring back to
[0179] An RMM 1305 which has received a raw data record filter query step from an RCC streams matching records into the IQS. As data is compiled through the IQS, it is streamed to the originating Post Process function. This function may execute one or more of several steps, including but not limited to continued processing of the data, forwarding the data to another function in the Post Process process, exporting the data records to an external system, or sending the data to the Data Visualization Engine.
[0180] The formula function of a step is a user-defined function, such as batch aggregation (Sum, Minimum, Maximum, Average, . . . ), or a more complex function such as adding a virtual element based on a formula that uses one or more elements of a data record as its input. A branch function sends a copy of the resulting data record to a secondary independent function after the formula and filter functions in the step.
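The formula, filter, and branch functions of a step can be sketched as a small streaming pipeline. The `Step` class, the `batch_aggregate` helper, and the dictionary record model are hypothetical; the sketch only illustrates the ordering described in the text: formula, then filter, then a copy to the branch.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional

Record = dict


@dataclass
class Step:
    formula: Callable[[Record], Record]             # e.g. adds a virtual element
    predicate: Callable[[Record], bool]              # filter applied to the formula output
    branch: Optional[Callable[[Record], None]] = None

    def run(self, records: Iterable[Record]) -> Iterator[Record]:
        for rec in records:
            out = self.formula(rec)
            if self.predicate(out):
                if self.branch is not None:
                    self.branch(dict(out))  # the branch receives a copy, not the streamed record
                yield out


def batch_aggregate(records: list[Record], field: str) -> dict:
    """Batch aggregation over one element: Sum, Minimum, Maximum, Average."""
    values = [r[field] for r in records]
    return {"sum": sum(values), "min": min(values), "max": max(values),
            "avg": sum(values) / len(values)}
```

For instance, a formula `lambda r: dict(r, kbits=r["bytes"] * 8 / 1000)` adds a virtual `kbits` element computed from the record's `bytes` element, and the filter and branch then operate on that enriched record.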
[0181]
[0182] For illustration purposes,
[0183] MULTI-DIMENSIONAL SCALING QUERY ENGINE—Modern database query engines tend to exhibit monolithic architectures in which a single entity executes a single process on a single set of compute resources. Much as craft production is scaled up by adding more craftsmen, performance is increased through parallelization, in which each query engine works on a chunk of data.
[0184] To understand the effects a query engine has on performance, it is important to first understand the nature of the data on which it operates. Hyperscale datasets are not only large, but by their very nature tend to be uncapped data streams with no defined upper limit. As a result, the structures of the datasets are isomorphic and the individual data records are immutable, sometimes referred to as Write Once Read Many (WORM) data. State changes in a data record are recorded to the stream as a new record. In some cases, a process called tombstoning is used to mark data records that are no longer valid, but as immutable data records they are never updated or removed individually. Consequently, all Hyperscale datasets have a temporal super index. That cornerstone of the RSAS format structure allows it to support single-direction reads from storage media.
[0185] The uncapped nature of the streaming datasets means that storage and long-term validity will eventually become issues as well. The most common method of dealing with that issue is to use a process to “age out” obsolete or unwanted data. Typically, data older than some period of time, which in some embodiments may be a predetermined data retention period, is removed from storage or allowed to be overwritten. In some embodiments, the purging of data may be performed at predetermined data purge intervals.
[0186] Most query engines employ the scale-out approach to query management. In that scheme, a super and/or primary index is used to generate some number of copies of the query, where each copy has been modified to filter to only a small section of the original query's selected super and/or primary index range, thereby allowing the original query to be parallelized. Those queries are then sent to child databases that process the queries one at a time, returning the results to the master query engine, which in turn recombines them into a single results dataset. That architecture works by ensuring that individual systems produce a smaller results dataset for each query segment, which prevents issues common to limited resources as the query is executed, and by allowing more than one query process to run simultaneously. While no one process runs any faster, the legacy scale-out query management approach can reduce overall query processing time.
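The legacy scale-out scheme is essentially scatter-gather over a partitioned index range. A minimal sketch, assuming an integer (for example, timestamp) super index, half-open sub-ranges, and a thread pool standing in for the child databases; `split_range`, `scatter_gather`, and `child_query` are hypothetical names:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def split_range(start: int, end: int, parts: int) -> list[tuple[int, int]]:
    """Cut [start, end) into at most `parts` contiguous half-open sub-ranges."""
    if end <= start or parts < 1:
        return []
    step = -(-(end - start) // parts)  # ceiling division so the pieces cover the whole range
    return [(lo, min(lo + step, end)) for lo in range(start, end, step)]


def scatter_gather(start: int, end: int, parts: int,
                   child_query: Callable[[int, int], list]) -> list:
    """Run one child query per sub-range in parallel and recombine results in range order."""
    ranges = split_range(start, end, parts)
    if not ranges:
        return []
    with ThreadPoolExecutor(max_workers=len(ranges)) as pool:
        partials = list(pool.map(lambda r: child_query(*r), ranges))
    return [row for part in partials for row in part]
```

Each child still processes its slice at the same speed; the gain comes only from running slices concurrently and from each child producing a smaller intermediate result, which matches the limitation noted in the text.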
[0187] In some preferred embodiments with features similar to those shown in
[0188] Referring to
[0189] If found, the RMM 2005 will scan 2004 the RSAS-formatted media 2011 connected to it for the structured portion of data records, either using the index as a reference or scanning all data records for ones that match the element filter(s) of the structured components. Because the structured parts of all data records have the same structure, the increment in bytes that must be read between elements and records is always a fixed distance, allowing a scan to jump from one memory location to the next without a lookup and in a single direction.
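The fixed-distance property means a single field can be scanned by stepping through the buffer with a constant stride: field `i` of record `j` always begins at byte `j * record_size + offset_i`. A minimal sketch of such a forward-only field scan; the function name and its parameters are hypothetical:

```python
from typing import Callable


def scan_field(buf: bytes, record_size: int, offset: int, width: int,
               predicate: Callable[[int], bool]) -> list[int]:
    """Return indices of records whose field satisfies predicate, reading only that field."""
    view = memoryview(buf)  # slicing a memoryview avoids copying the buffer
    hits = []
    # Start at the field's offset in record 0 and jump a fixed record_size each time.
    for pos in range(offset, len(buf) - width + 1, record_size):
        value = int.from_bytes(view[pos:pos + width], "big")
        if predicate(value):
            hits.append((pos - offset) // record_size)
    return hits
```

No per-record header or lookup is consulted: the loop visits each record's field exactly once, moving in a single direction through the buffer.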
[0190] In a preferred implementation, the RMM would use a persistent Key-Value Store to maintain an index of namespaces and superindexes as illustrated in
[0191] As each request is made to the block storage device (2225) the device fills a storage buffer (2230) managed by the host system. In a preferred implementation, the bytes in the buffer (2230) are copied to a network buffer (2240) through a set of bitwise operators embedded in a parallel binary reduction structure that act in a similar logical fashion to Ternary Content-Addressable Memory.
[0192] If matching data records have an attached blob, it is retrieved 2045, 2050 and any additional binary filter matching is applied 2055. The complete post-filter data record, which has passed all filters, is streamed directly to the IQS 2009. If a query identification was also provided for this query, that identification is also sent with the data record.
[0193] After the RMM 2005 has scanned all records in the range, it will send a completion message to the IQS along with any query identification, which may be used as an event trigger by an IQS function. The IQS is composed of a series of autonomous functions stacked in layers. The output from one or more functions in one layer is sent to the input of a function in the next layer until the data is provided to an external system 2080 or the Post Process 2015. All IQS functions in each layer receive the same query component step from the QM 2008.
[0194] A query component step contains a binary math formula and filter steps, which may include a final branching secondary function that is executed by the IQS function. If a query identification is provided, data records which have been identified with the same query identification are passed to that secondary function. In this way, a single system can operate more than one query at a time.
[0195] The IQS function completes this by first executing any basic non-floating-point-intensive operations 2060. If there are floating-point-intensive operations, the IQS function then executes those 2065. The resulting data record is compared to a filter 2070 and any data record which matches the filter may be passed to a secondary function 2075 which provides additional treatment to the data record(s) before sending it to an external system and or on to the next IQS function layer 2085. If no secondary function is defined, the data record is directly forwarded to the next IQS function layer.
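The per-function sequence (basic operations, then floating-point operations, then the filter, then an optional secondary function) and the layer-to-layer forwarding can be sketched as follows. For brevity this assumes one function per layer and in-memory lists instead of streams; `IQSFunction` and `run_layers` are hypothetical names, not components named in the disclosure.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional

Record = dict


@dataclass
class IQSFunction:
    basic: Callable[[Record], Record] = lambda r: r     # non-floating-point-intensive operations
    floating: Callable[[Record], Record] = lambda r: r  # floating-point-intensive operations
    keep: Callable[[Record], bool] = lambda r: True     # the step's filter
    secondary: Optional[Callable[[Record], Record]] = None

    def process(self, rec: Record) -> Optional[Record]:
        out = self.floating(self.basic(rec))  # basic operations run first, then floating-point ones
        if not self.keep(out):
            return None  # record is dropped at this layer
        return self.secondary(out) if self.secondary else out


def run_layers(records: Iterable[Record], layers: list[IQSFunction]) -> list[Record]:
    """Feed each layer's surviving output into the next layer."""
    stream = list(records)
    for layer in layers:
        stream = [r for r in (layer.process(x) for x in stream) if r is not None]
    return stream
```

In a distributed deployment each layer would hold several such functions receiving the same query component step, with records streamed between them rather than collected into lists.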
[0196] DISTRIBUTED ROUTE ANALYTICS—The purpose of route analytics is to help network operators understand how routing affects traffic within their network in all three temporal windows: past, present, and future. A number of protocols and telemetry types are used, including routing data, configuration data, and network flow data. Historically this has been a major challenge for the industry, as modeling complex networks can be an arduous or even prohibitive task given the amount of data involved. The task has become so daunting that route analytics in general has fallen out of favor, even though there has been a resurgence in demand for predictive traffic engineering analysis, one of the outputs of route analytics tools. With a hyperscale data system, a new type of route analytics tool can be built comprising a set of post processor 1306 functions. As shown in
[0197] These functions work with data from the hyperscale data system data store 1712, which was collected from routers 1720. The data collected from routers are route advertisements 1745, such as BGP or OSPF, and network flow data, such as sFlow or IPFIX 1708. Preferably, an API connection to the router, such as NETCONF, is also used to collect data 1707 from the router's Forwarding Information Base 1730.
[0198] When requested by an outside process, such as the Data Visualization 1710 function of the hyperscale data system, or via another application or user 1707, the MQRE 1760 uses the collected data to perform route analytics. Requests to the MQRE can come in one of three forms: traffic modeling, where the nature of the traffic as reported by network flow records is modified; route modeling, where a new route is added to the network; or both. All modeling requests to the MQRE should include a network flow query specifying the time period and, optionally, the traffic to be used from the hyperscale data system's data store. In the case of traffic modeling, the MQRE prepares a data transformation template to be used with the network flow data retrieved from the hyperscale data system's data store. In the case of route modeling, the MQRE prepares a template of route withdrawals and route injections. If there are any prepared route changes, the MQRE will initiate one Route Engine 1765 function per physical router in the modeled network. The information about the modeled network is provided externally by the user or through automation and includes information about the routing devices and how they are physically connected to each other.
[0199] Each Route Engine 1765 is loaded with the chosen state of the router it represents. The preferred router state is FIB 1730 information; alternatively, RIB 1725 information as presented by routing announcements 1745 may be used. Using a process the same as or similar to that of the router which a particular Route Engine represents, that Route Engine builds a new temporal profile of the router's FIB using the route modeling template created by the MQRE 1760. The resulting routing temporal profile is then sent back to the MQRE process.
[0200] Upon receipt of the routing temporal profiles from the Route Engines, the MQRE 1760 provides the temporal profiles and the information about the modeled network to the Route Graph Engine 1770. Using that information, the Route Graph Engine calculates a graph of the relationships between routers over the time period being modeled. The calculated graph is returned to the MQRE for later use.
[0201] The Route Graph Engine 1770 then initiates Flow Engines 1775 for each router in the modeled network for which it has a temporal profile. The Route Graph Engine provides to each Flow Engine its relationship to its neighbors, after which it instructs all Flow Engines to initiate the traffic modeling process.
[0202] In the traffic modeling process, Flow Engines 1775 which were identified as having an exterior connection to the graph make a request to the MQRE 1760 for the data transformation template and a stream from the hyperscale data system data store using the query provided to the MQRE. Using the graph information from the Route Graph Engine 1770, the Flow Engine filters incoming network flow records from the MQRE. Network flow records which pass the filter are then processed through the data transformation template (adding, dropping, and modifying network flow records as needed).
[0203] Using the graph information from the Route Graph Engine 1770 again, the Flow Engine 1775 identifies the Flow Engine, or point external to the graph, to which it should forward the network flow record. Before forwarding the network flow record, the Flow Engine appends (“tags”) its own identification to the network flow record. If the forwarding destination is a point external to the graph, the network flow record is returned to the MQRE 1760.
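The hop-by-hop forwarding and tagging performed by the Flow Engines can be sketched with a single static FIB snapshot per router and longest-prefix matching, as routers use for IPv4 forwarding. The static snapshot (rather than a temporal profile), the `fibs` dictionary layout, the `max_hops` loop guard, and the function names are all assumptions for illustration.

```python
import ipaddress
from typing import Optional


def next_hop(fib: dict[str, str], dst: str) -> Optional[str]:
    """Longest-prefix match of dst against a {prefix: next hop} table."""
    addr = ipaddress.ip_address(dst)
    best = None
    for prefix, hop in fib.items():
        net = ipaddress.ip_network(prefix)
        if addr in net and (best is None or net.prefixlen > best[0].prefixlen):
            best = (net, hop)
    return best[1] if best else None


def trace_flow(fibs: dict[str, dict[str, str]], entry: str, flow: dict, max_hops: int = 32) -> dict:
    """Pass a flow record engine to engine, tagging each one, until it leaves the graph."""
    record = dict(flow, path=[])
    engine = entry
    for _ in range(max_hops):
        record["path"].append(engine)  # tag with this engine's identification before forwarding
        hop = next_hop(fibs[engine], record["dst"])
        if hop is None or hop not in fibs:  # unrouted, or a point external to the graph
            record["exit"] = hop or "unrouted"
            return record  # would be returned to the MQRE
        engine = hop
    raise RuntimeError("forwarding loop suspected")
```

The accumulated `path` tags are what allow the aggregate flow data set returned to the requester to show which routers each modeled flow traversed.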
[0204] When all of the network flow data has been fully processed by the Flow Engines 1775, the MQRE 1760 forwards the graph calculated by the Route Graph Engine 1770 and the aggregate data set of network flow records generated by the Flow Engines to the original requester, the Data Visualization engine 1710 or other actor 1707. The original requester may make additional and recursive requests to the MQRE to model additional changes, including changes based on the output from the MQRE on successive requests.
[0205] METRIC BUS SYSTEM—In some embodiments, a modular system architecture may be employed in the course of data ingestion. Each module may be associated with a certain data type, such as SNMP or Flow in the case of network analytics. Each module may operate as an independent bus for delivering copies of datagrams to subscribing publishers in native format.
[0206] Some embodiments may comprise a data management and high-performance data analytics platform for hyperscale datasets with at least some of the following features:
[0207] A format for storing data records of a structured and/or semi-structured dataset directly in logical or physical storage unit(s) such as volatile and/or nonvolatile memory.
[0208] (a) Where records are not grouped or encapsulated into files which are stored using a file system designed to share the memory of a physical or logical storage unit.
[0209] (b) Where a logical or physical storage unit is not used to store unrelated data and/or for other purposes.
[0210] Use of a predefined fixed order of grouped bits and/or bytes of a physical or logical storage unit.
[0211] (a) Where the predefined fixed order of grouped bits and/or bytes is located relative to the storage unit at the top, bottom, and/or middle.
[0212] (b) Where the predefined fixed order of grouped bits and/or bytes identifies the storage unit.
[0213] (c) Where the predefined fixed order of grouped bits and/or bytes identifies the dataset.
[0214] (d) Where the predefined fixed order of grouped bits and/or bytes defines the fixed order of grouped bits and/or bytes used for storing structured data elements of each record in the dataset.
[0215] Use of a consecutive repeating fixed order of grouped bits and/or bytes of a physical or logical storage unit.
[0216] (a) Where each repetition contains structured data elements from a record.
[0217] (b) Where some of the groups of bits or bytes optionally have the value of an element of the record.
[0218] (c) Where each group is directly created and/or accessed from the storage unit by an application to which it has meaning, value, or use.
[0219] (d) Where each repetition may or may not contain a group of bits or bytes pointing to a memory location where additional structured or unstructured data for that record are located.
[0220] (e) Where optionally a group of bits or bytes in each repetition represents the status of the record and/or gives notice of one or more bad bits or bytes in the area covered by that repetition or in the area of the physical or logical storage unit where the semi-structured data portions of data records are stored.
[0221] A format for storing data on a physical or logical storage unit which implements an integrity check for the datastore, each record, and/or each record component.
[0222] (a) Where the integrity check provides a means of state synchronization between storage units.
[0223] (b) Where the integrity check provides a means of verifying the integrity of the record and its place in the dataset.
[0224] A distributed query architecture for providing a composite dataset from stored data to a user or system.
[0225] (a) Where query components are broken down into distinct, independently operated microservices performing filtering; filtering and aggregation; or filtering, aggregation, and analysis.
[0226] (b) Where independent data storage devices read data records matching a filter and stream them to microservices in the distributed query architecture.
[0227] (c) Where microservices in the distributed query architecture perform discrete aggregation and/or filter functions on the data records in the stream.
[0228] (d) Where one or more microservices in the distributed query architecture may stream their output(s) to another microservice in the distributed query architecture. In some embodiments, the output may assume the form of relatively small “data micro-batches” consisting, for example, of aggregate and/or filtered data records representing a dataset, where the data micro-batches may originate from within a distributed process prior to assembly of a composite query result. For instance, each of the integrated circuits or functional modules discussed herein may represent a node of the distributed process from which partial or preliminary data may be extracted.
[0229] (e) Where a microservice may make use of one or more artificial intelligence accelerators, including but not limited to an Application Specific Integrated Circuit, Field Programmable Gate Array, Graphics Processing Unit, System on Chip, Tensor Processing Unit, Dataflow Processing Unit, or Matrix Processing Engine, for the purpose of training and/or inference of artificial intelligence and/or machine learning algorithms and/or models.
[0230] A package management system to store, deploy, and manage component elements of a high-performance data analytics system.
[0231] (a) Where a package is a software archive of one or more discrete component elements used in the operation of a high-performance data analytics system.
[0232] (b) Where a package component element may be a dataset schema or instructions identifying the structured and unstructured elements in each data record of the dataset.
[0233] (c) Where a package component element may be an algorithm for preparing and/or enhancing a data record.
[0234] (d) Where a package component element may be a microservice in the query process.
[0235] (e) Where a package component element may be a data processing service, including but not limited to serverless functions, no-code or low-code applications, or a container image.
[0236] (f) Where a package component element may be a data visualization definition.
[0237] (g) Where a package component element may be configuration information for all other elements in the same package.
[0238] In one or more exemplary embodiments, the functions and processes described may be implemented in hardware, software, firmware, or any combination thereof. If implemented in software, the functions may be stored on or transmitted over as one or more instructions or code on a computer-readable medium. Computer-readable media include both computer storage media and communication media including any medium that facilitates transfer of a computer program from one place to another.
[0239] Although the description provided above provides detail for the purpose of illustration based on what are currently considered to be the most practical and preferred embodiments, it is to be understood that such detail is solely for that purpose and that the disclosure is not limited to the expressly disclosed embodiments, but, on the contrary, is intended to cover modifications and equivalent arrangements that are within the spirit and scope of the appended claims. For example, it is to be understood that the present disclosure contemplates that, to the extent possible, one or more features of any embodiment can be combined with one or more features of any other embodiment.
[0240] Moreover, the previous description of the disclosed implementations is provided to enable any person skilled in the art to make or use the present invention. Various modifications to these implementations will be readily apparent to those skilled in the art, and the generic principles defined herein may be applied to other implementations without departing from the spirit or scope of the invention. Thus, the present invention is not intended to be limited to the features shown herein but is to be accorded the widest scope consistent with the principles and novel features disclosed herein.