Tasks with Timeouts – Contd.

As mentioned in the last post, we can now have tasks with individual timeouts. The code looked a little heavy, though. Can we do better? Yes, we can!


var tasks = new List<Task>();
try
{
// Instead of checking for a fault within the continuation,
// we can use a TaskContinuationOptions value to communicate the right semantics.
// Use parameterless lambdas: with "_ =>" the compiler binds to the
// StartNew(Func<object, TResult>, object state) overload, and the
// TaskCreationOptions value is passed as the state object instead.

var t1 = Task.Factory.StartNew(() => LongRunningTask("Entering task1"), TaskCreationOptions.AttachedToParent)
              .TimeoutAfter(1000)
              .ContinueWith(t => SomethingUsefulWithTheResult(),
                                 TaskContinuationOptions.NotOnFaulted);

var t2 = Task.Factory.StartNew(() => LongRunningTask("Entering task2"), TaskCreationOptions.AttachedToParent)
              .ContinueWith(t => SomethingUsefulWithTheResult());

var t3 = Task.Factory.StartNew(() => LongRunningTask("Entering task3"), TaskCreationOptions.AttachedToParent)
              .ContinueWith(t => SomethingUsefulWithTheResult());
            
tasks.Add(t1);
tasks.Add(t2);
tasks.Add(t3);
                            
Task.WaitAll(tasks.ToArray());
}
catch (Exception ex)
{                
    Console.WriteLine("There was an exception");
    Console.WriteLine(ex.InnerException.Message);               
}
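
To see what NotOnFaulted actually does, here is a minimal, self-contained sketch. The class and method names are made up for illustration, and the antecedent throws a TimeoutException by hand instead of going through TimeoutAfter. When the antecedent faults, the continuation is never run; it ends up in the Canceled state, so waiting on it still throws and we still land in the catch block.

```csharp
using System;
using System.Threading.Tasks;

public static class NotOnFaultedSketch
{
    // Returns the final status of a NotOnFaulted continuation whose
    // antecedent either faults or completes normally.
    public static TaskStatus ContinuationStatus(bool antecedentFaults)
    {
        Task<int> antecedent = Task.Factory.StartNew<int>(() =>
        {
            if (antecedentFaults)
            {
                throw new TimeoutException();
            }
            return 42;
        });

        // Only scheduled when the antecedent did not fault.
        Task continuation = antecedent.ContinueWith(
            t => Console.WriteLine("Result: " + t.Result),
            TaskContinuationOptions.NotOnFaulted);

        try
        {
            continuation.Wait();
        }
        catch (AggregateException)
        {
            // A skipped continuation is canceled, and waiting on a canceled
            // task throws an AggregateException wrapping TaskCanceledException.
        }

        // Reading Exception marks the antecedent's fault (if any) as observed.
        var observed = antecedent.Exception;

        return continuation.Status;
    }
}
```

Calling ContinuationStatus(true) should report Canceled, while ContinuationStatus(false) should report RanToCompletion.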

Tasks with Timeouts

So the task is to time out a task. I never thought it would take me as long as it did; it turns out to be a really tricky problem. I was expecting something within the framework to make life easier. There isn’t anything by default, but MSDN comes to the rescue with Tasks With Timeout.

So, the extension method is

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class TaskWithTimeout
    {
        internal struct VoidTypeStruct
        { }
        internal static void MarshalTaskResults<TResult>(Task source, TaskCompletionSource<TResult> proxy)
        {
            switch (source.Status)
            {
                case TaskStatus.Faulted:
                    proxy.TrySetException(source.Exception);
                    break;
                case TaskStatus.Canceled:
                    proxy.TrySetCanceled();
                    break;
                case TaskStatus.RanToCompletion:
                    Task<TResult> castedSource = source as Task<TResult>;
                    proxy.TrySetResult(
                        castedSource == null ? default(TResult) : // source is a Task
                            castedSource.Result); // source is a Task<TResult>
                    break;
            }
        }
        
        public static Task TimeoutAfter(this Task task, int millisecondsTimeout)
        {
            // Short-circuit #1: infinite timeout or task already completed
            if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
            {
                // Either the task has already completed or timeout will never occur.
                // No proxy necessary.
                return task;
            }

            // tcs.Task will be returned as a proxy to the caller
            TaskCompletionSource<VoidTypeStruct> tcs = new TaskCompletionSource<VoidTypeStruct>();

            // Short-circuit #2: zero timeout
            if (millisecondsTimeout == 0)
            {
                // We've already timed out.
                tcs.SetException(new TimeoutException());
                return tcs.Task;
            }

            // Set up a timer to complete after the specified timeout period
            Timer timer = new Timer(state =>
            {
                // Recover your state information
                var myTcs = (TaskCompletionSource<VoidTypeStruct>)state;
                // Fault our proxy with a TimeoutException
                myTcs.TrySetException(new TimeoutException());
            }, tcs, millisecondsTimeout, Timeout.Infinite);

            // Wire up the logic for what happens when source task completes
            task.ContinueWith(antecedent =>
                                {
                                    timer.Dispose(); // Cancel the timer
                                    MarshalTaskResults(antecedent, tcs); // Marshal results to proxy
                                },
                                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

            return tcs.Task;
        }

        public static Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, int millisecondsTimeout)
        {
            // Short-circuit #1: infinite timeout or task already completed
            if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
            {
                // Either the task has already completed or timeout will never occur.
                // No proxy necessary.
                return task;
            }

            // tcs.Task will be returned as a proxy to the caller
            TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();

            // Short-circuit #2: zero timeout
            if (millisecondsTimeout == 0)
            {
                // We've already timed out.
                tcs.SetException(new TimeoutException());
                return tcs.Task;
            }

            // Set up a timer to complete after the specified timeout period
            Timer timer = new Timer(state =>
                                    {
                                        // Recover your state information
                                        var myTcs = (TaskCompletionSource<TResult>)state;
                                        // Fault our proxy with a TimeoutException
                                        myTcs.TrySetException(new TimeoutException());
                                    }, tcs, millisecondsTimeout, Timeout.Infinite);

            // Wire up the logic for what happens when source task completes
            task.ContinueWith(antecedent =>
                                {
                                    timer.Dispose(); // Cancel the timer
                                    MarshalTaskResults(antecedent, tcs); // Marshal results to proxy
                                }, 
                                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default);

            return tcs.Task;
        }
    }

That is a lot of code for a timeout, and the MSDN article remains the better source of explanation.
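
If you can target .NET 4.5 or later, the same idea can be written much more compactly with Task.Delay and Task.WhenAny. This is a different technique from the timer-and-proxy code above, not a drop-in copy of it, and the names TaskTimeoutSketch and TimeoutAfterAsync are my own. Treat it as a sketch: like the original, it only stops waiting and does not cancel the underlying work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskTimeoutSketch
{
    // Completes with the task's result, or faults with TimeoutException
    // if the task has not finished within millisecondsTimeout.
    public static async Task<TResult> TimeoutAfterAsync<TResult>(
        this Task<TResult> task, int millisecondsTimeout)
    {
        using (var timerCancellation = new CancellationTokenSource())
        {
            Task delay = Task.Delay(millisecondsTimeout, timerCancellation.Token);

            // WhenAny completes as soon as either task finishes.
            Task first = await Task.WhenAny(task, delay).ConfigureAwait(false);
            if (first != task)
            {
                throw new TimeoutException();
            }

            // The real task won: stop the timer behind Task.Delay.
            timerCancellation.Cancel();

            // Awaiting again propagates the result or the original exception.
            return await task.ConfigureAwait(false);
        }
    }
}
```

Usage mirrors the extension method above: someTask.TimeoutAfterAsync(1000) returns a task that either carries the result or faults with a TimeoutException.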

Now using this.

public class Program
	{
		private static List<int> Output = new List<int>();
		private static Random _random = new Random();
		
		public static int LongRunningTask(string message)
		{
			Console.WriteLine(message);
			Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);
			//Simulate a long running task
            Thread.Sleep(TimeSpan.FromSeconds(2));
			var number = _random.Next();
			Console.WriteLine("Adding " + number + " From thread  - " + Thread.CurrentThread.ManagedThreadId);
			return number;
		}

		public static void Main(string[] args)
		{
			Console.WriteLine("In Main");
			Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);
			var cts = new CancellationTokenSource();
			var tasks = new List<Task>();
			try
			{
                // In the continuation, check for a fault (or anything else you need) before using the result.
                // List<int> is not thread-safe, so every continuation adds to Output under a lock.
                var t1 = Task.Factory.StartNew(() => LongRunningTask("Entering task1"),
                                               TaskCreationOptions.AttachedToParent)
                             .TimeoutAfter(1000)
                             .ContinueWith(antecedent =>
                                 {
                                     if (!(antecedent.IsCanceled || antecedent.IsFaulted))
                                     {
                                         lock (Output) { Output.Add(antecedent.Result); }
                                     }
                                 },
                                 cts.Token);
                var t2 = Task.Factory.StartNew(() => LongRunningTask("Entering task2"),
                                               TaskCreationOptions.AttachedToParent)
                             .ContinueWith(antecedent => { lock (Output) { Output.Add(antecedent.Result); } });
                var t3 = Task.Factory.StartNew(() => LongRunningTask("Entering task3"),
                                               TaskCreationOptions.AttachedToParent)
                             .ContinueWith(antecedent => { lock (Output) { Output.Add(antecedent.Result); } });
            
                tasks.Add(t1);
                tasks.Add(t2);
                tasks.Add(t3);
                            
                Task.WaitAll(tasks.ToArray());
            }
            catch (Exception ex)
            {                
                Console.WriteLine("There was an exception");
                Console.WriteLine(ex.InnerException.Message);               
            }

            Console.WriteLine("Output :");
            Output.ForEach(_ => Console.WriteLine(_));

            Console.ReadLine();
        }
    }

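The important part is that the continuation is attached after the timeout. It won’t work the other way around: the timeout would then wrap the continuation, which still runs and adds the result once the slow task finally finishes.
The output therefore looks roughly like this (task1 timed out, so its number never reaches Output):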

In Main
Managed thread Id 9
Entering task1
Managed thread Id 10
Entering task2
Managed thread Id 11
Entering task3
Managed thread Id 14
Adding 194443354 From thread  - 10
Adding 792426557 From thread  - 11
Adding 230130793 From thread  - 14
Output :
792426557
230130793

Next time, I will try to write about some other things in Flirting with Tasks.

MongoDB : Sharding

Sharding

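MongoDB sharding is based on a shard key.

k1 -> k2 on shard1
k2 -> k3 on shard2, etc.

Sharding is therefore range based, and it is configured on a per-collection basis. Range-based sharding also helps with range queries, because neighbouring keys live together. Each shard is then replicated for higher availability, disaster recovery (DR), etc.

The documents on a shard are grouped into chunks. Each chunk covers a contiguous range of shard key values and is capped at a configurable size (64 MB by default in the MongoDB 2.x releases).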
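
Range-based placement can be pictured as a table of key ranges, each owned by a shard. The sketch below is only an illustration of that idea with integer keys; ChunkRange and RangeRoutingSketch are made-up names and this is not how mongos is implemented. It shows why a range query only needs to visit the shards whose ranges overlap the query.

```csharp
using System;
using System.Collections.Generic;

// One chunk: a half-open range of shard key values owned by a shard.
public sealed class ChunkRange
{
    public ChunkRange(int minInclusive, int maxExclusive, string shard)
    {
        MinInclusive = minInclusive;
        MaxExclusive = maxExclusive;
        Shard = shard;
    }

    public int MinInclusive { get; private set; }
    public int MaxExclusive { get; private set; }
    public string Shard { get; private set; }
}

public static class RangeRoutingSketch
{
    // Finds the shard that owns a single key.
    public static string ShardForKey(IEnumerable<ChunkRange> chunks, int key)
    {
        foreach (ChunkRange chunk in chunks)
        {
            if (key >= chunk.MinInclusive && key < chunk.MaxExclusive)
            {
                return chunk.Shard;
            }
        }
        throw new ArgumentOutOfRangeException("key", "No chunk covers this key.");
    }

    // Finds every shard that a range query [fromInclusive, toInclusive] must visit.
    public static ISet<string> ShardsForRange(
        IEnumerable<ChunkRange> chunks, int fromInclusive, int toInclusive)
    {
        var shards = new HashSet<string>();
        foreach (ChunkRange chunk in chunks)
        {
            bool overlaps = chunk.MinInclusive <= toInclusive
                            && fromInclusive < chunk.MaxExclusive;
            if (overlaps)
            {
                shards.Add(chunk.Shard);
            }
        }
        return shards;
    }
}
```

With chunks [0,100) on shard1 and [100,200) on shard2, a query for keys 120 to 180 touches only shard2, whereas a scattered key layout would force a visit to every shard.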

There are two operations that happen in the background in sharding and these are done automatically for us.

  1. Split – splits a range when it is producing chunks that are too big; this is fairly expensive.
  2. Migrate – moves chunks to somewhere else in the cluster; this is somewhat expensive.
    • Between a pair of shards there will not be more than one migration at a time.
    • We can still read and write the data while it is migrating, so the migration is live.
    • The “balancer” decides when to do the balancing. Today it balances on the number of chunks.

Both of these are done to keep the documents balanced across the shards.
The metadata about these shards and our system is stored in config servers. These are lightweight.
Conceptually, these shards are processes rather than separate physical or virtual machines, although they can be, and most likely will be, on separate machines.
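
Balancing on the number of chunks can be sketched as follows. This is my own simplification, not the real balancer: the class name, the method name and the threshold parameter are all hypothetical. The idea is simply to move one chunk from the shard with the most chunks to the shard with the fewest, and only when the gap is large enough to justify the cost of a migration.

```csharp
using System;
using System.Collections.Generic;

public static class BalancerSketch
{
    // Returns (source shard, destination shard) for the next migration,
    // or null when the chunk counts are already within the threshold.
    public static Tuple<string, string> NextMigration(
        IDictionary<string, int> chunkCounts, int threshold)
    {
        string most = null;
        string fewest = null;

        foreach (KeyValuePair<string, int> pair in chunkCounts)
        {
            if (most == null || pair.Value > chunkCounts[most])
            {
                most = pair.Key;
            }
            if (fewest == null || pair.Value < chunkCounts[fewest])
            {
                fewest = pair.Key;
            }
        }

        // No shards at all, or the imbalance is too small to bother.
        if (most == null || chunkCounts[most] - chunkCounts[fewest] < threshold)
        {
            return null;
        }

        return Tuple.Create(most, fewest);
    }
}
```

Note that the decision looks only at chunk counts, not at document counts or data size, which matches the remark above that the balancer works on the number of chunks.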

Mongos gives the client the big picture of the whole setup. The client is therefore insulated from the underlying architecture that is used to implement sharding, replication, etc.

End client applications should go through mongos.

To set up sharding, connect to mongos and then run the following (dbname, collectionName and shardKeyField are placeholders):

sh.addShard("hostname:port")
sh.enableSharding("dbname")
db.collectionName.ensureIndex({ shardKeyField: 1 })
sh.shardCollection("dbname.collectionName", { shardKeyField: 1 })

Choosing a shard key

– The field should be involved in most of the queries
– Good cardinality/granularity
– Shard key should not increase monotonically
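
The last point is easiest to see with a small simulation. The sketch below is an illustration only (the class name, method name and the numbers are made up): it counts how many inserts land in each key range. With a monotonically increasing key, such as a timestamp or counter, every new insert is larger than all existing keys, so all of the write load falls on the last range.

```csharp
using System;
using System.Collections.Generic;

public static class ShardKeySketch
{
    // Counts inserts per range. upperBounds holds the sorted, exclusive upper
    // bound of each range; keys at or beyond the last bound fall into one
    // final open-ended range, so the result has upperBounds.Length + 1 entries.
    public static int[] InsertsPerRange(IEnumerable<long> keys, long[] upperBounds)
    {
        var counts = new int[upperBounds.Length + 1];
        foreach (long key in keys)
        {
            int index = 0;
            while (index < upperBounds.Length && key >= upperBounds[index])
            {
                index++;
            }
            counts[index]++;
        }
        return counts;
    }
}
```

Feeding it ever-growing keys puts every insert into the final range, while keys spread across the key space are shared between all ranges. That is the "hot shard" effect the guideline is warning about.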

Review : C# In Depth

I have read the second edition as well. The third edition carries on from there and provides a deeper look at C# 5 and its key feature, async/await. If you want to understand what goes on behind the scenes, this one is for you. Jon Skeet (yes, the famous Jon Skeet) has managed to come out with a great book once again.

Jon starts easy, but doesn’t hold back. The prose is lucid yet well paced. It is one of the few books that make the effort to take the reader on a journey. The journey begins with C# 1 and continues to C# 5. Hardly any chapter drags on any more than it should. LINQ is covered in great depth, and the chapter is nicely written, with enough diagrams to visualize what is happening under the hood. Generics is one of my personal favourites. The text is nice, and so are the code samples. I have never really understood generics completely, but the book does make things a lot clearer. The treatment of dynamic is really nice. It goes into the heart of the DLR and shows everything that you need to know to really understand dynamic.

The book really shines when you move to async/await. Jon makes a rather tough concept easier. I had to read it several times, but each time the concept became easier. I only wish we moved away from the download-the-web-page example for async. The book does spend time on the compiler transformations that are behind async/await. Be patient when you read it; it will take time to sink in.

The only part that has been left out from the third edition is the chapter on Code Contracts. Jon clearly mentions that the topic hasn’t gained as much traction as he hoped. It may come back in the future though. As many have mentioned before, this is not a book for beginners. Use this to become a better C# programmer, after you have written C# for some time. This is a must-have in your collection, especially if you work with C# day in and day out. It will give you a greater understanding of how the language designers wanted you to think, and what makes C# a real joy.
A 4.5/5 for this one, keeping the 0.5 for the next edition :).

P.S. Even the appendix is handy.

Disclosure : I got a free copy of the book to review. The review is my own opinion and not influenced by anyone else.

MongoDB…Ops Stuff

In the previous two posts we have seen some basic querying and how to leverage the querying mechanism to get up and running. Now we are off in the wild world, and we also need to do some more complicated stuff.

Creating Indexes
The API surface is really smooth here, allowing us to specify the sort order of the indexes and whether they are built in the foreground or the background.

public void CreateIndex()
{
    var QuestionConnectionHandler = new MongoConnectionHandler<Question>("MongoDBDemo");
    QuestionConnectionHandler.MongoCollection.EnsureIndex(
        IndexKeys.Ascending("Difficulty"), IndexOptions.SetBackground(true));
}

Dropping indexes is also easy with

QuestionConnectionHandler.MongoCollection.DropAllIndexes();

What happens when you want to see what is going on under the hood? You let the database explain its plan.

QuestionConnectionHandler.MongoCollection.AsQueryable()
                  .Where(q => q.Difficulty >= 3).Explain();
//or if you went the other way 
var query = Query<Question>.GTE(q => q.Difficulty, 3);
var explainPlan = QuestionConnectionHandler.MongoCollection
                          .FindAs<Question>(query).Explain();

Now, if only we could have some stats about our database and the indexes. All wrapped in a nice syntax.

    var stats = QuestionConnectionHandler.MongoCollection.GetStats();
    Console.WriteLine("Namespace : {0}", stats.Namespace);
    Console.WriteLine("DataSize : {0}", stats.DataSize);
    Console.WriteLine("Index Count : {0}", stats.IndexCount);
    stats.IndexSizes.Keys.ToList().ForEach(Console.WriteLine);
    var size = QuestionConnectionHandler.MongoCollection.GetTotalDataSize();
    Console.WriteLine("The total datasize for this collection is {0}", size);

A routine task is to get all the collections in a database and all the databases on the server itself. Easy peasy!!

    var collections = QuestionConnectionHandler.MongoCollection.Database.GetCollectionNames().ToList();
    Console.WriteLine("\nThe following collections are present in the database");
    collections.ForEach(Console.WriteLine);
    var client = new MongoClient(@"mongodb://localhost");
    var server = client.GetServer();
    var databases = server.GetDatabaseNames().ToList();
    Console.WriteLine("\nAll the databases in the server");
    databases.ForEach(Console.WriteLine);

MongoDB C# Driver Part 3

In the previous post we saw some simple queries. It is time to move on to something more concrete and realistic. There are basically two ways of querying MongoDB with the driver. The first, as I showed last time, is LINQ. To use LINQ we first need to move into the Queryable world and then proceed with the actual querying.
Be careful about pulling all the documents locally and then performing operations on them. What we really want is to offload all our querying to MongoDB and only use the results. The driver implements the IQueryable interface, and hence we should use it.

	var result = UserConnectionHandler.MongoCollection.AsQueryable()
                                      .Where(u => u.Reputation > reputation);
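
The difference between querying through IQueryable and filtering locally can be shown without a database. The sketch below uses an in-memory array as a stand-in for the collection (an assumption for illustration; the class and method names are mine). When Where is applied to an IQueryable, the filter is recorded in the expression tree, which is what a provider such as the driver can translate into a server-side query. When Where is applied before entering the Queryable world, it is just a delegate that runs locally, and the provider never sees it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class QueryableSketch
{
    // Names the outermost operation recorded in the query's expression tree.
    public static string OutermostCall(IQueryable query)
    {
        var call = query.Expression as MethodCallExpression;
        return call == null ? "(no call)" : call.Method.Name;
    }

    // The filter is part of the expression tree, so a provider can translate it.
    public static IQueryable<int> FilterInQuery(IEnumerable<int> reputations, int threshold)
    {
        return reputations.AsQueryable().Where(r => r > threshold);
    }

    // The filter is an ordinary delegate that runs locally;
    // the resulting IQueryable only wraps the already-filtered sequence.
    public static IQueryable<int> FilterLocally(IEnumerable<int> reputations, int threshold)
    {
        return reputations.Where(r => r > threshold).AsQueryable();
    }
}
```

Both methods return the same elements, but only FilterInQuery reports a recorded Where call. Against a real collection, that is the difference between MongoDB doing the filtering and every document being shipped to the client first.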

The alternative to this form of querying is using a BsonDocument and a MongoQuery. The way to build up such a query is shown below. Note that the first argument is a lambda selecting the property and the second is the value to filter against. The query builder is in the MongoDB.Driver.Builders namespace.

var query = Query<User>.GT(u => u.Reputation, reputation);
var result = UserConnectionHandler.MongoCollection.FindAs<User>(query);

It is a bit more work to specify queries by hand, so I would prefer LINQ, but both options are available.
Another interesting querying mechanism is Regex. It was kind of hard to locate in the API (or maybe I just didn’t know where to look). BsonRegularExpression lives in the MongoDB.Bson namespace.

public void UserNameStartsWith(string searchKey)
{
    var query = Query.Matches("Name", new BsonRegularExpression(string.Format("^{0}", searchKey)));
    var result = UserConnectionHandler.MongoCollection.Find(query);
    Console.WriteLine("We found {0} Users whose name starts with {1}", result.Count(), searchKey);
}
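
One thing to watch with a pattern built from user input is that characters such as "." or "(" are regex metacharacters. A small sketch of escaping the search key first is below; PrefixPatternSketch and StartsWithPattern are names I made up. Regex.Escape targets .NET regex syntax, so treating its output as safe for the regex engine MongoDB uses is an assumption that holds for the common metacharacters but is worth testing for unusual input.

```csharp
using System;
using System.Text.RegularExpressions;

public static class PrefixPatternSketch
{
    // Builds an anchored "starts with" pattern in which the search key
    // is matched literally rather than interpreted as a regex.
    public static string StartsWithPattern(string searchKey)
    {
        if (searchKey == null)
        {
            throw new ArgumentNullException("searchKey");
        }
        return "^" + Regex.Escape(searchKey);
    }
}
```

The returned string can then be passed to BsonRegularExpression in place of the string.Format call, so that a search key like "a.b" only matches names that really start with "a.b".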

Select does not result in fewer fields being returned from the server. The entire document is pulled back and passed to the native Select method. Therefore, the projection is performed client side. We should use the IQueryable implementation from the MongoDB.Driver.Linq namespace. Alternatively, there is SetFields() that is available to selectively bring fields back from the database.

var query = Query.Matches("Name", new BsonRegularExpression(string.Format("^{0}", searchKey)));
var result = UserConnectionHandler.MongoCollection.Find(query)
     			.SetFields(Fields<User>.Include(u => u.Name, u => u.Reputation));

MongoDB C# Driver Part 2

Having done the initial work for talking to MongoDB, we can now create some POCO classes and do some querying on top of them. As usual, my model is Questions and Users.

public class Question : MongoEntity
    {
        public string Text { get; set; }
        public string Answer { get; set; }
        public DateTime CreatedOn { get; set; }
        public int Difficulty { get; set; }
    }
public class User : MongoEntity
    {
        public string Name { get; set; }
        public int Reputation { get; set; }
    }

Finally, we get down to some real stuff.

public class SimpleQueries
    {
        protected readonly MongoConnectionHandler<User> UserConnectionHandler;
        protected readonly MongoConnectionHandler<Question> QuestionConnectionHandler;

        public SimpleQueries()
        {
            UserConnectionHandler = new MongoConnectionHandler<User>("MongoDBDemo");
            QuestionConnectionHandler = new MongoConnectionHandler<Question>("MongoDBDemo");
        }

        public void CreateQuestion(Question question)
        {
            //// Save the entity with safe mode (WriteConcern.Acknowledged)
            var result = QuestionConnectionHandler.MongoCollection.Save<Question>(question, 
                                 new MongoInsertOptions { WriteConcern = WriteConcern.Acknowledged});

            if (!result.Ok)
            {
                Console.WriteLine(result.LastErrorMessage);
            }
            else
            {
                Console.WriteLine("Insertion was successful");
            }
        }

        public void CreateUser(User user)
        {
            //// Save the entity with safe mode (WriteConcern.Acknowledged)
            var result = UserConnectionHandler.MongoCollection.Save<User>(user, 
                              new MongoInsertOptions { WriteConcern = WriteConcern.Acknowledged });

            if (!result.Ok)
            {
                Console.WriteLine(result.LastErrorMessage);
            }
            else
            {
                Console.WriteLine("Insertion was successful");
            }
        }

        public void GetAllQuestions()
        {
            var cursor = QuestionConnectionHandler.MongoCollection.AsQueryable();
            var resultSet = cursor.ToList();

            Console.WriteLine("Writing out all the questions");
            foreach (var result in resultSet)
            {
                Console.WriteLine("Text : {0},  Answer : {1}", result.Text, result.Answer);
            }
        }

        public ObjectId GetOneQuestion()
        {
            var cursor = QuestionConnectionHandler.MongoCollection.AsQueryable().FirstOrDefault();

            Console.WriteLine(cursor.Id);
            return cursor.Id;
        }

        public void DeleteQuestion(ObjectId id)
        {
            var result = QuestionConnectionHandler.MongoCollection.Remove(
                Query<Question>.EQ(e => e.Id, id), RemoveFlags.None, WriteConcern.Acknowledged);

            if (!result.Ok)
            {
                Console.WriteLine(result.ErrorMessage);
            }
            else
            {
                Console.WriteLine("Delete Operation OK : {0}", result.Ok);
            }
        }
    }

Now that we have some capabilities in our application, we can query away.

//Seed Data
var question = new Question { Text = "Who are you ?", Answer = "I am MongoDB.",
                              CreatedOn = DateTime.Now, Difficulty = 3 };
var user = new User {Name = "Ashutosh", Reputation = 100};
var queries = new SimpleQueries();
queries.CreateQuestion(question);
queries.CreateUser(user);
queries.GetAllQuestions();
var id = queries.GetOneQuestion();
queries.DeleteQuestion(id);

If all is well then you will see some output and the world will be a better place.