Incremental Data Processing for BAM


Incremental Data Processing for BAM

Sinthuja Ragendran

Hi all,

This mail thread is to discuss the $subject. The BAM team had an internal discussion about it, and the approach below is proposed.

Requirement

Currently the BAM Hive script considers the entire data set in a column family during the summarization process, so it repeatedly re-summarizes data that has already been summarized.


Solution

To meet this requirement there are three sub topics we need to solve; I have elaborated on them below.


1) Representing the incremental processing on hive queries.

A column family can be processed by many Hive queries, and each of them should incrementally process the data. Therefore we can't simply mark a row as processed with a single column and ignore it in the next execution. There is also the possibility of using more than one column family in a single Hive query (e.g. join queries).

To cater for these requirements, an annotation can be used, which does some pre-processing of the Hive query before it is passed to the Hive engine.

Eg: @incremental(name='marker', tableNames='Table1,Table2,Table3,..')


[Inline image 1: marker meta column family diagram]

As shown in the above diagram, there will be a meta column family that stores information such as the marker name, the column family name, and the last processed event's timestamp. Before running the Hive query, the last processed timestamp will be retrieved for each column family name, and only data received after this timestamp will be considered for the next summarization process in the respective column family.
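
For example, a minimal sketch of how an annotated summarization script might look. The table and column names (ServiceDataTable, ServiceSummaryTable, serviceName, responseTime) are purely illustrative, and the exact placement of the annotation is an assumption; only the @incremental line reflects the proposal above:

  -- Hypothetical BAM Hive script; table/column names are illustrative.
  -- The @incremental line is consumed by the proposed pre-processor before
  -- the script reaches the Hive engine; it restricts ServiceDataTable to
  -- rows newer than the 'serviceStats' marker's last processed timestamp.
  @incremental(name='serviceStats', tableNames='ServiceDataTable')

  INSERT OVERWRITE TABLE ServiceSummaryTable
  SELECT serviceName,
         COUNT(*)          AS requestCount,
         AVG(responseTime) AS avgResponseTime
  FROM ServiceDataTable
  GROUP BY serviceName;

(ServiceDataTable and ServiceSummaryTable are assumed to be defined elsewhere in the script with the usual Cassandra storage handler mappings.)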


2) Fetching new events from Cassandra that have not been processed by the Hive query

We can use a model similar to the BAM 2.0 ALPHA design. The diagram below shows the proposed model.

[Inline image 2: event data and event index column families]

The data and index tables will be stored as shown in the above diagram. An Event Index column family will be created per Event table (i.e. per event stream). Since the row keys are not sorted, we keep a special row called index_row in which the row keys are stored as columns. Because column keys in Cassandra are sorted, we can first do a range query on index_row to get the row keys that contain newly arrived data. We can then do a range query within each of those rows for timestamps greater than the timestamp stored against the Event table name in the Marker meta column family.



3) Incremental summarization process in Hive Engine

In the above two steps we have been able to get the unprocessed raw data for each query. The summarized result from the earlier execution and the result of the current execution should then be joined to produce the new result. For this we can have another meta information table in the Hive metastore, which holds the result of the last summarization execution along with all the fields it has in the query, plus some additional information needed to do a reverse calculation. For example, if we are doing a simple average calculation on a field from a Cassandra column family, we can't store only the final average value we obtained; we also need to store how many rows contributed to that average, so that we can find the final average with the following calculation:

final_average = ((last_average * last_total_rows_count) + (new_average * new_total_rows_count)) / (last_total_rows_count + new_total_rows_count)

Therefore we need to store some additional information in the meta information table, and the additional information required may vary for each Hive operation. Each Hive operation therefore needs to be handled separately with regard to the additional information needed to do the reverse calculation and produce the final result.
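
To illustrate the arithmetic only, a rough HQL sketch of consolidating the previous result with a new batch. The table and column names (AvgMetaTable, NewEventsTable, AvgResultTable, groupKey, value, lastAvg, lastCount) are hypothetical, and in the actual proposal this consolidation would be handled inside the Hive engine against the meta information table rather than written by the user:

  -- Hypothetical sketch: AvgMetaTable holds the previous execution's average
  -- and row count per group; NewEventsTable holds only the newly fetched
  -- (unprocessed) events.
  INSERT OVERWRITE TABLE AvgResultTable
  SELECT n.groupKey,
         ((COALESCE(m.lastAvg, 0.0) * COALESCE(m.lastCount, 0)) +
          (AVG(n.value)             * COUNT(n.value)))
         / (COALESCE(m.lastCount, 0) + COUNT(n.value))    AS finalAvg,
         COALESCE(m.lastCount, 0) + COUNT(n.value)        AS totalCount
  FROM NewEventsTable n
  LEFT OUTER JOIN AvgMetaTable m ON (n.groupKey = m.groupKey)
  GROUP BY n.groupKey, m.lastAvg, m.lastCount;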

With this entire incremental data processing approach, there will also be some restrictions: for example, you cannot do rank, sort, etc., which need the entire data set in the column family to produce the final result.

Suggestions are welcome. Please provide your feedback on the above approach.


Thanks,
Sinthuja.



--
Sinthuja Rajendran
Software Engineer
WSO2, Inc.:http://wso2.com

Blog: http://sinthu-rajan.blogspot.com/
Mobile: +94774273955




Re: Incremental Data Processing for BAM

Dipesh Chheda
Sinthuja Ragendran wrote
3) Incremental summarization process in Hive Engine

In above two steps of we have been able to get the unprocessed raw data by each queries. The summarised result from eariler execution and the current execution should be joined, and new result should be produced. For this we can have another meta information table in Hive meta store, which holds the result of last summarization execution result along with all fields it has in the query and some more additional information to do some reverse calculation. For example, if we consider that we are going to do a simple average calculation in a field from cassandra column family, then we can't only the store the final average value what we have obtained, we need to calculate how much rows which has contributed to find that average. So that we can find the final average by following calculation.

(last_average * last_total_rows_count)+(new_average * new_total_rows_count) / (last_total_rows_count+new_total_rows_count)

Therefore we need to store some additional information within the meta information table, and this additional information we require may vary for each hive operation. And we need to handle separately in each hive operation about the additional information we need to do the reverse calculation and produce the final result.
With this entire incremental data processing approach, we will also have some restrictions anyhow such as you cannot do rank, sort, etc which needs the entire data within the column family to produce the final result.
+1 on how it should be implemented; it would be extremely helpful for end users if the product handles it. However, I can't figure out how it's even possible, and I wonder whether it should even be part of the product.

As you rightly mentioned, metadata requirements may vary by each Hive operation. So wouldn't this be best handled in the Hive scripts by the end users?

Say an average is computed based on the data from a 'Source CF' and stored in a 'Target CF'; other relevant data points, such as the rows_count required for the incremental average computation, should also be stored in the 'Target CF'.

The Hive query should join the Source and Target CFs to compute a 'regular average' for the new records and an 'incremental average' for the existing records. Hopefully it should be possible to implement this using HQL; I shall investigate more on that.
(As you may know, MySQL has INSERT...ON DUPLICATE KEY UPDATE... and Oracle has MERGE...; to achieve this in the relational world)

At most, BAM may provide a few UDFs relevant to incremental aggregation, such as INCR_AVG(last_avg, last_total, ...), and whatever else will help to implement this using HQL.

And of course, one sample implementation always helps.

Re: Incremental Data Processing for BAM

Dipesh Chheda
In reply to this post by Sinthuja Ragendran
Sinthuja Ragendran wrote
1) Representing the incremental processing on hive queries.

A column family can be processed by many hive queries, and each of them
should incrementally process the data. Therefore we can't simply have one
column in a row and mark it as processed, and ignore them for the next
execution. And also there is a possibility of using more than one column
family in a hive query. (Eg: join queries)

 To cater this requirements annotation functionality can be used, which can
do some pre-processing of Hive queries before they are being passed to the
Hive engine.

Eg: @incremental(name='marker', tableNames='Table1,Table2,Table3,..')

[image: Inline image 1]

 As shown in the above diagram, there will be a meta column family which
stores the information such as markers name, column family name and the
last processed event's time stamp. Before running the hive query, the last
processed timestamp of the event will be retrieved for each column family
name  and data received after this times stamp will be considered for the
next summarization process in the respective column family.
+1, and the ability to induce a 'buffer duration' (delay) that guarantees processing of the source/raw data points does not start until all potential data points from the source are available. This could be one possible way to avoid problematic incremental aggregation scenarios, as long as the delay and potential consequences (risk) are acceptable to the stakeholders.

*Sample Use Case*

 - Compute average by 1 minute rollup window (group by 1 min)
 - Possible delay from the event data generation to persistence: up to 20 sec
 - As per the requirements, Business users are okay with delay up to 15 mins on the availability of the analyzed data by 1 minute rollup window
 - Script scheduler: Executes every minute

Eg: @incremental(name='marker', tableNames='Table1,Table2,Table3,..', buffer=10)
 - where, new parameter $buffer is in minutes i.e. 10 minutes

So as per the annotation in the script:
  $buffer = 10 minutes

And, assuming:
  $currentTime = System time during execution = 2013-04-20_14:01:05
  $lastProcessedTime = Timestamp of the Last Processed event = 2013-04-20_13:51:00

Based on the preconditions defined above, the script should process event records whose event_time >= $fromTime AND event_time < $toTime.
Where,
  $fromTime = $lastProcessedTime = 2013-04-20_13:51:00
  $toTime = EndTime of 1 min Rollup Window on ($currentTime - $buffer) = 1min_Bucket_EndTime(13:51:05) = 2013-04-20_13:52:00
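
In effect, the marker plus buffer would constrain the query as if it carried a time filter roughly like the one below. This is a sketch only: how the filter is actually injected is an implementation detail of the proposal, and the table/column names (ServiceDataTable, event_timestamp, responseTime, serviceName) are assumptions:

  -- Illustrative only: the effective window the script would process, with
  -- $fromTime = 2013-04-20 13:51:00 and $toTime = 2013-04-20 13:52:00 as above.
  SELECT serviceName, AVG(responseTime) AS avgResponseTime
  FROM ServiceDataTable
  WHERE event_timestamp >= '${hiveconf:fromTime}'
    AND event_timestamp <  '${hiveconf:toTime}'
  GROUP BY serviceName;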

Sinthuja Ragendran wrote
2) Fetching new events from the cassandra which are not processed by the hive query
...
...
The data and index table will be stored as shown in the above diagram.
I am assuming that the Column Key of the Event-Index-Table is shown and described as 'Timestamp' type for the sake of simplicity and readability. In the actual implementation, the column key would be of 'TimeUUID' type (for the reasons described here: http://grokbase.com/t/cassandra/user/116fvhs2s4/when-does-it-make-sense-to-use-timeuuid).

Also, Event-Index-Table could be used for the time-range filter conditions (as Custom Index).

Although not explicitly mentioned, I am assuming that 'Purging' would also take care of Event-Index-Table. It may not be required to be part of Archiving but wouldn't hurt if included.

Fwd: Incremental Data Processing for BAM

Sinthuja Ragendran
In reply to this post by Dipesh Chheda
Hi Dipesh,

(sending this mail again as my first attempt bounced)

First of all thanks for your feedback and suggestions. See my comments in-line.

On Sat, Apr 20, 2013 at 7:27 AM, dipeshchheda wrote:
Sinthuja Ragendran wrote
> 3) Incremental summarization process in Hive Engine
>
> In above two steps of we have been able to get the unprocessed raw data by
> each queries. The summarised result from eariler execution and the current
> execution should be joined, and new result should be produced. For this we
> can have another meta information table in Hive meta store, which holds
> the result of last summarization execution result along with all fields it
> has in the query and some more additional information to do some reverse
> calculation. For example, if we consider that we are going to do a simple
> average calculation in a field from cassandra column family, then we can't
> only the store the final average value what we have obtained, we need to
> calculate how much rows which has contributed to find that average. So
> that we can find the final average by following calculation.
>
> (last_average * last_total_rows_count)+(new_average *
> new_total_rows_count) / (last_total_rows_count+new_total_rows_count)
>
> Therefore we need to store some additional information within the meta
> information table, and this additional information we require may vary for
> each hive operation. And we need to handle separately in each hive
> operation about the additional information we need to do the reverse
> calculation and produce the final result.
> With this entire incremental data processing approach, we will also have
> some restrictions anyhow such as you cannot do rank, sort, etc which needs
> the entire data within the column family to produce the final result.

+1 on how it should be implemented and it would extremely helpful for the
end users if the product handles it. However I can't figure out how its even
possible and wondering should it be even part of the product.
 
As I described in my mail, the end user will indicate whether a particular Hive query is to be processed incrementally via the annotation (explained as topic 1 in the solution). With the annotation we'll pass the names of the tables that need to be handled incrementally. That is, there may be many tables involved in a Hive query (e.g. a join query), and the user will have the freedom to indicate the table names whose data should be analyzed incrementally. Then, according to this configuration, new/all data from the respective tables will be fetched and sent to the Hive engine for analysis, and the final result will be obtained as described in topic 3.
 

As you rightly mentioned, metadata requirements may vary by each Hive
Operation. So shouldn't be best handled in the hive scripts by the end
users.

Say when an average is computed based on the data from 'Source CF' and
stored in a 'Target CF', other relevant data points such as rows_count
required for incremental average computation; should also be stored in
'Target CF'.
 
Since this Target CF is where the user expects to store the final result, IMHO he/she won't expect metadata such as row_count, etc., which is only used for Hive's internal processing, to be stored in it and exposed. IMHO it may confuse the user, and it is better to store that in another meta table in the Hive metastore, where we also have the freedom to manipulate it without affecting the user's final target table.
 

Hive query should join Source and Target CFs to compute 'regular average'
for the new records and 'incremental average' for the existing records.
Hopefully, it should be possible to implement this using HQL. I shall
investigate more on that.
(As you may know, MySQL has INSERT...ON DUPLICATE KEY UPDATE... and Oracle
has MERGE...; to achieve this in the relational world)
 
I'm not clear on what you mean by joining the source and target tables. Can you elaborate a bit more on the approach you are suggesting?

At the most, BAM may provide few UDFs relevant to Incremental Aggregation
such as INCR_AVG(last avg, last_total...). And whatever else that will help
to implement this using HQL.

+1 for having new UDF functions for incremental analysis. This enables performing incremental (e.g. INCR_AVG) and regular (e.g. AVG) operations within the same Hive query, if the user would like to. It will also be clearly represented in the query language which operations are handled incrementally and which are not.
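
For example, a single query could then mix the two, roughly as follows. INCR_AVG is still a hypothetical UDF, and the table/column names (ServiceDataTable, AvgMetaTable, serviceName, responseTime, lastAvg, lastCount) are made up for illustration:

  -- Hypothetical: a per-batch average alongside a running (incremental)
  -- average that folds in the previous run's stored result.
  SELECT s.serviceName,
         AVG(s.responseTime)                                              AS batchAvg,
         INCR_AVG(m.lastAvg, m.lastCount, AVG(s.responseTime), COUNT(*))  AS runningAvg
  FROM ServiceDataTable s
  LEFT OUTER JOIN AvgMetaTable m ON (s.serviceName = m.serviceName)
  GROUP BY s.serviceName, m.lastAvg, m.lastCount;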
But here one column family can be accessed by many Hive queries for processing, therefore only introducing a custom UDF for incremental analysis won't be sufficient. We need some pointers/markers for each query to differentiate the queries and isolate the data processed by each query on the column family (similar to topic 1).
 

And of course, one sample implementation always helps.

Thanks,
Sinthuja.








--
Sinthuja Rajendran
Software Engineer
WSO2, Inc.:http://wso2.com

Blog: http://sinthu-rajan.blogspot.com/
Mobile: +94774273955




Re: Incremental Data Processing for BAM

Sinthuja Ragendran
In reply to this post by Dipesh Chheda

Hi Dipesh,

Thanks again for some valuable suggestions.

On Sat, Apr 20, 2013 at 2:50 PM, dipeshchheda wrote:
Sinthuja Ragendran wrote
> 1) Representing the incremental processing on hive queries.
>
> A column family can be processed by many hive queries, and each of them
> should incrementally process the data. Therefore we can't simply have one
> column in a row and mark it as processed, and ignore them for the next
> execution. And also there is a possibility of using more than one column
> family in a hive query. (Eg: join queries)
>
>  To cater this requirements annotation functionality can be used, which
> can
> do some pre-processing of Hive queries before they are being passed to the
> Hive engine.
>
> Eg: @incremental(name='marker', tableNames='Table1,Table2,Table3,..')
>
> [image: Inline image 1]
>
>  As shown in the above diagram, there will be a meta column family which
> stores the information such as markers name, column family name and the
> last processed event's time stamp. Before running the hive query, the last
> processed timestamp of the event will be retrieved for each column family
> name  and data received after this times stamp will be considered for the
> next summarization process in the respective column family.

+1, and ability to induce 'buffer duration' (delay) that guarantees data
processing of source/raw data points does not start until all potential data
points from the source are made available. This could be one possible way to
prevent incremental aggregation scenarios as long as delay and potential
consequences (risk) are acceptable to the stakeholders.

*Sample Use Case*

 - Compute average by 1 minute rollup window (group by 1 min)
 - Possible delay from the event data generation to persistence: up to 20
sec
 - As per the requirements, Business users are okay with delay up to 15 mins
on the availability of the analyzed data by 1 minute rollup window
 - Script scheduler: Executes every minute

Eg: @incremental(name='marker', tableNames='Table1,Table2,Table3,..',
buffer=10)
 - where, new parameter $buffer is in minutes i.e. 10 minutes

So as per the annotation in the script:
  $buffer = 10 minutes

And, assuming:
  $currentTime = System time during execution = 2013-04-20_14:01:05
  %lastProcessedTime = Timestamp of the Last Processed event =
2013-04-20_13:51:00

Based on the preconditions defined above, the script should process event
records, whose event_time >= $fromTime AND < $toTime.
Where,
  $fromTime = $lastProcessedTime = 2013-04-20_13:51:00
  $toTime = EndTime of 1 min Rollup Window on ($currentTime - $buffer) =
1min_Bucket_EndTime(13:51:05) = 2013-04-20_13:52:00

+1, we can introduce this as an additional, non-mandatory parameter, so that if the user wants such a buffer he/she can use it depending on the use case.



Sinthuja Ragendran wrote
> 2) Fetching new events from the cassandra which are not processed by the
> hive query
> ...
> ...
> The data and index table will be stored as shown in the above diagram.

I am assuming that Column Key of Event-Index-Table is shown and described as
'Timestamp' type for the sake of simplicity and readability. In actual
implementation, the column key would be 'TimeUUID' type (for the reasons
described  here
<http://grokbase.com/t/cassandra/user/116fvhs2s4/when-does-it-make-sense-to-use-timeuuid>
).

+1, and yes, it's mentioned as 'Timestamp' for the sake of simplicity.

Also, Event-Index-Table could be used for the time-range filter conditions
(as Custom Index).

Although not explicitly mentioned, I am assuming that 'Purging' would also
take care of Event-Index-Table. It may not be required to be part of
Archiving but wouldn't hurt if included.

+1, we can provide purging of the event-index-table as part of archiving.

Thanks,
Sinthuja.






--
Sinthuja Rajendran
Software Engineer
WSO2, Inc.:http://wso2.com

Blog: http://sinthu-rajan.blogspot.com/
Mobile: +94774273955




Re: Fwd: Incremental Data Processing for BAM

Dipesh Chheda
In reply to this post by Sinthuja Ragendran
Sinthuja Ragendran wrote
On Sat, Apr 20, 2013 at 7:27 AM, dipeshchheda wrote:

> Sinthuja Ragendran wrote
> > 3) Incremental summarization process in Hive Engine
> >
> > In above two steps of we have been able to get the unprocessed raw data
> by
> > each queries. The summarised result from eariler execution and the
> current
> > execution should be joined, and new result should be produced. For this
> we
> > can have another meta information table in Hive meta store, which holds
> > the result of last summarization execution result along with all fields
> it
> > has in the query and some more additional information to do some reverse
> > calculation. For example, if we consider that we are going to do a simple
> > average calculation in a field from cassandra column family, then we
> can't
> > only the store the final average value what we have obtained, we need to
> > calculate how much rows which has contributed to find that average. So
> > that we can find the final average by following calculation.
> >
> > (last_average * last_total_rows_count)+(new_average *
> > new_total_rows_count) / (last_total_rows_count+new_total_rows_count)
> >
> > Therefore we need to store some additional information within the meta
> > information table, and this additional information we require may vary
> for
> > each hive operation. And we need to handle separately in each hive
> > operation about the additional information we need to do the reverse
> > calculation and produce the final result.
> > With this entire incremental data processing approach, we will also have
> > some restrictions anyhow such as you cannot do rank, sort, etc which
> needs
> > the entire data within the column family to produce the final result.
>
> +1 on how it should be implemented and it would extremely helpful for the
> end users if the product handles it. However I can't figure out how its
> even
> possible and wondering should it be even part of the product.
>

As I described in my mail, the end user will be indicating whether a
particular hive query is going to be processed incrementally via annotation
(explained as topic-1 in the solution). And with the annotation we'll pass
the names of the tables that need to be handled  incrementally. Ie, There
may be lot of tables involved with in hive query (Eg: join query), and user
will have the freedom to indicate the table names of which the data should
be analyzed incrementally. Then according to this configuration, new/all
data from respective tables will be fetched  and sent to Hive engine to
analysis. And then the final result will be obtained as described in the
3-rd topic.
Yes, I think I got that. It was nicely covered in sub topics #1 and #2, and I completely agree with that.

My above comments are ONLY for subtopic #3. And even for #3, I completely agree on how 'Incremental Aggregation' should be implemented. However, I doubt whether it can be productized to support several different scenarios. (*I=devil's advocate, I would be more than happy if I'm wrong here)

Mainly, I wanted to understand:
1) In the annotation, the user specifies just the marker and table names. Then how can the metadata attributes required for incremental aggregation (say Rows_Count for AVG) be derived implicitly, if they are not explicitly defined/provided in the Hive script by the end user?
2) Where would this 'metadata' be stored? (this is now covered in your next comment)

Sinthuja Ragendran wrote
>
> As you rightly mentioned, metadata requirements may vary by each Hive
> Operation. So shouldn't be best handled in the hive scripts by the end
> users.
>
> Say when an average is computed based on the data from 'Source CF' and
> stored in a 'Target CF', other relevant data points such as rows_count
> required for incremental average computation; should also be stored in
> 'Target CF'.
>

Since this Target CF is what user is expecting to store the final result,
IMHO he/she won't expect some  meta data  such as row_count, etc which is
only used for hive internal processing to be stored in and exposed. And
IMHO it may confuse the user and better to store that in another meta table
in the Hive meta store where we also have the freedom of manipulating it
without affecting the user's final target table.
+1 on keeping users away from the complexity as much as possible.

Wondering: is the Hive metastore (MySQL in production) the right place to persist this data?
Wouldn't that become a bottleneck while dealing with 'big data'?

Let me clarify that I see two types of metadata, as per your proposed design:

metadata1=incremental_processing:
    Stores what is described in subtopic #1 and #2 i.e. Markers + Tables + Last_Processed_Event_Timestamp, Time based Custom Index etc.

metadata2=incremental_aggregation:
    Stores Rows_Count like attributes that are required for incremental aggregation.

I was concerned about the feasibility of productizing 'metadata2'. And now that you are proposing it to be stored in the Hive metastore, I am concerned about the scalability of 'metadata2'.

With metadata2:
For 'X' number of records inserted/updated by Hive script into TargetCF, the same number of metadata records (with attributes such as rows_count) will have to be persisted. (unless I have completely missed your point, big time)

So if the value of 'X' goes above a few hundred thousand to millions, I doubt whether the Hive metastore (RDBMS) can keep up with Cassandra.

IMHO, both should be stored in Cassandra.

If you see that I am completely off track here and it would be easier to discuss this over a WebEx, I can set one up at a time convenient for you. It's up to you.

(Thanks for your patience!!)

Sinthuja Ragendran wrote
>
> Hive query should join Source and Target CFs to compute 'regular average'
> for the new records and 'incremental average' for the existing records.
>
Hopefully, it should be possible to implement this using HQL. I shall
> investigate more on that.
> (As you may know, MySQL has INSERT...ON DUPLICATE KEY UPDATE... and Oracle
> has MERGE...; to achieve this in the relational world)
>

I'm not clear on what do you mean by joining source and target tables. Can
you elaborate bit more on the approach that you are suggesting?
This is to support a design where the incremental aggregation is implemented 'explicitly' by the end user (i.e. not by the product).
However, with the approach you are proposing, my comment is irrelevant for now, so we can come back to this only if we need to.
Nevertheless, please let me know if you still would like to elaborate more on that.

Sinthuja Ragendran wrote
> At the most, BAM may provide few UDFs relevant to Incremental Aggregation
> such as INCR_AVG(last avg, last_total...). And whatever else that will help
> to implement this using HQL.
>

+1 for having new UDF functions for the incremental analysis. This enables
to perform incremental(Eg: INCR_AVG) and regular (Eg: AVG) operations,
within same hive query if user would like to have. And also it'll clearly
represented by the query language what operations are handled incrementally
and what are not.
Oh yes, I didn't think of that advantage.

Sinthuja Ragendran wrote
But here one column family can be accessed by many hive queries for
processing, therefore only introducing the custom UDF for incremental
analysis won't be sufficient. We need some pointers/markers for each
queries to differentiate the queries and isolate the data processed by each
queries on the column family.  (similar to the topic-1).
+1, and again, we need all of subtopics #1 and #2; I didn't mean to discount that. Just for #3, I thought it would be better if the product only provides 'Custom UDFs' and the rest of the implementation is handled by the end user in the Hive scripts.

Re: Fwd: Incremental Data Processing for BAM

Sinthuja Ragendran
Hi Dipesh,

On Mon, Apr 22, 2013 at 11:52 AM, dipeshchheda  wrote:
Sinthuja Ragendran wrote
> On Sat, Apr 20, 2013 at 7:27 AM, dipeshchheda wrote:
>
>> Sinthuja Ragendran wrote
>> > 3) Incremental summarization process in Hive Engine
>> >
>> > In above two steps of we have been able to get the unprocessed raw data
>> by
>> > each queries. The summarised result from eariler execution and the
>> current
>> > execution should be joined, and new result should be produced. For this
>> we
>> > can have another meta information table in Hive meta store, which holds
>> > the result of last summarization execution result along with all fields
>> it
>> > has in the query and some more additional information to do some
>> reverse
>> > calculation. For example, if we consider that we are going to do a
>> simple
>> > average calculation in a field from cassandra column family, then we
>> can't
>> > only the store the final average value what we have obtained, we need
>> to
>> > calculate how much rows which has contributed to find that average. So
>> > that we can find the final average by following calculation.
>> >
>> > (last_average * last_total_rows_count)+(new_average *
>> > new_total_rows_count) / (last_total_rows_count+new_total_rows_count)
>> >
>> > Therefore we need to store some additional information within the meta
>> > information table, and this additional information we require may vary
>> for
>> > each hive operation. And we need to handle separately in each hive
>> > operation about the additional information we need to do the reverse
>> > calculation and produce the final result.
>> > With this entire incremental data processing approach, we will also
>> have
>> > some restrictions anyhow such as you cannot do rank, sort, etc which
>> needs
>> > the entire data within the column family to produce the final result.
>>
>> +1 on how it should be implemented and it would extremely helpful for the
>> end users if the product handles it. However I can't figure out how its
>> even
>> possible and wondering should it be even part of the product.
>>
>
> As I described in my mail, the end user will be indicating whether a
> particular hive query is going to be processed incrementally via
> annotation
> (explained as topic-1 in the solution). And with the annotation we'll pass
> the names of the tables that need to be handled  incrementally. Ie, There
> may be lot of tables involved with in hive query (Eg: join query), and
> user
> will have the freedom to indicate the table names of which the data should
> be analyzed incrementally. Then according to this configuration, new/all
> data from respective tables will be fetched  and sent to Hive engine to
> analysis. And then the final result will be obtained as described in the
> 3-rd topic.

Yes, I think I got that. It was nicely covered in sub topics #1 and #2, and
I completely agree with that.

My above comments are ONLY for subtopic #3. And even for #3, I completely
agree on how 'Incremental Aggregation' should be implemented. However, I
doubt whether it can be productized to support several different scenarios.
(*I=devil's advocate, I would be more than happy if I'm wrong here)

Mainly, I wanted to understand:
1) In the annotation, the user specifies just marker and table names. Then,
how metadata attributes required for incremental aggregation (say Rows_Count
for AVG) can be derived implicitly, if it is not explicitly defined/provided
in the hive script by the end user.
 
Our plan is to implement this implicitly within the Hive core operations (AVG, etc.) at the code level, and to introduce some additional attributes to be stored and retrieved depending on the operations used in the Hive query. As you suggested, having custom UDFs would be a better way of handling this, and I believe it'll considerably reduce the extent to which we have to interfere with the Hive code. But in any case, for #3 we need to go down to the Hive code level and make the required alterations.
 
2) Where this 'metadata' would be stored? (this is now covered in your next
comment)


Initially our plan was to store the metadata of the final result in the Hive metastore, which is a MySQL server in production. But this design was based on the assumption that it won't hold a massive amount of data. I see a valid argument in your scenario, therefore we can go ahead with keeping this in Cassandra rather than the Hive metastore.

Sinthuja Ragendran wrote
>>
>> As you rightly mentioned, metadata requirements may vary by each Hive
>> Operation. So shouldn't be best handled in the hive scripts by the end
>> users.
>>
>> Say when an average is computed based on the data from 'Source CF' and
>> stored in a 'Target CF', other relevant data points such as rows_count
>> required for incremental average computation; should also be stored in
>> 'Target CF'.
>>
>
> Since this Target CF is what user is expecting to store the final result,
> IMHO he/she won't expect some  meta data  such as row_count, etc which is
> only used for hive internal processing to be stored in and exposed. And
> IMHO it may confuse the user and better to store that in another meta
> table
> in the Hive meta store where we also have the freedom of manipulating it
> without affecting the user's final target table.

+1 on keeping users away from the complexity as much as possible.

Wondering, Is Hive Meta Store the right place ( (MySQL in production) to
persist this data?
Wouldn't that become a bottleneck while dealing with 'big data'?

Let me clarity that I see two types of metadata, as per your proposed
design:

metadata1=incremental_processing:
    Stores what is described in subtopic #1 and #2 i.e. Markers + Tables +
Last_Processed_Event_Timestamp, Time based Custom Index etc.

It'll be stored in Cassandra in a different keyspace, such as META_HIVE_KS. We'll make this configurable as well.


metadata2=incremental_aggregation:
    Stores Rows_Count like attributes that are required for incremental
aggregation.

As agreed above, we'll have this table in Cassandra in the same META_HIVE_KS.
 
 

I was concerned on the feasibility to productize 'metadata2'. And now if you
are proposing it to be stored in Hive Meta Store, I am concerned on the
scalability of 'metadata2'

With metadata2:
For 'X' number of records inserted/updated by Hive script into TargetCF, the
same number of metadata records (with attributes such as rows_count) will
have to be persisted. (unless I have completely missed your point, big time)

So if the value of 'X' goes above few hundred thousands to millions, I doubt
whether Hive Meta Store (RDBMS) can keep up with Cassandra.

IMHO, both should be stored in Cassandra.
 
+1. I certainly agree with your point as mentioned above.

If you see I am completely off track here and it will be easier to discuss
this over a WebEx, I can setup one at your convenient time. It's up to you.


I think you have a really good understanding and have given good inputs and suggestions; thanks for all of them. And since the mailing list reaches a large number of users and devs, they can also provide input if we discuss here.
 

(Thanks for your patience!!)


Sinthuja Ragendran wrote
>>
>> Hive query should join Source and Target CFs to compute 'regular average'
>> for the new records and 'incremental average' for the existing records.
>>
> Hopefully, it should be possible to implement this using HQL. I shall
>> investigate more on that.
>> (As you may know, MySQL has INSERT...ON DUPLICATE KEY UPDATE... and
>> Oracle
>> has MERGE...; to achieve this in the relational world)
>>
>
> I'm not clear on what do you mean by joining source and target tables. Can
> you elaborate bit more on the approach that you are suggesting?

This is to support design where the incremental aggregation is implemented
'explicitly' by the end user (i.e. not by the product).
However, with the approach you are proposing, my comment is irrelevant for
now so we can come back to this only if need to.
Nevertheless, please let me know if you still would like to elaborate more
on that.

IMHO the implicit way is better from the user's perspective, therefore we'll try to implement it implicitly. If we encounter any major problems during the POC, we'll look into the explicit approach you suggest.
 


Sinthuja Ragendran wrote
>> At the most, BAM may provide few UDFs relevant to Incremental Aggregation
>> such as INCR_AVG(last avg, last_total...). And whatever else that will
>> help
>> to implement this using HQL.
>>
>
> +1 for having new UDF functions for the incremental analysis. This enables
> to perform incremental(Eg: INCR_AVG) and regular (Eg: AVG) operations,
> within same hive query if user would like to have. And also it'll clearly
> represented by the query language what operations are handled
> incrementally
> and what are not.

Oh yes, I didn't think of that advantage.


Sinthuja Ragendran wrote
> But here one column family can be accessed by many hive queries for
> processing, therefore only introducing the custom UDF for incremental
> analysis won't be sufficient. We need some pointers/markers for each
> queries to differentiate the queries and isolate the data processed by
> each
> queries on the column family.  (similar to the topic-1).

+1 and again, We need all of subtopics #1 and #2. I didn't mean to discount
that.
Just for #3, I thought it would be better if the
product only provides 'Custom UDFs' and rest of the implementation is
handled by the end user in the hive scripts.


Thanks,
Sinthuja.





--
Sinthuja Rajendran
Software Engineer
WSO2, Inc.:http://wso2.com

Blog: http://sinthu-rajan.blogspot.com/
Mobile: +94774273955




Re: Fwd: Incremental Data Processing for BAM

Dipesh Chheda
Hi Sinthuja,

Sinthuja Ragendran wrote
Our plan is to implement this implicitly within the hive core operations
(AVG, etc) at the code level and introduce some additional attributes to be
stored retrieved depending on the operations used in the hive query. As you
suggested having custom UDF will be more better way of handling this, and I
believe it'll reduce the extend we have to interfere in hive code
considerably. But anyhow for #3 we need to go to the code level of hive and
do some alterations required.
I see. With limited knowledge of Hive at the code level, it was/is difficult for me to understand this. So I would rather just wait for the release now :)

Sinthuja Ragendran wrote
It'll stored in cassandra in different Keyspace such as META_HIVE_KS. we'll
make this configurable also.

> metadata2=incremental_aggregation:
>     Stores Rows_Count like attributes that are required for incremental
> aggregation.
>

As agreed above, we'll have this table in cassandra in the same
META_HIVE_KS.
Great!!

Sinthuja Ragendran wrote
IMHO implicit way is more better in user's perspective,therefore we'll try
to implement it implicitly. If we encounter any major problems during POC,
we'll look into the explicit approach you suggest.
+1.


Thanks a lot for your help and time!!

Regards,
Dipesh

Re: Incremental Data Processing for BAM

Dipesh Chheda
In reply to this post by Sinthuja Ragendran
Sinthuja Ragendran wrote
 *Requirement *

Currently BAM Hive script considers the entire data in a column family
during the summarization process, and it summarizes repeatedly already
summarized data.

 *Solution*
...

1) Representing the incremental processing on hive queries.
...
...
 2) Fetching new events from the cassandra which are not processed by the
hive query
...
...
Hi Sinthuja,

I am planning (and would like) to use 'incremental processing' (i.e. sub topics #1 and #2) for 'non-summarization' processing, such as maintaining 'Custom Indexes' to support required filter/sort conditions on data retrieval.

**E.g. Use Case**

*Requirements*
  - Central Data Store to persist Log Data (log4j etc.) from various services
  - Log Viewer should support filter by 'LogTime' or 'Severity' or 'LogTime + Severity'

*Solution*
  - Log data will be persisted using BAM into Cassandra (CF Name: LogStreamCF)
  - A Hive script will prepare Custom Index data on all possible permutations of 'LogTime' and 'Severity'
  - To achieve Incremental Processing, the Hive script will use the following annotation (a rough sketch of such a script follows below):
    @incremental(name='LogIndexMarker', tableNames='LogStreamCF')
  - DSS and Gadgets ... (details not required for this discussion)
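
A rough sketch of how that index-maintenance script might look; the table and column names (LogTimeSeverityIndex, logTime, severity, messageRowKey) are assumptions for illustration, and only the @incremental line is taken from the proposal:

  -- Hypothetical custom-index script: no aggregation, just a projection of
  -- the index columns. With the proposed annotation, only newly arrived log
  -- events are read on each execution.
  @incremental(name='LogIndexMarker', tableNames='LogStreamCF')

  INSERT OVERWRITE TABLE LogTimeSeverityIndex
  SELECT logTime, severity, messageRowKey
  FROM LogStreamCF;

(LogTimeSeverityIndex would be a Cassandra-backed table keyed to serve the LogTime/Severity filter combinations; with the Cassandra storage handler the insert should behave as an upsert of new index entries rather than a full rewrite.)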

*Question*
Will Incremental Processing still work as described in sub topics #1 and #2 for this use case, even though there is no 'summarization' involved?
Please confirm.

Thanks,
Dipesh

Re: Incremental Data Processing for BAM

Sinthuja Ragendran
Hi Dipesh,

Sorry for the delayed response.


On Mon, Apr 22, 2013 at 4:48 PM, Dipesh Chheda wrote:
Sinthuja Ragendran wrote
>  *Requirement *
>
> Currently BAM Hive script considers the entire data in a column family
> during the summarization process, and it summarizes repeatedly already
> summarized data.
>
>  *Solution*
> ...
>
> 1) Representing the incremental processing on hive queries.
> ...
> ...
>  2) Fetching new events from the cassandra which are not processed by the
> hive query
> ...
> ...

Hi Sinthuja,

I am planning (would like) to use 'incremental processing' (i.e. sub topics
#1 and #2) for 'non summarization' processing such as maintaining 'Custom
Indexes' to support required filter/sort conditions on data retrieval.

**E.g. Use Case**

*Requirements*
  - Central Data Store to persist Log Data (log4j etc.) from various
services
  - Log Viewer should support filter by 'LogTime' or 'Severity' or 'LogTime
+ Severity'

*Solution*
  - Log data will be persisted using BAM into Cassandra (CF Name:
LogStreamCF)
  - Hive script will prepare Custom Index data on all possible permutation
of 'LogTime' and 'Severity'
  - To achieve Incremental Processing, Hive script will use following
annotation:
    @incremental(name='LogIndexMarker', tableNames='LogStreamCF')
  - DSS and Gadgets ... (details not required for this discussion) 

*Question*
Will Incremental Processing still work as described in sub topics #1 and #2
for this use case, even though there is no 'summarization' involve?
Please confirm.

Basically you are going to manipulate the LogStreamCF with simple select statements rather than doing any summarization such as AVG, SUM, etc., and in that case you need the newly added data from the column family rather than the whole/old data. Is this your requirement?

If so, then yes, it'll only retrieve the new data, not the whole/old data. When you are doing some incremental summarization such as INCR_AVG, INCR_SUM, etc., topic #3 will come into the processing logic and will try to consolidate the summarization with the previous summarization.

Thanks,
Sinthuja.

Thanks,
Dipesh






--
Sinthuja Rajendran
Software Engineer
WSO2, Inc.:http://wso2.com

Blog: http://sinthu-rajan.blogspot.com/
Mobile: +94774273955




Re: Incremental Data Processing for BAM

Dipesh Chheda
Sinthuja Ragendran wrote
Hi Dipesh,

sorry for the delayed response.


On Mon, Apr 22, 2013 at 4:48 PM, Dipesh Chheda wrote:

> Sinthuja Ragendran wrote
> >  *Requirement *
> >
> > Currently BAM Hive script considers the entire data in a column family
> > during the summarization process, and it summarizes repeatedly already
> > summarized data.
> >
> >  *Solution*
> > ...
> >
> > 1) Representing the incremental processing on hive queries.
> > ...
> > ...
> >  2) Fetching new events from the cassandra which are not processed by the
> > hive query
> > ...
> > ...
>
> Hi Sinthuja,
>
> I am planning (would like) to use 'incremental processing' (i.e. sub topics
> #1 and #2) for 'non summarization' processing such as maintaining 'Custom
> Indexes' to support required filter/sort conditions on data retrieval.
>
> **E.g. Use Case**
>
> *Requirements*
>   - Central Data Store to persist Log Data (log4j etc.) from various
> services
>   - Log Viewer should support filter by 'LogTime' or 'Severity' or 'LogTime
> + Severity'
>
> *Solution*
>   - Log data will be persisted using BAM into Cassandra (CF Name:
> LogStreamCF)
>   - Hive script will prepare Custom Index data on all possible permutation
> of 'LogTime' and 'Severity'
>   - To achieve Incremental Processing, Hive script will use following
> annotation:
>     @incremental(name='LogIndexMarker', tableNames='LogStreamCF')
>   - DSS and Gadgets ... (details not required for this discussion)


> *Question*
> Will Incremental Processing still work as described in sub topics #1 and #2
> for this use case, even though there is no 'summarization' involve?
> Please confirm.
>

Basically you are going to manipulate the LogStreamCF with simple select
statements rather doing any summarization such as AVG,SUM, etc and in that
case you need newly added data from the column family rather the whole/old
data. Is this your requirement?
Yes.

Sinthuja Ragendran wrote
If so, yes it'll only retrieve the new data not the whole/old data.
Great!!

Sinthuja Ragendran wrote
When you are trying to do some incremental summarization such as INCR_AVG,
INCR_SUM, etc, the topic #3 will be coming into the processing logic and
will be trying to consolidate the summarization with previous summarization.

Thanks,
Sinthuja.
Again, thanks for your time and help!!

Regards,
Dipesh

Re: Incremental Data Processing for BAM

Srinath Perera-3
Hi Sinthuja, 

Couple of comments. 

As Dipesh mentioned, the metastore is not the right place to keep intermediate results. They should be in Cassandra.

Also, we have to think a bit about the difference between "incremental processing" and "only look at the last hour's data".

Note that #2 is very much like CEP windows, whereas "incremental processing" is an optimization.

--Srinath
