I have finally decided to move away from WordPress. WordPress has been a good companion for getting started with blogging, and I have been here for well over 2 years now.

My new blog is now at ashutoshraina.github.io.

I have migrated most of the posts, but not all, as some of them are no longer technically relevant. This also gives me an opportunity to revisit those topics with fresh eyes, which I plan to do over the coming few months.

Cheers,
Ashutosh

Tasks with Timeouts – Contd.

As mentioned in the last post, we can now have tasks with individual timeouts. The code looked a little heavy, though. Can we do better? Yes, we can!


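var tasks = new List<Task>();
try
{
    // LongRunningTask comes from the Program in the previous post;
    // SomethingUsefulWithTheResult stands in for whatever you do with the result.
    // Note: StartNew(_ => ..., TaskCreationOptions.AttachedToParent) binds the option
    // to the *state* parameter, so a parameterless lambda is used instead.

    // Instead of checking for a fault within the continuation,
    // we can use a TaskContinuationOptions value to communicate the right semantics.
    // OnlyOnRanToCompletion skips the continuation if the antecedent faulted
    // (for example, timed out) or was canceled.
    var t1 = Task.Factory.StartNew(() => LongRunningTask("Entering task1"))
                  .TimeoutAfter(1000)
                  .ContinueWith(t => SomethingUsefulWithTheResult(t.Result),
                                TaskContinuationOptions.OnlyOnRanToCompletion);

    var t2 = Task.Factory.StartNew(() => LongRunningTask("Entering task2"))
                  .ContinueWith(t => SomethingUsefulWithTheResult(t.Result));

    var t3 = Task.Factory.StartNew(() => LongRunningTask("Entering task3"))
                  .ContinueWith(t => SomethingUsefulWithTheResult(t.Result));

    tasks.Add(t1);
    tasks.Add(t2);
    tasks.Add(t3);

    Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ex)
{
    // A skipped continuation ends up Canceled, so when t1 times out
    // WaitAll throws an AggregateException holding a TaskCanceledException.
    Console.WriteLine("There was an exception");
    foreach (var inner in ex.InnerExceptions)
        Console.WriteLine(inner.Message);
}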
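A minimal sketch of what the continuation options do, assuming .NET 4.6 or later (for Task.FromException) and C# 9 top-level statements. NotOnFaulted and OnlyOnRanToCompletion behave the same way for a faulted antecedent: the continuation is skipped, and the skipped continuation itself ends up Canceled. That is why waiting on it (as Task.WaitAll does) reports an exception rather than silently succeeding.

```csharp
using System;
using System.Threading.Tasks;

// A faulted antecedent, standing in for a task that timed out.
var timedOut = Task.FromException<int>(new TimeoutException());
var skipped = timedOut.ContinueWith(t => t.Result * 2,
                                    TaskContinuationOptions.OnlyOnRanToCompletion);

// A successful antecedent: the continuation runs and sees the result.
var completed = Task.FromResult(21);
var ran = completed.ContinueWith(t => t.Result * 2,
                                 TaskContinuationOptions.OnlyOnRanToCompletion);

try
{
    skipped.Wait();
}
catch (AggregateException ex)
{
    // Waiting on a skipped continuation surfaces a TaskCanceledException.
    Console.WriteLine(ex.InnerException?.GetType().Name);
}

Console.WriteLine(skipped.Status);   // Canceled
Console.WriteLine(ran.Result);       // 42
```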

Tasks with Timeouts

So the task is to time out a task. I never thought it would take me as long as it did; it turns out to be a really tricky problem. I was expecting something within the framework to make life easier. There isn't anything by default, but MSDN comes to the rescue: Tasks With Timeout.

The extension method is:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class TaskWithTimeout
    {
        internal struct VoidTypeStruct
        { }
        internal static void MarshalTaskResults<TResult>(Task source, TaskCompletionSource<TResult> proxy)
        {
            switch (source.Status)
            {
                case TaskStatus.Faulted:
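                    // Pass the inner exceptions so the proxy's AggregateException is not double-wrapped
                    proxy.TrySetException(source.Exception.InnerExceptions);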
                    break;
                case TaskStatus.Canceled:
                    proxy.TrySetCanceled();
                    break;
                case TaskStatus.RanToCompletion:
                    Task<TResult> castedSource = source as Task<TResult>;
                    proxy.TrySetResult(
                        castedSource == null ? default(TResult) : // source is a Task
                            castedSource.Result); // source is a Task<TResult>
                    break;
            }
        }
        
        public static Task TimeoutAfter(this Task task, int millisecondsTimeout)
        {
            // Short-circuit #1: infinite timeout or task already completed
            if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
            {
                // Either the task has already completed or timeout will never occur.
                // No proxy necessary.
                return task;
            }

            // tcs.Task will be returned as a proxy to the caller
            TaskCompletionSource<VoidTypeStruct> tcs = new TaskCompletionSource<VoidTypeStruct>();

            // Short-circuit #2: zero timeout
            if (millisecondsTimeout == 0)
            {
                // We've already timed out.
                tcs.SetException(new TimeoutException());
                return tcs.Task;
            }

            // Set up a timer to complete after the specified timeout period
            Timer timer = new Timer(state =>
            {
                // Recover your state information
                var myTcs = (TaskCompletionSource<VoidTypeStruct>)state;
                // Fault our proxy with a TimeoutException
                myTcs.TrySetException(new TimeoutException());
            }, tcs, millisecondsTimeout, Timeout.Infinite);

            // Wire up the logic for what happens when source task completes
            task.ContinueWith(antecedent =>
                                {
                                    timer.Dispose(); // Cancel the timer
                                    MarshalTaskResults(antecedent, tcs); // Marshal results to proxy
                                },
                                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

            return tcs.Task;
        }

        public static Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, int millisecondsTimeout)
        {
            // Short-circuit #1: infinite timeout or task already completed
            if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
            {
                // Either the task has already completed or timeout will never occur.
                // No proxy necessary.
                return task;
            }

            // tcs.Task will be returned as a proxy to the caller
            TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();

            // Short-circuit #2: zero timeout
            if (millisecondsTimeout == 0)
            {
                // We've already timed out.
                tcs.SetException(new TimeoutException());
                return tcs.Task;
            }

            // Set up a timer to complete after the specified timeout period
            Timer timer = new Timer(state =>
                                    {
                                        // Recover your state information
                                        var myTcs = (TaskCompletionSource<TResult>)state;
                                        // Fault our proxy with a TimeoutException
                                        myTcs.TrySetException(new TimeoutException());
                                    }, tcs, millisecondsTimeout, Timeout.Infinite);

            // Wire up the logic for what happens when source task completes
            task.ContinueWith(antecedent =>
                                {
                                    timer.Dispose(); // Cancel the timer
                                    MarshalTaskResults(antecedent, tcs); // Marshal results to proxy
                                }, 
                                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default);

            return tcs.Task;
        }
    }

That is a lot of code for one job, and the MSDN article remains the better source for an explanation of it.
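For comparison, here is a shorter sketch that is not the MSDN approach: it races the task against Task.Delay using Task.WhenAny. WithTimeout is a hypothetical helper name, and the sketch assumes C# 9 top-level statements. Like TimeoutAfter, it does not stop the underlying work; it only stops waiting for it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: returns the task's result, or throws TimeoutException
// if the task has not finished within millisecondsTimeout.
static async Task<T> WithTimeout<T>(Task<T> task, int millisecondsTimeout)
{
    using var cts = new CancellationTokenSource();
    var delay = Task.Delay(millisecondsTimeout, cts.Token);
    var winner = await Task.WhenAny(task, delay);
    if (winner == delay)
        throw new TimeoutException();   // the task keeps running; we just stop waiting
    cts.Cancel();                       // the task won: release the delay's timer
    return await task;                  // rethrows the task's own exception, if any
}

var fast = WithTimeout(Task.Delay(50).ContinueWith(_ => 1), 2000);
var slow = WithTimeout(Task.Delay(2000).ContinueWith(_ => 2), 100);

Console.WriteLine(fast.Result);
try
{
    Console.WriteLine(slow.Result);
}
catch (AggregateException ex) when (ex.InnerException is TimeoutException)
{
    Console.WriteLine("slow task timed out");
}
```

The trade-off versus the Timer-based version is an extra Task.Delay allocation per call in exchange for much less bookkeeping.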

Now, using it:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    private static readonly List<int> Output = new List<int>();
    private static readonly Random _random = new Random();

    public static int LongRunningTask(string message)
    {
        Console.WriteLine(message);
        Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);
        // Simulate a long running task
        Thread.Sleep(TimeSpan.FromSeconds(2));
        int number;
        lock (_random) // Random is not thread-safe
        {
            number = _random.Next();
        }
        Console.WriteLine("Adding " + number + " From thread  - " + Thread.CurrentThread.ManagedThreadId);
        return number;
    }

    // List<T> is not thread-safe, and the continuations below run concurrently
    private static void AddToOutput(int value)
    {
        lock (Output)
        {
            Output.Add(value);
        }
    }

    public static void Main(string[] args)
    {
        Console.WriteLine("In Main");
        Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);
        var cts = new CancellationTokenSource();
        var tasks = new List<Task>();
        try
        {
            // In the continuation, check for a fault (or anything else you need)
            // before using the result. StartNew(_ => ..., TaskCreationOptions.AttachedToParent)
            // would pass the option as the state argument, so use a parameterless lambda.
            var t1 = Task.Factory.StartNew(() => LongRunningTask("Entering task1"))
                                 .TimeoutAfter(1000)
                                 .ContinueWith(antecedent =>
                                 {
                                     if (!(antecedent.IsCanceled || antecedent.IsFaulted))
                                         AddToOutput(antecedent.Result);
                                 }, cts.Token);
            var t2 = Task.Factory.StartNew(() => LongRunningTask("Entering task2"))
                                 .ContinueWith(antecedent => AddToOutput(antecedent.Result));
            var t3 = Task.Factory.StartNew(() => LongRunningTask("Entering task3"))
                                 .ContinueWith(antecedent => AddToOutput(antecedent.Result));

            tasks.Add(t1);
            tasks.Add(t2);
            tasks.Add(t3);

            Task.WaitAll(tasks.ToArray());
        }
        catch (AggregateException ex)
        {
            Console.WriteLine("There was an exception");
            Console.WriteLine(ex.InnerException.Message);
        }

        Console.WriteLine("Output :");
        Output.ForEach(value => Console.WriteLine(value));

        Console.ReadLine();
    }
}

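The important part is that the continuation is chained after TimeoutAfter; it won't work the other way around. If the continuation hangs off the original task, it runs whenever that task finishes and adds the result regardless, and only the task you wait on times out. Note also that the timeout does not stop the long-running work itself: task1 still runs and prints its "Adding" line, but its result never reaches Output.
The output therefore looks something like this (thread IDs and numbers will vary):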

In Main
Managed thread Id 9
Entering task1
Managed thread Id 10
Entering task2
Managed thread Id 11
Entering task3
Managed thread Id 14
Adding 194443354 From thread  - 10
Adding 792426557 From thread  - 11
Adding 230130793 From thread  - 14
Output :
792426557
230130793
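The ordering point can be checked in isolation with a small sketch. It assumes .NET 6 or later, where Task.WaitAsync(TimeSpan) is built in and faults with a TimeoutException much like TimeoutAfter does; Work is a hypothetical stand-in for LongRunningTask.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var output = new List<int>();

// Stand-in for LongRunningTask: finishes after 300 ms with the value 42.
Task<int> Work() => Task.Run(async () => { await Task.Delay(300); return 42; });

// Timeout first, then the continuation inspects the outcome.
var timeoutFirst = Work()
    .WaitAsync(TimeSpan.FromMilliseconds(100))
    .ContinueWith(t =>
    {
        if (t.Status == TaskStatus.RanToCompletion)
            lock (output) output.Add(t.Result);
    });

// Reversed order: the continuation hangs off the raw task, so it still adds
// the result when the work finishes, even though the awaited task times out.
var rawContinuation = Work().ContinueWith(t => { lock (output) output.Add(t.Result); });
var continuationFirst = rawContinuation.WaitAsync(TimeSpan.FromMilliseconds(100));

timeoutFirst.Wait();
try
{
    continuationFirst.Wait();
}
catch (AggregateException)
{
    Console.WriteLine("reversed order: the awaited task timed out");
}
rawContinuation.Wait();   // let the late continuation finish

// 1: only the reversed-order continuation added its result
Console.WriteLine($"results added: {output.Count}");
```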

Next time, I will try to write about some other things on Flirting with Tasks.

MongoDB : Sharding

Sharding

MongoDB sharding is based on a shard key.

Keys from k1 up to k2 live on shard1,
keys from k2 up to k3 live on shard2, and so on.

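Each shard is then replicated (as a replica set) for higher availability, disaster recovery, etc. Sharding is therefore range based, and it is done on a per-collection basis. Because it is range based, a query on a range of shard-key values can be routed to just the shards that hold that range.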
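A minimal sketch of range-based routing, assuming a hypothetical chunk table with made-up boundaries (mongos gets the real ranges from the config servers): a point lookup hits exactly one chunk, and a range query only needs the chunks whose ranges overlap it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical chunk table: each chunk owns the half-open range [Min, Max).
var chunks = new List<(int Min, int Max, string Shard)>
{
    (int.MinValue, 100, "shard1"),
    (100, 200, "shard2"),
    (200, int.MaxValue, "shard3"),   // simplified: int.MaxValue itself is excluded
};

// Point lookup: the single chunk whose range contains the key.
string ShardFor(int key) => chunks.First(c => key >= c.Min && key < c.Max).Shard;

// Range query [lo, hi): only chunks overlapping the range need to be queried.
IEnumerable<string> ShardsFor(int lo, int hi) =>
    chunks.Where(c => c.Min < hi && lo < c.Max).Select(c => c.Shard).Distinct();

Console.WriteLine(ShardFor(150));                          // shard2
Console.WriteLine(string.Join(",", ShardsFor(120, 180)));  // shard2
Console.WriteLine(string.Join(",", ShardsFor(50, 250)));   // shard1,shard2,shard3
```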

A chunk is a contiguous range of shard-key values together with the documents in it; each shard holds many chunks. The maximum chunk size defaults to 64 MB.

There are two operations that happen in the background in sharding and these are done automatically for us.

  1. Split – splits a chunk's range when the chunk grows too big. This is cheap: it only changes metadata, and no documents move.
  2. Migrate – moves chunks to another shard in the cluster. This is comparatively expensive.
    • At most one migration runs between a given pair of shards at a time.
    • Reads and writes continue while a chunk is migrating, so migration is live.
    • The “balancer” decides when to migrate. At the time of writing, it balances on the number of chunks per shard.
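Splitting can be sketched as a purely local operation on one chunk's key range. This assumes a hypothetical size limit counted in documents (MongoDB measures chunk size in bytes and picks split points itself). In the sketch no document changes shard; only the range boundaries are redrawn, and moving chunks is left to the separate migrate step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Made-up limit, counted in documents for simplicity.
const int maxDocsPerChunk = 4;

// Recursively split an oversized chunk at its median key. Each chunk here is
// just the sorted shard-key values it covers; nothing moves between shards.
List<List<int>> Split(List<int> chunk)
{
    if (chunk.Count <= maxDocsPerChunk)
        return new List<List<int>> { chunk };
    int mid = chunk.Count / 2;
    return Split(chunk.Take(mid).ToList())
        .Concat(Split(chunk.Skip(mid).ToList()))
        .ToList();
}

var keys = Enumerable.Range(1, 10).ToList();   // one chunk holding keys 1..10
var chunks = Split(keys);

// Prints [1..2], [3..5], [6..7], [8..10] with their document counts.
foreach (var c in chunks)
    Console.WriteLine($"[{c.First()}..{c.Last()}] {c.Count} docs");
```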

Both of these operations keep the documents balanced across the shards.
The metadata about the shards and the rest of the system is stored on config servers, which are lightweight.
Conceptually, shards are processes and not separate physical or virtual machines, although they can be, and most likely will be.
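The balancer's goal can be sketched with made-up chunk counts and an assumed imbalance threshold of 2 (the real balancer's thresholds and rules are more involved): repeatedly move one chunk from the shard with the most chunks to the shard with the fewest, until the difference is small enough.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical chunk counts per shard: shard3 has just been added.
var chunkCounts = new Dictionary<string, int> { ["shard1"] = 12, ["shard2"] = 9, ["shard3"] = 0 };
const int threshold = 2;   // assumed: stop once max - min is below this
var migrations = new List<(string From, string To)>();

while (true)
{
    var most = chunkCounts.OrderByDescending(kv => kv.Value).First();
    var fewest = chunkCounts.OrderBy(kv => kv.Value).First();
    if (most.Value - fewest.Value < threshold) break;

    // One migration at a time: move a single chunk.
    chunkCounts[most.Key]--;
    chunkCounts[fewest.Key]++;
    migrations.Add((most.Key, fewest.Key));
}

// Each shard ends up with 7 chunks after 7 migrations.
foreach (var kv in chunkCounts)
    Console.WriteLine($"{kv.Key}: {kv.Value} chunks");
Console.WriteLine($"{migrations.Count} migrations");
```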

mongos gives the client the big picture of the whole setup, so the client is insulated from the underlying architecture used to implement sharding, replication, etc.

[Figure: Sharding]

Client applications should always go through mongos.

To set up sharding, connect to mongos and then run:

sh.addShard("hostname:port")
sh.enableSharding("dbname")
db.<collection>.ensureIndex(<shard key pattern>)   // createIndex in MongoDB 3.0 and later
sh.shardCollection("dbname.<collection>", <shard key pattern>)

Choosing a shard key

  • The shard key field(s) should be involved in most of your queries.
  • Good cardinality/granularity.
  • The shard key should not increase monotonically.
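Why a monotonically increasing shard key hurts can be sketched with a hypothetical three-chunk table: keys from a counter or timestamp are always larger than every existing boundary, so every new document lands in the last chunk (and its shard), while keys spread across the key space are distributed over all shards. The boundaries and key values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical chunk table over a numeric shard key.
var chunks = new List<(long Min, long Max, string Shard)>
{
    (long.MinValue, 1_000, "shard1"),
    (1_000, 2_000, "shard2"),
    (2_000, long.MaxValue, "shard3"),
};
string ShardFor(long key) => chunks.First(c => key >= c.Min && key < c.Max).Shard;

// New documents with a monotonically increasing key (like a counter or timestamp)...
var monotonic = Enumerable.Range(0, 300).Select(i => 5_000L + i);
// ...versus new documents whose keys are spread over the key space.
var rng = new Random(42);
var spread = Enumerable.Range(0, 300).Select(_ => (long)rng.Next(0, 3_000));

Dictionary<string, int> Histogram(IEnumerable<long> keys) =>
    keys.GroupBy(ShardFor).ToDictionary(g => g.Key, g => g.Count());

var hot = Histogram(monotonic);    // shard3=300: every insert hits one shard
var even = Histogram(spread);      // all three shards receive inserts

Console.WriteLine(string.Join(", ", hot.Select(kv => $"{kv.Key}={kv.Value}")));
Console.WriteLine(string.Join(", ", even.Select(kv => $"{kv.Key}={kv.Value}")));
```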

MongoDB – Replication

Replication

Replication helps us achieve availability and fault tolerance. A replica set is a set of mongod nodes that replicate data amongst each other asynchronously. One member of the replica set is the primary, while the rest are secondaries.
Writes only happen on the primary. If the primary goes down, an election is held and a new primary is chosen.
The minimum number of nodes is 3, since the election requires a majority of the original set.
With only 2 nodes, the one that remains after a failure is not a majority (1 of 2), so no primary can be elected and you would not be able to write.
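The arithmetic behind the three-node minimum, as a small sketch: a primary needs votes from a strict majority of the voting members, so the number of members you can lose is the total minus that majority. Note that going from 3 to 4 members does not add any fault tolerance.

```csharp
using System;

// Votes needed to elect a primary: a strict majority of the voting members.
static int Majority(int members) => members / 2 + 1;

// How many members can fail while the rest can still elect a primary.
static int FaultTolerance(int members) => members - Majority(members);

// e.g. "2 members: majority 2, can lose 0" and "3 members: majority 2, can lose 1"
for (int n = 1; n <= 7; n++)
    Console.WriteLine($"{n} members: majority {Majority(n)}, can lose {FaultTolerance(n)}");
```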

Replica Set Elections

Regular node: holds data and can become primary or secondary.
Arbiter: exists only for voting purposes and holds no data. Add one when you have an even number of data-bearing nodes, so that the number of voters is odd.
Delayed node: holds data but applies changes a configurable time (e.g. a few hours) behind the other nodes. It cannot become primary; its priority is set to 0.
Hidden node: cannot become primary and is not visible to client applications; its priority is set to 0.

By default, reads and writes go to the primary. You can read from a secondary instead, but you might read stale data: replication is asynchronous, so the lag between nodes is not guaranteed. Reading from a secondary therefore gives you “eventual consistency”.

rs.slaveOk() -- ok to read from the secondary
rs.isMaster() -- checks whether the node is master
rs.status() -- gives the current status of the replica set.

In the local database, the oplog.rs collection holds the operations log (oplog). Replication happens when secondary nodes query the primary for the changes since a given timestamp.
The oplog is a logical, operation-based replication log rather than a binary one.
For example, if a statement deletes 100 documents on the primary, the oplog gets 100 entries, one per document, which the secondaries replay. Because there is no binary replication, different members can run different versions of MongoDB.
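A toy in-memory model of that behaviour, not MongoDB's real oplog format: the primary logs one entry per affected document, and the secondary pulls every entry after the last timestamp it applied and replays it. All names here are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var primaryDocs = new SortedDictionary<int, string>();
var secondaryDocs = new SortedDictionary<int, string>();
// (timestamp, op: "i" insert / "d" delete, document id, value; "" for deletes)
var oplog = new List<(long Ts, string Op, int Id, string Value)>();
long clock = 0;

void Insert(int id, string value)
{
    primaryDocs[id] = value;
    oplog.Add((++clock, "i", id, value));
}

// A multi-document delete is logged as one entry per affected document.
void DeleteWhere(Func<int, bool> predicate)
{
    foreach (var id in primaryDocs.Keys.Where(predicate).ToList())
    {
        primaryDocs.Remove(id);
        oplog.Add((++clock, "d", id, ""));
    }
}

// The secondary asks for every entry after the last timestamp it applied.
long lastApplied = 0;
void Sync()
{
    var pending = oplog.Where(entry => entry.Ts > lastApplied).ToList();
    foreach (var e in pending)
    {
        if (e.Op == "i") secondaryDocs[e.Id] = e.Value;
        else secondaryDocs.Remove(e.Id);
        lastApplied = e.Ts;
    }
}

for (int i = 1; i <= 150; i++) Insert(i, $"doc{i}");
Sync();
DeleteWhere(id => id > 50);                    // one "statement" on the primary
int deleteEntries = oplog.Count(e => e.Op == "d");
Sync();

Console.WriteLine(deleteEntries);              // 100
Console.WriteLine(secondaryDocs.Count);        // 50
```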

  • Try to keep the oplog small on 64-bit machines, since it defaults to a large value on 64-bit systems (5% of free disk space).
  • For replica sets, don't use localhost or the IP address of the machine.
  • Use a logical host name instead; that is the best practice.
  • Use DNS.
  • Pick an appropriate DNS TTL as well.

Review : C# In Depth

I had read the second edition as well. The third edition carries on from there and takes a deeper look at C# 5 and its key feature, async/await. If you want to understand what goes on behind the scenes, this one is for you. Jon Skeet (yes, the famous Jon Skeet) has come out with a great book once again.

Jon starts easy, but doesn't hold back. The prose is lucid yet well paced. It is one of the few books that make the effort to take the reader on a journey, which begins with C# 1 and continues through C# 5. Hardly any chapter drags on longer than it should. LINQ is covered in great depth and is nicely written, with enough diagrams to visualize what is happening under the hood. Generics is one of my personal favourites: the text is nice, and so are the code samples. I had never really understood generics completely, but the book makes things a lot clearer. The treatment of dynamic is really nice, too. It goes into the heart of the DLR and shows everything you need to know to really understand dynamic.

The book really shines when you get to async/await. Jon makes a rather tough concept easier. I had to read it several times, but each time the concept became clearer. I only wish it had moved away from the familiar "download a web page" example for async. The book also spends time on the compiler transformations behind async/await. Be patient when you read that part; it will take time to sink in.

The only part left out of the third edition is the chapter on Code Contracts. Jon clearly mentions that the topic hasn't gained as much traction as he had hoped, though it may come back in the future. As many have mentioned before, this is not a book for beginners. Use it to become a better C# programmer after you have written C# for some time. It is a must-have in your collection, especially if you work with C# day in and day out. It will give you a greater understanding of how the language designers wanted you to think, and of what makes C# a real joy.
A 4.5/5 for this one, keeping the 0.5 for the next edition :).

P.S. Even the appendix is handy.

Disclosure : I got a free copy of the book to review. The review is my own opinion and not influenced by anyone else.

MongoDB – Understanding your queries through ExplainPlan

Understanding the queries we write is critical, and MongoDB does a good job here. Developers will find it easy to understand what their queries are doing and where to look for bottlenecks. Well-defined and well-documented output fields make life a lot easier.
The details below have been taken from the MongoDB website and presented here for the continuity of the series.

Explain Output Fields

explain.cursor
cursor is a string that reports the type of cursor used by the query operation:

BasicCursor indicates a full collection scan.
BtreeCursor indicates that the query used an index. The cursor includes the name of the index. When a query uses an index, the output of explain() includes indexBounds details.
GeoSearchCursor indicates that the query used a geospatial index.
explain.isMultiKey
isMultiKey is a boolean. When true, the query uses a multikey index, where one of the fields in the index holds an array.

explain.n
n is a number that reflects the number of documents that match the query selection criteria.

explain.nscannedObjects
Specifies the total number of documents scanned during the query. The nscannedObjects may be lower than nscanned, such as if the index covers a query. See indexOnly. Additionally, the nscannedObjects may be lower than nscanned in the case of multikey index on an array field with duplicate documents.

explain.nscanned
Specifies the total number of documents or index entries scanned during the database operation. You want n and nscanned to be as close in value as possible. The nscanned value may be higher than the nscannedObjects value, such as if the index covers a query. See indexOnly.

explain.nscannedObjectsAllPlans
nscannedObjectsAllPlans is a number that reflects the total number of documents scanned for all query plans during the database operation.

explain.nscannedAllPlans
nscannedAllPlans is a number that reflects the total number of documents or index entries scanned for all query plans during the database operation.

explain.scanAndOrder
scanAndOrder is a boolean that is true when the query cannot use the index for returning sorted results.

When true, MongoDB must sort the documents after it retrieves them from either an index cursor or a cursor that scans the entire collection.

explain.indexOnly
indexOnly is a boolean value that returns true when the query is covered by the index indicated in the cursor field. When an index covers a query, MongoDB can both match the query conditions and return the results using only the index because:

all the fields in the query are part of that index, and
all the fields returned in the results set are in the same index.
explain.nYields
nYields is a number that reflects the number of times this query yielded the read lock to allow waiting writes to execute.

explain.nChunkSkips
nChunkSkips is a number that reflects the number of documents skipped because of active chunk migrations in a sharded system. Typically this will be zero. A number greater than zero is ok, but indicates a little bit of inefficiency.

explain.millis
millis is a number that reflects the time in milliseconds to complete the query.

explain.indexBounds
indexBounds is a document that contains the lower and upper index key bounds. This field resembles one of the following:

"indexBounds" : {
    "start" : { <index key> : <value>, … },
    "end" : { <index key> : <value>, … }
},

"indexBounds" : { "<field>" : [ [ <lower bound>, <upper bound> ] ],
    …
}
explain.allPlans
allPlans is an array that holds the list of plans the query optimizer runs in order to select the index for the query. Displays only when the parameter to explain() is true or 1.

explain.oldPlan
oldPlan is a document value that contains the previous plan selected by the query optimizer for the query. Displays only when the parameter to explain() is true or 1.

explain.server
server is a string that reports the MongoDB server.

$or Query Output Fields
explain.clauses
clauses is an array that holds the Core Explain Output Fields information for each clause of the $or expression. clauses is only included when the clauses in the $or expression use indexes.

Sharded Collections Output Fields
explain.clusteredType
clusteredType is a string that reports the access pattern for shards. The value is:

ParallelSort, if the mongos queries shards in parallel.
SerialServer, if the mongos queries shards sequentially.
explain.shards
shards contains fields for each shard in the cluster accessed during the query. Each field holds the Core Explain Output Fields for that shard.

explain.millisShardTotal
millisShardTotal is a number that reports the total time in milliseconds for the query to run on the shards.

explain.millisShardAvg
millisShardAvg is a number that reports the average time in milliseconds for the query to run on each shard.

explain.numQueries
numQueries is a number that reports the total number of queries executed.

explain.numShards
numShards is a number that reports the total number of shards queried.
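A small sketch of reading a few of these fields together. Review is a hypothetical helper, the sample values are made up, and the 0.5 ratio for n versus nscanned is an arbitrary threshold chosen for illustration, not a MongoDB rule.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: flags the warning signs described above
// in a handful of explain() fields.
static List<string> Review(string cursor, int n, int nscanned, bool scanAndOrder, bool indexOnly)
{
    var notes = new List<string>();
    if (cursor == "BasicCursor")
        notes.Add("full collection scan: consider an index");
    if (nscanned > 0 && (double)n / nscanned < 0.5)   // arbitrary threshold
        notes.Add($"only {n} of {nscanned} scanned entries matched");
    if (scanAndOrder)
        notes.Add("sorted in memory: the index does not support the sort");
    if (indexOnly)
        notes.Add("covered query: answered from the index alone");
    return notes;
}

// Made-up explain() values for two queries.
var collScan = Review("BasicCursor", n: 10, nscanned: 10_000, scanAndOrder: true, indexOnly: false);
var covered = Review("BtreeCursor age_1", n: 10, nscanned: 10, scanAndOrder: false, indexOnly: true);

foreach (var note in collScan) Console.WriteLine("collScan: " + note);
foreach (var note in covered) Console.WriteLine("covered: " + note);
```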

Lastly, some more profiling tips that can be pretty useful.

Logging Slow Queries

  • The profiler has 3 levels: 0 – off, 1 – slow operations only, 2 – all operations.
  • Enable it when starting mongod: mongod --profile 1 --slowms 2
  • db.getProfilingLevel() returns the current profiling level.
  • To get all queries that took longer than 3 seconds, ordered by timestamp descending: db.system.profile.find({millis:{$gt:3000}}).sort({ts:-1})
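A simplified, in-memory model of the profiler levels and of the system.profile query above, with made-up operations and a 100 ms threshold. It only illustrates the filtering; it is not how mongod records operations.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model of the levels: 0 = off,
// 1 = only operations slower than slowMs, 2 = all operations.
static bool ShouldProfile(int level, int slowMs, int millis) =>
    level == 2 || (level == 1 && millis > slowMs);

// Made-up operations: (timestamp, duration in milliseconds).
var ops = new List<(DateTime Ts, int Millis)>
{
    (new DateTime(2014, 1, 1, 10, 0, 0), 1),
    (new DateTime(2014, 1, 1, 10, 0, 5), 3500),
    (new DateTime(2014, 1, 1, 10, 0, 9), 4200),
    (new DateTime(2014, 1, 1, 10, 0, 2), 150),
};

// What would be recorded at level 1 with a 100 ms threshold.
var profiled = ops.Where(o => ShouldProfile(1, 100, o.Millis)).ToList();

// In-memory equivalent of
// db.system.profile.find({millis:{$gt:3000}}).sort({ts:-1})
var slowest = profiled.Where(o => o.Millis > 3000)
                      .OrderByDescending(o => o.Ts)
                      .ToList();

foreach (var op in slowest)
    Console.WriteLine($"{op.Ts:HH:mm:ss} {op.Millis} ms");
```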

mongotop can be run with a specific reporting interval to determine where the majority of the time is spent. This utility tracks the time that mongod spends on reads and writes, on a per-collection basis.
I prefer to increase the reporting interval from the default 1 second to 5 seconds or more, e.g. mongotop 5 (this is just a number I feel comfortable with; otherwise the data is too verbose to make any sense).

This should be a good start. We will go into some more concepts about monitoring MongoDB before moving on to sharding, etc.