commit 2ad87067a6190ab0c8626c1f6532cc0c003bbe15
parent f5bf85c7dc41e7dfee38ac82ee644d4d3374bfc4
Author: Gaute Hope <eg@gaute.vetsj.com>
Date: Mon, 24 Jun 2013 11:52:21 +0200
Merge #84: Lock source poll: Thread safe polling
Diffstat:
2 files changed, 64 insertions(+), 48 deletions(-)
diff --git a/lib/sup/poll.rb b/lib/sup/poll.rb
@@ -35,7 +35,7 @@ EOS
@mutex = Mutex.new
@thread = nil
@last_poll = nil
- @polling = false
+ @polling = Mutex.new
@poll_sources = nil
@mode = nil
@should_clear_running_totals = false
@@ -77,21 +77,27 @@ EOS
end
def poll
- return if @polling
- @polling = true
- @poll_sources = SourceManager.usual_sources
- num, numi = poll_with_sources
- @polling = false
- [num, numi]
+ if @polling.try_lock
+ @poll_sources = SourceManager.usual_sources
+ num, numi = poll_with_sources
+ @polling.unlock
+ [num, numi]
+ else
+ debug "poll already in progress."
+ return
+ end
end
def poll_unusual
- return if @polling
- @polling = true
- @poll_sources = SourceManager.unusual_sources
- num, numi = poll_with_sources
- @polling = false
- [num, numi]
+ if @polling.try_lock
+ @poll_sources = SourceManager.unusual_sources
+ num, numi = poll_with_sources
+ @polling.unlock
+ [num, numi]
+ else
+ debug "poll_unusual already in progress."
+ return
+ end
end
def start
@@ -157,7 +163,6 @@ EOS
loaded_labels = loaded_labels - LabelManager::HIDDEN_RESERVED_LABELS - [:inbox, :killed]
yield "Done polling; loaded #{total_num} new messages total"
@last_poll = Time.now
- @polling = false
end
[total_num, total_numi, from_and_subj, from_and_subj_inbox, loaded_labels]
end
@@ -166,42 +171,51 @@ EOS
## labels and locations set correctly. The Messages are saved to or removed
## from the index after being yielded.
def poll_from source, opts={}
- begin
- source.poll do |sym, args|
- case sym
- when :add
- m = Message.build_from_source source, args[:info]
- old_m = Index.build_message m.id
- m.labels += args[:labels]
- m.labels.delete :inbox if source.archived?
- m.labels.delete :unread if source.read?
- m.labels.delete :unread if m.source_marked_read? # preserve read status if possible
- m.labels.each { |l| LabelManager << l }
- m.labels = old_m.labels + (m.labels - [:unread, :inbox]) if old_m
- m.locations = old_m.locations + m.locations if old_m
- HookManager.run "before-add-message", :message => m
- yield :add, m, old_m, args[:progress] if block_given?
- Index.sync_message m, true
-
- ## We need to add or unhide the message when it either did not exist
- ## before at all or when it was updated. We do *not* add/unhide when
- ## the same message was found at a different location
- if !old_m or not old_m.locations.member? m.location
- UpdateManager.relay self, :added, m
- end
- when :delete
- Index.each_message :location => [source.id, args[:info]] do |m|
- m.locations.delete Location.new(source, args[:info])
- yield :delete, m, [source,args[:info]], args[:progress] if block_given?
- Index.sync_message m, false
- #UpdateManager.relay self, :deleted, m
+ debug "trying to acquiring poll lock for: #{source}.."
+ if source.poll_lock.try_lock
+ debug "lock acquired for: #{source}."
+ begin
+ source.poll do |sym, args|
+ case sym
+ when :add
+ m = Message.build_from_source source, args[:info]
+ old_m = Index.build_message m.id
+ m.labels += args[:labels]
+ m.labels.delete :inbox if source.archived?
+ m.labels.delete :unread if source.read?
+ m.labels.delete :unread if m.source_marked_read? # preserve read status if possible
+ m.labels.each { |l| LabelManager << l }
+ m.labels = old_m.labels + (m.labels - [:unread, :inbox]) if old_m
+ m.locations = old_m.locations + m.locations if old_m
+ HookManager.run "before-add-message", :message => m
+ yield :add, m, old_m, args[:progress] if block_given?
+ Index.sync_message m, true
+
+ ## We need to add or unhide the message when it either did not exist
+ ## before at all or when it was updated. We do *not* add/unhide when
+ ## the same message was found at a different location
+ if !old_m or not old_m.locations.member? m.location
+ UpdateManager.relay self, :added, m
+ end
+ when :delete
+ Index.each_message :location => [source.id, args[:info]] do |m|
+ m.locations.delete Location.new(source, args[:info])
+ yield :delete, m, [source,args[:info]], args[:progress] if block_given?
+ Index.sync_message m, false
+ #UpdateManager.relay self, :deleted, m
+ end
end
end
- end
- source.go_idle
- rescue SourceError => e
- warn "problem getting messages from #{source}: #{e.message}"
+ rescue SourceError => e
+ warn "problem getting messages from #{source}: #{e.message}"
+
+ ensure
+ source.go_idle
+ source.poll_lock.unlock
+ end
+ else
+ debug "source #{source} is already being polled."
end
end
diff --git a/lib/sup/source.rb b/lib/sup/source.rb
@@ -62,7 +62,7 @@ class Source
bool_accessor :usual, :archived
attr_reader :uri
- attr_accessor :id
+ attr_accessor :id, :poll_lock
def initialize uri, usual=true, archived=false, id=nil
raise ArgumentError, "id must be an integer: #{id.inspect}" unless id.is_a? Fixnum if id
@@ -71,6 +71,8 @@ class Source
@usual = usual
@archived = archived
@id = id
+
+ @poll_lock = Mutex.new
end
## overwrite me if you have a disk incarnation (currently used only for sup-sync-back)