Pub/Sub with WCF (Part 2)
The Service Bus May CTP has a small glitch when it comes to pub/sub messaging with the WCF programming model. Out of the box, the May CTP API doesn’t pick up filter (promoted) properties from WCF data contracts; you have to set these properties explicitly on the BrokeredMessage object, outside the core WCF programming model, as shown in part 1.
I didn’t like this repetition, so I decided to prototype a solution using the WCF extensibility model. After a few hours of coding I had a solution which looks quite cool :)
In my solution, a DataMember can be marked with the [PromotedProperty] attribute. A custom operation behavior picks up these annotations and promotes the marked members as filter properties by automatically attaching them to the outgoing message.
public class Order
{
    public double Amount { get; set; }

    [PromotedProperty]
    public string ShipCity { get; set; }
}

[ServiceContract]
public interface IOrderService
{
    [OperationContract(Name = "SubmitFlat", IsOneWay = true)]
    [PropertyPromotionBehavior]
    void Submit(double amount, [PromotedProperty] string shipCity);

    [OperationContract(IsOneWay = true)]
    [PropertyPromotionBehavior]
    void Submit(Order order);
}
I decided to implement the property lifting and injection in a custom formatter, primarily because at the formatter level I still have a fairly typed view of the method call. At the message inspector level most of that typing is gone, and it would have required more work.
The [PropertyPromotionBehavior] creates a ‘promotion model’ (the list of properties that need to be promoted) by reflecting on the data contract. The custom formatter then populates this model with the actual parameter values extracted from the call context. [PropertyPromotionBehavior] also replaces the default formatter with a custom PromotionFormatter, which wraps the default formatter and does the additional work of property promotion. The relevant bits are shown below.
[AttributeUsage(AttributeTargets.Method)]
public class PropertyPromotionBehaviorAttribute : Attribute, IOperationBehavior
{
    public void Validate(OperationDescription operationDescription) { }

    public void ApplyDispatchBehavior(OperationDescription operationDescription, DispatchOperation dispatchOperation) { }

    public void ApplyClientBehavior(OperationDescription operationDescription, ClientOperation clientOperation)
    {
        // Build the promotion model; nothing to do if no property is marked.
        var promotedProperties = LoadPromotedProperties(operationDescription);
        if (promotedProperties.Count <= 0) return;

        // Let the default serializer behavior configure a throwaway operation
        // so that we can get hold of the default formatter.
        var dummy = new ClientOperation(clientOperation.Parent, "dummy", "urn:dummy");
        var behavior =
            operationDescription.Behaviors.Find<DataContractSerializerOperationBehavior>() as IOperationBehavior;
        behavior.ApplyClientBehavior(operationDescription, dummy);

        // Wrap the default formatter with the promoting formatter.
        clientOperation.Formatter = new PromotionFormatter(dummy.Formatter, promotedProperties);
        clientOperation.SerializeRequest = dummy.SerializeRequest;
    }

    public void AddBindingParameters(OperationDescription operationDescription, BindingParameterCollection bindingParameters) { }
    ...
}
class PromotionFormatter : IClientMessageFormatter
{
    private readonly IClientMessageFormatter _orignalFormatter;
    private readonly IList<PromotedPropertyDescription> _promotions;

    public PromotionFormatter(IClientMessageFormatter orignalFormatter, IList<PromotedPropertyDescription> promotions)
    {
        _orignalFormatter = orignalFormatter;
        _promotions = promotions;
    }

    public object DeserializeReply(Message message, object[] parameters)
    {
        return _orignalFormatter.DeserializeReply(message, parameters);
    }

    public Message SerializeRequest(MessageVersion messageVersion, object[] parameters)
    {
        // The wrapped default formatter does the actual serialization.
        var message = _orignalFormatter.SerializeRequest(messageVersion, parameters);
        ...
        ...
        if (_promotions.Count > 0)
        {
            // Attach the promoted values to the outgoing message.
            var bmp = new BrokeredMessageProperty();
            foreach (var promotion in _promotions)
            {
                bmp.Properties[promotion.Name] = promotion.Value;
            }
            message.Properties[BrokeredMessageProperty.Name] = bmp;
        }
        return message;
    }
}
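The LoadPromotedProperties helper and the step that fills in the values are elided above. As a rough illustration only, here is a minimal sketch of how such a promotion model could be built with plain reflection. Everything in it is an assumption of mine rather than the code from the attached source: it works on a MethodInfo instead of an OperationDescription so that it runs without WCF, and PromotionSketch, PromotionModel, DemoOrder and IDemoOrderService are hypothetical names. The attribute is redeclared only to keep the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Stand-in for the marker attribute, declared here only for self-containment.
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Parameter)]
public sealed class PromotedPropertyAttribute : Attribute { }

// Hypothetical model entry: a property name plus a way to read its value
// from the argument array of a particular call.
public sealed class PromotionSketch
{
    public string Name { get; private set; }
    public Func<object[], object> ReadValue { get; private set; }

    public PromotionSketch(string name, Func<object[], object> readValue)
    {
        Name = name;
        ReadValue = readValue;
    }
}

public static class PromotionModel
{
    // Walks the method's parameters once and records where promoted values live.
    public static IList<PromotionSketch> Load(MethodInfo method)
    {
        var result = new List<PromotionSketch>();
        var parameters = method.GetParameters();
        for (var i = 0; i < parameters.Length; i++)
        {
            var index = i; // copy for the closures below
            var parameter = parameters[i];

            // Case 1: the parameter itself is annotated (the "flat" overload).
            if (parameter.IsDefined(typeof(PromotedPropertyAttribute), false))
            {
                result.Add(new PromotionSketch(parameter.Name, args => args[index]));
            }

            // Case 2: annotated public properties on the parameter's type.
            foreach (var property in parameter.ParameterType.GetProperties())
            {
                if (!property.IsDefined(typeof(PromotedPropertyAttribute), false)) continue;
                var p = property;
                result.Add(new PromotionSketch(
                    p.Name,
                    args => args[index] == null ? null : p.GetValue(args[index], null)));
            }
        }
        return result;
    }
}

// Hypothetical demo types mirroring the contract shown earlier.
public class DemoOrder
{
    public double Amount { get; set; }
    [PromotedProperty] public string ShipCity { get; set; }
}

public interface IDemoOrderService
{
    void Submit(DemoOrder order);
}

public static class Demo
{
    public static void Main()
    {
        var model = PromotionModel.Load(typeof(IDemoOrderService).GetMethod("Submit"));
        var args = new object[] { new DemoOrder { Amount = 200, ShipCity = "london" } };
        foreach (var promotion in model)
        {
            Console.WriteLine(promotion.Name + " = " + promotion.ReadValue(args));
        }
    }
}
```

The point of splitting the work this way is that the reflection happens once, when the behavior is applied, and each call only runs the cheap ReadValue delegates. Note that in this sketch an annotated parameter is promoted under its parameter name (for example "shipCity"), which may differ in casing from a property name; whether the attached source normalizes this is not something I can tell from the snippets above.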
With these extensions in place, the client code looks just like normal WCF code, and property promotion happens in the background, as you would expect.
var cf = new ChannelFactory<IOrderService>(serviceBusBinding, topicAddress);
var proxy = cf.CreateChannel();
proxy.Submit(new Order { Amount = 200, ShipCity = "london" });
proxy.Submit(new Order { Amount = 322, ShipCity = "reading" });
proxy.Submit(101, "reading");
Console.WriteLine("Submitted.");
Hopefully the Service Bus programming model will support this kind of behavior soon, but for the time being these extensions should fill the gap.
I have attached the complete source code to this post, so please feel free to download and use it.