commit d1eea42f3fa7a3e771ac6628b01a82617ffb226b
parent 8f592630eedb9c47eb79ef4237c3693724fa3bd1
Author: Gaute Hope <eg@gaute.vetsj.com>
Date: Thu, 20 Jun 2013 11:14:45 +0200
Source poll thread safety: lock source polling
Diffstat:
2 files changed, 45 insertions(+), 34 deletions(-)
diff --git a/lib/sup/poll.rb b/lib/sup/poll.rb
@@ -166,42 +166,51 @@ EOS
## labels and locations set correctly. The Messages are saved to or removed
## from the index after being yielded.
def poll_from source, opts={}
- begin
- source.poll do |sym, args|
- case sym
- when :add
- m = Message.build_from_source source, args[:info]
- old_m = Index.build_message m.id
- m.labels += args[:labels]
- m.labels.delete :inbox if source.archived?
- m.labels.delete :unread if source.read?
- m.labels.delete :unread if m.source_marked_read? # preserve read status if possible
- m.labels.each { |l| LabelManager << l }
- m.labels = old_m.labels + (m.labels - [:unread, :inbox]) if old_m
- m.locations = old_m.locations + m.locations if old_m
- HookManager.run "before-add-message", :message => m
- yield :add, m, old_m, args[:progress] if block_given?
- Index.sync_message m, true
-
- ## We need to add or unhide the message when it either did not exist
- ## before at all or when it was updated. We do *not* add/unhide when
- ## the same message was found at a different location
- if !old_m or not old_m.locations.member? m.location
- UpdateManager.relay self, :added, m
- end
- when :delete
- Index.each_message :location => [source.id, args[:info]] do |m|
- m.locations.delete Location.new(source, args[:info])
- yield :delete, m, [source,args[:info]], args[:progress] if block_given?
- Index.sync_message m, false
- #UpdateManager.relay self, :deleted, m
+ debug "trying to acquiring poll lock for: #{source}.."
+ if source.poll_lock.try_lock
+ debug "lock acquired for: #{source}."
+ begin
+ source.poll do |sym, args|
+ case sym
+ when :add
+ m = Message.build_from_source source, args[:info]
+ old_m = Index.build_message m.id
+ m.labels += args[:labels]
+ m.labels.delete :inbox if source.archived?
+ m.labels.delete :unread if source.read?
+ m.labels.delete :unread if m.source_marked_read? # preserve read status if possible
+ m.labels.each { |l| LabelManager << l }
+ m.labels = old_m.labels + (m.labels - [:unread, :inbox]) if old_m
+ m.locations = old_m.locations + m.locations if old_m
+ HookManager.run "before-add-message", :message => m
+ yield :add, m, old_m, args[:progress] if block_given?
+ Index.sync_message m, true
+
+ ## We need to add or unhide the message when it either did not exist
+ ## before at all or when it was updated. We do *not* add/unhide when
+ ## the same message was found at a different location
+ if !old_m or not old_m.locations.member? m.location
+ UpdateManager.relay self, :added, m
+ end
+ when :delete
+ Index.each_message :location => [source.id, args[:info]] do |m|
+ m.locations.delete Location.new(source, args[:info])
+ yield :delete, m, [source,args[:info]], args[:progress] if block_given?
+ Index.sync_message m, false
+ #UpdateManager.relay self, :deleted, m
+ end
end
end
- end
- source.go_idle
- rescue SourceError => e
- warn "problem getting messages from #{source}: #{e.message}"
+ source.go_idle
+ rescue SourceError => e
+ warn "problem getting messages from #{source}: #{e.message}"
+
+ ensure
+ source.poll_lock.unlock
+ end
+ else
+ debug "source #{source} is already being polled."
end
end
diff --git a/lib/sup/source.rb b/lib/sup/source.rb
@@ -62,7 +62,7 @@ class Source
bool_accessor :usual, :archived
attr_reader :uri
- attr_accessor :id
+ attr_accessor :id, :poll_lock
def initialize uri, usual=true, archived=false, id=nil
raise ArgumentError, "id must be an integer: #{id.inspect}" unless id.is_a? Fixnum if id
@@ -71,6 +71,8 @@ class Source
@usual = usual
@archived = archived
@id = id
+
+ @poll_lock = Mutex.new
end
## overwrite me if you have a disk incarnation (currently used only for sup-sync-back)