# encoding: utf-8 # # Antilibrary Worker # Latest version of this client on https://github.com/antilibrary/antilibrary_worker require 'pry' if ENV['environment'] == 'dev' require 'open3' require 'net/http' require 'json' require 'base64' require 'fileutils' require 'optparse' require 'yaml' VERSION = '0.4' TRACKER_ID = 'QmfYZKUBCrHhLfE5mrG48hQ2wHGSNKzRfS4QoaCFhbWYCf' # If script is being used outside the vagrant box set daemon ip to localhost if ENV['ipfs_api_addr'].nil? @ipfs_api_addr = '127.0.0.1' else @ipfs_api_addr = ENV['ipfs_api_addr'] end def create_config # create new config file file_content = <> " command, ipfs_hash = data.split(':', 2) #prepare response message new_message = Hash.new new_message['worker_nickname'] = WORKER_NICKNAME new_message['command'] = command new_message['ipfs_hash'] = ipfs_hash new_message['local_space_used'] = @local_space_used new_message['storage_limit'] = @local_space_limit new_message['version'] = VERSION case command when "pin" # check if space limit is not overrun if @local_space_used < @local_space_limit.to_i # get file info file_info = execute_ipfs_command("file ls --timeout 30s #{ipfs_hash}", false) # if file is not in the ipfs network if file_info.include?('Error: request canceled') puts "[SKIP]" new_message['response'] = file_info else file_size = JSON.parse(file_info)['Objects'].first[1]['Size'] # calculate timeout: (filesize converted from b to kb)/(main seed node speed in kbit/s)+20 timeout_delay = (file_size/1024)/(3072/8)+20 new_message['response'] = execute_ipfs_command("pin add --timeout #{timeout_delay}s #{ipfs_hash}", true) if !new_message['response'].include?("Error") @local_space_used += JSON.parse(execute_ipfs_command("object stat #{ipfs_hash}", false))['CumulativeSize'] end end else puts "Storage limit reached! Not accepting new files." new_message['response'] = "No space left!" end new_message['local_space_used'] = @local_space_used message_tracker(new_message) when "unpin" @local_space_used -= JSON.parse(execute_ipfs_command("object stat #{ipfs_hash}", false))['CumulativeSize'] new_message['response'] = execute_ipfs_command("pin rm #{ipfs_hash}", true) new_message['local_space_used'] = @local_space_used message_tracker(new_message) when "check" new_message['response'] = execute_ipfs_command("pin ls --type=recursive #{ipfs_hash}", true) message_tracker(new_message) when "list_all" new_message['response'] = execute_ipfs_command("pin ls --type=recursive", true) # replace output with ipfs file url File.open("list_all.temp", 'w') {|file| file.write(new_message['response'])} new_message['response'] = execute_ipfs_command("add -q list_all.temp", true) message_tracker(new_message) FileUtils.rm("list_all.temp") when "ping" puts "sending pong!" new_message['response'] = "pong" message_tracker(new_message) when "joining" puts "Got ACK from tracker!" else puts "Command not recognized!" end end end end end end def send_joining # Send joining message to tracker print "Sending handshake message to tracker..." new_message = Hash.new new_message['worker_nickname'] = WORKER_NICKNAME new_message['command'] = 'joining' new_message['storage_limit'] = @local_space_limit new_message['local_space_used'] = @local_space_used new_message['version'] = VERSION message_tracker(new_message) puts "[DONE]" end def message_tracker(message) message_json = JSON.generate(message) uri = URI(URI.encode("http://#{@ipfs_api_addr}:5001/api/v0/pubsub/pub?arg=#{TRACKER_ID}&arg=#{message_json}")) Net::HTTP.start(uri.host, uri.port) do |http| request = Net::HTTP::Get.new uri http.request request end end def execute_ipfs_command(command, verbose=true) print "ipfs #{command.split(' ')[0, 2].join(' ')} > " if verbose stdout, stdeerr, status = Open3.capture3("#{@config['ipfs_bin_path']} --enc=json #{command}") if !status.success? output = stdeerr else output = stdout end puts "[OK]" if verbose return output end # main loop - avoid from existing on non critical error loop { begin listener rescue => e if e.to_s.include?('Failed to open TCP connection to localhost:5001') or e.to_s.include?('Error: api not running') restart_local_daemon elsif e.to_s.include?("Failed to open TCP connection to #{@ipfs_api_addr}:5001") warn " " warn "#################################################" warn "ERROR - IPFS Daemon is not running in the host machine" warn "Please run: 'ipfs daemon --enable-pubsub-experiment' in your computer (not inside the vagrant box)" warn "Once you have the daemon running, rerun the worker with: 'vagrant provision'" warn " " exit end puts "#{e} >> Restarting... (timeouts are expected - anything else is not)" end }