Friday 14 August 2015

Concurrency In Practice - PART III [Liveness, Performance, and Testing]

Testing Concurrent Programs

Liveness properties present their own testing challenges. Liveness tests include tests of progress and non-progress, which are hard to quantify—how do you verify that a method is blocking and not merely running slowly? Similarly, how do you test that an algorithm does not deadlock? How long should you wait before you declare it to have failed?

Related to liveness tests are performance tests. Performance can be measured in a number of ways, including:
Throughput: the rate at which a set of concurrent tasks is completed;
Responsiveness: the delay between a request for and completion of some action (also called latency);
Scalability: the improvement in throughput (or lack thereof) as more resources (usually CPUs) are made available.


Wednesday 12 August 2015

Concurrency In Practice - PART II [Structuring Concurrent Applications]

Chapter 6 - Task Execution

Most concurrent applications are organized around the execution of tasks: abstract, discrete units of work.

Executing tasks in threads
Ideally, tasks are independent activities, which can be executed in parallel if there are adequate processing resources.

The Executor framework
Tasks are logical units of work, and threads are a mechanism by which tasks can run asynchronously.
The Executor framework in java.util.concurrent provides a flexible thread pool implementation.
Executor may be a simple interface, but it forms the basis for a flexible and powerful framework for asynchronous task execution that supports a wide variety of task execution policies.
It provides a standard means of decoupling task submission from task execution, describing tasks with Runnable.
The Executor implementations also provide lifecycle support and hooks for adding statistics gathering, application management, and monitoring.

Executor is based on the producer-consumer pattern, where activities that submit tasks are the producers (producing units of work to be done) and the threads that execute tasks are the consumers (consuming those units of work).
Using an Executor is usually the easiest path to implementing a producer-consumer design in your application.

Whenever you see code of the form: new Thread(runnable).start()
and you think you might at some point want a more flexible execution policy, seriously consider replacing it with the use of an Executor as below.
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
exec.execute(task);
Submitting a task with execute adds the task to the work queue, and the worker threads repeatedly dequeue tasks from the work queue and execute them.

A thread pool, as its name suggests, manages a homogeneous pool of worker threads.
A thread pool is tightly bound to a work queue holding tasks waiting to be executed.
Worker threads have a simple life: request the next task from the work queue, execute it, and go back to waiting for another task.
You can create a thread pool by calling one of the static factory methods in Executors (a short sketch follows the list):

newFixedThreadPool. A fixed-size thread pool creates threads as tasks are submitted, up to the maximum pool size, and then attempts to keep the pool size constant (adding new threads if a thread dies due to an unexpected Exception).
newCachedThreadPool. A cached thread pool has more flexibility to reap idle threads when the current size of the pool exceeds the demand for processing, and to add new threads when demand increases, but places no bounds on the size of the pool.
newSingleThreadExecutor. A single-threaded executor creates a single worker thread to process tasks, replacing it if it dies unexpectedly. Tasks are guaranteed to be processed sequentially according to the order imposed by the task queue (FIFO, LIFO, priority order).
newScheduledThreadPool. A fixed-size thread pool that supports delayed and periodic task execution, similar to Timer.
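A short sketch of creating each pool type (sizes and task bodies are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PoolFactoriesDemo {
    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(4);      // bounded at 4 threads
        ExecutorService cached = Executors.newCachedThreadPool();     // grows and shrinks with demand
        ExecutorService single = Executors.newSingleThreadExecutor(); // sequential, FIFO by default
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);

        fixed.execute(() -> System.out.println("ran in " + Thread.currentThread().getName()));
        scheduled.schedule(() -> System.out.println("runs after 1 second"), 1, TimeUnit.SECONDS);

        fixed.shutdown(); cached.shutdown(); single.shutdown(); scheduled.shutdown();
    }
}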

Executor lifecycle
The ExecutorService interface extends Executor, adding a number of methods for lifecycle management.

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // ... additional convenience methods for task submission
}

The lifecycle implied by ExecutorService has three states—running, shutting down, and terminated. ExecutorServices are initially created in the running state.
The shutdown method initiates a graceful shutdown: no new tasks are accepted but previously submitted tasks are allowed to complete—including those that have not yet begun execution.
The shutdownNow method initiates an abrupt shutdown: it attempts to cancel outstanding tasks and does not start any tasks that are queued but not begun.
Once all tasks have completed, the ExecutorService transitions to the terminated state.
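A short sketch of the usual graceful-then-abrupt shutdown idiom (the timeout value is an assumption):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        exec.execute(() -> System.out.println("task runs"));

        exec.shutdown();                            // graceful: no new tasks, queued tasks still run
        if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
            List<Runnable> unstarted = exec.shutdownNow();  // abrupt: cancel outstanding tasks
            System.out.println(unstarted.size() + " tasks never started");
        }
        System.out.println("terminated? " + exec.isTerminated());
    }
}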

Finding exploitable parallelism
Result-bearing tasks: Callable and Future
The Executor framework uses Runnable as its basic task representation. Runnable is a fairly limiting abstraction; run cannot return a value or throw checked exceptions.

Runnable and Callable describe abstract computational tasks. Tasks are usually finite: they have a clear starting point and they eventually terminate. The lifecycle of a task executed by an Executor has four phases: created, submitted, started, and completed.
Since tasks can take a long time to run, we also want to be able to cancel a task.
In the Executor framework, tasks that have been submitted but not yet started can always be cancelled, and tasks that have started can sometimes be cancelled if they are responsive to interruption. Cancelling a task that has already completed has no effect.
Future represents the lifecycle of a task and provides methods to test whether the task has completed or been cancelled, retrieve its result, and cancel the task.
The behavior of get varies depending on the task state (not yet started, running, completed): it returns immediately or throws an exception if the task has already completed, but if not it blocks until the task completes.
The submit methods in ExecutorService all return a Future, so that you can submit a Runnable or a Callable to an executor and get back a Future that can be used to retrieve the result or cancel the task.
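A minimal sketch of submitting a Callable and blocking on its Future (the computation is illustrative):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Callable<Integer> task = () -> 6 * 7;           // unlike Runnable, it returns a value
        Future<Integer> future = exec.submit(task);
        System.out.println("result = " + future.get()); // blocks until the task completes
        exec.shutdown();
    }
}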

The real performance payoff of dividing a program’s workload into tasks comes when there are a large number of independent, homogeneous tasks that can be processed concurrently.

ExecutorCompletionService combines an Executor with a BlockingQueue: completed task results are placed on the queue, where they can be retrieved with take.
Example :- https://dzone.com/articles/executorcompletionservice
https://github.com/prashanthmamidi/PraticeJava/blob/master/src/main/java/com/concurrent/api/ExecutorCompletionServiceDemo.java
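A minimal sketch (task bodies are illustrative); take returns results in completion order, not submission order:

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(3);
        CompletionService<String> cs = new ExecutorCompletionService<>(exec);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            cs.submit(() -> "task " + n + " done");
        }
        for (int i = 0; i < 3; i++) {
            System.out.println(cs.take().get()); // take blocks until some task completes
        }
        exec.shutdown();
    }
}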

Chapter 8 - Applying Thread Pools

ThreadLocal should not be used in pool threads to communicate values between tasks.
Thread starvation deadlock 
If tasks that depend on other tasks execute in a thread pool, they can deadlock.
In a single-threaded executor, a task that submits another task to the same executor and waits for its result will always deadlock; the same thing can happen in larger thread pools if all executing threads are blocked waiting for other tasks that are still on the work queue. This is called thread starvation deadlock.
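A sketch along the lines of JCIP's ThreadDeadlock listing; in a single-threaded executor, this always deadlocks:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadDeadlock {
    static final ExecutorService exec = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
        Future<String> outer = exec.submit(() -> {
            // The inner task can never run: the only pool thread is stuck here waiting for it
            Future<String> inner = exec.submit(() -> "inner");
            return "outer: " + inner.get(); // blocks forever
        });
        System.out.println(outer.get());    // never returns
    }
}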

Sizing thread pools
The ideal size for a thread pool depends on the types of tasks that will be submitted and the characteristics of the deployment system.
Thread pool sizes should rarely be hard-coded; instead, pool sizes should be provided by a configuration mechanism or computed dynamically by consulting Runtime.getRuntime().availableProcessors().
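For workloads that mix computation and waiting, JCIP gives the heuristic Nthreads = Ncpu * Ucpu * (1 + W/C), where Ucpu is the target CPU utilization and W/C is the ratio of wait time to compute time. A minimal sketch; the utilization target and wait/compute ratio below are assumptions:

public class PoolSizer {
    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        double targetUtilization = 0.8; // assumed target CPU utilization (0..1)
        double waitComputeRatio = 2.0;  // assumed ratio of wait time to compute time
        int poolSize = (int) (nCpu * targetUtilization * (1 + waitComputeRatio));
        System.out.println("suggested pool size: " + poolSize);
    }
}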

Configuring ThreadPoolExecutor
ThreadPoolExecutor is a flexible, robust pool implementation that allows a variety of customizations.
General constructor for ThreadPoolExecutor:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }

The core pool size, maximum pool size, and keep-alive time govern thread creation and teardown.
The core pool size is the target size; the implementation attempts to maintain the pool at this size even when there are no tasks to execute.
The maximum pool size is the upper bound on how many pool threads can be active at once.
A thread that has been idle for longer than the keep-alive time becomes a candidate for reaping and can be terminated if the current pool size exceeds the core size.
ThreadPoolExecutor allows you to supply a BlockingQueue to hold tasks awaiting execution.

Saturation policies
When a bounded work queue fills up, the saturation policy comes into play. The saturation policy for a ThreadPoolExecutor can be modified by calling setRejectedExecutionHandler.
Several implementations of RejectedExecutionHandler are provided, each implementing a different saturation policy:
1. AbortPolicy - the default policy; causes execute to throw the unchecked RejectedExecutionException. The caller can catch this exception and implement its own overflow handling as it sees fit.
2. DiscardPolicy - silently discards the newly submitted task if it cannot be queued for execution.
3. DiscardOldestPolicy - discards the task that would otherwise be executed next and tries to resubmit the new task.
4. CallerRunsPolicy - implements a form of throttling that neither discards tasks nor throws an exception, but instead tries to slow down the flow of new tasks by pushing some of the work back to the caller. It executes the newly submitted task not in a pool thread, but in the thread that calls execute.
Ex: Creating a fixed-size thread pool with a bounded queue and the caller-runs saturation policy:

ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Thread factories
Whenever a thread pool needs to create a thread, it does so through a thread factory.
The default thread factory creates a new, nondaemon thread with no special configuration. Specifying a thread factory allows you to customize the configuration of pool threads. ThreadFactory has a single method, newThread, that is called whenever a thread pool needs to create a new thread.
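A minimal sketch of a custom factory that gives pool threads recognizable names (class name is illustrative):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) { this.prefix = prefix; }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false); // keep the default nondaemon behavior
        return t;
    }
}
// usage: Executors.newFixedThreadPool(4, new NamedThreadFactory("worker"))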

Extending ThreadPoolExecutor
ThreadPoolExecutor was designed for extension, providing several “hooks” for subclasses to override—beforeExecute, afterExecute, and terminated—that can be used to extend the behavior of ThreadPoolExecutor.
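A simplified sketch in the spirit of JCIP's TimingThreadPool, overriding all three hooks:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    public TimingThreadPool(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime()); // runs in the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long taskTime = System.nanoTime() - startTime.get();
            System.out.println("task took " + taskTime + " ns");
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        System.out.println("pool terminated");
        super.terminated();
    }
}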


Friday 7 August 2015

Short Notes from Java Concurrency In Practice


Concurrency In Practice - PART I [Fundamentals]

Thread Safety

A class is thread-safe if it behaves correctly when accessed from multiple threads, regardless of the scheduling or interleaving of the execution of those threads by the runtime environment, and with no additional synchronization or other coordination on the part of the calling code.

Thread-safe classes encapsulate any needed synchronization so that clients need not provide their own.

Stateless objects are always thread-safe.
It is only when servlets want to remember things from one request to another that the thread safety requirement becomes an issue.


Atomicity

Increment counter is an example of a read-modify-write operation, in which the resulting state is derived from the previous state. It is not atomic, which means that it does not execute as a single, indivisible operation.

Operations A and B are atomic with respect to each other if, from the perspective of a thread executing A, when another thread executes B, either all of B has executed or none of it has.
An atomic operation is one that is atomic with respect to all operations, including itself, that operate on the same state.

Race condition -> check-then-act

To ensure thread safety, check-then-act operations (like lazy initialization) and read-modify-write operations (like increment) must always be atomic. We refer collectively to check-then-act and read-modify-write sequences as compound actions: sequences of operations that must be executed atomically in order to remain thread-safe.

Solution: By replacing the long counter with an AtomicLong, we ensure that all actions that access the counter state are atomic. [java.util.concurrent.atomic package]
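A minimal sketch (the servlet context from the book is elided; class and method names are illustrative):

import java.util.concurrent.atomic.AtomicLong;

public class HitCounter {
    private final AtomicLong count = new AtomicLong(0);

    public void service() {
        count.incrementAndGet(); // atomic read-modify-write, no lock needed
    }

    public long getCount() { return count.get(); }
}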

Locking

To preserve state consistency, update related state variables in a single atomic operation.

Intrinsic locks [Monitor locks]
synchronized (lock) {
// Access or modify shared state guarded by lock
}
Intrinsic locks in Java act as mutexes (or mutual exclusion locks), which means that at most one thread may own the lock. When thread A attempts to acquire a lock held by thread B, A must wait, or block, until B releases it. If B never releases the lock, A waits forever.

Reentrancy
Reentrancy means that locks are acquired on a per-thread rather than per-invocation basis.
It is implemented by associating with each lock an acquisition count and an owning thread.
Reentrancy saves us from deadlock, e.g., where a subclass’s overridden synchronized method calls the superclass’s synchronized method (page 27), as in the sketch below.
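A sketch of that scenario, following JCIP's Widget/LoggingWidget listing:

public class Widget {
    public synchronized void doSomething() {
        System.out.println("Widget.doSomething");
    }
}

class LoggingWidget extends Widget {
    @Override
    public synchronized void doSomething() {
        // Without reentrancy this would deadlock: the thread already holds
        // this object's lock and must acquire it again to call super.doSomething()
        System.out.println(toString() + ": calling doSomething");
        super.doSomething();
    }
}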

Guarding state with locks
For each mutable state variable that may be accessed by more than one thread, all accesses to that variable must be performed with the same lock held. In this case, we say that the variable is guarded by that lock.
Every shared, mutable variable should be guarded by exactly one lock. Make it clear to maintainers which lock that is.

Liveness and performance
Avoid holding locks during lengthy computations or operations at risk of not completing quickly such as network or console I/O.

Chapter 3 - Sharing Objects

Locking is not just about mutual exclusion; it is also about memory visibility.
To ensure that all threads see the most up-to-date values of shared mutable variables, the reading and writing threads must synchronize on a common lock.
Volatile variables are a weaker form of synchronization: they ensure that updates to a variable are propagated predictably to other threads.

When a field is declared volatile, the compiler and runtime are put on notice that this variable is shared and that operations on it should not be reordered with other memory operations.
Volatile variables are not cached in registers or in caches where they are hidden from other processors, so a read of a volatile variable always returns the most recent write by any thread.
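The canonical use is a simple status flag, as in JCIP's sheep-counting example; the surrounding class and method names here are an illustrative sketch:

public class SheepCounter {
    private volatile boolean asleep; // volatile guarantees the reader sees the writer's update

    public void tryToSleep() {
        while (!asleep)
            countSomeSheep();
    }

    public void fallAsleep() { asleep = true; } // hypothetical setter, called from another thread

    private void countSomeSheep() { /* one sheep, two sheep... */ }
}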

Thread confinement - If data is only accessed from a single thread, no synchronization is needed
[Local variables, ThreadLocal]

Stack confinement is a special case of thread confinement in which an object can only be reached through local variables.

Immutable objects are always thread-safe.
An object is immutable if:
• Its state cannot be modified after construction;
• All its fields are final; and
• It is properly constructed (the this reference does not escape during construction).
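A minimal sketch in the spirit of JCIP's ThreeStooges class, which satisfies all three conditions:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public final class ThreeStooges {
    private final Set<String> stooges;

    public ThreeStooges() {
        Set<String> s = new HashSet<>();
        s.add("Moe"); s.add("Larry"); s.add("Curly");
        stooges = Collections.unmodifiableSet(s); // state is never modified after construction
    }

    public boolean isStooge(String name) {
        return stooges.contains(name);
    }
}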


The most useful policies for using and sharing objects in a concurrent program are:
Thread-confined. A thread-confined object is owned exclusively by and confined to one thread, and can be modified by its owning thread.
Shared read-only. A shared read-only object can be accessed concurrently by multiple threads without additional synchronization, but cannot be modified by any thread. Shared read-only objects include immutable and effectively immutable objects.
Shared thread-safe. A thread-safe object performs synchronization internally, so multiple threads can freely access it through its public interface without further synchronization.
Guarded. A guarded object can be accessed only with a specific lock held. Guarded objects include those that are encapsulated within other thread-safe objects and published objects that are known to be guarded by a specific lock.

Chapter 5 - Building Blocks

Synchronized collections 
Vector and Hashtable and the synchronized wrapper classes created by the Collections.synchronizedXXX factory methods.
These classes achieve thread safety by encapsulating their state and synchronizing every public method so that only one thread at a time can access the collection state.

Iterators and ConcurrentModificationException
The iterators returned by the synchronized collections are not designed to deal with concurrent modification, and they are fail-fast—meaning that if they detect that the collection has changed since iteration began, they throw the unchecked ConcurrentModificationException.

Solution - the way to prevent ConcurrentModificationException is to hold the collection lock for the duration of the iteration.
The longer a lock is held, the more likely it is to be contended, and if many threads are blocked waiting for a lock, throughput and CPU utilization can suffer.
An alternative to locking the collection during iteration is to clone the collection and iterate the copy instead.
All of the following indirect uses of iteration can cause ConcurrentModificationException: hashCode, equals, toString, containsAll, removeAll, and retainAll.

Concurrent collections
Replacing synchronized collections with concurrent collections can offer dramatic scalability improvements with little risk.
ConcurrentHashMap is a replacement for synchronized hash-based Map implementations.
Instead of synchronizing every method on a common lock, restricting access to a single thread at a time, it uses a finer-grained locking mechanism called lock striping to allow a greater degree of shared access.
Arbitrarily many reading threads can access the map concurrently, readers can access the map concurrently with writers, and a limited number of writers can modify the map concurrently.
The iterators returned by ConcurrentHashMap are weakly consistent instead of fail-fast.
A weakly consistent iterator can tolerate concurrent modification, traverses elements as they existed when the iterator was constructed, and may (but is not guaranteed to) reflect modifications to the collection after the construction of the iterator.

The code below demonstrates the behavior with and without ConcurrentHashMap:

        //Map<Integer, String> map = new ConcurrentHashMap<>();
        Map<Integer, String> map = new HashMap<>();

        map.put(1, "Hello");
        map.put(2, "Hello2");
        map.put(3, "Hello3");
        map.forEach(
            (key, value) -> {
                map.put(4, "Hello5");        // adding a new element is a structural modification: HashMap throws ConcurrentModificationException
                map.put(3, "I am modified"); // updating an existing key is not structural, so no exception
                System.out.println(key + "," + value);
            }
        );

So when we need to add new elements while iterating, we have to use a ConcurrentMap.
Ex: https://github.com/prashanthmamidi/PraticeJava/blob/master/src/main/java/com/multithreading/thread/safe/ConcurrentHashMapDemo.java

CopyOnWriteArrayList is a replacement for synchronized List implementations for cases where traversal is the dominant operation; CopyOnWriteArraySet is a concurrent replacement for a synchronized Set.
They implement mutability by creating and republishing a new copy of the collection every time it is modified.

  
        List<Integer> list = new CopyOnWriteArrayList<>();
        //List<Integer> list = new ArrayList<>();
        list.add(1); list.add(2); list.add(3);
        list.stream()
            .forEach(element -> {
                list.add(4); // the copy-on-write iterator works on a snapshot, so no ConcurrentModificationException
                System.out.println(element);
            });
Ex: https://github.com/prashanthmamidi/PraticeJava/blob/master/src/main/java/com/multithreading/thread/safe/CopyOnWriteArrayListDemo.java

From the above examples it’s clear that:
  1. Concurrent Collection classes can be modified avoiding ConcurrentModificationException
  2. In case of CopyOnWriteArrayList, iterator doesn’t accommodate the changes in the list and works on the original list.
  3. In case of ConcurrentHashMap, the behavior is not always the same.
Java 6 adds ConcurrentSkipListMap and ConcurrentSkipListSet, which are concurrent replacements for a synchronized SortedMap or SortedSet (such as TreeMap or TreeSet wrapped with synchronizedMap).

Java 5.0 also adds two new collection types, Queue and BlockingQueue.
[Refer: https://docs.oracle.com/javase/8/docs/api/index.html?java/util/Queue.html]
Blocking queues provide blocking put and take methods as well as the timed equivalents offer and poll.
If the queue is full, put blocks until space becomes available;
if the queue is empty, take blocks until an element is available.
Queues can be bounded or unbounded; unbounded queues are never full, so a put on an unbounded queue never blocks.
Blocking queues support the producer-consumer design pattern.
If blocking queues don’t fit easily into your design, you can create other blocking data structures using Semaphore.
PriorityBlockingQueue is a priority-ordered queue, which is useful when you want to process elements in an order other than FIFO.
Ex: ProducerConsumerService
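A minimal sketch of the pattern over a bounded ArrayBlockingQueue (the poison-pill shutdown is an illustrative choice):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // bounded: put blocks when full

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++)
                    queue.put(i);   // blocks if the queue is full
                queue.put(-1);      // poison pill to stop the consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                int item;
                while ((item = queue.take()) != -1) // blocks if the queue is empty
                    System.out.println("consumed " + item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}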

Deques and work stealing
Java 6 also adds another two collection types, Deque (pronounced “deck”) and BlockingDeque, that extend Queue and BlockingQueue.
A Deque is a double-ended queue that allows efficient insertion and removal from both the head and the tail. Implementations include ArrayDeque and LinkedBlockingDeque.
In a work stealing design, every consumer has its own deque. If a consumer exhausts the work in its own deque, it can steal work from the tail of someone else’s deque.

Synchronizers
A synchronizer is any object that coordinates the control flow of threads based on its state.
Blocking queues can act as synchronizers; other types of synchronizers include semaphores, barriers, and latches.

Latches
A latch is a synchronizer that can delay the progress of threads until it reaches its terminal state.
Once the latch reaches the terminal state, it cannot change state again, so it remains open forever.
CountDownLatch is a flexible latch implementation which allows one or more threads to wait for a set of events to occur.

Latches can facilitate starting a group of related activities or waiting for a group of related activities to complete. Latches are single-use objects; once a latch enters the terminal state, it cannot be reset.
Ex: https://github.com/prashanthmamidi/PraticeJava/blob/master/src/main/java/com/concurrent/api/CountDownLatchDemo.java
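A minimal inline sketch of waiting for a group of activities to complete (the worker count is illustrative):

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3;
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " finished");
                done.countDown(); // signal completion
            }).start();
        }
        done.await();             // blocks until the count reaches zero
        System.out.println("all workers done");
    }
}
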
FutureTask also acts like a latch.
A computation represented by a FutureTask is implemented with a Callable, the result-bearing equivalent of Runnable, and can be in one of three states: waiting to run, running, or completed.
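A minimal sketch (the sleep stands in for an expensive computation):

import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        FutureTask<Integer> future = new FutureTask<>(() -> {
            Thread.sleep(500); // stand-in for an expensive computation
            return 42;
        });
        new Thread(future).start();       // FutureTask is also a Runnable
        System.out.println(future.get()); // latch-like: blocks until the computation completes
    }
}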

Semaphores
Counting semaphores are used to control the number of activities that can access a certain resource or perform a given action at the same time.
Counting semaphores can be used to implement resource pools or to impose a bound on a collection.
Semaphores are useful for implementing resource pools such as database connection pools. While it is easy to construct a fixed-sized pool that fails if you request a resource from an empty pool, what you really want is to block if the pool is empty and unblock when it becomes nonempty again.
Example:
https://github.com/prashanthmamidi/PraticeJava/blob/master/src/main/java/com/concurrent/api/SemaphoreDemo.java
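A minimal inline sketch bounding concurrent access to a resource (permit count and names are illustrative):

import java.util.concurrent.Semaphore;

public class BoundedResource {
    private final Semaphore permits = new Semaphore(3); // at most 3 concurrent users

    public void use() throws InterruptedException {
        permits.acquire();     // blocks while all permits are taken
        try {
            System.out.println(Thread.currentThread().getName() + " using resource");
        } finally {
            permits.release(); // always return the permit
        }
    }
}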

Barriers
Barriers are similar to latches in that they block a group of threads until some event has occurred.
The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed.
Latches are for waiting for events; barriers are for waiting for other threads.
CyclicBarrier allows a fixed number of parties to rendezvous repeatedly at a barrier point and is useful in parallel iterative algorithms that break down a problem into a fixed number of independent subproblems.
Ex: https://github.com/prashanthmamidi/PraticeJava/blob/master/src/main/java/com/concurrent/api/CyclicBarrierExample.java
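A minimal sketch with three parties and a barrier action (counts are illustrative):

import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        // the barrier action runs once per generation, after all parties arrive
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("all parties arrived"));
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " waiting");
                    barrier.await(); // blocks until 3 threads have called await
                    System.out.println(Thread.currentThread().getName() + " proceeding");
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}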