SYSTEM AND METHODS FOR PROCESSING LARGE SCALE DATA
20170364558 · 2017-12-21
Abstract
A clustered system is provided for querying large amounts of data at high speed, using variable sampling and speculation to speed up subsequent queries. An API based on actor messages allows the user to send SQL queries, the desired sample rate, and the cube schema into which the user expects all queries in the session to fit. The system is agnostic to the underlying data store and can use any system that supports aggregation.
Claims
1. A system for importing data, comprising: a master application in communication with a configuration management tool; and at least one slave application; wherein the master application executes a query through the at least one slave application.
Description
BRIEF DESCRIPTION OF THE DRAWINGS
[0012] In the following descriptions of the drawings and exemplary embodiments, like reference numerals across the several views refer to identical or equivalent features, and:
DETAILED DESCRIPTION OF THE EXEMPLARY EMBODIMENTS
[0023] The communication between the master application and slave applications is illustrated in
[0024] The master application 10 also manages query speculation by generating speculation queries and running them against the slave applications 14, which execute the speculation queries and cache their results. Speculation queries may be executed until the master application 10 receives a real-time user query; any speculation queries then in progress at a slave application 14 are halted. If the user query matches a speculation query, the result is returned from the cache instead of being recomputed, which reduces query response time.
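By way of illustration only, the following Python sketch shows the caching and halting behavior described above from the point of view of a single slave. The class name `SpeculativeSlave`, the `execute` callback, and the whitespace/case-folding cache key produced by `normalize` are hypothetical; a real system would need a proper canonical form for SQL (this crude key also folds the case of string literals). The sketch halts speculation between queries rather than interrupting a query already in flight.

```python
import threading
from typing import Callable, Dict, Iterable, Tuple

def normalize(sql: str) -> str:
    """Hypothetical cache key: lower-cases the SQL and collapses whitespace."""
    return " ".join(sql.lower().split())

class SpeculativeSlave:
    """Sketch of a slave that caches speculation results until a user query arrives."""

    def __init__(self, execute: Callable[[str], object]):
        self._execute = execute               # runs SQL against the local data store
        self._cache: Dict[str, object] = {}   # speculation results keyed by normalized SQL
        self._halt = threading.Event()        # set when a real user query is received

    def halt(self) -> None:
        """Ask the speculation loop to stop before its next query."""
        self._halt.set()

    def speculate(self, queries: Iterable[str]) -> int:
        """Run speculation queries in order; return how many finished before a halt."""
        self._halt.clear()
        done = 0
        for sql in queries:
            if self._halt.is_set():
                break
            self._cache[normalize(sql)] = self._execute(sql)
            done += 1
        return done

    def answer(self, sql: str) -> Tuple[object, bool]:
        """Serve a user query; the flag is True when the result came from the cache."""
        self.halt()
        key = normalize(sql)
        if key in self._cache:
            return self._cache[key], True
        return self._execute(sql), False
```

In a deployment, `speculate` would run on a worker thread while `answer` runs on the thread handling the user's message; `threading.Event` is safe to share between them.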
[0025] Speculation may be achieved through the use of an algorithm. The algorithm anticipates the user's next query using histograms of the data, which are generated as data is inserted, and the cube schema that the user has provided as part of the query session. The algorithm may be the one developed at The Ohio State University (referred to as the “DICE” algorithm), shown below, or another algorithm.
TABLE-US-00001
DICE Execution Strategy:
DICE-PLAN-DETERMINE(Q_Spec, CF)
    P_Spec ← GET-SPEC-PROBABILITIES(Q_Spec)
    PS_Spec ← SIBLING-ADJUSTMENT(P_Spec, CF)
    Accuracy_Gains ← GENERATE-ACCURACY-GAINS-VECTOR()
    Unified_Queries ← QUERY-UNIFICATION(Q_Spec)
    P_Unified_…
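The listing above is truncated in this text. As a simplified illustration only, and not the DICE algorithm itself, the following Python sketch shows one way the quantities it names (speculation probabilities and accuracy gains) could be combined: hypothetical candidate queries are ranked by probability times accuracy gain per unit of estimated cost, and a time budget is filled greedily, a standard knapsack heuristic. The `Candidate` fields and the millisecond budget are assumptions.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Candidate:
    sql: str
    probability: float    # estimated chance this is the user's next query
    accuracy_gain: float  # expected error reduction if it is pre-computed
    cost_ms: float        # estimated execution time on the slaves (must be > 0)

def plan_speculation(candidates: List[Candidate], budget_ms: float) -> List[str]:
    """Greedy plan: highest expected benefit per millisecond first, within a budget."""
    ranked = sorted(
        candidates,
        key=lambda c: c.probability * c.accuracy_gain / c.cost_ms,
        reverse=True,
    )
    plan: List[str] = []
    spent = 0.0
    for c in ranked:
        # Skip any candidate that would overrun the budget, but keep trying cheaper ones.
        if spent + c.cost_ms <= budget_ms:
            plan.append(c.sql)
            spent += c.cost_ms
    return plan
```

For example, with candidates scoring 0.005, 0.03 and 0.001 per millisecond, the cheap high-probability query is scheduled first and the expensive low-probability one is dropped once the budget is exhausted.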
[0026] In other embodiments different algorithms may be used. In one embodiment, the system may use the algorithm used in Cubert (https://github.com/linkedin/Cubert).
[0027] A user may request information about query accuracy. The master application requests accuracy information from the nodes, and the node applications return error bounds for aggregation queries based on the variance (across samples) of the measure for each group. The variance for the same group is combined across multiple queries, and the variances across multiple groups are then combined to give an error estimate for the entire query. The results are returned to the master application, which returns them to the user.
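The two-stage combination described above can be sketched in Python as follows, under stated assumptions: each sample contributes one value of a group's measure, variances for the same group from different queries are pooled by weighting with their degrees of freedom, and groups are treated as independent so their variances add. The function names and the z = 1.96 default (a 95% normal bound) are hypothetical choices, not the system's actual estimator; each group needs at least two samples.

```python
import math
from statistics import variance
from typing import Dict, List, Tuple

def group_variance(samples: List[float]) -> Tuple[float, int]:
    """Sample variance of one group's measure across samples, with degrees of freedom."""
    return variance(samples), len(samples) - 1

def pool(variances: List[Tuple[float, int]]) -> Tuple[float, int]:
    """Pool variances of the same group from several queries, weighted by degrees of freedom."""
    dof = sum(d for _, d in variances)
    return sum(v * d for v, d in variances) / dof, dof

def query_error(per_query_groups: List[Dict[str, List[float]]], z: float = 1.96) -> float:
    """Error bound for the whole query: z * sqrt(sum of pooled per-group variances)."""
    by_group: Dict[str, List[Tuple[float, int]]] = {}
    for groups in per_query_groups:
        for name, samples in groups.items():
            by_group.setdefault(name, []).append(group_variance(samples))
    # Step 1: combine each group's variance across queries; step 2: add across groups.
    total = sum(pool(vs)[0] for vs in by_group.values())
    return z * math.sqrt(total)
```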
[0028] In exemplary embodiments, data injection occurs through an application program interface (API) that allows the user to create and index tables. A user can also specify the number of table columns and partitions. Partitioning may be performed by a pluggable data partitioner based on a consistent hash ring. Table creation may also allow a user to set the number of nodes that must acknowledge a write for it to count as a success, and how many times the data must be replicated to remain fault tolerant. For example, in an exemplary embodiment the default number of nodes required for a successful write may be 2, and the default replication factor may be 3. For temporary tables, where fast writes matter most, the write success count and the replication factor may both be set to 1. Tables that need higher fault tolerance may use higher values, at the cost of slower writes, since multiple copies must be written.
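A consistent-hash-ring partitioner with configurable replication can be sketched in Python as below. The class `HashRing`, the use of MD5, the 64 virtual nodes per physical node, and the helper `write_succeeded` are illustrative assumptions rather than the system's actual partitioner. A key maps to the first virtual node clockwise from its hash, and the replica set is the next `replication_factor` distinct physical nodes.

```python
import bisect
import hashlib
from typing import List

class HashRing:
    """Minimal consistent hash ring with virtual nodes (an assumed design)."""

    def __init__(self, nodes: List[str], vnodes: int = 64):
        self._node_count = len(set(nodes))
        self._ring = sorted(
            (self._hash(f"{node}#{i}"), node) for node in nodes for i in range(vnodes)
        )
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(key: str) -> int:
        return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")

    def replicas(self, key: str, replication_factor: int) -> List[str]:
        """Walk clockwise from the key's position, collecting distinct physical nodes."""
        if replication_factor > self._node_count:
            raise ValueError("replication factor exceeds number of nodes")
        i = bisect.bisect(self._keys, self._hash(key))
        chosen: List[str] = []
        while len(chosen) < replication_factor:
            node = self._ring[i % len(self._ring)][1]
            if node not in chosen:
                chosen.append(node)
            i += 1
        return chosen

def write_succeeded(acks: int, write_success: int) -> bool:
    """A write counts as successful once `write_success` replicas have acknowledged it."""
    return acks >= write_success
```

With the defaults given above, a row would be sent to `ring.replicas(key, 3)` and the write reported as successful once `write_succeeded(acks, 2)` holds; a temporary table would use 1 for both.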
[0029] The API may also allow a user to drop tables and indexes. Updates and deletions of data may also be performed by the user.
[0030] After a table is created, a user may inject data through individual inserts or through a bulk load of a single csv (comma-separated values) file or a directory of csv files. Upon insertion, a VectorClock and a unique identifier (“DID”) may be assigned to every row for later use. The DID is a 15-byte data structure that holds (1) a timestamp, (2) the hash of the partition on which the data resides, and (3) an incrementing counter. For example, the DID with hex string representation 56abf2de6436343539616232134404 has partition hash d6459ab2, timestamp 1454109406, and counter 1262596. The byte layout of an exemplary DID is shown in the table below:
TABLE-US-00002
Bytes 0-3: time
Bytes 4-11: partition hash
Bytes 12-14: inc
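The layout can be checked against the worked example with a short Python sketch. The byte offsets and big-endian order are inferred from the example itself: decoding its hex string this way reproduces exactly the timestamp, partition hash, and counter stated in the text, with the partition hash stored as eight ASCII characters. The names `DID`, `encode_did`, and `decode_did` are hypothetical.

```python
import struct
from typing import NamedTuple

class DID(NamedTuple):
    timestamp: int       # seconds since the Unix epoch (bytes 0-3)
    partition_hash: str  # 8 hex characters stored as ASCII (bytes 4-11)
    counter: int         # incrementing counter (bytes 12-14)

def encode_did(did: DID) -> bytes:
    """Pack a DID into 15 big-endian bytes."""
    ph = did.partition_hash.encode("ascii")
    if len(ph) != 8:
        raise ValueError("partition hash must be 8 ASCII characters")
    if not 0 <= did.counter < 1 << 24:
        raise ValueError("counter must fit in 3 bytes")
    return struct.pack(">I", did.timestamp) + ph + did.counter.to_bytes(3, "big")

def decode_did(raw: bytes) -> DID:
    """Inverse of encode_did."""
    if len(raw) != 15:
        raise ValueError("a DID is exactly 15 bytes")
    (ts,) = struct.unpack(">I", raw[:4])
    return DID(ts, raw[4:12].decode("ascii"), int.from_bytes(raw[12:15], "big"))

example = decode_did(bytes.fromhex("56abf2de6436343539616232134404"))
print(example)  # DID(timestamp=1454109406, partition_hash='d6459ab2', counter=1262596)
```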
[0031] Other embodiments may use ObjectIDs of different sizes and byte layouts. One example is the MongoDB ObjectId, a 12-byte identifier whose format is publicly documented.
[0032] During data injection, the system may gather and store statistics about the data. Unlike other systems, including RDBMSs, a system of an exemplary embodiment collects histogram data in real time so that the system knows the bounds of the data. The statistics may later be used by the system to assist with speculation queries.
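As a hedged sketch of real-time statistics gathering, the following Python class updates a column's bounds and an equi-width histogram on every insert, so that a rough selectivity estimate is available without a separate analysis pass. The fixed `bin_width` and the class name `ColumnStats` are assumptions; the text does not specify the histogram type.

```python
import math
from collections import Counter
from typing import Optional

class ColumnStats:
    """Running statistics for one numeric column, updated on every insert."""

    def __init__(self, bin_width: float):
        self.bin_width = bin_width       # hypothetical fixed bucket width
        self.count = 0
        self.min: Optional[float] = None
        self.max: Optional[float] = None
        self.bins: Counter = Counter()   # bucket index -> row count

    def add(self, value: float) -> None:
        """Fold one inserted value into the bounds and the histogram."""
        self.count += 1
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)
        self.bins[math.floor(value / self.bin_width)] += 1

    def fraction_in(self, lo: float, hi: float) -> float:
        """Rough selectivity of lo <= value < hi, counting every bucket that overlaps it."""
        if self.count == 0 or hi <= lo:
            return 0.0
        first = math.floor(lo / self.bin_width)
        last = math.ceil(hi / self.bin_width) - 1
        hits = sum(n for b, n in self.bins.items() if first <= b <= last)
        return hits / self.count
```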
[0033] In exemplary embodiments the system performs data repair. To fix data failures and inconsistencies, the system uses vector clocks to create a sense of “time.” When a single-row get request is made by unique identifier (DID), the system checks whether repair is necessary: the row is pulled from all active nodes and their vector clocks are compared. The comparison identifies the latest version of the data and which copies need updating.
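The comparison step can be illustrated with a minimal Python sketch of vector clocks and read repair. Clocks are modeled as dictionaries from node id to counter; `read_repair` returns the version whose clock dominates all others together with the nodes holding older copies. How truly concurrent versions are resolved is not specified in the text, so the sketch simply raises an error in that case. All names are hypothetical.

```python
from typing import Dict, List, Tuple

VectorClock = Dict[str, int]  # node id -> event counter

def descends(a: VectorClock, b: VectorClock) -> bool:
    """True if clock `a` has seen every event recorded in clock `b`."""
    return all(a.get(node, 0) >= n for node, n in b.items())

def compare(a: VectorClock, b: VectorClock) -> str:
    """Order two clocks: 'equal', 'after', 'before', or 'concurrent'."""
    if descends(a, b) and descends(b, a):
        return "equal"
    if descends(a, b):
        return "after"
    if descends(b, a):
        return "before"
    return "concurrent"

def read_repair(replicas: Dict[str, Tuple[VectorClock, object]]):
    """Return (clock, value, stale_nodes) for the newest version of a row."""
    for clock, value in replicas.values():
        if all(descends(clock, other) for other, _ in replicas.values()):
            stale = [n for n, (c, _) in replicas.items() if compare(c, clock) == "before"]
            return clock, value, stale
    raise ValueError("concurrent versions need a conflict-resolution policy")
```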
[0034] In exemplary embodiments, data is not permanently deleted at first, but is instead “soft” deleted by adding a deletion timestamp to the row. Each slave runs a process that checks for records that need to be permanently deleted: the timestamps associated with soft deletions are reviewed, and any record whose soft deletion is older than a predetermined time (for example, 7 days) is permanently deleted.
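The retention rule can be sketched in Python as follows. The `Row` type and the `deleted_at` field are hypothetical stand-ins for the timestamp added by a soft delete; the 7-day default matches the example in the text.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

@dataclass
class Row:
    did: str
    deleted_at: Optional[datetime] = None  # set by a soft delete; None while the row is live

def soft_delete(row: Row, now: datetime) -> None:
    """Mark the row as deleted without removing it."""
    row.deleted_at = now

def purge(rows: List[Row], now: datetime, retention: timedelta = timedelta(days=7)) -> List[Row]:
    """Keep live rows and recent soft deletes; drop soft deletes older than `retention`."""
    return [r for r in rows if r.deleted_at is None or now - r.deleted_at <= retention]
```

On a Postgres-backed slave, the same periodic sweep could be a single statement such as `DELETE FROM t WHERE deleted_at < now() - interval '7 days'`, again assuming a hypothetical `deleted_at` column.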
[0035] In an exemplary embodiment, an API allows a user to search by submitting an SQL query to the system. The user can also use the API to define the data sample rate, which may be any amount up to and including 100%. The selected sample rate dictates the number of table partitions that the system will search for a particular query. A user may choose to query only 20% of the available data to obtain a quicker response than a query of all the data would provide. However, if a user believes that a low rate may produce anomalous results, or otherwise wants a larger set of data searched, the user may select a higher percentage (such as 90% or 100%). A higher rate may lengthen the time needed to return results. It will be appreciated by those of ordinary skill in the art that users may have various reasons for selecting different data sampling rates.
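A minimal Python sketch of mapping a sample rate to partitions follows. The text states only that the rate dictates how many partitions are searched; choosing them uniformly at random, rounding up so a small rate never yields zero partitions, and taking the rate as an integer percentage are assumptions, as is the function name.

```python
import random
from typing import List, Optional

def partitions_for_sample(total_partitions: int, sample_percent: int,
                          seed: Optional[int] = None) -> List[int]:
    """Pick the partition indices to scan for a sample rate given in percent (1-100)."""
    if not 1 <= sample_percent <= 100:
        raise ValueError("sample rate must be between 1 and 100 percent")
    # Integer ceiling of total * percent / 100 avoids floating-point rounding surprises.
    k = (total_partitions * sample_percent + 99) // 100
    rng = random.Random(seed)
    return sorted(rng.sample(range(total_partitions), k))
```

With the 185 partitions used in Example 2, a 20% sample scans 37 partitions and a 100% sample scans all 185.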
[0036] The user also has the ability to define the dimensions of the cube that the user's query session will utilize.
[0037] In an example, a table has the following six-column structure:
TABLE-US-00003 userid year month day type domain
[0038] Each of the columns is a dimension. A cube for queries based on time and type would be defined as: year, month, day; type. A comma denotes a hierarchical dimension, giving the system more information on how it should perform speculation queries, whereas a semicolon separates different dimensions. When a user queries using one of the dimensions in a hierarchy, the speculation engine only queries up or down that hierarchy. This generates fewer speculation queries, but ones more likely to match the user's next query, so less work is done and the user's queries are answered faster. If the cube is instead defined with non-hierarchical dimensions, such as year; month; day; type, twice as many queries would be executed, because the speculation engine would use all of the defined dimensions to build out the cube instead of the better-defined hierarchy of dimensions.
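The cube-definition syntax can be illustrated with a short Python sketch that parses a definition such as `year, month, day; type` and lists the roll-up and drill-down candidates for a query grouped at one level. Reading comma-separated levels from coarsest to finest is an assumption consistent with the example, and the function names are hypothetical.

```python
from typing import List, Tuple

def parse_cube(spec: str) -> List[List[str]]:
    """';' separates dimensions; ',' orders the levels of one hierarchy (coarse to fine)."""
    return [[level.strip() for level in dim.split(",") if level.strip()]
            for dim in spec.split(";") if dim.strip()]

def neighbours(cube: List[List[str]], level: str) -> List[Tuple[str, str]]:
    """Speculation candidates one step up or down the hierarchy containing `level`."""
    for hierarchy in cube:
        if level in hierarchy:
            i = hierarchy.index(level)
            out = []
            if i > 0:
                out.append(("roll-up", hierarchy[i - 1]))
            if i + 1 < len(hierarchy):
                out.append(("drill-down", hierarchy[i + 1]))
            return out
    raise KeyError(level)

cube = parse_cube("year, month, day; type")
print(neighbours(cube, "month"))  # [('roll-up', 'year'), ('drill-down', 'day')]
```

A query grouped by month therefore yields only two speculation candidates, whereas a flat definition gives the engine no such ordering to narrow its choices.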
[0039] Through the API the user may also be able to request the accuracy/error rate of a given query and sampling rate. In exemplary embodiments the system is not only able to determine the accuracy/error rate for a single aggregate, but may also perform accuracy calculations across multiple aggregates as would be seen in more complex queries.
[0040] In an exemplary embodiment, the system can support a variety of data types, including SMALLINT, INTEGER, BIGINT, DECIMAL, REAL, DOUBLE, BOOLEAN, CHAR, VARCHAR, DATE, TIMESTAMP, and ARRAY. In other embodiments different data types may be supported. In some embodiments of the system, unsupported data formats can be converted into accepted formats.
[0041] In an exemplary embodiment the system is Linux-based, and readily available software tools used to build and operate the system include the following: LOGBACK (http://logback.qos.ch/, ch.qos.logback:logback-classic:1.1.7, ch.qos.logback:logback-core:1.1.7); JACKSON (https://github.com/FasterXML/jackson, com.fasterxml.jackson.core:jackson-annotations:2.7.0, com.fasterxml.jackson.core:jackson-core:2.7.4, com.fasterxml.jackson.core:jackson-databind:2.7.4, com.fasterxml.jackson.datatype:jackson-datatype-guava:2.7.4); GUAVA (https://github.com/google/guava, com.google.guava:guava:19.0); FLIPTABLES (https://github.com/JakeWharton/flip-tables, com.jakewharton.fliptables:fliptables:1.0.2); AKKA (http://akka.io/, com.typesafe.akka:akka-actor_2.11:2.4.4, com.typesafe.akka:akka-cluster-metrics_2.11:2.4.4, com.typesafe.akka:akka-cluster-tools_2.11:2.4.4, com.typesafe.akka:akka-cluster_2.11:2.4.4, com.typesafe.akka:akka-kernel_2.11:2.4.4, com.typesafe.akka:akka-protobuf_2.11:2.4.4, com.typesafe.akka:akka-remote_2.11:2.4.4, com.typesafe.akka:akka-slf4j_2.11:2.4.4); TYPESAFE CONFIG (https://github.com/typesafehub/config, com.typesafe:config:1.3.0); HIKARI CONNECTION POOL (https://github.com/brettwooldridge/HikariCP, com.zaxxer:HikariCP:2.4.6); APACHE COMMONS (https://commons.apache.org, commons-cli:commons-cli:1.3.1, commons-io:commons-io:2.5, org.apache.commons:commons-collections4:4.1, org.apache.commons:commons-lang3:3.4, org.apache.commons:commons-math3:3.6.1); NETTY (http://netty.io/, io.netty:netty:3.10.3.Final); JODA TIME (http://www.joda.org/joda-time/, joda-time:joda-time:2.9.3); ANTLR (http://www.antlr.org/, org.antlr:antlr4-runtime:4.5.3, org.antlr:antlr4:4.5.3); JOOQ (http://www.jooq.org/, org.jooq:jooq:3.8.0); MSGPACK (http://msgpack.org/index.html, org.msgpack:jackson-dataformat-msgpack:0.8.7, org.msgpack:msgpack-core:0.8.7); POSTGRES JDBC DRIVER (https://jdbc.postgresql.org/, org.postgresql:postgresql:9.4-1206-jdbc42); SCALA (http://www.scala-lang.org, org.scala-lang.modules:scala-java8-compat_2.11:0.7.0, org.scala-lang:scala-library:2.11.7); SLF4J (http://www.slf4j.org, org.slf4j:log4j-over-slf4j:1.7.21, org.slf4j:slf4j-api:1.7.21); UNCOMMONS MATH (http://maths.uncommons.org, org.uncommons.maths:uncommons-maths:1.2.2a); JEDIS (https://github.com/xetorthio/jedis, redis.clients:jedis:2.8.1).
[0042] It will be appreciated by one of ordinary skill in the art that many other software tools, and combinations of tools, may be used to create the embodiments described herein, and that the preceding list is in no way limiting.
Example 1
[0043] In an exemplary system embodiment as illustrated in
[0044] Once an initial query is run on the data, subsequent queries may be run on the results of the prior query.
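As an illustration of querying a prior result, the following Python sketch uses the standard-library `sqlite3` module as a stand-in for a slave's data store (the exemplary embodiment uses Postgres). The first query's result is materialized as a temporary table, and the follow-up query runs against that much smaller table instead of the base data. The table and column names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE claims (age INTEGER, deceased INTEGER, amount REAL)")
conn.executemany("INSERT INTO claims VALUES (?, ?, ?)",
                 [(45, 1, 10.0), (45, 0, 5.0), (70, 1, 20.0), (70, 1, 30.0)])

# Initial query: materialize its result so later queries can build on it.
conn.execute("""CREATE TEMP TABLE q1 AS
                SELECT age, COUNT(*) AS n, SUM(amount) AS total
                FROM claims WHERE deceased = 1 GROUP BY age""")

# Subsequent query runs against the prior result, not the base table.
rows = conn.execute("SELECT age, total FROM q1 WHERE total > 15 ORDER BY age").fetchall()
print(rows)  # [(70, 50.0)]
```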
Example 2
[0050] Query processing times of an exemplary embodiment were compared against Amazon Redshift using publicly available data from the Centers for Medicare & Medicaid Services. The raw data set contained 92,500,000 records.
[0051] Amazon Web Services EC2 instances were used. Five slave nodes were created using the m3.2xlarge instance type, along with one master node using the m3.xlarge instance type. Each slave used Postgres 9.3 as the underlying data store. The data set was the same as the raw data set, 92,500,000 records, split across 185 partitions. The partitions were distributed across the five slave nodes, so each slave node handled 37 partitions.
[0052] An Amazon Web Services Redshift cluster of 2 nodes using the dc1.8xlarge instance type was used. Because Redshift does not support the Postgres Array structure, each entry in an array had to be stored as its own row instead of a single row holding an array of values, and the data set therefore grew to 197,451,376 records.
[0053] A series of queries was executed against both systems. The queries and their execution times are shown in the table below:
TABLE-US-00004
TABLE 1: COMPARISON OF EXEMPLARY EMBODIMENT WITH REDSHIFT
QUERY | REDSHIFT (ms) | DELHPICDB (ms)
select icd9dgnscode,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and age<'50' group by icd9dgnscode | 780.25 | 126.33
select age,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and '75319'=ANY(icd9dgnscode) group by age | 398.75 | 182.33
select age,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and '7533'=ANY(icd9dgnscode) group by age | 357.75 | 126
select age,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' group by age | 346.5 | 75.67
select age,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and '5851'=ANY(icd9dgnscode) group by age | 405 | 123.67
select age,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and '5852'=ANY(icd9dgnscode) group by age | 416 | 239
select age,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and '5853'=ANY(icd9dgnscode) group by age | 421.5 | 150.33
select age,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and '5854'=ANY(icd9dgnscode) group by age | 414.75 | 192
select icd9dgnscode,COUNT(deceased),SUM(total_linecoinsrncamt) from beneficiary_carrier_1_claims_mat where deceased='true' and age<'50' and benesexidentcd=1 group by icd9dgnscode | 521.25 | 61
AVERAGE TIME TAKEN | 451.25 | 141.67
[0054] While certain exemplary system and method embodiments are described in detail above, the scope of the invention is not considered limited by such disclosure, and modifications are possible without departing from the spirit of the invention as evidenced by the claims.