commit c13b2038a56bf56a8e915990611b0524298ec263
parent 50d96a0531c8fe210a0a1fae052431a4656a061c
Author: Rich Lane <rlane@club.cc.cmu.edu>
Date: Mon, 22 Mar 2010 20:14:52 -0700
Merge branch 'master' into next
Diffstat:
13 files changed, 193 insertions(+), 903 deletions(-)
diff --git a/bin/sup b/bin/sup
@@ -154,26 +154,6 @@ begin
Index.load
Index.start_sync_worker unless $opts[:no_threads]
- if Redwood::SourceManager.sources.any? { |x| x.is_a? Redwood::MBox::SSHLoader }
- $stderr.puts <<EOS
-mbox+ssh sources are deprecated and will be removed in the next release.
-Running rsync in your before-poll hook is a good alternative.
-
-Press enter to continue.
-EOS
- $stdin.gets
- end
-
- if Redwood::SourceManager.sources.any? { |x| x.is_a? Redwood::IMAP }
- $stderr.puts <<EOS
-IMAP sources are deprecated and will be removed in the next release.
-Running offlineimap or fetchmail in your before-poll hook is a good alternative.
-
-Press enter to continue.
-EOS
- $stdin.gets
- end
-
$die = false
trap("TERM") { |x| $die = true }
trap("WINCH") { |x| BufferManager.sigwinch_happened! }
diff --git a/bin/sup-recover-sources b/bin/sup-recover-sources
@@ -58,17 +58,7 @@ ARGV.each do |fn|
next if Redwood::SourceManager.source_for fn
## TODO: merge this code with the same snippet in import
- source =
- case fn
- when %r!^imaps?://!
- print "Username for #{fn}: "
- username = $stdin.gets.chomp
- print "Password for #{fn} (warning: cleartext): "
- password = $stdin.gets.chomp
- Redwood::IMAP.new(fn, username, password, nil, !$opts[:unusual], $opts[:archive])
- else
- Redwood::MBox::Loader.new(fn, nil, !$opts[:unusual], $opts[:archive])
- end
+ source = Redwood::MBox::Loader.new(fn, nil, !$opts[:unusual], $opts[:archive])
source_ids = Hash.new 0
count = 0
diff --git a/bin/sup-sync-back b/bin/sup-sync-back
@@ -4,7 +4,6 @@ require 'rubygems'
require 'uri'
require 'tempfile'
require 'trollop'
-require 'enumerator'
require "sup"; Redwood::check_library_version_against "git"
## save a message 'm' to an open file pointer 'fp'
@@ -17,7 +16,7 @@ def die msg
end
def has_any_from_source_with_label? index, source, label # true when the index reports at least one result for this source/label query
query = { :source_id => source.id, :label => label, :limit => 1, :load_spam => true, :load_deleted => true, :load_killed => true }
- not Enumerable::Enumerator.new(index, :each_id, query).map.empty?
+ index.num_results_for(query) != 0 # asks the index for a count instead of enumerating ids
end
opts = Trollop::options do
diff --git a/bin/sup-tweak-labels b/bin/sup-tweak-labels
@@ -2,7 +2,6 @@
require 'rubygems'
require 'trollop'
-require 'enumerator'
require "sup"; Redwood::check_library_version_against "git"
class Float
@@ -74,18 +73,18 @@ begin
end.map { |s| s.id }
Trollop::die "nothing to do: no sources" if source_ids.empty?
- query = "+(" + source_ids.map { |id| "source_id:#{id}" }.join(" OR ") + ")"
+ query = "(" + source_ids.map { |id| "source_id:#{id}" }.join(" OR ") + ")"
if add_labels.empty?
## if all we're doing is removing labels, we can further restrict the
## query to only messages with those labels
- query += " +(" + remove_labels.map { |l| "label:#{l}" }.join(" ") + ")"
+ query += " (" + remove_labels.map { |l| "label:#{l}" }.join(" OR ") + ")"
end
query += ' ' + opts[:query] if opts[:query]
parsed_query = index.parse_query query
parsed_query.merge! :load_spam => true, :load_deleted => true, :load_killed => true
- ids = Enumerable::Enumerator.new(index, :each_id, parsed_query).map
- num_total = ids.size
+ ids = Enumerator.new(index, :each_id, parsed_query)
+ num_total = index.num_results_for parsed_query
$stderr.puts "Found #{num_total} documents across #{source_ids.length} sources. Scanning..."
diff --git a/lib/sup.rb b/lib/sup.rb
@@ -51,6 +51,7 @@ module Redwood
SUICIDE_FN = File.join(BASE_DIR, "please-kill-yourself")
HOOK_DIR = File.join(BASE_DIR, "hooks")
SEARCH_FN = File.join(BASE_DIR, "searches.txt")
+ LOG_FN = File.join(BASE_DIR, "log")
YAML_DOMAIN = "masanjin.net"
YAML_DATE = "2006-10-01"
@@ -286,6 +287,7 @@ Redwood::HookManager.init Redwood::HOOK_DIR
## everything we need to get logging working
require "sup/logger"
Redwood::Logger.init.add_sink $stderr
+Redwood::Logger.add_sink File.open(Redwood::LOG_FN, 'a')
include Redwood::LogsStuff
## determine encoding and character set
@@ -309,7 +311,6 @@ require "sup/message"
require "sup/source"
require "sup/mbox"
require "sup/maildir"
-require "sup/imap"
require "sup/person"
require "sup/account"
require "sup/thread"
diff --git a/lib/sup/imap.rb b/lib/sup/imap.rb
@@ -1,349 +0,0 @@
-require 'uri'
-require 'net/imap'
-require 'stringio'
-require 'time'
-require 'rmail'
-require 'cgi'
-require 'set'
-
-## TODO: remove synchronized method protector calls; use a Monitor instead
-## (ruby's reentrant mutex)
-
-## fucking imap fucking sucks. what the FUCK kind of committee of dunces
-## designed this shit.
-##
-## imap talks about 'unique ids' for messages, to be used for
-## cross-session identification. great---just what sup needs! except it
-## turns out the uids can be invalidated every time the 'uidvalidity'
-## value changes on the server, and 'uidvalidity' can change without
-## restriction. it can change any time you log in. it can change EVERY
-## time you log in. of course the imap spec "strongly recommends" that it
-## never change, but there's nothing to stop people from just setting it
-## to the current timestamp, and in fact that's EXACTLY what the one imap
-## server i have at my disposal does. thus the so-called uids are
-## absolutely useless and imap provides no cross-session way of uniquely
-## identifying a message. but thanks for the "strong recommendation",
-## guys!
-##
-## so right now i'm using the 'internal date' and the size of each
-## message to uniquely identify it, and i scan over the entire mailbox
-## each time i open it to map those things to message ids. that can be
-## slow for large mailboxes, and we'll just have to hope that there are
-## no collisions. ho ho! a perfectly reasonable solution!
-##
-## and here's another thing. check out RFC2060 2.2.2 paragraph 5:
-##
-## A client MUST be prepared to accept any server response at all
-## times. This includes server data that was not requested.
-##
-## yeah. that totally makes a lot of sense. and once again, the idiocy of
-## the spec actually happens in practice. you'll request flags for one
-## message, and get it interspersed with a random bunch of flags for some
-## other messages, including a different set of flags for the same
-## message! totally ok by the imap spec. totally retarded by any other
-## metric.
-##
-## fuck you, imap committee. you managed to design something nearly as
-## shitty as mbox but goddamn THIRTY YEARS LATER.
-module Redwood
-
-class IMAP < Source
- include SerializeLabelsNicely
- SCAN_INTERVAL = 60 # seconds
-
- ## upon these errors we'll try to rereconnect a few times
- RECOVERABLE_ERRORS = [ Errno::EPIPE, Errno::ETIMEDOUT, OpenSSL::SSL::SSLError ]
-
- attr_accessor :username, :password
- yaml_properties :uri, :username, :password, :cur_offset, :usual,
- :archived, :id, :labels
-
- def initialize uri, username, password, last_idate=nil, usual=true, archived=false, id=nil, labels=[]
- raise ArgumentError, "username and password must be specified" unless username && password
- raise ArgumentError, "not an imap uri" unless uri =~ %r!imaps?://!
-
- super uri, last_idate, usual, archived, id
-
- @parsed_uri = URI(uri)
- @username = username
- @password = password
- @imap = nil
- @imap_state = {}
- @ids = []
- @last_scan = nil
- @labels = Set.new((labels || []) - LabelManager::RESERVED_LABELS)
- @say_id = nil
- @mutex = Mutex.new
- end
-
- def self.suggest_labels_for path
- path =~ /([^\/]*inbox[^\/]*)/i ? [$1.downcase.intern] : []
- end
-
- def host; @parsed_uri.host; end
- def port; @parsed_uri.port || (ssl? ? 993 : 143); end
- def mailbox
- x = @parsed_uri.path[1..-1]
- (x.nil? || x.empty?) ? 'INBOX' : CGI.unescape(x)
- end
- def ssl?; @parsed_uri.scheme == 'imaps' end
-
- def check; end # do nothing because anything we do will be too slow,
- # and we'll catch the errors later.
-
- ## is this necessary? TODO: remove maybe
- def == o; o.is_a?(IMAP) && o.uri == self.uri && o.username == self.username; end
-
- def load_header id
- parse_raw_email_header StringIO.new(raw_header(id))
- end
-
- def load_message id
- RMail::Parser.read raw_message(id)
- end
-
- def each_raw_message_line id
- StringIO.new(raw_message(id)).each { |l| yield l }
- end
-
- def raw_header id
- unsynchronized_scan_mailbox
- header, flags = get_imap_fields id, 'RFC822.HEADER'
- header.gsub(/\r\n/, "\n")
- end
- synchronized :raw_header
-
- def store_message date, from_email, &block
- message = StringIO.new
- yield message
- message.string.gsub! /\n/, "\r\n"
-
- safely { @imap.append mailbox, message.string, [:Seen], Time.now }
- end
-
- def raw_message id
- unsynchronized_scan_mailbox
- get_imap_fields(id, 'RFC822').first.gsub(/\r\n/, "\n")
- end
- synchronized :raw_message
-
- def mark_as_deleted ids
- ids = [ids].flatten # accept single arguments
- unsynchronized_scan_mailbox
- imap_ids = ids.map { |i| @imap_state[i] && @imap_state[i][:id] }.compact
- return if imap_ids.empty?
- @imap.store imap_ids, "+FLAGS", [:Deleted]
- end
- synchronized :mark_as_deleted
-
- def expunge
- @imap.expunge
- unsynchronized_scan_mailbox true
- true
- end
- synchronized :expunge
-
- def connect
- return if @imap
- safely { } # do nothing!
- end
- synchronized :connect
-
- def scan_mailbox force=false
- return if !force && @last_scan && (Time.now - @last_scan) < SCAN_INTERVAL
- last_id = safely do
- @imap.examine mailbox
- @imap.responses["EXISTS"].last
- end
- @last_scan = Time.now
-
- @ids = [] if force
- return if last_id == @ids.length
-
- range = (@ids.length + 1) .. last_id
- debug "fetching IMAP headers #{range}"
- fetch(range, ['RFC822.SIZE', 'INTERNALDATE', 'FLAGS']).each do |v|
- id = make_id v
- @ids << id
- @imap_state[id] = { :id => v.seqno, :flags => v.attr["FLAGS"] }
- end
- debug "done fetching IMAP headers"
- end
- synchronized :scan_mailbox
-
- def each
- return unless start_offset
-
- ids =
- @mutex.synchronize do
- unsynchronized_scan_mailbox
- @ids
- end
-
- start = ids.index(cur_offset || start_offset) or raise OutOfSyncSourceError, "Unknown message id #{cur_offset || start_offset}."
-
- start.upto(ids.length - 1) do |i|
- id = ids[i]
- state = @mutex.synchronize { @imap_state[id] } or next
- self.cur_offset = id
- labels = { :Flagged => :starred,
- :Deleted => :deleted
- }.inject(@labels) do |cur, (imap, sup)|
- cur + (state[:flags].include?(imap) ? [sup] : [])
- end
-
- labels += [:unread] unless state[:flags].include?(:Seen)
-
- yield id, labels
- end
- end
-
- def start_offset
- unsynchronized_scan_mailbox
- @ids.first
- end
- synchronized :start_offset
-
- def end_offset
- unsynchronized_scan_mailbox
- @ids.last + 1
- end
- synchronized :end_offset
-
- def pct_done; 100.0 * (@ids.index(cur_offset) || 0).to_f / (@ids.length - 1).to_f; end
-
-private
-
- def fetch ids, fields
- results = safely { @imap.fetch ids, fields }
- good_results =
- if ids.respond_to? :member?
- results.find_all { |r| ids.member?(r.seqno) && fields.all? { |f| r.attr.member?(f) } }
- else
- results.find_all { |r| ids == r.seqno && fields.all? { |f| r.attr.member?(f) } }
- end
-
- if good_results.empty?
- raise FatalSourceError, "no IMAP response for #{ids} containing all fields #{fields.join(', ')} (got #{results.size} results)"
- elsif good_results.size < results.size
- warn "Your IMAP server sucks. It sent #{results.size} results for a request for #{good_results.size} messages. What are you using, Binc?"
- end
-
- good_results
- end
-
- def unsafe_connect
- say "Connecting to IMAP server #{host}:#{port}..."
-
- ## apparently imap.rb does a lot of threaded stuff internally and if
- ## an exception occurs, it will catch it and re-raise it on the
- ## calling thread. but i can't seem to catch that exception, so i've
- ## resorted to initializing it in its own thread. surely there's a
- ## better way.
- exception = nil
- ::Thread.new do
- begin
- #raise Net::IMAP::ByeResponseError, "simulated imap failure"
- @imap = Net::IMAP.new host, port, ssl?
- say "Logging in..."
-
- ## although RFC1730 claims that "If an AUTHENTICATE command fails
- ## with a NO response, the client may try another", in practice
- ## it seems like they can also send a BAD response.
- begin
- raise Net::IMAP::NoResponseError unless @imap.capability().member? "AUTH=CRAM-MD5"
- @imap.authenticate 'CRAM-MD5', @username, @password
- rescue Net::IMAP::BadResponseError, Net::IMAP::NoResponseError => e
- debug "CRAM-MD5 authentication failed: #{e.class}. Trying LOGIN auth..."
- begin
- raise Net::IMAP::NoResponseError unless @imap.capability().member? "AUTH=LOGIN"
- @imap.authenticate 'LOGIN', @username, @password
- rescue Net::IMAP::BadResponseError, Net::IMAP::NoResponseError => e
- debug "LOGIN authentication failed: #{e.class}. Trying plain-text LOGIN..."
- @imap.login @username, @password
- end
- end
- say "Successfully connected to #{@parsed_uri}."
- rescue Exception => e
- exception = e
- ensure
- shutup
- end
- end.join
-
- raise exception if exception
- end
-
- def say s
- @say_id = BufferManager.say s, @say_id if BufferManager.instantiated?
- info s
- end
-
- def shutup
- BufferManager.clear @say_id if BufferManager.instantiated?
- @say_id = nil
- end
-
- def make_id imap_stuff
- # use 7 digits for the size. why 7? seems nice.
- %w(RFC822.SIZE INTERNALDATE).each do |w|
- raise FatalSourceError, "requested data not in IMAP response: #{w}" unless imap_stuff.attr[w]
- end
-
- msize, mdate = imap_stuff.attr['RFC822.SIZE'] % 10000000, Time.parse(imap_stuff.attr["INTERNALDATE"])
- sprintf("%d%07d", mdate.to_i, msize).to_i
- end
-
- def get_imap_fields id, *fields
- raise OutOfSyncSourceError, "Unknown message id #{id}" unless @imap_state[id]
-
- imap_id = @imap_state[id][:id]
- result = fetch(imap_id, (fields + ['RFC822.SIZE', 'INTERNALDATE']).uniq).first
- got_id = make_id result
-
- ## I've turned off the following sanity check because Microsoft
- ## Exchange fails it. Exchange actually reports two different
- ## INTERNALDATEs for the exact same message when queried at different
- ## points in time.
- ##
- ## RFC2060 defines the semantics of INTERNALDATE for messages that
- ## arrive via SMTP for via various IMAP commands, but states that
- ## "All other cases are implementation defined.". Great, thanks guys,
- ## yet another useless field.
- ##
- ## Of course no OTHER imap server I've encountered returns DIFFERENT
- ## values for the SAME message. But it's Microsoft; what do you
- ## expect? If their programmers were any good they'd be working at
- ## Google.
-
- # raise OutOfSyncSourceError, "IMAP message mismatch: requested #{id}, got #{got_id}." unless got_id == id
-
- fields.map { |f| result.attr[f] or raise FatalSourceError, "empty response from IMAP server: #{f}" }
- end
-
- ## execute a block, connected if unconnected, re-connected up to 3
- ## times if a recoverable error occurs, and properly dying if an
- ## unrecoverable error occurs.
- def safely
- retries = 0
- begin
- begin
- unsafe_connect unless @imap
- yield
- rescue *RECOVERABLE_ERRORS => e
- if (retries += 1) <= 3
- @imap = nil
- warn "got #{e.class.name}: #{e.message.inspect}"
- sleep 2
- retry
- end
- raise
- end
- rescue SocketError, Net::IMAP::Error, SystemCallError, IOError, OpenSSL::SSL::SSLError => e
- raise FatalSourceError, "While communicating with IMAP server (type #{e.class.name}): #{e.message.inspect}"
- end
- end
-
-end
-
-end
diff --git a/lib/sup/index.rb b/lib/sup/index.rb
@@ -3,6 +3,7 @@ ENV["XAPIAN_FLUSH_THRESHOLD"] = "1000"
require 'xapian'
require 'set'
require 'fileutils'
+require 'monitor'
begin
require 'chronic'
diff --git a/lib/sup/mbox.rb b/lib/sup/mbox.rb
@@ -1,13 +1,182 @@
-require "sup/mbox/loader"
-require "sup/mbox/ssh-file"
-require "sup/mbox/ssh-loader"
+require 'rmail'
+require 'uri'
+require 'set'
module Redwood
-module MBox
+class MBox < Source
BREAK_RE = /^From \S+ (.+)$/
- def is_break_line? l
+ include SerializeLabelsNicely
+ yaml_properties :uri, :cur_offset, :usual, :archived, :id, :labels
+
+ attr_reader :labels
+
+ ## uri_or_fp is horrific. need to refactor.
+ def initialize uri_or_fp, start_offset=nil, usual=true, archived=false, id=nil, labels=nil # builds the source from an mbox URI string or from an already-open file-like object
+ @mutex = Mutex.new # held around the seek/read sequences on @f in the reader methods
+ @labels = Set.new((labels || []) - LabelManager::RESERVED_LABELS) # reserved labels are dropped
+
+ case uri_or_fp
+ when String # a URI string: validate it, then open the file it names
+ uri = URI(Source.expand_filesystem_uri(uri_or_fp))
+ raise ArgumentError, "not an mbox uri" unless uri.scheme == "mbox"
+ raise ArgumentError, "mbox URI ('#{uri}') cannot have a host: #{uri.host}" if uri.host
+ raise ArgumentError, "mbox URI must have a path component" unless uri.path
+ @f = File.open uri.path, 'rb'
+ @path = uri.path
+ else # anything else is used directly as the file object; it must respond to #path
+ @f = uri_or_fp
+ @path = uri_or_fp.path
+ end
+
+ start_offset ||= 0 # nil means start at the beginning of the file
+ super uri_or_fp, start_offset, usual, archived, id
+ end
+
+ def file_path; @path end # path of the mbox file as recorded by initialize
+ def is_source_for? uri; super || (self.uri.is_a?(String) && (URI(Source.expand_filesystem_uri(uri)) == URI(Source.expand_filesystem_uri(self.uri)))) end # besides the superclass test, matches when both URIs are equal after filesystem expansion
+
+ def self.suggest_labels_for path # returns an array holding zero or one suggested label symbol
+ ## heuristic: use the lower-cased filename as a label, unless the
+ ## directory name contains var, usr or spool, which probably means an inbox.
+ if File.dirname(path) =~ /\b(var|usr|spool)\b/
+ []
+ else
+ [File.basename(path).downcase.intern]
+ end
+ end
+
+ def check
+ if (cur_offset ||= start_offset) > end_offset
+ raise OutOfSyncSourceError, "mbox file is smaller than last recorded message offset. Messages have probably been deleted by another client."
+ end
+ end
+
+ def start_offset; 0; end # reading always starts at byte 0 of the file
+ def end_offset; File.size @f; end # current size of @f as reported by File.size
+
+ def load_header offset # parses the header of the message whose separator line starts at +offset+
+ header = nil
+ @mutex.synchronize do
+ @f.seek offset
+ l = @f.gets
+ unless MBox::is_break_line? l # the offset must point at an mbox separator line
+ raise OutOfSyncSourceError, "mismatch in mbox file offset #{offset.inspect}: #{l.inspect}."
+ end
+ header = parse_raw_email_header @f # @f is now positioned just after the separator line
+ end
+ header
+ end
+
+ def load_message offset # returns the message starting at +offset+ as parsed by RMail::Parser.read
+ @mutex.synchronize do
+ @f.seek offset
+ begin
+ ## don't use RMail::Mailbox::MBoxReader because it doesn't properly ignore
+ ## "From" at the start of a message body line.
+ string = ""
+ l = @f.gets # the line at +offset+ is read but never appended to string
+ string << l until @f.eof? || MBox::is_break_line?(l = @f.gets) # collect lines up to the next separator or EOF
+ RMail::Parser.read string
+ rescue RMail::Parser::Error => e # parser failures are reported as FatalSourceError
+ raise FatalSourceError, "error parsing mbox file: #{e.message}"
+ end
+ end
+ end
+
+ ## scan forward until we're at the valid start of a message
+ def correct_offset! # advances cur_offset by the length of the text found before the next separator line
+ @mutex.synchronize do
+ @f.seek cur_offset
+ string = ""
+ until @f.eof? || MBox::is_break_line?(l = @f.gets)
+ string << l # text that precedes the separator
+ end
+ self.cur_offset += string.length # NOTE(review): assumes length counts bytes (file is opened 'rb'); confirm for caller-supplied IO objects
+ end
+ end
+
+ def raw_header offset # returns, as one string, the lines from +offset+ up to the first blank line or EOF
+ ret = ""
+ @mutex.synchronize do
+ @f.seek offset
+ until @f.eof? || (l = @f.gets) =~ /^\r*$/ # a line that is empty apart from CRs ends the header
+ ret << l
+ end
+ end
+ ret
+ end
+
+ def raw_message offset # returns every line yielded by each_raw_message_line for +offset+, concatenated
+ ret = ""
+ each_raw_message_line(offset) { |l| ret << l } # builds the whole message in memory
+ ret
+ end
+
+ def store_message date, from_email, &block
+ need_blank = File.exists?(@filename) && !File.zero?(@filename)
+ File.open(@filename, "ab") do |f|
+ f.puts if need_blank
+ f.puts "From #{from_email} #{date.rfc2822}"
+ yield f
+ end
+ end
+
+ ## apparently it's a million times faster to call this directly if
+ ## we're just moving messages around on disk, than reading things
+ ## into memory with raw_message.
+ ##
+ ## i hoped never to have to move shit around on disk but
+ ## sup-sync-back has to do it.
+ def each_raw_message_line offset # yields the lines of the message at +offset+ one at a time, holding @mutex throughout
+ @mutex.synchronize do
+ @f.seek offset
+ yield @f.gets # the line at +offset+ itself is yielded unconditionally
+ until @f.eof? || MBox::is_break_line?(l = @f.gets) # then every line up to the next separator or EOF
+ yield l
+ end
+ end
+ end
+
+ def next # returns [offset of the next message, labels plus :unread] and advances cur_offset; returns nil when nothing can be read at cur_offset
+ returned_offset = nil
+ next_offset = cur_offset
+
+ begin
+ @mutex.synchronize do
+ @f.seek cur_offset
+
+ ## cur_offset could be at one of two places here:
+
+ ## 1. before a \n and a mbox separator, if it was previously at
+ ## EOF and a new message was added; or,
+ ## 2. at the beginning of an mbox separator (in all other
+ ## cases).
+
+ l = @f.gets or return nil
+ if l =~ /^\s*$/ # case 1
+ returned_offset = @f.tell
+ @f.gets # now we're at a BREAK_RE, so skip past it
+ else # case 2
+ returned_offset = cur_offset
+ ## we've already skipped past the BREAK_RE, so just go
+ end
+
+ while(line = @f.gets)
+ break if MBox::is_break_line? line
+ next_offset = @f.tell # end of the last line read that is not a separator
+ end
+ end
+ rescue SystemCallError, IOError => e # read failures are reported as FatalSourceError
+ raise FatalSourceError, "Error reading #{@f.path}: #{e.message}"
+ end
+
+ self.cur_offset = next_offset # the following call resumes from here
+ [returned_offset, (labels + [:unread])]
+ end
+
+ def self.is_break_line? l
l =~ BREAK_RE or return false
time = $1
begin
@@ -19,6 +188,9 @@ module MBox
false
end
end
- module_function :is_break_line?
+
+ class Loader < self # subclass adding no behaviour of its own; bin/sup-recover-sources still instantiates MBox::Loader
+ yaml_properties :uri, :cur_offset, :usual, :archived, :id, :labels # NOTE(review): presumably keeps sources serialized under the old class name loadable; confirm
+ end
end
end
diff --git a/lib/sup/mbox/loader.rb b/lib/sup/mbox/loader.rb
@@ -1,180 +0,0 @@
-require 'rmail'
-require 'uri'
-require 'set'
-
-module Redwood
-module MBox
-
-class Loader < Source
- include SerializeLabelsNicely
- yaml_properties :uri, :cur_offset, :usual, :archived, :id, :labels
-
- attr_reader :labels
-
- ## uri_or_fp is horrific. need to refactor.
- def initialize uri_or_fp, start_offset=nil, usual=true, archived=false, id=nil, labels=nil
- @mutex = Mutex.new
- @labels = Set.new((labels || []) - LabelManager::RESERVED_LABELS)
-
- case uri_or_fp
- when String
- uri = URI(Source.expand_filesystem_uri(uri_or_fp))
- raise ArgumentError, "not an mbox uri" unless uri.scheme == "mbox"
- raise ArgumentError, "mbox URI ('#{uri}') cannot have a host: #{uri.host}" if uri.host
- raise ArgumentError, "mbox URI must have a path component" unless uri.path
- @f = File.open uri.path, 'rb'
- @path = uri.path
- else
- @f = uri_or_fp
- @path = uri_or_fp.path
- end
-
- start_offset ||= 0
- super uri_or_fp, start_offset, usual, archived, id
- end
-
- def file_path; @path end
- def is_source_for? uri; super || (self.uri.is_a?(String) && (URI(Source.expand_filesystem_uri(uri)) == URI(Source.expand_filesystem_uri(self.uri)))) end
-
- def self.suggest_labels_for path
- ## heuristic: use the filename as a label, unless the file
- ## has a path that probably represents an inbox.
- if File.dirname(path) =~ /\b(var|usr|spool)\b/
- []
- else
- [File.basename(path).downcase.intern]
- end
- end
-
- def check
- if (cur_offset ||= start_offset) > end_offset
- raise OutOfSyncSourceError, "mbox file is smaller than last recorded message offset. Messages have probably been deleted by another client."
- end
- end
-
- def start_offset; 0; end
- def end_offset; File.size @f; end
-
- def load_header offset
- header = nil
- @mutex.synchronize do
- @f.seek offset
- l = @f.gets
- unless MBox::is_break_line? l
- raise OutOfSyncSourceError, "mismatch in mbox file offset #{offset.inspect}: #{l.inspect}."
- end
- header = parse_raw_email_header @f
- end
- header
- end
-
- def load_message offset
- @mutex.synchronize do
- @f.seek offset
- begin
- ## don't use RMail::Mailbox::MBoxReader because it doesn't properly ignore
- ## "From" at the start of a message body line.
- string = ""
- l = @f.gets
- string << l until @f.eof? || MBox::is_break_line?(l = @f.gets)
- RMail::Parser.read string
- rescue RMail::Parser::Error => e
- raise FatalSourceError, "error parsing mbox file: #{e.message}"
- end
- end
- end
-
- ## scan forward until we're at the valid start of a message
- def correct_offset!
- @mutex.synchronize do
- @f.seek cur_offset
- string = ""
- until @f.eof? || MBox::is_break_line?(l = @f.gets)
- string << l
- end
- self.cur_offset += string.length
- end
- end
-
- def raw_header offset
- ret = ""
- @mutex.synchronize do
- @f.seek offset
- until @f.eof? || (l = @f.gets) =~ /^\r*$/
- ret << l
- end
- end
- ret
- end
-
- def raw_message offset
- ret = ""
- each_raw_message_line(offset) { |l| ret << l }
- ret
- end
-
- def store_message date, from_email, &block
- need_blank = File.exists?(@filename) && !File.zero?(@filename)
- File.open(@filename, "ab") do |f|
- f.puts if need_blank
- f.puts "From #{from_email} #{date.rfc2822}"
- yield f
- end
- end
-
- ## apparently it's a million times faster to call this directly if
- ## we're just moving messages around on disk, than reading things
- ## into memory with raw_message.
- ##
- ## i hoped never to have to move shit around on disk but
- ## sup-sync-back has to do it.
- def each_raw_message_line offset
- @mutex.synchronize do
- @f.seek offset
- yield @f.gets
- until @f.eof? || MBox::is_break_line?(l = @f.gets)
- yield l
- end
- end
- end
-
- def next
- returned_offset = nil
- next_offset = cur_offset
-
- begin
- @mutex.synchronize do
- @f.seek cur_offset
-
- ## cur_offset could be at one of two places here:
-
- ## 1. before a \n and a mbox separator, if it was previously at
- ## EOF and a new message was added; or,
- ## 2. at the beginning of an mbox separator (in all other
- ## cases).
-
- l = @f.gets or return nil
- if l =~ /^\s*$/ # case 1
- returned_offset = @f.tell
- @f.gets # now we're at a BREAK_RE, so skip past it
- else # case 2
- returned_offset = cur_offset
- ## we've already skipped past the BREAK_RE, so just go
- end
-
- while(line = @f.gets)
- break if MBox::is_break_line? line
- next_offset = @f.tell
- end
- end
- rescue SystemCallError, IOError => e
- raise FatalSourceError, "Error reading #{@f.path}: #{e.message}"
- end
-
- self.cur_offset = next_offset
- [returned_offset, (labels + [:unread])]
- end
-end
-
-end
-end
diff --git a/lib/sup/mbox/ssh-file.rb b/lib/sup/mbox/ssh-file.rb
@@ -1,254 +0,0 @@
-require 'net/ssh'
-
-module Redwood
-module MBox
-
-class SSHFileError < StandardError; end
-
-## this is a file-like interface to a file that actually lives on the
-## other end of an ssh connection. it works by using wc, head and tail
-## to simulate (buffered) random access. on a fast connection, this
-## can have a good bandwidth, but the latency is pretty terrible:
-## about 1 second (!) per request. luckily, we're either just reading
-## straight through the mbox (an import) or we're reading a few
-## messages at a time (viewing messages) so the latency is not a problem.
-
-## all of the methods here can throw SSHFileErrors, SocketErrors,
-## Net::SSH::Exceptions and Errno::ENOENTs.
-
-## a simple buffer of contiguous data
-class Buffer
- def initialize
- clear!
- end
-
- def clear!
- @start = nil
- @buf = ""
- end
-
- def empty?; @start.nil?; end
- def start; @start; end
- def endd; @start + @buf.length; end
-
- def add data, offset=endd
- #MBox::debug "+ adding #{data.length} bytes; size will be #{size + data.length}; limit #{SSHFile::MAX_BUF_SIZE}"
-
- if start.nil?
- @buf = data
- @start = offset
- return
- end
-
- raise "non-continguous data added to buffer (data #{offset}:#{offset + data.length}, buf range #{start}:#{endd})" if offset + data.length < start || offset > endd
-
- if offset < start
- @buf = data[0 ... (start - offset)] + @buf
- @start = offset
- else
- return if offset + data.length < endd
- @buf += data[(endd - offset) .. -1]
- end
- end
-
- def [](o)
- raise "only ranges supported due to programmer's laziness" unless o.is_a? Range
- @buf[Range.new(o.first - @start, o.last - @start, o.exclude_end?)]
- end
-
- def index what, start=0
- x = @buf.index(what, start - @start)
- x.nil? ? nil : x + @start
- end
-
- def rindex what, start=0
- x = @buf.rindex(what, start - @start)
- x.nil? ? nil : x + @start
- end
-
- def size; empty? ? 0 : @buf.size; end
- def to_s; empty? ? "<empty>" : "[#{start}, #{endd})"; end # for debugging
-end
-
-## sharing a ssh connection to one machines between sources seems to
-## create lots of broken situations: commands returning bizarre (large
-## positive integer) return codes despite working; commands
-## occasionally not working, etc. i suspect this is because of the
-## fragile nature of the ssh syncshell.
-##
-## at any rate, we now open up one ssh connection per file, which is
-## probably silly in the extreme case.
-
-## the file-like interface to a remote file
-class SSHFile
- MAX_BUF_SIZE = 1024 * 1024 # bytes
- MAX_TRANSFER_SIZE = 1024 * 128
- REASONABLE_TRANSFER_SIZE = 1024 * 32
- SIZE_CHECK_INTERVAL = 60 * 1 # seconds
-
- ## upon these errors we'll try to rereconnect a few times
- RECOVERABLE_ERRORS = [ Errno::EPIPE, Errno::ETIMEDOUT ]
-
- @@shells = {}
- @@shells_mutex = Mutex.new
-
- def initialize host, fn, ssh_opts={}
- @buf = Buffer.new
- @host = host
- @fn = fn
- @ssh_opts = ssh_opts
- @file_size = nil
- @offset = 0
- @say_id = nil
- @shell = nil
- @shell_mutex = nil
- @buf_mutex = Mutex.new
- end
-
- def to_s; "mbox+ssh://#@host/#@fn"; end ## TODO: remove this EVILness
-
- def connect
- do_remote nil
- end
-
- def eof?; @offset >= size; end
- def eof; eof?; end # lame but IO's method is named this and rmail calls that
- def seek loc; @offset = loc; end
- def tell; @offset; end
- def total; size; end
- def path; @fn end
-
- def size
- if @file_size.nil? || (Time.now - @last_size_check) > SIZE_CHECK_INTERVAL
- @last_size_check = Time.now
- @file_size = do_remote("wc -c #@fn").split.first.to_i
- end
- @file_size
- end
-
- def gets
- return nil if eof?
- @buf_mutex.synchronize do
- make_buf_include @offset
- expand_buf_forward while @buf.index("\n", @offset).nil? && @buf.endd < size
- returning(@buf[@offset .. (@buf.index("\n", @offset) || -1)]) { |line| @offset += line.length }
- end
- end
-
- def read n
- return nil if eof?
- @buf_mutex.synchronize do
- make_buf_include @offset, n
- @buf[@offset ... (@offset += n)]
- end
- end
-
-private
-
- ## TODO: share this code with imap
- def say s
- @say_id = BufferManager.say s, @say_id if BufferManager.instantiated?
- info s
- end
-
- def shutup
- BufferManager.clear @say_id if BufferManager.instantiated? && @say_id
- @say_id = nil
- end
-
- def unsafe_connect
- return if @shell
-
- @key = [@host, @ssh_opts[:username]]
- begin
- @shell, @shell_mutex = @@shells_mutex.synchronize do
- unless @@shells.member? @key
- say "Opening SSH connection to #{@host} for #@fn..."
- session = Net::SSH.start @host, @ssh_opts
- say "Starting SSH shell..."
- @@shells[@key] = [session.shell.sync, Mutex.new]
- end
- @@shells[@key]
- end
-
- say "Checking for #@fn..."
- @shell_mutex.synchronize { raise Errno::ENOENT, @fn unless @shell.test("-e #@fn").status == 0 }
- ensure
- shutup
- end
- end
-
- def do_remote cmd, expected_size=0
- retries = 0
- result = nil
-
- begin
- unsafe_connect
- if cmd
- # MBox::debug "sending command: #{cmd.inspect}"
- result = @shell_mutex.synchronize { x = @shell.send_command cmd; sleep 0.25; x }
- raise SSHFileError, "Failure during remote command #{cmd.inspect}: #{(result.stderr || result.stdout || "")[0 .. 100]}" unless result.status == 0
- end
- ## Net::SSH::Exceptions seem to happen every once in a while for
- ## no good reason.
- rescue Net::SSH::Exception, *RECOVERABLE_ERRORS
- if (retries += 1) <= 3
- @@shells_mutex.synchronize do
- @shell = nil
- @@shells[@key] = nil
- end
- retry
- end
- raise
- end
-
- result.stdout if cmd
- end
-
- def get_bytes offset, size
- do_remote "tail -c +#{offset + 1} #@fn | head -c #{size}", size
- end
-
- def expand_buf_forward n=REASONABLE_TRANSFER_SIZE
- @buf.add get_bytes(@buf.endd, n)
- end
-
- ## try our best to transfer somewhere between
- ## REASONABLE_TRANSFER_SIZE and MAX_TRANSFER_SIZE bytes
- def make_buf_include offset, size=0
- good_size = [size, REASONABLE_TRANSFER_SIZE].max
-
- trans_start, trans_size =
- if @buf.empty?
- [offset, good_size]
- elsif offset < @buf.start
- if @buf.start - offset <= good_size
- start = [@buf.start - good_size, 0].max
- [start, @buf.start - start]
- elsif @buf.start - offset < MAX_TRANSFER_SIZE
- [offset, @buf.start - offset]
- else
- MBox::debug "clearing SSH buffer because buf.start #{@buf.start} - offset #{offset} >= #{MAX_TRANSFER_SIZE}"
- @buf.clear!
- [offset, good_size]
- end
- else
- return if [offset + size, self.size].min <= @buf.endd # whoohoo!
- if offset - @buf.endd <= good_size
- [@buf.endd, good_size]
- elsif offset - @buf.endd < MAX_TRANSFER_SIZE
- [@buf.endd, offset - @buf.endd]
- else
- MBox::debug "clearing SSH buffer because offset #{offset} - buf.end #{@buf.endd} >= #{MAX_TRANSFER_SIZE}"
- @buf.clear!
- [offset, good_size]
- end
- end
-
- @buf.clear! if @buf.size > MAX_BUF_SIZE
- @buf.add get_bytes(trans_start, trans_size), trans_start
- end
-end
-
-end
-end
diff --git a/lib/sup/mbox/ssh-loader.rb b/lib/sup/mbox/ssh-loader.rb
@@ -1,74 +0,0 @@
-require 'net/ssh'
-
-module Redwood
-module MBox
-
-class SSHLoader < Source
- attr_accessor :username, :password
-
- yaml_properties :uri, :username, :password, :cur_offset, :usual,
- :archived, :id, :labels
-
- def initialize uri, username=nil, password=nil, start_offset=nil, usual=true, archived=false, id=nil, labels=[]
- raise ArgumentError, "not an mbox+ssh uri: #{uri.inspect}" unless uri =~ %r!^mbox\+ssh://!
-
- super uri, start_offset, usual, archived, id
-
- @parsed_uri = URI(uri)
- @username = username
- @password = password
- @uri = uri
- @cur_offset = start_offset
- @labels = (labels || []).freeze
-
- opts = {}
- opts[:username] = @username if @username
- opts[:password] = @password if @password
-
- @f = SSHFile.new host, filename, opts
- @loader = Loader.new @f, start_offset, usual, archived, id
-
- ## heuristic: use the filename as a label, unless the file
- ## has a path that probably represents an inbox.
- end
-
- def self.suggest_labels_for path; Loader.suggest_labels_for(path) end
-
- def connect; safely { @f.connect }; end
- def host; @parsed_uri.host; end
- def filename; @parsed_uri.path[1..-1] end
-
- def next
- safely do
- offset, labels = @loader.next
- self.cur_offset = @loader.cur_offset # superclass keeps @cur_offset which is used by yaml
- [offset, (labels + @labels).uniq] # add our labels
- end
- end
-
- def end_offset
- safely { @f.size }
- end
-
- def cur_offset= o; @cur_offset = @loader.cur_offset = o; @dirty = true; end
- def id; @loader.id; end
- def id= o; @id = @loader.id = o; end
- # def cur_offset; @loader.cur_offset; end # think we'll be ok without this
- def to_s; @parsed_uri.to_s; end
-
- def safely
- begin
- yield
- rescue Net::SSH::Exception, SocketError, SSHFileError, SystemCallError, IOError => e
- m = "error communicating with SSH server #{host} (#{e.class.name}): #{e.message}"
- raise FatalSourceError, m
- end
- end
-
- [:start_offset, :load_header, :load_message, :raw_header, :raw_message].each do |meth|
- define_method(meth) { |*a| safely { @loader.send meth, *a } }
- end
-end
-
-end
-end
diff --git a/lib/sup/modes/console-mode.rb b/lib/sup/modes/console-mode.rb
@@ -8,7 +8,7 @@ class Console
end
def query(query)
- Enumerable::Enumerator.new(Index, :each_message, Index.parse_query(query))
+ Enumerator.new(Index, :each_message, Index.parse_query(query))
end
def add_labels(query, *labels)
diff --git a/lib/sup/util.rb b/lib/sup/util.rb
@@ -3,6 +3,7 @@ require 'lockfile'
require 'mime/types'
require 'pathname'
require 'set'
+require 'enumerator'
## time for some monkeypatching!
class Lockfile
@@ -466,6 +467,10 @@ module Enumerable
end
end
+unless Object.const_defined? :Enumerator
+ Enumerator = Enumerable::Enumerator
+end
+
class Array
def flatten_one_level
inject([]) { |a, e| a + e }