Skip to content

Commit

Permalink
Merge pull request #13 from steveniemitz/thriftmux-client-id
Browse files Browse the repository at this point in the history
Support for client IDs in thriftmux
  • Loading branch information
steveniemitz authored Oct 23, 2017
2 parents 6d72813 + ca1f0d9 commit 071617d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
13 changes: 8 additions & 5 deletions scales/thriftmux/builder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .sink import (
ClientIdInterceptorSink,
ThriftMuxMessageSerializerSink,
SocketTransportSink,
)
Expand All @@ -9,16 +10,18 @@

class ThriftMux(object):
@staticmethod
def NewBuilder(Iface):
return Scales.NewBuilder(Iface) \
.WithSink(ThriftMuxMessageSerializerSink.Builder()) \
def NewBuilder(Iface, client_id=None):
builder = Scales.NewBuilder(Iface)
if client_id:
builder = builder.WithSink(ClientIdInterceptorSink.Builder(client_id=client_id))
return builder.WithSink(ThriftMuxMessageSerializerSink.Builder()) \
.WithSink(ApertureBalancerSink.Builder()) \
.WithSink(ResurrectorSink.Builder()) \
.WithSink(SocketTransportSink.Builder())

@staticmethod
def NewClient(Iface, uri, timeout=10):
return ThriftMux.NewBuilder(Iface) \
def NewClient(Iface, uri, timeout=10, client_id=None):
return ThriftMux.NewBuilder(Iface, client_id=client_id) \
.SetUri(uri) \
.SetTimeout(timeout) \
.Build()
3 changes: 3 additions & 0 deletions scales/thriftmux/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def _WriteContext(ctx, buf):
if isinstance(v, Deadline):
buf.write(pack('!h', 16))
buf.write(pack('!qq', v._ts, v._timeout))
elif isinstance(v, basestring):
v_len = len(v)
buf.write(pack('!h%ds' % v_len, v_len, v))
else:
raise NotImplementedError("Unsupported value type in context.")

Expand Down
21 changes: 21 additions & 0 deletions scales/thriftmux/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,24 @@ def AsyncProcessResponse(self, sink_stack, context, stream, msg):
sink_stack.AsyncProcessResponseMessage(msg)

ThriftMuxMessageSerializerSink.Builder = SinkProvider(ThriftMuxMessageSerializerSink)

class ClientIdInterceptorSink(ClientMessageSink):
__slots__ = '_client_id',

CLIENT_ID_HEADER = 'com.twitter.finagle.thrift.ClientIdContext'

def __init__(self, next_provider, sink_properties, global_properties):
super(ClientIdInterceptorSink, self).__init__()
self._client_id = sink_properties.client_id
self.next_sink = next_provider.CreateSink(global_properties)

def AsyncProcessRequest(self, sink_stack, msg, stream, headers):
msg.properties[self.CLIENT_ID_HEADER] = self._client_id
self.next_sink.AsyncProcessRequest(sink_stack, msg, stream, headers)

def AsyncProcessResponse(self, sink_stack, context, stream, msg):
raise NotImplementedError("This should never be called")

ClientIdInterceptorSink.Builder = SinkProvider(
ClientIdInterceptorSink,
client_id='client')

0 comments on commit 071617d

Please sign in to comment.