System and program for joining source table rows with target table rows
10120901 ยท 2018-11-06
Assignee
Inventors
- Josep L. Larriba-Pey (Barcelona, ES)
- Victor Muntes-Mulero (Barcelona, ES)
- Hebert W. Pereyra (Toronto, CA)
- Josep Aguilar Saborit (Barcelona, ES)
- Calisto P. Zuzarte (Pickering, CA)
Cpc classification
Y10S707/99931
GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
Y10S707/99943
GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
Y10S707/99933
GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
International classification
Abstract
A data processing system, and an article of manufacturing, join rows associated with a source table column with rows associated with a target table column. A source node and a target node contain the source and target tables, respectively. A reduced representation of selected rows associated with the source table column is generated, as is a representation of the target table column. A filtering module filters the generated reduced representation of selected rows associated with the source table column through the generated representation of the target table column, the filtered generated reduced representation of selected rows identifying source table rows that do not have to be joined with the target table. The rows associated with the source table column minus the filtered generated reduced representation of selected rows are joined to the rows associated with the target table column.
Claims
1. A data processing system for joining rows associated with a column of a source table with rows associated with a column of a target table, the data processing system including a source node containing the source table and including a target node of a plurality of target nodes, the target node containing the target table, the data processing system comprising: a processor; a generating module, executed on the processor, for generating a reduced representation of selected rows associated with the column of the source table, and generating a representation of the column of the target table; a filtering module for filtering the generated reduced representation of selected rows associated with the column of the source table through the generated representation of the column of the target table, and for generating a bit map with the filtered generated reduced representation of selected rows which identifies source table rows that do not have to be joined with the target table; and a joining module for joining, to the rows associated with the column of the target table, the rows associated with the column of the source table minus the filtered generated reduced representation of selected rows, wherein the generating the reduced representation of the selected rows associated with the column of the source table comprises: applying a hash function against the column to obtain hash codes, the column being associated with selected rows identifying the source table rows; storing the hash codes in a hash code buffer, the hash code buffer representing the source table rows to he sent to the corresponding target node of the plurality of target nodes; determining whether data in a data buffer of the source node reaches a threshold; asynchronously sending, in response to the threshold being reached, the hash code buffer to the corresponding target node and obtaining and storing other hash codes in another hash code buffer to be sent to another target node, wherein the hash function maps a joining key to a hash code, and wherein the target node generates the bit map by matching the hash codes with the target table, the bit map identifying the source table rows that are not to be joined with the target table rows.
2. The data processing system of claim 1, wherein the generated representation of the column of the target table is a bit filter.
3. The data processing system of claim 1, further comprising: a using module for using separate reduced representations of the selected source table rows at each source node; and a usage module for using separate reduced representations of selected source table rows to be sent to each target node.
4. The data processing system of claim 1, further comprising: a distributing module for distributing the target table to a set of target nodes prior to joining the rows associated with the column of the source table with the rows associated with the column of the target table.
5. The data processing system of claim 1, wherein the joining of the rows associated with the column of the source table with the rows associated with the column of the target table is a non-collocated join.
6. The data processing system of claim 1, further comprising: a generation module for generating a potentially reduced set of data buffers after filtering the source table rows stored in source data buffers based on the bit map.
7. The data processing system of claim 1, further comprising, if the source table and the target table are collocated: an avoiding module for avoiding transmission of the hash codes contained in the hash code buffer; and an avoidance module for avoiding waiting for the bit map to decide if any rows may be discarded, the discarded rows not to be joined between the source table and the target table.
8. The data processing system of claim 1, wherein the generating module generates, based on predicates of a join request, a first representation of rows associated with the column of the source table, and thereafter, the generating module generates the reduced representation of the selected rows associated with the column of the source table based on the generated first representation, wherein the size of the reduced representation is smaller than the first representation.
9. An article of manufacture for directing a data processing system to join rows associated with a column of a source table with rows associated with a column of a target table, the data processing system including a source node containing the source table and including a target node of a plurality of target nodes, the target node containing the target table, the article of manufacture comprising: a non-transitory program usable medium embodying one or more instructions executable by the data processing system, the one or more instructions comprising: data processing system executable instructions for generating a reduced representation of selected rows associated with the column of the source table, and generating a representation of the column of the target table; data processing system executable instructions for filtering the generated reduced representation of selected rows associated with the column of the source table through the generated representation of the column of the target table, and for generating a bit map with the filtered generated reduced representation of selected rows which identifies source table rows that do not have to be joined with the target table; and data processing system executable instructions for joining, to the rows associated with the column of the target table, the rows associated with the column of the source table minus the filtered generated reduced representation of selected rows, wherein the generating the reduced representation of the selected rows associated with the column of the source table comprises: applying a hash function against the column to obtain hash codes, the column being associated with selected rows identifying the source table rows; storing the hash codes in a hash code buffer, the hash code buffer representing the source table rows to be sent to the corresponding target node of the plurality of target nodes; determining whether data in a data buffer of the source node reaches a threshold; asynchronously sending, in response to the threshold being reached, the hash code buffer to the corresponding target node and obtaining and storing other hash codes in another hash code buffer to be sent to another target node, wherein the hash function maps a joining key to a hash code, and wherein the target node generates the bit map by matching the hash codes with a bit filter associated with the target table, the bit map identifying the source table rows that are not to be joined with the target table rows.
10. The article of manufacture of claim 9, wherein the generated representation of the column of the target table is a bit filter.
11. The article of manufacture of claim 9, further comprising: data processing system executable instructions for using separate reduced representations of the selected source table rows at each source node; and data processing system executable instructions for using separate reduced representations of selected source table rows to be sent to each target node.
12. The article of manufacture of claim 9, further comprising: data processing system executable instructions for distributing the target table to a set of target nodes prior to joining the rows associated with the column of the source table with the rows associated with the column of the target table.
13. The data processing system of claim 1, further comprising a placing module for placing, into a data buffer, selected rows identifying the source table rows at the source node.
14. The data processing system of claim 2, wherein the bit filter is built in response to receiving relevant data tuples.
15. The data processing system of claim 1, wherein the bit map is transmitted from target node to the source node so the source node uses the bit map to filter out data tuples that will not participate in the joining operation.
Description
BRIEF DESCRIPTION OF THE DRAWINGS
(1) The accompanying drawings, which are incorporated in and constitute a part of this specification, illustrate embodiments of the invention and together with the description, serve to explain the principles of the invention.
(2)
(3)
(4)
(5)
(6)
DETAILED DESCRIPTION OF THE EMBODIMENTS
(7) Reference will now be made to exemplary embodiments of the invention, which are illustrated in the accompanying drawings. Wherever possible, the same reference numbers will be used throughout the drawings to refer to the same or like parts.
(8) An embodiment provides a method of communicating data between partitions of a database system. Data related to queries of the database system are received at a first database partition designated as the source node. Data that is destined to a second database partition is identified. The second partition is designated as the target node. The target node has a bit filter created based on the tuples of the relation relevant to the query at this partition. The bit filter may be one that is created as part of a join operation such as a hash join or a merge join. These methods are known methods to those skilled in the art. The identified data at the source node is stored in a set of data buffers assigned to the target node. The identified data is encoded into hash codes and stored into a hash code buffer also assigned to the target node. The data buffers and the hash code buffer are filled until a threshold is reached. The threshold is determined based on the number of partitions involved in each relation of the join, the communication layer's buffer window count, memory allocated for the operation and a measure of reduction factor of the bit filter. A separate asynchronous process transmits the hash code buffer to the target node. The target node checks the hash codes against the bit filter and creates a bitmap that indicates tuples in the data buffer at the source node that will not qualify the join operation. . . . The bitmap is transmitted from the target node to the source node where the data from the corresponding data buffers assigned to the target node can be discarded. The data buffers are compacted to eliminate unused space freed up by the discarded tuples if any. The potentially reduced set of data buffers is then sent from the source node to the target node to perform the join operation at the target node.
(9)
(10) Host node 102 coordinates the operation of system 100, such as query parsing, optimization, and compilation. For example, host node 102 may maintain the schema information for databases stored by system 100. In addition, host node 102 may serve as an interface to other devices and systems on behalf of system 100. For example, host 102 may provide an interface for ad-hoc queries, and embedded queries. Host node 102 may be implemented using known general purpose computers and processors.
(11) Operator nodes 104a-c serves as a site for performing various operations of system 100. As shown in
(12) As noted, operator nodes 104a-c can be configured to communicate with each other based on passing messages over network 108. Small messages, such as those less than 100 bytes can be sent as datagram packets. For larger messages, operator nodes 104a-c may open communications circuit with each other over which messages can be asynchronously transmitted. In some embodiments, after a message has been transmitted, operator nodes 104a-c may release the circuit. In addition, in some embodiments, operator nodes 104a-c may limit the maximum length of a message based on the physical memory available to each processor in operator nodes 104a-c.
(13) Storage devices 106a-c provide local storage for their respective operator nodes, i.e., operator nodes 104a-c. In some embodiments, system 100 may partition tuples of database across storage devices 106a-c. Storage devices 106a-c can be implemented using known devices, such as disk drives or optical disk drives having embedded disk controllers. Storage devices 106a-c may also have its own random access memory to serve as a disk cache.
(14) Network 108 can be any number of interconnections, series of points or nodes, or communication paths for interconnecting the elements of system 100, such as host 102 and operator nodes 104a-c. Network 108 can be configured for communications over various spatial distances and can accommodate various types of data transmission technologies, whether it is public or private. Network 108 can be implemented using hardware and software that are well known to those skilled in the art. For example, network 108 can include a local area network, such as an Ethernet or token ring network. Other types of networks are also consistent with the principles of the present invention.
(15)
(16) Catalog manager 200 can serve as a central repository of all conceptual and internal schema information for each database maintained by system 100, such as schema data 204. Since multiple users may have the same database open at once and a user may reside on a machine (not shown) other than host 102, catalog manager 200 can also ensure consistency among copies of the database cached by each user. In some embodiments, catalog manager 200 can be implemented as a daemon process.
(17) Schema data 204 is a collection of meta-data that describes the relations of the databases maintained by system 100. That is, schema data 204 describes the layout of a database that outlines the way data is organized into tables. In some embodiments, schema data 204 is configured using a series of SQL statements, such as CREATE statements. Schema data 204 can be loaded into the memory of host 102 when a database is first opened.
(18) Query manager 202 serves as an interface to a database and manages the execution of queries. For example, query manager 202 may cache information from schema data 204 and provide an interface for queries. Query manager 202 can be configured to process ad hoc queries entered by a user (not shown) as well as embedded queries requested by a program executing on another machine.
(19) In addition, query manager 202 can be configured to parse queries, determine a query execution plan, optimize the execution plan, and compile the query execution plan. Query manager 202 can use known relational techniques for query parsing, optimization, and code generation for executing a query. In some embodiments, query manager 202 employs hash-based algorithms for joins and other complex operations.
(20) Query manager 202 may assign scheduler processes, such as scheduler process 208, to a query, and pass the query execution plan in query information message 206 to one or more of operator nodes 104a-c. For example, query manager 202 can recognize that certain queries can be directed to only a subset of the nodes in system 100. In the case of a single site query, query manager 202 may directly send the query in message 206 to the appropriate operator node for execution.
(21) A query may also require the involvement of multiple operator nodes, e.g., operator nodes 104a-c. In order to control the execution of a multi-site query, one of operator nodes 104a-c may be designated a coordinator that is responsible for making global decisions related to the query. A node may be designated as a coordinator based on a variety of factors including, hardware capacity, location, user or administrator configuration, etc. For example, as shown in
(22) In order to control the execution of a query among multiple nodes, a scheduler process 208 may be activated in a coordinator, e.g., operator node 104a. Scheduler process 208 can run on any of operator nodes 104a-c and multiple instances of scheduler process 208 can run on a single processor or node of system 100. In some embodiments, scheduler process 208 is activated when query manager 202 establishes a connection. Once scheduler process 208 has been activated, query manager 202 sends the compiled query in message 206. In response to receipt of message 206, scheduler process 208 in turn activates operator processes, such as operator processes 210a-c, in operator nodes 104a-c.
(23) One or more of operator nodes 104a-c may then work in conjunction to process the query specified in query information message 206. Operator nodes 104a-c obtain results for the query. For example, operator nodes 104a-c may access their respective storage devices 106a-c to identify tuples or data that satisfy the query. Operator processes 210a-c then collects these results and passes them back to query manager 202 in the form of one or more messages. Query manager 202 reads the results of the query, compiles them, and returns the results through the query interface to the user or program from which the query was initiated.
(24)
(25) Processor 300 controls the operation of operator node 104a. In particular, processor 300 interprets and executes instructions provided to operator node 104a, such as instructions from operator process 210a. Processor 300 can be implemented as any type of processor, such as a general purpose processors manufactured by the Intel Corporation.
(26) Communications interface 302 couples operator node 104a to network 108. Communications interface 302 can be implemented using known hardware and software. For example, communications interface 302 can be configured as an Ethernet or token ring interface. Other types of communications interfaces are also consistent with the principles of the present invention.
(27) Memory 304 serves as a local main memory for operator node 104a. Memory 304 can be implemented using known types of memory, such as a random access memory. In addition, as shown in
(28) The hash code buffer space 306 serves as a temporary storage location for hash codes used by operator node 104a. As noted, in some embodiments, operator node 104 may use various hash codes to perform actions related to a query. Such hash codes and their associated algorithms are known to those skilled in the art.
(29) Routing table 308 indicates where operator node 104a should route or send messages for portions of a query. Buffer space 310 serves as a temporary location for the data in these messages. One example of the relationship between routing table 308 and buffer space 310 will now be described with reference to
(30)
(31) During query processing, results for a query may be temporarily stored in data buffers 314a-h. When the results are to be sent to another node, the results are routed based on information in routing table 308. For example, as shown, data buffers 314a-d are routed to operator node 104b and data buffers 314e-h are routed to operator node 104c. Of course one skilled in the art will recognize that memory 304 can be configured in a variety of ways consistent with the principles of the embodiment.
(32)
(33) The target operator node creates bit filters from the tuples in the relation prior to or during the initial phase of the join operation. On receiving the hash code buffer from the source operator node, the target operator node creates a bitmap based on the result of checking each hash code against its set of bit filters. The target node then sends the bitmap back to the source node. The source node uses the bitmap to filter out data tuples that will not participate in the join operation. The source node then sends the potentially reduced set of tuples to the target node, where the join operation may be completed. The result of the join operation may then be provided by the target operator node.
(34) In some embodiments, the size of the hash codes and bitmap buffers is smaller than the size of the tuples. For example, in some embodiments, the size of a hash code is 4 bytes, and the size of a bitmap entry per tuple is one bit.
(35) In addition, in some embodiments, communications between operator nodes 104a-c are asynchronous, so that the sending of messages and the processing of tuples can be overlapped. Furthermore, in some embodiments, the data buffers are partitioned into groups of buffers and the hash code buffer are sent after a certain number of data buffers have been filled for a given destination or target operator node.
(36) For purposes of explanation, the following description relates a join operation involving data at operator nodes 104a and 104c. Either operator node 104a or 104c may serve as the coordinator this operation. The various stages for executing the join operation will now be described.
(37) In stage 600, operator nodes 104a and 104c receive information in message 206 related the operations for a query. For example, operator nodes 104a and 104c may receive information in message 206 related to a join operation. This information may be passed to operator nodes 104a and 104b from scheduler process 208, which in turn activates operator processes 210a and 210c. Operator processes 210a and 210c may then activate operator nodes 104a and 104c to perform the operations requested in the information of message 206. Processing may then proceed in parallel in nodes 104a and 104c. The processing in node 104a will now be described.
(38) In stage 602, operator node 104a may perform a projection related to the join operation. In particular, operator node 104a may access storage device 106a and select various tuples from tables in storage device 106 based on predicates of the requested query operation. Storage device 106a may then provide selected tuples 604 to operator node 104a. Processing then flows to stage 606 in operator node 104a.
(39) In stage 606, source operator node 104a buffers tuples 604 into a data buffer designated for target operator node 104c. For example, operator node 104a may store tuples from tuples 604 that are destined for node 104c into data buffer 314e of buffer group 312b. Processing then flows to stage 608 in operator node 104a.
(40) In stage 608, operator node 104a encodes tuples 604 into hash codes. For example, operator node 104a may encode tuples 604 for a join operation based on the well known hybrid hash join algorithm. In particular, the hash codes may be obtained by a hash function that maps the joining key to a hash code. Operator node 104a may then store the hash codes into hash code buffer 306 of memory 304. Processing then flows to stage 610.
(41) In stage 610, operator node 104a determines whether a threshold level has been reached in buffer space 310. In some embodiments, the threshold level is based on buffer level data 612 from the data buffers in buffer space 310. For example, the threshold level may be reached when a single one of data buffers 314a-d or 314e-h is full or substantially full. Alternatively, the threshold level may be based on a predetermined number of data buffers 314a-d or 314e-h are full or substantially full. If the threshold has not been reached, processing in operator node 104a repeats at stage 606. That is, operator node 104 continues to accumulate tuples into its data buffers and build up the hash code buffer corresponding to each target operator node.
(42) However, if the threshold has been reached, then processing flows to stage 613 that asynchronously triggers stage 614 and also goes back to stage 606. In stage 614, operator node 104a transmits the hash codes buffer to the corresponding target operator node 104c. In particular, operator node 104a retrieves the hash codes buffer from hash code buffer space 306 and references routing table 310. Operator node 104a may then send one or more messages, such as hash code message 616, through network 108 to operator node 104c. Asynchronously, source node 104a continues processing at 606 to fill data buffers and hash code buffers for other target nodes. On receiving the hash code buffer, processing at target operator node 104c is active at stage 618.
(43) At stage 618, operator node 104c scans the hash codes based on its bits filters built after receiving relevant data tuples from the second relation received at stage 600 or possibly from the storage device 106c based on the query information. In particular, operator node 104c may scan bit filter data 622 in its local main memory. Bit filter data 622 serves as a summary representation of the tables stored in storage device 106c and may be determined based on well known algorithms. Processing then flows to stage 620.
(44) In stage 620, operator node 104c generates a bit map that indicates the data in bit filter data 622 which matches the hash codes in message 616. Processing then flows to stage 624 in
(45) In stage 624, operator node 104c sends the bit map back to the source operator node 104a. In particular, target operator node 104c refers to its routing table 310 and formats a bit map message 626. Operator node 104c then sends bit map message 626 through network 108 back to operator node 104a. Processing then continues at operator node 104a in stage 628.
(46) In stage 628, operator node 104a probes for non-matches to data in bit map message 626. In particular, operator node 104a scans data buffer for example, 314e based on bit map message 626 and filters out non-matching tuples. Processing then flows to stage 632.
(47) In stage 632, operator node 104a sends potentially reduced non-filtered data tuples 630 to operator node 104c to complete the join operation. In particular, source operator node 104a sends the reduced data tuple message 634 to operator node 104c through network 108.
(48) Message 634 may comprise one or more of data buffers 314e-h. In addition, in some embodiments, the number of data buffers sent in message 634 is smaller than the threshold level depending on the selectivity of the bit filters and the match with the hash codes. Processing then continues at operator node 104c in stage 636.
(49) In stage 636, operator node 104c performs the join operation based on matching tuple message 634 and tuples retrieved from storage device 106c. Processing then flows to stage 638, where operator node 104c determines a result of the join operation and may then return the matched tuples for further processing in the query or store the result, for example, in storage device 106c. Alternatively, operator node 104c may send the result to host 102. In addition, operator nodes 104a and 104c and host 102 may exchange one or more control messages to indicate to each other that processing for the join operation has been completed. Processing is complete after all the tuples relevant to the query at the source nodes and the tuples at all the target nodes have been matched.
(50) Other embodiments of the invention will be apparent to those skilled in the art from consideration of the specification and practice of the invention disclosed herein. It is intended that the specification and examples be considered as exemplary only, with a true scope and spirit of the invention being indicated by the following claims.