Monday, 24 October 2016

Hadoop

Hadoop Ecosystem

HDFS - Hadoop Distributed File System
YARN - Yet Another Resource Negotiator



It's an ecosystem of applications, each with a unique role in moving, storing, and computing on data.

Sqoop, Flume, Avro
- used to import and export data, depending on the kind of data
HDFS [Hadoop Distributed File System]
- where the data gets stored
MapReduce
- the standard computation engine for Hadoop; how we run calculations on data and manipulate it
Oozie
- used to schedule jobs for completion

HDFS
- file system for Hadoop
- spans all nodes in a cluster
- stores data in 64 MB blocks (the default block size) spread across multiple servers
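As a rough illustration, here is a minimal sketch of writing a file to HDFS through the Java FileSystem API (assuming the hadoop-client library is on the classpath and fs.defaultFS points at the cluster; the path and text are just placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);          // connects to the fs.defaultFS file system
        Path path = new Path("/tmp/hello.txt");        // placeholder path
        try (FSDataOutputStream out = fs.create(path, true)) {
            out.writeUTF("hello hdfs");                // the file is split into blocks and replicated
        }
        System.out.println("exists: " + fs.exists(path));
        fs.close();
    }
}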

YARN
- controls access to cluster resources
- new in Hadoop v2
- allows multiple compute engines to run (MapReduce, Spark, Tez, and so on)



Sqoop [SQL-to-Hadoop]
- a tool for retrieving data from databases
- pulls data from relational databases
- stores the data on HDFS, or imports it directly into a Hive table
- uses JDBC drivers, which we need to download for the particular database we use




Flume - a tool for pulling in a constant stream of data
- used to import streaming data (server logs, tweets, and so on)
- only a transport agent
- buffered
- 3 parts:
Source - accepts the data
Channel - temporarily stores it
Sink - pulls the data from the channel and writes it to HDFS or some other destination


MapReduce
- the standard computation engine for Hadoop; how we run calculations on data and manipulate it
- contains 2 phases: a Map phase and a Reduce phase


In a Java program (the classic word count),
Map phase - "tokenize" each line into words, creating key/value pairs
Reduce phase - sum the instances of each word across all lines, creating new key/value pairs
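A minimal word-count sketch of those two phases using the Hadoop MapReduce Java API (class names are illustrative):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {
    // Map phase: tokenize each line into words and emit (word, 1) pairs
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reduce phase: sum the counts for each word and emit (word, total)
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}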

Pig - a scripting language for Hadoop, an alternative to Java
- a dataflow scripting language
- builds MapReduce programs from scripts
- supports User Defined Functions [UDFs]

Hive
- data warehousing solution for Hadoop
- uses tables, just like a traditional database
- HiveQL - SQL-ish query language
- schema on read (the schema is applied when the data is queried, not when it is loaded)
- uses MapReduce as its engine




Saturday, 15 October 2016

Cassandra - Designing the Data Model - II

CQL - Cassandra Query Language

CREATE KEYSPACE KEYSPACE_SimpleStrategy
  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };

CREATE KEYSPACE "KEYSPACE_NetworkTopologyStrategy"
  WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 2 };

SELECT * FROM system.schema_keyspaces;
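The same statements can also be run from an application. A minimal sketch assuming the DataStax Java driver 3.x and a node listening on 127.0.0.1 (note that on Cassandra 3.x the keyspace metadata lives in system_schema.keyspaces instead of system.schema_keyspaces):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class KeyspaceExample {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {
            session.execute("CREATE KEYSPACE IF NOT EXISTS keyspace_simplestrategy "
                    + "WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
            // list the keyspaces the node knows about (pre-3.0 system table)
            ResultSet rs = session.execute("SELECT * FROM system.schema_keyspaces");
            for (Row row : rs) {
                System.out.println(row.getString("keyspace_name"));
            }
        }
    }
}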

Modelling by Query

An RDBMS models data by relationships, whereas
Cassandra models data by query.

Indexing

Cassandra organises data into rows; each row is assigned a row key.
As we know, a row must reside entirely on a single node, i.e., the row key determines the node location of the row.
The PRIMARY KEY is used to partition the rows. Two variations (see the sketch below):
- compound primary key: a partition key plus clustering columns
- composite partition key: a partition key made up of more than one column

Secondary index - adds overhead
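To make the two concrete, a hedged sketch of a compound primary key versus a composite partition key, reusing the Session from the earlier driver sketch (the demo keyspace, table and column names are made up for illustration):

// compound primary key: single partition key (user_id);
// clustering columns (added_date, video_id) order the rows within the partition
session.execute("CREATE TABLE IF NOT EXISTS demo.videos_by_user ("
        + " user_id uuid, added_date timestamp, video_id uuid, title text,"
        + " PRIMARY KEY (user_id, added_date, video_id))");

// composite partition key: the partition key itself is (user_id, added_date),
// so rows are spread across partitions by both columns together
session.execute("CREATE TABLE IF NOT EXISTS demo.videos_by_user_and_day ("
        + " user_id uuid, added_date timestamp, video_id uuid, title text,"
        + " PRIMARY KEY ((user_id, added_date), video_id))");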


Friday, 14 October 2016

Cassandra - Foundation - I

nodetool status

NoSQL [Not Only SQL]
http://www.planetcassandra.org/what-is-apache-cassandra/

Apache Cassandra is a free and open-source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure.

Cassandra’s architecture is responsible for its ability to scale, perform, and offer continuous uptime. Rather than using a legacy master-slave or a manual and difficult-to-maintain sharded architecture, Cassandra has a masterless “ring” design that is elegant, easy to setup, and easy to maintain.

Dynamo - Amazon's highly available key-value store
- replicates data across nodes and clusters to tolerate node failure
- advocates eventual consistency

Bigtable - a distributed storage system for structured data, from Google
- addresses data persistence and management at web scale
- provides fast read/write performance, linear scalability, and continuous availability

Cassandra Architecture


  • Distribution Model - Dynamo
  • Storage Model - BigTable
  • Peer-to-peer [runs on p-p architecture]
  • Column-oriented store - provides great schema-less flexibility for application developers
  • Commitlog - from BigTable, ensures data durability
  • Partitioning - inherited from Dynamo; the ability to scale horizontally and incrementally is a key design goal, achieved by dynamically partitioning the data over the nodes of the cluster
  • Replication strategy - retains high availability by replicating data to a configurable number of nodes
Write Path


When a write occurs, the data is immediately appended to the commitlog on disk to ensure durability.
Then Cassandra stores the data in an in-memory store called the memtable.
When the memtable is full, its contents are flushed to an SSTable using sequential I/O, and the commitlog is purged after the flush.
Since random I/O is avoided, write performance is very high.
commitlog - writes go here first; failed nodes can recover from the data in the commit log. A kind of journal.
SSTable - Sorted String Table [disk-based, immutable data file (table data)]
memtable - a write-back cache of data rows

Read Path

When a read occurs, the data to be returned is merged from all the related SSTables and the memtable. Timestamps are used to determine the most up-to-date record.
The merged value is also stored in a write-through row cache to improve future read performance.

Write/Read tuneable Consistency Levels


The number of replicas that must acknowledge a write/read for it to be considered successful.
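With the DataStax Java driver, for example, the consistency level can be set per statement. A small sketch reusing the Session from the earlier example (keyspace/table names are placeholders):

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

// require a majority of replicas to acknowledge the write
Statement insert = new SimpleStatement(
        "INSERT INTO demo.users (user_id, name) VALUES (uuid(), 'alice')")
        .setConsistencyLevel(ConsistencyLevel.QUORUM);
session.execute(insert);

// a single replica is enough for this read
Statement select = new SimpleStatement("SELECT * FROM demo.users")
        .setConsistencyLevel(ConsistencyLevel.ONE);
session.execute(select);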

Repair mechanisms
Three repair mechanisms to tolerate failure:
  • Read repair - when a node is found to have stale data during a read, it is brought up to date with the other replicas
  • Hinted handoff - if a write is made and a replica node for the key is down (and hinted_handoff_enabled == true), Cassandra writes a hint to the coordinator node indicating that the write needs to be replayed to the unavailable node
  • Anti-entropy repair - run manually by administrators; it compares the data on all replicas and updates each replica to the newest version (nodetool repair)

Cassandra Data Model

Storage Model
- column-oriented store
- Map<RowKey, SortedMap<ColumnKey, ColumnValue>>
- schema-less

Logical Data Structure


Column
- the smallest data-model element and storage unit
- a name/value pair plus a timestamp and an optional TTL [Time-To-Live]
- name and value are byte arrays, corresponding to the ColumnKey and ColumnValue of the SortedMap
- the timestamp is supplied by the client; it can be ignored while data modelling and is used to resolve write conflicts
- TTL - marks the column as deleted after expiration (see the sketch below)
Data is marked with a tombstone after the TTL expires.
A tombstone in Cassandra is a marker indicating that the data can be deleted during compaction.
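A small illustration of TTL through CQL, again reusing the Session from the earlier sketch (table and values are made up; 86400 seconds = 24 hours):

// the inserted values expire (become tombstones) after 24 hours
session.execute("INSERT INTO demo.login_tokens (user_id, token) "
        + "VALUES (uuid(), 'abc123') USING TTL 86400");

// TTL(column) shows the remaining time-to-live in seconds
session.execute("SELECT token, TTL(token) FROM demo.login_tokens");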

CAP Theorem - Consistency, Availability, Partition tolerance: a distributed system can guarantee only two of the three.

Saturday, 3 September 2016

Apache Kafka

Thought of preparing a small blog related to Kafka while going through the video sessions of Gwen Shapira and Stephane Maarek.
Kafka is a distributed, partitioned, replicated commit log service.

A messaging system - so how is it different from the others [ActiveMQ, ...]?

Producers write data to Kafka and consumers read data from Kafka.
Producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers.

Kafka provides a fast, distributed, highly scalable, highly available publish-subscribe messaging system.
It solves a much harder problem:
- communication and integration between components of large software systems.

What's unique about Kafka?

  • Large number of consumers
  • Ad-hoc consumers
  • Batch consumers
  • Automatic recovery from broker failure


What do we do with Kafka?

  • Messaging - communicating between apps
  • Website Activity Tracking (clicks, searches...)
  • Metrics collection - instead of writing to logs
  • Audit
  • Source and target of stream processing


The Basics

  • Messages are organised into topics
  • Producers push messages
  • Consumers pull messages
  • Kafka runs in a cluster. Nodes are called brokers.

Topics, Partitions and Logs


A topic can have multiple partitions.
If a topic has more data than one machine can comfortably hold, or you want the data spread over more than one machine,
you split the topic into multiple partitions and put each partition on a separate machine, which gives you the flexibility of more memory, disk space, and so on.

Consumers can also be parallelised: multiple consumers can read the same topic, each reading a different partition, which increases the throughput of the system.

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition.

Each Partition is a Log


The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.

Each broker [node] hosts many partitions


Here, the topic is split into 3 partitions, one partition on each node [orange].
Each partition has multiple replicas [exact copies on the other 2 nodes/brokers] in the cluster; this ensures high availability.
The orange one is the leader, where all reads/writes happen; the other two [blue] are replicas/followers, which producers and consumers never interact with directly. The data is replicated from the leader to the followers.
Followers acknowledge writes back to the leader.
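As an aside, a topic with a given partition and replica count can be created programmatically. A minimal sketch assuming a recent kafka-clients library that ships the AdminClient API and a broker on localhost:9092 (the topic name is made up):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions spread over the brokers, each with 3 replicas (1 leader + 2 followers)
            NewTopic topic = new NewTopic("my-topic", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}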





Producers load balance across the brokers and are free to write to any of the leader partitions.


Producers


// create a safe producer
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

Producer Batching

High Throughput Producer
// high throughput producer (at the expense of a bit of latency and CPU usage)
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // 20 ms linger
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size
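Putting the configs together, a minimal end-to-end producer sketch (bootstrap server and topic name are placeholders; assumes a recent kafka-clients version):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // records with the same key always land in the same partition
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", "user-42", "hello kafka");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
                }
            });
            producer.flush();
        }
    }
}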


Consumers
Consumers get data from topics in partitions


We can have consumer groups [a set of consumers subscribed to the same topic].
Each consumer in the group will fetch part of the data.
In the above example, C1 will get data from P0 and P3, and C2 will get data from P1 and P2.
So each consumer gets a different subset of the data, and there are no duplicates within the group.
This allows consumption to scale.
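A minimal consumer-group sketch to go with it (group id and topic name are placeholders; assumes a recent kafka-clients version with poll(Duration)). Running two copies of this with the same group.id splits the partitions between them:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("my-topic"));
            while (true) {
                // the group coordinator assigns a subset of the partitions to this consumer
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}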


From Kafka v0.10 onwards, consumer offsets are stored in an internal Kafka topic (__consumer_offsets) rather than in ZooKeeper.