Commit f06fff00 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Remove oneshot functionality

parent c3d0c1f6
......@@ -7,6 +7,7 @@ import (
"log"
"net/http"
"os"
"strings"
"time"
"github.com/streadway/amqp"
......@@ -56,6 +57,7 @@ type (
Vhost string `json:"vhost"`
}
// FetchedConfig config from backend
FetchedConfig struct {
RabbitMQConfig rabbitMQConfig `json:"rabbitmq_config"`
Services []service `json:"services"`
......@@ -118,7 +120,7 @@ func (k sshKey) String() string {
}
func (t task) String() string {
return fmt.Sprintf("%s:%s %8s %s", t.Service, t.User, t.Action, t.Key)
return fmt.Sprintf("%s:%s %8s %s", t.Service, t.User, t.Key, strings.ToUpper(t.Action))
}
func filterPermitted(permitted []service, wanted []service) (remainder []service) {
......@@ -431,6 +433,7 @@ func (c *config) ackTask(ti task) (ok bool, err error) {
ti.ID)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
// TODO retransmit ACK
return
}
......@@ -539,7 +542,7 @@ func main() {
app := kingpin.New("fumClient", "Client for the 'Federated user management in the Helmholtz Data Federation' project")
configFile := app.Arg("config", "Config file to file to use.").Required().String()
oneshot := app.Flag("oneshot", "Fetch deployments with HTTP. Do not use rabbitmq.").Short('1').Bool()
// oneshot := app.Flag("oneshot", "Fetch deployments with HTTP. Do not use rabbitmq.").Short('1').Bool()
app.Author("Lukas Burgey")
app.Version(version)
......@@ -554,27 +557,37 @@ func main() {
}
if len(c.Services) < 1 {
log.Fatalf("[Err] No services. Exiting")
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
// start task handler and acker and fetcher
// start task handler and acker
go c.taskHandler()
go c.taskAcker()
go c.taskFetcher()
if !*oneshot {
if len(c.Services) < 1 {
log.Printf("[P] Not starting pubsub because the are no services to subscribe to")
return
}
consumer := c.consumer()
defer consumer.close()
consumer := c.consumer()
defer consumer.close()
consumer.startConsuming()
consumer.startConsuming()
// start the fetcher after the consuming starts
// -> we miss nothing
go c.taskFetcher()
// run till killed
forever := make(chan bool)
<-forever
}
// run till killed
forever := make(chan bool)
<-forever
//if !*oneshot {
//} else {
// // fetch once and exit
// if err := c.fetchTasks(); err != nil {
// log.Printf("[Fetch] error fetching: %s", err)
// }
// close(c.NewTasks)
// log.Printf("[Fetch] error fetching")
// c.taskHandler()
// close(c.DoneTasks)
// go c.taskAcker()
//}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment