Wednesday, March 12, 2008
Data Mining Brings Down Governor Spitzer
But how was he caught? The answer is that the complicated financial transactions he made in an attempt to disguise his spending on prostitutes were flagged by fraud detection software that banks now use routinely to detect money laundering and other financial crimes. In a news report on NPR this morning, reporter Adam Davidson interviewed a representative from Actimize, an Israeli company that specializes in fraud detection and compliance software. The software scores every bank transaction with a number from 0 to 100 indicating the probability of fraud. The software takes into account attributes of the particular transaction, but also its relationship to other transaction (as when several small transactions with the same source and destination are used to disguise a large transaction), the relationship of account owners involved in the transaction, and attributes of the account owner such as credit score and, unfortunately for Governor Spitzer, whether or not the account owner is a "PEP" (politically exposed person). PEPs attract more scrutiny since they are often in a position to be bribed or engage in other corrupt practices.
Banks are required to report SARs (Suspicious Activity Reports) to FinCEN, the Treasury Department's financial crimes enforcement network. The reports--about a million of them in 2006--go into a database hosted at the IRS and teams of investigators around the country look into them. One such team, based in Long Island, looked into Sptizer's suspicious transactions and eventually discovered the connection to the prostitution ring.
Ironically, one of the reasons there are so many more SARs filed each year now than there were before 2001 is that in 2001, then New York Attorney General, Elliott Spitzer aggressively pursued wrong-doing at financial institutions and said they had to be aware of criminal activity conducted through their accounts. Apparently, the software banks installed to find transactions that criminal organizations are trying to hide from the IRS is also capable of finding transactions that Johns are trying to hide from their wives.
Saturday, February 9, 2008
MapReduce and K-Means Clustering
One of the claims made in this particular presentation is that "it can be necessary to send tons of data to each Mapper Node. Depending on your bandwidth and memory available, this could be impossible." This claim is false, which in turn removes much of the motivation for the alternative algorithm, which called "canopy clustering".
The K-Means Clustering Algorithm
There are many good introductions to k-means clustering available, including our book Data Mining Techniques for Marketing, Sales, and Customer Support. The Google presentation mentioned above provides a very brief introduction.
Let's review the k-means clustering algorithm. Given a data set where all the columns are numeric, the algorithm for k-means clustering is basically the following:
(1) Start with k cluster centers (chosen randomly or according to some specific procedure).
(2) Assign each row in the data to its nearest cluster center.
(3) Re-calculate the cluster centers as the "average" of the rows in (2).
(4) Repeat, until the cluster centers no longer change or some other stopping criterion has been met.
In the end, the k-means algorithm "colors" all the rows in the data set, so similar rows have the same color.
K-Means in a Parallel World
To run this algorithm, it seems, at first, as though all the rows assigned to each cluster in Step (2) need to be brought together to recalculate the cluster centers.
However, this is not true. K-Means clustering is an example of an embarrassingly parallel algorithm, meaning that that it is very well suited to parallel implementations. In fact, it is quite adaptable to both SQL and to MapReduce, with efficient algorithms. By "efficient", I mean that large amounts of data do not need to be sent around processors and that the processors have minimum amounts of communication. It is true that the entire data set does need to be read by the processors for each iteration of the algorithm, but each row only needs to be read by one processor.
A parallel version of the k-means algorithm was incorporated into the Darwin data mining package, developed by Thinking Machines Corporation in the early 1990s. I do not know if this was the first parallel implementation of the algorithm. Darwin was later purchased by Oracle, and became the basis for Oracle Data Mining.
How does the parallel version work? The data can be partitioned among multiple processors (or streams or threads). Each processor can read the previous iteration's cluster centers and assign the rows on the processor to clusters. Each processor then calculates new centers for its of data. Each actual cluster center (for the data across all processors) is then the weighted average of the centers on each processor.
In other words, the rows of data do not need to be combined globally. They can be combined locally, with the reduced set of results combined across all processors. In fact, MapReduce even contains a "combine" method for just this type of algorithm.
All that remains is figuring out how to handle the cluster center information. Let us postulate a shared file that has the centroids as calculated for each processor. This file contains:
- The iteration number.
- The cluster id.
- The cluster coordinates.
- The number of rows assigned to the cluster.
There are two ways to do this in the MapReduce framework. The first uses map, combine, and reduce. The second only uses map and reduce.
K-Means Using Map, Combine, Reduce
Before begining, a file is created accessible to all processors that contains initial centers for all clusters. This file contains the cluster centers for each iteration.
The Map function reads this file to get the centers from the last finished iteration. It then reads the input rows (the data) and calculates the distance to each center. For each row, it produces an output pair with:
- key -- cluster id;
- value -- coordinates of row.
- key is cluster
- value is number of records and average values of the coordinates.
The Reduce function (and one of these is probably sufficient for this problem regardless of data size and the number of Maps) calcualtes the weighted average of its input. Its output should be written to a file, and contain:
- the iteration number;
- the cluster id;
- the cluster center coordinates;
- the size of the cluster.
K-Means Using Just Map and Reduce
Using just Map and Reduce, it is possible to do the same things. In this case, the Map and Combine functions described above are combined into a single function.
So, the Map function does the following:
- Initializes itself with the cluster centers from the previous iteration;
- Keeps information about each cluster in memory. This information is the total number of records assigned to the cluster in the processor and the total of each coordinate.
- For each record, it updates the information in memory.
- It then outputs the key-value pairs for the Combine function described above.
K-Means Using SQL
Of course, one of my purposes in discussing MapReduce has been to understand whether and how it is more powerful than SQL. For fifteen years, databases have been the only data-parallel application readily available. The parallelism is hidden underneath the SQL language, so many people using SQL do not fully appreciate the power they are using.
An iteration of k-means looks like:
SELECT @iteration+1, cluster_id,
.......AVERAGE(d.data) as center
FROM (SELECT d.data, cc.cluster_id,
.............ROW_NUMBER() OVER (PARTITION BY d.data
................................ORDER BY DISTANCE(d.data, cc.center) as ranking
......FROM data d CROSS JOIN
.....(SELECT *
......FROM cluster_centers cc
......WHERE iteration = @iteration) cc
.....) a
WHERE ranking = 1
GROUP BY cluster_id
This code assumes the existence of functions or code for the AVERAGE() and DISTANCE() functions. These are placeholders for the correct functions. Also, it uses analytic functions. (If you are not familiar with these, I recommend my book Data Analysis Using SQL and Excel.)
The efficiency of the SQL code is determined, to a large extent, by the analytic function that ranks all the cluster centers. We hope that a powerful parallel engine will recognize that the data is all in one place, and hence that this function will be quite efficient.
A Final Note About K-Means Clustering
The K-Means clustering algorithm does require reading through all the data for each iteration through the algorithm. In general, it tends to converge rather quickly (tens of iterations), so this may not be an issue. Also, the I/O for reading the data can all be local I/O, rather than sending large amounts of data through the network.
For most purposes, if you are dealing with a really big dataset, you can sample it down to a fraction of its original size to get reasonable clusters. If you are not satisfied with this method, then sample the data, find the centers of the clusters, and then use these to initialize the centers for the overall data. This will probably reduce the number of iterations through the entire data to less than 10 (one pass for the sample, a handful for the final clustering).
When running the algorithm on very large amounts of data, numeric overflow is a very real issue. This is another reason why clustering locally, taking averages, and then taking the weighted average globally is beneficial -- and why doing sample is a good way to begin.
Also, before clustering, it is a good idea to standardize numeric variables (subtract the average and divide by the standard deviation).
--gordon
Check out my latest book Data Analysis Using SQL and Excel.
Wednesday, February 6, 2008
Using SQL to Emulate MapReduce Functionality
- MapReduce implements functions using a full-fledged programming language. This is more powerful than the functions permitted in SQL.
- MapReduce allows one row to be part of more than one aggregation group.
First, let me note that the first limitation is not serious, because I assume that SQL can be extended by adding new scalar and aggregation user defined functions. Although more cumbersome than built-in programming constructs, the ability to add user defined functions does make it possible to add in a wide breadth of functionality.
The second strength can be emulated by assuming the existence of a table, which I'll call enumerate, that simply contains one column which contains numbers starting at 1.
How does such a table help us? Consider a table that has a start date and a stop date for each customer. The SQL code to count up the starts and the stops might look like:
SELECT thedate, SUM(isstart) as numstarts, SUM(isstop) as numstops
FROM ((SELECT start_date as thedate, 1 as isstart, 0 as issend
.......FROM customer c) union all
......(SELECT stop_date as thedate, 0 as isstart, 1 as issend
.......FROM customer c)) a
GROUP BY thedate
ORDER BY 1
This is inelegant, particularly as the expressions for the tables get more complicated -- imagine what happens if the customer table is actually a complicated set of joins and aggregations. In addition, we can see how expressing the SQL suggests that two full passes are needed through the table. Yuck!
Let's assume that we have the enumerate table. In this case, the same query could be expressed as:
SELECT (CASE WHEN e.i = 1 THEN start_date ELSE end_date END) as thedate,
.......SUM(CASE WHEN e.i = 1 THEN 1 ELSE 0 END) as numstarts,
.......SUM(CASE WHEN e.i = 2 THEN 1 ELSE 0 END) as numstops
FROM customer c CROSS JOIN
.....(SELECT * FROM enumerate WHERE i <= 2) e
GROUP BY (CASE WHEN e.i = 1 THEN start_date ELSE end_date END)
ORDER BY 1
This query is setting up a counter that lets us, conceptually, loop through each row in the table. The counter is set to take on two values. On the first pass through the loop, the query uses the start date; on the second, it uses the stop date. The result is the same as for the previous query. The SQL, though, is different because it does not express two passes through the data.
This example is simple. It is obvious how to extend it further, for instance, if there were more dates stored in each row. It should also be obvious how this can be expressed as map/reduce functions.
One of the most common places where MapReduce is used is for parsing text strings. Say we have a list of product descriptions that are like:
- "green,big,square"
- "red,small,square"
- "grey"
- "medium,smelly,cube,cotton"
- NumWords(string
, sepchar : This function takes a string and a separate character and returns the number of words in the string.) - GetWord(string
, sepchar : This function takes a string, a separator character, and a word number and returns the word in the string., i )
- 3 and "green", "big", and "square"
- 3 and "red", "small", "square"
- 1 and "grey"
- 4 and "medium", "smelly", "cube", "cotton"
These functions are not difficult to write in a procedural programming language.
With such functions, the SQL to count up the attributes in our products looks like:
The structure of this query is very similar to the previous query. The one difference is that each row has a different loop counter, because there are a different number of words in any given product description. Hence, the two tables are joined using a standard inner join operator, rather than a cross join.
SELECT GetWord(p.desc, ',', e.i) as attribute, COUNT(*)
FROM product p JOIN
.....enumerate e
.....ON e.i <= NumWords(p.desc, ',')
GROUP BY GetWord(p.desc, ',', e.i)
ORDER BY 2 DESC
An enumerate table, in conjunction with user defined functions, can give SQL much of the functionality of MapReduce.
One objection to the previous example is that such a database structure violates many rules of good database design. Such a product description string is not normalized, for instance. And packing values in strings is not a good design practice. My first reaction is that the real world is filled with similar examples, so regardless of what constitutes good design, we still need to deal with it.
A more honest answer is that the world is filled with strings that contain useful information -- description strings, URLs, and so on. SQL cannot just ignore such data, or dismiss it as not being normalized.
There is a problem: performance. One way to do the join is to create an intermediate table that is the cross product (or a large subset of the cross product) of the two tables. Of course, such an intermediate table is equivalent to reading the first table twice, so we have not gained anything.
This is likely to happen in the first case. Without more information, SQL is likely to do nested loop joins. If the customer table is the outer loop, then each customer row is read and duplicated, once with i=1 and the second time with i=2. This is not actually so bad. The original row is actually read once and then processed multiple times in memory for each value of i.
Of course, there is no guarantee that the SQL engine won't put the enumerate table as the outer loop, which requires two full reads of the customer table.
The situation becomes worse if the data is partitioned in a parallel environment. This is important, because MapReduce's advantage is that it always runs in parallel.
The SQL engine is likely to run a nested loop join on a single processor, even if the customer table is partitioned (or if the database is configured to be "multithreaded" or "multiprocessor"). Only a very smart optimizer would figure out that the enumerate table could be duplicated and distributed so the nested loop join could run in parallel.
The optimization problem is even worse in the second case, because the number of rows needed from enumerate varies for different products. Of course, database directives were invested to tell SQL optimizers how to do joins. I would prefer that the database have the enumerate table built-in, so the optimize can take full advantage of it.
Much of MapReduce's functionality comes from the ability to give a singel row multiple aggregation keys, while running in parallel. Even on large datasets, we can set up SQL to solve many problems that MapReduce does by combining user-defined functions, the enumerate table, and appropriate compiler directives so the large joins are done in parallel.--gordon
Friday, January 25, 2008
MapReduce and SQL Aggregations
One of the claims that they make is:
To draw an analogy to SQL, map is like the group-by clause of an aggregate query. Reduce is analogous to the aggregate function (e.g., average) that is computed over all the rows with the same group-by attribute.
This claim is worth discussing in more detail because it is a very powerful and intuitive analogy. And, it is simply incorrect. MapReduce is much more powerful than SQL aggregations.
Another reason why I find their claim interesting is because I use the same analogy to describe MapReduce to people familiar with databases. Let me explain this in a bit more detail. Consider the following SQL query to count the number of customers who start in each month:
SELECT MONTH(c.start_date), COUNT(*)
FROM customer c
GROUP BY MONTH(c.start_date)
Now, let's consider how MapReduce would "implement" this query. Of course, MapReduce is a programming framework, not a query interpreter, but we can still use it to solve the same problem. The MapReduce framework solves this problem in the following way:
First, the map phase would read records from the customer table and produce an output record with two parts. The first part is called the "key", which is populated with MONTH(c.start_date). The second is the "value", which can be arbitrarily complicated. In this case, it is as simple as it gets. The value part simply contains "1".
The reduce phase then reads the key-value pairs, and aggregates them. The MapReduce framework sorts the data so records with the same key always occur together. This makes it easy for the reduce phase to add together all the "1"s to get the count for each key (which is the month number). The result is a count for each key.
I am intentionally oversimplified this process by describing it at a high level. The first simplification is leaving out all the C++ or Java overhead for producing the programs (although there are attempts at interpreted languages to greatly simplify this process). Another is not describing the parallel processing aspects. And yet another oversimplification is leaving out the "combine" step. The above algorithm can be made more efficient by first "reducing" the values locally on each processor to get subtotals, and then "reducing" these again. This post, however, is not about computational efficiency.
The important thing to note is the following three correspondences between MapReduce and SQL aggregates.
- First, MapReduce uses a "key". This key is the GROUP BY expression in a SQL aggregation statement.
- Second, MapReduce has a "map" function. This is the expression inside the parentheses. This can be an arbitrary function or CASE statement in SQL. In databases that support user defined functions, this can be arbitrarily complicated, as with the "map" function in MapReduce.
- Third, MapReduce has a "reduce" function. This is the aggregation function. In SQL, this is traditionally one of a handful of functions (SUM(), AVG(), MIN(), MAX()). However, in some databases support user defined aggregation functions, which can be arbitrarily complicated.
Wrong!
The next example extends the previous one by asking how many customers start and how many stop in each month. There are several ways of approaching this. The following shows one approach using a FULL OUTER JOIN:
SELECT m, ISNULL(numstarts, 0), ISNULL(numstops, 0)
FROM (SELECT MONTH(start_date) as m, COUNT(*) as numstarts
......FROM customer c
......GROUP BY MONTH(start_date)
.....) start FULL OUTER JOIN
.....(SELECT MONTH(stop_date) as m, COUNT(*) as numstops
......FROM customer c
......GROUP BY MONTH(stop_date)
.....) stop
.....ON start.m = stop.m
Another formulation might use an aggregation and UNION:
SELECT m, SUM(isstart), SUM(isstop)
FROM ((SELECT MONTH(start_date) as m, 1 as isstart, 0 as isstop
.......FROM customer c)
......UNION ALL
........(SELECT MONTH(stop_date) as m, 0 as isstart, 1 as isstop
........FROM custommer c)) a
GROUP BY m
Now, in both of these, there are two pieces of processing, one for the starts and one for the stops. In almost any databases optimizer that I can think of, both these queries (and other, similar queries) require two passes through the data, one pass for the starts and one pass for the stops. And, regardless of the optimizer, the SQL statements describe two passes through the data.
The MapReduce framework has a more efficient, and perhaps, even more intuitive solution. The map phase can produce two output keys for each record:
- The first has a key that is MONTH(start_date) and the value is a structure containing isstart and isstop with values of 1 and 0 respectively.
- The second has a key that is MONTH(stop_date) and the value are 0 and 1 respectively.
How much better becomes apparent when we look in more detail at what is happening. When processing data, SQL limits us to one key per record for aggregation. MapReduce does not have this limitation. We can have as many keys as we like for each record.
This difference is particularly important when analyzing complex data structures to extract features -- of which processing text from web pages is an obvious example. To take another example, one way of doing content filtering of email for spam involves looking for suspicious words in the text and then building a scoring function based on those words (naive Bayesian models would be a typical approach).
Attempting to do this in MapReduce is quite simple. The map phase looks for each word and spits out a key value pair for that word (in this case, the key is actually the email id combined with the word). The reduce phase counts them all up. Either the reduce phase or a separate program can then apply the particular scoring code. Extending such a program to include more suspicious words is pretty simple.
Attempting to do this in SQL . . . well, I wouldn't do it in the base language. It would be an obfuscated mess of CASE statements or a non-equi JOIN on a separate word table. The MapReduce approach is simpler and more elegant in this case. Ironically, I usually describe the problem as "SQL is not good in handling text." However, I think the real problem is that "SQL aggregations are limited to one key per record."
SQL has many capabilities that MapReduce does not have. Over time, MapReduce may incorporate many of the positive features of SQL for analytic capabilities (and hopefully not incorporate unneeded overhead for transactions and the like). Today, SQL remains superior for most analytic needs -- and it can take advantage of parallelism without programming. I hope SQL will also evolve, bringing together the enhanced functionality from other approaches.
--gordon
Wednesday, January 23, 2008
Relational Databases for Analysis
The response to this article has, for the most part, been to defend MapReduce, which I find interesting because MapReduce is primarily useful for analytic applications. Both technologies make it possible to run large analytic tasks in parallel (taking advantage of multiple processors and multiple disks), without learning the details of parallel hardware and software. This makes both of them powerful for analytic purposes.
However, Professors Stonebraker and DeWitt make some points that are either wrong, or inconsequential with respect to using databases for complex queries and data warehousing.
(1) They claim that MapReduce lacks support for updates and transactions, implying that these are important for data analysis.
This is not true for complex analytic queries. Although updating data within a databases is very important for transactional systems, it is not at all important for analytic purposes and data warehousing. In fact, updates imply certain database features that can be quite detrimental to performance.
Updates imply row-level locking and logging. Both of these are activities that take up CPU and disk resources, but are not necessary for complex queries.
Updates also tend to imply that databases pages are only partially filled. This makes it possible to insert new data without splitting pages, which is useful in transactional systems. However, partially filled pages slow down queries that need to read large amounts of data.
Updates also work against vertical partitioning (also called columnar databases), where different columns of data are stored on different pages. This makes working on wide tables quite feasible, and is one of the tricks used by newer database vendors such as Netezza.
(2) They claim that MapReduce lacks indexing capabilities, implying that indexing is useful for data analysis.
One of the shortcomings of the MapReduce framework in comparison to SQL is that MapReduce does not facilitate joins. However, the major use of indexes for complex queries are for looking up values in smaller reference tables, which can often be done in memory. We can assume that all large tables require full table scans.
(3) MapReduce is incompatible with database tools, such as data mining tools.
The article actually sites Oracle Data Mining (which grew out of the Darwin project developed by Thinking Machines when I was there) and IBM Intelligent Miner. This latter reference is particular funny, because IBM has withdrawn this product from the market (see here). The article also fails to cite the most common of these tools, Microsoft SQL Server Data Mining, which is common because it is bundled with the database.
However, data mining within databases is not a technology that has taken off. One reason is pricing. Additional applications on database platforms often increase the need for hardware -- and more hardware often implies larger database costs. In any case, networks are quite fast and tools can access data in databases without having to be physically colocated with them. Serious data mining practitioners are usually using other tools, such as SAS, SPSS, S-Splus, or R.
By the way, I am not a convert to MapReduce (my most recent book is calld Data Analysis with SQL and Excel). Its major shortcoming is that it is a programming interface, and having to program detracts from solving business problems. SQL, for all its faults, is still much easier for most people to learn than Java or C++, and, if you do want to program, user-defined extensions can be quite beneficial. However, there are some tasks that I would not want to tackle in SQL, such as processing log files, and MapReduce is one scalable option for such processing.
--gordon
Monday, January 14, 2008
Data Mining to Prevent Airline Crashes
Airline crashes are extremely rare. Rare events pose a challenge in data mining. This article points out one solution which is to model a more common event which is sometimes a precursor to the very rare event of interest.
(Click the title of this post to go to the Washington Post article.)
Saturday, November 24, 2007
Constructing a Model Set for Reccuring Events
- Visits to a web page.
- Purchases of additional minutes for a pre-paid phone plan.
- Subscription renewals.
- Repeat purchases.
- Pregnancies.
- Incarcerations.
- Posts to a blog.
These are customers making multiple purchases during an observation window. Each time a customer makes a purchase, a transaction record is created. When we add this data to a table in the counting process style, each customer contributes several rows. There is a row for the time from time 0, which may be the time of the initial purchase, to the second purchase, a row for the time to each subsequent purchase, and a row for the time between the final observed purchase and the end of the observation period.
Depending on the style of analysis used, each event may be seen as starting a new time 0 with the number of previous events as a covariate, or each event may be modeled separately with a customer only becoming part of the at-risk pool for event n after experiencing event n-1.
Either way, it is important to include the final censored time period. This period does not correspond to any transaction, but customers are "at risk" for another purchase during that period.
My approach to creating the table is to first create the table without the censored observations, which is reasonably straightforward. Each of these rows contains a flag indicating it is a complete, uncensored observation. Next I create just the censored observations by creating an observation going from the latest observed purchase to the end of the observation period (in this case, 22May2006). The censored rows can then be appended to the uncensored rows. These could, of course, be turned into subqueries in order to avoid creating the temporary tables.
This fully expanded version of the data is what is referred to as the counting process style of input. In a realistic situation where there might be millions of customers, it makes more sense to group by tenure so that there is one row showing how many customers made a purchase with that tenure and how many customers experienced the tenure and so could have made a purchase. This is the data needed to estimate the intensity function.
In Gordon Linoff's book, Data Analysis Using SQL and Excel, he provides sample code for making a related, but different table using the data available on the book's companion page. I reproduce it here for reference.
The code uses the DATEDIFF function to subtract a household's first order date from all its other order dates to put things on the tenure timeline. It then counts the number of second (or third, or fourth, . . .) purchases that happen at each tenure. This query does not track the population at risk so it is not the actual intensity function, but it never the less gives a nice visual image of the way intensity peaks at yearly intervals as many customers make regular annual purchases, just as the purchasers of calendars in the previous posting did.