package kraftakt
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"github.com/octo/kraftakt/app"
"github.com/octo/kraftakt/fitbit"
"github.com/octo/kraftakt/gfit"
"google.golang.org/appengine"
"google.golang.org/appengine/datastore"
"google.golang.org/appengine/delay"
"google.golang.org/appengine/log"
"google.golang.org/appengine/user"
)
// delayedHandleNotifications wraps handleNotifications with delay.Func under
// the key "handleNotifications". fitbitNotifyHandler invokes it through Call
// so that the notification payload is processed outside the HTTP request.
var delayedHandleNotifications = delay.Func("handleNotifications", handleNotifications)
func init() {
http.HandleFunc("/fitbit/setup", fitbitSetupHandler)
http.Handle("/fitbit/grant", AuthenticatedHandler(fitbitGrantHandler))
http.Handle("/fitbit/notify", ContextHandler(fitbitNotifyHandler))
http.HandleFunc("/google/setup", googleSetupHandler)
http.Handle("/google/grant", AuthenticatedHandler(googleGrantHandler))
http.Handle("/", AuthenticatedHandler(indexHandler))
}
// ContextHandler adapts a function that takes an App Engine context and may
// return an error to the http.Handler interface.
type ContextHandler func(context.Context, http.ResponseWriter, *http.Request) error

// ServeHTTP creates the request context, loads the application configuration
// and then calls the wrapped function. A failure of either step is reported to
// the client as a 500 response carrying the error text.
func (h ContextHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := appengine.NewContext(r)

	err := app.LoadConfig(ctx)
	if err == nil {
		// Only run the wrapped handler once the configuration is available.
		err = h(ctx, w, r)
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
// AuthenticatedHandler adapts a function that additionally receives the
// application user of the signed-in visitor to the http.Handler interface.
type AuthenticatedHandler func(context.Context, http.ResponseWriter, *http.Request, *app.User) error

// ServeHTTP loads the application configuration, makes sure a user is signed
// in (redirecting to the login URL otherwise), resolves the matching app.User
// and calls the wrapped function. Every failure is reported to the client as a
// 500 response carrying the error text.
func (h AuthenticatedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := appengine.NewContext(r)

	// fail is the single place where errors are turned into a response.
	fail := func(err error) {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}

	if err := app.LoadConfig(ctx); err != nil {
		fail(err)
		return
	}

	current := user.Current(ctx)
	if current == nil {
		// Nobody is signed in: send the visitor to the login page and ask it
		// to come back to the URL that was requested.
		loginURL, err := user.LoginURL(ctx, r.URL.String())
		if err != nil {
			fail(err)
			return
		}
		http.Redirect(w, r, loginURL, http.StatusTemporaryRedirect)
		return
	}

	appUser, err := app.NewUser(ctx, current.Email)
	if err != nil {
		fail(err)
		return
	}

	if err := h(ctx, w, r, appUser); err != nil {
		fail(err)
	}
}
// indexHandler writes the landing page for the signed-in user u to w. It looks
// up the tokens named "Fitbit" and "Google" for u and prints, next to a
// greeting with the e-mail address of the current user, whether each of the
// two services has been authorized. Lookup errors other than
// datastore.ErrNoSuchEntity are returned to the caller.
func indexHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error {
// A missing token (ErrNoSuchEntity) is not a failure; it only means that the
// service has not been authorized yet.
_, err := u.Token(ctx, "Fitbit")
if err != nil && err != datastore.ErrNoSuchEntity {
return err
}
haveFitbitToken := err == nil
_, err = u.Token(ctx, "Google")
if err != nil && err != datastore.ErrNoSuchEntity {
return err
}
haveGoogleToken := err == nil
// NOTE(review): several of the string literals below contain raw line breaks,
// which interpreted Go string literals do not permit. Presumably markup was
// lost from these strings; restore them from the original before compiling.
fmt.Fprintln(w, "
Kraftakt")
fmt.Fprintln(w, "Kraftakt
")
fmt.Fprintln(w, "Kraftakt copies your Fitbit data to Google Fit, seconds after you sync.
")
fmt.Fprintf(w, "Hello %s
\n", user.Current(ctx).Email)
fmt.Fprintln(w, "")
// Authorization status of the Fitbit token.
fmt.Fprint(w, "- Fitbit: ")
if haveFitbitToken {
fmt.Fprint(w, `Authorized`)
} else {
fmt.Fprint(w, `Not authorized (Authorize)`)
}
fmt.Fprintln(w, "
")
// Authorization status of the Google token.
fmt.Fprint(w, "- Google Fit: ")
if haveGoogleToken {
fmt.Fprint(w, `Authorized`)
} else {
fmt.Fprint(w, `Not authorized (Authorize)`)
}
fmt.Fprintln(w, "
")
fmt.Fprintln(w, "
")
fmt.Fprintln(w, "")
return nil
}
// fitbitSetupHandler answers with a temporary redirect (307) to the URL
// returned by fitbit.AuthURL.
func fitbitSetupHandler(w http.ResponseWriter, r *http.Request) {
	target := fitbit.AuthURL()
	http.Redirect(w, r, target, http.StatusTemporaryRedirect)
}
// fitbitGrantHandler handles the request to /fitbit/grant for the signed-in
// user u. It hands the request to fitbit.ParseToken, creates a Fitbit client
// for u, subscribes to the "activities" and "sleep" collections and finally
// redirects the browser to the root of the site.
//
// Any error from parsing the token, creating the client or subscribing is
// returned to the caller; no redirect is sent in that case.
func fitbitGrantHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error {
	if err := fitbit.ParseToken(ctx, r, u); err != nil {
		return err
	}

	c, err := fitbit.NewClient(ctx, "-", u)
	if err != nil {
		return err
	}

	for _, collection := range []string{"activities", "sleep"} {
		if err := c.Subscribe(ctx, collection); err != nil {
			return fmt.Errorf("c.Subscribe(%q) = %v", collection, err)
		}
		log.Infof(ctx, "Successfully subscribed to %q", collection)
	}

	// r.URL is a pointer. Work on a copy so that stripping path, query and
	// fragment does not modify the incoming request itself.
	redirectURL := *r.URL
	redirectURL.Path = "/"
	redirectURL.RawQuery = ""
	redirectURL.Fragment = ""
	http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
	return nil
}
// googleSetupHandler answers with a temporary redirect (307) to the URL
// returned by gfit.AuthURL.
func googleSetupHandler(w http.ResponseWriter, r *http.Request) {
	target := gfit.AuthURL()
	http.Redirect(w, r, target, http.StatusTemporaryRedirect)
}
// googleGrantHandler handles the request to /google/grant for the signed-in
// user u. It hands the request to gfit.ParseToken and, on success, redirects
// the browser to the root of the site. An error from gfit.ParseToken is
// returned to the caller and no redirect is sent.
func googleGrantHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error {
	if err := gfit.ParseToken(ctx, r, u); err != nil {
		return err
	}

	// r.URL is a pointer. Work on a copy so that stripping path, query and
	// fragment does not modify the incoming request itself.
	redirectURL := *r.URL
	redirectURL.Path = "/"
	redirectURL.RawQuery = ""
	redirectURL.Fragment = ""
	http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
	return nil
}
// fitbitNotifyHandler is the endpoint Fitbit calls for subscription updates.
//
// A request carrying a "verify" form value is a subscriber verification: it is
// answered with 204 when the value equals the configured subscriber code and
// with 404 otherwise. Any other request is treated as a notification: the body
// is read, its signature is checked against the X-Fitbit-Signature header (404
// on mismatch) and the raw payload is handed to delayedHandleNotifications for
// asynchronous processing, after which 201 is returned.
//
// All work done with the context is bounded by a three second timeout.
func fitbitNotifyHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
	defer r.Body.Close()

	const fitbitTimeout = 3 * time.Second
	ctx, cancel := context.WithTimeout(ctx, fitbitTimeout)
	defer cancel()

	// Verification requests are only expected while a new subscriber is being
	// set up in the UI; afterwards this branch should not be taken.
	if code := r.FormValue("verify"); code != "" {
		status := http.StatusNotFound
		if code == app.Config.FitbitSubscriberCode {
			status = http.StatusNoContent
		}
		w.WriteHeader(status)
		return nil
	}

	payload, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return err
	}

	// Fitbit recommendation: "If signature verification fails, you should
	// respond with a 404"
	signature := r.Header.Get("X-Fitbit-Signature")
	if ok := fitbit.CheckSignature(ctx, payload, signature); !ok {
		log.Warningf(ctx, "signature mismatch")
		w.WriteHeader(http.StatusNotFound)
		return nil
	}

	if err := delayedHandleNotifications.Call(ctx, payload); err != nil {
		return err
	}

	w.WriteHeader(http.StatusCreated)
	return nil
}
// handleNotifications decodes a Fitbit notification payload (a JSON array of
// subscriptions) and processes every entry of collection type "activities"
// with handleNotification. Entries of any other collection type are skipped
// with a warning. It is executed asynchronously via the delay package.
//
// Only a configuration or decoding failure is returned; a failure to process
// an individual entry is logged and does not stop the remaining entries.
func handleNotifications(ctx context.Context, payload []byte) error {
	if err := app.LoadConfig(ctx); err != nil {
		return err
	}

	var subs []fitbit.Subscription
	if err := json.Unmarshal(payload, &subs); err != nil {
		return err
	}

	for i := range subs {
		sub := &subs[i]

		switch sub.CollectionType {
		case "activities":
			if err := handleNotification(ctx, sub); err != nil {
				log.Errorf(ctx, "handleNotification() = %v", err)
			}
		default:
			log.Warningf(ctx, "ignoring collection type %q", sub.CollectionType)
		}
	}

	return nil
}
// handleNotification processes a single Fitbit subscription notification.
//
// It resolves the user from s.SubscriptionID, then works in two phases:
//
//  1. The activity summary for s.Date and the user's profile are fetched from
//     Fitbit concurrently.
//  2. Steps, calories, total distance, heart rate and activities taken from
//     the summary are written to Google Fit concurrently.
//
// If any call of a phase fails, all errors collected in that phase are
// returned together as an appengine.MultiError; phase two is not started when
// phase one failed. On success nil is returned.
func handleNotification(ctx context.Context, s *fitbit.Subscription) error {
	u, err := app.UserByID(ctx, s.SubscriptionID)
	if err != nil {
		return err
	}

	fitbitClient, err := fitbit.NewClient(ctx, s.OwnerID, u)
	if err != nil {
		return err
	}

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex // guards errs
		errs    appengine.MultiError
		summary *fitbit.ActivitySummary
		profile *fitbit.Profile
	)

	// addErr records an error. The goroutines below run concurrently, so
	// appending to errs has to be serialized.
	addErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		errs = append(errs, err)
	}

	// Phase one: read from Fitbit. Each goroutine writes a distinct variable.
	wg.Add(2)
	go func() {
		defer wg.Done()
		var err error
		summary, err = fitbitClient.ActivitySummary(ctx, s.Date)
		if err != nil {
			addErr(fmt.Errorf("fitbitClient.ActivitySummary(%q) = %v", s.Date, err))
		}
	}()
	go func() {
		defer wg.Done()
		var err error
		profile, err = fitbitClient.Profile(ctx)
		if err != nil {
			addErr(fmt.Errorf("fitbitClient.Profile() = %v", err))
		}
	}()
	wg.Wait()

	// All goroutines have finished, errs can be read without the lock.
	if len(errs) != 0 {
		return errs
	}

	// The notification only carries a date; interpret it in the time zone of
	// the user's profile.
	tm, err := time.ParseInLocation("2006-01-02", s.Date, profile.Timezone)
	if err != nil {
		return err
	}

	log.Debugf(ctx, "%s (%s) took %d steps on %s",
		profile.Name, u.Email, summary.Summary.Steps, tm)

	gfitClient, err := gfit.NewClient(ctx, u)
	if err != nil {
		return err
	}

	// Phase two: write to Google Fit.
	wg.Add(5)
	go func() {
		defer wg.Done()
		if err := gfitClient.SetSteps(ctx, summary.Summary.Steps, tm); err != nil {
			addErr(fmt.Errorf("gfitClient.SetSteps(%d) = %v", summary.Summary.Steps, err))
		}
	}()
	go func() {
		defer wg.Done()
		if err := gfitClient.SetCalories(ctx, summary.Summary.CaloriesOut, tm); err != nil {
			addErr(fmt.Errorf("gfitClient.SetCalories(%d) = %v", summary.Summary.CaloriesOut, err))
		}
	}()
	go func() {
		defer wg.Done()

		// Only the entry for the "total" activity is used; it is scaled by
		// 1000 (presumably kilometers to meters - confirm against the Fitbit
		// API). Without such an entry a distance of zero is written.
		var distanceMeters float64
		for _, d := range summary.Summary.Distances {
			if d.Activity != "total" {
				continue
			}
			distanceMeters = 1000.0 * d.Distance
			break
		}

		if err := gfitClient.SetDistance(ctx, distanceMeters, tm); err != nil {
			addErr(fmt.Errorf("gfitClient.SetDistance(%g) = %v", distanceMeters, err))
		}
	}()
	go func() {
		defer wg.Done()
		if err := gfitClient.SetHeartRate(ctx, summary.Summary.HeartRateZones, summary.Summary.RestingHeartRate, tm); err != nil {
			addErr(fmt.Errorf("gfitClient.SetHeartRate() = %v", err))
		}
	}()
	go func() {
		defer wg.Done()

		var activities []gfit.Activity
		for _, a := range summary.Activities {
			// Activities without a start time cannot be placed on the
			// timeline and are skipped.
			if !a.HasStartTime {
				continue
			}

			startTime, err := time.ParseInLocation("2006-01-02T15:04", a.StartDate+"T"+a.StartTime, profile.Timezone)
			if err != nil {
				addErr(fmt.Errorf("gfitClient.SetActivities() = %v", err))
				return
			}
			// The duration is converted as a number of milliseconds.
			endTime := startTime.Add(time.Duration(a.Duration) * time.Millisecond)

			activities = append(activities, gfit.Activity{
				Start: startTime,
				End:   endTime,
				Type:  gfit.ParseFitbitActivity(a.Name),
			})
		}

		if err := gfitClient.SetActivities(ctx, activities, tm); err != nil {
			addErr(fmt.Errorf("gfitClient.SetActivities() = %v", err))
		}
	}()
	wg.Wait()

	// Return an untyped nil on success; an empty MultiError converted to
	// error would compare unequal to nil.
	if len(errs) != 0 {
		return errs
	}
	return nil
}