Tasks with Timeouts – Contd.

As mentioned in the last post we can now have tasks with individual timeouts. The code looked a little heavy. Can we do better ? Yes we can !!


var tasks = new List<Task>();
try
{
//instead of checking for fault within the continutaion, 
//we can just use a TaskContinuationOption to tell communicate the right semantics

var t1 = Task.Factory.StartNew(_ => LongRunningTask(), TaskCreationOptions.AttachedToParent)
              .TimeoutAfter(1000)
              .ContinueWith(t => SomethingUsefulWithTheResult(), 
                                 TaskContinuationOptions.NotOnFaulted);

var t2 = Task.Factory.StartNew(_ => LongRunningTask(), TaskCreationOptions.AttachedToParent)
              .ContinueWith(t => SomethingUsefulWithTheResult());

var t3 = Task.Factory.StartNew(_ => LongRunningTask("Entering task3"), TaskCreationOptions.AttachedToParent)
              .ContinueWith(t => SomethingUsefulWithTheResult());
            
tasks.Add(t1);
tasks.Add(t2);
tasks.Add(t3);
                            
Task.WaitAll(tasks.ToArray());
}
catch (Exception ex)
{                
    Console.WriteLine("There was an exception");
    Console.WriteLine(ex.InnerException.Message);               
}
Advertisements

Tasks with Timeouts

So the task is to timeout a task. now, I never thought it would take me as long as it did. Turns out it is a really tricky problem problem. I was expecting something within the framework to make life easier, there isn’t anything by default but msdn to the rescue Tasks With Timeout.

So, the extension method is

    public static class TaskWithTimeout
    {
        internal struct VoidTypeStruct
        { }
        internal static void MarshalTaskResults<TResult>(Task source, TaskCompletionSource<TResult> proxy)
        {
            switch (source.Status)
            {
                case TaskStatus.Faulted:
                    proxy.TrySetException(source.Exception);
                    break;
                case TaskStatus.Canceled:
                    proxy.TrySetCanceled();
                    break;
                case TaskStatus.RanToCompletion:
                    Task<TResult> castedSource = source as Task<TResult>;
                    proxy.TrySetResult(
                        castedSource == null ? default(TResult) : // source is a Task
                            castedSource.Result); // source is a Task<TResult>
                    break;
            }
        }
        
        public static Task TimeoutAfter(this Task task, int millisecondsTimeout)
        {
            // Short-circuit #1: infinite timeout or task already completed
            if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
            {
                // Either the task has already completed or timeout will never occur.
                // No proxy necessary.
                return task;
            }

            // tcs.Task will be returned as a proxy to the caller
            TaskCompletionSource<VoidTypeStruct> tcs = new TaskCompletionSource<VoidTypeStruct>();

            // Short-circuit #2: zero timeout
            if (millisecondsTimeout == 0)
            {
                // We've already timed out.
                tcs.SetException(new TimeoutException());
                return tcs.Task;
            }

            // Set up a timer to complete after the specified timeout period
            Timer timer = new Timer(state =>
            {
                // Recover your state information
                var myTcs = (TaskCompletionSource<VoidTypeStruct>)state;
                // Fault our proxy with a TimeoutException
                myTcs.TrySetException(new TimeoutException());
            }, tcs, millisecondsTimeout, Timeout.Infinite);

            // Wire up the logic for what happens when source task completes
            task.ContinueWith(antecedent =>
                                {
                                    timer.Dispose(); // Cancel the timer
                                    MarshalTaskResults(antecedent, tcs); // Marshal results to proxy
                                },
                                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

            return tcs.Task;
        }

        public static Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, int millisecondsTimeout)
        {
            // Short-circuit #1: infinite timeout or task already completed
            if (task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
            {
                // Either the task has already completed or timeout will never occur.
                // No proxy necessary.
                return task;
            }

            // tcs.Task will be returned as a proxy to the caller
            TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();

            // Short-circuit #2: zero timeout
            if (millisecondsTimeout == 0)
            {
                // We've already timed out.
                tcs.SetException(new TimeoutException());
                return tcs.Task;
            }

            // Set up a timer to complete after the specified timeout period
            Timer timer = new Timer(state =>
                                    {
                                        // Recover your state information
                                        var myTcs = (TaskCompletionSource<TResult>)state;
                                        // Fault our proxy with a TimeoutException
                                        myTcs.TrySetException(new TimeoutException());
                                    }, tcs, millisecondsTimeout, Timeout.Infinite);

            // Wire up the logic for what happens when source task completes
            task.ContinueWith(antecedent =>
                                {
                                    timer.Dispose(); // Cancel the timer
                                    MarshalTaskResults(antecedent, tcs); // Marshal results to proxy
                                }, 
                                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default);

            return tcs.Task;
        }
    }

A lot of code for doing this, and the msdn article remains the better source of explanation.

Now using this.

public class Program
	{
		private static List<int> Output = new List<int>();
		private static Random _random = new Random();
		
		public static int LongRunningTask(string message)
		{
			Console.WriteLine(message);
			Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);
			//Simulate a long running task
            Thread.Sleep(TimeSpan.FromSeconds(2));
			var number = _random.Next();
			Console.WriteLine("Adding " + number + " From thread  - " + Thread.CurrentThread.ManagedThreadId);
			return number;
		}

		public static void Main(string[] args)
		{
			Console.WriteLine("In Main");
			Console.WriteLine("Managed thread Id " + Thread.CurrentThread.ManagedThreadId);
			var cts = new CancellationTokenSource();
			var tasks = new List<Task>();
			try
			{
//In the continuation check for the condition of fault (or something more if you so need) and perform the //continuation
				var t1 = Task.Factory.StartNew(_ =>	LongRunningTask("Entering task1"),
												    TaskCreationOptions.AttachedToParent)
                                     .TimeoutAfter(1000)
                                     .ContinueWith(antecedent => {
						if(!(antecedent.IsCanceled || antecedent.IsFaulted))
                                                         Output.Add(antecedent.Result);
								}
								, cts.Token);
				var t2 = Task.Factory.StartNew(_ => LongRunningTask("Entering task2"),
													TaskCreationOptions.AttachedToParent)
                                     .ContinueWith(_ => Output.Add(_.Result));
                var t3 = Task.Factory.StartNew(_ => LongRunningTask("Entering task3"), 
													TaskCreationOptions.AttachedToParent)
                                     .ContinueWith(_ => Output.Add(_.Result));
            
                tasks.Add(t1);
                tasks.Add(t2);
                tasks.Add(t3);
                            
                Task.WaitAll(tasks.ToArray());
            }
            catch (Exception ex)
            {                
                Console.WriteLine("There was an exception");
                Console.WriteLine(ex.InnerException.Message);               
            }

            Console.WriteLine("Output :");
            Output.ForEach(_ => Console.WriteLine(_));

            Console.ReadLine();
        }
    }

The important part being that the continuation is applied after the timeout and it won’t work with the other way around.
The output therefore basically looks like this :

In Main
Managed thread Id 9
Entering task1
Managed thread Id 10
Entering task2
Managed thread Id 11
Entering task3
Managed thread Id 14
Adding 194443354 From thread  - 10
Adding 792426557 From thread  - 11
Adding 230130793 From thread  - 14
Output :
792426557
230130793

Next, time I will try and write about some other things on Flirting with Tasks.

Review : C# In Depth

I have read the second edition as well. The third edition carries on from there and provides a deeper look in C# 5 and it’s key feature async/await. If you want to understand what goes on behind the scenes, this one is for you. Jon Skeet ( yes the famous guy Jon Skeet) has managed to come out with a great book once again.

Jon starts easy on this, but doesn’t hold back. The prose is lucid yet well paced. It is one of the few books that make the effort to take the reader on journey. The journey begins with C#1 and then continues to C#5. Hardly any chapters drag on anymore than they should. Linq is covered in great depth and is a nicely written with enough diagrams to visualize what is happening under the hood. Generics is one my personal favourites. The text is nice, so are the code samples. I have never really understood generics completely, but the books does make things a lot clearer. The treatment of dynamic is really nice. It goes into the heart of the DLR and shows everything that you need to know to really understand dynamic.

The book really shines when you move to async/await. Jon makes a rather tough concept easier. I had to read it several times but each time the concept became easier. I only wish we moved away from the download the web page example for async. The book does spend time on compiler transformations that are behind async/await. Be patient when you read it, it will take time to sink in.

The only part that has been left out from the third edition is the chapter on Code Contracts. Jon clearly mentions that the topic hasn’t gained as much traction as he hoped. It may come back in the future though. As many have mentioned this before this is not a book for beginners. Use this to become a better C# programmer, after you have written C# for some time. This is a must have in your collection, especially if you work with C# day in and day out. It will give you a greater understanding of how the language designers wanted you think, and what makes C# a real joy.
A 4.5/5 for this one, keeping the 0.5 for the next edition :).

P.S. Even the appendix is handy.

Disclosure : I got a free copy of the book to review. The review is my own opinion and not influenced by anyone else.

MongoDB…Ops Stuff

In the previous two posts we have seen some basic querying and how to leverage the querying mechanism to get up and running. Now, we are off in the wild world and we also need to some more complicated stuff.

Creating Indexes
The api surface is really smooth with this, allowing us to specify the sort order of the indexes and the manner of building them foreground or background.

 public void CreateIndex()
{
var QuestionConnectionHandler = new MongoConnectionHandler<Question>("MongoDBDemo");
QuestionConnectionHandler.MongoCollection.EnsureIndex( 
                          IndexKeys.Ascending("Difficulty"), IndexOptions.SetBackground(true));
}

Dropping indexes is also easy with

QuestionConnectionHandler.MongoCollection.DropAllIndexes();

What happens when you want to see what is going on under the hood ? You let the database Explain it’s Plan.

QuestionConnectionHandler.MongoCollection.AsQueryable()
                  .Where(q => q.Difficulty >= 3).Explain();
//or if you went the other way 
var query = Query<Question>.GTE(q => q.Difficulty, 3);
var explainPlan = QuestionConnectionHandler.MongoCollection
                          .FindAs<Question>(query).Explain();

Now, if only we could have some stats about our database and the indexes. All wrapped in a nice syntax.

    var stats = QuestionConnectionHandler.MongoCollection.GetStats();
    Console.WriteLine("Namespace : {0}", stats.Namespace);
    Console.WriteLine("DataSize : {0}", stats.DataSize);
    Console.WriteLine("Index Count : {0}", stats.IndexCount);
    stats.IndexSizes.Keys.ForEach(Console.WriteLine);
    var size = QuestionConnectionHandler.MongoCollection.GetTotalDataSize();
    Console.WriteLine("The total datasize for this collection is {0}", size);

A routine task is to get all the collections in a database and all the databases on the server itself. Easy peasy!!

    var collections = QuestionConnectionHandler.MongoCollection.Database.GetCollectionNames();
    Console.WriteLine("\nThe following collections are present in the database");
    collections.ForEach(Console.WriteLine);
    var client = new MongoClient(@"mongodb://localhost");
    var server = client.GetServer();
    var databases = server.GetDatabaseNames().ToList();
    Console.WriteLine("\nAll the databases in the server");
    databases.ForEach(Console.WriteLine);

MongoDB C# Driver Part 3

In the previous post we have seen some simple queries. It is time we move onto something more concrete and realistic. There are basically two ways of querying MongoDB with the driver. First, as I showed last time is using LINQ. To use LINQ we need to first move into the Queryable world and then proceed with actual querying.
Be careful about pulling all the documents locally and then performing operations on them. What we really want to do is offload all our querying to MongoDB and then only use the results. The driver implements the IQueryable interface and hence we should use it.

	var result = UserConnectionHandler.MongoCollection.AsQueryable()
                                      .Where(u => u.Reputation > reputation);

The alternative to this form of querying is using a BsonDocument and a MongoQuery. The way to build up such a query is below. Note, that the first lambda is the property and the second parameter is the key for the filtering. The query builder is in the MongoDB.Drivers.Builders namespace.

var query = Query<User>.GT(u => u.Reputation, reputation);
var result = UserConnectionHandler.MongoCollection.FindAs<User>(query);

It is a bit more work to specify queries by hand so I would prefer LINQ, but both options are available.
Another, interesting querying mechanism is Regex. It was kind of hard to locate in the API ( or may be I just didn’t know where to look). It is present in Bson namesapce.

public void UserNameStartsWith(string searchKey)
{
var query = Query.Matches("Name", new BsonRegularExpression(string.Format("^{0}", searchKey)));
var result = UserConnectionHandler.MongoCollection.Find(query);
Console.WriteLine("We found {0} Users whose name starts with {1}", result.Count(), searchKey);
}

Select does not result in fewer fields being returned from the server. The entire document is pulled back and passed to the native Select method. Therefore, the projection is performed client side. We should use the IQueryable implementation from the MongoDB.Driver.Linq namespace. Alternatively, there is SetFields() that is available to selectively bring fields back from the database.

var query = Query.Matches("Name", new BsonRegularExpression(string.Format("^{0}", searchKey)));
var result = UserConnectionHandler.MongoCollection.Find(query)
     			.SetFields(Fields<User>.Include(u => u.Name, u => u.Reputation));

MongoDB C# Driver Part 2

Having done the inital work for talking to MongoDB we can now create some POCO classes and then do some querying on top of it. As usual, my model is Questions and Users.

public class Question : MongoEntity
    {
        public string Text { get; set; }
        public string Answer { get; set; }
        public DateTime CreatedOn { get; set; }
        public int Difficulty { get; set; }
    }
public class User : MongoEntity
    {
        public string Name { get; set; }
        public int Reputation { get; set; }
    }

Finally, we get down to some rela stuff.

public class SimpleQueries
    {
        protected readonly MongoConnectionHandler<User> UserConnectionHandler;
        protected readonly MongoConnectionHandler<Question> QuestionConnectionHandler;

        public SimpleQueries()
        {
            UserConnectionHandler = new MongoConnectionHandler<User>("MongoDBDemo");
            QuestionConnectionHandler = new MongoConnectionHandler<Question>("MongoDBDemo");
        }

        public void CreateQuestion(Question question)
        {
            //// Save the entity with safe mode (WriteConcern.Acknowledged)
            var result = QuestionConnectionHandler.MongoCollection.Save<Question>(question, 
                                 new MongoInsertOptions { WriteConcern = WriteConcern.Acknowledged});

            if (!result.Ok)
            {
                Console.WriteLine(result.LastErrorMessage);
            }
            else if (result.Response["err"] != null)
            {
                Console.WriteLine("Insertion was successfull");
            }
        }

        public void CreateUser(User user)
        {
            //// Save the entity with safe mode (WriteConcern.Acknowledged)
            var result = UserConnectionHandler.MongoCollection.Save<User>(user, 
                              new MongoInsertOptions { WriteConcern = WriteConcern.Acknowledged });

            if (!result.Ok)
            {
                Console.WriteLine(result.LastErrorMessage);
            }
            else if (result.Response["err"] != null)
            {
                Console.WriteLine("Insertion was successfull");
            }
        }

        public void GetAllQuestions()
        {
            var cursor = QuestionConnectionHandler.MongoCollection.AsQueryable();
            var resultSet = cursor.ToList();

            Console.WriteLine("Writing out all the questions");
            foreach (var result in resultSet)
            {
                Console.WriteLine("Text : {0},  Answer : {1}", result.Text, result.Answer);
            }
        }

        public ObjectId GetOneQuestion()
        {
            var cursor = QuestionConnectionHandler.MongoCollection.AsQueryable().FirstOrDefault();

            Console.WriteLine(cursor.Id);
            return cursor.Id;
        }

        public void DeleteQuestion(ObjectId id)
        {
            var result = QuestionConnectionHandler.MongoCollection.Remove(
                Query<Question>.EQ(e => e.Id, id), RemoveFlags.None, WriteConcern.Acknowledged);

            if (!result.Ok)
            {
                Console.WriteLine(result.ErrorMessage);
            }
            else
            {
                Console.WriteLine("Delete Operation OK : {0}", result.Ok);
            }
        }
    }

Now, that we have some capabilities in our application, we can query away.

//Seed Data
var question = new Question { Text = "Who are you ?", Answer = "I am MongoDB.",
                              CreatedOn = DateTime.Now, Difficulty = 3 };
var user = new User {Name = "Ashutosh", Reputation = 100};
var queries = new SimpleQueries();
queries.CreateQuestion(question);
queries.CreateUser(user);
var queries = new SimpleQueries();
queries.GetAllQuestions();
var id = queries.GetOneQuestion();
queries.DeleteQuestion(id);

If all is well then you will see some output and the world will be a better place.

MongoDB..C# Driver Part 1

There are several drivers available for C#. I do not plan to go thorugh all of them here. Since, the official driver now has LINQ(although not complete yet) support, we will go with it.
Basic Setup..get the stuff of NuGet. It should put in two dll’s in there
1. MongoDB.Bson
2. MongoDB.Driver

We will get to what does what later. For now assume that we only want to get some data in and out of MongoDB.
Let’s connect to MongoDB now(the code below is just quick and dirty, we will see a better version later).

//MongoDB should be running by now, and assuming you have inserted some documents in there
var client = new MongoClient(@"mongodb://localhost");
var server = client.GetServer();
var database = server.GetDatabase("YourDataBaseName");
var mongoCollection = database.GetCollection("SomeCollectionName");
//Getting all the documents
var cursor = mongoCollection.AsQueryable();
cursor.ForEach(Console.WriteLine);	

It is time for some explanataion.

What is MongoClient ?
MongoClient is the standard way of accessing the driver. I have dabbled with the python driver(pymongo) and it is the same there. I believe it was changed to keep the drivers for different languages in sync.
Reading up on this told me that SafeMode settings were dropped in favour of WriteConcern and instead of SlaveOK , ReadPreference should be used. The settings were present previously in MongoServerSettings, the new ones are on MongoClientSettings. IpV6 setting is also in MongoClientSettings.

What is MongoServer ?
The server manages the life cycle of ServerProxies. Gives access to databases and some sort of connection management. More needs to be said about the server, I will stop short for now.

Notice that I did not use a genric GetCollection here (yet, will do so soon). The generic method is also available, which we will put to use soon.

All documents in MongoDB have an Id which has the type ObjectId (ObjectId resides in MongoDB.Bson).
So, we can have an interface which takes care of this and subsequently all our types can implement this.

public interface IMongoEntity
{
    ObjectId Id { get; set; }
}
public class MongoEntity : IMongoEntity
{
   public ObjectId Id { get; set; }
}

Refinement and obtaining a better MongoDBHandler(you can do much better than what I will show you here, but that depends to large extent on your taste).

 public class MongoConnectionHandler<T> where T : IMongoEntity
    {
        public MongoCollection<T> MongoCollection { get; private set; }
        private const string ConnectionString = @"mongodb://localhost";

        public MongoConnectionHandler(string databaseName)
        {
            var client = new MongoClient(ConnectionString);
            var server = client.GetServer();
            var database = server.GetDatabase(databaseName);
            MongoCollection = database.GetCollection<T>(typeof (T).Name.ToLower() + "s");
        }
    }

Having a the *databaseName* as a parameter is very subjective, if you only have a single database then you might want to just stick it directly in the method call, otherwise it just increases the burden of the caller and spreads the database name all over the code base. Another option that comes to my mind striaghtaway is having an Enum of DatabaseName or in some sort of configuration files. The same goes for ConnectionString, put it in some place configurable. The collection names are plural so you need to just stick an extra “s” in there(this took me quite a while to figure out).
So, we are setup nicely to go forward and do some more interesting work with MongoDB.