Thursday, April 21, 2011

Data Aggregation System

Introduction

DAS stands for Data Aggregation System. It was developed for the CMS experiment at CERN and is used to search and aggregate meta-data from various relational and non-relational data-services.

As you can imagine, any big project carries a lot of data. In the case of CMS we are talking about the PB scale. The associated meta-data is on the order of TB and spread across multiple data-services. Those data-services are usually built on the well-known 3-tier architecture and backed by various databases. At CMS we have a variety of database back-ends, from ORACLE to SQLite, depending on the scale and purpose of the application. These data-services are developed by different (international) teams and dedicated to specific tasks in CMS. For example, the data bookkeeping system keeps track of meta-data produced by the experiment, while the condition database knows about detector operational details at a given period of time. It is worth mentioning that all of these data-services are developed using different languages and satisfy different requirements and security policies.

In the end, our users face the problem of searching different kinds of meta-data for their needs. Such tasks include data discovery via browsing, aggregation tasks to make summaries, production tasks to schedule jobs, etc. As you can imagine, these tasks quickly become challenging as the number of data-services grows. The differences in data formats and semantics become a large burden for non-expert users. Moreover, our users want to see data correlations, consistency checks, etc. Well, you get the idea. How can you solve this problem? A naive user's immediate response: do it as Google did. But the problem is that we can't index data-services (databases), right? So we went another route. We built DAS.

The idea ...

In a nutshell, the idea behind DAS is very simple. It works with data-providers who can answer certain questions: for instance, what is the weather forecast in a certain region, or what are the gas prices in my area. The data-providers know such information (they probably store it in an RDBMS), and if they are accessible in one form or another you can place queries against them. DAS uses a query language (QL) that is intuitive enough for end-users and very close to the free-text searches used in search engines. For example, zip=111 means I'm looking for specific information about zip code 111. You may say: come on, this is silly, Google knows about it. Right, but it knows only what it indexes and no further. What if you pose a slightly different question: give me housing prices at zip=111? Google will give you a bunch of documents about real estate, which means you spend your time going through many of them to find what you want. So we're trying to fill this gap. A simple query, e.g. zip=111, can be associated with different data-providers who can each return different information about it from their internal sources, and DAS can aggregate it on demand.

DAS translates an input query into a set of URL calls to the participating data-providers and stores the output results in MongoDB for subsequent look-up. It turns out MongoDB is an excellent fit for this job. First, it provides a flexible QL. Second, it is a schema-less, document-oriented database, which means that DAS can ask any data-service, parse its data and just store the results without knowing the meaning of the data. And finally, MongoDB is fast and can scale; see the initial benchmarks below. You may ask: how does DAS know about the participating data-providers? Does it do auto-discovery, like WSDL? No, not really. DAS uses external maps (configuration) which provide the mapping between DAS QL keys and the associated data-providers, so it is configurable. You can configure DAS to talk to 2 data-providers or to 100; it is your choice and domain expertise. We use DAS within a high-energy physics experiment, but the DAS test suite is written against Google data-services.
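To make the idea of DAS maps concrete, here is a minimal sketch in Python of what such a configuration might look like. The service names, URLs, keys and TTL values are hypothetical examples, not the actual CMS maps:

# A sketch of a DAS map: it tells DAS which data-provider can answer a
# given DAS QL key and how to translate that key into a URL parameter.
# All services, URLs and keys below are hypothetical examples.
DAS_MAPS = [
    {
        "system": "weather_service",
        "url": "http://weather.example.com/api",  # hypothetical endpoint
        "das_keys": ["zip", "city"],              # DAS QL keys it can answer
        "params": {"zip": "zipcode"},             # DAS key -> service parameter
        "expire": 3600,                           # default cache TTL in seconds
    },
    {
        "system": "geo_service",
        "url": "http://geo.example.com/lookup",
        "das_keys": ["zip", "city"],
        "params": {"zip": "postal_code"},
        "expire": 86400,
    },
]

def services_for(das_key):
    """Return all data-providers able to answer a given DAS QL key."""
    return [m for m in DAS_MAPS if das_key in m["das_keys"]]

With such maps in place, a query like zip=111 fans out to every service whose map lists the zip key.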

Fair enough, let's see how it works:


Very simple, isn't it? An end-user places a query, DAS translates it into URL calls, collects the required information and presents it to the user. Wait, but how does it make this translation? Well, it is very similar to the way people who speak different languages communicate with each other: you need a dictionary or a translator! So far so good; the translator will translate the input query into a series of URL calls that can answer this query. Let's assume that we have two data-providers, a weather and a geo-location service. What do they have in common? Well, they both provide information about a specific region. Such a region can be identified by a city name or a zip code. But their output will contain different types of information: the weather service will tell you the current temperature/forecast for a given city, while the geo-location service can tell you its latitude/longitude and population. What they have in common is the city name (or a similar identifier), right? So if they both return results with a city name, you can aggregate those results. Well, it's not so easy. What if their semantics are different? That means you need another translation, from the data-service output into DAS records, which can then be aggregated. For instance, one data-service returns zip, while another returns zip_code. As human beings we understand that this is identical information, but a machine needs to be told.
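As an illustration, this notation translation can be as simple as a rename table applied to each service's output. The service and field names here are made up for the example:

# A sketch of the output-translation step: each service's record is
# renamed into a common DAS notation so that records can be aggregated
# on a shared key. Service and field names are illustrative.
NOTATIONS = {
    "weather_service": {"zip": "zip"},       # already in DAS notation
    "geo_service":     {"zip_code": "zip"},  # rename zip_code -> zip
}

def to_das_notation(system, record):
    """Rename service-specific fields into common DAS keys."""
    mapping = NOTATIONS[system]
    return {mapping.get(key, key): value for key, value in record.items()}

weather = to_das_notation("weather_service", {"zip": "111", "temp_f": 68})
geo = to_das_notation("geo_service", {"zip_code": "111", "population": 52000})
assert weather["zip"] == geo["zip"]  # records can now be merged on "zip"

So we reach the point where the DAS work-flow can be shown as follows: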



The diagram shows that DAS works as a data-retrieval system and can be used as a cache. Yes, that's right: it does not own the data; it gets it from the participating data-providers and can hold it in an internal cache (MongoDB). If a data-provider supplies a valid HTTP Expires header, DAS can use it; otherwise sensible defaults can be assigned at configuration time.
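Here is a minimal sketch of that expiration logic, assuming the provider's response headers are available as a dictionary; this is illustrative, not the actual DAS code:

# Honor a valid HTTP Expires header if the provider sends one,
# otherwise fall back to a default TTL from the DAS configuration.
import time
from email.utils import parsedate_to_datetime

DEFAULT_TTL = 3600  # hypothetical per-service default, set at configuration time

def expire_timestamp(headers, default_ttl=DEFAULT_TTL):
    """Return the absolute expiration time for a cached record."""
    expires = headers.get("Expires")
    if expires:
        try:
            ts = parsedate_to_datetime(expires).timestamp()
            if ts > time.time():  # accept only expiration times in the future
                return ts
        except (TypeError, ValueError):
            pass  # malformed header: fall back to the default
    return time.time() + default_ttl

But that's not the full story. We designed DAS to be an intelligent cache: we run analytics services to analyze user queries and use dedicated robots to pre-fetch the most popular data on demand: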


This diagram shows the actual communication of DAS with the CMS data-services (dbs, sitedb, phedex, etc.). Internally, DAS uses MongoDB for three purposes: to hold the data from participating providers in the DAS cache db, to keep aggregated information in the DAS merge db, and to perform analytics tasks by collecting information in the DAS analytics db. The DAS Query Language (QL) maps directly into the MongoDB QL and serves a dual role: it is used to map the input query into data-provider URL calls, and at the same time to query the results from the underlying DAS DBs; for more information see [1].
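To give a flavor of that mapping, a trailing wildcard in a DAS QL value can be turned into an anchored regular expression, which MongoDB understands natively. This is a simplified sketch; the actual translation rules in DAS are richer, see [1]:

# Translate a single 'key=value' DAS QL condition into a MongoDB query spec.
import re

def das_to_mongo(key, value):
    """Map a DAS QL condition into the MongoDB QL."""
    if value.endswith("*"):
        # pattern look-up: anchor the regex at the beginning of the value
        return {key: re.compile("^" + re.escape(value[:-1]))}
    return {key: value}  # exact key-value look-up

print(das_to_mongo("zip", "111"))    # {'zip': '111'}
print(das_to_mongo("city", "Ith*"))  # {'city': re.compile('^Ith')}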

I think it's time to stop now. Next time we will talk about the DAS QL.


References:
1. V. Kuznetsov, D. Evans, S. Metson, "The CMS Data Aggregation System", International Conference on Computational Science (ICCS 2010).


Wednesday, September 15, 2010

DAS benchmarks

I want to evaluate the performance of the CMS Data Aggregation System (DAS) with a MongoDB back-end. I'll give a brief introduction to the DAS workflow and present preliminary benchmark results.

Introduction

DAS aggregates meta-data from several CMS data-services. The workflow is the following:


Data are requested by a user via the DAS Query Language (QL), which maps naturally into the MongoDB QL. The DAS workflow looks up results in the DAS merge collection. If results are not found, it checks the DAS raw API cache collection and aggregates data on demand, storing the results into the merge collection; for more details see [Ref]. DAS consists of a RESTful interface for the DAS cache and web servers, written in Python within the CherryPy web framework.
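The look-up order can be sketched with pymongo as follows; the collection names and the merge step are assumptions, not the actual DAS schema or code:

# A minimal sketch of the DAS look-up order using pymongo.
from pymongo import MongoClient

db = MongoClient("localhost", 27017)["das"]

def aggregate(raw_records):
    """Placeholder merge step: in DAS this joins records on common keys."""
    return raw_records

def lookup(query_spec):
    """Serve a query from the merge collection, falling back to the raw cache."""
    results = list(db["merge"].find(query_spec))
    if results:  # fast path: results were already aggregated
        return results
    raw = list(db["cache"].find(query_spec))  # raw data-provider records
    if raw:
        merged = aggregate(raw)
        db["merge"].insert_many(merged)  # keep them for later look-ups
        return merged
    return []  # cache miss: DAS would now issue the data-provider URL calls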

Testbed and tools

For the benchmarking tests, I set up a DAS cache server on a 64-bit Linux node with 8 cores (2.33 GHz) and 16 GB of RAM. Both MongoDB and the DAS cache server reside on the same node. The test client was written in Python and used the multiprocessing module to spawn requests to the DAS server. The reason I used a custom Python tool instead of available tools, e.g. Apache Bench, is to have the ability to generate random queries for N parallel clients talking to a single DAS server REST API.
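A stripped-down version of such a client might look like this; the endpoint URL and the query parameter name are assumptions for the sketch:

# A pool of N processes issues DAS queries against the REST API and
# measures the wall-clock response time of each request.
import time
import urllib.request
from multiprocessing import Pool
from urllib.parse import urlencode

DAS_URL = "http://localhost:8211/rest/request"  # hypothetical REST endpoint

def one_request(query):
    start = time.time()
    urllib.request.urlopen(DAS_URL + "?" + urlencode({"query": query})).read()
    return time.time() - start

def run_clients(nclients, query):
    with Pool(nclients) as pool:  # spawn N parallel clients
        times = pool.map(one_request, [query] * nclients)
    return sum(times) / len(times)

if __name__ == "__main__":
    print("avg response: %.4f sec" % run_clients(100, "dataset=/a/b/c"))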

Test data

For all tests I used ~50K dataset records from the CMS DBS system and ~450K block records from both the DBS and PhEDEx CMS systems. All of them were populated into the DAS cache up-front, since I was only interested in read tests (DAS has the ability to pre-populate its cache).
The tests consist of three different types of queries:
  1. all clients use a fixed value for DAS queries, e.g. dataset=/a/b/c or block=/a/b/c#123
  2. all clients use a fixed pattern for DAS queries, e.g. dataset=/a* or block=/a*
  3. all clients use random patterns, e.g. dataset=/a*, dataset=/Z* or block=/a*, block=/Z*
Once a query has been placed with the DAS cache server, I retrieve only the first record of the result set and ship it back to the client. The response time is measured as the total time the DAS server spends on a particular query.
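For reference, the three query types can be produced with trivial generators like these (the values are the placeholders from the list above):

import random
import string

def query_fixed():  # test 1: fixed key-value
    return "dataset=/a/b/c"

def query_fixed_pattern():  # test 2: fixed pattern
    return "dataset=/a*"

def query_random_pattern():  # test 3: random pattern
    return "dataset=/%s*" % random.choice(string.ascii_letters)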

Benchmark results

First, I tested our CherryPy server on its own: it can sustain a load of 500 parallel clients at the level of 10^-5 sec. Then I populated MongoDB with 50k dataset and 500k block records from the DBS and PhEDEx CMS systems and performed read tests of MongoDB and DAS using 1-500 parallel clients. During these tests I found several issues with my code, such as a missing index, wrong usage of regular expressions and a repetition of the DAS merging step for non-existing queries. Once those issues were fixed I repeated the tests and found satisfactory results. So, to spice things up, I populated MongoDB with 10x and 100x the statistics and repeated the tests. The plot below compares DAS (red lines) versus MongoDB (blue lines) read tests for 50k (circles), 500k (down triangles), 5M (up triangles) and 50M (stars) records:
I found these results very satisfactory. As expected, MongoDB can easily sustain such a load at the level of a few milliseconds. The DAS numbers also seem reasonable, since the DAS workflow is much more complicated: it includes DAS QL parsing, query analysis, analytics, etc.
Most importantly, DAS performance seems to be driven by the MongoDB back-end, with a constant scale factor that can be tuned later.

Next I performed the three tests discussed above (criteria #1-3) with 50M blocks. Here is the plot:
The curve with circle points represents test #1, i.e. the fixed key-value queries, while the up/down triangles represent the fixed-pattern and random-pattern tests, #2 and #3, respectively. As can be seen, the pattern tests differ by an order of magnitude from the fixed key-value test, but are almost identical to each other.

Finally, I tested DAS/MongoDB with random queries and random data access, asking it to return a single record from anywhere in the collection (not only the first one, as above). For that purpose I generated a random index and used it as the skip/limit parameters of the MongoDB queries; a sketch of this access pattern follows below. Here are the results:
The blue line shows the MongoDB performance, while the red one shows DAS. This time the difference between DAS and MongoDB is only one order of magnitude, compared with the first plot, and it is driven by the DAS workflow.
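The random data access can be sketched with pymongo as follows; the collection name is an assumption:

# Pick a random index into the result set and fetch exactly one record
# using skip/limit.
import random
from pymongo import MongoClient

coll = MongoClient("localhost", 27017)["das"]["merge"]

def random_record(query_spec):
    """Return one record at a random position in the result set."""
    total = coll.count_documents(query_spec)
    if not total:
        return None
    idx = random.randrange(total)  # random index into the result set
    return next(coll.find(query_spec).skip(idx).limit(1), None)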

For the time being I will conclude this post and continue testing the DAS code.

References