commit d04beff2424ac64e3bef40b5d645787400b9d168
parent 52b02c92c8feeb7259bab19e264c1401404a3483
Author: Rich Lane <rlane@club.cc.cmu.edu>
Date: Mon, 12 Apr 2010 20:59:41 -0700
standalone sup-server
Diffstat:
| M |
bin/sup-cmd |
| |
121 |
++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
|
| A |
bin/sup-server |
| |
43 |
+++++++++++++++++++++++++++++++++++++++++++
|
| A |
lib/sup/client.rb |
| |
72 |
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
| D |
lib/sup/connection.rb |
| |
63 |
---------------------------------------------------------------
|
| A |
lib/sup/protocol.rb |
| |
147 |
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
| A |
lib/sup/server.rb |
| |
95 |
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
6 files changed, 428 insertions(+), 113 deletions(-)
diff --git a/bin/sup-cmd b/bin/sup-cmd
@@ -2,6 +2,7 @@
require 'rubygems'
require 'trollop'
require 'sup'
+require 'sup/client'
require 'pp'
require 'yaml'
include Redwood
@@ -10,7 +11,7 @@ SUB_COMMANDS = %w(query count label add)
global_opts = Trollop::options do
#version = "sup-cmd (sup #{Redwood::VERSION})"
banner <<EOS
-Interact with a Sup index.
+Connect to a running sup-server.
Usage:
sup-cmd [global options] command [options]
@@ -20,6 +21,8 @@ Usage:
Global options:
EOS
+ opt :host, "server address", :type => :string, :default => 'localhost', :short => 'o'
+ opt :port, "server port", :type => :int, :default => 4300
opt :verbose
stop_on SUB_COMMANDS
@@ -50,61 +53,79 @@ else
Trollop::die "unrecognized command #{cmd.inspect}"
end
-def get_query
- text = ARGV.first or fail "query argument required"
- Redwood::Index.parse_query text
-end
-
-Redwood.start
-Index.init
-Index.lock_interactively or exit
-begin
- if(s = Redwood::SourceManager.source_for SentManager.source_uri)
- SentManager.source = s
- else
- Redwood::SourceManager.add_source SentManager.default_source
+class SupCmd < Redwood::Client
+ def initialize cmd, args, opts
+ @cmd = cmd
+ @opts = opts
+ @args = args
+ super()
end
- Index.load
- c = Redwood::Connection.new
-
-case cmd
-when "query"
- c.query get_query, cmd_opts[:offset], cmd_opts[:limit], cmd_opts[:raw] do |result|
- puts YAML.dump(result['summary'])
- puts YAML.dump(result['raw']) if cmd_opts[:raw]
- end
-when "count"
- puts c.count(get_query)
-when "label"
- c.label get_query, cmd_opts[:remove_labels].split(','), cmd_opts[:add_labels].split(',')
-when "add"
- ARGF.binmode
- labels = cmd_opts[:labels].split(',')
- get_message = lambda do
- return ARGF.gets(nil) unless cmd_opts[:mbox]
- str = ""
- l = ARGF.gets
- str << l until ARGF.closed? || ARGF.eof? || MBox::is_break_line?(l = ARGF.gets)
- str.empty? ? nil : str
+ def get_query
+ @args.first or fail "query argument required"
end
- i_s = i = 0
- t = Time.now
- while raw = get_message[]
- i += 1
- t_d = Time.now - t
- if t_d >= 5
- i_d = i - i_s
- puts "indexed #{i} messages (#{i_d/t_d} m/s)" if global_opts[:verbose]
+
+ def connection_established
+ case @cmd
+ when "query"
+ query get_query, @opts[:offset], @opts[:limit], @opts[:raw] do |result|
+ if result
+ puts YAML.dump(result['summary'])
+ puts YAML.dump(result['raw']) if @opts[:raw]
+ else
+ close_connection
+ end
+ end
+ when "count"
+ count(get_query) do |x|
+ puts x
+ close_connection
+ end
+ when "label"
+ label get_query, @opts[:remove_labels].split(','), @opts[:add_labels].split(',') do
+ close_connection
+ end
+ when "add"
+ ARGF.binmode
+ labels = @opts[:labels].split(',')
+ get_message = lambda do
+ return ARGF.gets(nil) unless @opts[:mbox]
+ str = ""
+ l = ARGF.gets
+ str << l until ARGF.closed? || ARGF.eof? || MBox::is_break_line?(l = ARGF.gets)
+ str.empty? ? nil : str
+ end
+ i_s = i = 0
t = Time.now
- i_s = i
+ while raw = get_message[]
+ i += 1
+ t_d = Time.now - t
+ if t_d >= 5
+ i_d = i - i_s
+ puts "indexed #{i} messages (#{i_d/t_d} m/s)" if global_opts[:verbose]
+ t = Time.now
+ i_s = i
+ end
+ add raw, labels do
+ close_connection
+ end
+ end
+ else
+ fail "#{@cmd} command unimplemented"
+ close_connection
end
- c.add raw, labels
end
-else
- fail "#{cmd} command unimplemented"
+
+ def unbind
+ EM.stop
+ end
end
-ensure
- Index.unlock
+
+EM.run do
+ EM.connect global_opts[:host], global_opts[:port],
+ SupCmd, cmd, ARGV, cmd_opts.merge(global_opts)
end
+
+exit 0
+
diff --git a/bin/sup-server b/bin/sup-server
@@ -0,0 +1,43 @@
+#!/usr/bin/env ruby
+require 'rubygems'
+require 'trollop'
+require 'sup'
+require 'sup/server'
+require 'pp'
+require 'yaml'
+include Redwood
+
+global_opts = Trollop::options do
+ #version = "sup-cmd (sup #{Redwood::VERSION})"
+ banner <<EOS
+Interact with a Sup index.
+
+Usage:
+ sup-server [options]
+EOS
+
+ opt :host, "address to listen on", :type => :string, :default => 'localhost', :short => 'o'
+ opt :port, "port to listen on", :type => :int, :default => 4300
+ opt :verbose
+end
+
+Redwood.start
+Index.init
+Index.lock_interactively or exit
+begin
+ if(s = Redwood::SourceManager.source_for SentManager.source_uri)
+ SentManager.source = s
+ else
+ Redwood::SourceManager.add_source SentManager.default_source
+ end
+
+ Index.load
+
+ EM.run do
+ EM.start_server global_opts[:host], global_opts[:port], Redwood::Server
+ EM.next_tick { puts "ready" }
+ end
+
+ensure
+ Index.unlock
+end
diff --git a/lib/sup/client.rb b/lib/sup/client.rb
@@ -0,0 +1,72 @@
+require 'sup/protocol'
+
+class Redwood::Client < EM::P::RedwoodClient
+ def initialize *a
+ @next_tag = 1
+ @cbs = {}
+ super *a
+ end
+
+ def mktag &b
+ @next_tag.tap do |x|
+ @cbs[x] = b
+ @next_tag += 1
+ end
+ end
+
+ def rmtag tag
+ @cbs.delete tag
+ end
+
+ def query qstr, offset, limit, raw, &b
+ tag = mktag do |type,tag,args|
+ if type == 'message'
+ b.call args
+ else
+ fail unless type == 'done'
+ b.call nil
+ rmtag tag
+ end
+ end
+ send_message 'query', tag,
+ 'query' => qstr,
+ 'offset' => offset,
+ 'limit' => limit,
+ 'raw' => raw
+ end
+
+ def count qstr, &b
+ tag = mktag do |type,tag,args|
+ b.call args['count']
+ rmtag tag
+ end
+ send_message 'count', tag,
+ 'query' => qstr
+ end
+
+ def label qstr, add, remove, &b
+ tag = mktag do |type,tag,args|
+ b.call
+ rmtag tag
+ end
+ send_message 'label', tag,
+ 'query' => qstr,
+ 'add' => add,
+ 'remove' => remove
+ end
+
+ def add raw, labels, &b
+ tag = mktag do |type,tag,args|
+ b.call
+ rmtag tag
+ end
+ send_message 'add', tag,
+ 'raw' => raw,
+ 'labels' => labels
+ end
+
+ def receive_message type, tag, args
+ cb = @cbs[tag] or fail "invalid tag #{tag.inspect}"
+ cb[type, tag, args]
+ end
+end
diff --git a/lib/sup/connection.rb b/lib/sup/connection.rb
@@ -1,63 +0,0 @@
-module Redwood
-
-## Hacky implementation of the sup-server API using existing Sup code
-class Connection
- def result_from_message m, raw
- mkperson = lambda { |p| { :email => p.email, :name => p.name } }
- {
- 'summary' => {
- 'message_id' => m.id,
- 'date' => m.date,
- 'from' => mkperson[m.from],
- 'to' => m.to.map(&mkperson),
- 'cc' => m.cc.map(&mkperson),
- 'bcc' => m.bcc.map(&mkperson),
- 'subject' => m.subj,
- 'refs' => m.refs,
- 'replytos' => m.replytos,
- 'labels' => m.labels.map(&:to_s),
- },
- 'raw' => raw ? m.raw_message : nil,
- }
- end
-
- def query query, offset, limit, raw
- c = 0
- Index.each_message query do |m|
- next if c < offset
- break if c >= offset + limit if limit
- yield result_from_message(m, raw)
- c += 1
- end
- nil
- end
-
- def count query
- Index.num_results_for query
- end
-
- def label query, remove_labels, add_labels
- Index.each_message query do |m|
- remove_labels.each { |l| m.remove_label l }
- add_labels.each { |l| m.add_label l }
- Index.update_message_state m
- end
- nil
- end
-
- def add raw, labels
- SentManager.source.store_message Time.now, "test@example.com" do |io|
- io.write raw
- end
- m2 = nil
- PollManager.each_message_from(SentManager.source) do |m|
- PollManager.add_new_message m
- m2 = m
- end
- m2.labels = Set.new(labels.map(&:to_sym))
- Index.update_message_state m2
- nil
- end
-end
-
-end
diff --git a/lib/sup/protocol.rb b/lib/sup/protocol.rb
@@ -0,0 +1,147 @@
+require 'eventmachine'
+require 'socket'
+require 'stringio'
+require 'yajl'
+
+class EM::P::Redwood < EM::Connection
+ VERSION = 1
+ ENCODINGS = %w(marshal json)
+
+ def initialize *args
+ @state = :negotiating
+ @version_buf = ""
+ super
+ end
+
+ def receive_data data
+ if @state == :negotiating
+ @version_buf << data
+ if i = @version_buf.index("\n")
+ l = @version_buf.slice!(0..i)
+ receive_version *parse_version(l.strip)
+ x = @version_buf
+ @version_buf = nil
+ @state = :established
+ connection_established
+ receive_data x
+ end
+ else
+ @filter.decode(data).each { |msg| receive_message *msg }
+ end
+ end
+
+ def connection_established
+ puts "client connection established"
+ end
+
+ def send_version encodings, extensions
+ fail if encodings.empty?
+ send_data "Redwood #{VERSION} #{encodings * ','} #{extensions.empty? ? :none : (extensions * ',')}\n"
+ end
+
+ def send_message type, tag, params={}
+ fail "attempted to send message during negotiation" unless @state == :established
+ send_data @filter.encode([type,tag,params])
+ end
+
+ def receive_version l
+ fail "unimplemented"
+ end
+
+ def receive_message type, params
+ fail "unimplemented"
+ end
+
+private
+
+ def parse_version l
+ l =~ /^Redwood\s+(\d+)\s+([\w,]+)\s+([\w,]+)$/ or fail "unexpected banner #{l.inspect}"
+ version, encodings, extensions = $1.to_i, $2, $3
+ encodings = encodings.split ','
+ extensions = extensions.split ','
+ extensions = [] if extensions == ['none']
+ fail unless version == VERSION
+ fail if encodings.empty?
+ [encodings, extensions]
+ end
+
+ def create_filter encoding
+ case encoding
+ when 'json' then JSONFilter.new
+ when 'marshal' then MarshalFilter.new
+ else fail "unknown encoding #{encoding.inspect}"
+ end
+ end
+
+ class JSONFilter
+ def initialize
+ @parser = Yajl::Parser.new :check_utf8 => false
+ end
+
+ def decode chunk
+ parsed = []
+ @parser.on_parse_complete = lambda { |o| parsed << o }
+ @parser << chunk
+ parsed
+ end
+
+ def encode *os
+ os.inject('') { |s, o| s << Yajl::Encoder.encode(o) }
+ end
+ end
+
+ class MarshalFilter
+ def initialize
+ @buf = ''
+ @state = :prefix
+ @size = 0
+ end
+
+ def decode chunk
+ received = []
+ @buf << chunk
+
+ begin
+ if @state == :prefix
+ break unless @buf.size >= 4
+ prefix = @buf.slice!(0...4)
+ @size = prefix.unpack('N')[0]
+ @state = :data
+ end
+
+ fail unless @state == :data
+ break if @buf.size < @size
+ received << Marshal.load(@buf.slice!(0...@size))
+ @state = :prefix
+ end until @buf.empty?
+
+ received
+ end
+
+ def encode o
+ data = Marshal.dump o
+ [data.size].pack('N') + data
+ end
+ end
+end
+
+class EM::P::RedwoodServer < EM::P::Redwood
+ def post_init
+ send_version ENCODINGS, []
+ end
+
+ def receive_version encodings, extensions
+ fail unless encodings.size == 1
+ fail unless ENCODINGS.member? encodings.first
+ @filter = create_filter encodings.first
+ end
+end
+
+class EM::P::RedwoodClient < EM::P::Redwood
+ def receive_version encodings, extensions
+ encoding = (ENCODINGS & encodings).first
+ fail unless encoding
+ @filter = create_filter encoding
+ send_version [encoding], []
+ end
+end
diff --git a/lib/sup/server.rb b/lib/sup/server.rb
@@ -0,0 +1,95 @@
+require 'sup/protocol'
+
+class Redwood::Server < EM::P::RedwoodServer
+ def receive_message type, tag, params
+ if respond_to? :"request_#{type}"
+ send :"request_#{type}", tag, params
+ else
+ fail "bad request type #{type}"
+ end
+ end
+
+ def request_query tag, a
+ q = Redwood::Index.parse_query a['query']
+ query q, a['offset'], a['limit'], a['raw'] do |r|
+ send_message 'message', tag, r
+ end
+ send_message 'done', tag
+ end
+
+ def request_count tag, a
+ q = Redwood::Index.parse_query a['query']
+ c = count q
+ send_message 'count', tag, 'count' => c
+ end
+
+ def request_label tag, a
+ q = Redwood::Index.parse_query a['query']
+ label q, a['add'], a['remove']
+ send_message 'done', tag
+ end
+
+ def request_add tag, a
+ add a['raw'], a['labels']
+ send_message 'done', tag
+ end
+
+private
+
+ def result_from_message m, raw
+ mkperson = lambda { |p| { :email => p.email, :name => p.name } }
+ {
+ 'summary' => {
+ 'message_id' => m.id,
+ 'date' => m.date,
+ 'from' => mkperson[m.from],
+ 'to' => m.to.map(&mkperson),
+ 'cc' => m.cc.map(&mkperson),
+ 'bcc' => m.bcc.map(&mkperson),
+ 'subject' => m.subj,
+ 'refs' => m.refs,
+ 'replytos' => m.replytos,
+ 'labels' => m.labels.map(&:to_s),
+ },
+ 'raw' => raw ? m.raw_message : nil,
+ }
+ end
+
+ def query query, offset, limit, raw
+ c = 0
+ Index.each_message query do |m|
+ next if c < offset
+ break if c >= offset + limit if limit
+ yield result_from_message(m, raw)
+ c += 1
+ end
+ nil
+ end
+
+ def count query
+ Index.num_results_for query
+ end
+
+ def label query, remove_labels, add_labels
+ Index.each_message query do |m|
+ remove_labels.each { |l| m.remove_label l }
+ add_labels.each { |l| m.add_label l }
+ Index.update_message_state m
+ end
+ nil
+ end
+
+ def add raw, labels
+ SentManager.source.store_message Time.now, "test@example.com" do |io|
+ io.write raw
+ end
+ m2 = nil
+ PollManager.each_message_from(SentManager.source) do |m|
+ PollManager.add_new_message m
+ m2 = m
+ end
+ m2.labels = Set.new(labels.map(&:to_sym))
+ Index.update_message_state m2
+ nil
+ end
+end